EmrAddStepsOperator
아파치 에어플로우(Apache Airflow)에서 EmrAddStepsOperator는 이미 실행 중이거나 준비(Waiting) 상태인 EMR(Elastic MapReduce) 클러스터에 새로운 스텝(steps)을 추가하기 위해 사용되는 오퍼레이터임.
예를 들어 Spark 작업, Hadoop 작업, Hive 스크립트, Pig 스크립트 등을 EMR 클러스터에서 실행하고 싶을 때, 작업 단위를 ‘스텝’(Step)으로 정의하고 이를 EMR 클러스터에 추가하여 순차/병렬로 실행할 수 있음.
역할 및 동작 원리
1. EMR 클러스터에 작업(스텝)을 등록
EmrAddStepsOperator는 AWS의 EMR 클러스터에 대하여 add_job_flow_steps API를 호출해 여러 스텝을 한 번에 추가함.
각 스텝은 Spark/Hadoop과 같은 실행 커맨드(예: spark-submit), JAR 파일 경로, 스크립트 경로, 인자(Arguments) 등으로 구성되어 있음.
2. Airflow DAG 내에서의 위치
일반적으로 클러스터 생성 → 스텝 추가 → 스텝 모니터링 →(선택) 클러스터 종료와 같은 순서로 작업을 구성함.
EmrCreateJobFlowOperator로 클러스터를 생성하고, 이후 EmrAddStepsOperator로 필요한 Spark/Hadoop 스텝을 추가하며, EmrStepSensor 등을 활용해 스텝 상태를 확인하고, 작업이 모두 끝나면 EmrTerminateJobFlowOperator로 클러스터를 종료하는 식임.
3. 반환 값(XCom)
EmrAddStepsOperator는 실행이 완료된 뒤, 추가된 스텝들의 Step ID 리스트를 XCom으로 푸시함.
후속 태스크인 EmrStepSensor는 보통 이 Step ID들을 받아 모니터링에 사용함.
주요 파라미터
EmrAddStepsOperator를 사용할 때 주로 활용되는 파라미터들을 정리하면 아래와 같음.
Airflow 버전 및 AWS Provider 버전에 따라 일부 파라미터가 다를 수 있음.
1. job_flow_id (필수)
스텝을 추가할 대상 EMR 클러스터의 ID임.
예를 들면, j-2AXXXXXXGAPLF.
보통은 이전에 EmrCreateJobFlowOperator가 생성한 클러스터 ID를 XCom으로부터 받아({{ ti.xcom_pull(task_ids='create_cluster', key='return_value') }} 등) 사용함.
2. steps (필수)
EMR에 추가할 여러 스텝의 구성 정보(list of dict)임.
각 스텝은 주로 아래와 같은 필드를 가짐.
2-1. Name: 스텝 이름
2-2. ActionOnFailure: 스텝 실패 시 동작(TERMINATE_CLUSTER, TERMINATE_JOB_FLOW, CONTINUE 등)
2-3. HadoopJarStep: Jar, Args, MainClass 등을 지정하며, Spark/Hive/Pig 각각에 맞는 실행 커맨드를 정의함.
예를 들면 다음과 같음.
SPARK_STEPS = [
{
'Name': 'Run Spark Job',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--class', 'org.apache.spark.examples.SparkPi',
's3://your-bucket/path/to/jar',
'100'
]
}
},
# 여러 개 가능
]
3. aws_conn_id
Airflow에 등록된 AWS Connection ID임.
여기서 Access Key, Secret Key, Region 등을 읽어옴.
기본 값은 aws_default임.
4. wait_for_completion (구버전에서 사용)
AWS 프로바이더 버전에 따라 제공될 수 있음.
이 값을 True로 설정하면, 스텝이 모두 종료될 때까지 Operator가 블로킹될 수 있음.
하지만 보통은 EmrStepSensor를 별도로 사용하는 방식을 권장함.
5. deferrable
Airflow 2.2+ 및 AWS Provider에서 지원할 수 있는 비동기 모드(Deferrable) 관련 플래그임.
배치 작업에서 EMR 스텝을 기다릴 때 워커 리소스를 오래 붙잡지 않도록 개선하는 방식인데, 환경에 따라 사용 가능 여부가 달라짐.
사용 예시 (DAG 예시 코드)
아래는 간단한 Airflow DAG 예시임.
EMR 클러스터 생성, 2) 스텝 추가, 3) 스텝 모니터링, 4) 종료 순서대로 진행함.
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator, EmrAddStepsOperator, EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.utils.dates import days_ago
SPARK_STEPS = [
{
'Name': 'Run Spark Job',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/scripts/spark_job.py',
's3://my-bucket/input/',
's3://my-bucket/output/'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'MySampleEMRCluster',
'ReleaseLabel': 'emr-6.7.0',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Core nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2KeyName': 'your-key-name',
'Ec2SubnetId': 'subnet-xxxxxx'
},
'Applications': [
{'Name': 'Hadoop'},
{'Name': 'Spark'}
],
'VisibleToAllUsers': True,
# 추가 옵션들...
}
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG('emr_add_steps_example',
default_args=default_args,
schedule_interval=None,
catchup=False) as dag:
# 1) EMR 클러스터 생성
create_emr_cluster = EmrCreateJobFlowOperator(
task_id='create_emr_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default'
)
# 2) 스텝 추가
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
steps=SPARK_STEPS,
aws_conn_id='aws_default'
)
# 3) 스텝 모니터링
# add_steps가 반환한 스텝 ID 리스트의 첫 번째를 모니터링하는 예시
watch_step = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
# 4) 클러스터 종료
terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
aws_conn_id='aws_default'
)
create_emr_cluster >> add_steps >> watch_step >> terminate_emr_cluster
모범 사례 및 주의사항
1. 스텝 단위로 모니터링하기
EmrAddStepsOperator에서 wait_for_completion=True 같은 옵션을 사용하기도 하지만, 가장 권장되는 방식은 EmrStepSensor를 별도로 둬서 필요한 스텝만 골라 모니터링하는 것임.
여러 스텝을 한 번에 등록했을 때, 각 스텝이 순차적으로 실행되는지, 중간에 실패했는지 등을 단계별로 파악하기 수월해짐.
2. 클러스터 상태 확인
스텝 추가 전, 클러스터가 적절히 RUNNING 또는 WAITING 상태에 있어야 함.
만약 클러스터가 TERMINATING 상태거나 에러가 난 상태라면 스텝을 정상적으로 추가할 수 없음.
3. ActionOnFailure
스텝이 실패했을 때, 전체 클러스터를 종료시킬지(TERMINATE_CLUSTER) 아니면 그냥 스텝만 실패 상태로 두고 계속 진행할지(CONTINUE) 옵션을 명시해야 함.
잘못 설정할 경우, 특정 스텝이 실패 시 이후 스텝을 실행할 수 없거나 클러스터가 즉시 종료될 수 있으므로 주의가 필요함.
4. XCom 활용
EmrAddStepsOperator → EmrStepSensor 간 연동을 위해 XCom에서 Step ID 배열을 잘 받아야 함.
{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}와 같이 인덱스를 접근하는 방식을 자주 사용함.
5. 템플릿 기능
Airflow의 jinja 템플릿을 steps 파라미터나 job_flow_id 설정 등에 활용하여 동적으로 값을 주입할 수 있음.
날짜/시간 정보를 활용한 동적 Spark 설정, s3 폴더를 날짜별로 구분하는 등의 고급 활용이 가능함.
6. 단일 클러스터 vs 에페멀(Ephemeral) 클러스터
Airflow에서는 보통 ‘클러스터 한 번 띄우고 → 여러 스텝 순차 실행 → 종료’(일명 ephemeral cluster) 방식을 많이 씀.
이 경우 불필요하게 클러스터를 띄워놓지 않으므로 비용 절감이 가능함.
반면 장시간 상시 클러스터를 두고 여러 DAG들이 같은 클러스터에 스텝만 추가해서 사용할 수도 있음.
이 경우에는 병목 현상, 리소스 스펙 이슈, 동시성 등을 관리해야 함.
7. IAM 권한 설정
aws_conn_id가 사용하는 IAM Role/Policy에 EMR 관련 API(AddJobFlowSteps, ListSteps, DescribeStep 등)를 사용할 수 있는 권한이 반드시 포함되어 있어야 함.
확장 포인트
1. 비동기 센서 사용 (Deferrable Operator/Sensor)
Airflow 2.2 이상, AWS Provider 관련 라이브러리 버전에 따라 Deferrable 모드를 지원하는 센서/오퍼레이터가 존재함.
이는 EMR 스텝 완료까지 작업자를 블로킹하지 않고, 이벤트 기반으로 재개하기 때문에 대규모 배치 파이프라인에서 워커 리소스 낭비를 줄일 수 있음.
2. 오류 처리/재시도 전략
스텝 추가 시 오류가 발생하면 EmrAddStepsOperator 자체가 실패할 수 있음.
이 경우 Airflow 레벨에서 태스크 재시도(retries, retry_delay 등)를 걸어두어, 일시적인 AWS API 오류를 재시도하게 만들 수 있음.
3. 동적 스텝 생성
Airflow XCom이나 다른 태스크의 결과에 따라, 스텝 목록을 동적으로 생성한 뒤 EmrAddStepsOperator로 전달할 수도 있음.
예를 들어, 데이터 분할 결과에 따라 spark-submit 인자를 다르게 주어 스텝을 여러 개 생성함.
정리
EmrAddStepsOperator는 실행 중인 EMR 클러스터에 스텝(작업)을 추가하는 핵심 오퍼레이터임.
일반적으로 클러스터 생성 → 스텝 추가 → 스텝 모니터링 → (필요 시)클러스터 종료 단계로 파이프라인을 구성함.
반환되는 스텝 ID를 사용하여 EmrStepSensor로 모니터링할 수 있으며, 이 과정을 통해 대규모 Spark/Hadoop 작업을 자동화, 스케줄링, 모니터링하는 워크플로우를 구축할 수 있음.
에어플로우의 템플릿, XCom을 적극 활용하면, 동적인 파이프라인 구성도 가능하며, IAM 권한 설정, 클러스터 상태 체크, ActionOnFailure 등 세부 설정에 유의해야 안정적인 운영이 가능함.
이와 같은 사항을 종합적으로 이해하고 적용하면, Apache Airflow에서 EmrAddStepsOperator를 이용해 EMR 기반 데이터 파이프라인을 유연하고 확장성 있게 구성할 수 있음.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] EMR에서 Step의 개념 (0) | 2025.04.04 |
---|---|
[Airflow] Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법 (1) | 2025.04.04 |
[Airflow] 컴포넌트, MWAA (0) | 2024.05.15 |
[Airflow] 에어플로우 DAG란? (0) | 2023.04.02 |
[Airflow] 데이터 파이프라인이란? (0) | 2023.01.09 |
EmrAddStepsOperator
아파치 에어플로우(Apache Airflow)에서 EmrAddStepsOperator는 이미 실행 중이거나 준비(Waiting) 상태인 EMR(Elastic MapReduce) 클러스터에 새로운 스텝(steps)을 추가하기 위해 사용되는 오퍼레이터임.
예를 들어 Spark 작업, Hadoop 작업, Hive 스크립트, Pig 스크립트 등을 EMR 클러스터에서 실행하고 싶을 때, 작업 단위를 ‘스텝’(Step)으로 정의하고 이를 EMR 클러스터에 추가하여 순차/병렬로 실행할 수 있음.
역할 및 동작 원리
1. EMR 클러스터에 작업(스텝)을 등록
EmrAddStepsOperator는 AWS의 EMR 클러스터에 대하여 add_job_flow_steps API를 호출해 여러 스텝을 한 번에 추가함.
각 스텝은 Spark/Hadoop과 같은 실행 커맨드(예: spark-submit), JAR 파일 경로, 스크립트 경로, 인자(Arguments) 등으로 구성되어 있음.
2. Airflow DAG 내에서의 위치
일반적으로 클러스터 생성 → 스텝 추가 → 스텝 모니터링 →(선택) 클러스터 종료와 같은 순서로 작업을 구성함.
EmrCreateJobFlowOperator로 클러스터를 생성하고, 이후 EmrAddStepsOperator로 필요한 Spark/Hadoop 스텝을 추가하며, EmrStepSensor 등을 활용해 스텝 상태를 확인하고, 작업이 모두 끝나면 EmrTerminateJobFlowOperator로 클러스터를 종료하는 식임.
3. 반환 값(XCom)
EmrAddStepsOperator는 실행이 완료된 뒤, 추가된 스텝들의 Step ID 리스트를 XCom으로 푸시함.
후속 태스크인 EmrStepSensor는 보통 이 Step ID들을 받아 모니터링에 사용함.
주요 파라미터
EmrAddStepsOperator를 사용할 때 주로 활용되는 파라미터들을 정리하면 아래와 같음.
Airflow 버전 및 AWS Provider 버전에 따라 일부 파라미터가 다를 수 있음.
1. job_flow_id (필수)
스텝을 추가할 대상 EMR 클러스터의 ID임.
예를 들면, j-2AXXXXXXGAPLF.
보통은 이전에 EmrCreateJobFlowOperator가 생성한 클러스터 ID를 XCom으로부터 받아({{ ti.xcom_pull(task_ids='create_cluster', key='return_value') }} 등) 사용함.
2. steps (필수)
EMR에 추가할 여러 스텝의 구성 정보(list of dict)임.
각 스텝은 주로 아래와 같은 필드를 가짐.
2-1. Name: 스텝 이름
2-2. ActionOnFailure: 스텝 실패 시 동작(TERMINATE_CLUSTER, TERMINATE_JOB_FLOW, CONTINUE 등)
2-3. HadoopJarStep: Jar, Args, MainClass 등을 지정하며, Spark/Hive/Pig 각각에 맞는 실행 커맨드를 정의함.
예를 들면 다음과 같음.
SPARK_STEPS = [
{
'Name': 'Run Spark Job',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--class', 'org.apache.spark.examples.SparkPi',
's3://your-bucket/path/to/jar',
'100'
]
}
},
# 여러 개 가능
]
3. aws_conn_id
Airflow에 등록된 AWS Connection ID임.
여기서 Access Key, Secret Key, Region 등을 읽어옴.
기본 값은 aws_default임.
4. wait_for_completion (구버전에서 사용)
AWS 프로바이더 버전에 따라 제공될 수 있음.
이 값을 True로 설정하면, 스텝이 모두 종료될 때까지 Operator가 블로킹될 수 있음.
하지만 보통은 EmrStepSensor를 별도로 사용하는 방식을 권장함.
5. deferrable
Airflow 2.2+ 및 AWS Provider에서 지원할 수 있는 비동기 모드(Deferrable) 관련 플래그임.
배치 작업에서 EMR 스텝을 기다릴 때 워커 리소스를 오래 붙잡지 않도록 개선하는 방식인데, 환경에 따라 사용 가능 여부가 달라짐.
사용 예시 (DAG 예시 코드)
아래는 간단한 Airflow DAG 예시임.
EMR 클러스터 생성, 2) 스텝 추가, 3) 스텝 모니터링, 4) 종료 순서대로 진행함.
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator, EmrAddStepsOperator, EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.utils.dates import days_ago
SPARK_STEPS = [
{
'Name': 'Run Spark Job',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/scripts/spark_job.py',
's3://my-bucket/input/',
's3://my-bucket/output/'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'MySampleEMRCluster',
'ReleaseLabel': 'emr-6.7.0',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Core nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2KeyName': 'your-key-name',
'Ec2SubnetId': 'subnet-xxxxxx'
},
'Applications': [
{'Name': 'Hadoop'},
{'Name': 'Spark'}
],
'VisibleToAllUsers': True,
# 추가 옵션들...
}
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG('emr_add_steps_example',
default_args=default_args,
schedule_interval=None,
catchup=False) as dag:
# 1) EMR 클러스터 생성
create_emr_cluster = EmrCreateJobFlowOperator(
task_id='create_emr_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default'
)
# 2) 스텝 추가
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
steps=SPARK_STEPS,
aws_conn_id='aws_default'
)
# 3) 스텝 모니터링
# add_steps가 반환한 스텝 ID 리스트의 첫 번째를 모니터링하는 예시
watch_step = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
# 4) 클러스터 종료
terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
aws_conn_id='aws_default'
)
create_emr_cluster >> add_steps >> watch_step >> terminate_emr_cluster
모범 사례 및 주의사항
1. 스텝 단위로 모니터링하기
EmrAddStepsOperator에서 wait_for_completion=True 같은 옵션을 사용하기도 하지만, 가장 권장되는 방식은 EmrStepSensor를 별도로 둬서 필요한 스텝만 골라 모니터링하는 것임.
여러 스텝을 한 번에 등록했을 때, 각 스텝이 순차적으로 실행되는지, 중간에 실패했는지 등을 단계별로 파악하기 수월해짐.
2. 클러스터 상태 확인
스텝 추가 전, 클러스터가 적절히 RUNNING 또는 WAITING 상태에 있어야 함.
만약 클러스터가 TERMINATING 상태거나 에러가 난 상태라면 스텝을 정상적으로 추가할 수 없음.
3. ActionOnFailure
스텝이 실패했을 때, 전체 클러스터를 종료시킬지(TERMINATE_CLUSTER) 아니면 그냥 스텝만 실패 상태로 두고 계속 진행할지(CONTINUE) 옵션을 명시해야 함.
잘못 설정할 경우, 특정 스텝이 실패 시 이후 스텝을 실행할 수 없거나 클러스터가 즉시 종료될 수 있으므로 주의가 필요함.
4. XCom 활용
EmrAddStepsOperator → EmrStepSensor 간 연동을 위해 XCom에서 Step ID 배열을 잘 받아야 함.
{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}와 같이 인덱스를 접근하는 방식을 자주 사용함.
5. 템플릿 기능
Airflow의 jinja 템플릿을 steps 파라미터나 job_flow_id 설정 등에 활용하여 동적으로 값을 주입할 수 있음.
날짜/시간 정보를 활용한 동적 Spark 설정, s3 폴더를 날짜별로 구분하는 등의 고급 활용이 가능함.
6. 단일 클러스터 vs 에페멀(Ephemeral) 클러스터
Airflow에서는 보통 ‘클러스터 한 번 띄우고 → 여러 스텝 순차 실행 → 종료’(일명 ephemeral cluster) 방식을 많이 씀.
이 경우 불필요하게 클러스터를 띄워놓지 않으므로 비용 절감이 가능함.
반면 장시간 상시 클러스터를 두고 여러 DAG들이 같은 클러스터에 스텝만 추가해서 사용할 수도 있음.
이 경우에는 병목 현상, 리소스 스펙 이슈, 동시성 등을 관리해야 함.
7. IAM 권한 설정
aws_conn_id가 사용하는 IAM Role/Policy에 EMR 관련 API(AddJobFlowSteps, ListSteps, DescribeStep 등)를 사용할 수 있는 권한이 반드시 포함되어 있어야 함.
확장 포인트
1. 비동기 센서 사용 (Deferrable Operator/Sensor)
Airflow 2.2 이상, AWS Provider 관련 라이브러리 버전에 따라 Deferrable 모드를 지원하는 센서/오퍼레이터가 존재함.
이는 EMR 스텝 완료까지 작업자를 블로킹하지 않고, 이벤트 기반으로 재개하기 때문에 대규모 배치 파이프라인에서 워커 리소스 낭비를 줄일 수 있음.
2. 오류 처리/재시도 전략
스텝 추가 시 오류가 발생하면 EmrAddStepsOperator 자체가 실패할 수 있음.
이 경우 Airflow 레벨에서 태스크 재시도(retries, retry_delay 등)를 걸어두어, 일시적인 AWS API 오류를 재시도하게 만들 수 있음.
3. 동적 스텝 생성
Airflow XCom이나 다른 태스크의 결과에 따라, 스텝 목록을 동적으로 생성한 뒤 EmrAddStepsOperator로 전달할 수도 있음.
예를 들어, 데이터 분할 결과에 따라 spark-submit 인자를 다르게 주어 스텝을 여러 개 생성함.
정리
EmrAddStepsOperator는 실행 중인 EMR 클러스터에 스텝(작업)을 추가하는 핵심 오퍼레이터임.
일반적으로 클러스터 생성 → 스텝 추가 → 스텝 모니터링 → (필요 시)클러스터 종료 단계로 파이프라인을 구성함.
반환되는 스텝 ID를 사용하여 EmrStepSensor로 모니터링할 수 있으며, 이 과정을 통해 대규모 Spark/Hadoop 작업을 자동화, 스케줄링, 모니터링하는 워크플로우를 구축할 수 있음.
에어플로우의 템플릿, XCom을 적극 활용하면, 동적인 파이프라인 구성도 가능하며, IAM 권한 설정, 클러스터 상태 체크, ActionOnFailure 등 세부 설정에 유의해야 안정적인 운영이 가능함.
이와 같은 사항을 종합적으로 이해하고 적용하면, Apache Airflow에서 EmrAddStepsOperator를 이용해 EMR 기반 데이터 파이프라인을 유연하고 확장성 있게 구성할 수 있음.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] EMR에서 Step의 개념 (0) | 2025.04.04 |
---|---|
[Airflow] Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법 (1) | 2025.04.04 |
[Airflow] 컴포넌트, MWAA (0) | 2024.05.15 |
[Airflow] 에어플로우 DAG란? (0) | 2023.04.02 |
[Airflow] 데이터 파이프라인이란? (0) | 2023.01.09 |