티스토리 뷰
Airflow를 사용할때 XCom으로 task간 데이터를 주고받을때 사용할때가 많은데, task간 뿐만 아니라 이전 Dag Run에서 push한 데이터도 가져올 수 있다.
Airflow에서 push한 XCom은 UI에서 바로 확인이 가능하다.(Admin
> XComs
)
UI에서 확인한 것처럼 value와 함께 저장되는 다른 메타데이터들이 있는데 이 값들을 활용하면 이전 dag에서 push한 xcom value도 가져올 수 있다.
이전 Dag에서 XCom 가져오기(Airlfow 2.4)
나는 XCom.get_one()
함수를 사용하였고, 여러 값을 가져오려면 XCom.get_many()
를 사용할 수 있다.
함수에 executioin_date, dag_id, task_id를 넘겨주어서 기준에 맞는 XCom을 가져올 수 있다.
key는 따로 지정하지 않으면 기본은 return_value
로 설정된다. return_value는 함수에서 return하는 값이기 때문에 따로 push하는 작업을 정의할 필요가 없다. 만약 따로 push를 해주었다면 key 파라미터도 추가해주면 된다.
또한 include_prior_dates=True
(default는 False)로 설정하면, execution_date에 해당하는 XCom이 검색이 되지 않더라도 가장 최근의 XCom을 리턴해준다.
Example Code
from airflow.models import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
default_args = {
'start_date': datetime(2023, 8, 5)
}
with DAG('dag_xcom_test_dag', schedule_interval='@daily', default_args=default_args) as dag:
def push_xcom(**kwrags):
run_id = kwrags['run_id']
return f'pushed from dag, run id: {run_id}'
def get_prev_xcom(**kwargs):
from airflow.models import XCom
execution_date = kwargs['prev_execution_date']
if not execution_date:
return
print(f'exec: {execution_date}')
prev_xcom = XCom.get_one(execution_date=execution_date, dag_id='dag_xcom_test_dag', task_id='push_xcom', include_prior_dates=True)
print(f"prev_xcom: {prev_xcom}")
return prev_xcom
t1 = PythonOperator(task_id="push_xcom", python_callable=push_xcom)
t2 = PythonOperator(task_id="get_prev_xcom", python_callable=get_prev_xcom)
t1 >> t2
execution_date는 필수적으로 설정을 해줘야 한다. context를 사용하여 이전 dag의 execution_date를 가져올 수 있다.
이전 dag에 대한 xcom을 잘 가져오는지 확인하기 위해 return할때 run_id를 포함시켰다.
Dag 실행
처음 run에 대해서는 이전에 dag가 실행된 적이 없으므로 가져오는 xcom은 없고 pushed from dag, run id: scheduled__2023-08-05T00:00:00+00:00
를 return하여 xcom이 저장되었다.
이전 dag에서 return한 value에 대해서(run_id를 보고 확인) 값을 잘 가져온 것을 확인할 수 있다. pushed from dag, run id: scheduled__2023-08-06T00:00:00+00:00
을 리턴했고 세번째 run에서도 6일자의 run을 잘 가져오는지 확인해보자.
마찬가지로 이전 dag에 대한 값으로 잘 가져온 것을 확인할 수 있다.
이상으로 context와 xcom을 조합하여 이전 dag run에 대해 xcom을 가져올 수 있다는 것을 확인해보았다. context에서 어떤 값을 활용할 수 있는지 잘 알고 있으면 도움이 될때가 많은데 context에 대해서도 잘 알아두면 좋을 것 같다.
[ 참고 ]
'Python' 카테고리의 다른 글
[Airflow] Airflow Retry 설정 알아보기 (3) | 2024.10.13 |
---|---|
[Airflow] ExternalTaskSensor 알아보기 (0) | 2024.03.16 |
GitHub REST API로 Pull Request 자동화해보기 (8) | 2023.12.09 |