티스토리 뷰
Airflow Sensor
Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공한다. 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있다. Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있다.
Sensor의 유형에는 여러가지가 있지만 자주 사용하고 있는 ExternalTaskSensor
에 대해서 정리해보려고 한다.
ExternalTaskSensor
Airflow를 운영하다보면 DAG끼리 의존성이 생기는 경우가 있다. 예를 들면 A 테이블
이 생성되어야 A 테이블
을 참조하는 B 테이블
을 생성할 수 있다거나, A 테이블
을 Export하는 작업이 수행되어야 할때이다. 이런 경우 보통 ExternalTaskSensor
를 사용하여 이전 작업이 성공했을때만 수행될 수 있도록 하고 있다.
Sample Code
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'hyunlang',
'depends_on_past': False,
'start_date': datetime(2024, 3, 15, tzinfo=pendulum.timezone("Asia/Seoul")),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Define the DAG
with DAG(
'example_external_task_sensor',
default_args=default_args,
description='A DAG to demonstrate the usage of ExternalTaskSensor',
schedule_interval='0 8 * * *',
catchup=False
):
# Define the task to wait for
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='upstream_dag', # ID of the other DAG
allowed_states=['success', 'skipped'],
timeout=1800, # 30분, Timeout in seconds, set to None for no timeout
mode='poke', # Mode can be 'poke' or 'reschedule'
poke_interval=60 # Time in seconds to wait between checks
)
# Define some other task
do_something = EmptyOperator(task_id='do_something')
# Set up task dependencies
wait_for_task >> do_something
간단하게 ExternalTaskSensor를 작성해 보았다.
external_dag_id
: 대기할 upstream dag id를 적는다.allowed_states
: 대기해야하는 upstream dag의 상태를 리스트 형태로 지정한다. default 값은['success']
이다.timeout
: 언제까지 대기할지 초 단위로 지정한다.mode
: poke 모드로 수행할 것인지 reschedule 모드로 수행할것인지 지정한다. poke가 default 모드이기 때문에 생략해도 된다.poke_interval
: 몇 분 간격으로 상태를 확인할지를 초단위 또는 timedelta 형식으로 지정한다.
위의 형태로 ExternalTaskSensor
를 작성하게 되면 주의해야하는 점은 해당 DAG과 upstream DAG의 스케줄 시간이 동일해야한다는 점이다. 왜냐하면 DAG과 동일한 logical date에 해당하는 upstream DAG의 수행을 확인하기 때문이다. 따라서 스케줄 된 시간이 다른 경우 또는 manual 수행인 경우 Sensor가 제대로 작동하지 않는다.
execution_delta, execution_date_fn
만약 스케줄을 동일하게 설정했을 때 대기하는 시간이 너무 길어지는 경우 execution_delta
또는 execution_date_fn
을 사용하여 DAG의 스케줄 시간을 다르게 설정할 수 있다. 단 두개를 동시에 지정할 수는 없으므로 주의하자.
execution_delta
는 어느 시간에 수행된 upstream DAG을 확인할지를 위한 것으로, 현재 DAG과 확인해야할 DAG의 시간 차를 지정해주면 된다. positive 형으로 작성해야하며 execution_delta=timedelta(hours=1)
와 같이 사용할 수 있다. 이 의미는 1시간 이전에 수행된 upstream DAG의 상태를 확인한다는 의미이다.
테스트를 해보면 8시에 스케줄링 된 DAG이 7시에 수행이 완료된 upstream DAG을 제대로 확인했다.
동일한 기능을 execution_date_fn
으로 대체 가능하고, 함수 형식으로 좀 더 유연하게 사용이 가능하다. 현재 수행되는 DAG의 logical date를 첫번째 argument로 전달받고 컨텍스트에서 사용 가능한 keyword arguments를 전달 받아 원하는 logical date를 반환할 수 있다.
def prev_execution_dt(dt, **kwargs):
return dt - timedelta(hours=1)
# Define the task to wait for
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='upstream_dag', # ID of the other DAG
allowed_states=['success'],
execution_date_fn=prev_execution_dt,
timeout=1800, # 30분, Timeout in seconds, set to None for no timeout
mode='poke', # Mode can be 'poke' or 'reschedule'
poke_interval=60 # Time in seconds to wait between checks
)
간단히 시간 차만 지정하면 된다면 lambda를 사용하여 lambda x: x - timedelta(hours=1)
처럼 간단히 설정할수도 있다.
마무리
ExternalTaskSensor
를 사용하면 의존성이 존재하는 DAG끼리의 수행이 더 수월해진다. 하지만 한계점도 존재한다. 만약 선행 DAG에서 장애가 발생해서 재수행을 하는 경우에는 ExternalTaskSensor
가 적용된 후행 DAG도 수동으로 재수행이 필요하기 때문이다.
TriggerDagRunOperator
를 사용하면 선행 DAG을 수행하면서 후행 DAG를 trigger하기 때문에 재수행이 자주 필요한 경우는 TriggerDagRunOperator
를 고려해보는 것도 좋을 것 같다.
'Python' 카테고리의 다른 글
[Airflow] Airflow Retry 설정 알아보기 (3) | 2024.10.13 |
---|---|
GitHub REST API로 Pull Request 자동화해보기 (8) | 2023.12.09 |
[Airflow] 이전 Dag에서 저장한 XCom 가져오기 (0) | 2023.08.08 |