Goal
- I use the Docker image of Airflow 2.4.1
- I use an external Python virtual environment for each task
- I have a pandas DataFrame that I want to pass from task to task.
- In my previous question How to use a python list as global variable python list with in @task.external_python? this was done successfully with a Python list, but when I switch to a pandas DataFrame the process crashes
- The first task runs successfully
CODE
from airflow.decorators import dag, task
from pendulum import datetime
from datetime import timedelta

my_default_args = {
    'owner': 'Anonymus',
    # 'email': ['random@random.com'],
    # 'email_on_failure': True,
    # 'email_on_retry': False, #only allow if it was allowed in the scheduler
    # 'retries': 1, #only allow if it was allowed in the scheduler
    # 'retry_delay': timedelta(minutes=1),
    # 'depends_on_past': False,
}

@dag(
    dag_id='test_global_variable',
    schedule='12 11 * * *',
    start_date=datetime(2023, 2, 1, tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['sample_tag', 'sample_tag2'],
)
def write_var():
    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        import pandas as pd
        df = pd.DataFrame(my_list)
        return df

    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df):
        print(df)
        df = df.append([19])
        return df

    add_to_list_2(add_to_list([23, 5, 8]))

write_var()
ERROR LOG of the 2nd task
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=manual__2023-02-07T14:06:17.432734+00:00/task_id=task_2/attempt=1.log
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_2> on 2023-02-07 14:06:17.432734+00:00
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:54} INFO - Started process 324831 to run task
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_2', 'manual__2023-02-07T14:06:17.432734+00:00', '--job-id', '74080', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpbm8tkk1i']
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:83} INFO - Job 74080: Subtask task_2
[2023-02-07, 14:06:22 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [running]> on host 4851b30aa5cf
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Anonymus
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2023-02-07T14:06:17.432734+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-07T14:06:17.432734+00:00
[2023-02-07, 14:06:23 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmddiox599m/script.py /tmp/tmddiox599m/script.in /tmp/tmddiox599m/script.out /tmp/tmddiox599m/string_args.txt
[2023-02-07, 14:06:23 GMT] {process_utils.py:183} INFO - Output:
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - File "/tmp/tmddiox599m/script.py", line 17, in <module>
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - arg_dict = pickle.load(file)
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/airflow/venv1/lib/python3.8/site-packages/pandas/_libs/internals.cpython-38-x86_64-linux-gnu.so'>
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 426, in _execute_python_callable_in_subprocess
execute_in_subprocess(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 191, in execute_in_subprocess_with_kwargs
raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=task_2, execution_date=20230207T140617, start_date=20230207T140622, end_date=20230207T140624
[2023-02-07, 14:06:24 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 74080 for task task_2 (Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.; 324831)
[2023-02-07, 14:06:24 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-07, 14:06:24 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
Solution:
Try using to_dict to serialize your DataFrame as a plain dict and recreate it in the other task:
@task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
def add_to_list(my_list):
    print(my_list)
    import pandas as pd
    df = pd.DataFrame(my_list)
    return df.to_dict('split')  # <- HERE

@task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
def add_to_list_2(df):
    import pandas as pd  # the import must live inside the external task
    df = pd.DataFrame(**df)  # <- HERE
    print(df)
    df = df.append([19])  # note: DataFrame.append is deprecated; prefer pd.concat
    return df
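To see why this works, here is the round trip in isolation: `to_dict('split')` produces a dict of plain lists, which pickles without depending on pandas' internal block classes, and `pd.DataFrame(**payload)` rebuilds an equal frame on the other side. This is a standalone sketch, not part of the DAG:

```python
import pandas as pd

df = pd.DataFrame([23, 5, 8])

# 'split' orient yields {'index': [...], 'columns': [...], 'data': [...]}
payload = df.to_dict('split')

# The dict keys match DataFrame's constructor arguments, so ** unpacking works
restored = pd.DataFrame(**payload)

assert restored.equals(df)
```

Because the payload contains only built-in types (lists, ints), any pandas version in the receiving environment can reconstruct it.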
From the Airflow documentation of ExternalPythonOperator (and PythonVirtualenvOperator) about serialization:
Your python callable has to be serializable. There are a number of python objects that are not serializable using standard
pickle library. You can mitigate some of those limitations by using dill library but even that library does not solve all the serialization limitations.
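The underlying cause of the original error is a pandas version mismatch: the DataFrame was pickled with one pandas version (the host's) and unpickled with another (the venv's), whose internals lack the `_unpickle_block` attribute. An alternative that also avoids pickling pandas internals is to pass the frame as a JSON string, a sketch:

```python
import pandas as pd
from io import StringIO

df = pd.DataFrame({'a': [1, 2], 'b': [3.5, 4.5]})

# A plain string survives pickling regardless of pandas version skew
payload = df.to_json(orient='split')

# StringIO avoids the deprecation warning for literal-string input in newer pandas
restored = pd.read_json(StringIO(payload), orient='split')

assert restored.equals(df)
```

Either way (dict or JSON), the fix is the same idea: only built-in types cross the process boundary, so the two environments never need matching pandas internals. Aligning the pandas version in the venv with the host's would also make the original code work, but the serialization approach is more robust.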