repartition()와 coalesce()의 역할과 차이점
repartition()과 coalesce()는 Spark에서 데이터 파티셔닝 개수(=파티션 수)를 재조정하는 함수임.
파티션은 Spark의 분산 처리 단위로, 적절한 파티션 수를 유지하면 클러스터 자원을 효율적으로 사용할 수 있음.
두 함수의 특징은 아래와 같음.
1. repartition(n)
데이터셋을 지정한 n개의 파티션으로 새롭게 분산.
파티션 수를 늘리거나 줄이는 모든 경우에 사용 가능.
셔플(Shuffle)이 항상 발생하기 때문에 비용이 높은 연산.
파티션을 분산시킬 때 무작위 랜덤 샘플링 방식으로 데이터를 재분배하므로 전체 데이터를 고르게 분산하려는 경우 유용.
예) df.repartition(10): DataFrame의 파티션을 10개로 만든다.
2. coalesce(n)
이미 존재하는 파티션을 지정한 n개 이하로만 줄여서 사용.
파티션 수를 줄이는 경우에만 사용 가능(늘릴 수 없음).
가능한 셔플을 피하고, 데이터가 인접한 노드/파티션에 남아있도록 하여 상대적으로 연산 비용이 낮음.
실제로 셔플이 전혀 발생하지 않으려면, 파티션 재배치가 필요 없는 물리적/논리적 조건이 맞아야 하지만, 일반적으로 기존 파티션 안에서의 재분배만 있으므로 repartition에 비해 셔플 비용이 훨씬 적음.
예) df.coalesce(5): DataFrame의 파티션 수를 5개로 줄인다.
어디서 어떻게 사용해야 하는가?
1. 입력 데이터 처리 직후 (Source Data Load 후)
Use Case: 데이터 소스(HDFS, RDB, S3 등)에서 불러온 직후, 파티션의 균일도를 맞추고자 할 때.
권장 방식: 만약 로드 후 데이터가 매우 불균일한 파티션을 가지고 있거나, 특정 한 파티션에만 데이터가 몰리는 경우가 있다면, repartition()을 사용해 파티션을 고르게 재분배해야 함.
예: 전체 데이터가 수백 GB이지만, 특정 파티션(키값에 따른 파티션)이 지나치게 크다면, 바로 df = df.repartition(최적 파티션 수)로 파티션을 균일화한다.
특히 데이터 양이 많고 클러스터가 크다면, 너무 적은 파티션 수로 시작하면 클러스터 자원이 비효율적으로 사용된다. 따라서 데이터 적재 직후 repartition()으로 충분히 파티션을 늘려 고르게 분산시키는 편이 좋음.
2. Join, Aggregation 등의 Shuffle 연산 직전/후
Spark 작업에서 가장 비용이 큰 연산은 셔플(Shuffle)이 필요한 연산(Join, GroupBy, Distinct 등)임.
셔플을 효율적으로 제어하면 전체 파이프라인 성능에 큰 차이를 만들 수 있음.
2-1. Join(조인) 직전
만약 join할 두 RDD/DataFrame의 파티션 개수가 크게 다르거나, 파티션의 키 분산이 불균등하여 성능 문제가 예상된다면, repartition()을 통해 사전에 키 중심의 파티션을 재정렬할 수 있음.
예를 들어, 이렇게 하면 실제 셔플 과정에서 파티셔닝이 한 번에 맞춰져서 재분배 비용이 절약될 수 있음.
df1 = df1.repartition("joinKey") # joinKey 기준으로 파티셔닝
df2 = df2.repartition("joinKey")
joined_df = df1.join(df2, "joinKey")
2-2. Aggregation(GroupBy) 직전
GroupBy가 이루어지는 키 분포가 편향(skew)되어 있다면, Spark 셔플 단계에서 병목이 발생함.
이 경우, "Salt 테크닉(가짜키 추가)"과 함께 적절히 repartition()을 사용해 키 분산을 유도할 수 있음.
단순히 파티션 수가 모자라서 셔플 스필이 많이 일어난다면, 파티션을 늘리기 위해 repartition()을 고려해볼 수 있음.
2-3. Aggregation 직후
GroupBy 결과물이 매우 작아진다면(최적화된 소수의 레코드만 남는다면), 이후 연산에서 필요 이상으로 많은 파티션이 유지될 수 있음.
이럴 때는 결과 DataFrame에 대해 coalesce()로 파티션을 줄여서 향후 연산 비용을 낮춤.
aggregated_df = df.groupBy("key").agg(...) # 결과 셋이 소수의 레코드라면 파티션을 줄여서 소량의 파티션만 유지
aggregated_df = aggregated_df.coalesce(1)
3. Wide Transformation 이후 (주로 Shuffle이 발생한 뒤)
map(), filter() 같은 Narrow Transformation 후에는 비교적 비용이 적으므로 굳이 파티션 재조정을 할 필요가 크지 않음.
하지만 repartitionByRange(), repartitionByCol(), sortBy(), join(), groupBy() 등 Wide Transformation이 일어난 뒤라면, 파티션을 다시 정리하는 것이 유용할 수 있음.
정말로 파티션 조정이 필요한지(너무 많아도, 너무 적어도 문제가 되므로)를 판단해야 함.
Wide Transformation이 끝난 결과 데이터가 예상보다 훨씬 작아졌으면, 파티션을 줄이기 위해 coalesce()를 적용해도 좋음.
4. 파일로 저장하기 직전 (Sink로 데이터 내보내기 전)
파일로 저장(.save() 또는 .write())하는 단계에서의 파티션 수는 실제 출력 파일 개수와 직결됨.
(예: parquet 포맷이면 파일이 여러 개 생성)
HDFS나 S3에 너무 작은 파일이 너무 많이 생기면 소위 “small file problem”이 발생해 파일 시스템의 메타데이터 관리 측면에서 비효율이 커짐.
반대로 너무 큰 단일 파티션만 존재해도 일부 노드에 과부하가 걸릴 수 있음.
그러므로 "적절한 파일 크기(보통 수백 MB~1GB 사이)"가 만들어지도록 coalesce()(또는 repartition())를 사용해 파티션 수를 조정한 뒤에 파일 출력을 하는 경우가 많음.
final_df = transformed_df.coalesce(10) # 적정 파티션 수
final_df.write.parquet("path/to/output")
성능 최적화를 위한 모범 사례 (Best Practices)
1. 파티션 늘리기는 repartition(), 줄이기는 coalesce()
파티션 수를 늘려야 한다면 repartition()을 사용함. (셔플 비용이 높지만 파티션이 고르게 분산됨)
파티션 수를 줄여야 한다면 coalesce()를 먼저 고려해야 함.(셔플 비용 최소화)
2. 한 번의 셔플로 최대한 많은 목적을 달성하기
Spark 애플리케이션 내에서 셔플은 가장 비용이 큰 연산 중 하나이므로, 단순히 파티션만 맞추기 위해서 별도로 여러 번 repartition()을 하지 않도록 주의함.
예: 어차피 join을 위해 키 기반 셔플이 일어나야 한다면, 그 과정에서 파티션 수와 키 분포를 미리 맞추는 식으로 작업 순서를 조정할 수 있음.
3. 적절한 파티션 수 계산
일반적으로는 작업을 실행해보면서 Stage별로 Task 수, Shuffle read/write 크기, 메모리 사용량, 실행 시간 등을 종합적으로 모니터링해야 함.
파티션 크기가 너무 커서 한 태스크가 메모리를 초과하면 OOM(Out Of Memory) 문제가 발생하고, 너무 작으면 Task가 지나치게 많아서 스케줄링/오버헤드가 커짐.
대략 “1개 파티션 ~ 수백 MB” 정도가 자주 사용되는 경험적 수치이지만, 이는 클러스터 스펙, 데이터 형태 등에 따라 달라짐.
4. Shuffle 파일이 디스크에 저장되는 상황 고려
큰 셔플이 발생하면 Spark는 데이터를 디스크(Shuffle 파일)에 씀.
repartition()을 자주 호출하면 Shuffle 파일을 여러 번 쓰고 읽기 때문에 I/O 부하가 커짐.
따라서 반복적인 repartition() 호출은 피해야 하며, 필요 최소한으로 유지함.
5. 데이터 스큐(편향) 문제 미리 파악
특정 파티션에 데이터가 몰리는 스큐(skew)가 발생할 것 같다면, 미리 repartition()을 사용하거나 Salt key를 사용하는 등의 전략으로 대응함.
coalesce()만으로는 데이터 스큐를 해소하기가 어려움.
스큐 자체를 해결하려면 repartition()으로 데이터를 무작위로 골고루 섞어주는 과정이 필요함.
정리
repartition(n): 파티션을 확실하게 n개로 재분배(늘리거나 줄임). 데이터 전수 셔플 발생.
coalesce(n): 파티션을 주어진 n개 이하로만 축소(줄이기만 가능). 최소한의 셔플 또는 무셔플(물리적 조건에 따라).
사용 위치/타이밍 요약은 다음과 같음.
1. 입력 데이터 적재 직후
데이터 파티션이 지나치게 편향되어 있거나 파티션 수가 너무 적다면 repartition()으로 균등화/확대.
2. Join, Aggregation 등의 Shuffle 직전
키 분산이 비효율적이면 repartition()을 통한 최적 파티셔닝으로 병목 예방.
3. 주요 Wide Transformation(Join, GroupBy) 이후
결과 데이터 크기가 줄어들었다면 coalesce()로 파티션 개수를 합리적으로 축소.
4. 최종 파일 쓰기 직전
“small file problem” 방지를 위해 coalesce()나 repartition()으로 적절한 파일 개수 결정.
최종적으로, 데이터의 볼륨, 노드 스펙, 네트워크 대역폭, 조인의 종류(Shuffle Join vs. Broadcast Join), 스큐 상황 등을 종합적으로 고려하여 최소한의 Shuffle로 최대한의 최적 파티셔닝을 달성하는 것이 핵심임.
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] Apache Spark 개념 (0) | 2025.03.30 |
---|---|
[Spark] Disk Spill (0) | 2025.03.28 |
[Spark] SparkSQL의 Window함수 종류 (1) | 2025.03.22 |
[Spark] Apache Spark 구조 (2) | 2025.03.15 |
[Spark] Apache Iceberg (2) | 2025.03.14 |
repartition()와 coalesce()의 역할과 차이점
repartition()과 coalesce()는 Spark에서 데이터 파티셔닝 개수(=파티션 수)를 재조정하는 함수임.
파티션은 Spark의 분산 처리 단위로, 적절한 파티션 수를 유지하면 클러스터 자원을 효율적으로 사용할 수 있음.
두 함수의 특징은 아래와 같음.
1. repartition(n)
데이터셋을 지정한 n개의 파티션으로 새롭게 분산.
파티션 수를 늘리거나 줄이는 모든 경우에 사용 가능.
셔플(Shuffle)이 항상 발생하기 때문에 비용이 높은 연산.
파티션을 분산시킬 때 무작위 랜덤 샘플링 방식으로 데이터를 재분배하므로 전체 데이터를 고르게 분산하려는 경우 유용.
예) df.repartition(10): DataFrame의 파티션을 10개로 만든다.
2. coalesce(n)
이미 존재하는 파티션을 지정한 n개 이하로만 줄여서 사용.
파티션 수를 줄이는 경우에만 사용 가능(늘릴 수 없음).
가능한 셔플을 피하고, 데이터가 인접한 노드/파티션에 남아있도록 하여 상대적으로 연산 비용이 낮음.
실제로 셔플이 전혀 발생하지 않으려면, 파티션 재배치가 필요 없는 물리적/논리적 조건이 맞아야 하지만, 일반적으로 기존 파티션 안에서의 재분배만 있으므로 repartition에 비해 셔플 비용이 훨씬 적음.
예) df.coalesce(5): DataFrame의 파티션 수를 5개로 줄인다.
어디서 어떻게 사용해야 하는가?
1. 입력 데이터 처리 직후 (Source Data Load 후)
Use Case: 데이터 소스(HDFS, RDB, S3 등)에서 불러온 직후, 파티션의 균일도를 맞추고자 할 때.
권장 방식: 만약 로드 후 데이터가 매우 불균일한 파티션을 가지고 있거나, 특정 한 파티션에만 데이터가 몰리는 경우가 있다면, repartition()을 사용해 파티션을 고르게 재분배해야 함.
예: 전체 데이터가 수백 GB이지만, 특정 파티션(키값에 따른 파티션)이 지나치게 크다면, 바로 df = df.repartition(최적 파티션 수)로 파티션을 균일화한다.
특히 데이터 양이 많고 클러스터가 크다면, 너무 적은 파티션 수로 시작하면 클러스터 자원이 비효율적으로 사용된다. 따라서 데이터 적재 직후 repartition()으로 충분히 파티션을 늘려 고르게 분산시키는 편이 좋음.
2. Join, Aggregation 등의 Shuffle 연산 직전/후
Spark 작업에서 가장 비용이 큰 연산은 셔플(Shuffle)이 필요한 연산(Join, GroupBy, Distinct 등)임.
셔플을 효율적으로 제어하면 전체 파이프라인 성능에 큰 차이를 만들 수 있음.
2-1. Join(조인) 직전
만약 join할 두 RDD/DataFrame의 파티션 개수가 크게 다르거나, 파티션의 키 분산이 불균등하여 성능 문제가 예상된다면, repartition()을 통해 사전에 키 중심의 파티션을 재정렬할 수 있음.
예를 들어, 이렇게 하면 실제 셔플 과정에서 파티셔닝이 한 번에 맞춰져서 재분배 비용이 절약될 수 있음.
df1 = df1.repartition("joinKey") # joinKey 기준으로 파티셔닝
df2 = df2.repartition("joinKey")
joined_df = df1.join(df2, "joinKey")
2-2. Aggregation(GroupBy) 직전
GroupBy가 이루어지는 키 분포가 편향(skew)되어 있다면, Spark 셔플 단계에서 병목이 발생함.
이 경우, "Salt 테크닉(가짜키 추가)"과 함께 적절히 repartition()을 사용해 키 분산을 유도할 수 있음.
단순히 파티션 수가 모자라서 셔플 스필이 많이 일어난다면, 파티션을 늘리기 위해 repartition()을 고려해볼 수 있음.
2-3. Aggregation 직후
GroupBy 결과물이 매우 작아진다면(최적화된 소수의 레코드만 남는다면), 이후 연산에서 필요 이상으로 많은 파티션이 유지될 수 있음.
이럴 때는 결과 DataFrame에 대해 coalesce()로 파티션을 줄여서 향후 연산 비용을 낮춤.
aggregated_df = df.groupBy("key").agg(...) # 결과 셋이 소수의 레코드라면 파티션을 줄여서 소량의 파티션만 유지
aggregated_df = aggregated_df.coalesce(1)
3. Wide Transformation 이후 (주로 Shuffle이 발생한 뒤)
map(), filter() 같은 Narrow Transformation 후에는 비교적 비용이 적으므로 굳이 파티션 재조정을 할 필요가 크지 않음.
하지만 repartitionByRange(), repartitionByCol(), sortBy(), join(), groupBy() 등 Wide Transformation이 일어난 뒤라면, 파티션을 다시 정리하는 것이 유용할 수 있음.
정말로 파티션 조정이 필요한지(너무 많아도, 너무 적어도 문제가 되므로)를 판단해야 함.
Wide Transformation이 끝난 결과 데이터가 예상보다 훨씬 작아졌으면, 파티션을 줄이기 위해 coalesce()를 적용해도 좋음.
4. 파일로 저장하기 직전 (Sink로 데이터 내보내기 전)
파일로 저장(.save() 또는 .write())하는 단계에서의 파티션 수는 실제 출력 파일 개수와 직결됨.
(예: parquet 포맷이면 파일이 여러 개 생성)
HDFS나 S3에 너무 작은 파일이 너무 많이 생기면 소위 “small file problem”이 발생해 파일 시스템의 메타데이터 관리 측면에서 비효율이 커짐.
반대로 너무 큰 단일 파티션만 존재해도 일부 노드에 과부하가 걸릴 수 있음.
그러므로 "적절한 파일 크기(보통 수백 MB~1GB 사이)"가 만들어지도록 coalesce()(또는 repartition())를 사용해 파티션 수를 조정한 뒤에 파일 출력을 하는 경우가 많음.
final_df = transformed_df.coalesce(10) # 적정 파티션 수
final_df.write.parquet("path/to/output")
성능 최적화를 위한 모범 사례 (Best Practices)
1. 파티션 늘리기는 repartition(), 줄이기는 coalesce()
파티션 수를 늘려야 한다면 repartition()을 사용함. (셔플 비용이 높지만 파티션이 고르게 분산됨)
파티션 수를 줄여야 한다면 coalesce()를 먼저 고려해야 함.(셔플 비용 최소화)
2. 한 번의 셔플로 최대한 많은 목적을 달성하기
Spark 애플리케이션 내에서 셔플은 가장 비용이 큰 연산 중 하나이므로, 단순히 파티션만 맞추기 위해서 별도로 여러 번 repartition()을 하지 않도록 주의함.
예: 어차피 join을 위해 키 기반 셔플이 일어나야 한다면, 그 과정에서 파티션 수와 키 분포를 미리 맞추는 식으로 작업 순서를 조정할 수 있음.
3. 적절한 파티션 수 계산
일반적으로는 작업을 실행해보면서 Stage별로 Task 수, Shuffle read/write 크기, 메모리 사용량, 실행 시간 등을 종합적으로 모니터링해야 함.
파티션 크기가 너무 커서 한 태스크가 메모리를 초과하면 OOM(Out Of Memory) 문제가 발생하고, 너무 작으면 Task가 지나치게 많아서 스케줄링/오버헤드가 커짐.
대략 “1개 파티션 ~ 수백 MB” 정도가 자주 사용되는 경험적 수치이지만, 이는 클러스터 스펙, 데이터 형태 등에 따라 달라짐.
4. Shuffle 파일이 디스크에 저장되는 상황 고려
큰 셔플이 발생하면 Spark는 데이터를 디스크(Shuffle 파일)에 씀.
repartition()을 자주 호출하면 Shuffle 파일을 여러 번 쓰고 읽기 때문에 I/O 부하가 커짐.
따라서 반복적인 repartition() 호출은 피해야 하며, 필요 최소한으로 유지함.
5. 데이터 스큐(편향) 문제 미리 파악
특정 파티션에 데이터가 몰리는 스큐(skew)가 발생할 것 같다면, 미리 repartition()을 사용하거나 Salt key를 사용하는 등의 전략으로 대응함.
coalesce()만으로는 데이터 스큐를 해소하기가 어려움.
스큐 자체를 해결하려면 repartition()으로 데이터를 무작위로 골고루 섞어주는 과정이 필요함.
정리
repartition(n): 파티션을 확실하게 n개로 재분배(늘리거나 줄임). 데이터 전수 셔플 발생.
coalesce(n): 파티션을 주어진 n개 이하로만 축소(줄이기만 가능). 최소한의 셔플 또는 무셔플(물리적 조건에 따라).
사용 위치/타이밍 요약은 다음과 같음.
1. 입력 데이터 적재 직후
데이터 파티션이 지나치게 편향되어 있거나 파티션 수가 너무 적다면 repartition()으로 균등화/확대.
2. Join, Aggregation 등의 Shuffle 직전
키 분산이 비효율적이면 repartition()을 통한 최적 파티셔닝으로 병목 예방.
3. 주요 Wide Transformation(Join, GroupBy) 이후
결과 데이터 크기가 줄어들었다면 coalesce()로 파티션 개수를 합리적으로 축소.
4. 최종 파일 쓰기 직전
“small file problem” 방지를 위해 coalesce()나 repartition()으로 적절한 파일 개수 결정.
최종적으로, 데이터의 볼륨, 노드 스펙, 네트워크 대역폭, 조인의 종류(Shuffle Join vs. Broadcast Join), 스큐 상황 등을 종합적으로 고려하여 최소한의 Shuffle로 최대한의 최적 파티셔닝을 달성하는 것이 핵심임.
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] Apache Spark 개념 (0) | 2025.03.30 |
---|---|
[Spark] Disk Spill (0) | 2025.03.28 |
[Spark] SparkSQL의 Window함수 종류 (1) | 2025.03.22 |
[Spark] Apache Spark 구조 (2) | 2025.03.15 |
[Spark] Apache Iceberg (2) | 2025.03.14 |