How to Deploy Automated & Isolated Airflow Test Environments with Gitlab Bot on Kubernetes

How to Deploy Automated & Isolated Airflow Test Environments with Gitlab Bot on Kubernetes

Background

In Akros Technologies, every engineer uses Apache Airflow as a workflow scheduler for various tasks such as running alpha-inference, portfolio operation, and building financial data pipelines.

When we first deployed Apache Airflow on our RKE2 Kubernetes cluster, everything seemed flawless in the beginning, however, soon later we started to face problems, especially when multiple engineers were developing and testing Airflow DAGs on the same repository.

One problem was that making changes to an already running Airflow DAG could affect this DAG in a way that either a scheduler changing the task status to removed or new tasks being executed within the already running DAG.

As git-sync is enabled (Git integration to synchronize DAGs), it was becoming troublesome for multiple engineers to commit and push their changes without affecting each other. This made engineers very cautious before testing a new pipeline on a single repository.

Witnessing the above issues reducing each others’ productivity, we thought an isolated test environment for Airflow is required for everyone to safely develop and test Airflow DAGs. Thus, we have decided to provide easy access to a temporary Airflow while automating the process with Gitlab CI and a custom Gitlab Bot API (On top of the gidgetlab written by Benjamin Bertrand).

Overview

A simplified diagram describing the high-level architecture

Inspired by a tech blog post from NFTBank, it seemed fascinating to be able to deploy and withdraw test environments within a merge request (pull request). Not knowing how this could be done with Gitlab, followed by some research, we were successfully able to launch Akros’ first Gitlab Bot, Akrossy with an architecture described in the above diagram!

In this post, I will describe the whole process briefly in the following order.

  1. Setting up a Gitlab bot account
  2. Writing a skeleton for a custom Gitlab Bot using gidgetlab
  3. Setting up an Ingress and ClusterIssuer to redirect subdomain traffic to the above server and automatically issue a TLS certificate with Let’s Encrypt
  4. Registering a webhook on the (Airflow DAGs) Gitlab repository
  5. Preparing an override values YAML for an Airflow Helm Chart
  6. Scripts to be used by Gitlab Runner for deploying and withdrawing a test Airflow environment
  7. Triggering Gitlab pipelines using MR-related events (Deployment & Withdrawal)
  8. Using a custom implementation of the gidgetlab using Starlette

Steps

(1) Setting up a Gitlab bot account

We first need a new Gitlab account to be used as a bot, so simply go to https://gitlab.com and create one!

Introducing our Gitlab bot Akrossy!

Then we need to generate an SSH key pair and register it on the above account in User Settings — SSH Keys.

User Settings — SSH Keys

Also, don’t forget to create a new Access Token with API scope in User Settings — Access Tokens — Personal Access Tokens

User Settings — Access Tokens — Personal Access Tokens

(2) Writing a skeleton for a custom Gitlab Bot using gidgetlab

Before stepping in, the documentation and guide for gidgetlab can be found here in both its documentation and blog.

Configuration — (config.py)

import os
from dotenv import load_dotenv

load_dotenv()

# Server
PORT = 80

# Gitlab General
AKROSSY_USER_ID = XXXXXXXX # Edit to your bot user ID
GITLAB_PROJECT_ID = XXXXXXXX # Edit to your project ID

# Gitlab Bot Token
GL_ACCESS_TOKEN = os.getenv('GL_ACCESS_TOKEN')
GITLAB_ACCESS_TOKEN_HEADERS = { 'PRIVATE-TOKEN': GL_ACCESS_TOKEN }

# Gitlab Project's CI/CD
GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN = os.getenv('GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN')
GITLAB_PIPELINE_TRIGGER_URL = f"https://gitlab.com/api/v4/projects/{GITLAB_PROJECT_ID}/trigger/pipeline"
GITLAB_PIPELINE_URL = f"https://gitlab.com/api/v4/projects/{GITLAB_PROJECT_ID}/pipelines"
GITLAB_VALID_PIPELINE_STATUS_LIST = ['created', 'waiting_for_resource', 'preparing', 'pending', 'running']

# Const
COMMAND_DEPLOY = '/deploy'
COMMAND_WITHDRAW = '/withdraw'

Environment variables — (.env)

# Gitlab Access Token (Bot)
GL_ACCESS_TOKEN=xxxx-xxxxxx_xxxxxxxxxxxxxxxxxxx

# Gitlab Repo (This skeleton) CI/CD Pipeline Trigger Token
GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN=xxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

Main — (main.py)

import http
import httpx
import logging
import uvicorn

from enum import Enum
from gidgetlab_custom.starlette import GitLabBot
from starlette.middleware.cors import CORSMiddleware

class AirflowStatus(Enum):
    DEPLOYABLE = 'DEPLOYABLE'
    RUNNING = 'RUNNING'
    ERROR = 'ERROR'

async def post_gl(gl, url, data):
    response = None
    try:
        response = await gl.post(url=url, data=data)
    except Exception as e:
        logging.error(e)

    return response

async def deploy_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    # To be implemented
    pass

async def withdraw_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    # To be implemented
    pass

async def get_airflow_status(event, gl, url, test_airflow_k8s_name: str):
    # To be implemented
    pass

@bot.webhook_router.register("Merge Request Hook", action="open")
async def merge_request_opened_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is opened, greet the author and say thanks."""
    # To be implemented    
    pass

@bot.webhook_router.register("Note Hook", noteable_type="MergeRequest")
async def comment_on_merge_request_event(event, gl, *args, **kwargs):
    """Whenever a user asks for an Airflow server, it replies"""
    # To be implemented    
    pass

@bot.webhook_router.register("Merge Request Hook", action="close")
async def merge_request_closed_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is closed, teardown test Airflow if present"""
    # To be implemented    
    pass

