Data Engineering/Airflow

Data Engineering/Airflow

[Airflow] EMR에서 Step의 개념

EMR에서 Step의 개념아마존 EMR(Amazon Elastic MapReduce)에서 Step은 클러스터 상에서 수행되는 작업(잡, 태스크) 단위를 추상화한 개념임.EMR 클러스터에서 Spark, Hadoop MapReduce, Hive, Pig 스크립트 등을 실행할 때, 이를 하나의 단위(스텝)로 정의하고 순차적으로(또는 병렬로) 실행할 수 있음. EMR Step1. 작업(잡)을 ‘스텝’ 단위로 관리EMR 클러스터에서 수행되는 작업(예: Spark-submit, Hadoop jar, Hive 쿼리, Pig 스크립트, S3DistCp 등)을 하나의 “스텝(Step)”이라 부름.각 스텝은 필요한 실행 파일(또는 JAR, 스크립트), 인자, 실행 옵션, 실행 시나리오 등을 담고 있음. 2. 순차 실행 v..

Data Engineering/Airflow

[Airflow] Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법

Airflow를 통해 EMR에 Pyspark 여러개 실행하는 방법아파치 에어플로우(Apache Airflow)에서 EMR(Elastic MapReduce) 클러스터를 사용해 여러 개의 PySpark 작업을 실행하는 전반적인 방법과 모범사례는 아래와 같음.여기서는 각각의 PySpark 작업을 EMR 스텝으로 등록하고 실행하는 시나리오를 다룸. 기본 개념 정리1. EMR 클러스터AWS에서 Spark/Hadoop/Hive/Pig 등의 작업을 실행할 수 있는 클러스터 환경임.EmrCreateJobFlowOperator로 클러스터를 생성하거나, 이미 띄워진 클러스터가 있다면 그 클러스터 ID를 사용하여 스텝을 추가할 수 있음. 2. EMR 스텝(Step)EMR에서 실행하는 하나의 작업 단위를 스텝이라 부름.예를 ..

Data Engineering/Airflow

[Airflow] EmrAddStepsOperator 개념

EmrAddStepsOperator아파치 에어플로우(Apache Airflow)에서 EmrAddStepsOperator는 이미 실행 중이거나 준비(Waiting) 상태인 EMR(Elastic MapReduce) 클러스터에 새로운 스텝(steps)을 추가하기 위해 사용되는 오퍼레이터임.예를 들어 Spark 작업, Hadoop 작업, Hive 스크립트, Pig 스크립트 등을 EMR 클러스터에서 실행하고 싶을 때, 작업 단위를 ‘스텝’(Step)으로 정의하고 이를 EMR 클러스터에 추가하여 순차/병렬로 실행할 수 있음. 역할 및 동작 원리1. EMR 클러스터에 작업(스텝)을 등록EmrAddStepsOperator는 AWS의 EMR 클러스터에 대하여 add_job_flow_steps API를 호출해 여러 스텝을..

Data Engineering/Airflow

[Airflow] 컴포넌트, MWAA

Airflow 컴포넌트Airflow는 복잡한 워크플로우를 스케줄링하고 관리하기 위한 오픈소스 플랫폼임.데이터 엔지니어 및 개발자들은 작업의 의존성을 명확히 정의하고, 반복적인 작업을 자동화할 수 있음.Airflow는 여러 컴포넌트로 구성되어 있으며, 각각의 역할이 있음 Airflow 각 컴포넌트 역할1. Web ServerFlask를 기반으로 한 웹 어플리케이션으로 사용자 인터페이스를 제공함.웹 서버를 통해 사용자는 작업 스케줄, 실행 상태 확인, 로그 보기 등의 작업을 수행할 수 있음. 2. Scheduler워크플로우의 스케줄링을 담당함.이 컴포넌트는 DAGs(방향성 비순환 그래프)를 주기적으로 폴링하며, 실행해야 할 새로운 작업 인스턴스를 확인하고 실행함. 3. Executor실제 작업을 실행하는 역..

Data Engineering/Airflow

[Airflow] 에어플로우 DAG란?

DAG - Directed Acyclic Graph - 순환하지 않는 그래프 - 대그라고 부른다. - 순차적으로 작업이 실행된다. - 순환 실행을 하지 않는다.

Data Engineering/Airflow

[Airflow] 데이터 파이프라인이란?

1. 언제, 어디에서, 어떻게, 왜 데이터를 수집할 것인가에 대한 고민 필요 2. 데이터 파이프라인 구축시 수동작업 제거 필요 3. 데이터가 흐르도록 만들어야 함 4. 데이터 파이프라인 구축은 추출, 변경, 결합, 검증, 적재 과정을 자동화하는 것 5. 여러 데이터 스트림을 한번에 처리가 가능해야 함 6. ETL은 추출, 변환, 적재의 줄임말 7. 데이터 파이프라인은 ETL을 포함하는 광범위한 말

Data Engineering/Airflow

[Airflow] 에어플로우의 로그파일이 존재하지 않을 때

비정상 airflow-worker *** Log file does not exist: /opt/airflow/logs/dag_id=4_python_operator_context/run_id=scheduled__2022-12-10T11:22:00+00:00/task_id=print_kwargs/attempt=1.log *** Fetching from: http://airflow-worker:8793/log/dag_id=4_python_operator_context/run_id=scheduled__2022-12-10T11:22:00+00:00/task_id=print_kwargs/attempt=1.log *** !!!! Please make sure that all your Airflow components..

Data Engineering/Airflow

[Airflow] The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config. 에러 해결

/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:545 DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config. 에러 해결하는 방법 환경변수 변경 AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow 를 다음처럼 바꾼다 AIRFLOW__DATABASE__S..

Data Engineering/Airflow

[Airflow] airflow db reset을 하면 scheduler가 죽는다..

~/Desktop/git/airflow-pgt/airflow_celery   main  docker exec -it airflow-webserver bash airflow@airflow-webserver:/opt/airflow$ airflow db reset /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:545: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update yo..

Data Engineering/Airflow

[Airflow] airflow db init

~/Desktop/git/airflow-pgt/airflow_celery   main  docker exec -it airflow-webserver bash airflow@airflow-webserver:/opt/airflow$ airflow db init /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:545: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update you..

박경태
'Data Engineering/Airflow' 카테고리의 글 목록