티스토리 뷰
Airflow Retry
DAG를 한번 등록해놓으면 안정적으로 잘 작동할 것 같지만 생각보다 종종 에러가 발생한다. 에러의 경우에 따라 직접 작업이 필요한 경우도 있지만 일시적인 네트워크 이슈 또는 외부 API 서버 등 재수행이 필요한 경우가 존재하기 때문에 대부분의 경우에 retry를 설정해주고 있다.
Code
retry 설정은 보통 아래와 같이 설정한다. 설정한 retry_delay만큼 기다렸다가 재시도를 수행한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# Default arguments for the DAG
default_args = {
'owner': 'lang',
'start_date': datetime(2024, 10, 10),
'email_on_failure': False,
'email_on_retry': False,
'retries': 3, # Retry 횟수
'retry_delay': timedelta(minutes=5), # 재시도간 시간 간격
}
# Define the DAG
with DAG(
'bash_retry_dag_example',
default_args=default_args,
schedule_interval=timedelta(days=1), # Run once per day
catchup=False,
) as dag:
task1 = BashOperator(
task_id='print_hello_world',
bash_command='echo "Hello, World!"',
)
task1
위의 경우와 다르게 retry 간격을 점점 늘려서 시도를 해야하는 경우도 존재한다.(외부 API를 호출하고 있어서 해당 서버가 언제 안정화될지 모르는 경우 등)
이런 경우에는 2개의 파라미터를 추가하면 된다.
retry_exponential_backoff
: 재시도시 exponential backoff algorithm을 사용하여 재시도 사이에 점진적으로 더 긴 대기 시간을 설정한다. 해당 알고리즘을 사용하면 이전 재시도 간격에서 2배씩 늘어난다.max_retry_delay
: 재시도간 최대 지연 간격을 설정한다. 재시도 간격이 계속 늘어나도 해당 설정 값보다 늘어날 수 없다.
위의 파라미터를 적용한다면 default_args는 이렇게 변경할 수 있다.
default_args = {
'owner': 'lang',
'start_date': datetime(2024, 10, 10),
'email_on_failure': False,
'email_on_retry': False,
'retries': 4,
'retry_exponential_backoff': True,
'retry_delay': timedelta(minutes=5),
'max_retry_delay': timedelta(minutes=30)
}
위의 설정으로는 아래와 같이 재수행될 것이다.
- 첫번째 재시도: 첫번째 시도 완료 + 5분뒤
- 두번째 재시도: 첫번째 재시도 완료 + 10분뒤
- 세번째 재시도: 두번째 재시도 완료 + 20분뒤
- 네번째 재시도: 세번째 재시도 완료 + 30분뒤
재시도 간격은 이전 재시도 간격 * 2
로 수행되지만 마지막 재시도에서 40분(20분 * 2)이 아닌 30분으로 설정된 이유는 max_retry_delay
를 30분으로 설정했기 때문에 30분을 넘을 수 없기 때문이다.
How?
설정한 retry를 통해 task가 어떻게 재수행되는지 소스코드에서 확인한 재시도 로직을 다이어그램으로 간단히 표현하면 아래와 같다.
이러한 재시도 로직은 task instance에서 관리되고 있다. task 수행시 에러가 발생하면 Exception이 발생하고 handle_failure
메소드가 호출된다. (Airflow 2.6 기준)
task를 수행할때 에러가 발생하면 exception을 발생[1]시키고 그에 따른 예외 처리를 수행[2]한다.
class TaskInstance(Base, LoggingMixin):
@provide_session
@Sentry.enrich_errors
def _run_raw_task(
self,
mark_success: bool = False,
test_mode: bool = False,
job_id: str | None = None,
pool: str | None = None,
session: Session = NEW_SESSION,
) -> TaskReturnCode | None:
try:
if not mark_success:
# [1] task를 수행하는 부분, 실패시 Exception이 발생한다.
self._execute_task_with_callbacks(context, test_mode, session=session)
...
except AirflowException as e:
...
if self.state in State.finished:
...
else:
# [2] state가 finished에 포함(SUCCESS, FAILED, SKIPPED, UPSTREAM_FAILED, REMOVED)되는 경우가 아닐때 handle_failure를 호출한다. 상태를 바꾼적이 없으므로 현재는 RUNNING 상태일 것이다.
self.handle_failure(e, test_mode, context, session=session)
session.commit()
raise
실패한 작업에 대해서 UP_FOR_RETRY 상태로 변경[3]한다.
@provide_session
def handle_failure(
self,
error: None | str | Exception | KeyboardInterrupt,
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
session: Session = NEW_SESSION,
) -> None:
"""Handle Failure for the TaskInstance."""
...
task: BaseOperator | None = None
if self.state == TaskInstanceState.QUEUED:
self._try_number += 1
# [3]state를 UP_FOR_RETRY로 변경한다.
self.state = TaskInstanceState.UP_FOR_RETRY
email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None
callback_type = "on_retry"
...
UP_FOR_RETRY
상태인 task는 앞서 언급된 지수적 백오프(exponential backoff) 알고리즘을 적용하여 계산된 다음 재시도 시간이 만족된 경우에 수행되게 된다.
지수적 백오프 알고리즘에 대해서 짧게 설명하자면 재시도 횟수에 따라 지연 시간이 2의 거듭제곱으로 증가하는 것이다.
- 2번째 재시도: delay * (2 ** 0) → delay * 1
- 3번째 재시도: delay * (2 ** 1) → delay * 2
- 4번째 재시도: delay * (2 ** 2) → delay * 4
- 5번째 재시도: delay * (2 ** 3) → delay * 8
지연 시간을 계산하는 코드는 아래와 같다.
def next_retry_datetime(self):
"""
Get datetime of the next retry if the task instance fails.
For exponential backoff, retry_delay is used as base and will be converted to seconds.
"""
from airflow.models.abstractoperator import MAX_RETRY_DELAY
delay = self.task.retry_delay
if self.task.retry_exponential_backoff:
min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2)))
ti_hash = int(
hashlib.sha1(
f"{self.dag_id}#{self.task_id}#{self.execution_date}#{self.try_number}".encode()
).hexdigest(),
16,
)
modded_hash = min_backoff + ti_hash % min_backoff
delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY)
delay = timedelta(seconds=delay_backoff_in_seconds)
if self.task.max_retry_delay:
delay = min(self.task.max_retry_delay, delay)
return self.end_date + delay
retry_exponential_backoff
를 설정해준 경우에는 delay를 계산 후 delay가 설정한 MAX_RETRY_DELAY
보다 크면 MAX_RETRY_DELAY
가 delay로 설정되는 것을 확인할 수 있다.
일반적으로 retry를 설정한 경우에는 그냥 retry_delay 시간 만큼만 지연된다.
마무리
Airflow에서 DAG을 운영하다 보면 일시적인 네트워크 문제나 외부 API의 불안정성으로 인해 작업이 실패하는 경우가 종종 발생하므로 retry 설정을 적절히 구성함으로써 이러한 문제를 처리할 수 있다.
기본적으로는 고정된 시간만큼 지연하여 재시도하는 방법을 사용할 수 있고, 지수적 백오프(exponential backoff)를 적용해 재시도 간격을 점진적으로 늘려가는 방식도 사용할 수 있다.
retry 설정을 통해 사람이 개입하여 재수행할 필요 없이 효율적으로 작업이 재시도될 수 있도록 관리할 수 있다. 각 DAG의 특성과 상황에 맞춰 retry 설정을 적절하게 구성할 수 있다.
일시적인 문제라면 일정한 간격으로 재시도하는 기본 retry_delay를 설정해볼 수 있고,
외부 시스템의 상태가 안정화되기까지 시간이 필요할 경우, 지수적 백오프를 통해 재시도 간격을 점차 늘리는 방법을 사용하는 것이 좋다.
'Python' 카테고리의 다른 글
[Airflow] ExternalTaskSensor 알아보기 (0) | 2024.03.16 |
---|---|
GitHub REST API로 Pull Request 자동화해보기 (8) | 2023.12.09 |
[Airflow] 이전 Dag에서 저장한 XCom 가져오기 (0) | 2023.08.08 |