@bot.webhook_router.register("Merge Request Hook", action="merge")
async def merge_request_merged_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is merged, teardown test Airflow if present"""
    # To be implemented    
    pass

@bot.webhook_router.register("Issue Hook", action="open")
async def issue_opened_event(event, gl, *args, **kwargs):
    """ Whenever an issue is opened, greet the author and say thanks """
    # Check whether your bot works with this!
    url = f"/projects/{event.project_id}/issues/{event.object_attributes['iid']}/notes"
    message = f"Thanks for the report @{event.data['user']['username']}! I will look into it tomorrow! :P"
    await post_gl(gl=gl, url=url, data={"body": message})

bot = GitLabBot(
    "akrosbot",
    access_token=GL_ACCESS_TOKEN,
    wait_consistency=False,
    debug=True
)

bot.add_middleware(
    CORSMiddleware,
    allow_origins=[],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

if __name__ == "__main__":
    uvicorn.run(
        "__main__:bot",
        host='0.0.0.0',
        port=PORT
    )

The above example only includes the implementation of an Issue Hook (Open) event for now and to test, we can use something like ngrok as a temporary reverse proxy for fronting this skeleton server.

Before testing, please make sure you have registered a webhook at the Airflow DAG repository in Project — Settings — Webhooks.

(Temporary) Webhook registered for Airflow DAG repository

Now we can successfully test the webhook by simply opening a new issue in the same repository like below.

Akrossy responding to an issue “open” event

(3) Setting up an Ingress and ClusterIssuer to redirect our sub-domain traffic to the above server and automatically issue a TLS certificate with Let’s Encrypt

This stage is for those who deployed the Gitlab bot on Kubernetes.

As we use Amazon Route 53 for a DNS service, we are utilising the following ClusterIssuer to issue a TLS certificate for the given domain.

apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
  name: letsencrypt-prod
spec:
  acme:
    server: https://acme-v02.api.letsencrypt.org/directory
    email: contact@akrostec.com
    privateKeySecretRef:
      name: letsencrypt-prod
    solvers:
    - dns01:
        route53:
          region: ap-northeast-2
          accessKeyID: XXXXXXXXXXXXXXXXXXXXXXXX
          secretAccessKeySecretRef:
            name: prod-route53-credentials-secret
            key: secret-access-key

For the actual TLS certificate request, sub-domain redirection and reverse proxy, we created an Ingress as below.

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: akros-gitlab-bot-ingress
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    kubernetes.io/ingress.class: "nginx"
    kubernetes.io/tls-acme: 'True'
    nginx.ingress.kubernetes.io/ssl-redirect: 'true'
    nginx.ingress.kubernetes.io/add-base-url: "true"
    nginx.ingress.kubernetes.io/rewrite-target: /$2
    nginx.ingress.kubernetes.io/configuration-snippet: |
      rewrite ^(/akrossy)$ $1/ redirect;
      add_header Cache-Control "max-age=0, no-cache, no-store, must-revalidate";
    # nginx.ingress.kubernetes.io/whitelist-source-range: X.X.X.X/X, X.X.X.X/X
spec:
  tls:
    - hosts:
        - "test.akrostec.com"
      secretName: test-akrostec-com
  rules:
  - host: "test.akrostec.com"
    http:
      paths:
      - path: "/akrossy(/|$)(.*)"
        pathType: Prefix
        backend:
          service:
            name: your-gitlab-bot-name
            port:
              number: X

Make sure to uncomment the whitelist source range and use them for your security needs.

(4) Registering a webhook on the (Airflow DAGs) Gitlab repository

If you have previously used ngrok to test the Gitlab bot, the provided URL is only temporary.

Following by previous Ingress setup, we now have proper subdomain redirection & TLS. Please re-register the webhook accordingly.

Webhook registered for Airflow DAG repository

(5) Preparing an override values YAML for an Airflow Helm Chart

To deploy a new test Airflow environment, we have first taken the common override values out of the default Helm values as below.

Overriding Helm Values — (helm/override.yml)

config:
  api:
    auth_backends: airflow.api.auth.backend.basic_auth
  celery:
    worker_concurrency: 16
  celery_kubernetes_executor:
    kubernetes_queue: kubernetes
  webserver:
    base_url: "http://akros.cluster/testairflow" # To be injected
    enable_proxy_fix: "True"
    expose_config: "True"
    rbac: "True"
dags:
  gitSync:
    branch: "test/test" # To be injected
    containerName: git-sync
    credentialsSecret: git-credentials
    depth: 1
    enabled: true
    env: [ ]
    extraVolumeMounts: [ ]
    maxFailures: 0
    repo: git@gitlab.com:xxxxxxxxxx/xxxxxxxxx/your-airflow-dag-repo.git # Change me
    resources: { }
    rev: HEAD
    securityContext: { }
    subPath: ""
    uid: 65533
    wait: 1
    sshKeySecret: "test-airflow-ssh-secret" # To be injected

defaultAirflowRepository: apache/airflow
defaultAirflowTag: 2.4.2

executor: KubernetesExecutor
multiNamespaceMode: true
extraEnv: | # To be injected
  - name: AIRFLOW__WEBSERVER__BASE_URL
    value: 'http://akros.cluster/testairflow'
  - name: AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
    value: '15'
  - name: AIRFLOW__CORE__LOAD_EXAMPLES
    value: 'False'
ingress:
  web:
    annotations:
      kubernetes.io/ingress.class: "nginx"
      nginx.ingress.kubernetes.io/add-base-url: "true"
      nginx.ingress.kubernetes.io/configuration-snippet: |
        add_header Cache-Control "max-age=0, no-cache, no-store, must-revalidate";
    enabled: true
    host: "akros.cluster" # Change me
    hosts: [ ]
    ingressClassName: ""
    path: "/testairflow(/|$)(.*)" # To be injected
    pathType: Prefix
    precedingPaths: [ ]
    succeedingPaths: [ ]
    tls:
      enabled: false
      secretName: ""
webserver:
  defaultUser:
    email: gitlab@akrostec.com # Change me
    enabled: true
    firstName: Akrossy # Change me
    lastName: Roxy # Change me
    password: xxxxxxxx # Change me
    role: Admin
    username: akrossy # Change me

You may change the above YAML to suit your needs!

On top of these common override values, every test environment can have a different base URL, git-sync branch, git-sync secret name and Ingress path.

In the next step, we will feed them simply by passing some configuration values with the --set option in Helm install command.

When both of them are used, --set values are merged into --values with higher precedence.

(6) Scripts to be used by Gitlab Runner for deploying and withdrawing a test Airflow environment

For deployment and withdrawal of isolated test Airflow environments, we are using Gitlab CI/CD Pipelines with the Shell executor of Gitlab Runner. The Shell executor is a simple executor that is used to execute builds locally on the machine where Gitlab Runner is installed.

In this post, for simplicity, we are using the Shell Executor over others (e.g Kubernetes Executor).

Gitlab CI Script — (.gitlab-ci.yml)

stages:
  - test

variables:
  # Gitlab Project CI/CD Variables (Gitlab Bot)
  GL_ACCESS_TOKEN: $GL_ACCESS_TOKEN
  GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN: $GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN

deploy-test-airflow:
  stage: test
  variables:
    TEST_AIRFLOW_NAME: $TEST_AIRFLOW_NAME
    TEST_AIRFLOW_SOURCE_BRANCH: $TEST_AIRFLOW_SOURCE_BRANCH
  tags:
    - akros-master-shell
  script:
    - echo "Deploying test airflow..."
    - chmod +x ./deploy-test-airflow.bash
    - sh ./deploy-test-airflow.bash
    - echo "Successful!"
  only:
    variables:
      - $TRIGGER_JOB == "deploy-test-airflow"

withdraw-test-airflow:
  stage: test
  variables:
    TEST_AIRFLOW_NAME: $TEST_AIRFLOW_NAME
  tags:
    - akros-master-shell
  script:
    - echo "Withdrawing test airflow..."
    - chmod +x ./withdraw-test-airflow.bash
    - sh ./withdraw-test-airflow.bash
    - echo "Successful!"
  only:
    variables:
      - $TRIGGER_JOB == "withdraw-test-airflow"

Airflow Deployment — (deploy-test-airflow.bash)

export PATH="/var/lib/rancher/rke2/bin:$PATH"
export KUBECONFIG="/etc/rancher/rke2/rke2.yaml"

AIRFLOW_NAME="test-airflow-$TEST_AIRFLOW_NAME"
GIT_SYNC_SECRET_NAME="$AIRFLOW_NAME-ssh-secret"

echo "TEST_AIRFLOW_NAME=$TEST_AIRFLOW_NAME"
echo "TEST_AIRFLOW_SOURCE_BRANCH=$TEST_AIRFLOW_SOURCE_BRANCH"

echo "------------------------------------------------------------------"
echo "Creating k8s namespace $AIRFLOW_NAME..."
kubectl create namespace "$AIRFLOW_NAME"

echo "------------------------------------------------------------------"
echo "Creating a git ssh secret $GIT_SYNC_SECRET_NAME..."
kubectl create secret generic "$GIT_SYNC_SECRET_NAME" \
  --from-file=gitSshKey=/home/ubuntu/.ssh/id_rsa-akrossy \
  --from-file=known_hosts=/home/ubuntu/.ssh/known_hosts \
  --from-file=id_rsa.pub=/home/ubuntu/.ssh/id_rsa-akrossy.pub -n "$AIRFLOW_NAME"

echo "------------------------------------------------------------------"
echo "Installing a test Airflow Helm chart $AIRFLOW_NAME"
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install "$AIRFLOW_NAME" apache-airflow/airflow \
 -f helm/override.yml \
 --set config.webserver.base_url="http://akros.cluster/$AIRFLOW_NAME" \
 --set dags.gitSync.branch="$TEST_AIRFLOW_SOURCE_BRANCH" \
 --set dags.gitSync.sshKeySecret="$GIT_SYNC_SECRET_NAME" \
 --set "extraEnv=- name: AIRFLOW__WEBSERVER__BASE_URL
  value: \'http://akros.cluster/$AIRFLOW_NAME\'
- name: AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
  value: \'15\'
- name: AIRFLOW__CORE__LOAD_EXAMPLES
  value: \'False\'" \
 --set ingress.web.path="/$AIRFLOW_NAME(/|$)(.*)" \
 -n "$AIRFLOW_NAME"

For creating a Kubernetes Secret, we can use the SSH key pair we generated in the very first step.

Airflow Withdrawal — (withdraw-test-airflow.bash)

export PATH="/var/lib/rancher/rke2/bin:$PATH"
export KUBECONFIG="/etc/rancher/rke2/rke2.yaml"

AIRFLOW_NAME="test-airflow-$TEST_AIRFLOW_NAME"
GIT_SYNC_SECRET_NAME="$AIRFLOW_NAME-ssh-secret"

kubectl delete secret "$GIT_SYNC_SECRET_NAME" -n "$AIRFLOW_NAME"
helm uninstall "$AIRFLOW_NAME" -n "$AIRFLOW_NAME"
kubectl delete namespace "$AIRFLOW_NAME"

So we are ready to dive into the highlight of implementing the second step’s skeleton to trigger the above pipelines for deployment and withdrawal!

(7) Triggering Gitlab pipelines using MR-related events (Deployment & Withdrawal)

Once the deployment or withdrawal pipeline is triggered, we use a simple polling technique to keep track of the pipeline status.

deploy_airflow()

async def deploy_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    trigger_response = None

    # Triggering Gitlab job to deploy airflow
    async with httpx.AsyncClient() as client:
        trigger_response = await client.post(
            url=GITLAB_PIPELINE_TRIGGER_URL,
            timeout=60,
            data={
                'token': GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN,
                'ref': 'main',
                'variables[TRIGGER_JOB]': 'deploy-test-airflow',
                'variables[TEST_AIRFLOW_NAME]': merge_request_iid,
                'variables[TEST_AIRFLOW_SOURCE_BRANCH]': event.data['merge_request']['source_branch']
            }
        )
        trigger_response = trigger_response.json()
        logging.info(trigger_response)

    if trigger_response and trigger_response['status'] == 'created':
        message = f"""Fabulous @{event.data['user']['username']},   
        *Operation Enduring Freedom* has begun, deploying a West Coast-based SEAL Team..."""
        await post_gl(gl=gl, url=url, data={"body": message})

        pipeline_id = trigger_response['id']
        pipeline_status = trigger_response['status']
        
        # Polling until the pipeline finishes
        while pipeline_status in GITLAB_VALID_PIPELINE_STATUS_LIST:
            logging.info(f'Sending GET requests to {GITLAB_PIPELINE_URL}/{pipeline_id}...')

            pipeline_response = None

            async with httpx.AsyncClient() as client:
                pipeline_response = await client.get(
                    url=f'{GITLAB_PIPELINE_URL}/{pipeline_id}',
                    headers=GITLAB_ACCESS_TOKEN_HEADERS,
                    timeout=60
                )
                pipeline_response = pipeline_response.json()
                logging.info(f'pipeline_response={pipeline_response}')

            if 'status' in pipeline_response:
                pipeline_status = pipeline_response['status']
                logging.info(f"status = {pipeline_status}")
            else:
                logging.info(f'something went wrong!, res={pipeline_response}')
                break

            await gl.sleep(5)

        logging.info(f'pipeline_status = {pipeline_status}')

        if pipeline_status == 'success':
            message = f"""Fantastic @{event.data['user']['username']}, *Operation Successful!*    
            --------------------------------------------    
            http://akros.cluster/{test_airflow_k8s_name}   
            username: akrossy   
            password: akrossy   
            --------------------------------------------    
            """
            await post_gl(gl=gl, url=url, data={"body": message})
            return True
        else:
            message = f"""Apologies @{event.data['user']['username']}, *Operation Failed!*   
            Not able to create our private Airflow. Pipeline status -> *{pipeline_status}*"""
            await post_gl(gl=gl, url=url, data={"body": message})
            return False

    else:
        message = f"""My apologies @{event.data['user']['username']}, I wasn't able to create your Airflow.   
        I wish I had been more thoughtful.   
        Trigger response: {trigger_response}"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False

withdraw_airflow()

async def withdraw_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    trigger_response = None

    # Triggering Gitlab job to withdraw airflow
    async with httpx.AsyncClient() as client:
        trigger_response = await client.post(
            url=GITLAB_PIPELINE_TRIGGER_URL,
            timeout=60,
            data={
                'token': GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN,
                'ref': 'main',
                'variables[TRIGGER_JOB]': 'withdraw-test-airflow',
                'variables[TEST_AIRFLOW_NAME]': merge_request_iid,
            }
        )
        trigger_response = trigger_response.json()
        logging.info(trigger_response)

    if trigger_response and trigger_response['status'] == 'created':
        message = f"""Fabulous @{event.data['user']['username']}, *Operation Neptune Spear* has begun! \r\n"""
        await post_gl(gl=gl, url=url, data={"body": message})

        pipeline_id = trigger_response['id']
        pipeline_status = trigger_response['status']

        # Polling until the pipeline finishes
        while pipeline_status in GITLAB_VALID_PIPELINE_STATUS_LIST:
            logging.info(f'Sending GET requests to {GITLAB_PIPELINE_URL}/{pipeline_id}...')

            pipeline_response = None

            async with httpx.AsyncClient() as client:
                pipeline_response = await client.get(
                    url=f'{GITLAB_PIPELINE_URL}/{pipeline_id}',
                    headers=GITLAB_ACCESS_TOKEN_HEADERS,
                    timeout=60
                )
                pipeline_response = pipeline_response.json()
                logging.info(f'pipeline_response={pipeline_response}')

            if 'status' in pipeline_response:
                pipeline_status = pipeline_response['status']
                logging.info(pipeline_status)
            else:
                logging.info(f'something went wrong!, res={pipeline_response}')
                break

            await gl.sleep(5)

        logging.info(f'pipeline_status = {pipeline_status}')

        if pipeline_status == 'success':
            message = f"""Astonishing @{event.data['user']['username']}, *Operation Successful!*   
            The private Airflow [{test_airflow_k8s_name}] is successfully withdrawn!"""
            await post_gl(gl=gl, url=url, data={"body": message})
            return True

        message = f"""Blyat! Apologies @{event.data['user']['username']}, *Operation Failed!*   
        Not able to withdraw your Airflow. pipeline status -> *{pipeline_status}*"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False
    else:
        message = f"""Blyat! Apologies @{event.data['user']['username']},   
        I wasn't able to withdraw your Airflow.   
        *Trigger Response*: {trigger_response}"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False

