How to run Airflow S3 sensor exactly once?

I want to continue the DAG only if a csv file exists in S3, otherwise it should just end.
The DAG itself is being scheduled hourly.

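from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(dag_id="my_dag",
         start_date=datetime(2023, 1, 1),
         schedule_interval='@hourly',
         catchup=False
         ) as dag:
    # Wait for any CSV file in the bucket; soft_fail skips the task on timeout
    check_for_new_csv = S3KeySensor(
        task_id='check_for_new_csv',
        bucket_name='bucket-data',
        bucket_key='*.csv',
        wildcard_match=True,
        soft_fail=True,
        retries=1
    )
    # INSTANCE_ID and REGION are defined elsewhere
    start_instance = EC2StartInstanceOperator(
        task_id="start_ec2_instance_task",
        instance_id=INSTANCE_ID,
        region_name=REGION
    )

    check_for_new_csv >> start_instance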

But the sensor seems to run forever – in the log I can see it keeps on running:

[2023-01-10, 15:02:06 UTC] {s3.py:98} INFO - Poking for key : s3://bucket-data/*.csv    
[2023-01-10, 15:03:08 UTC] {s3.py:98} INFO - Poking for key : s3://bucket-data/*.csv

Maybe the sensor is not the best choice for such logic?

>Solution :

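A sensor is a good fit for this use case. I'd try setting poke_interval and timeout to values smaller than their defaults so that the sensor checks at the intervals you expect and gives up in time. By default the sensor pokes every 60 seconds and times out only after 7 days, which is why it appears to run forever. With soft_fail=True, the task is marked as skipped rather than failed once the timeout is reached.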
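
As a rough sketch of that advice, the settings below could be passed to the sensor from the question. The specific values (a 10-minute timeout, reschedule mode) are assumptions picked for an hourly DAG, not recommendations from the original answer, and the name sensor_kwargs is only illustrative.

```python
# Hypothetical values for an hourly DAG; tune them for your own workload.
sensor_kwargs = {
    "poke_interval": 60,   # seconds between checks (60 is also Airflow's default)
    "timeout": 10 * 60,    # give up after 10 minutes instead of the 7-day default
    "soft_fail": True,     # on timeout, mark the task as skipped instead of failed
    "mode": "reschedule",  # release the worker slot between pokes
}

# Inside the DAG, the settings would be unpacked into the sensor, for example:
# check_for_new_csv = S3KeySensor(
#     task_id="check_for_new_csv",
#     bucket_name="bucket-data",
#     bucket_key="*.csv",
#     wildcard_match=True,
#     **sensor_kwargs,
# )
```

With a timeout well under one hour, each run either finds a file and continues or is skipped before the next scheduled run starts.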

One thing to watch out for is a sensor timeout that is longer than your schedule interval. For example, if your DAG is scheduled to run hourly but your sensor's timeout is set to 2 hours, your next DAG run may not start as expected (depending on your concurrency and max_active_runs settings), or it may proceed unexpectedly because the sensor detects an older file. Ideally, you can append a timestamp to the name of your file to avoid this.
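
One way to picture the timestamp idea is a helper that builds the key each hourly run should look for. This is only a sketch: the function name hourly_csv_key, the "incoming" prefix, and the file naming pattern are all assumptions, and the producer of the files would have to follow the same convention.

```python
from datetime import datetime, timezone


def hourly_csv_key(logical_date: datetime, prefix: str = "incoming") -> str:
    """Build the S3 key expected for one hourly run (hypothetical naming scheme)."""
    return f"{prefix}/{logical_date:%Y-%m-%dT%H}.csv"


example_key = hourly_csv_key(datetime(2023, 1, 10, 15, tzinfo=timezone.utc))
print(example_key)  # incoming/2023-01-10T15.csv

# In a DAG, the same pattern could likely be written as a Jinja template in
# bucket_key, assuming your provider version templates that field, e.g.:
# bucket_key="incoming/{{ logical_date.strftime('%Y-%m-%dT%H') }}.csv"
```

Because each run looks for its own hour's key, a file left over from an earlier hour no longer satisfies the sensor.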
