Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법
아파치 에어플로우(Apache Airflow)에서 EMR(Elastic MapReduce) 클러스터를 사용해 여러 개의 PySpark 작업을 실행하는 전반적인 방법과 모범사례는 아래와 같음.
여기서는 각각의 PySpark 작업을 EMR 스텝으로 등록하고 실행하는 시나리오를 다룸.
기본 개념 정리
1. EMR 클러스터
AWS에서 Spark/Hadoop/Hive/Pig 등의 작업을 실행할 수 있는 클러스터 환경임.
EmrCreateJobFlowOperator로 클러스터를 생성하거나, 이미 띄워진 클러스터가 있다면 그 클러스터 ID를 사용하여 스텝을 추가할 수 있음.
2. EMR 스텝(Step)
EMR에서 실행하는 하나의 작업 단위를 스텝이라 부름.
예를 들어, Spark-submit 작업 1회.
PySpark 코드 또한 spark-submit을 통해 JAR 또는 .py 스크립트를 실행하므로, EMR 스텝 형태로 정의할 수 있음.
3. Airflow와 EMR
보통 Airflow DAG 안에서 다음 단계로 진행됨.
3-1. EMR 클러스터 생성 (EmrCreateJobFlowOperator)
3-2. PySpark 작업 추가 (EmrAddStepsOperator)
3-3. 스텝 완료 모니터링 (EmrStepSensor)
3-4. (필요시) 클러스터 종료 (EmrTerminateJobFlowOperator)
4. 여러 PySpark 작업 실행
여러 PySpark 스크립트를 한꺼번에 또는 단계적으로 실행하려면, EMR 스텝을 여러 개 정의해서 EmrAddStepsOperator에 전달하면 됨.
순차 실행(스텝 A가 끝나면 B, B가 끝나면 C 등) 또는 병렬 실행(클러스터 노드 리소스를 활용 가능한 수준에서 병렬) 구조로 구성 가능함.
EMR에서 PySpark 스크립트를 여러 개 실행하는 시나리오
1. 에페멀(Ephemeral) 클러스터 vs. 상시(Persistent) 클러스터
1-1. 에페멀(Ephemeral) 클러스터
작업 시점마다 클러스터를 새로 생성 후, 모든 스텝(작업)이 끝나면 즉시 종료함.
장점: 사용 시간만큼만 비용이 발생하고, 클러스터 상태가 항상 ‘깨끗한’ 상태에서 시작.
단점: 클러스터 생성에 소요되는 시간이 있기 때문에, 자주/짧은 주기로 실행하면 오버헤드가 커질 수 있음.
1-2. 상시(Persistent) 클러스터
이미 띄워져 있는 클러스터에 계속 스텝을 추가해 사용하는 방식임.
장점: 스텝 추가 시간이 빠르고, 클러스터 생성 오버헤드가 없음.
단점: 클러스터가 항상 켜져 있어 비용이 추가되고, 여러 파이프라인이 동시에 스텝을 추가하면 리소스 충돌/병목에 대한 고려가 필요함.
1-3. 결정 팁
일 단위 등 배치성으로 일정 시간에만 작업한다면, 에페멀 방식을 추천.
자주(수시간 간격 등) 작업하거나, 클러스터가 상시 필요하다면 상시 클러스터 방식도 고려.
2. PySpark 스크립트를 EMR 스텝으로 정의하기
EMR 스텝은 보통 HadoopJarStep이라는 형식으로 정의됨.
Spark 작업을 실행할 때는 command-runner.jar와 spark-submit이 핵심임.
PySpark 스크립트를 S3에 두고, spark-submit 명령으로 실행하도록 만듬.
예를 들면 다음과 같음.
SPARK_STEPS = [
{
'Name': 'Pyspark Job 1',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn', # 보통 yarn
's3://my-bucket/pyspark-scripts/job1.py',
's3://my-bucket/input-data/job1/',
's3://my-bucket/output-data/job1/'
]
}
},
{
'Name': 'Pyspark Job 2',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job2.py',
'--input-path', 's3://my-bucket/input-data/job2/',
'--output-path', 's3://my-bucket/output-data/job2/',
'--some-arg', 'xyz'
]
}
},
# 필요에 따라 더 추가 가능
]
ActionOnFailure 파라미터
스텝이 실패했을 때 EMR이 취할 동작을 지정.
TERMINATE_CLUSTER, CANCEL_AND_WAIT, CONTINUE 등.
보통 여러 스텝을 차례로 실행하고 싶다면 CONTINUE 또는 CANCEL_AND_WAIT를 사용.
단일 스텝이 실패하면 전체 클러스터를 종료해야 한다면 TERMINATE_CLUSTER를 사용.
여러 스텝을 한 번의 EmrAddStepsOperator 호출에 넘겨도 되고, 여러 번 호출해도 되지만 보통은 한 번에 등록하는 편이 단순함.
Airflow DAG 예시
아래 코드는 하나의 DAG 안에서 클러스터 생성 → 여러 PySpark 스크립트를 스텝으로 추가 → 스텝 완료 감시 → 클러스터 종료 순으로 구성한 대표적인 예시임.
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
EmrCreateJobFlowOperator,
EmrAddStepsOperator,
EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.utils.dates import days_ago
# 1) 여러 PySpark 스크립트를 각기 다른 스텝으로 정의
SPARK_STEPS = [
{
'Name': 'Pyspark Job 1',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job1.py',
's3://my-bucket/input-data/job1/',
's3://my-bucket/output-data/job1/'
]
}
},
{
'Name': 'Pyspark Job 2',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job2.py',
's3://my-bucket/input-data/job2/',
's3://my-bucket/output-data/job2/'
]
}
}
]
# 2) EMR 클러스터 생성 시에 필요한 설정 (예: 6.7.0 버전)
JOB_FLOW_OVERRIDES = {
'Name': 'MyEMRClusterForMultiplePySparkJobs',
'ReleaseLabel': 'emr-6.7.0',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Core - 2',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2KeyName': 'your-key',
'Ec2SubnetId': 'subnet-xxxxxxx'
},
'Applications': [
{'Name': 'Hadoop'},
{'Name': 'Spark'}
],
'VisibleToAllUsers': True,
# 기타 옵션들...
}
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG(
'emr_multiple_pyspark_example',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# (1) 클러스터 생성
create_emr_cluster = EmrCreateJobFlowOperator(
task_id='create_emr_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default'
)
# (2) 여러 PySpark 스텝 추가
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
steps=SPARK_STEPS,
aws_conn_id='aws_default'
)
# (3) 스텝 결과 감시
# - EmrAddStepsOperator는 스텝 ID들의 리스트를 XCom으로 반환
# - 예: [stepId1, stepId2, ...]
watch_first_step = EmrStepSensor(
task_id='watch_first_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}", # 0번 스텝
aws_conn_id='aws_default'
)
watch_second_step = EmrStepSensor(
task_id='watch_second_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[1] }}", # 1번 스텝
aws_conn_id='aws_default'
)
# (4) 모든 스텝 끝난 후 클러스터 종료
terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
aws_conn_id='aws_default'
)
# DAG 내 Task 의존관계 설정
create_emr_cluster >> add_steps
add_steps >> watch_first_step >> watch_second_step
watch_second_step >> terminate_emr_cluster
이 DAG를 실행하면 다음과 같음.
1. 새 클러스터를 만들고(create_emr_cluster)
2. 스텝 2개(PySpark 스크립트 2개)를 한 번에 등록(add_steps)
3. 등록된 순서대로 Job1, Job2가 실행된다. watch_first_step, watch_second_step 센서가 완료 상태를 감시
4. 두 작업이 모두 성공하면 terminate_emr_cluster에서 클러스터가 종료
실행 및 모니터링 전략
1. Airflow UI
Airflow UI에서 DAG를 Trigger 후, 태스크 상태를 확인할 수 있음.
EmrStepSensor가 ‘running’ 상태로 대기하면 EMR 콘솔에서 스텝 로그를 확인하거나 Spark UI를 통해 진행 상황을 모니터링함.
2. EMR 콘솔/CloudWatch
EMR 콘솔의 클러스터 상세 정보에서 각 스텝의 실행 로그, Spark UI, 드라이버/실행자 로그 등을 추적할 수 있음.
CloudWatch 로그를 통해 드라이버/Executor 에러 메시지 등을 확인 가능함.
3. logs 파라미터
EMR 클러스터 생성 시 LogUri를 지정해서 S3에 로그가 저장되도록 하거나, CloudWatch Logs로 리다이렉션 설정을 할 수도 있음.
DAG에서 PySpark 에러를 확인해야 할 때는 S3/CloudWatch Logs를 함께 살펴봐야 함.
4. 스텝 모니터링
여러 스텝을 동시에 추가해도, EMR의 기본 동작은 순차로 실행(스텝 간에는 기본적으로 FIFO)하지만, 필요에 따라 병렬 설정도 가능함(단, 스텝 실행 정책에 따라 다름).
Airflow 관점에서는 각 스텝마다 EmrStepSensor로 상태를 감시하여, 실패 시 DAG에서 에러를 발생시키는 식으로 제어할 수 있음.
병렬 실행 고려
EMR의 기본 스텝 실행은 순차(FIFO)지만, Spark on YARN 기준으로는 병렬화가 가능하도록 설정할 수도 있음.
단, EMR에서 “스텝 병렬화”를 별도 설정해야 하며, 리소스가 충분해야 병렬로 돌아갈 수 있음.
병렬로 돌리면 EMR 클러스터의 코어/메모리 리소스가 여러 스텝 간에 분산할 수 있으므로, Executor 메모리, 코어 수, 동시성 등을 충분히 계산해둬야 함.
모범 사례
1. 에페멀 vs. 상시 클러스터
배치성 작업이라면 보통 에페멀 방식(작업 후 클러스터 종료)이 비용 면에서 유리함.
단, 짧은 간격으로 많이 실행한다면 상시 클러스터가 유리할 때도 있음.
2. KeepJobFlowAliveWhenNoSteps
에페멀 방식에서 스텝을 모두 실행해도 더 스텝을 추가해야 할 가능성이 있다면, 클러스터를 자동 종료하지 않고 대기 상태로 유지할 수 있음.
이 값을 False로 설정하면 마지막 스텝이 끝나면 EMR이 자동 종료됨.
3. ActionOnFailure
여러 PySpark 스텝 중 하나라도 실패하면 즉시 클러스터를 종료할지, 아니면 넘어갈지를 ActionOnFailure에 맞춰 설정함.
DAG 레벨에서 재시도(retries, retry_delay)를 걸어두는 것도 좋은 방법임(네트워크 문제 등으로 일시 실패 시 재시도).
4. 템플릿, XCom, 매크로
steps 내부 인자에 Airflow 매크로(예: {{ ds }} 등)나 다른 태스크의 XCom 값을 템플릿으로 주입해, PySpark 스크립트에서 날짜별 S3 경로를 다르게 설정 가능.
5. Deferrable Operator
Airflow 2.2+ 및 AWS Provider 버전이 적절히 지원한다면, EmrAddStepsOperator와 EmrStepSensor의 “Deferrable” 버전을 고려해볼 수 있음.
이를 통해 작업(센서)이 대기 중일 때 작업자 슬롯을 차지하지 않고, 이벤트 기반으로 재개되어 자원 효율을 높일 수 있음.
6. IAM 권한
Airflow가 사용하는 AWS Connection ID(aws_conn_id)의 IAM Role/Policy에 AddJobFlowSteps, DescribeStep, ListSteps, TerminateJobFlows 등 적절한 권한이 있어야 함.
7. 메트릭/로깅
PySpark 코드를 잘 로깅하거나, Spark UI/CloudWatch와 연동해서 성능 및 에러 정보를 추적함.
정리
Airflow에서 여러 개의 PySpark 스크립트를 EMR로 실행하려면,
클러스터 생성 (또는 기존 클러스터 재사용),
여러 PySpark 스크립트를 각각의 스텝으로 정의해 EmrAddStepsOperator로 전달,
각 스텝 완료 여부는 EmrStepSensor로 모니터링,
필요시 클러스터 종료
로 구성하는 것이 일반적임.
스텝을 여러 개 등록함으로써 PySpark 잡들을 순차 혹은 병렬로 자동 실행할 수 있으며, 실패 처리를 어떻게 할지(ActionOnFailure), 클러스터를 언제 종료할지, IAM 권한, 로그 모니터링 등을 철저히 관리해야 함.
이러한 방식을 통해, Airflow를 중심으로 EMR 기반 PySpark 작업을 일관된 워크플로우로 자동화하면서, 확장성과 유지보수성을 높일 수 있음.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] EMR에서 Step의 개념 (0) | 2025.04.04 |
---|---|
[Airflow] EmrAddStepsOperator 개념 (0) | 2025.04.04 |
[Airflow] 컴포넌트, MWAA (0) | 2024.05.15 |
[Airflow] 에어플로우 DAG란? (0) | 2023.04.02 |
[Airflow] 데이터 파이프라인이란? (0) | 2023.01.09 |
Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법
아파치 에어플로우(Apache Airflow)에서 EMR(Elastic MapReduce) 클러스터를 사용해 여러 개의 PySpark 작업을 실행하는 전반적인 방법과 모범사례는 아래와 같음.
여기서는 각각의 PySpark 작업을 EMR 스텝으로 등록하고 실행하는 시나리오를 다룸.
기본 개념 정리
1. EMR 클러스터
AWS에서 Spark/Hadoop/Hive/Pig 등의 작업을 실행할 수 있는 클러스터 환경임.
EmrCreateJobFlowOperator로 클러스터를 생성하거나, 이미 띄워진 클러스터가 있다면 그 클러스터 ID를 사용하여 스텝을 추가할 수 있음.
2. EMR 스텝(Step)
EMR에서 실행하는 하나의 작업 단위를 스텝이라 부름.
예를 들어, Spark-submit 작업 1회.
PySpark 코드 또한 spark-submit을 통해 JAR 또는 .py 스크립트를 실행하므로, EMR 스텝 형태로 정의할 수 있음.
3. Airflow와 EMR
보통 Airflow DAG 안에서 다음 단계로 진행됨.
3-1. EMR 클러스터 생성 (EmrCreateJobFlowOperator)
3-2. PySpark 작업 추가 (EmrAddStepsOperator)
3-3. 스텝 완료 모니터링 (EmrStepSensor)
3-4. (필요시) 클러스터 종료 (EmrTerminateJobFlowOperator)
4. 여러 PySpark 작업 실행
여러 PySpark 스크립트를 한꺼번에 또는 단계적으로 실행하려면, EMR 스텝을 여러 개 정의해서 EmrAddStepsOperator에 전달하면 됨.
순차 실행(스텝 A가 끝나면 B, B가 끝나면 C 등) 또는 병렬 실행(클러스터 노드 리소스를 활용 가능한 수준에서 병렬) 구조로 구성 가능함.
EMR에서 PySpark 스크립트를 여러 개 실행하는 시나리오
1. 에페멀(Ephemeral) 클러스터 vs. 상시(Persistent) 클러스터
1-1. 에페멀(Ephemeral) 클러스터
작업 시점마다 클러스터를 새로 생성 후, 모든 스텝(작업)이 끝나면 즉시 종료함.
장점: 사용 시간만큼만 비용이 발생하고, 클러스터 상태가 항상 ‘깨끗한’ 상태에서 시작.
단점: 클러스터 생성에 소요되는 시간이 있기 때문에, 자주/짧은 주기로 실행하면 오버헤드가 커질 수 있음.
1-2. 상시(Persistent) 클러스터
이미 띄워져 있는 클러스터에 계속 스텝을 추가해 사용하는 방식임.
장점: 스텝 추가 시간이 빠르고, 클러스터 생성 오버헤드가 없음.
단점: 클러스터가 항상 켜져 있어 비용이 추가되고, 여러 파이프라인이 동시에 스텝을 추가하면 리소스 충돌/병목에 대한 고려가 필요함.
1-3. 결정 팁
일 단위 등 배치성으로 일정 시간에만 작업한다면, 에페멀 방식을 추천.
자주(수시간 간격 등) 작업하거나, 클러스터가 상시 필요하다면 상시 클러스터 방식도 고려.
2. PySpark 스크립트를 EMR 스텝으로 정의하기
EMR 스텝은 보통 HadoopJarStep이라는 형식으로 정의됨.
Spark 작업을 실행할 때는 command-runner.jar와 spark-submit이 핵심임.
PySpark 스크립트를 S3에 두고, spark-submit 명령으로 실행하도록 만듬.
예를 들면 다음과 같음.
SPARK_STEPS = [
{
'Name': 'Pyspark Job 1',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn', # 보통 yarn
's3://my-bucket/pyspark-scripts/job1.py',
's3://my-bucket/input-data/job1/',
's3://my-bucket/output-data/job1/'
]
}
},
{
'Name': 'Pyspark Job 2',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job2.py',
'--input-path', 's3://my-bucket/input-data/job2/',
'--output-path', 's3://my-bucket/output-data/job2/',
'--some-arg', 'xyz'
]
}
},
# 필요에 따라 더 추가 가능
]
ActionOnFailure 파라미터
스텝이 실패했을 때 EMR이 취할 동작을 지정.
TERMINATE_CLUSTER, CANCEL_AND_WAIT, CONTINUE 등.
보통 여러 스텝을 차례로 실행하고 싶다면 CONTINUE 또는 CANCEL_AND_WAIT를 사용.
단일 스텝이 실패하면 전체 클러스터를 종료해야 한다면 TERMINATE_CLUSTER를 사용.
여러 스텝을 한 번의 EmrAddStepsOperator 호출에 넘겨도 되고, 여러 번 호출해도 되지만 보통은 한 번에 등록하는 편이 단순함.
Airflow DAG 예시
아래 코드는 하나의 DAG 안에서 클러스터 생성 → 여러 PySpark 스크립트를 스텝으로 추가 → 스텝 완료 감시 → 클러스터 종료 순으로 구성한 대표적인 예시임.
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
EmrCreateJobFlowOperator,
EmrAddStepsOperator,
EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.utils.dates import days_ago
# 1) 여러 PySpark 스크립트를 각기 다른 스텝으로 정의
SPARK_STEPS = [
{
'Name': 'Pyspark Job 1',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job1.py',
's3://my-bucket/input-data/job1/',
's3://my-bucket/output-data/job1/'
]
}
},
{
'Name': 'Pyspark Job 2',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
's3://my-bucket/pyspark-scripts/job2.py',
's3://my-bucket/input-data/job2/',
's3://my-bucket/output-data/job2/'
]
}
}
]
# 2) EMR 클러스터 생성 시에 필요한 설정 (예: 6.7.0 버전)
JOB_FLOW_OVERRIDES = {
'Name': 'MyEMRClusterForMultiplePySparkJobs',
'ReleaseLabel': 'emr-6.7.0',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Core - 2',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2KeyName': 'your-key',
'Ec2SubnetId': 'subnet-xxxxxxx'
},
'Applications': [
{'Name': 'Hadoop'},
{'Name': 'Spark'}
],
'VisibleToAllUsers': True,
# 기타 옵션들...
}
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG(
'emr_multiple_pyspark_example',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# (1) 클러스터 생성
create_emr_cluster = EmrCreateJobFlowOperator(
task_id='create_emr_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default'
)
# (2) 여러 PySpark 스텝 추가
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
steps=SPARK_STEPS,
aws_conn_id='aws_default'
)
# (3) 스텝 결과 감시
# - EmrAddStepsOperator는 스텝 ID들의 리스트를 XCom으로 반환
# - 예: [stepId1, stepId2, ...]
watch_first_step = EmrStepSensor(
task_id='watch_first_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[0] }}", # 0번 스텝
aws_conn_id='aws_default'
)
watch_second_step = EmrStepSensor(
task_id='watch_second_step',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
step_id="{{ ti.xcom_pull(task_ids='add_steps', key='return_value')[1] }}", # 1번 스텝
aws_conn_id='aws_default'
)
# (4) 모든 스텝 끝난 후 클러스터 종료
terminate_emr_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_emr_cluster',
job_flow_id="{{ ti.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
aws_conn_id='aws_default'
)
# DAG 내 Task 의존관계 설정
create_emr_cluster >> add_steps
add_steps >> watch_first_step >> watch_second_step
watch_second_step >> terminate_emr_cluster
이 DAG를 실행하면 다음과 같음.
1. 새 클러스터를 만들고(create_emr_cluster)
2. 스텝 2개(PySpark 스크립트 2개)를 한 번에 등록(add_steps)
3. 등록된 순서대로 Job1, Job2가 실행된다. watch_first_step, watch_second_step 센서가 완료 상태를 감시
4. 두 작업이 모두 성공하면 terminate_emr_cluster에서 클러스터가 종료
실행 및 모니터링 전략
1. Airflow UI
Airflow UI에서 DAG를 Trigger 후, 태스크 상태를 확인할 수 있음.
EmrStepSensor가 ‘running’ 상태로 대기하면 EMR 콘솔에서 스텝 로그를 확인하거나 Spark UI를 통해 진행 상황을 모니터링함.
2. EMR 콘솔/CloudWatch
EMR 콘솔의 클러스터 상세 정보에서 각 스텝의 실행 로그, Spark UI, 드라이버/실행자 로그 등을 추적할 수 있음.
CloudWatch 로그를 통해 드라이버/Executor 에러 메시지 등을 확인 가능함.
3. logs 파라미터
EMR 클러스터 생성 시 LogUri를 지정해서 S3에 로그가 저장되도록 하거나, CloudWatch Logs로 리다이렉션 설정을 할 수도 있음.
DAG에서 PySpark 에러를 확인해야 할 때는 S3/CloudWatch Logs를 함께 살펴봐야 함.
4. 스텝 모니터링
여러 스텝을 동시에 추가해도, EMR의 기본 동작은 순차로 실행(스텝 간에는 기본적으로 FIFO)하지만, 필요에 따라 병렬 설정도 가능함(단, 스텝 실행 정책에 따라 다름).
Airflow 관점에서는 각 스텝마다 EmrStepSensor로 상태를 감시하여, 실패 시 DAG에서 에러를 발생시키는 식으로 제어할 수 있음.
병렬 실행 고려
EMR의 기본 스텝 실행은 순차(FIFO)지만, Spark on YARN 기준으로는 병렬화가 가능하도록 설정할 수도 있음.
단, EMR에서 “스텝 병렬화”를 별도 설정해야 하며, 리소스가 충분해야 병렬로 돌아갈 수 있음.
병렬로 돌리면 EMR 클러스터의 코어/메모리 리소스가 여러 스텝 간에 분산할 수 있으므로, Executor 메모리, 코어 수, 동시성 등을 충분히 계산해둬야 함.
모범 사례
1. 에페멀 vs. 상시 클러스터
배치성 작업이라면 보통 에페멀 방식(작업 후 클러스터 종료)이 비용 면에서 유리함.
단, 짧은 간격으로 많이 실행한다면 상시 클러스터가 유리할 때도 있음.
2. KeepJobFlowAliveWhenNoSteps
에페멀 방식에서 스텝을 모두 실행해도 더 스텝을 추가해야 할 가능성이 있다면, 클러스터를 자동 종료하지 않고 대기 상태로 유지할 수 있음.
이 값을 False로 설정하면 마지막 스텝이 끝나면 EMR이 자동 종료됨.
3. ActionOnFailure
여러 PySpark 스텝 중 하나라도 실패하면 즉시 클러스터를 종료할지, 아니면 넘어갈지를 ActionOnFailure에 맞춰 설정함.
DAG 레벨에서 재시도(retries, retry_delay)를 걸어두는 것도 좋은 방법임(네트워크 문제 등으로 일시 실패 시 재시도).
4. 템플릿, XCom, 매크로
steps 내부 인자에 Airflow 매크로(예: {{ ds }} 등)나 다른 태스크의 XCom 값을 템플릿으로 주입해, PySpark 스크립트에서 날짜별 S3 경로를 다르게 설정 가능.
5. Deferrable Operator
Airflow 2.2+ 및 AWS Provider 버전이 적절히 지원한다면, EmrAddStepsOperator와 EmrStepSensor의 “Deferrable” 버전을 고려해볼 수 있음.
이를 통해 작업(센서)이 대기 중일 때 작업자 슬롯을 차지하지 않고, 이벤트 기반으로 재개되어 자원 효율을 높일 수 있음.
6. IAM 권한
Airflow가 사용하는 AWS Connection ID(aws_conn_id)의 IAM Role/Policy에 AddJobFlowSteps, DescribeStep, ListSteps, TerminateJobFlows 등 적절한 권한이 있어야 함.
7. 메트릭/로깅
PySpark 코드를 잘 로깅하거나, Spark UI/CloudWatch와 연동해서 성능 및 에러 정보를 추적함.
정리
Airflow에서 여러 개의 PySpark 스크립트를 EMR로 실행하려면,
클러스터 생성 (또는 기존 클러스터 재사용),
여러 PySpark 스크립트를 각각의 스텝으로 정의해 EmrAddStepsOperator로 전달,
각 스텝 완료 여부는 EmrStepSensor로 모니터링,
필요시 클러스터 종료
로 구성하는 것이 일반적임.
스텝을 여러 개 등록함으로써 PySpark 잡들을 순차 혹은 병렬로 자동 실행할 수 있으며, 실패 처리를 어떻게 할지(ActionOnFailure), 클러스터를 언제 종료할지, IAM 권한, 로그 모니터링 등을 철저히 관리해야 함.
이러한 방식을 통해, Airflow를 중심으로 EMR 기반 PySpark 작업을 일관된 워크플로우로 자동화하면서, 확장성과 유지보수성을 높일 수 있음.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] EMR에서 Step의 개념 (0) | 2025.04.04 |
---|---|
[Airflow] EmrAddStepsOperator 개념 (0) | 2025.04.04 |
[Airflow] 컴포넌트, MWAA (0) | 2024.05.15 |
[Airflow] 에어플로우 DAG란? (0) | 2023.04.02 |
[Airflow] 데이터 파이프라인이란? (0) | 2023.01.09 |