get_airflow_status()

Here we are using our internal API to check whether the deployed Kubernetes Service object exists however you can simply use Kubernetes Python Client to do the same thing!

async def get_airflow_status(event, gl, url, test_airflow_k8s_name: str):
    k8s_response = None
    
    # Checking the given Airflow's status on k8s
    k8s_response = await check_airflow_service_on_kubernetes(name=test_airflow_k8s_name)

    message = '*Airflow status check*: '
    if not k8s_response or k8s_response.status_code not in [http.HTTPStatus.OK, http.HTTPStatus.NO_CONTENT]:
        message += f"""Oooooops @{event.data['user']['username']}, Akros Resource Manager seems to offline."""
        await post_gl(gl=gl, url=url, data={"body": message})
        return AirflowStatus.ERROR

    service_names = k8s_response.json()['result']['names']
    logging.info(f"service names = {service_names}")
    if f"{test_airflow_k8s_name}-webserver" in service_names:
        message += f"""Found {test_airflow_k8s_name}-webserver is running on K8S Master."""
        await post_gl(gl=gl, url=url, data={"body": message})
        return AirflowStatus.RUNNING

    return AirflowStatus.DEPLOYABLE

merge_request_opened_event()

Similar to the issue opened event we tried in the step (2), the bot sends greetings when a user opens up a new MR into the main branch.

