case 1)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = '1_python_operator',
    start_date = datetime(2022, 12, 3),
    schedule_interval = '* * * * *',
    catchup = False,
    tags = ['test'],
    description = 'Python Operator Sample',
    default_args = {'owner': 'ParkGyeongTae'})

def print_1():
    return '111'

print_11 = PythonOperator(
    task_id = 'print_1',
    python_callable = print_1,
    dag = dag)

print_11
The raw log is as follows.
airflow-worker
*** Reading local file: /opt/airflow/logs/dag_id=1_python_operator/run_id=scheduled__2022-12-06T10:42:00+00:00/task_id=print_1/attempt=1.log
[2022-12-06, 19:43:01 KST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: 1_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [queued]>
[2022-12-06, 19:43:01 KST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: 1_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [queued]>
[2022-12-06, 19:43:01 KST] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-12-06, 19:43:01 KST] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-12-06, 19:43:01 KST] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-12-06, 19:43:01 KST] {taskinstance.py:1383} INFO - Executing <Task(PythonOperator): print_1> on 2022-12-06 10:42:00+00:00
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:54} INFO - Started process 106 to run task
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', '1_python_operator', 'print_1', 'scheduled__2022-12-06T10:42:00+00:00', '--job-id', '83', '--raw', '--subdir', 'DAGS_FOLDER/1_python_operator.py', '--cfg-path', '/tmp/tmp9xizs9tp']
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:83} INFO - Job 83: Subtask print_1
[2022-12-06, 19:43:01 KST] {dagbag.py:525} INFO - Filling up the DagBag from /opt/***/dags/1_python_operator.py
[2022-12-06, 19:43:01 KST] {warnings.py:109} WARNING - /home/***/.local/lib/python3.8/site-packages/***/configuration.py:545: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-12-06, 19:43:01 KST] {task_command.py:384} INFO - Running <TaskInstance: 1_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [running]> on host ***-worker
[2022-12-06, 19:43:01 KST] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=ParkGyeongTae
AIRFLOW_CTX_DAG_ID=1_python_operator
AIRFLOW_CTX_TASK_ID=print_1
AIRFLOW_CTX_EXECUTION_DATE=2022-12-06T10:42:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-06T10:42:00+00:00
[2022-12-06, 19:43:01 KST] {python.py:177} INFO - Done. Returned value was: 111
[2022-12-06, 19:43:01 KST] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=1_python_operator, task_id=print_1, execution_date=20221206T104200, start_date=20221206T104301, end_date=20221206T104301
[2022-12-06, 19:43:01 KST] {local_task_job.py:164} INFO - Task exited with return code 0
[2022-12-06, 19:43:01 KST] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check
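Note that in case 1 the callable itself writes nothing to the log; the value '111' appears only because the operator logs it ({python.py:177} "Done. Returned value was: 111") after pushing it to XCom under the default key 'return_value'. As a minimal sketch (the read_1 task and its wiring are assumptions, not part of the original post), a downstream task in the same DAG could pull that value back:

# Hypothetical follow-on task, not in the original post: pulls the value
# that print_1 returned. PythonOperator pushes a callable's return value
# to XCom under the default key 'return_value'.
def read_1(ti):
    value = ti.xcom_pull(task_ids = 'print_1')
    print(value)  # stdout is captured into this task's log (see case 2)

read_11 = PythonOperator(
    task_id = 'read_1',
    python_callable = read_1,
    dag = dag)

print_11 >> read_11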
case 2)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = '2_python_operator',
    start_date = datetime(2022, 12, 3),
    schedule_interval = '* * * * *',
    catchup = False,
    tags = ['test'],
    description = 'Python Operator Sample',
    default_args = {'owner': 'ParkGyeongTae'})

def print_1():
    print('111')
    return '111'

print_11 = PythonOperator(
    task_id = 'print_1',
    python_callable = print_1,
    dag = dag)

print_11
The log is as follows.
airflow-worker
*** Reading local file: /opt/airflow/logs/dag_id=2_python_operator/run_id=scheduled__2022-12-06T10:42:00+00:00/task_id=print_1/attempt=1.log
[2022-12-06, 19:43:01 KST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: 2_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [queued]>
[2022-12-06, 19:43:01 KST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: 2_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [queued]>
[2022-12-06, 19:43:01 KST] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-12-06, 19:43:01 KST] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-12-06, 19:43:01 KST] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-12-06, 19:43:01 KST] {taskinstance.py:1383} INFO - Executing <Task(PythonOperator): print_1> on 2022-12-06 10:42:00+00:00
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:54} INFO - Started process 105 to run task
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', '2_python_operator', 'print_1', 'scheduled__2022-12-06T10:42:00+00:00', '--job-id', '82', '--raw', '--subdir', 'DAGS_FOLDER/2_python_operator.py', '--cfg-path', '/tmp/tmpxi_dehdf']
[2022-12-06, 19:43:01 KST] {standard_task_runner.py:83} INFO - Job 82: Subtask print_1
[2022-12-06, 19:43:01 KST] {dagbag.py:525} INFO - Filling up the DagBag from /opt/***/dags/2_python_operator.py
[2022-12-06, 19:43:01 KST] {warnings.py:109} WARNING - /home/***/.local/lib/python3.8/site-packages/***/configuration.py:545: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-12-06, 19:43:01 KST] {task_command.py:384} INFO - Running <TaskInstance: 2_python_operator.print_1 scheduled__2022-12-06T10:42:00+00:00 [running]> on host ***-worker
[2022-12-06, 19:43:01 KST] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=ParkGyeongTae
AIRFLOW_CTX_DAG_ID=2_python_operator
AIRFLOW_CTX_TASK_ID=print_1
AIRFLOW_CTX_EXECUTION_DATE=2022-12-06T10:42:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-06T10:42:00+00:00
[2022-12-06, 19:43:01 KST] {logging_mixin.py:117} INFO - 111
[2022-12-06, 19:43:01 KST] {python.py:177} INFO - Done. Returned value was: 111
[2022-12-06, 19:43:01 KST] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=2_python_operator, task_id=print_1, execution_date=20221206T104200, start_date=20221206T104301, end_date=20221206T104301
[2022-12-06, 19:43:01 KST] {local_task_job.py:164} INFO - Task exited with return code 0
[2022-12-06, 19:43:01 KST] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check
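The only difference from case 1 is the extra line "{logging_mixin.py:117} INFO - 111": Airflow redirects the task's stdout into the task log, so the print('111') inside the callable is recorded there. A minimal sketch of an alternative (this variant is not in the original post, just an assumption about preference), using the standard logging module so the line arrives in the task log as a regular log record with an explicit level:

import logging

def print_1():
    # Emitted through the logging module instead of print(); Airflow's
    # task log handler picks this up as an ordinary INFO record rather
    # than stdout routed through logging_mixin.
    logging.info('111')
    return '111'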