How to Deploy Automated & Isolated Airflow Test Environments with Gitlab Bot on Kubernetes
Background
At Akros Technologies, every engineer uses Apache Airflow as a workflow scheduler for tasks such as running alpha inference, operating portfolios, and building financial data pipelines.
When we first deployed Apache Airflow on our RKE2 Kubernetes cluster, everything seemed flawless at first. However, we soon started to face problems, especially when multiple engineers were developing and testing Airflow DAGs in the same repository.
One problem was that making changes to an already running Airflow DAG could affect it mid-run: the scheduler might mark existing tasks as removed, or newly added tasks might start executing within the already running DAG.
With git-sync enabled (Git integration that synchronizes DAGs), it became troublesome for multiple engineers to commit and push their changes without affecting each other. This made engineers very cautious about testing new pipelines in the single shared repository.
Watching these issues reduce everyone's productivity, we concluded that an isolated Airflow test environment was required for everyone to safely develop and test Airflow DAGs. Thus, we decided to provide easy access to a temporary Airflow instance while automating the process with Gitlab CI and a custom Gitlab Bot API (built on top of gidgetlab, written by Benjamin Bertrand).
Overview
Inspired by a tech blog post from NFTBank, we found the idea of deploying and withdrawing test environments from within a merge request (pull request) fascinating. We didn't know at first how this could be done with Gitlab, but after some research we successfully launched Akros' first Gitlab Bot, Akrossy, with the architecture described in the diagram above!
In this post, I will describe the whole process briefly in the following order.
- Setting up a Gitlab bot account
- Writing a skeleton for a custom Gitlab Bot using gidgetlab
- Setting up an Ingress and ClusterIssuer to redirect subdomain traffic to the above server and automatically issue a TLS certificate with Let’s Encrypt
- Registering a webhook on the (Airflow DAGs) Gitlab repository
- Preparing an override values YAML for an Airflow Helm Chart
- Scripts to be used by Gitlab Runner for deploying and withdrawing a test Airflow environment
- Triggering Gitlab pipelines using MR-related events (Deployment & Withdrawal)
- Using a custom implementation of the bot with Starlette
Steps
(1) Setting up a Gitlab bot account
We first need a new Gitlab account to be used as a bot, so simply go to https://gitlab.com and create one!
Then we need to generate an SSH key pair and register the public key on the above account under User Settings — SSH Keys.
Also, don't forget to create a new Access Token with the API scope under User Settings — Access Tokens — Personal Access Tokens.
(2) Writing a skeleton for a custom Gitlab Bot using gidgetlab
Before stepping in, note that gidgetlab's documentation and a step-by-step guide can be found in its docs and the author's blog (see References below).
Configuration — (config.py)
import os
from dotenv import load_dotenv
load_dotenv()
# Server
PORT = 80
# Gitlab General
AKROSSY_USER_ID = XXXXXXXX # Edit to your bot user ID
GITLAB_PROJECT_ID = XXXXXXXX # Edit to your project ID
# Gitlab Bot Token
GL_ACCESS_TOKEN = os.getenv('GL_ACCESS_TOKEN')
GITLAB_ACCESS_TOKEN_HEADERS = { 'PRIVATE-TOKEN': GL_ACCESS_TOKEN }
# Gitlab Project's CI/CD
GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN = os.getenv('GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN')
GITLAB_PIPELINE_TRIGGER_URL = f"https://gitlab.com/api/v4/projects/{GITLAB_PROJECT_ID}/trigger/pipeline"
GITLAB_PIPELINE_URL = f"https://gitlab.com/api/v4/projects/{GITLAB_PROJECT_ID}/pipelines"
GITLAB_VALID_PIPELINE_STATUS_LIST = ['created', 'waiting_for_resource', 'preparing', 'pending', 'running']
# Const
COMMAND_DEPLOY = '/deploy'
COMMAND_WITHDRAW = '/withdraw'
Environment variables — (.env)
# Gitlab Access Token (Bot)
GL_ACCESS_TOKEN=xxxx-xxxxxx_xxxxxxxxxxxxxxxxxxx
# Gitlab Repo (This skeleton) CI/CD Pipeline Trigger Token
GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN=xxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Main — (main.py)
import http
import httpx
import logging
import uvicorn
from enum import Enum
from gidgetlab_custom.starlette import GitLabBot
from starlette.middleware.cors import CORSMiddleware
from config import (
    PORT, AKROSSY_USER_ID, GL_ACCESS_TOKEN,
    GITLAB_ACCESS_TOKEN_HEADERS, GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN,
    GITLAB_PIPELINE_TRIGGER_URL, GITLAB_PIPELINE_URL,
    GITLAB_VALID_PIPELINE_STATUS_LIST, COMMAND_DEPLOY, COMMAND_WITHDRAW
)


class AirflowStatus(Enum):
    DEPLOYABLE = 'DEPLOYABLE'
    RUNNING = 'RUNNING'
    ERROR = 'ERROR'


# The bot must be created before the handlers below reference its router
bot = GitLabBot(
    "akrosbot",
    access_token=GL_ACCESS_TOKEN,
    wait_consistency=False,
    debug=True
)
bot.add_middleware(
    CORSMiddleware,
    allow_origins=[],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def post_gl(gl, url, data):
    response = None
    try:
        response = await gl.post(url=url, data=data)
    except Exception as e:
        logging.error(e)
    return response


async def deploy_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    # To be implemented
    pass


async def withdraw_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    # To be implemented
    pass


async def get_airflow_status(event, gl, url, test_airflow_k8s_name: str):
    # To be implemented
    pass


@bot.webhook_router.register("Merge Request Hook", action="open")
async def merge_request_opened_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is opened, greet the author and say thanks."""
    # To be implemented
    pass


@bot.webhook_router.register("Note Hook", noteable_type="MergeRequest")
async def comment_on_merge_request_event(event, gl, *args, **kwargs):
    """Whenever a user asks for an Airflow server, it replies"""
    # To be implemented
    pass


@bot.webhook_router.register("Merge Request Hook", action="close")
async def merge_request_closed_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is closed, teardown test Airflow if present"""
    # To be implemented
    pass


@bot.webhook_router.register("Merge Request Hook", action="merge")
async def merge_request_merged_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is merged, teardown test Airflow if present"""
    # To be implemented
    pass


@bot.webhook_router.register("Issue Hook", action="open")
async def issue_opened_event(event, gl, *args, **kwargs):
    """Whenever an issue is opened, greet the author and say thanks"""
    # Check whether your bot works with this!
    url = f"/projects/{event.project_id}/issues/{event.object_attributes['iid']}/notes"
    message = f"Thanks for the report @{event.data['user']['username']}! I will look into it tomorrow! :P"
    await post_gl(gl=gl, url=url, data={"body": message})


if __name__ == "__main__":
    uvicorn.run(
        "__main__:bot",
        host='0.0.0.0',
        port=PORT
    )
The above example only implements the Issue Hook (open) event for now. To test it, we can use something like ngrok as a temporary reverse proxy in front of this skeleton server.
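A quick local test loop might look like the following; the port is taken from PORT = 80 in config.py, and running the server in the background is just one convenient option.
# Start the skeleton bot locally (listens on PORT = 80 from config.py)
python main.py &
# Expose it publicly; use the printed https URL as the webhook URL for now
ngrok http 80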
Before testing, please make sure you have registered a webhook at the Airflow DAG repository in Project — Settings — Webhooks.
Now we can successfully test the webhook by simply opening a new issue in the same repository like below.
(3) Setting up an Ingress and ClusterIssuer to redirect our subdomain traffic to the above server and automatically issue a TLS certificate with Let’s Encrypt
This stage is for those who deployed the Gitlab bot on Kubernetes.
As we use Amazon Route 53 as our DNS service, we are utilising the following ClusterIssuer to issue a TLS certificate for the given domain.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
  name: letsencrypt-prod
spec:
  acme:
    server: https://acme-v02.api.letsencrypt.org/directory
    email: contact@akrostec.com
    privateKeySecretRef:
      name: letsencrypt-prod
    solvers:
      - dns01:
          route53:
            region: ap-northeast-2
            accessKeyID: XXXXXXXXXXXXXXXXXXXXXXXX
            secretAccessKeySecretRef:
              name: prod-route53-credentials-secret
              key: secret-access-key
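The ClusterIssuer above references a Secret holding the AWS secret access key. A minimal sketch of creating it, assuming cert-manager runs in the cert-manager namespace (where ClusterIssuer secret references are resolved by default):
# Create the Route 53 credentials Secret referenced by the ClusterIssuer
kubectl create secret generic prod-route53-credentials-secret \
  --namespace cert-manager \
  --from-literal=secret-access-key='XXXXXXXXXXXXXXXXXXXXXXXX'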
For the actual TLS certificate request, subdomain redirection, and reverse proxying, we created an Ingress as below.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: akros-gitlab-bot-ingress
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    kubernetes.io/ingress.class: "nginx"
    kubernetes.io/tls-acme: 'True'
    nginx.ingress.kubernetes.io/ssl-redirect: 'true'
    nginx.ingress.kubernetes.io/add-base-url: "true"
    nginx.ingress.kubernetes.io/rewrite-target: /$2
    nginx.ingress.kubernetes.io/configuration-snippet: |
      rewrite ^(/akrossy)$ $1/ redirect;
      add_header Cache-Control "max-age=0, no-cache, no-store, must-revalidate";
    # nginx.ingress.kubernetes.io/whitelist-source-range: X.X.X.X/X, X.X.X.X/X
spec:
  tls:
    - hosts:
        - "test.akrostec.com"
      secretName: test-akrostec-com
  rules:
    - host: "test.akrostec.com"
      http:
        paths:
          - path: "/akrossy(/|$)(.*)"
            pathType: Prefix
            backend:
              service:
                name: your-gitlab-bot-name
                port:
                  number: X
Make sure to uncomment the whitelist-source-range annotation and adjust the ranges to your security needs.
(4) Registering a webhook on the (Airflow DAGs) Gitlab repository
If you previously used ngrok to test the Gitlab bot, remember that the URL it provides is only temporary.
With the Ingress set up in the previous step, we now have proper subdomain redirection and TLS, so please re-register the webhook with the new URL.
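If you prefer the API over the UI, a hedged sketch of registering the webhook via the Gitlab project hooks endpoint; the project ID is a placeholder, and the URL matches the Ingress path above.
# Register the webhook via the Gitlab API instead of Project - Settings - Webhooks
curl -X POST "https://gitlab.com/api/v4/projects/<PROJECT_ID>/hooks" \
  -H "PRIVATE-TOKEN: <GL_ACCESS_TOKEN>" \
  -F "url=https://test.akrostec.com/akrossy/" \
  -F "merge_requests_events=true" \
  -F "note_events=true" \
  -F "issues_events=true"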
(5) Preparing an override values YAML for an Airflow Helm Chart
To deploy a new test Airflow environment, we first extracted the common override values from the default Helm values, as shown below.
Overriding Helm Values — (helm/override.yml)
config:
  api:
    auth_backends: airflow.api.auth.backend.basic_auth
  celery:
    worker_concurrency: 16
  celery_kubernetes_executor:
    kubernetes_queue: kubernetes
  webserver:
    base_url: "http://akros.cluster/testairflow"  # To be injected
    enable_proxy_fix: "True"
    expose_config: "True"
    rbac: "True"
dags:
  gitSync:
    branch: "test/test"  # To be injected
    containerName: git-sync
    credentialsSecret: git-credentials
    depth: 1
    enabled: true
    env: []
    extraVolumeMounts: []
    maxFailures: 0
    repo: git@gitlab.com:xxxxxxxxxx/xxxxxxxxx/your-airflow-dag-repo.git  # Change me
    resources: {}
    rev: HEAD
    securityContext: {}
    subPath: ""
    uid: 65533
    wait: 1
    sshKeySecret: "test-airflow-ssh-secret"  # To be injected
defaultAirflowRepository: apache/airflow
defaultAirflowTag: 2.4.2
executor: KubernetesExecutor
multiNamespaceMode: true
extraEnv: |  # To be injected
  - name: AIRFLOW__WEBSERVER__BASE_URL
    value: 'http://akros.cluster/testairflow'
  - name: AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
    value: '15'
  - name: AIRFLOW__CORE__LOAD_EXAMPLES
    value: 'False'
ingress:
  web:
    annotations:
      kubernetes.io/ingress.class: "nginx"
      nginx.ingress.kubernetes.io/add-base-url: "true"
      nginx.ingress.kubernetes.io/configuration-snippet: |
        add_header Cache-Control "max-age=0, no-cache, no-store, must-revalidate";
    enabled: true
    host: "akros.cluster"  # Change me
    hosts: []
    ingressClassName: ""
    path: "/testairflow(/|$)(.*)"  # To be injected
    pathType: Prefix
    precedingPaths: []
    succeedingPaths: []
    tls:
      enabled: false
      secretName: ""
webserver:
  defaultUser:
    email: gitlab@akrostec.com  # Change me
    enabled: true
    firstName: Akrossy  # Change me
    lastName: Roxy  # Change me
    password: xxxxxxxx  # Change me
    role: Admin
    username: akrossy  # Change me
You may change the above YAML to suit your needs!
On top of these common override values, every test environment can have a different base URL, git-sync branch, git-sync secret name and Ingress path.
In the next step, we will feed them in simply by passing configuration values with the --set option of the helm install command. When both are used, --set values are merged into --values, with --set taking higher precedence.
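As a quick illustration of that precedence (the release name and branch below are placeholders):
# dags.gitSync.branch on the command line overrides "test/test" in
# helm/override.yml; all other values from the file still apply.
helm install my-test-airflow apache-airflow/airflow \
  -f helm/override.yml \
  --set dags.gitSync.branch="feature/my-dag"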
(6) Scripts to be used by Gitlab Runner for deploying and withdrawing a test Airflow environment
For deployment and withdrawal of isolated test Airflow environments, we are using Gitlab CI/CD pipelines with the Shell executor of Gitlab Runner. The Shell executor is a simple executor that runs builds locally on the machine where Gitlab Runner is installed.
In this post, for simplicity, we are using the Shell executor over others (e.g., the Kubernetes executor).
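For reference, registering such a runner might look like the following sketch; the registration token is a placeholder and the exact flags depend on your Gitlab Runner version. The tag matches the one used in .gitlab-ci.yml below.
# Register a shell-executor runner tagged akros-master-shell
sudo gitlab-runner register \
  --non-interactive \
  --url "https://gitlab.com/" \
  --registration-token "<RUNNER_REGISTRATION_TOKEN>" \
  --executor shell \
  --tag-list "akros-master-shell" \
  --description "Akros master shell runner"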
Gitlab CI Script — (.gitlab-ci.yml)
stages:
  - test

variables:
  # Gitlab Project CI/CD Variables (Gitlab Bot)
  GL_ACCESS_TOKEN: $GL_ACCESS_TOKEN
  GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN: $GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN

deploy-test-airflow:
  stage: test
  variables:
    TEST_AIRFLOW_NAME: $TEST_AIRFLOW_NAME
    TEST_AIRFLOW_SOURCE_BRANCH: $TEST_AIRFLOW_SOURCE_BRANCH
  tags:
    - akros-master-shell
  script:
    - echo "Deploying test airflow..."
    - chmod +x ./deploy-test-airflow.bash
    - bash ./deploy-test-airflow.bash
    - echo "Successful!"
  only:
    variables:
      - $TRIGGER_JOB == "deploy-test-airflow"

withdraw-test-airflow:
  stage: test
  variables:
    TEST_AIRFLOW_NAME: $TEST_AIRFLOW_NAME
  tags:
    - akros-master-shell
  script:
    - echo "Withdrawing test airflow..."
    - chmod +x ./withdraw-test-airflow.bash
    - bash ./withdraw-test-airflow.bash
    - echo "Successful!"
  only:
    variables:
      - $TRIGGER_JOB == "withdraw-test-airflow"
Airflow Deployment — (deploy-test-airflow.bash)
#!/bin/bash
export PATH="/var/lib/rancher/rke2/bin:$PATH"
export KUBECONFIG="/etc/rancher/rke2/rke2.yaml"

AIRFLOW_NAME="test-airflow-$TEST_AIRFLOW_NAME"
GIT_SYNC_SECRET_NAME="$AIRFLOW_NAME-ssh-secret"

echo "TEST_AIRFLOW_NAME=$TEST_AIRFLOW_NAME"
echo "TEST_AIRFLOW_SOURCE_BRANCH=$TEST_AIRFLOW_SOURCE_BRANCH"
echo "------------------------------------------------------------------"
echo "Creating k8s namespace $AIRFLOW_NAME..."
kubectl create namespace "$AIRFLOW_NAME"
echo "------------------------------------------------------------------"
echo "Creating a git ssh secret $GIT_SYNC_SECRET_NAME..."
kubectl create secret generic "$GIT_SYNC_SECRET_NAME" \
  --from-file=gitSshKey=/home/ubuntu/.ssh/id_rsa-akrossy \
  --from-file=known_hosts=/home/ubuntu/.ssh/known_hosts \
  --from-file=id_rsa.pub=/home/ubuntu/.ssh/id_rsa-akrossy.pub -n "$AIRFLOW_NAME"
echo "------------------------------------------------------------------"
echo "Installing a test Airflow Helm chart $AIRFLOW_NAME"
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Inject the per-environment values on top of helm/override.yml
helm install "$AIRFLOW_NAME" apache-airflow/airflow \
  -f helm/override.yml \
  --set config.webserver.base_url="http://akros.cluster/$AIRFLOW_NAME" \
  --set dags.gitSync.branch="$TEST_AIRFLOW_SOURCE_BRANCH" \
  --set dags.gitSync.sshKeySecret="$GIT_SYNC_SECRET_NAME" \
  --set "extraEnv=- name: AIRFLOW__WEBSERVER__BASE_URL
  value: 'http://akros.cluster/$AIRFLOW_NAME'
- name: AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
  value: '15'
- name: AIRFLOW__CORE__LOAD_EXAMPLES
  value: 'False'" \
  --set ingress.web.path="/$AIRFLOW_NAME(/|$)(.*)" \
  -n "$AIRFLOW_NAME"
For creating a Kubernetes Secret, we can use the SSH key pair we generated in the very first step.
Airflow Withdrawal — (withdraw-test-airflow.bash)
#!/bin/bash
export PATH="/var/lib/rancher/rke2/bin:$PATH"
export KUBECONFIG="/etc/rancher/rke2/rke2.yaml"

AIRFLOW_NAME="test-airflow-$TEST_AIRFLOW_NAME"
GIT_SYNC_SECRET_NAME="$AIRFLOW_NAME-ssh-secret"

# Remove the git-sync secret, the Helm release, and finally the namespace
kubectl delete secret "$GIT_SYNC_SECRET_NAME" -n "$AIRFLOW_NAME"
helm uninstall "$AIRFLOW_NAME" -n "$AIRFLOW_NAME"
kubectl delete namespace "$AIRFLOW_NAME"
Now we are ready to dive into the highlight: implementing the skeleton from step (2) so that it triggers the above pipelines for deployment and withdrawal!
(7) Triggering Gitlab pipelines using MR-related events (Deployment & Withdrawal)
Once the deployment or withdrawal pipeline is triggered, we use a simple polling technique to keep track of the pipeline status.
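For reference, the same trigger can be fired manually with curl against the pipeline trigger endpoint from config.py; the project ID, MR number 42, and branch name below are hypothetical.
# Manually trigger the deploy job the same way the bot does
curl -X POST "https://gitlab.com/api/v4/projects/<PROJECT_ID>/trigger/pipeline" \
  -F "token=<GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN>" \
  -F "ref=main" \
  -F "variables[TRIGGER_JOB]=deploy-test-airflow" \
  -F "variables[TEST_AIRFLOW_NAME]=42" \
  -F "variables[TEST_AIRFLOW_SOURCE_BRANCH]=feature/my-dag"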
deploy_airflow()
async def deploy_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    trigger_response = None
    # Triggering Gitlab job to deploy airflow
    async with httpx.AsyncClient() as client:
        trigger_response = await client.post(
            url=GITLAB_PIPELINE_TRIGGER_URL,
            timeout=60,
            data={
                'token': GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN,
                'ref': 'main',
                'variables[TRIGGER_JOB]': 'deploy-test-airflow',
                'variables[TEST_AIRFLOW_NAME]': merge_request_iid,
                'variables[TEST_AIRFLOW_SOURCE_BRANCH]': event.data['merge_request']['source_branch']
            }
        )
        trigger_response = trigger_response.json()
        logging.info(trigger_response)
    if trigger_response and trigger_response.get('status') == 'created':
        message = f"""Fabulous @{event.data['user']['username']},
*Operation Enduring Freedom* has begun, deploying a West Coast-based SEAL Team..."""
        await post_gl(gl=gl, url=url, data={"body": message})
        pipeline_id = trigger_response['id']
        pipeline_status = trigger_response['status']
        # Polling until the pipeline finishes
        while pipeline_status in GITLAB_VALID_PIPELINE_STATUS_LIST:
            logging.info(f'Sending GET requests to {GITLAB_PIPELINE_URL}/{pipeline_id}...')
            pipeline_response = None
            async with httpx.AsyncClient() as client:
                pipeline_response = await client.get(
                    url=f'{GITLAB_PIPELINE_URL}/{pipeline_id}',
                    headers=GITLAB_ACCESS_TOKEN_HEADERS,
                    timeout=60
                )
                pipeline_response = pipeline_response.json()
                logging.info(f'pipeline_response={pipeline_response}')
            if 'status' in pipeline_response:
                pipeline_status = pipeline_response['status']
                logging.info(f"status = {pipeline_status}")
            else:
                logging.info(f'something went wrong!, res={pipeline_response}')
                break
            await gl.sleep(5)
        logging.info(f'pipeline_status = {pipeline_status}')
        if pipeline_status == 'success':
            message = f"""Fantastic @{event.data['user']['username']}, *Operation Successful!*
--------------------------------------------
http://akros.cluster/{test_airflow_k8s_name}
username: akrossy
password: akrossy
--------------------------------------------
"""
            await post_gl(gl=gl, url=url, data={"body": message})
            return True
        else:
            message = f"""Apologies @{event.data['user']['username']}, *Operation Failed!*
Not able to create our private Airflow. Pipeline status -> *{pipeline_status}*"""
            await post_gl(gl=gl, url=url, data={"body": message})
            return False
    else:
        message = f"""My apologies @{event.data['user']['username']}, I wasn't able to create your Airflow.
I wish I had been more thoughtful.
Trigger response: {trigger_response}"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False
withdraw_airflow()
async def withdraw_airflow(event, gl, url, merge_request_iid, test_airflow_k8s_name):
    trigger_response = None
    # Triggering Gitlab job to withdraw airflow
    async with httpx.AsyncClient() as client:
        trigger_response = await client.post(
            url=GITLAB_PIPELINE_TRIGGER_URL,
            timeout=60,
            data={
                'token': GITLAB_TEST_AIRFLOW_TRIGGER_TOKEN,
                'ref': 'main',
                'variables[TRIGGER_JOB]': 'withdraw-test-airflow',
                'variables[TEST_AIRFLOW_NAME]': merge_request_iid,
            }
        )
        trigger_response = trigger_response.json()
        logging.info(trigger_response)
    if trigger_response and trigger_response.get('status') == 'created':
        message = f"""Fabulous @{event.data['user']['username']}, *Operation Neptune Spear* has begun! \r\n"""
        await post_gl(gl=gl, url=url, data={"body": message})
        pipeline_id = trigger_response['id']
        pipeline_status = trigger_response['status']
        # Polling until the pipeline finishes
        while pipeline_status in GITLAB_VALID_PIPELINE_STATUS_LIST:
            logging.info(f'Sending GET requests to {GITLAB_PIPELINE_URL}/{pipeline_id}...')
            pipeline_response = None
            async with httpx.AsyncClient() as client:
                pipeline_response = await client.get(
                    url=f'{GITLAB_PIPELINE_URL}/{pipeline_id}',
                    headers=GITLAB_ACCESS_TOKEN_HEADERS,
                    timeout=60
                )
                pipeline_response = pipeline_response.json()
                logging.info(f'pipeline_response={pipeline_response}')
            if 'status' in pipeline_response:
                pipeline_status = pipeline_response['status']
                logging.info(pipeline_status)
            else:
                logging.info(f'something went wrong!, res={pipeline_response}')
                break
            await gl.sleep(5)
        logging.info(f'pipeline_status = {pipeline_status}')
        if pipeline_status == 'success':
            message = f"""Astonishing @{event.data['user']['username']}, *Operation Successful!*
The private Airflow [{test_airflow_k8s_name}] is successfully withdrawn!"""
            await post_gl(gl=gl, url=url, data={"body": message})
            return True
        message = f"""Blyat! Apologies @{event.data['user']['username']}, *Operation Failed!*
Not able to withdraw your Airflow. Pipeline status -> *{pipeline_status}*"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False
    else:
        message = f"""Blyat! Apologies @{event.data['user']['username']},
I wasn't able to withdraw your Airflow.
*Trigger Response*: {trigger_response}"""
        await post_gl(gl=gl, url=url, data={"body": message})
        return False
get_airflow_status()
Here we are using our internal API to check whether the deployed Kubernetes Service object exists; however, you could simply use the Kubernetes Python client (or kubectl) to do the same thing!
async def get_airflow_status(event, gl, url, test_airflow_k8s_name: str):
    k8s_response = None
    # Checking the given Airflow's status on k8s
    k8s_response = await check_airflow_service_on_kubernetes(name=test_airflow_k8s_name)
    message = '*Airflow status check*: '
    if not k8s_response or k8s_response.status_code not in [http.HTTPStatus.OK, http.HTTPStatus.NO_CONTENT]:
        message += f"""Oooooops @{event.data['user']['username']}, Akros Resource Manager seems to be offline."""
        await post_gl(gl=gl, url=url, data={"body": message})
        return AirflowStatus.ERROR
    service_names = k8s_response.json()['result']['names']
    logging.info(f"service names = {service_names}")
    if f"{test_airflow_k8s_name}-webserver" in service_names:
        message += f"""Found {test_airflow_k8s_name}-webserver is running on K8S Master."""
        await post_gl(gl=gl, url=url, data={"body": message})
        return AirflowStatus.RUNNING
    return AirflowStatus.DEPLOYABLE
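If you don't have such an internal API, the same existence check boils down to looking for the webserver Service in the MR's namespace; a quick shell equivalent (MR number 42 is hypothetical; the bot derives both names from test-airflow-<MR_IID>):
# Exits non-zero if the webserver Service (and thus the test Airflow) is absent
kubectl get svc "test-airflow-42-webserver" -n "test-airflow-42"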
merge_request_opened_event()
Similar to the issue-opened event we tried in step (2), the bot sends a greeting when a user opens a new MR targeting the main branch.
@bot.webhook_router.register("Merge Request Hook", action="open")
async def merge_request_opened_event(event, gl, *args, **kwargs):
    """Whenever a merge request to the main branch is opened, greet the author and say thanks."""
    url = f"/projects/{event.project_id}/merge_requests/{event.object_attributes['iid']}/notes"
    target_branch = event.object_attributes['target_branch']
    if target_branch == 'main':
        message = f"""Thanks for the merge request to *{target_branch}* branch, @{event.data['user']['username']}!
Just between you and me, by the way, shall we get a private Airflow?
Command Hint: /deploy | /withdraw
"""
    else:
        message = f"""Thanks for the merge request, but I am only available at the main branch office.
Welcome anyway to {target_branch} branch, @{event.data['user']['username']}!
"""
    await post_gl(gl=gl, url=url, data={"body": message})
comment_on_merge_request_event()
Here, we first make sure the target branch is main, then check whether the user's comment is one of the Airflow commands /deploy or /withdraw.
@bot.webhook_router.register("Note Hook", noteable_type="MergeRequest")
async def comment_on_merge_request_event(event, gl, *args, **kwargs):
"""Whenever a user asks for an Airflow server, it replies"""
# Do not reply to bot itself
if event.object_attributes['author_id'] == AKROSSY_USER_ID:
return
commands = [COMMAND_DEPLOY, COMMAND_WITHDRAW]
# Only react on main branch MR
if 'merge_request' in event.data and event.data['merge_request']['target_branch'] == 'main':
merge_request_iid = event.data['merge_request']['iid']
user_comment = event.object_attributes['note']
if user_comment in commands:
url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"
# Check if the airflow is already deployed
status = await get_airflow_status(
event=event,
gl=gl,
url=url,
test_airflow_k8s_name=test_airflow_k8s_name
)
if user_comment == COMMAND_DEPLOY:
if not status == AirflowStatus.DEPLOYABLE:
message = f"*Error*, Airflow is already running, @{event.data['user']['username']}! \r\n"
await post_gl(gl=gl, url=url, data={"body": message})
return
# Deploy Airflow
result = await deploy_airflow(
event=event,
gl=gl,
url=url,
merge_request_iid=merge_request_iid,
test_airflow_k8s_name=test_airflow_k8s_name
)
logging.info(f'DEPLOY RESULT={result}')
elif user_comment == COMMAND_WITHDRAW:
if not status == AirflowStatus.RUNNING:
message = f"*Error*, There's no Airflow running, @{event.data['user']['username']}! \r\n"
await post_gl(gl=gl, url=url, data={"body": message})
return
# Teardown Airflow
result = await withdraw_airflow(
event=event,
gl=gl,
url=url,
merge_request_iid=merge_request_iid,
test_airflow_k8s_name=test_airflow_k8s_name
)
logging.info(f'WITHDRAW RESULT={result}')
merge_request_closed_event() & merge_request_merged_event()
The two handlers below are very similar: both react to an MR status change (close or merge) and make sure the test environment is withdrawn.
@bot.webhook_router.register("Merge Request Hook", action="close")
async def merge_request_closed_event(event, gl, *args, **kwargs):
"""Whenever a merge request to main branch is closed, teardown test Airflow if present"""
if event.object_attributes['target_branch'] == 'main':
merge_request_iid = event.object_attributes['iid']
url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"
message = f"Striking decision to close MR, @{event.data['user']['username']}! \r\n"
await post_gl(gl=gl, url=url, data={"body": message})
# Check if the airflow is running
status = await get_airflow_status(
event=event,
gl=gl,
url=url,
test_airflow_k8s_name=test_airflow_k8s_name
)
if not status == AirflowStatus.RUNNING:
return
# Teardown Airflow
await withdraw_airflow(
event=event,
gl=gl,
url=url,
merge_request_iid=merge_request_iid,
test_airflow_k8s_name=test_airflow_k8s_name
)
@bot.webhook_router.register("Merge Request Hook", action="merge")
async def merge_request_merged_event(event, gl, *args, **kwargs):
"""Whenever a merge request to main branch is merged, teardown test Airflow if present"""
if event.object_attributes['target_branch'] == 'main':
merge_request_iid = event.object_attributes['iid']
url = f"/projects/{event.project_id}/merge_requests/{merge_request_iid}/notes"
test_airflow_k8s_name = f"test-airflow-{merge_request_iid}"
message = f"Breathtaking decision to merge MR, @{event.data['user']['username']}! My palms are sweaty too! \r\n"
await post_gl(gl=gl, url=url, data={"body": message})
# Check if the airflow is running
status = await get_airflow_status(
event=event,
gl=gl,
url=url,
test_airflow_k8s_name=test_airflow_k8s_name
)
if not status == AirflowStatus.RUNNING:
return
# Teardown Airflow
await withdraw_airflow(
event=event,
gl=gl,
url=url,
merge_request_iid=merge_request_iid,
test_airflow_k8s_name=test_airflow_k8s_name
)
(8) Using a custom implementation of the bot using Starlette
While polling the pipeline status in step (7), tasks were being cancelled midway. The issue turned out to be that the Gitlab bot wasn't sending any response to the webhook until the handler had finished, so long-running handlers caused the webhook request to time out.
Fortunately, the author of gidgetlab, Benjamin, kindly shared with me a new custom implementation of the bot using Starlette. In this implementation, the handler is executed in a background task, so the webhook receives a response immediately instead of waiting.
So after copying the gidgetlab project into the bot project and applying the following custom implementation, the issue was fixed!
import cachetools
import httpx
import gidgetlab
import gidgetlab.abc
import gidgetlab.routing
from typing import Any, Optional, Sequence, Union
from starlette.datastructures import Secret
from starlette.applications import Starlette
from starlette.responses import Response, PlainTextResponse
from starlette.requests import Request
from starlette.routing import Route
from starlette.background import BackgroundTask
from starlette.exceptions import HTTPException
from gidgetlab import sansio
from gidgetlab import httpx as gl_httpx


class GitLabBot(Starlette):
    def __init__(
        self,
        requester: str,
        *,
        secret: Union[str, Secret] = None,
        access_token: Union[str, Secret] = None,
        cache: Optional[gidgetlab.abc.CACHE_TYPE] = None,
        wait_consistency: Optional[bool] = True,
        routers: Sequence[gidgetlab.routing.Router] = None,
        debug: bool = False,
        **kwargs: Any,
    ) -> None:
        self.requester = requester
        self.secret = secret
        self.access_token = access_token
        self.cache = cache or cachetools.LRUCache(maxsize=500)
        self.wait_consistency = wait_consistency
        # Additional keyword arguments to pass to GitLabAPI (url and api_version)
        self.kwargs = kwargs
        if routers is not None:
            self._webhook_router = gidgetlab.routing.Router(*routers)
        routes = [
            Route("/", self.webhook_handler, methods=["POST"]),
            Route("/health", self.health_handler),
        ]
        super().__init__(
            debug=debug,
            routes=routes,
            on_startup=[self.create_gl],
            on_shutdown=[self.close_gl_client],
        )

    @property
    def webhook_router(self) -> gidgetlab.routing.Router:
        """The bot :class:`gidgetlab.routing.Router` instance that routes webhooks events callbacks"""
        if not hasattr(self, "_webhook_router"):
            self._webhook_router = gidgetlab.routing.Router()
        return self._webhook_router

    async def create_gl(self) -> None:
        """Startup handler that creates the GitLabAPI instance"""
        client = httpx.AsyncClient()
        self.state.gl = gl_httpx.GitLabAPI(
            client,
            self.requester,
            cache=self.cache,
            access_token=str(self.access_token),
            **self.kwargs,
        )

    async def close_gl_client(self) -> None:
        """Shutdown handler that closes the httpx client"""
        await self.state.gl._client.aclose()

    async def health_handler(self, request: Request) -> Response:
        """Handler to check the health of the bot

        Return 'Bot OK'
        """
        return PlainTextResponse("Bot OK")

    async def webhook_handler(self, request: Request) -> Response:
        """Handler that processes GitLab webhook requests"""
        body = await request.body()
        try:
            event = sansio.Event.from_http(request.headers, body, secret=self.secret)
        except gidgetlab.HTTPException as e:
            raise HTTPException(status_code=e.status_code, detail=str(e))
        except gidgetlab.GitLabException as e:
            raise HTTPException(status_code=500, detail=str(e))
        # Call the appropriate callback(s) for the event in a background task
        task = BackgroundTask(self.webhook_router.dispatch, event, request.app.state.gl)
        return Response(status_code=200, background=task)
Result
Congratulations on following all the steps! We can now see test environments being successfully deployed and withdrawn within the Kubernetes cluster, as shown below.
Conclusion
Thank you for reading, and I hope you enjoyed this article!
If you have any questions regarding this post, please let me know in the comments below!
Akros Technologies is also hiring! If you want to be part of our team and solve exciting problems, make sure you visit our career page and follow us on Medium!
References
- https://docs.gitlab.com/ee/development/internal_users.html
- https://beenje.github.io/blog/posts/building-a-gitlab-bot-using-gidgetlab-and-aiohttp/
- https://gitlab.com/beenje/gidgetlab
- https://github.com/kubernetes/ingress-nginx/
- https://blog.nftbank.ai/nftbank에서-airflow-데이터-파이프라인을-안전하게-빠르게-개발과-테스트를-할-수-있는-이유-653aa18b683e