@bot.webhook_router.register("Merge Request Hook", action="open")
async def merge_request_opened_event(event, gl, *args, **kwargs):
    """Whenever a merge request to main branch is opened, greet the author and say thanks."""
    url = f"/projects/{event.project_id}/merge_requests/{event.object_attributes['iid']}/notes"

    target_branch = event.object_attributes['target_branch']
    if target_branch == 'main':
        message = f"""Thanks for the merge request to *{target_branch}* branch, @{event.data['user']['username']}!   
        Just between you and me, by the way, shall we get a private Airflow?   
           
        Command Hint: /deploy | /withdraw
           
        """

    else:
        message = f"""Thanks for the merge request, but I am only available at the main branch office.   
        Welcome anyway to {target_branch} branch, @{event.data['user']['username']}!
        """

    await post_gl(gl=gl, url=url, data={"body": message})

comment_on_merge_request_event()

Here, we first make sure the target branch is main and see whether the user comment is Airflow command /deploy and /withdraw.

@bot.webhook_router.register("Note Hook", noteable_type="MergeRequest")
async def comment_on_merge_request_event(event, gl, *args, **kwargs):
    """Whenever a user asks for an Airflow server, it replies"""

    # Do not reply to bot itself
    if event.object_attributes['author_id'] == AKROSSY_USER_ID:
        return

    commands = [COMMAND_DEPLOY, COMMAND_WITHDRAW]

    # Only react on main branch MR
    if 'merge_request' in event.data and event.data['merge_request']['target_branch'] == 'main':
        merge_request_iid = event.data['merge_request']['iid']
        user_comment = event.object_attributes['note']

        if user_comment in commands:
            url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
            test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"

            # Check if the airflow is already deployed
            status = await get_airflow_status(
                event=event,
                gl=gl,
                url=url,
                test_airflow_k8s_name=test_airflow_k8s_name
            )

            if user_comment == COMMAND_DEPLOY:
                if not status == AirflowStatus.DEPLOYABLE:
                    message = f"*Error*, Airflow is already running, @{event.data['user']['username']}! \r\n"
                    await post_gl(gl=gl, url=url, data={"body": message})
                    return

                # Deploy Airflow
                result = await deploy_airflow(
                    event=event,
                    gl=gl,
                    url=url,
                    merge_request_iid=merge_request_iid,
                    test_airflow_k8s_name=test_airflow_k8s_name
                )

                logging.info(f'DEPLOY RESULT={result}')
            elif user_comment == COMMAND_WITHDRAW:
                if not status == AirflowStatus.RUNNING:
                    message = f"*Error*, There's no Airflow running, @{event.data['user']['username']}! \r\n"
                    await post_gl(gl=gl, url=url, data={"body": message})
                    return

                # Teardown Airflow
                result = await withdraw_airflow(
                    event=event,
                    gl=gl,
                    url=url,
                    merge_request_iid=merge_request_iid,
                    test_airflow_k8s_name=test_airflow_k8s_name
                )
                logging.info(f'WITHDRAW RESULT={result}')

merge_request_closed_event() & merge_request_merged_event()

The two methods below are very similar in the way they both react to MR status change and then make sure the test environment is withdrawn.

@bot.webhook_router.register("Merge Request Hook", action="close")
async def merge_request_closed_event(event, gl, *args, **kwargs):
    """Whenever a merge request to main branch is closed, teardown test Airflow if present"""
    if event.object_attributes['target_branch'] == 'main':
        merge_request_iid = event.object_attributes['iid']
        url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
        test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"

        message = f"Striking decision to close MR, @{event.data['user']['username']}! \r\n"
        await post_gl(gl=gl, url=url, data={"body": message})

        # Check if the airflow is running
        status = await get_airflow_status(
            event=event,
            gl=gl,
            url=url,
            test_airflow_k8s_name=test_airflow_k8s_name
        )

        if not status == AirflowStatus.RUNNING:
            return

        # Teardown Airflow
        await withdraw_airflow(
            event=event,
            gl=gl,
            url=url,
            merge_request_iid=merge_request_iid,
            test_airflow_k8s_name=test_airflow_k8s_name
        )

@bot.webhook_router.register("Merge Request Hook", action="merge")
async def merge_request_merged_event(event, gl, *args, **kwargs):
    """Whenever a merge request to main branch is merged, teardown test Airflow if present"""
    if event.object_attributes['target_branch'] == 'main':
        merge_request_iid = event.object_attributes['iid']
        url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
        test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"

        message = f"Breathtaking decision to merge MR, @{event.data['user']['username']}! My palms are sweaty too! \r\n"
        await post_gl(gl=gl, url=url, data={"body": message})

        # Check if the airflow is running
        status = await get_airflow_status(
            event=event,
            gl=gl,
            url=url,
            test_airflow_k8s_name=test_airflow_k8s_name
        )

        if not status == AirflowStatus.RUNNING:
            return

        # Teardown Airflow
        await withdraw_airflow(
            event=event,
            gl=gl,
            url=url,
            merge_request_iid=merge_request_iid,
            test_airflow_k8s_name=test_airflow_k8s_name
        )

(8) Using a custom implementation of the bot using Starlette

