코드 결과
def save_to_parquet(self, *, df: pd.DataFrame, s3_output_path: str) -> None:
df.to_parquet(
path=s3_output_path,
engine="pyarrow",
compression="snappy",
index=False,
)
설명
위 코드는 Pandas DataFrame을 Parquet 포맷으로 저장하되, S3 경로를 직접 지정해 업로드하는 기능을 수행함.
함수 시그니처
def save_to_parquet(self, *, df: pd.DataFrame, s3_output_path: str) -> None:
1. self: 보통 클래스 내부 메서드(인스턴스 메서드)를 정의할 때 사용함.
2. * (별표)는 키워드 전용 인자를 지정한다는 의미임.
즉, 이 함수를 호출할 때 df와 s3_output_path는 반드시 키워드 인자로 전달해야 함.
예를 들면, obj.save_to_parquet(df=my_df, s3_output_path="s3://...")
3. df: pd.DataFrame: 이 함수에 전달될 매개변수 df가 판다스의 DataFrame 타입임을 의미함
4. s3_output_path: str: S3 위치를 나타내는 문자열 경로.
예를 들면, "s3://my-bucket/my-folder/filename.parquet"
5. -> None: 함수가 반환값을 명시적으로 갖지 않는다는 의미의 타입 힌트(Python 3.6+)
df.to_parquet(...) 호출
df.to_parquet(
path=s3_output_path,
engine="pyarrow",
compression="snappy",
index=False,
)
1. df.to_parquet():
판다스(Pandas)에서 제공하는 메서드로, DataFrame을 Parquet 파일 형식으로 저장(직렬화)하는 함수임.
이때, path 인자에 S3 경로(s3://bucket-name/...)를 전달하면, 내부적으로 pyarrow 엔진을 통해 S3에 직접 쓰기가 가능함.
AWS 자격 증명은 일반적으로 boto3 설정이나 환경 변수 등을 통해 처리함.
2. engine="pyarrow":
Parquet 데이터를 읽고 쓰는 백엔드 엔진을 지정함.
Pandas에서는 주로 "pyarrow"(Apache Arrow)나 "fastparquet" 등을 사용할 수 있지만, "pyarrow"가 기능이나 호환성 면에서 더 보편적으로 권장됨.
3. compression="snappy":
Parquet 파일 내의 데이터 압축 방식(코덱)을 지정함.
"snappy"는 속도가 빠르고 효율도 비교적 높은 압축으로, 빅데이터 분석 환경에서 사실상 표준처럼 자주 사용됨.
그 외에도 "gzip", "brotli", "zstd" 등을 지정할 수 있는데, "snappy"가 속도와 압축률 간 밸런스가 좋은 편임.
4. index=False:
DataFrame의 인덱스(index) 컬럼을 별도로 저장하지 않도록 지정하는 옵션임.
주로 머신러닝이나 데이터 분석 시, 인덱스 컬럼이 필요 없으면 공간 절약을 위해 False로 두는 경우가 많음.
인덱스를 유지해야 한다면 True(또는 기본값)로 설정할 수도 있음.
S3 경로에 쓰기
Pandas ≥ 1.0.0 및 PyArrow ≥ 0.15.1 이상에서는 df.to_parquet()가 S3 URL(s3://...)을 직접 인식하여 파일을 쓸 수 있음.
내부적으로 PyArrow가 s3fs 또는 boto3를 사용해 AWS 인증을 수행하고, 해당 버킷 경로에 Parquet 파일을 업로드함.
실제로는 임시 파일 생성 없이 네트워크 스트림을 통해 바로 업로드가 이뤄짐.
반환 타입
-> None
이 함수는 아무 값도 반환하지 않는 단순 유틸 메서드임.
로직 완료 후, Parquet 파일이 S3에 정상적으로 생성되었다면 종료됨.
일반적으로는 I/O 작업이다 보니, 예외(IOError, OSError, boto3 관련 예외 등)가 발생할 수 있음.
실제 운영 환경에서는 적절한 예외 처리(try-except)나 로그(log) 처리가 필요할 수 있음.
사용 시나리오
예를 들어, DataFrame my_df가 있고, 이를 "s3://my-bucket/data/sales_data.parquet" 경로에 저장하고 싶다면 다음과 같이 작업할 수 있음.
my_instance.save_to_parquet(
df=my_df,
s3_output_path="s3://my-bucket/data/sales_data.parquet"
)
1. my_df.to_parquet(..., engine="pyarrow", compression="snappy")가 실행됨
2. Parquet 포맷으로 직렬화된 데이터가 S3의 "my-bucket/data/sales_data.parquet" 파일로 업로드됨
3. 함수는 별도의 리턴 없이 종료
확장 포인트
1. 분할 저장(Partitioning)
더 큰 데이터셋을 사용할 경우, 파티션 컬럼을 지정하거나 여러 파일로 나누어 저장할 수 있음.
예를 들면, df.to_parquet(path, partition_cols=["year", "month"], ...)
2. 메타데이터(Parquet Schema) 관리
데이터 스키마(컬럼명, 타입)를 파일에 저장하므로, Athena/Presto/Trino에서 외부 테이블로 읽거나, Spark DataFrame으로 바로 로딩할 수 있음.
3. 엔진 및 압축 알고리즘 선택
필요에 따라 "fastparquet" 엔진, "gzip", "zstd" 등 다른 압축 방식을 써볼 수 있음.
압축 수준과 알고리즘의 트레이드오프(압축률 vs. CPU 부하 vs. 속도)를 고려해야 함.
4. 권한 & 인증
S3에 쓰려면 IAM 권한이 필요함(s3:PutObject, s3:ListBucket 등).
로컬 개발 환경과 배포 환경에서 boto3 인증 자격(Creds, Role)이 어떻게 설정되는지 확인해야 함.
5. 에러 처리
네트워크 불안정이나 S3 권한 문제, 파티션 키 중복 등으로 예외가 발생할 수 있으므로, try-except로 상황별 예외 메시지를 로깅하고 재시도 로직 등을 둘 수 있음.
정리
결국 이 함수는 Pandas DataFrame을 Parquet 형식으로 S3에 손쉽게 업로드하는 헬퍼 메서드임.
PyArrow 엔진으로 Snappy 압축을 적용하여 Parquet 파일을 생성하고, 인덱스를 저장하지 않는 형태로 S3에 직렬화함.
사용 측면에서, 데이터 분석 결과나 전처리된 데이터를 서버리스/클라우드 분석(예: AWS Athena, EMR, Databricks 등) 환경에서 쉽게 읽을 수 있도록 Columnar 포맷으로 저장하기에 최적임.
코드 양은 짧지만, Parquet 스펙과 AWS S3 접근 로직이 내부적으로 처리되어 빅데이터 파이프라인 구현의 핵심 요소로 활용할 수 있음.
개선된 코드
1. Docstring으로 함수 의도, 파라미터, 반환 타입을 명확히 기술
2. Optional 파라미터(예: partition_cols, compression, index) 추가로 확장성 증대
3. 에러 처리(예: try-except) 및 로깅 추가
import logging
from typing import List, Optional
import pandas as pd
from pyarrow import ArrowInvalid
from botocore.exceptions import BotoCoreError, NoCredentialsError
logger = logging.getLogger(__name__)
def save_to_parquet(
self,
*,
df: pd.DataFrame,
s3_output_path: str,
partition_cols: Optional[List[str]] = None,
compression: str = "snappy",
index: bool = False
) -> None:
"""
Save a Pandas DataFrame to a Parquet file (or files) in an S3 path.
:param df: A Pandas DataFrame to be saved.
:param s3_output_path: S3 path where the Parquet file(s) will be written.
Example: "s3://my-bucket/data/my_table.parquet"
:param partition_cols: An optional list of column names by which the data
will be partitioned. This can create multiple files
organized by partition in S3.
:param compression: Compression codec to use. Default is "snappy".
:param index: Whether to include the DataFrame index as a column.
Default is False (do not include index).
:return: None. Writes Parquet file(s) to the specified S3 location.
:raises ValueError: If the DataFrame is empty or invalid.
:raises ArrowInvalid: If there is an issue with PyArrow conversion.
:raises BotoCoreError, NoCredentialsError: If there is an AWS credentials issue
or other Boto3-related error.
"""
if df.empty:
message = "DataFrame is empty. Nothing to save."
logger.warning(message)
raise ValueError(message)
logger.info(f"Saving DataFrame to Parquet. Rows: {len(df)}, Columns: {len(df.columns)}")
try:
df.to_parquet(
path=s3_output_path,
engine="pyarrow",
compression=compression,
index=index,
partition_cols=partition_cols if partition_cols else None
)
logger.info(f"DataFrame saved to {s3_output_path} successfully.")
except (ArrowInvalid, ValueError) as e:
logger.error(f"Failed to convert DataFrame to Parquet: {e}")
raise
except (BotoCoreError, NoCredentialsError) as e:
logger.error(f"Failed to upload Parquet file to S3: {e}")
raise
except Exception as e:
logger.exception(f"Unexpected error while saving Parquet file: {e}")
raise
주요 개선 사항
1. Docstring & 타입 힌트
함수의 목적, 파라미터, 반환 형태를 명시함으로써 협업·유지보수에 용이함.
2. 옵션 파라미터
partition_cols: 파티션 컬럼을 지정할 수 있어, 대규모 데이터를 효율적으로 쪼개서 저장 가능함.
compression: 기본 "snappy" 외에 "gzip", "zstd" 등 변경 가능함.
index: 필요하면 DataFrame의 인덱스를 저장할 수 있음.
3. 에러 처리
DataFrame이 비어있는 경우 ValueError를 발생시키고, 로깅하는 로직을 추가함.
ArrowInvalid, BotoCoreError, NoCredentialsError 등 PyArrow·AWS 관련 예외를 명시적으로 처리해 디버깅 편의성을 높임.
4. 로깅
작업 시작 전/후로 logger.info를 사용하여, 몇 행·열이 저장되는지, 성공 여부를 남김.
예외 발생 시 logger.error나 logger.exception을 통해 원인을 빠르게 파악할 수 있음.
5. 가독성 & 확장성
짧은 코드이지만, 메서드가 수행할 작업이 명확하게 구분되며, 추가 파라미터나 로직을 붙이기 쉬움.
이처럼 함수 인터페이스를 좀 더 유연하게 하고, 에러 처리와 로깅을 강화하면 운영 환경에서 문제 발생 시 추적과 진단이 훨씬 수월해짐.
'Programming Language > Python' 카테고리의 다른 글
[Python] Miniconda 주요 특징 (0) | 2025.01.11 |
---|---|
[Python] Anaconda 핵심 요소 (0) | 2025.01.11 |
[Python] 파이썬에서 아테나에 쿼리를 실행하는 방법 (0) | 2025.01.04 |
[Python] 특정 기간동안 일 단위 날짜 함수 (0) | 2025.01.04 |
[Python] Avro 파일 개념 (0) | 2025.01.04 |