Case 1) Key and value are passed to xcom_push as string literals
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
# DAG that hosts the XCom push sample task for this case.
dag = DAG(
    dag_id='3_python_operator_xcom',
    description='Python Operator Sample',
    tags=['server_local', 'detail_test'],
    default_args={'owner': 'ParkGyeongTae'},
    start_date=datetime(2022, 12, 3),
    schedule_interval='* * * * *',  # cron expression: every minute
    catchup=False,
)
def func_xcom_push_1(**context):
    """Push a fixed key/value pair to XCom via the task instance in the context.

    Both the key and the value are written inline as string literals.

    Args:
        **context: Keyword arguments passed by the caller. Must contain a
            ``task_instance`` entry exposing ``xcom_push(key=..., value=...)``.

    Returns:
        Whatever ``xcom_push`` returns, passed through unchanged.

    Raises:
        KeyError: If ``context`` has no ``task_instance`` entry.
    """
    task_instance = context['task_instance']
    return task_instance.xcom_push(
        key='xcom_push_key_1',
        value='xcom_push_value_1',
    )
# Wrap the push callable in an operator attached to the DAG above.
task_xcom_push_1 = PythonOperator(
    task_id='xcom_push_1',
    python_callable=func_xcom_push_1,
    do_xcom_push=True,
    dag=dag,
)

# Bare reference to the only task in this snippet; there is nothing to chain it to.
task_xcom_push_1
Case 2) The value is held in a variable; the key is passed as a string literal
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
# DAG that hosts the XCom push sample task for this case.
dag = DAG(
    dag_id='3_python_operator_xcom',
    description='Python Operator Sample',
    tags=['server_local', 'detail_test'],
    default_args={'owner': 'ParkGyeongTae'},
    start_date=datetime(2022, 12, 3),
    schedule_interval='* * * * *',  # cron expression: every minute
    catchup=False,
)
def func_xcom_push_2(**context):
    """Push a value held in a local variable to XCom under a literal key.

    Args:
        **context: Keyword arguments passed by the caller. Must contain a
            ``task_instance`` entry exposing ``xcom_push(key=..., value=...)``.

    Returns:
        Whatever ``xcom_push`` returns, passed through unchanged.

    Raises:
        KeyError: If ``context`` has no ``task_instance`` entry.
    """
    xcom_value = 'xcom_push_value_2'
    task_instance = context['task_instance']
    return task_instance.xcom_push(key='xcom_push_key_2', value=xcom_value)
# Wrap the push callable in an operator attached to the DAG above.
task_xcom_push_2 = PythonOperator(
    task_id='xcom_push_2',
    python_callable=func_xcom_push_2,
    do_xcom_push=True,
    dag=dag,
)

# Bare reference to the only task in this snippet; there is nothing to chain it to.
task_xcom_push_2
Case 3) Both the key and the value are held in variables
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
# DAG that hosts the XCom push sample task for this case.
dag = DAG(
    dag_id='3_python_operator_xcom',
    description='Python Operator Sample',
    tags=['server_local', 'detail_test'],
    default_args={'owner': 'ParkGyeongTae'},
    start_date=datetime(2022, 12, 3),
    schedule_interval='* * * * *',  # cron expression: every minute
    catchup=False,
)
def func_xcom_push_3(**context):
    """Push a key/value pair, both held in local variables, to XCom.

    Args:
        **context: Keyword arguments passed by the caller. Must contain a
            ``task_instance`` entry exposing ``xcom_push(key=..., value=...)``.

    Returns:
        Whatever ``xcom_push`` returns, passed through unchanged.

    Raises:
        KeyError: If ``context`` has no ``task_instance`` entry.
    """
    xcom_key, xcom_value = 'xcom_push_key_3', 'xcom_push_value_3'
    task_instance = context['task_instance']
    return task_instance.xcom_push(key=xcom_key, value=xcom_value)
# Wrap the push callable in an operator attached to the DAG above.
task_xcom_push_3 = PythonOperator(
    task_id='xcom_push_3',
    python_callable=func_xcom_push_3,
    do_xcom_push=True,
    dag=dag,
)

# Bare reference to the only task in this snippet; there is nothing to chain it to.
task_xcom_push_3
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] airflow db init (0) | 2022.12.06 |
---|---|
[Airflow] context, kwargs 에 들어있는 값 확인하기 (0) | 2022.12.06 |
[Airflow] Xcom에 데이터 남기지 않는 방법 (0) | 2022.12.06 |
[Airflow] PythonOperator에서 return값은 로그에 남는다. (0) | 2022.12.06 |
[Airflow] 각 컨테이너별로 생성되는 로그를 확인해보자 (0) | 2022.12.06 |