While polling the pipeline status in step (7), tasks were cancelled in the middle and the issue was found to be that the Gitlab bot wasn’t sending any reply to the webhook until the handler was finished.

Fortunately, the author of gidgetlab, Benjamin kindly shared with me a new custom implementation of the bot using Starlette. In the custom implementation, the handler is executed in a background task so the webhook receives a response without holding.

So after copying the gidgetlab project into the bot project and applying the following custom implementation, the issues were fixed!

import cachetools
import httpx
import gidgetlab
import gidgetlab.abc
import gidgetlab.routing
from typing import Any, Optional, Sequence, Union
from starlette.datastructures import Secret
from starlette.applications import Starlette
from starlette.responses import Response, PlainTextResponse
from starlette.requests import Request
from starlette.routing import Route
from starlette.background import BackgroundTask
from starlette.exceptions import HTTPException
from gidgetlab import sansio
from gidgetlab import httpx as gl_httpx

class GitLabBot(Starlette):
    def __init__(
        self,
        requester: str,
        *,
        secret: Union[str, Secret] = None,
        access_token: Union[str, Secret] = None,
        cache: Optional[gidgetlab.abc.CACHE_TYPE] = None,
        wait_consistency: Optional[bool] = True,
        routers: Sequence[gidgetlab.routing.Router] = None,
        debug: bool = False,
        **kwargs: Any,
    ) -> None:
        self.requester = requester
        self.secret = secret
        self.access_token = access_token
        self.cache = cache or cachetools.LRUCache(maxsize=500)
        self.wait_consistency = wait_consistency
        # Additional keyword arguments to pass to GitLabAPI (url and api_version)
        self.kwargs = kwargs
        if routers is not None:
            self._webhook_router = gidgetlab.routing.Router(*routers)
        routes = [
            Route("/", self.webhook_handler, methods=["POST"]),
            Route("/health", self.health_handler),
        ]
        super().__init__(
            debug=debug,
            routes=routes,
            on_startup=[self.create_gl],
            on_shutdown=[self.close_gl_client],
        )

    @property
    def webhook_router(self) -> gidgetlab.routing.Router:
        """The bot :class:`gidgetlab.routing.Router` instance that routes webhooks events callbacks"""
        if not hasattr(self, "_webhook_router"):
            self._webhook_router = gidgetlab.routing.Router()
        return self._webhook_router

    async def create_gl(self) -> None:
        """Startup handler that creates the GitLabAPI instance"""
        client = httpx.AsyncClient()
        self.state.gl = gl_httpx.GitLabAPI(
            client,
            self.requester,
            cache=self.cache,
            access_token=str(self.access_token),
            **self.kwargs,
        )

    async def close_gl_client(self) -> None:
        """Shutdown handler that closes the httpx client"""
        await self.state.gl._client.aclose()

    async def health_handler(self, request: Request) -> Response:
        """Handler to check the health of the bot

        Return 'Bot OK'
        """
        return PlainTextResponse("Bot OK")

    async def webhook_handler(self, request: Request) -> Response:
        """Handler that processes GitLab webhook requests"""
        body = await request.body()
        try:
            event = sansio.Event.from_http(request.headers, body, secret=self.secret)
        except gidgetlab.HTTPException as e:
            raise HTTPException(status_code=e.status_code, detail=str(e))
        except gidgetlab.GitLabException as e:
            raise HTTPException(status_code=500, detail=str(e))
        # Call the appropriate callback(s) for the event in a background task
        task = BackgroundTask(self.webhook_router.dispatch, event, request.app.state.gl)
        return Response(status_code=200, background=task)

Result

Cheers! in Wolf of Wall Street

Congratulations on following all the steps, we can now see the test environments are successfully deployed and withdrawn within a Kubernetes cluster as below!

Deployed Airflow shown in a Kubernetes IDE (Lens)
Airflow Web (Deployed to our private cloud at http://akros.cluster/test-airflow-155)

Conclusion

Thank you for reading this and hope you enjoyed this article!

If have any questions regarding this post please let me know in the comments below!

Akros Technologies is also hiring! If you want to be part of our team and solve exciting problems make sure you visit us on the career page and follow us on Medium!

Reference