Data Engineering

Data Engineering/Kafka

[Kafka] 카프카의 장단점

아파치 카프카의 장단점 아파치 카프카는 대용량 실시간 메시징 시스템으로, 대량의 데이터를 안정적이고 빠르게 전송하고 처리하는 데 사용됩니다. 이를 위해 아파치 카프카는 분산형 아키텍처를 채택하며, 여러 대의 브로커와 프로듀서, 컨슈머로 구성됩니다. 장점: 대용량 데이터의 분산 처리: 아파치 카프카는 대량의 데이터를 처리하기 위한 분산형 아키텍처를 채택하고 있어, 대용량 데이터를 안정적으로 처리할 수 있습니다. 실시간 처리: 아파치 카프카는 실시간 처리에 최적화되어 있어, 실시간으로 대용량 데이터를 처리할 수 있습니다. 고가용성: 아파치 카프카는 다수의 브로커로 구성되어 있어, 하나의 브로커가 다운되더라도 다른 브로커가 대신 처리할 수 있어, 고가용성을 보장합니다. 확장성: 아파치 카프카는 수평적으로 확장..

Data Engineering/Airflow

[Airflow] 에어플로우 DAG란?

DAG - Directed Acyclic Graph - 순환하지 않는 그래프 - 대그라고 부른다. - 순차적으로 작업이 실행된다. - 순환 실행을 하지 않는다.

Data Engineering/Spark

[Spark] conda로 pyspark 환경 구축하기

아나콘다 가상환경 리스트 확인 conda env list python 가상환경 만들기 conda create -n py38 python==3.8 -y 가상환경 리스트 확인 conda env list 가상환경 접속 후 리스트 확인 conda activate py38 conda env list pyspark 3.3.1 설치 conda install -c conda-forge pyspark==3.3.1 -y 라이브러리 리스트 확인 pip list 간단한 pyspark 코드 실행해보기 from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = Spar..

Data Engineering/Spark

[Spark] ValueError: field score: This field is not nullable, but got None

다음과 같은 에러가 발생 Traceback (most recent call last): File "df_schema_null.py", line 23, in df = spark.createDataFrame(data = data, schema = schema) File "/Users/pgt0409/opt/anaconda3/envs/py38/lib/python3.8/site-packages/pyspark/sql/session.py", line 894, in createDataFrame return self._create_dataframe( File "/Users/pgt0409/opt/anaconda3/envs/py38/lib/python3.8/site-packages/pyspark/sql/session.py"..

Data Engineering/Spark

[Spark] pyspark dataframe 생성시 schema data type 설정 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ('kim', 100), ('kim', 90), ('lee', 80), ('lee', 70), ('park', 60) ] schema = ['name', 'score'] df = spark.createDataFrame(data = data, schema = schema) df.printSchema() df.show..

Data Engineering/Spark

[Spark] pyspark dataframe 의 특정 열을 list로 만드는 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ('kim', 100), ('kim', 90), ('lee', 80), ('lee', 70), ('park', 60) ] schema = StructType([ \ StructField('name', StringType(), True), \ StructField('score', IntegerType(), True)..

Data Engineering/Spark

[Spark] pyspark dataframe을 리스트로 만드는 가장 좋고 빠른 방법

+-------------------------------------------------------------+---------+-------------+ | Code | 100,000 | 100,000,000 | +-------------------------------------------------------------+---------+-------------+ | df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 | | list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 | | df.select('col_name').rdd.map(lambda row : ro..

Data Engineering/Spark

[Spark] pyspark dataframe 특정 컬럼(열)만 출력하는 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ('kim', 100), ('kim', 90), ('lee', 80), ('lee', 70), ('park', 60) ] schema = StructType([ \ StructField('name', StringType(), True), \ Str..

Data Engineering/Spark

[Spark] pyspark dataframe 컬럼을 이용해 연산하는 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ('kim', 100), ('kim', 90), ('lee', 80), ('lee', 70), ('park', 60) ] schema = StructType([ \ StructField('name', StringType(), True), \ Str..

Data Engineering/Spark

[Spark] pyspark dataframe을 원하는 열로 groupby 하는 방법

from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = SparkSession \ .builder \ .master('local') \ .appName('my_pyspark_app') \ .getOrCreate() data = [ ('kim', 'a', 100), ('kim', 'a', 90), ('lee', 'a', 80), ('lee', 'b', 70), ('park', 'b', 60) ] schema = StructType([ \ StructField('name', StringType(), True), \ StructField('cla..

박경태
'Data Engineering' 카테고리의 글 목록 (5 Page)