Airflow Operators
Airflow operators are core components of any workflow defined in airflow. The operator represents a single task that runs independently without sharing any information. Operators can execute various actions such as python function, bash command, SQL query, triggering API, sending email, and performing conditional operations. In airflow documentation, operator and task terms are used interchangeably. Tasks used to manage the execution of an operator in a DAG.
Types of Airflow Operators
Airflow has a wide variety of built-in operators that can perform the required operation such as python function, bash command, SQL query, triggering API, sending email, and performing conditional operations. Let’s see a few examples of airflow operators:
- BashOperator: It is used to execute a bash command
- PythonOperator: It is used to run the python callable or python function.
- EmailOperator: It is used to send emails to the receiver
- MySqlOperator: It is used to run the SQL query for MySql Database.
- S3ToHiveOperator: It transfers data from Amazon S3 to Hive.
- HttpOperator: It is used to trigger an HTTP endpoint.
- BranchingOperator: Just like PythonOperator except that it expects a python_callable that returns a task_id.
These operators are created from the base class BaseOperator, It defines all the required properties and methods.
Airflow DAG
In Airflow, various operators execute in a sequence and perform a series of tasks to accomplish a specific goal such as extracting the data(e.g. ETL), model training, and model prediction. These sequences of operations know as a pipeline or data pipelines.
Data pipelines comprise the several tasks that are used to run in order to achieve a desired result. These pipelines can be represented by a graph or more specifically by DAG (directed acyclic graph). This sequence of tasks is represented by DAGs (Directed Acyclic Graphs) that represent a workflow. DAG is used to define the dependencies between tasks. The main goal of DAG is to orchestrate the execution of operators. It ensures dependencies, sequences, starts, and stop criteria.
Acyclic property is the most important concept of DAG that prevents us from executing circular dependencies between tasks. These circular dependencies become more problematic during the execution of tasks in a sequence.
Creating a BashOperator
BashOperator or Bash Task is used to run a Bash script in the DAG. BashOperator is a type of operator used to create a task that executes any bash or Linux command. We can create a BashOperator in Airflow using BashOperator class. Let’s create a Bashoperator in the below example:
# my first bashoperator dag
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
dag_id="random_number_dag",
start_date=datetime(2023, 2, 13, 5, 12, 0),
schedule_interval='* * * * *',
)
generate_random_number = BashOperator(
task_id="generate_random_number",
bash_command='echo $RANDOM',
dag=dag,
)
In the above example, we have first defined the dag using the DAG class with class fields such as dag_id, start_date, and schedule_interval. After that, BashOperator is defined using BashOperator class with class fields task_id, bash_command, and dag object.
Creating a PythonOperator
PythonOperator or Python Function Task is used to run a Python script in the DAG. PythonOperator is a type of operator used to create a task that executes any python callable function. We can create a PythonOperator in Airflow using PythonOperator class. Let’s create a PythonOperator in the below example:
# my first pythonoperator dag
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id="my_python_dag",
start_date=datetime(2023, 2, 13, 5, 12, 0),
schedule_interval='* * * * *',
)
def print_message():
print("This is a Python Operator.")
python_task = PythonOperator(
task_id="print_message",
python_callable=print_message,
dag=dag
)
In the above example, we have first defined the dag using the DAG class with class fields such as dag_id, start_date, and schedule_interval. After that, PythonOperator is defined using PythonOperator class with class fields task_id, python_callable, and dag object.
Combine Multiple Operators
In this example, we will create DAG with multiple operators such as BashOpertor and PythonOperator. For executing multiple tasks or operators, we need to define the complete order of tasks with dependencies among the tasks. Dependencies can be referred to as upstream or downstream tasks. Here Upstream means before and Downstream means after.
- >>, or the upstream operator
- <<, or the downstream operator
e.g. task1 >> task2 means the first task runs before the second task. or we can also define similar dependency in this way, task2 << task1.
- Chain Dependencies: a linear sequence of tasks, that can be defined as e.g.
task1 >> task2 >> task3 >> task4
- Mixed Dependencies: Sequence of tasks that can be defined as e.g.
task1 >> task2 << task3
or
task1 >> task2
task3 >> task2
Let’s create a DAG with multiple operators in the below example:
# my first multiple tasks dag
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id="my_first_dag",
start_date=datetime(2023, 2, 13, 5, 12, 0),
schedule_interval='* * * * *',
)
def print_message():
print("This is a Python Operator.")
python_task = PythonOperator(
task_id="print_message",
python_callable=print_message,
dag=dag
)
generate_random_number = BashOperator(
task_id="generate_random_number",
bash_command='echo $RANDOM',
dag=dag,
)
notify = BashOperator(
task_id="notify",
bash_command='echo "random number generated"',
dag=dag,
)
python_task >> generate_random_number >> notify
In the above example, we have first defined the dag using the DAG class with class fields such as dag_id, start_date, and schedule_interval. BashOperator is defined using BashOperator class with class fields task_id, bash_command, and dag object. After that, PythonOperator is defined using PythonOperator class with class fields task_id, python_callable, and dag object. Finally, we defined the dependencies in a chain using the upstream operator(>>).
Summary
In this article, we have discussed the various types of Airflow operators such as BashOperator, PythonOperator, EmailOperator, MySqlOperator, S3ToHiveOperater, HttpOperator, and BranchingOperator. We have also implemented the BashOperator, PythonOperator, and multiple combinations of both operators.