How to create a custom resource with Kubernetes Operator

While developing projects on the Kubernetes platform I came across an interesting problem. I had quite a few scripts that ran in containers and needed to be triggered only once on every node in my Kubernetes cluster. This could not be solved using default Kubernetes resources such as DaemonSet and Job. So I decided to write my own resource using Kubernetes Operator Framework. How I went about it is the subject of this blog post.

When I confronted this problem, my first thought was to use a DaemonSet resource that utilizes initContainers and then starts a dummy busybox container running tail -f /dev/null or another command that does nothing. However, after analyzing the question more thoroughly, I realized that it would be very problematic on a production setup with many such DaemonSets. Each dummy container eats up resources to execute a useless task. Moreover, from the perspective of the cluster administrator, such a DaemonSet seems exactly the same as other legitimate services. Should an error occur, it may be confusing and require a deep dive into running resources.

My second idea was to use a simple Job with a huge number of Completions and Parallelism. Using an anti-affinity specification to ensure that only one Pod runs on a single node at a time, I would simulate the DaemonSet's behavior. Whenever a new node appeared in the cluster, a new Pod would be scheduled on it.

Yet this idea, with a fixed number of Completions, seemed a bit amateurish. Even if I defined it to be 1000—what if a 1001st node were to appear in the cluster? Further, from the perspective of the cluster administrator, such a Job seems like a defective resource that cannot finish all Completions.

Creating a custom resource in Kubernetes

But what if I could go a bit more low-level and create a resource that merges the functionality of the DaemonSet and the Job? I went online to find out whether someone had already created such a resource, but all I turned up were a few issues on the Kubernetes GitHub posted by people who'd had the same problem. For example, in this issue the idea of adding such a DaemonJob to the official Kubernetes resources has been discussed for over two years, but a conclusion has remained elusive. Long story short, I would have to create such a resource myself. I chose to use Operator Framework and code a resource that would be defined in a style very similar to a standard Kubernetes Job resource but would automatically schedule a pod on every node, à la DaemonSet. Before I go further, let's have a brief overview of the Kubernetes operator architecture.

An overview of Kubernetes operator architecture

A Kubernetes operator has two main components: a Custom Resource Definition (CRD) and an operator controller. A Custom Resource Definition is a standard Kubernetes resource that allows users to extend the Kubernetes API with new resources. On its own, however, it only enables users to apply manifests of the newly created resource; those manifests have no effect on the cluster. This is because the logic behind every built-in resource lives in the controllers shipped with Kubernetes. Since a user-defined resource is not known to those built-in controllers, the user has to provide its controller separately. This is where the operator controller comes in: it is usually deployed as a separate pod on the cluster and contains the logic for managing the custom resources.

The controller has two main parts that are important for every operator developer. The first is the definition of what the controller watches: the changes in specific resources or events that will trigger the operator's control logic. For example, if a custom resource's work is based on information provided in a ConfigMap, then the operator should watch ConfigMap changes so that its code runs on every change and adjusts the resource to the new configuration. The other important part is the Reconcile loop. This is in fact a function that contains the whole logic behind the operator and is triggered by changes in the resources being watched.
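
To make the interplay between watches and the Reconcile loop more concrete, here is a minimal sketch that uses only the Go standard library. The types Request, Event and Queue, as well as the function mapEvent, are hypothetical stand-ins invented for this illustration and are not part of the Operator Framework API. The sketch also assumes that the custom resource shares its name with the ConfigMap it depends on.

```go
package main

import "fmt"

// Request identifies the custom resource to reconcile (hypothetical type,
// modeled on the name/namespace pair an operator receives).
type Request struct {
	Namespace string
	Name      string
}

// Event is a simplified change notification for a watched object.
type Event struct {
	Kind      string
	Namespace string
	Name      string
}

// Queue collects requests and drops duplicates, so several events for the
// same resource lead to a single reconcile call.
type Queue struct {
	pending []Request
	seen    map[Request]bool
}

func NewQueue() *Queue { return &Queue{seen: map[Request]bool{}} }

// Add enqueues a request unless an identical one is already waiting.
func (q *Queue) Add(r Request) {
	if q.seen[r] {
		return
	}
	q.seen[r] = true
	q.pending = append(q.pending, r)
}

// Drain calls reconcile once for every pending request and returns the
// number of calls made.
func (q *Queue) Drain(reconcile func(Request) error) (int, error) {
	calls := 0
	for len(q.pending) > 0 {
		r := q.pending[0]
		q.pending = q.pending[1:]
		delete(q.seen, r)
		calls++
		if err := reconcile(r); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

// mapEvent is the "watch" part: it decides whether an event matters and
// which custom resource it concerns. Assumption: the custom resource has
// the same name and namespace as its ConfigMap.
func mapEvent(e Event) (Request, bool) {
	if e.Kind != "ConfigMap" {
		return Request{}, false
	}
	return Request{Namespace: e.Namespace, Name: e.Name}, true
}

func main() {
	q := NewQueue()
	events := []Event{
		{Kind: "ConfigMap", Namespace: "default", Name: "app"},
		{Kind: "ConfigMap", Namespace: "default", Name: "app"}, // second change, same object
		{Kind: "Secret", Namespace: "default", Name: "ignored"},
	}
	for _, e := range events {
		if req, ok := mapEvent(e); ok {
			q.Add(req)
		}
	}
	calls, _ := q.Drain(func(r Request) error {
		fmt.Printf("reconciling %s/%s\n", r.Namespace, r.Name)
		return nil
	})
	fmt.Println("reconcile calls:", calls)
}
```

The point of the sketch is that the reconcile function receives only the identity of a resource, not the event that caused the call, so it always has to look up the current state and bring the resource in line with it.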

How to start working with Kubernetes Operator Framework

Developing such a project from scratch may seem a tremendous challenge for a developer. However, Operator Framework provides a clever CLI tool called operator-sdk, which does most of the job. The whole code infrastructure can be generated with just two commands, after which the developer only edits specific structures and functions to add the resource-specific logic.

Operator Framework allows users to create multiple Kubernetes custom resources in the scope of a single operator. In such a case, each resource is managed by its own controller, which makes it possible to build more complicated projects. These can often have multiple components, with each deployed as a separate custom resource. To initialize an empty operator, simply run the operator-sdk init command (docs) with the proper flags for your project, and the whole basic architecture will be created.

Having initialized the operator, you can now add code for specific resources. For the problem described in this blog post, I will create just one resource. A new resource can be added with the operator-sdk create api command (docs), which generates the API code for the resource described by the flags passed to the command. By itself, this gives the developer only the structure of the custom resource, without any generated code for the controller part. To generate the controller code as well, it's necessary to provide the flag --controller=True when creating the API. Why does operator-sdk not do that by default? In more advanced projects that periodically release new versions of the product, the API may change and leave old manifests not working, with all customers using the old manifests facing the need to migrate to the new version. Because a new API version can be created without a new controller, both the old and the new API of a resource can be supported and managed with a single controller, at least until legacy support is no longer required.

With the project structure for the new custom resource in place, you can start writing the code. All API definitions should be written in files with names ending in _types.go, located under the api/<API version> directory. All controller code is located under the controllers/ directory, in files with names ending in _controller.go. These are the two main file types that developers should edit; all other files are generated automatically. When a developer finishes editing the code of the custom resource, all they have to do is run the make manifests command, and the operator code will be ready to be installed on the cluster (docs).

DaemonJob step-by-step

Given all this, I started implementing the idea of a DaemonJob. The API was the easy part: just remove the Completions and Parallelism fields from the Job API and leave everything else as is. The API code of Kubernetes resources is written as Go structs, and most of it looks the same (code). From the user's perspective, a sample manifest to be applied on the cluster would look like this:

apiVersion: dj.dysproz.io/v1
kind: DaemonJob
metadata:
  name: daemonjob-sample
spec:
  template:
    spec:
      containers:
        - name: test-job
          image: busybox
          command:
            - sleep
            - "20"
      nodeSelector:
        app: v1
      restartPolicy: OnFailure

In this example, on every node which has label app=v1, a busybox container will be created and will run a sleep command for 20 seconds. As you may be able to see, the above manifest is very similar to one users would create for a simple Kubernetes Job resource.

However, the logic behind the controller for such a resource presented a more daunting challenge (code).

It required, first of all, that the resources to watch be defined. In this case, the operator only has to look for changes on nodes, which may seem fairly easy. This brings me to the first problem. Kubernetes operator code is usually designed to create resources that are owned by a single custom resource and watch only these resources. For example, if a custom resource creates a ConfigMap with configuration and watches for ConfigMaps, then it should look only for changes on ConfigMaps that are owned by this custom resource.

When a watched resource triggers the Reconcile loop of custom resources, an argument request is passed containing two fields: custom resource name and namespace. Although nodes in the Kubernetes cluster are represented as regular resources, there is no built-in function to watch specific resources without ownership of a custom resource. As a result, the Reconcile loop would be triggered with the request name pointing to a node that triggered action rather than to a specific custom resource.

To keep up with the original design of developing operators, it is necessary to build a custom method that will trigger the Reconcile loop with a proper request name for every DaemonJob resource when any node changes its state. This can be achieved with the following code:

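func (r *DaemonJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    // Reconcile whenever a DaemonJob itself is created, edited or deleted.
    if err := ctrl.NewControllerManagedBy(mgr).
        For(&djv1.DaemonJob{}).
        Complete(r); err != nil {
        return err
    }

    // Watch nodes and translate every node event into one request per
    // DaemonJob. For(&corev1.Node{}) additionally enqueues the node's own
    // name; Reconcile finds no DaemonJob under that name and returns.
    if err := ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Node{}).
        Watches(&source.Kind{Type: &corev1.Node{}}, &handler.EnqueueRequestsFromMapFunc{
            ToRequests: handler.ToRequestsFunc(func(nodeObject handler.MapObject) []reconcile.Request {
                // List every DaemonJob in the cluster; if listing fails,
                // no request is enqueued for this event.
                var djObjects djv1.DaemonJobList
                if err := mgr.GetClient().List(context.TODO(), &djObjects); err != nil {
                    return nil
                }
                var requests = []reconcile.Request{}
                for _, djObject := range djObjects.Items {
                    requests = append(requests, reconcile.Request{
                        NamespacedName: types.NamespacedName{
                            Name:      djObject.Name,
                            Namespace: djObject.Namespace,
                        },
                    })
                }
                return requests
            }),
        }).
        Complete(r); err != nil {
        return err
    }

    return nil
}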

The first operation adds a watch for the custom resource itself. This means that a Reconcile loop will be triggered whenever a new manifest with this resource is applied on the cluster or edited by the user during runtime.

The second operation defines a custom watch for changes in node resources. If any node changes its state, an anonymous function is triggered. The anonymous function lists all DaemonJob resources that exist in the cluster and then prepares requests for every resource found. As a result, node watch will simultaneously trigger a set of requests to reconcile DaemonJob resources and update their state.
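
Stripped of the controller-runtime types, the fan-out performed by the anonymous function can be sketched with the standard library alone. The types Request and DaemonJob and the function requestsForNodeEvent below are simplified, hypothetical stand-ins used only to illustrate the idea; they are not the operator's real types.

```go
package main

import "fmt"

// Request mirrors the name/namespace pair handed to the Reconcile loop.
type Request struct {
	Namespace string
	Name      string
}

// DaemonJob is a simplified stand-in for the custom resource.
type DaemonJob struct {
	Namespace string
	Name      string
}

// requestsForNodeEvent ignores which node changed and returns one request
// per existing DaemonJob, so every DaemonJob gets a chance to react.
func requestsForNodeEvent(nodeName string, jobs []DaemonJob) []Request {
	requests := make([]Request, 0, len(jobs))
	for _, dj := range jobs {
		requests = append(requests, Request{Namespace: dj.Namespace, Name: dj.Name})
	}
	return requests
}

func main() {
	jobs := []DaemonJob{
		{Namespace: "default", Name: "daemonjob-sample"},
		{Namespace: "tools", Name: "cleanup"},
	}
	for _, r := range requestsForNodeEvent("node-1", jobs) {
		fmt.Printf("%s/%s\n", r.Namespace, r.Name)
	}
}
```

Note that the node name never reaches the Reconcile loop in this design: each DaemonJob simply recomputes its desired state from the current list of nodes.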

Once the logic behind the proper triggers of the Reconcile loop is handled, it is time to concentrate on the Reconcile loop itself.

The controller logic is based on two steps. The first is to determine how many nodes are currently in the cluster and which of them should run the Job (which may be restricted with, for example, a nodeSelector field in the manifest). The second step is to prepare a Job resource with that number set as its number of Completions.

The Reconcile loop starts by setting some basic variables and fetching the DaemonJob resource applied in the cluster, which is later used to fill out the Job resource. In this way, you create a data structure to which you can apply the logic embedded in the code.

func (r *DaemonJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    log := r.Log.WithValues("daemonjob", req.NamespacedName)
    log.Info("Reconciling DaemonJob")
    instance := &djv1.DaemonJob{}
    instanceType := "daemonjob"

    // Fetch the DaemonJob named in the request. NotFound means that no
    // DaemonJob with this name exists (for example, it was deleted), so
    // there is nothing to do.
    if err := r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }

    // Skip resources that are already being deleted.
    if !instance.GetDeletionTimestamp().IsZero() {
        return reconcile.Result{}, nil
    }

Next, a selector needs to be prepared to filter nodes based on the nodeSelector passed in the DaemonJob manifest, and all nodes that satisfy these constraints need to be listed. The number of nodes gathered will be the number of desired completions of the final Job.

// An empty selector matches every node in the cluster.
nodeSelector := client.MatchingLabels{}
if instance.Spec.Template.Spec.NodeSelector != nil {
    nodeSelector = instance.Spec.Template.Spec.NodeSelector
}
var nodes corev1.NodeList
if err := r.Client.List(ctx, &nodes, nodeSelector); err != nil {
    // Return the error so the request is retried instead of continuing
    // with an empty node list.
    return reconcile.Result{}, err
}
// One completion per matching node.
jobReplicas := int32(len(nodes.Items))
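
What the label-based filtering amounts to can be shown with a small standard-library sketch. The Node type and the functions matchesSelector and countMatchingNodes are hypothetical helpers written for this illustration; in the operator itself the filtering is done by the API server through the list options shown above.

```go
package main

import "fmt"

// Node is a simplified stand-in for corev1.Node: only name and labels.
type Node struct {
	Name   string
	Labels map[string]string
}

// matchesSelector reports whether labels contain every key/value pair of
// the selector. An empty or nil selector matches every node.
func matchesSelector(labels, selector map[string]string) bool {
	for key, want := range selector {
		if got, ok := labels[key]; !ok || got != want {
			return false
		}
	}
	return true
}

// countMatchingNodes returns the number of nodes a DaemonJob with the given
// nodeSelector should run on, i.e. the desired number of completions.
func countMatchingNodes(nodes []Node, selector map[string]string) int32 {
	var count int32
	for _, node := range nodes {
		if matchesSelector(node.Labels, selector) {
			count++
		}
	}
	return count
}

func main() {
	nodes := []Node{
		{Name: "node-a", Labels: map[string]string{"app": "v1"}},
		{Name: "node-b", Labels: map[string]string{"app": "v2"}},
		{Name: "node-c", Labels: map[string]string{"app": "v1", "zone": "x"}},
	}
	fmt.Println(countMatchingNodes(nodes, map[string]string{"app": "v1"})) // prints 2
	fmt.Println(countMatchingNodes(nodes, nil))                            // prints 3
}
```

With the sample manifest from earlier (nodeSelector app: v1), only node-a and node-c would qualify, so the resulting Job would ask for two completions.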

With all necessary values for a Job resource, the structure desired can be filled out:

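// getJob builds the Job that backs a DaemonJob: one pod per selected node.
func getJob(instance *djv1.DaemonJob, replicas *int32, reqName, instanceType string) *batchv1.Job {
    // Refuse to schedule a pod onto a node (topology key: hostname) that
    // already runs a pod labeled <instanceType>=<reqName>.
    var jobAffinity = corev1.Affinity{
        PodAntiAffinity: &corev1.PodAntiAffinity{
            RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
                LabelSelector: &metav1.LabelSelector{
                    MatchExpressions: []metav1.LabelSelectorRequirement{{
                        Key:      instanceType,
                        Operator: metav1.LabelSelectorOpIn,
                        Values:   []string{reqName},
                    }},
                },
                TopologyKey: "kubernetes.io/hostname",
            }},
        },
    }

    var podSpec = instance.Spec.Template
    podSpec.Spec.Affinity = &jobAffinity

    // A Job accepts only Never or OnFailure as the restart policy.
    if podSpec.Spec.RestartPolicy == corev1.RestartPolicyAlways {
        podSpec.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
    }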
 
    return &batchv1.Job{
        TypeMeta: metav1.TypeMeta{
            Kind:       "Job",
            APIVersion: "batch/v1",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:      instance.Name + "-job",
            Namespace: instance.Namespace,
            Labels:    instance.Labels,
        },
        Spec: batchv1.JobSpec{
            Parallelism:             replicas,
            Completions:             replicas,
            Selector:                instance.Spec.Selector,
            Template:                podSpec,
            ManualSelector:          instance.Spec.ManualSelector,
            TTLSecondsAfterFinished: instance.Spec.TTLSecondsAfterFinished,
            BackoffLimit:            instance.Spec.BackoffLimit,
            ActiveDeadlineSeconds:   instance.Spec.ActiveDeadlineSeconds,
        },
    }
}

Here, aside from copying the values passed in the manifest into fields that do not vary, it is important to specify three elements. The first is the anti-affinity. The above code specifies that a pod created from the Job cannot be scheduled on a node where another pod matching the label selector (key instanceType, value reqName) is already running. To achieve this, the mechanism uses the built-in node label kubernetes.io/hostname, which contains the hostname of a node, as the topology key, so every such pod must land on a node with a different hostname value. This results in running only one pod on a single node.

The second is the number of nodes on which Job pods should be scheduled, which the function writes into the Completions and Parallelism fields. The third is the restart policy: a Job accepts only Never or OnFailure, so Always is replaced with OnFailure. Other fields are filled based on values the user passes in the DaemonJob manifest.
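
The combined effect of the anti-affinity rule and of setting Completions and Parallelism to the node count can be illustrated with a toy simulation. This is only a sketch under simplifying assumptions: all pods run at the same time, the hypothetical function schedule stands in for the real Kubernetes scheduler, and a node is treated as occupied as soon as one pod has been placed on it.

```go
package main

import "fmt"

// schedule simulates placing `completions` pods that all run at the same
// time (Parallelism equals Completions) under a required anti-affinity rule
// with the hostname as topology key: a node that already runs one of the
// pods is not eligible for another one. It returns the pods placed per node
// and the number of pods that stay pending.
func schedule(nodes []string, completions int) (map[string]int, int) {
	placed := make(map[string]int, len(nodes))
	pending := 0
	for i := 0; i < completions; i++ {
		assigned := false
		for _, node := range nodes {
			if placed[node] == 0 { // anti-affinity: the node must be free
				placed[node] = 1
				assigned = true
				break
			}
		}
		if !assigned {
			pending++
		}
	}
	return placed, pending
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}

	// Completions equal to the node count: one pod per node, none pending.
	placed, pending := schedule(nodes, len(nodes))
	fmt.Println(len(placed), pending) // prints 3 0

	// One completion too many: the extra pod cannot be placed anywhere.
	_, pending = schedule(nodes, len(nodes)+1)
	fmt.Println(pending) // prints 1
}
```

The second call shows why the number of completions has to follow the number of matching nodes: any surplus pod would remain unschedulable and the Job would never finish.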

Final steps

In the last step, it is vital to create the Job in the cluster or update an already existing Job. Usually, this can be handled with the simple CreateOrUpdate function from the sigs.k8s.io/controller-runtime package. CreateOrUpdate ensures that the specified resource exists on the cluster and that it has the specified field values. However, the Job resource does not allow every field to be updated on an existing resource; Completions, for example, is immutable. So here's a little trick that will allow you to change these parameters anyway.

var clusterJob batchv1.Job
clusterJob.ObjectMeta = job.ObjectMeta
_, err = ctrl.CreateOrUpdate(ctx, r, &clusterJob, func() error {
    modifyJob(job, &clusterJob)
    return controllerutil.SetControllerReference(instance, &clusterJob, r.Scheme)
})
if err != nil {
    if errors.IsInvalid(err) {
        // The update was rejected: delete the Job and let the next
        // reconcile create it from scratch.
        _ = r.Client.Delete(ctx, &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: job.Name, Namespace: job.Namespace}}, client.PropagationPolicy(metav1.DeletePropagationBackground))
        // RequeueAfter is a time.Duration; a bare 5 would mean 5 nanoseconds.
        return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
    }
    return reconcile.Result{}, err
}

When CreateOrUpdate fails with an Invalid error, detected with errors.IsInvalid (which indicates that the update of an existing resource was rejected), the Job is deleted and the Reconcile loop is rescheduled to run again after five seconds.
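
The delete-and-retry pattern can be tried out without a cluster. The following sketch replaces the API server with a hypothetical in-memory fakeCluster that rejects changes to Completions; the names Job, fakeCluster, errInvalid and reconcileJob are all invented for this illustration and only mimic the behavior described above.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalid stands in for the "Invalid" error returned by the API server.
var errInvalid = errors.New("invalid: field is immutable")

// Job is a simplified stand-in for batchv1.Job.
type Job struct {
	Name        string
	Completions int32
	Parallelism int32
}

// fakeCluster is an in-memory stand-in for the API server.
type fakeCluster struct {
	jobs map[string]Job
}

// createOrUpdate creates the Job if it is missing. For an existing Job it
// rejects any change of Completions, mimicking an immutable field.
func (c *fakeCluster) createOrUpdate(job Job) error {
	existing, ok := c.jobs[job.Name]
	if ok && existing.Completions != job.Completions {
		return errInvalid
	}
	c.jobs[job.Name] = job
	return nil
}

// remove deletes the Job with the given name, if present.
func (c *fakeCluster) remove(name string) { delete(c.jobs, name) }

// reconcileJob tries to create or update the Job. When the update is
// rejected as invalid, it deletes the Job and asks for another pass, which
// will then create the Job from scratch.
func reconcileJob(c *fakeCluster, desired Job) (requeue bool, err error) {
	err = c.createOrUpdate(desired)
	if errors.Is(err, errInvalid) {
		c.remove(desired.Name)
		return true, nil
	}
	return false, err
}

func main() {
	c := &fakeCluster{jobs: map[string]Job{}}

	// Two nodes at first, then a third node joins the cluster.
	requeue, err := reconcileJob(c, Job{Name: "dj-job", Completions: 2, Parallelism: 2})
	fmt.Println(requeue, err) // prints false <nil>

	requeue, err = reconcileJob(c, Job{Name: "dj-job", Completions: 3, Parallelism: 3})
	fmt.Println(requeue, err) // prints true <nil>

	requeue, err = reconcileJob(c, Job{Name: "dj-job", Completions: 3, Parallelism: 3})
	fmt.Println(requeue, err, c.jobs["dj-job"].Completions) // prints false <nil> 3
}
```

The trade-off of this approach is that pods of the old Job are thrown away together with it, so the scripts run again on every selected node once the Job is recreated.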

Summary

As you can see, a fully working resource was created with a Kubernetes operator. It can now be applied to the cluster and used together with other standard Kubernetes resources to develop advanced projects on the Kubernetes platform. I hope that my blog post will help you do just that. The full code can be found in my GitHub repository.

Szymon Krasuski

Software Engineer

Szymon is a Software Engineer at CodiLime. On a daily basis he develops networking solutions for cloud infrastructures, usually based on Kubernetes. He's also a huge fan of cinematography and architecture.
