Data Engineering

Data Engineering/Spark

[Spark] How to take only the first N elements of an RDD

Code: from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("0_save_file")\ .getOrCreate() sc = spark.sparkContext line_1 = sc.parallelize(['0', '1', '2', '3', '4']) line_2 = sc.parallelize(['5', '6', '7', '8', '9']) line_3 = sc.parallelize(['10', '11', '12', '13', '14']) line_all = line_1.union(line_2).union(line_3) line_filter = line_all.filter(l..

Data Engineering/Spark

[Spark] How to process data by applying a filter to an RDD

Code: from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("0_save_file")\ .getOrCreate() sc = spark.sparkContext line_1 = sc.parallelize(['0', '1', '2', '3', '4']) line_2 = sc.parallelize(['5', '6', '7', '8', '9']) line_3 = sc.parallelize(['10', '11', '12', '13', '14']) line_all = line_1.union(line_2).union(line_3) line_filter = line_all.filter(l..

Data Engineering/Spark

[Spark] How to combine several RDDs built from lists into a single RDD

Code: from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("0_save_file")\ .getOrCreate() line_1 = spark.sparkContext.parallelize(['0', '1', '2', '3', '4']) line_2 = spark.sparkContext.parallelize(['5', '6', '7', '8', '9']) line_3 = spark.sparkContext.parallelize(['10', '11', '12', '13', '14']) line_all = line_1.union(line_2).union(line_3) print('..

Data Engineering/Spark

[Spark] In PySpark, two or more RDDs cannot save their output to the same path

Here is my code: from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("0_save_file")\ .getOrCreate() alphabet_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'] alphabet_rdd = spark.sparkContext.parallelize(alphabet_list) number_rdd = spark.sparkContext.parallelize(r..

Data Engineering/Spark

[Spark] org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file

The following error occurred while testing Spark: 22/05/28 11:51:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Traceback (most recent call last): File "0_save_file.py", line 24, in alphabet_rdd.saveAsTextFile("/home/spark/result/0_save_file") File "/usr/local/lib/python3.8/dist-packages/pyspark/rdd.py", line 1828, in saveAsTextFile keyed._j..
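`FileAlreadyExistsException` means the output directory from a previous run still exists. One common fix is to remove the directory before calling `saveAsTextFile`. A minimal sketch; the helper name is an illustration, not part of the original post:

```python
import shutil
from pathlib import Path


def clear_output_dir(path: str) -> None:
    """Remove a previous Spark output directory, if present,
    so saveAsTextFile can recreate it without raising
    FileAlreadyExistsException."""
    target = Path(path)
    if target.exists():
        shutil.rmtree(target)


# Usage before saving (path from the traceback above):
# clear_output_dir("/home/spark/result/0_save_file")
# alphabet_rdd.saveAsTextFile("/home/spark/result/0_save_file")
```

Deleting output unconditionally is convenient for tests, but in production it is usually safer to write to a fresh, timestamped path instead.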

Data Engineering/Airflow

[Airflow] Creating a simple echo DAG with BashOperator

3_bash_operator_echo.py from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime default_args = { 'owner' : 'ParkGyeongTae' } dag = DAG ( dag_id = '3_bash_operator_echo', start_date = datetime(2022, 5, 4), schedule_interval = '* * * * *', catchup = False, tags = ['test'], description = 'Bash Operator Sample', default_args = default_args ) ech..
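The excerpt cuts off at `ech..`; a plausible completion, assuming an installed Airflow environment (the task id and echoed string are assumptions based on the post title):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {'owner': 'ParkGyeongTae'}

dag = DAG(
    dag_id='3_bash_operator_echo',
    start_date=datetime(2022, 5, 4),
    schedule_interval='* * * * *',  # run every minute
    catchup=False,
    tags=['test'],
    description='Bash Operator Sample',
    default_args=default_args,
)

# Hypothetical task: the original file is truncated after "ech.."
echo_hello = BashOperator(
    task_id='echo_hello',
    bash_command='echo "hello"',
    dag=dag,
)
```

This is a DAG definition fragment; it only runs inside an Airflow scheduler, so no standalone output is shown.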

Data Engineering/Airflow

[Airflow] Creating a simple sleep DAG with BashOperator

2_bash_operator.py from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime default_args = { 'owner' : 'ParkGyeongTae' } dag = DAG ( dag_id = '2_bash_operator', start_date = datetime(2022, 5, 4), schedule_interval = '* * * * *', catchup = False, tags = ['test'], description = 'Bash Operator Sample', default_args = default_args ) sleep_1 = Bas..
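The excerpt breaks off at `sleep_1 = Bas..`; a plausible completion, assuming an installed Airflow environment (task ids, sleep durations, and the dependency are assumptions):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {'owner': 'ParkGyeongTae'}

dag = DAG(
    dag_id='2_bash_operator',
    start_date=datetime(2022, 5, 4),
    schedule_interval='* * * * *',
    catchup=False,
    tags=['test'],
    description='Bash Operator Sample',
    default_args=default_args,
)

# Hypothetical tasks: the original file is truncated after "sleep_1 = Bas.."
sleep_1 = BashOperator(task_id='sleep_1', bash_command='sleep 5', dag=dag)
sleep_2 = BashOperator(task_id='sleep_2', bash_command='sleep 5', dag=dag)

sleep_1 >> sleep_2  # run sleep_2 only after sleep_1 finishes
```

This is a DAG definition fragment; it only runs inside an Airflow scheduler, so no standalone output is shown.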

Data Engineering/Airflow

[Airflow] Creating a simple DAG with PythonOperator

1_python_operator.py from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime default_args = { 'owner' : 'ParkGyeongTae' } dag = DAG ( dag_id = '1_python_operator', start_date = datetime(2022, 5, 4), schedule_interval = '* * * * *', catchup = False, tags = ['test'], description = 'Python Operator Sample', default_args = default_args ) def..
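The excerpt stops right at `def..`; a plausible completion, assuming an installed Airflow environment (the callable's body and the task id are assumptions):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {'owner': 'ParkGyeongTae'}

dag = DAG(
    dag_id='1_python_operator',
    start_date=datetime(2022, 5, 4),
    schedule_interval='* * * * *',
    catchup=False,
    tags=['test'],
    description='Python Operator Sample',
    default_args=default_args,
)


# Hypothetical callable: the original file is truncated after "def.."
def print_hello():
    print('hello')


hello_task = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,  # the function itself, not a call
    dag=dag,
)
```

This is a DAG definition fragment; it only runs inside an Airflow scheduler, so no standalone output is shown.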

Data Engineering/Airflow

[Airflow] Checking the PostgreSQL tables connected to Airflow

https://github.com/ParkGyeongTae/airflow-pgt/tree/main/0_airflow sudo -u postgres psql -U postgres -c "\list" sudo -u postgres psql -U postgres -d airflow -c "\list" sudo -u postgres psql -U postgres -d airflow -c "\dt" sudo -u postgres psql -U postgres -d airflow -..

Data Engineering/Airflow

[Airflow] Practicing various DAG graphs (BashOperator)

https://github.com/ParkGyeongTae/airflow-pgt/tree/main/0_airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime dag = DAG ( dag_id = 'my_bash_dag', start_date = datetime(2022, 4, 16), schedule_interval ..
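The excerpt ends before the tasks are defined; the different graph shapes come from how tasks are wired with `>>`. A sketch assuming an installed Airflow environment (the tasks, commands, and schedule are assumptions added to illustrate fan-out/fan-in):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='my_bash_dag',
    start_date=datetime(2022, 4, 16),
    schedule_interval='@once',  # assumption: the original interval is truncated
    catchup=False,
)

# Hypothetical tasks used only to draw the graph shapes
t1 = BashOperator(task_id='t1', bash_command='echo t1', dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo t2', dag=dag)
t3 = BashOperator(task_id='t3', bash_command='echo t3', dag=dag)
t4 = BashOperator(task_id='t4', bash_command='echo t4', dag=dag)

# t1 fans out to t2 and t3, which both feed into t4 (fan-out / fan-in)
t1 >> [t2, t3] >> t4
```

Swapping the dependency lines changes the graph: `t1 >> t2 >> t3 >> t4` gives a straight chain, while `[t1, t2] >> t3` joins two roots into one task.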

박경태
List of posts in the 'Data Engineering' category (page 17)