코드 결과
def fetchall_athena(self, *, query_string: str) -> list:
client = boto3.client("athena")
query_id = client.start_query_execution(
QueryString=query_string,
ResultConfiguration={
"OutputLocation": "s3://<bucket_name>/<path>"
},
)["QueryExecutionId"]
query_status = None
while query_status in ["QUEUED", "RUNNING", None]:
query_status = client.get_query_execution(QueryExecutionId=query_id)[
"QueryExecution"
]["Status"]["State"]
if query_status in ["FAILED", "CANCELLED"]:
raise Exception(
f'Athena query with the string "{query_string}" failed or was cancelled'
)
time.sleep(10)
results_paginator = client.get_paginator("get_query_results")
results_iter = results_paginator.paginate(
QueryExecutionId=query_id, PaginationConfig={"PageSize": 1000}
)
results = []
data_list = []
for results_page in results_iter:
for row in results_page["ResultSet"]["Rows"]:
data_list.append(row["Data"])
for datum in data_list[0:]:
results.append(
[x["VarCharValue"] if "VarCharValue" in x else "" for x in datum]
)
return [tuple(x) for x in results]
코드 설명
위 코드는 Python에서 AWS Athena에 쿼리를 실행하고, 완료될 때까지 대기한 뒤 결과를 가져오는 과정을 구현한 예시임.
함수 시그니처
def fetchall_athena(self, *, query_string: str) -> list:
클래스 메서드 형태(첫 번째 파라미터가 self)로 정의됨.
query_string 인자는 키워드 전용 인자(*)로 받음.
-> list:는 타입 힌트로, 이 함수가 리스트를 반환한다는 의미를 나타냄.
Athena 클라이언트 생성
client = boto3.client("athena")
boto3 라이브러리를 사용해 athena 서비스에 대한 클라이언트를 생성함.
이 클라이언트를 통해 Athena에 쿼리를 제출하고, 결과를 조회할 수 있음.
쿼리 실행 및 QueryExecutionId 획득
query_id = client.start_query_execution(
QueryString=query_string,
ResultConfiguration={
"OutputLocation": "s3://<bucket_name>/<path>"
},
)["QueryExecutionId"]
start_query_execution 메서드로 비동기(Asynchronous) 방식 쿼리를 실행함.
QueryString: 실제로 실행할 SQL 구문을 문자열 형태로 전달함.
ResultConfiguration["OutputLocation"]: 쿼리 결과 파일(CSV 등)을 저장할 S3 경로를 지정함.
Athena가 내부적으로 쿼리를 실행한 후, 해당 결과를 지정된 S3 버킷/폴더에 저장함.
start_query_execution의 응답 중 "QueryExecutionId"를 추출해 query_id 변수에 저장함.
이 query_id는 쿼리의 실행 상태 및 결과를 조회할 때 필요함.
쿼리 상태 폴링 : 반복 확인
query_status = None
while query_status in ["QUEUED", "RUNNING", None]:
query_status = client.get_query_execution(QueryExecutionId=query_id)[
"QueryExecution"
]["Status"]["State"]
if query_status in ["FAILED", "CANCELLED"]:
raise Exception(
f'Athena query with the string "{query_string}" failed or was cancelled'
)
time.sleep(10)
query_status 변수를 초기화(None)한 후, 쿼리 상태가 "QUEUED"(대기 중), "RUNNING"(실행 중), 또는 None일 때 계속 반복함.
client.get_query_execution(QueryExecutionId=query_id)를 통해 현재 쿼리의 실행 상태를 조회함.
반환된 딕셔너리에서 ["QueryExecution"]["Status"]["State"] 경로로 접근해 "QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED" 등의 상태를 얻음.
만약 상태가 "FAILED"나 "CANCELLED"라면, Exception을 발생시켜 함수를 종료함.
쿼리가 여전히 "QUEUED" 또는 "RUNNING" 상태이면, 10초 대기(time.sleep(10)) 후 다시 상태를 확인함.
상태가 "SUCCEEDED"로 바뀌면 while 루프에서 빠져나옴.
주의 사항은 다음과 같음.
10초 폴링은 간단한 방법이지만, 긴 쿼리의 경우 대기시간이 길어지고, 짧은 쿼리의 경우에는 조금 비효율적일 수 있음.
더 짧은 간격으로 체크하거나, 다른 방식(예: CloudWatch Event, callback 등)을 고려해볼 수도 있음.
쿼리 결과 페이징 처리
results_paginator = client.get_paginator("get_query_results")
results_iter = results_paginator.paginate(
QueryExecutionId=query_id, PaginationConfig={"PageSize": 1000}
)
Athena 결과는 최대 1,000개 행(Row)씩 페이지 단위로 조회하는 것이 일반적임.
get_paginator("get_query_results")는 결과를 여러 페이지로 나누어 받아올 수 있는 paginator 객체를 생성함.
results_paginator.paginate(...)를 호출해, QueryExecutionId를 지정하고, 페이지 크기("PageSize": 1000")를 설정함.
results_iter는 각 페이지를 순회할 수 있는 이터레이터(Iterator)가 됨.
결과 데이터 수집
results = []
data_list = []
for results_page in results_iter:
for row in results_page["ResultSet"]["Rows"]:
data_list.append(row["Data"])
results_iter를 순회하면서 각 페이지(results_page)를 얻음.
results_page["ResultSet"]["Rows"] 항목을 보면, 실제 각 행(Row)에 대한 정보가 리스트 형태로 존재함.
모든 row에 대해 row["Data"]를 뽑아서 data_list에 차곡차곡 저장함.
각 Data 항목은 열(Column)별로 분할되어 있고, 실제 텍스트 값은 "VarCharValue" 키를 통해 접근할 수 있음.
데이터 후처리 및 반환
for datum in data_list[0:]:
results.append(
[x["VarCharValue"] if "VarCharValue" in x else "" for x in datum]
)
return [tuple(x) for x in results]
data_list[0:]는 단순히 data_list 전체를 순회한다는 의미임.
[0:]는 사실상 data_list 사본을 뜻함.
굳이 0:를 붙이지 않아도 동일함.
내부적으로 각 datum은 열(Column)별 데이터를 담은 딕셔너리들의 리스트임.
예를 들어,
[
{"VarCharValue": "col1_value"},
{"VarCharValue": "col2_value"},
...
]
리스트 컴프리헨션으로 x["VarCharValue"] if "VarCharValue" in x else ""를 수행하여, 열 값을 문자열로 추출함.
"VarCharValue" 키가 없는 경우는 일반적으로 헤더 행(필드 이름) 혹은 Null 데이터일 수 있어서, 빈 문자열 ""로 처리하고 있음.
이렇게 만들어진 내부 리스트를 results에 추가함.
즉, results는 최종적으로 [['col1_val', 'col2_val', ...], ...] 형태를 가지게 됨.
마지막으로 return [tuple(x) for x in results]를 통해, 각 행을 튜플 형태로 변환해 반환함.
반환값은 [(col1_val, col2_val, ...), ...] 의 리스트가 됨.
전체 동작 요약
1. start_query_execution: 비동기로 Athena 쿼리 실행 → QueryExecutionId 획득
2. 쿼리 상태 폴링: "SUCCEEDED" 상태가 될 때까지 10초 간격으로 재시도
3. 결과 페이징 조회: get_query_results의 paginator를 이용하여 모든 행을 수집
4. 데이터 후처리: JSON 형태로 내려오는 값을 파싱하여, 문자열(VarCharValue)만 추출
5. 리스트 of 튜플 형태로 최종 반환
추가 고려 사항 및 개선 아이디어
1. 헤더 행 스킵
Athena 결과의 첫 Row는 일반적으로 컬럼명(Column Header)을 담음.
현재 코드는 data_list의 모든 Row를 그대로 처리해 VarCharValue를 추출하기 때문에, 첫 번째 행이 컬럼명이 포함된 행일 가능성이 높음.
실제 로우 데이터만 사용하려면, data_list[1:] 같은 방식으로 첫 행을 스킵하거나, 컬럼명과 실제 데이터를 분리하는 로직을 넣을 수 있음.
2. 반환 구조
현 구현은 모든 결과를 메모리에 한꺼번에 담아 리스트로 반환함.
대용량 데이터셋의 경우, 스트리밍 처리(제너레이터로 반환)나 pandas DataFrame 변환 등을 고려해볼 수 있음.
3. 쿼리 상태 폴링 최적화
현재는 10초마다 상태를 확인함.
쿼리가 빠르게 끝나는 경우, 불필요한 대기가 발생할 수 있음.
더 짧은 주기로 폴링(예: 2초)하거나, CloudWatch 이벤트 등 비동기 알림을 사용하는 방법도 있음.
4. 예외 처리 강화
boto3 호출 중 예외(ClientError 등)가 발생할 수 있으므로, 적절한 예외 처리(try-except)나 재시도 로직을 고려해볼 수 있음.
5. ResultConfiguration
OutputLocation 값을 하드코딩하지 않고, 파라미터로 전달받도록 수정하면 재사용성이 높아짐.
S3 경로를 동적으로 지정하거나, 쿼리별로 다른 폴더를 구성하고 싶을 수 있기 때문임.
6. Type 변환
현재는 모든 값을 문자열로 취급함.
숫자나 날짜/시간 필드를 실제 Python 타입(int, float, datetime 등)으로 변환해야 하는 경우, 추가 파싱 로직이 필요함.
정리
이 함수는 AWS Athena에 쿼리를 제출하고, 쿼리가 완료될 때까지 기다린 후 그 결과를 리스트 of 튜플 형태로 반환하는 전형적인 구현 예시임.
쿼리 실행부터 상태 확인, 결과 페이징 등 Athena API 활용 흐름을 일목요연하게 보여줌.
단, 헤더 처리, 타입 변환, 대량 데이터 처리 시의 메모리 사용, 폴링 주기 최적화 등은 추가적인 개선 여지가 있음.
이 코드를 잘 이해하고 적절히 수정·확장하면, Athena 기반의 데이터 쿼리 작업을 쉽고 유연하게 자동화할 수 있음.
개선된 코드
1. Docstring으로 함수의 목적과 파라미터, 반환 타입을 명확히 기술
2. 키워드 인자 추가(쿼리 결과를 저장할 S3 위치, 폴링 주기, 페이지 크기 등)
3. 헤더 자동 스킵(필요 시 옵션화 가능)
4. 짧은 폴링 간격(sleep_interval)과 상태 감지 횟수를 로깅(또는 모니터링) 가능
5. 에러 처리 조금 더 세분화 (예: boto3.exceptions.Boto3Error)
원본 코드와 달리, 이 함수는 첫 행(컬럼 헤더)을 자동으로 제외하고 반환함.
만약 헤더도 필요하다면 간단히 수정할 수 있음.
import time
import logging
from datetime import datetime
from typing import List, Tuple
import boto3
from botocore.exceptions import BotoCoreError, ClientError
logger = logging.getLogger(__name__)
def fetchall_athena(
self,
*,
query_string: str,
s3_output_location: str = "s3://<bucket_name>/<path>",
sleep_interval: int = 5,
page_size: int = 1000,
skip_header: bool = True
) -> List[Tuple[str, ...]]:
"""
Execute a SQL query in AWS Athena and return all results as a list of tuples.
:param query_string: The SQL query to be run on Athena.
:param s3_output_location: S3 path where Athena stores the query results.
:param sleep_interval: Interval (seconds) between status checks.
:param page_size: Maximum number of rows fetched per page.
:param skip_header: Whether to skip the first row (which usually contains column headers).
:return: A list of tuples representing each row of the query result.
:raises Exception: If the query fails or is cancelled.
:raises BotoCoreError, ClientError: If there are issues communicating with AWS.
"""
# 1. Athena 클라이언트 생성
try:
client = boto3.client("athena")
except (BotoCoreError, ClientError) as e:
logger.error(f"Failed to create Athena client: {e}")
raise
# 2. 쿼리 실행(비동기)
try:
response = client.start_query_execution(
QueryString=query_string,
ResultConfiguration={"OutputLocation": s3_output_location},
)
query_id = response["QueryExecutionId"]
except (BotoCoreError, ClientError) as e:
logger.error(f"Failed to start Athena query: {e}")
raise
# 3. 쿼리 상태 폴링
query_status = None
poll_count = 0
while query_status in ("QUEUED", "RUNNING", None):
poll_count += 1
try:
execution = client.get_query_execution(QueryExecutionId=query_id)
query_status = execution["QueryExecution"]["Status"]["State"]
except (BotoCoreError, ClientError) as e:
logger.error(f"Failed to get Athena query status: {e}")
raise
if query_status in ("FAILED", "CANCELLED"):
message = (
f"Athena query failed or cancelled: {query_string}. "
f"State: {query_status}"
)
logger.error(message)
raise Exception(message)
if query_status in ("QUEUED", "RUNNING", None):
logger.debug(f"Query still running (poll #{poll_count}). Sleeping {sleep_interval}s...")
time.sleep(sleep_interval)
# 4. 페이지네이션을 통해 결과 수집
try:
results_paginator = client.get_paginator("get_query_results")
results_iter = results_paginator.paginate(
QueryExecutionId=query_id,
PaginationConfig={"PageSize": page_size},
)
except (BotoCoreError, ClientError) as e:
logger.error(f"Failed to create paginator for Athena results: {e}")
raise
# 5. 데이터 파싱
data_list = []
for results_page in results_iter:
rows = results_page["ResultSet"]["Rows"]
for row in rows:
data_list.append(row["Data"])
# (옵션) 첫 행 스킵: 일반적으로 첫 행은 컬럼 헤더
if skip_header and len(data_list) > 0:
data_list = data_list[1:]
# 6. 문자열로 변환 (VarCharValue가 없으면 빈 문자열)
records = []
for datum in data_list:
record = [col.get("VarCharValue", "") for col in datum]
records.append(tuple(record))
return records
개선 사항 요약
1. Docstring & 타입 힌트
함수의 목적, 파라미터, 반환 타입, 예외 상황을 명시해 협업 및 유지보수에 유리함.
2. 인자 유연성
s3_output_location, sleep_interval, page_size 등을 사용자가 쉽게 커스터마이즈할 수 있음.
3. skip_header 옵션
기본적으로 헤더 행을 스킵(일반적 시나리오).
헤더가 필요한 경우 skip_header=False로 설정하면 됨.
4. 상태 모니터링 개선
sleep_interval을 5초로 줄여, 쿼리가 빠르게 끝날 때 불필요하게 오래 대기하지 않도록 변경함.
logger.debug를 통해 폴링 횟수 등 상태를 로깅할 수 있도록 적용함.
5. 에러 처리
boto3에서 발생할 수 있는 BotoCoreError나 ClientError 등 예외를 명시적으로 잡아 로깅하는 구조를 도입함.
쿼리가 "FAILED", "CANCELLED" 상태이면 명확한 메시지와 함께 Exception을 발생시킴.
6. 가독성
로직이 크게 5~6단계로 나뉘어 있고, 주석과 로깅을 통해 각 단계를 명확히 구분함.
코드 개선 정리
이 코드 패턴은 대량의 데이터를 처리할 경우 메모리 사용량이 많아질 수 있으므로, 실제 운영 환경에서는 제너레이터 형태로 한 줄씩(혹은 한 페이지씩) 처리하거나, AWS Glue / Athena CTAS 등의 전략을 병행해서 대규모 데이터를 효율적으로 다루는 방법도 고려해볼 수 있음.
'Programming Language > Python' 카테고리의 다른 글
[Python] Anaconda 핵심 요소 (0) | 2025.01.11 |
---|---|
[Python] 데이터프레임 S3 저장 함수 (0) | 2025.01.04 |
[Python] 특정 기간동안 일 단위 날짜 함수 (0) | 2025.01.04 |
[Python] Avro 파일 개념 (0) | 2025.01.04 |
[Python] Parquet 파일 개념 (0) | 2025.01.04 |