[Airflow] 데이터 파이프라인이란?
1. 언제, 어디에서, 어떻게, 왜 데이터를 수집할 것인가에 대한 고민 필요 2. 데이터 파이프라인 구축시 수동작업 제거 필요 3. 데이터가 흐르도록 만들어야 함 4. 데이터 파이프라인 구축은 추출, 변경, 결합, 검증, 적재 과정을 자동화하는 것 5. 여러 데이터 스트림을 한번에 처리가 가능해야 함 6. ETL은 추출, 변환, 적재의 줄임말 7. 데이터 파이프라인은 ETL을 포함하는 광범위한 말
1. 언제, 어디에서, 어떻게, 왜 데이터를 수집할 것인가에 대한 고민 필요 2. 데이터 파이프라인 구축시 수동작업 제거 필요 3. 데이터가 흐르도록 만들어야 함 4. 데이터 파이프라인 구축은 추출, 변경, 결합, 검증, 적재 과정을 자동화하는 것 5. 여러 데이터 스트림을 한번에 처리가 가능해야 함 6. ETL은 추출, 변환, 적재의 줄임말 7. 데이터 파이프라인은 ETL을 포함하는 광범위한 말
log.cleanup.policy - 기본값 : delete - 선택 가능 : delete, compact - delete : 로그 세그먼트는 시간이나 크기제한에 도달할 때 주기적으로 삭제 - compact : 불필요한 레코드를 없애기 위해 압축을 사용
data = { 'parent': [{ 'id': 'id_1', 'category': 'category_1', }, { 'id': 'id_2', 'category': 'category_2', }] } df = spark.createDataFrame([data]) df.printSchema() df.show(truncate=False) df = df.select(explode(df.parent)) df.printSchema() df.show(truncate=False) root |-- parent: array (nullable = true) | |-- element: map (containsNull = true) | | |-- key: string | | |-- value: string (valueCont..
data = { 'parent': [{ 'id': 'id_1', 'category': 'category_1', }, { 'id': 'id_2', 'category': 'category_2', }] } df = spark.createDataFrame(data) df.printSchema() Fail to execute line 49: df = spark.createDataFrame(data) Traceback (most recent call last): File "/tmp/python16708257068745741506/zeppelin_python.py", line 162, in exec(code, _zcUserQueryNameSpace) File "", line 49, in File "/usr/local..
data = [{ 'id': 'id_1', 'category': 'category_1' }, { 'id': 'id_2', 'category': 'category_2' }] schema = MapType(StringType(), StringType()) df = spark.createDataFrame(data, schema) df.printSchema() df.show(truncate=False) df.withColumn('id', df.value.id).withColumn('category', df.value.category).drop('value').show()
data = [{ 'id': 'id_1', 'category': 'category_1' }, { 'id': 'id_2', 'category': 'category_2' }] df = spark.createDataFrame(data) df.printSchema() df.show() schema = StructType([ StructField('id', StringType()), StructField('category', StringType()) ]) df = spark.createDataFrame(data, schema) df.printSchema() df.show()
data = { 'category': 'category_1', 'id': 'id_1' } df = spark.createDataFrame([data]) df.printSchema() df.show() schema = StructType([ StructField('category', StringType()), StructField('id', StringType()) ]) df = spark.createDataFrame([data], schema) df.printSchema() df.show()
비정상 airflow-worker *** Log file does not exist: /opt/airflow/logs/dag_id=4_python_operator_context/run_id=scheduled__2022-12-10T11:22:00+00:00/task_id=print_kwargs/attempt=1.log *** Fetching from: http://airflow-worker:8793/log/dag_id=4_python_operator_context/run_id=scheduled__2022-12-10T11:22:00+00:00/task_id=print_kwargs/attempt=1.log *** !!!! Please make sure that all your Airflow components..
/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:545 DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config. 에러 해결하는 방법 환경변수 변경 AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow 를 다음처럼 바꾼다 AIRFLOW__DATABASE__S..
~/Desktop/git/airflow-pgt/airflow_celery main docker exec -it airflow-webserver bash airflow@airflow-webserver:/opt/airflow$ airflow db reset /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:545: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update yo..