Porting your existing Python scripts to Airflow with CNDI

Overview

This tutorial walks you through the process of porting an existing Python script or application to CNDI and running it on an Airflow instance deployed by CNDI. We will primarily use the KubernetesPodOperator to execute the containerized Python script in a pod in a CNDI Kubernetes cluster.

Intended users

This tutorial is written for both data scientists and engineers who are looking to develop containerized workflows in Airflow. Before starting, it is helpful to be familiar with containers, know how to create a base image with Docker, and understand basic Airflow concepts.

Intended Outcomes for Tutorial

In this tutorial, you will:

✅ Learn how to containerize a Python script with Docker
✅ Create and configure a Dockerfile for a Python script
✅ Build a custom Docker image
✅ Push a Docker image to the Docker Hub registry
✅ Install packages using pip in a Python virtual environment
✅ Create an Airflow DAG in a CNDI cluster
✅ Execute the Airflow DAG data pipeline that runs the containerized Python script as a single task in the pipeline

Pre-Tutorial Prerequisites

  • CNDI is deployed on your local machine or on a cloud service provider
  • Python installed locally.
  • Docker Hub account.
  • Docker installed locally.

Complete Tutorial Code

Completed Python script - reddit_scraper.py

This is the completed Python script reddit_scraper.py that we ported to Airflow.

import datetime as dt
import pandas as pd
from psaw import PushshiftAPI

api = PushshiftAPI()

# Scrape the `data engineering` subreddit
api_request_generator = api.search_submissions(
    start_epoch=int(dt.datetime(2010, 1, 1).timestamp()),
    end_epoch=int(dt.datetime(2022, 1, 1).timestamp()),
    subreddit='dataengineering',
    q='(data)|(integration)',
    limit=1000
)

df = pd.DataFrame([submission.d_ for submission in api_request_generator])
df.to_csv('/scratch/data_engineering_dataset.csv')
Completed Airflow DAG - reddit_scraper_dag.py

This is the completed reddit_scraper_dag.py Airflow DAG.

from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

with DAG(
    dag_id='reddit_scraper_dag',
    catchup=False,
    schedule_interval=None,
    is_paused_upon_creation=True,
    start_date=datetime(2021, 1, 1),
) as dag:

    scrape_reddit_and_create_csv = KubernetesPodOperator(
        name="scrape_reddit_and_create_csv",
        task_id="scrape_reddit_and_create_csv",
        namespace='airflow',
        image='polyseamdev/port-to-airflow-tutorial:1.0.0',
        image_pull_policy='Always',
        volumes=[k8s.V1Volume(name='scratch-vol', persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='scratch-vol'))],
        volume_mounts=[k8s.V1VolumeMount(name='scratch-vol', mount_path='/scratch/', sub_path=None, read_only=False)],
    )
Completed Dockerfile - Dockerfile
# set base image
FROM python:3.10-slim-bullseye

# copy requirements.txt to the container
COPY requirements.txt .

# copy the Python script to the container
COPY reddit_scraper.py .

# install and upgrade the required package manager
RUN python -m pip install --upgrade pip

# install the required dependencies
RUN pip install --no-cache-dir -r requirements.txt

# execute this Python script inside the container.
CMD ["python", "./reddit_scraper.py"]

Tutorial

Original Python Script

This is the Python script that we will be porting to Airflow. Its goal is to scrape data from Reddit: it contacts an external service, the Pushshift API, to fetch Reddit posts that contain certain keywords from a given subreddit within a date range, then saves that data in CSV format into the porting-python-scripts-project directory on the user's computer.

import datetime as dt
import pandas as pd
from psaw import PushshiftAPI

api = PushshiftAPI()

# Scrape the `data engineering` subreddit
api_request_generator = api.search_submissions(
    start_epoch=int(dt.datetime(2010, 1, 1).timestamp()),
    end_epoch=int(dt.datetime(2022, 1, 1).timestamp()),
    subreddit='dataengineering',
    q='(data)|(integration)',
    limit=1000
)

df = pd.DataFrame([submission.d_ for submission in api_request_generator])
df.to_csv('/porting-python-scripts-project/data_engineering_dataset.csv')

The diagram below shows the directory structure so far

📁 porting-python-scripts-project/
├── reddit_scraper.py
└── data_engineering_dataset.csv

Change the directory where the CSVs will be generated

Edit your reddit_scraper.py Python script and change the directory where the CSVs are created.

The previous location of the created CSV files in the reddit_scraper.py script was the porting-python-scripts-project directory:

import datetime as dt
import pandas as pd
from psaw import PushshiftAPI

api = PushshiftAPI()

# Scrape the `data engineering` subreddit
api_request_generator = api.search_submissions(
    start_epoch=int(dt.datetime(2010, 1, 1).timestamp()),
    end_epoch=int(dt.datetime(2022, 1, 1).timestamp()),
    subreddit='dataengineering',
    q='(data)|(integration)',
    limit=1000
)
df = pd.DataFrame([submission.d_ for submission in api_request_generator])
df.to_csv('/porting-python-scripts-project/data_engineering_dataset.csv')

The new location for the created CSV files is the /scratch/ mount path:

import datetime as dt
import pandas as pd
from psaw import PushshiftAPI

api = PushshiftAPI()

# Scrape the `data engineering` subreddit
api_request_generator = api.search_submissions(
    start_epoch=int(dt.datetime(2010, 1, 1).timestamp()),
    end_epoch=int(dt.datetime(2022, 1, 1).timestamp()),
    subreddit='dataengineering',
    q='(data)|(integration)',
    limit=1000
)
df = pd.DataFrame([submission.d_ for submission in api_request_generator])
df.to_csv('/scratch/data_engineering_dataset.csv')

Using Python virtual environments

Create a Python virtual environment

A Python virtual environment is an isolated working copy of Python that allows you to work on a specific project without worrying about affecting other projects. Make sure you are inside your project directory, then create a Python virtual environment directory in the terminal by typing:

Example command structure

python3 -m venv <directory>

Actual command

python3 -m venv myvenv

The above command tells Python to use the venv module to create a virtual environment named myvenv.
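
For reference, the command creates a myvenv/ directory. On Linux its contents look roughly like the sketch below; the exact layout varies by platform and Python version:

📁 myvenv/
├── bin/          # activation scripts and the environment's python/pip executables
├── include/
├── lib/          # site-packages for modules installed in this environment
└── pyvenv.cfg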

Activate your new Python virtual environment directory

Once the virtual environment is created, we need to activate it.

Type the following command to activate the Python virtual environment:

Example command structure

source <directory>/bin/activate

Actual command

source myvenv/bin/activate

After successfully activating the virtual environment, you will see its name prefixed to your command line prompt in parentheses.
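
For example, the prompt might look something like this (the exact format depends on your shell configuration):

(myvenv) user@machine:~/porting-python-scripts-project$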

Upgrade pip

The first thing you should do after creating and activating a virtual environment is upgrade pip. pip is Python's package manager, which installs Python modules.

Run the following command to upgrade pip:

python3 -m pip install --upgrade pip

Install Modules in the activated virtual environment directory

From now on, you can install any project-related modules. If we return to the reddit_scraper.py Python script, we can see that it imports three modules in order to run properly.

import datetime as dt
import pandas as pd
from psaw import PushshiftAPI

Of these, datetime is part of the Python standard library and does not need to be installed. The two external modules, pandas and psaw, along with their dependencies, are what we will later containerize into a Docker image. You can install them by running the following command in the activated virtual environment shell:

pip install pandas psaw

Save dependencies in the requirements.txt file

The command below records the environment's currently installed modules, or dependencies, and saves them into a requirements.txt file. The requirements.txt file contains the names and exact versions of all installed modules, which makes it easier to reproduce your environment.

pip freeze > requirements.txt
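
For illustration, the generated requirements.txt will look something like the following; the exact package list and version numbers depend on what pip resolved in your environment:

numpy==1.22.3
pandas==1.4.2
psaw==0.1.0
python-dateutil==2.8.2
pytz==2022.1
requests==2.27.1
six==1.16.0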

Deactivate your new Python virtual environment

Once the requirements.txt file is created, it's a good habit to deactivate myvenv. Deactivating takes you out of the virtual environment; without deactivating, any other Python code you execute, even code outside of your project directory, will also run inside it.

Type the following command to deactivate the Python virtual environment:

deactivate

Using Docker

Login into Docker Hub

Docker Hub is a cloud-based container registry provided by Docker for creating, finding, and sharing public or private container images. Use sudo docker login to sign in to an existing Docker Hub account so that we can upload our newly built image in an upcoming step.

sudo docker login
tip

If you don't want to have to preface every docker command with sudo to avoid permission errors, follow the Post-installation steps for Linux.

Create a Dockerfile

Inside your porting-python-scripts-project directory, create a file called Dockerfile with no extension. A Dockerfile describes to Docker how to build the image. Each instruction in the Dockerfile tells Docker to perform a specific task when building the image; each instruction is written on its own line and begins with an uppercase keyword.

Example Dockerfile:

# set base image
FROM python:3.10-slim-bullseye

# copy requirements.txt to the container
COPY requirements.txt .

# copy the Python script to the container
COPY reddit_scraper.py .

# install and upgrade the required package manager
RUN python -m pip install --upgrade pip

# install the required dependencies
RUN pip install --no-cache-dir -r requirements.txt

# execute this Python script inside the container.
CMD ["python", "./reddit_scraper.py"]

The diagram below shows the directory structure so far

📁 porting-python-scripts-project/
├── Dockerfile
├── myvenv/
├── reddit_scraper.py
├── requirements.txt
└── data_engineering_dataset.csv

Build your Docker Image

Once you have a Dockerfile, you can build an image using the docker build command. The -t flag tags our image with a name we can reference: the first component of the name is the Docker username, a forward slash separates the username from the image name, and a colon followed by a tag name completes it. The dot at the end of the docker build command tells Docker to look for the Dockerfile in the current directory.

Example build structure

sudo docker build -t <docker-username>/<image-name>:<tag> <filepath>

Actual docker command

sudo docker build -t polyseamdev/port-to-airflow-tutorial:1.0.0 .
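
As an optional check that is not part of the original tutorial flow, you can sanity-check the image locally before pushing it by mounting a host directory at /scratch, the path the script writes its CSV to:

sudo docker run --rm -v "$(pwd):/scratch" polyseamdev/port-to-airflow-tutorial:1.0.0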

Push your custom Docker Image to Docker Hub

The next step is fairly simple. All we need to do is issue the docker push command specifying our docker username, image name, and tag. This command will upload your image to your account on Docker Hub, which you should be able to see in the web interface.

Example push structure

sudo docker push <docker-username>/<image-name>:<tag>

Actual docker command

sudo docker push polyseamdev/port-to-airflow-tutorial:1.0.0

Edit the permissions of the Airflow DAGs directory

Make the /cluster/airflow_dags directory readable and writable by all users so that you can edit or create DAGs there:

sudo chmod -R 777 /cluster/airflow_dags
info

When CNDI is deployed, it creates a cluster directory on your machine, located in the topmost (root) directory (titled Computer on Ubuntu). To find the Computer directory, go to Other Locations in the file browser. This cluster directory is where the DAGs, Python scripts, and data should be stored.

Creating Directed Acyclic Graphs in Airflow

Create a DAG in the Airflow Dags directory

Create a DAG named reddit_scraper_dag.py in the /cluster/airflow_dags directory. Using the code editor of your choice, create a new file and enter the following three code blocks into the DAG.

Import necessary libraries into your reddit_scraper_dag.py DAG

The first step to creating a DAG in Airflow is importing the DAG class. After the DAG class come the operator imports: for each operator you want to use, you have to make the corresponding import. For example, to execute a Bash command, you would import the BashOperator. DAGs are composed of tasks, and tasks are represented by operators.

In this DAG, we will be executing a Python script in a Kubernetes pod, so we need to import the KubernetesPodOperator, and we also need to import the Kubernetes API models to access the /cluster/scratchvol volume.

Finally, the last import is usually the datetime class as you need to specify a start date to your DAG.

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from datetime import datetime

Create the DAG object in your reddit_scraper_dag.py DAG

Create a DAG object and set the dag_id, which is the unique name of the DAG, to reddit_scraper_dag.

Set the schedule_interval to None; this ensures that the DAG will only be executed manually.

Set the start_date, using the datetime module, to any date in the past.

with DAG(
    dag_id='reddit_scraper_dag',
    catchup=False,
    schedule_interval=None,
    is_paused_upon_creation=True,
    start_date=datetime(2021, 1, 1),
) as dag:

Add KubernetesPodOperator to your reddit_scraper_dag.py DAG

The only task in our reddit_scraper_dag.py DAG will execute the reddit_scraper.py Python script.

Before we can run tasks using the KubernetesPodOperator, we need to fill in a few arguments that tell the KubernetesPodOperator how to run our task as a container:

The image argument tells Kubernetes which Docker image to use, while the cmds and args parameters define which executable to run and which arguments to pass to it. Note that if your image contains multiple scripts, you might want to set those parameters on the KubernetesPodOperator to specify which script to execute for that particular task; see the optional sketch after the code block below. You can learn how to create a custom Docker image here.

The remaining arguments tell Kubernetes which namespace to run the pod in and what name to use for the pod. We also supply two extra arguments, volumes and volume_mounts, which specify how the storage volume should be mounted into the task's Kubernetes pod.

Note that the file path the script writes to is the file path that the Kubernetes pod sees, i.e. the one declared under volume_mounts as the mount_path parameter, /scratch/. Kubernetes/CNDI maps this to the directory /cluster/scratchvol on the controller node (your local PC if you're using a local CNDI deployment). The purpose of this scratch volume is to provide a shared storage volume that your DAG can write to.

    scrape_reddit_and_create_csv = KubernetesPodOperator(
        name="scrape_reddit_and_create_csv",  # name to use for the pod
        task_id="scrape_reddit_and_create_csv",  # Airflow task identifier
        namespace='airflow',  # Kubernetes namespace to run the pod in
        image='polyseamdev/port-to-airflow-tutorial:1.0.0',  # which Docker image to use
        image_pull_policy='Always',  # pull the image even if it is already present locally
        volumes=[k8s.V1Volume(name='scratch-vol', persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='scratch-vol'))],
        volume_mounts=[k8s.V1VolumeMount(name='scratch-vol', mount_path='/scratch/', sub_path=None, read_only=False)],
    )
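
As noted above, if your image contained multiple scripts, you could override the image's default command with the operator's cmds and args parameters to choose which script a task runs. This is not needed for this tutorial's single-script image; the snippet below is only an illustrative sketch with the volume arguments omitted:

    # Hypothetical variant: explicitly choose which script the pod runs.
    scrape_reddit_and_create_csv = KubernetesPodOperator(
        name="scrape_reddit_and_create_csv",
        task_id="scrape_reddit_and_create_csv",
        namespace='airflow',
        image='polyseamdev/port-to-airflow-tutorial:1.0.0',
        cmds=["python"],               # executable to run inside the container
        args=["./reddit_scraper.py"],  # arguments passed to that executable
        # volumes and volume_mounts as shown above
    )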

Allow all users to access and execute the DAG in the Airflow DAGs directory

sudo chmod -R 777 /cluster/airflow_dags/reddit_scraper_dag.py

Check if your DAG has been successfully executed

You are now finished writing your DAG! Go to the Airflow UI, trigger the DAG, and check whether you have successfully ported your Python script to Airflow.

Airflow main display

The DAG ran successfully, as we can see from the green boxes that represent a successful DAG run. If there had been an error, the boxes would be red.

Tree view of reddit_scraper_dag

When you now go to the /cluster/scratchvol/ directory, you should see the file your script saved, data_engineering_dataset.csv.

Directory where scraped data is converted to csv

info

If you want to gain access to the newly created CSV inside the scratch directory, you will need to set the permissions again in the terminal with:

sudo chmod 777 /cluster/scratchvol/data_engineering_dataset.csv
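
After adjusting the permissions, you can confirm the file exists and preview its first few rows from the terminal, for example:

head -n 5 /cluster/scratchvol/data_engineering_dataset.csv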

Conclusion

This tutorial was a step-by-step guide on how to execute containerized Python scripts in CNDI with Apache Airflow.

Next Steps