
Airflow RetryDAG를 한번 등록해놓으면 안정적으로 잘 작동할 것 같지만 생각보다 종종 에러가 발생한다. 에러의 경우에 따라 직접 작업이 필요한 경우도 있지만 일시적인 네트워크 이슈 또는 외부 API 서버 등 재수행이 필요한 경우가 존재하기 때문에 대부분의 경우에 retry를 설정해주고 있다.Coderetry 설정은 보통 아래와 같이 설정한다. 설정한 retry_delay만큼 기다렸다가 재시도를 수행한다.from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# Default arguments for the DAGdefault_args = { 'own..

Airflow Sensor Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공한다. 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있다. Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있다. Sensor의 유형에는 여러가지가 있지만 자주 사용하고 있는 ExternalTaskSensor에 대해서 정리해보려고 한다. ExternalTaskSensor Airflow를 운영하다보면 DAG끼리 의존성이 생기는 경우가 있다. 예를 들면 A 테이블이 생성되어야 A 테이블을 참조하는 B 테이블을 생성할 수 있다거나, A 테이블을 Export하는 작업이 수행되어야 할때이다. 이런 경우 보통 Exter..

업무를 하다보면 반복되는 일들이 발생하는데, 이러한 부분들은 자동화가 필요하다. 업무의 효율 측면에서도 그렇지만 이따금씩 발생하는 휴먼에러(단순한 일일 수록 집중력이 떨어져서 휴먼 에러도 자주 발생 🥲) 때문에라도 자동화로 전환하는 것이 좋은 방법이라고 생각한다. 최근에 자동화 업무를 하면서, 제공되는 API가 생각보다 굉장히 다양하다는 것을 알게되었다. 가장 많이 사용했던 것은 slack api이었긴 하지만 이번에 github api도 사용하면서 익숙하지 않은 용어들이 많이 보여서 정리를 해보는게 좋겠다는 생각이 들었다. 우리 팀 업무의 경우에는 각 MS DB에서 발생한 변경 사항에 따라서 대응할 작업들이 있는데, 반복적인 작업이라 이 부분을 최근에 자동화 하려는 작업을 하고 있다. 변경 사항에 따라서..

Airflow를 사용할때 XCom으로 task간 데이터를 주고받을때 사용할때가 많은데, task간 뿐만 아니라 이전 Dag Run에서 push한 데이터도 가져올 수 있다. Airflow에서 push한 XCom은 UI에서 바로 확인이 가능하다.(Admin > XComs) UI에서 확인한 것처럼 value와 함께 저장되는 다른 메타데이터들이 있는데 이 값들을 활용하면 이전 dag에서 push한 xcom value도 가져올 수 있다. 이전 Dag에서 XCom 가져오기(Airlfow 2.4) 나는 XCom.get_one() 함수를 사용하였고, 여러 값을 가져오려면 XCom.get_many()를 사용할 수 있다. 함수에 executioin_date, dag_id, task_id를 넘겨주어서 기준에 맞는 XCom을..