티스토리 뷰

Airflow를 사용할때 XCom으로 task간 데이터를 주고받을때 사용할때가 많은데, task간 뿐만 아니라 이전 Dag Run에서 push한 데이터도 가져올 수 있다.

 

Airflow에서 push한 XCom은 UI에서 바로 확인이 가능하다.(Admin > XComs)

 

UI에서 확인한 것처럼 value와 함께 저장되는 다른 메타데이터들이 있는데 이 값들을 활용하면 이전 dag에서 push한 xcom value도 가져올 수 있다.

이전 Dag에서 XCom 가져오기(Airlfow 2.4)

 

나는 XCom.get_one() 함수를 사용하였고, 여러 값을 가져오려면 XCom.get_many()를 사용할 수 있다.

함수에 executioin_date, dag_id, task_id를 넘겨주어서 기준에 맞는 XCom을 가져올 수 있다.

 

key는 따로 지정하지 않으면 기본은 return_value로 설정된다. return_value는 함수에서 return하는 값이기 때문에 따로 push하는 작업을 정의할 필요가 없다. 만약 따로 push를 해주었다면 key 파라미터도 추가해주면 된다.

 

또한 include_prior_dates=True(default는 False)로 설정하면, execution_date에 해당하는 XCom이 검색이 되지 않더라도 가장 최근의 XCom을 리턴해준다.

Example Code

from airflow.models import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator

default_args = {
    'start_date': datetime(2023, 8, 5)
}

with DAG('dag_xcom_test_dag', schedule_interval='@daily', default_args=default_args) as dag:
    def push_xcom(**kwrags):
        run_id = kwrags['run_id']

        return f'pushed from dag, run id: {run_id}'

    def get_prev_xcom(**kwargs):
        from airflow.models import XCom

        execution_date = kwargs['prev_execution_date']

        if not execution_date:
            return

        print(f'exec: {execution_date}')
        prev_xcom = XCom.get_one(execution_date=execution_date, dag_id='dag_xcom_test_dag', task_id='push_xcom', include_prior_dates=True)
        print(f"prev_xcom: {prev_xcom}")
        return prev_xcom

    t1 = PythonOperator(task_id="push_xcom", python_callable=push_xcom)
    t2 = PythonOperator(task_id="get_prev_xcom", python_callable=get_prev_xcom)

    t1 >> t2

execution_date는 필수적으로 설정을 해줘야 한다. context를 사용하여 이전 dag의 execution_date를 가져올 수 있다.

이전 dag에 대한 xcom을 잘 가져오는지 확인하기 위해 return할때 run_id를 포함시켰다.

 

Dag 실행

첫번째 dag run

 

처음 run에 대해서는 이전에 dag가 실행된 적이 없으므로 가져오는 xcom은 없고 pushed from dag, run id: scheduled__2023-08-05T00:00:00+00:00를 return하여 xcom이 저장되었다.

 

두번째 dag run

 

이전 dag에서 return한 value에 대해서(run_id를 보고 확인) 값을 잘 가져온 것을 확인할 수 있다. pushed from dag, run id: scheduled__2023-08-06T00:00:00+00:00을 리턴했고 세번째 run에서도 6일자의 run을 잘 가져오는지 확인해보자.

 

세번째 dag run

 

마찬가지로 이전 dag에 대한 값으로 잘 가져온 것을 확인할 수 있다.

 

이상으로 context와 xcom을 조합하여 이전 dag run에 대해 xcom을 가져올 수 있다는 것을 확인해보았다. context에서 어떤 값을 활용할 수 있는지 잘 알고 있으면 도움이 될때가 많은데 context에 대해서도 잘 알아두면 좋을 것 같다.

 


[ 참고 ]

https://medium.com/analytics-vidhya/airflow-xcom-pull-and-push-under-the-hood-multiple-value-from-different-dags-and-etc-2b9b3a942299

댓글