
Airflow RetryDAG를 한번 등록해놓으면 안정적으로 잘 작동할 것 같지만 생각보다 종종 에러가 발생한다. 에러의 경우에 따라 직접 작업이 필요한 경우도 있지만 일시적인 네트워크 이슈 또는 외부 API 서버 등 재수행이 필요한 경우가 존재하기 때문에 대부분의 경우에 retry를 설정해주고 있다.Coderetry 설정은 보통 아래와 같이 설정한다. 설정한 retry_delay만큼 기다렸다가 재시도를 수행한다.from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# Default arguments for the DAGdefault_args = { 'own..

Airflow Sensor Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공한다. 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있다. Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있다. Sensor의 유형에는 여러가지가 있지만 자주 사용하고 있는 ExternalTaskSensor에 대해서 정리해보려고 한다. ExternalTaskSensor Airflow를 운영하다보면 DAG끼리 의존성이 생기는 경우가 있다. 예를 들면 A 테이블이 생성되어야 A 테이블을 참조하는 B 테이블을 생성할 수 있다거나, A 테이블을 Export하는 작업이 수행되어야 할때이다. 이런 경우 보통 Exter..

빅쿼리를 사용하게 되면 로컬 또는 GCS에 업로드된 CSV 파일을 테이블로 로드하는 경우가 있다. 최근에 이러한 작업을 많이 하게되어 자주 사용한 명령어들에 대해서 정리를 해보았다. gsutil gsutil은 명령줄에서 Cloud Storage에 액세스하는 데 사용할 수 있는 Python 애플리케이션이다. gsutil을 사용하면 콘솔에서 직접 조작하는 것보다 간단하고 다양한 기능들을 수행할 수 있다. gsutil cp 로컬의 파일을 전송하거나 gcs에 있는 파일을 다운로드한다. gsuitl cp [source] [dest] 로컬에 있는 파일을 gcs의 test-hyunlang이라는 버킷으로 업로드 하였다. 다운로드 할때는 위치를 바꿔서 [source]쪽에 gcs 경로를 위치하고 [dest] 쪽에 다운로드..

목표 쿠버네티스에서 external secret operator를 사용하여 계정 정보와 같은 시크릿 정보들을 GCP secret manager에 저장해두고 어플리케이션에서 접근하여 사용할 수 있도록 한다. Install external-secret external secret과 관련된 리소스를 사용하기 위해서는 external-secrets 헬름 차트를 먼저 설치해야한다. helm repo add external-secrets https://charts.external-secrets.io helm install external-secrets external-secrets/external-secrets \ -n external-secrets \ --create-namespace helm templates ..

업무를 하다보면 반복되는 일들이 발생하는데, 이러한 부분들은 자동화가 필요하다. 업무의 효율 측면에서도 그렇지만 이따금씩 발생하는 휴먼에러(단순한 일일 수록 집중력이 떨어져서 휴먼 에러도 자주 발생 🥲) 때문에라도 자동화로 전환하는 것이 좋은 방법이라고 생각한다. 최근에 자동화 업무를 하면서, 제공되는 API가 생각보다 굉장히 다양하다는 것을 알게되었다. 가장 많이 사용했던 것은 slack api이었긴 하지만 이번에 github api도 사용하면서 익숙하지 않은 용어들이 많이 보여서 정리를 해보는게 좋겠다는 생각이 들었다. 우리 팀 업무의 경우에는 각 MS DB에서 발생한 변경 사항에 따라서 대응할 작업들이 있는데, 반복적인 작업이라 이 부분을 최근에 자동화 하려는 작업을 하고 있다. 변경 사항에 따라서..
실전 카프카에 나오는 java 예제 코드로 실습해보기 참고한 source code: https://github.com/onlybooks/kafka2/tree/main/chapter3 자바 초보임에 주의....💦 환경 IDE: IntlliJ Java 11.0.18 Kakfa 2.7.0 인텔리제이에서 개발 환경 구성하기 일단 프로젝트를 새로 생성하고 Build System을 Maven으로 설정한다. Create를 누르면 pom.xml이 함께 생성된 것을 확인할 수 있는데, 해당 파일을 소스 코드의 pom.xml 파일로 변경해준다. 참고로, 업로드된 pom.xml 파일 그대로 실행하려니 java.lang.NoClassDefFoundError: org.slf4j.LoggerFactory 에러가 발생해서 depe..
헬름 공식 문서의 Quickstart를 보면서 실습해보기 Initialize a Helm Chart Repository Chart repository 추가하기 ❯ helm repo add bitnami https://charts.bitnami.com/bitnami "bitnami" has been added to your repositories 설치가능한 차트 목록 보기 ❯ helm search repo bitnami NAME CHART VERSION APP VERSION DESCRIPTION bitnami/airflow 14.3.6 2.6.3 Apache Airflow is a tool to express and execute... bitnami/apache 10.1.0 2.4.57 Apache HTT..
보호되어 있는 글입니다.
요즘 "핵심만 콕 쿠버네티스"라는 책으로 쿠버네티스 공부를 하고 있는데, nginx ingress controller 관련 실습 중에 업데이트가 안된 부분들이 있어서 기록 겸 남겨둔다. 실행 환경 - kubernetes v1.25.4(docker desktop) namespace 생성 ❯ kubectl create ns ctrl namespace/ctrl creatednginx ingress controller helm chart 설치 - helm chart helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx helm repo updatehelm install nginx-ingress ingress-nginx/ingress-ngi..

Airflow를 사용할때 XCom으로 task간 데이터를 주고받을때 사용할때가 많은데, task간 뿐만 아니라 이전 Dag Run에서 push한 데이터도 가져올 수 있다. Airflow에서 push한 XCom은 UI에서 바로 확인이 가능하다.(Admin > XComs) UI에서 확인한 것처럼 value와 함께 저장되는 다른 메타데이터들이 있는데 이 값들을 활용하면 이전 dag에서 push한 xcom value도 가져올 수 있다. 이전 Dag에서 XCom 가져오기(Airlfow 2.4) 나는 XCom.get_one() 함수를 사용하였고, 여러 값을 가져오려면 XCom.get_many()를 사용할 수 있다. 함수에 executioin_date, dag_id, task_id를 넘겨주어서 기준에 맞는 XCom을..