데이터 엔지니어

Data Engineering/Airflow

[Airflow] context, kwargs 에 들어있는 값 확인하기

from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime import pprint dag = DAG ( dag_id = '4_python_operator_context', start_date = datetime(2022, 12, 3), schedule_interval = '* * * * *', catchup = False, tags = ['server_local', 'detail_test'], description = 'Python Operator Sample', default_args = {'owner': 'ParkGyeongTae'}) def func_p..

Data Engineering/Airflow

[Airflow] context를 이용한 xcom에 key, value 남기는 방법

case 1) from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime dag = DAG ( dag_id = '3_python_operator_xcom', start_date = datetime(2022, 12, 3), schedule_interval = '* * * * *', catchup = False, tags = ['server_local', 'detail_test'], description = 'Python Operator Sample', default_args = {'owner': 'ParkGyeongTae'}) def func_xcom_push_..

Data Engineering/Airflow

[Airflow] Xcom에 데이터 남기지 않는 방법

기존 DAG - Xcom에 데이터를 task마다 남긴다.. from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime dag = DAG ( dag_id = '2_python_operator', start_date = datetime(2022, 12, 3), schedule_interval = '* * * * *', catchup = False, tags = ['server_local', 'detail_test'], description = 'Python Operator Sample', default_args = {'owner': 'ParkGyeongTae'})..

Data Engineering/Airflow

[Airflow] PythonOperator에서 return값은 로그에 남는다.

case 1) from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime dag = DAG ( dag_id = '1_python_operator', start_date = datetime(2022, 12, 3), schedule_interval = '* * * * *', catchup = False, tags = ['test'], description = 'Python Operator Sample', default_args = {'owner': 'ParkGyeongTae'}) def print_1(): return '111' print_11 = PythonOp..

Data Engineering/Airflow

[Airflow] 각 컨테이너별로 생성되는 로그를 확인해보자

flower - /opt/airflow/logs - 아무것도 없음 init - /opt/airflow/logs - 아무것도 없음 scheduler - /opt/airflow/logs webserver - /opt/airflow/logs - 아무것도 없음 worker - /opt/airflow/logs

Data Engineering/Airflow

[Airflow] postgres MetaDB에 사용하지 않는 데이터가 쌓이는 현상

지금까지 확인한 상황은 다음과 같다. 1) 1_python_operator 라는 dag를 만들면 일어나는 일 - 웹서버에서 dag 표시가 됨 - postgres db에서 dag 테이블에 데이터가 생김 - postgres db에서 dag_code 테이블에 데이터가 생김 추가확인) 웹서버의 dag와 postgres db에서 dag 테이블의 데이터 생성이 일치함 2) 1_python_operator 라는 dag를 실행하면 일어나는 일 - task 실행됨 - dag_run 에 데이터 생김 - task_instance 에 실행한 task 데이터 생김 - flower 에 실행한 task 생김 3) 1_python_operator 라는 dag를 웹서버에서 삭제하면 일어나는 일 - 웹서버에서 dag 삭제됨 - post..

Data Engineering/Airflow

[Airflow] MetaDB에서 task 실행기록 확인하는 방법

1_python_operator 라는 dag를 2번 실행시켰고, task는 6개가 있다. select * from task_instance

Data Engineering/Airflow

[Airflow] DAG 추가시 MetaDB 변경사항 확인

기존 dag 는 1_python_operator 이고, 추가된 dag 는 2_python_operator 이다. 메타DB를 확인해보자 select * from dag select * from dag_code

Data Engineering/Airflow

[Airflow] DAG파일 실행시 MetaDB 확인

dag 파일이름 : 1_python_operator dag에는 6개의 task가 있고, 2번 실행했다. select * from dag_run

박경태
'분류 전체보기' 카테고리의 글 목록 (40 Page)