Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

How to run Airflow S3 sensor exactly once?

I want to continue the DAG only if a csv file exists in S3, otherwise it should just end.
The DAG itself is being scheduled hourly.

with DAG(dag_id="my_dag",
         start_date=datetime(2023, 1, 1),
         schedule_interval='@hourly',
         catchup=False
         ) as dag:
    check_for_new_csv = S3KeySensor(
        task_id='check_for_new_csv',
        bucket_name='bucket-data',
        bucket_key='*.csv',
        wildcard_match=True,
        soft_fail=True,
        retries=1
    )
        start_instance = EC2StartInstanceOperator(
            task_id="start_ec2_instance_task",
            instance_id=INSTANCE_ID,
            region_name=REGION
        )

    check_for_new_csv >> start_instance

But the sensor seems to run forever – in the log I can see it keeps on running:

[2023-01-10, 15:02:06 UTC] {s3.py:98} INFO - Poking for key : s3://bucket-data/*.csv    
[2023-01-10, 15:03:08 UTC] {s3.py:98} INFO - Poking for key : s3://bucket-data/*.csv

Maybe the sensor in not the best choice for such logic?

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

>Solution :

A sensor is a perfect choice for this use case. I’d try setting the poke_interval and timeout to different smaller values than their default to make sure the sensor that Airflow is checking on the right intervals (by default, they are very long).

One thing to watch out for is if your sensors run on longer intervals than your schedule interval. For example, if your DAG is scheduled to run hourly, but your sensors’s timeout is set for 2 hours, your next DAG run may not run as expected (depending on your concurrency and max_active_dag settings), or it may run unexpectedly because the sensor detects an older file. Ideally, you can append a timestamp in the name of your file to avoid this.

Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading