I am using Apache Airflow 2.2.3 and I know connections can be created via Admin → Connections. But I am looking for a way to create a connection using dynamic DB server details.
My DB host, user and password come in through the DAGRun input config, and I need to use them to read from and write to the DB.
>Solution :
You can read connection details from the DAGRun config:
# Say the DAGRun was triggered with conf {"username": "foo", "password": "bar"}
from airflow.models.connection import Connection

def mytask(**context):
    username = context["dag_run"].conf["username"]
    password = context["dag_run"].conf["password"]
    connection = Connection(login=username, password=password)
However, all operators in Airflow that require a connection take a conn_id argument: a string identifying a connection stored in the metastore, an environment variable, or a secrets backend. At the moment there is no way to pass a Connection object directly.
Therefore, if you implement your own Python functions (run via the PythonOperator or the @task decorator) or write your own operators, you can create a Connection object there and perform whatever logic you need with it. Using dynamically supplied credentials with any other existing Airflow operator is not possible.