I'm trying to run Python scripts and some shell commands in an Airflow DAG, driven by a dictionary.
My code is below. I don't know how to pass each dictionary key as the input to the Python script: `{{ params.x }}` doesn't work and throws an error saying "x" is undefined. Can you help me fix this?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}
mydict = {
    1: "name1",
    2: "name2",
    3: "name3"
}
INDEX=1
with DAG('test_dag', default_args = default_args, schedule_interval=None, catchup=False) as dag:
    for x,y in mydict.items():
        run_test_1 = BashOperator(
            task_id=f'task1_{y}',
            bash_command="python /path-to-the-script/simple_scirpt.py '{{ params.x }}'"
        )
        run_test_2 = BashOperator(
            task_id=f'task2_{y}',
            bash_command="echo '{{ params.x }}'"
        )
        run_test_1 >> run_test_2
        INDEX += 1
Solution:
params in Airflow lets you pass values into a DAG from outside, e.g. when you trigger a DAG manually you can supply additional keys/values, which your DAG can then read via params. See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html for more information.
In your code nothing ever defines a param named x, which is why the template fails: the loop variable x is an ordinary Python variable and is not visible inside the Jinja template. Since the dictionary is hardcoded, you don't need params at all. You can use the loop variables x and y directly with f-strings:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}
mydict = {
    1: "name1",
    2: "name2",
    3: "name3"
}
INDEX=1
with DAG('test_dag', default_args = default_args, schedule_interval=None, catchup=False) as dag:
    # x is the dictionary key, y is the name used in the task ids
    for x,y in mydict.items():
        run_test_1 = BashOperator(
            task_id=f'task1_{y}',
            # f-string: Python substitutes x when the DAG file is parsed
            bash_command=f"python /path-to-the-script/simple_scirpt.py '{x}'"
        )
        run_test_2 = BashOperator(
            task_id=f'task2_{y}',
            bash_command=f"echo '{x}'"
        )
        run_test_1 >> run_test_2
        INDEX += 1
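If you want to check which commands the loop will generate without running Airflow, here is a minimal standalone sketch. `build_commands` is a hypothetical helper, not part of Airflow, and using `shlex.quote` instead of hand-written single quotes is my own assumption about what is safest if a key ever contains spaces or quotes. It also shows why the original version failed: a plain string keeps `{{ params.x }}` verbatim, so Python never inserts x.

```python
import shlex

# Same mapping as in the DAG.
mydict = {1: "name1", 2: "name2", 3: "name3"}

# A plain (non f-) string is left untouched by Python; the Jinja
# placeholder is passed on to Airflow exactly as written.
templated = "echo '{{ params.x }}'"


def build_commands(key):
    """Hypothetical helper: return (script command, echo command) for one key."""
    # shlex.quote only adds quotes when the value needs them.
    arg = shlex.quote(str(key))
    script_cmd = f"python /path-to-the-script/simple_scirpt.py {arg}"
    echo_cmd = f"echo {arg}"
    return script_cmd, echo_cmd


# Map each task id (as built in the DAG loop) to its bash command.
commands = {}
for key, name in mydict.items():
    script_cmd, echo_cmd = build_commands(key)
    commands[f"task1_{name}"] = script_cmd
    commands[f"task2_{name}"] = echo_cmd

for task_id, cmd in commands.items():
    print(task_id, "->", cmd)
```

If you would rather keep the `{{ params.x }}` template, my understanding is that operators also accept a `params` dict, so passing `params={"x": x}` to each `BashOperator` should make the original template resolve. For a hardcoded dictionary, the f-string version is simpler.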