
Airflow RetryDAG를 한번 등록해놓으면 안정적으로 잘 작동할 것 같지만 생각보다 종종 에러가 발생한다. 에러의 경우에 따라 직접 작업이 필요한 경우도 있지만 일시적인 네트워크 이슈 또는 외부 API 서버 등 재수행이 필요한 경우가 존재하기 때문에 대부분의 경우에 retry를 설정해주고 있다.Coderetry 설정은 보통 아래와 같이 설정한다. 설정한 retry_delay만큼 기다렸다가 재시도를 수행한다.from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# Default arguments for the DAGdefault_args = { 'own..

Airflow Sensor Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공한다. 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있다. Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있다. Sensor의 유형에는 여러가지가 있지만 자주 사용하고 있는 ExternalTaskSensor에 대해서 정리해보려고 한다. ExternalTaskSensor Airflow를 운영하다보면 DAG끼리 의존성이 생기는 경우가 있다. 예를 들면 A 테이블이 생성되어야 A 테이블을 참조하는 B 테이블을 생성할 수 있다거나, A 테이블을 Export하는 작업이 수행되어야 할때이다. 이런 경우 보통 Exter..

Airflow를 사용할때 XCom으로 task간 데이터를 주고받을때 사용할때가 많은데, task간 뿐만 아니라 이전 Dag Run에서 push한 데이터도 가져올 수 있다. Airflow에서 push한 XCom은 UI에서 바로 확인이 가능하다.(Admin > XComs) UI에서 확인한 것처럼 value와 함께 저장되는 다른 메타데이터들이 있는데 이 값들을 활용하면 이전 dag에서 push한 xcom value도 가져올 수 있다. 이전 Dag에서 XCom 가져오기(Airlfow 2.4) 나는 XCom.get_one() 함수를 사용하였고, 여러 값을 가져오려면 XCom.get_many()를 사용할 수 있다. 함수에 executioin_date, dag_id, task_id를 넘겨주어서 기준에 맞는 XCom을..