티스토리 뷰

Airflow Sensor

Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공한다. 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있다. Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있다.

 

Sensor의 유형에는 여러가지가 있지만 자주 사용하고 있는 ExternalTaskSensor에 대해서 정리해보려고 한다.

ExternalTaskSensor

Airflow를 운영하다보면 DAG끼리 의존성이 생기는 경우가 있다. 예를 들면 A 테이블이 생성되어야 A 테이블을 참조하는 B 테이블을 생성할 수 있다거나, A 테이블을 Export하는 작업이 수행되어야 할때이다. 이런 경우 보통 ExternalTaskSensor를 사용하여 이전 작업이 성공했을때만 수행될 수 있도록 하고 있다.

Sample Code

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'hyunlang',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 15, tzinfo=pendulum.timezone("Asia/Seoul")),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    'example_external_task_sensor',
    default_args=default_args,
    description='A DAG to demonstrate the usage of ExternalTaskSensor',
    schedule_interval='0 8 * * *',
    catchup=False
):

    # Define the task to wait for
    wait_for_task = ExternalTaskSensor(
        task_id='wait_for_task',
        external_dag_id='upstream_dag',  # ID of the other DAG
        allowed_states=['success', 'skipped'],
        timeout=1800,  # 30분, Timeout in seconds, set to None for no timeout
        mode='poke',   # Mode can be 'poke' or 'reschedule'
        poke_interval=60  # Time in seconds to wait between checks
    )

    # Define some other task
    do_something = EmptyOperator(task_id='do_something')

# Set up task dependencies
wait_for_task >> do_something

 

간단하게 ExternalTaskSensor를 작성해 보았다.

  • external_dag_id: 대기할 upstream dag id를 적는다.
  • allowed_states: 대기해야하는 upstream dag의 상태를 리스트 형태로 지정한다. default 값은 ['success'] 이다.
  • timeout: 언제까지 대기할지 초 단위로 지정한다.
  • mode: poke 모드로 수행할 것인지 reschedule 모드로 수행할것인지 지정한다. poke가 default 모드이기 때문에 생략해도 된다.
  • poke_interval: 몇 분 간격으로 상태를 확인할지를 초단위 또는 timedelta 형식으로 지정한다.

위의 형태로 ExternalTaskSensor를 작성하게 되면 주의해야하는 점은 해당 DAG과 upstream DAG의 스케줄 시간이 동일해야한다는 점이다. 왜냐하면 DAG과 동일한 logical date에 해당하는 upstream DAG의 수행을 확인하기 때문이다. 따라서 스케줄 된 시간이 다른 경우 또는 manual 수행인 경우 Sensor가 제대로 작동하지 않는다.

execution_delta, execution_date_fn

만약 스케줄을 동일하게 설정했을 때 대기하는 시간이 너무 길어지는 경우 execution_delta 또는 execution_date_fn을 사용하여 DAG의 스케줄 시간을 다르게 설정할 수 있다. 단 두개를 동시에 지정할 수는 없으므로 주의하자.

 

execution_delta는 어느 시간에 수행된 upstream DAG을 확인할지를 위한 것으로, 현재 DAG과 확인해야할 DAG의 시간 차를 지정해주면 된다. positive 형으로 작성해야하며 execution_delta=timedelta(hours=1)와 같이 사용할 수 있다. 이 의미는 1시간 이전에 수행된 upstream DAG의 상태를 확인한다는 의미이다.

 

 

테스트를 해보면 8시에 스케줄링 된 DAG이 7시에 수행이 완료된 upstream DAG을 제대로 확인했다.

동일한 기능을 execution_date_fn으로 대체 가능하고, 함수 형식으로 좀 더 유연하게 사용이 가능하다. 현재 수행되는 DAG의 logical date를 첫번째 argument로 전달받고 컨텍스트에서 사용 가능한 keyword arguments를 전달 받아 원하는 logical date를 반환할 수 있다.

    def prev_execution_dt(dt, **kwargs):
        return dt - timedelta(hours=1)


    # Define the task to wait for
    wait_for_task = ExternalTaskSensor(
        task_id='wait_for_task',
        external_dag_id='upstream_dag',  # ID of the other DAG
        allowed_states=['success'],
        execution_date_fn=prev_execution_dt,
        timeout=1800,  # 30분, Timeout in seconds, set to None for no timeout
        mode='poke',   # Mode can be 'poke' or 'reschedule'
        poke_interval=60  # Time in seconds to wait between checks
    )

 

간단히 시간 차만 지정하면 된다면 lambda를 사용하여 lambda x: x - timedelta(hours=1)처럼 간단히 설정할수도 있다.

마무리

ExternalTaskSensor를 사용하면 의존성이 존재하는 DAG끼리의 수행이 더 수월해진다. 하지만 한계점도 존재한다. 만약 선행 DAG에서 장애가 발생해서 재수행을 하는 경우에는 ExternalTaskSensor가 적용된 후행 DAG도 수동으로 재수행이 필요하기 때문이다.

 

TriggerDagRunOperator를 사용하면 선행 DAG을 수행하면서 후행 DAG를 trigger하기 때문에 재수행이 자주 필요한 경우는 TriggerDagRunOperator를 고려해보는 것도 좋을 것 같다.

 


댓글