Data Engineering

Data Engineering/Spark

[Spark] List로 pyspark dataframe 만드는 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ("kim", 100), ("kim", 90), ("lee", 80), ("lee", 70), ('park', 60) ] schema = StructType([ \ StructField('name', StringType(),True), \ StructField('score', IntegerType(),True) ]..

Data Engineering/Spark

[Spark] Row 함수를 이용해서 Pyspark dataframe 만드는 방법

from pyspark.sql import SparkSession, Row spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [Row(id = 0, name = 'park', score = 100), Row(id = 1, name = 'lee', score = 90), Row(id = 2, name = 'kim', score = 80)] df = spark.createDataFrame(data) df.show()

Data Engineering/Spark

[Spark] pandas dataframe을 pyspark dataframe로 변환하는 방법

import pandas as pd from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() df_pandas = pd.DataFrame({ 'id': [0, 1, 2, 3, 4], 'name': ['kim', 'kim', 'park', 'park', 'lee'], 'score': [100, 90, 80, 70, 60] }) df_spark = spark.createDataFrame(df_pandas) print(df_pandas) df_spark.show()

Data Engineering/Airflow

[Airflow] 데이터 파이프라인이란?

1. 언제, 어디에서, 어떻게, 왜 데이터를 수집할 것인가에 대한 고민 필요 2. 데이터 파이프라인 구축시 수동작업 제거 필요 3. 데이터가 흐르도록 만들어야 함 4. 데이터 파이프라인 구축은 추출, 변경, 결합, 검증, 적재 과정을 자동화하는 것 5. 여러 데이터 스트림을 한번에 처리가 가능해야 함 6. ETL은 추출, 변환, 적재의 줄임말 7. 데이터 파이프라인은 ETL을 포함하는 광범위한 말

Data Engineering/Kafka

[Kafka] log.cleanup.policy 이란

log.cleanup.policy - 기본값 : delete - 선택 가능 : delete, compact - delete : 로그 세그먼트는 시간이나 크기제한에 도달할 때 주기적으로 삭제 - compact : 불필요한 레코드를 없애기 위해 압축을 사용

Data Engineering/Spark

[Spark] Pyspark dataframe 안의 List 처리하는 방법

data = { 'parent': [{ 'id': 'id_1', 'category': 'category_1', }, { 'id': 'id_2', 'category': 'category_2', }] } df = spark.createDataFrame([data]) df.printSchema() df.show(truncate=False) df = df.select(explode(df.parent)) df.printSchema() df.show(truncate=False) root |-- parent: array (nullable = true) | |-- element: map (containsNull = true) | | |-- key: string | | |-- value: string (valueCont..

Data Engineering/Spark

[Spark] TypeError: Can not infer schema for type: <class 'str'> 해결 방법

data = { 'parent': [{ 'id': 'id_1', 'category': 'category_1', }, { 'id': 'id_2', 'category': 'category_2', }] } df = spark.createDataFrame(data) df.printSchema() Fail to execute line 49: df = spark.createDataFrame(data) Traceback (most recent call last): File "/tmp/python16708257068745741506/zeppelin_python.py", line 162, in exec(code, _zcUserQueryNameSpace) File "", line 49, in File "/usr/local..

Data Engineering/Spark

[Spark] Pyspark json List를 처리하는 방법

data = [{ 'id': 'id_1', 'category': 'category_1' }, { 'id': 'id_2', 'category': 'category_2' }] schema = MapType(StringType(), StringType()) df = spark.createDataFrame(data, schema) df.printSchema() df.show(truncate=False) df.withColumn('id', df.value.id).withColumn('category', df.value.category).drop('value').show()

Data Engineering/Spark

[Spark] Pyspark List+Json 확인하는 방법

data = [{ 'id': 'id_1', 'category': 'category_1' }, { 'id': 'id_2', 'category': 'category_2' }] df = spark.createDataFrame(data) df.printSchema() df.show() schema = StructType([ StructField('id', StringType()), StructField('category', StringType()) ]) df = spark.createDataFrame(data, schema) df.printSchema() df.show()

Data Engineering/Spark

[Spark] Pyspark 간단한 StructType 사용하는 방법

data = { 'category': 'category_1', 'id': 'id_1' } df = spark.createDataFrame([data]) df.printSchema() df.show() schema = StructType([ StructField('category', StringType()), StructField('id', StringType()) ]) df = spark.createDataFrame([data], schema) df.printSchema() df.show()

박경태
'Data Engineering' 카테고리의 글 목록 (6 Page)