
Airflow Operators

Airflow operators are the core building blocks of any workflow defined in Airflow. An operator represents a single task that runs independently, without sharing information with other tasks. Operators can perform a wide range of actions: running a Python function, executing a bash command, running a SQL query, calling an HTTP API, sending an email, or branching on a condition. In the Airflow documentation, the terms operator and task are used largely interchangeably; strictly speaking, a task manages the execution of an operator within a DAG.

(Figure: Tasks and operators)

Types of Airflow Operators

Airflow ships with a wide variety of built-in operators covering common operations: running a Python function, executing a bash command, running a SQL query, calling an HTTP API, sending an email, and branching on a condition. Let’s see a few examples of Airflow operators:

  • BashOperator: executes a bash command.
  • PythonOperator: runs a Python callable (function).
  • EmailOperator: sends an email to one or more recipients.
  • MySqlOperator: runs a SQL query against a MySQL database.
  • S3ToHiveOperator: transfers data from Amazon S3 to Hive.
  • HttpOperator: calls an HTTP endpoint.
  • BranchPythonOperator: like PythonOperator, except its python_callable returns the task_id (or list of task_ids) of the branch to follow.

All of these operators derive from the base class BaseOperator, which defines the properties and methods shared by every operator. Subclassing BaseOperator is also how you write your own operator.
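For reference, here is a minimal sketch of what such a subclass looks like. HelloOperator and its message parameter are illustrative, not part of Airflow:

# Sketch of a custom operator built on BaseOperator.
# HelloOperator and the message parameter are illustrative only.
from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # execute() is the method Airflow calls when the task runs
        print(f"Hello, {self.message}!")
        return self.message  # return values are pushed to XCom by default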

Airflow DAG

In Airflow, operators execute in a sequence, performing a series of tasks to accomplish a specific goal such as extracting data (e.g. ETL), training a model, or generating predictions. Such a sequence of operations is known as a pipeline, or data pipeline.

(Figure: A directed acyclic graph)

A data pipeline comprises several tasks that must run in a specific order to achieve the desired result. In Airflow, this sequence of tasks is represented as a DAG (directed acyclic graph): a workflow whose edges define the dependencies between tasks. The main job of a DAG is to orchestrate the execution of the operators, enforcing their dependencies, ordering, and start and stop criteria.

(Figure: Directed acyclic vs. cyclic graphs)

The acyclic property is the defining constraint of a DAG: it rules out circular dependencies between tasks. A cycle would make the workflow impossible to schedule, since no task in the cycle could ever be the first to run. Airflow therefore rejects a DAG that contains a cycle, as the sketch below illustrates.
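A minimal sketch, assuming Airflow 2.x, where the check_cycle helper lives in airflow.utils.dag_cycle_tester (the dag_id and task names are illustrative):

# Sketch: a circular dependency makes a DAG invalid.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dag_cycle_tester import check_cycle

dag = DAG(dag_id="cyclic_dag", start_date=datetime(2023, 2, 13))

task_a = BashOperator(task_id="a", bash_command="echo a", dag=dag)
task_b = BashOperator(task_id="b", bash_command="echo b", dag=dag)

task_a >> task_b >> task_a  # circular dependency: a -> b -> a

check_cycle(dag)  # raises AirflowDagCycleException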

Creating a BashOperator

BashOperator (a bash task) runs a bash script or any Linux command inside a DAG. We create one by instantiating the BashOperator class. Let’s create a BashOperator in the below example:

# my first bashoperator dag

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id="random_number_dag",
    start_date=datetime(2023, 2, 13, 5, 12, 0),
    schedule_interval='* * * * *',  # cron expression: run every minute
)

generate_random_number = BashOperator(
    task_id="generate_random_number",
    bash_command='echo $RANDOM',
    dag=dag,
)

In the above example, we first define the DAG using the DAG class, setting dag_id, start_date, and schedule_interval. We then define the task using the BashOperator class, setting task_id, bash_command, and the dag object it belongs to.
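Note that bash_command is a templated field: Airflow renders Jinja expressions in it before the command runs. As a small sketch, attached to the same dag object as above (the task name print_logical_date is illustrative), the built-in {{ ds }} macro expands to the run’s logical date:

# Sketch: bash_command is Jinja-templated, so built-in macros
# such as {{ ds }} are rendered before the command executes.
print_date = BashOperator(
    task_id="print_logical_date",
    bash_command='echo "logical date: {{ ds }}"',
    dag=dag,
)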

Creating a PythonOperator

PythonOperator (a Python function task) runs a Python callable inside a DAG. We create one by instantiating the PythonOperator class. Let’s create a PythonOperator in the below example:

# my first pythonoperator dag

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id="my_python_dag",
    start_date=datetime(2023, 2, 13, 5, 12, 0),
    schedule_interval='* * * * *',
)


def print_message():
    print("This is a Python Operator.")


python_task = PythonOperator(
    task_id="print_message", 
    python_callable=print_message, 
    dag=dag
)

As before, we first define the DAG with dag_id, start_date, and schedule_interval. We then define the task using the PythonOperator class, setting task_id, the python_callable to run, and the dag object.
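The callable can also take arguments, passed through op_args (positional) or op_kwargs (keyword). A minimal sketch, extending the example above; the greet function and its name parameter are illustrative:

# Sketch: passing keyword arguments to the callable via op_kwargs.
def greet(name):
    print(f"Hello, {name}!")


greet_task = PythonOperator(
    task_id="greet",
    python_callable=greet,
    op_kwargs={"name": "Airflow"},  # passed to greet() as name="Airflow"
    dag=dag,
)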

Combine Multiple Operators

In this example, we will create a DAG with multiple operators: a BashOperator and a PythonOperator. To execute multiple tasks, we need to define the order in which they run, i.e. the dependencies among them. Relative to a given task, the tasks it depends on are its upstream tasks (they run before it), and the tasks that depend on it are its downstream tasks (they run after it). Dependencies are declared with the bitshift operators:

  • >> (the downstream operator): task1 >> task2 means task1 runs before task2; task2 is downstream of task1.
  • << (the upstream operator): the same dependency can be written the other way around as task2 << task1.

Two common dependency patterns are:

  • Chain dependencies: a linear sequence of tasks, e.g.

task1 >> task2 >> task3 >> task4

  • Mixed dependencies: a fan-in (or fan-out) structure in which one task depends on several others, e.g. (a list-based shorthand is sketched after these examples):

task1 >> task2 << task3 

or 

task1 >> task2
task3 >> task2

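The bitshift operators also accept lists of tasks, which gives a compact way to write the fan-in above. A minimal self-contained sketch (the dag_id and task names are illustrative):

# Sketch: list-based dependencies express fan-in in one line.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(dag_id="fan_in_dag", start_date=datetime(2023, 2, 13))

task1 = BashOperator(task_id="task1", bash_command="echo 1", dag=dag)
task2 = BashOperator(task_id="task2", bash_command="echo 2", dag=dag)
task3 = BashOperator(task_id="task3", bash_command="echo 3", dag=dag)

# Equivalent to task1 >> task2 << task3: task2 waits for both.
[task1, task3] >> task2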

Let’s create a DAG with multiple operators in the below example:

# my first multiple tasks dag

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id="my_first_dag",
    start_date=datetime(2023, 2, 13, 5, 12, 0),
    schedule_interval='* * * * *',
)


def print_message():
    print("This is a Python Operator.")


python_task = PythonOperator(
    task_id="print_message", 
    python_callable=print_message, 
    dag=dag
)


generate_random_number = BashOperator(
    task_id="generate_random_number",
    bash_command='echo $RANDOM',
    dag=dag,
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "random number generated"',
    dag=dag,
)

# python_task runs first, then generate_random_number, then notify
python_task >> generate_random_number >> notify

In the above example, we first define the DAG using the DAG class with dag_id, start_date, and schedule_interval. We then define a PythonOperator (task_id, python_callable, dag) and two BashOperators (task_id, bash_command, dag). Finally, we chain the three tasks with the downstream operator (>>).
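The bitshift syntax is shorthand for the set_downstream() and set_upstream() methods that every operator inherits from BaseOperator; the chain above could equivalently be written as:

# Equivalent to: python_task >> generate_random_number >> notify
python_task.set_downstream(generate_random_number)
generate_random_number.set_downstream(notify)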

Summary

In this article, we have discussed several types of Airflow operators, such as BashOperator, PythonOperator, EmailOperator, MySqlOperator, S3ToHiveOperator, HttpOperator, and BranchPythonOperator. We have also implemented the BashOperator and PythonOperator individually, and combined both operators in a single DAG.