Adaptive Query Execution
Apache Spark 3.0부터 본격적으로 도입된 Adaptive Query Execution(AQE)는 런타임에 실행 계획을 동적으로 최적화함으로써, 기존의 정적(Static) 최적화가 갖는 한계를 보완하고 Spark SQL 성능을 크게 향상시키는 기술임.
일반적으로 Spark SQL은 쿼리를 제출할 때 Catalyst 옵티마이저를 통해 논리적/물리적 계획을 최적화하지만, 쿼리 실행 이전에 추정된 통계 정보(Statistics) 를 기반으로 계획이 결정됨.
그러나 빅데이터 환경에서는 데이터 분포가 매우 다양하여 사전에 추정된 통계가 실제와 많이 다를 수 있기 때문에, 기존 정적 최적화만으로는 완벽한 성능을 보장하기 어려움.
이 문제를 해결하기 위해 AQE는 실행 중에 각 Stage가 완료될 때마다 실제 런타임 통계(Runtime Statistics)를 수집하고, 이를 바탕으로 다음 Stage의 물리적 실행 계획을 동적으로 조정할 수 있게 해줌.
AQE의 주요 목표와 특징
1. 런타임 기반 최적화
기존 정적 방식에서는 Spark가 Catalyst 옵티마이저를 통해 쿼리 계획을 한 번에 확정함.
반면 AQE는 한 Stage가 끝날 때마다 실제 처리한 데이터 양, 파티션 통계, Skew 상태 등을 확인해, 다음 Stage의 계획을 재설계함.
2. 동적 파티션 병합
Shuffle 이후, 실제로 생성된 파티션 중 어떤 파티션은 매우 작고, 어떤 파티션은 매우 큰 데이터 스큐(skew)를 갖는 경우가 있음.
AQE는 “너무 작은 파티션”을 적절히 병합해 파티션 수를 줄이고, “너무 큰 파티션(스큐 파티션)”을 나누거나, Skew Join 전략을 적용해 처리 시간을 단축함.
3. 스큐(데이터 편중) 자동 처리
Spark SQL에서 Join 시, 특정 키에 데이터가 과도하게 몰려 있으면 해당 파티션을 처리하는 Task가 병목이 되어 성능이 크게 저하됨.
AQE는 런타임에 각 파티션의 사이즈를 확인해, 특정 파티션이 일정 임계값을 초과하면 스큐 파티션으로 분류하고, 이를 여러 파티션으로 재분할하거나 Skew Join 최적화 전략(broadcast hash join 또는 ‘shuffle and split’)을 적용해 부하를 분산함.
4. 동적 Join 전략 변경
쿼리 제출 시점에는 Broadcast Join으로 결정된 플랜이었어도, 막상 실제 런타임에 보니 해당 테이블이 예상보다 큰 경우가 있을 수 있음.
AQE는 “Broadcast가 불가능”한 크기의 테이블을 감지하면, Sort-Merge Join 등 다른 Join 알고리즘으로 자동 변환하여 오버헤드를 줄임.
또는 그 반대도 가능함.
AQE의 내부 동작 메커니즘
1. 일반적인 쿼리 실행 흐름
사용자가 Spark SQL 쿼리를 제출하면, Catalyst 옵티마이저가 논리적/물리적 계획을 생성함.
Shuffle 경계를 기준으로 Job이 여러 Stage로 분할되어, 순차적으로 실행하게 됨.
각 Stage가 완료될 때, 해당 Stage의 Shuffle 출력 파일 정보(메타데이터, 파티션 사이즈 등)가 Spark Driver로 전송됨.
2. 런타임 통계 수집
Stage가 실행을 마친 뒤, Spark는 Shuffle 파일에 대한 실제 사이즈, 파티션 분포, 레코드 수 등을 수집함.
이러한 “런타임 통계”가 AQE 최적화의 핵심 입력값이 됨.
3. 재최적화(Re-Optimization)
AQE가 활성화되어 있으면, Driver의 AQE 컴포넌트가 실행 중 ‘QueryStage’ 단위로 나뉘어 있는 물리적 실행 계획을 확인함.
이전 Stage의 결과물(실제 파티션 분포)을 보고, 아직 실행되지 않은 Stage(서브쿼리나 Join 등)의 계획을 재작성(Rewrite) 할 수 있는지 검사함.
스큐 파티션이 발견되면 스큐 처리 로직을 적용하거나, 파티션이 지나치게 많으면 파티션을 통합(Merge)하여 Task 수를 줄이거나, Broadcast Join이 가능한 사이즈로 확인되면 Join 전략을 변경하는 등의 최적화를 수행함.
4. 최종 실행
재최적화된 물리적 계획은 다음 Stage 실행에 바로 적용됨.
만약 여러 Stage가 순차적으로 실행된다면, 각 Stage 완료 시점마다 순차적으로 최적화 과정을 반복해나가면서 전체 쿼리를 최적화함.
AQE를 통한 대표적 최적화 기법
1. Shuffle 파티션 자동 병합(Coalesce)
spark.sql.adaptive.coalescePartitions.enabled 옵션을 통해 활성화됨.
Shuffle 이후 실제 파티션 사이즈가 너무 작은 경우 여러 파티션을 합쳐서, 너무 많은 Task가 생기는 비효율을 줄임.
반대로, 너무 큰 파티션이라면 스큐 조정 로직과 결합되어 파티션을 적절히 조정함.
2. Skew Join 자동 처리
spark.sql.adaptive.skewJoin.enabled 옵션을 통해 활성화됨.
일정 임계값(spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes) 이상으로 크기가 몰린 파티션은 여러 조각으로 나누어 병렬 처리할 수 있게 함.
Spark 3.x에서 Skew Join 자동 처리로 인해, 원래라면 한 Task가 처리해야 했던 거대한 파티션을 여러 Task가 나눠 처리해 성능 향상을 이끌어냄.
3. 동적 Join 전략 변경(Dynamic Switching)
spark.sql.adaptive.enabled 옵션이 켜져 있을 때, Broadcast Join 가능 여부가 런타임에 감지됨.
쿼리 계획 단계에서 테이블 A를 브로드캐스트하기로 결정했지만, 실제 사이즈가 Broadcast 가능한 임계값(spark.sql.autoBroadcastJoinThreshold) 이하일 때만 적용함.
반대로, 정적 계획에서 브로드캐스트를 사용하지 않기로 했어도, 런타임에 테이블이 충분히 작다는 것이 확인되면 Broadcast Join으로 바꿔 성능을 높일 수 있음.
4. Partial / Incremental Re-Optimization
Stage가 끝날 때마다 전체 쿼리를 처음부터 다시 최적화하는 것이 아니라, 필요한 부분(해당 Stage 이후의 계획)만 부분적으로 최적화함.
이로 인해 재최적화 비용(오버헤드)을 최소화하고, 성능 이점을 확보할 수 있음.
AQE 구성 및 옵션들
1. AQE 메인 옵션: spark.sql.adaptive.enabled (기본값: false → Spark 3.2+에서는 기본 true인 클러스터도 있음)
2. Coalesce Shuffle Partitions: spark.sql.adaptive.coalescePartitions.enabled
3. Skew Join 처리: spark.sql.adaptive.skewJoin.enabled
4. Broadcast Join Threshold: spark.sql.autoBroadcastJoinThreshold (Broadcast Join이 가능한 최대 테이블 크기; 기본 10MB)
이 외에도 AQE와 관련된 다양한 내부 파라미터가 있으며, Spark 버전 업그레이드에 따라 추가/변경되거나 기본값이 달라지기도 함.
AQE 사용 시 이점
1) 이점
1. 쿼리 성능 개선
스큐 문제를 자동으로 완화하고, 필요 이상의 파티션 수를 줄여서 Task Overhead도 감소시키므로, 전반적인 쿼리 시간이 크게 단축됨.
동적 Join 변경을 통해 최적의 알고리즘을 실행 시간에 결정할 수 있음.
2. 운영 편의성 향상
기존에는 정확한 통계를 얻기 어려운 환경에서, 스큐를 해소하기 위해 수작업 튜닝(스큐 키에 대한 해시 분산, Salting 등)을 해야 했음.
AQE를 활용하면 이러한 작업을 대부분 자동화할 수 있음.
3. 워크로드 적응형(Adaptive)
반복적으로 쿼리를 실행하는 배치 파이프라인이나 다양한 데이터 볼륨을 다루는 애드혹(Ad-hoc) 쿼리 환경에서, 데이터 특성 변동에 자동 대응이 가능함.
AQE 사용 시 주의사항
1. 오버헤드
Stage 완료 후 재최적화 과정을 거치는 데 따른 오버헤드가 존재함.
대규모 쿼리에서는 재최적화 역시 시간과 리소스를 사용함.
다만, 대부분 경우 오버헤드보다 성능 이점이 훨씬 크므로 효과적임.
2. BroadCast Join 한계
Broadcast Join 변경은 테이블 사이즈가 임계값 이하일 때만 가능하므로, 편중된 데이터 분포나 UDF 연산 등으로 사이즈가 증가하는 경우 예상과 다르게 동작할 수도 있음.
3. 하위 버전 및 호환성 이슈
Spark 3.x 이전 버전에서는 AQE가 충분히 안정화되지 않았거나 실험적 기능이었음.
AQE 적용 시, Spark 버전 호환성 및 특정 API/옵션이 바뀔 수 있으므로, 프로덕션에 도입할 때는 Spark 버전을 확인해야 함.
4. 실시간 스트리밍(Structured Streaming) 적용
Structured Streaming에서 AQE를 사용하는 것은 제한적이며, 버전마다 지원 여부가 다름.
점차 개선 중임.
정리
Adaptive Query Execution(AQE)는 Spark SQL 엔진이 런타임 정보를 활용해 실행 계획을 동적으로 변경함으로써, 데이터 스큐, 부적절한 파티션 개수, Join 전략 부적합 등의 문제를 자동으로 해결하고 성능을 극대화하는 기술임.
Spark 3.0 이후로 지속적으로 발전 중이며, 대규모 배치/쿼리 워크로드에서 실질적인 성능 이점을 제공하고 있음.
Key Idea는 다음과 같음.
쿼리 실행 도중(특히 Stage가 끝날 때), 실제 데이터 분포와 파티션 사이즈를 확인하고 필요하면 플랜을 수정함.
주요 최적화는 다음과 같음.
1. 파티션 병합(Coalesce)
2. 스큐 파티션 처리(Skew Join)
3. 동적 Join 전략 변경(Broadcast vs. Sort Merge 등)
AQE를 적절히 활용하면, Spark SQL에서 발생할 수 있는 대부분의 성능 병목을 자동화된 방식으로 완화할 수 있고, Spark 운영 및 튜닝에 필요한 수작업을 크게 줄일 수 있음.
다만, AQE를 사용할 때는 Spark 버전, 런타임 통계 품질, 적절한 파라미터 설정 등을 함께 고려해야 하며, 프로덕션 환경에서는 모니터링과 함께 점진적으로 적용하는 것이 좋음.
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] Apache Spark의 Job, Stage, Task 구조 (0) | 2025.01.17 |
---|---|
[Spark] 스파크의 동작 원리 (0) | 2025.01.17 |
[Spark] 스파크에서 Shuffle 개념 (0) | 2025.01.17 |
[Spark] 클러스터 매니저 종류 (0) | 2025.01.17 |
[Spark] Executor 개념 (0) | 2025.01.17 |