
How to pass an argument to tasks in an Airflow DAG

I'm trying to run Python scripts and some shell commands in an Airflow DAG, creating one pair of tasks for each entry in a dictionary.

My code is below. I don't know how to pass the dictionary key as the input to the Python script: {{ params.x }} doesn't work and throws an error saying "x" is undefined. Can you help me fix this?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}

mydict = {
  1: "name1",
  2: "name2",
  3: "name3"
}

INDEX=1
with DAG('test_dag', default_args = default_args, schedule_interval=None, catchup=False) as dag:
    for x,y in mydict.items():        

        run_test_1 = BashOperator(
                task_id=f'task1_{y}',
                bash_command="python /path-to-the-script/simple_scirpt.py '{{ params.x }}'"
        )           

        run_test_2 = BashOperator(
                task_id=f'task2_{y}',
                bash_command="echo '{{ params.x }}'"
        )
        
        run_test_1 >> run_test_2
        INDEX += 1


Solution:

params in Airflow lets you pass values into templates from outside the task code. For example, when you trigger a DAG manually you can supply extra key/value pairs, which are then available in your templates as params.<key>. See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html for more information.
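To make the "x is undefined" error concrete: Airflow renders bash_command with Jinja when the task runs, and the template only sees its rendering context (for params, the keys defined on the DAG, on the operator, or supplied at trigger time). The loop variable x exists only while Python parses the DAG file, so the template never sees it. The sketch below is a simplified, stdlib-only emulation of that lookup; the regex and the names render_params and UndefinedError are hypothetical and are not Airflow's implementation. It also shows why another fix works: operators accept a params argument, so passing params={'x': x} to each BashOperator would make {{ params.x }} resolve.

```python
import re

# Simplified stand-in for Airflow's Jinja rendering (NOT Airflow code):
# only keys present in the params dict can be resolved by {{ params.<key> }}.
_PARAM_RE = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


class UndefinedError(Exception):
    """Hypothetical error, loosely mirroring a strict 'undefined' check."""


def render_params(template, params):
    """Hypothetical helper: substitute {{ params.key }} or fail loudly."""
    def lookup(match):
        key = match.group(1)
        if key not in params:
            raise UndefinedError(f"'{key}' is undefined")
        return str(params[key])
    return _PARAM_RE.sub(lookup, template)


template = "echo '{{ params.x }}'"
for x, y in {1: "name1"}.items():
    # The Python loop variable x exists here, but the template cannot see it.
    try:
        render_params(template, {})
    except UndefinedError as err:
        print("render failed:", err)
    # Supplying x under params makes the lookup succeed.
    print(render_params(template, {"x": x}))
```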

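In your code the dictionary is hardcoded, so there's no need for params. The for loop runs when Airflow parses the DAG file, so you can use the loop variables x and y directly and bake them into each bash_command with an f-string: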

from airflow import DAG
# Airflow 2.x module path; airflow.operators.bash_operator is a deprecated alias
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}

mydict = {
  1: "name1",
  2: "name2",
  3: "name3"
}

with DAG('test_dag', default_args=default_args, schedule_interval=None, catchup=False) as dag:
    # This loop runs when Airflow parses the file, so x and y are ordinary
    # Python values that the f-strings bake into each task definition.
    for x, y in mydict.items():

        run_test_1 = BashOperator(
            task_id=f'task1_{y}',
            # Pass the dictionary key to the script as its first argument
            bash_command=f"python /path-to-the-script/simple_scirpt.py '{x}'"
        )

        run_test_2 = BashOperator(
            task_id=f'task2_{y}',
            bash_command=f"echo '{x}'"
        )

        run_test_1 >> run_test_2
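To see what the loop above actually hands to Airflow, here is a stdlib-only sketch that builds the same (task_id, bash_command) pairs without importing Airflow. Two details are assumptions, not part of the answer: shlex.quote replaces the hand-written single quotes so a value containing spaces or quotes stays one shell argument, and the '{{{{ ds }}}}' piece illustrates that if you mix an f-string with an Airflow Jinja macro (ds here), the braces must be doubled so the f-string emits a literal {{ ds }} for Airflow to render at run time. build_commands is a hypothetical helper name.

```python
import shlex

# Same mapping as in the DAG above.
mydict = {1: "name1", 2: "name2", 3: "name3"}


def build_commands(mapping):
    """Hypothetical helper: return the (task_id, bash_command) pairs the DAG loop creates.

    x is substituted by the f-string when the loop runs (parse time). shlex.quote
    keeps each value a single shell word, and the doubled braces {{{{ ds }}}}
    come out as a literal {{ ds }} that Airflow's Jinja renders at run time.
    """
    commands = []
    for x, y in mapping.items():
        arg = shlex.quote(str(x))
        commands.append(
            (f"task1_{y}", f"python /path-to-the-script/simple_scirpt.py {arg} '{{{{ ds }}}}'")
        )
        commands.append((f"task2_{y}", f"echo {arg}"))
    return commands


if __name__ == "__main__":
    for task_id, command in build_commands(mydict):
        print(f"{task_id}: {command}")
```

Running it prints one line per task, starting with task1_name1: python /path-to-the-script/simple_scirpt.py 1 '{{ ds }}'. Because each command string is fixed when the file is parsed, every task gets its own value of x.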