카프카 스트림즈 토폴로지
카프카 스트림즈는 실시간 데이터 스트림을 처리하는 강력한 도구임.
다양한 언어에서 사용할 수 있으며 특히 코틀린과 같은 JVM 기반 언어에서도 매우 잘 통합됨.
코틀린은 함수형 프로그래밍 스타일을 지원함.
이는 카프카 스트림즈의 데이터 흐름을 구성하는 데 매우 유용함.
1. 카프카 스트림즈의 기본 개념
카프카 스트림즈의 토폴로지는 데이터 처리 애플리케이션의 논리적인 구조를 나타냄.
토폴로지는 하나 이상의 데이터 소스와 처리 연산을 연결하여 데이터가 흐르는 방향을 정의함.
이를 통해 데이터를 변환하고 집계한 결과를 다시 내보낼 수 있음.
토폴로지는 다음과 같은 기본 구성 요소로 이루어짐.
소스 프로세서 : 카프카 토픽으로부터 데이터를 읽어오는 역할.
스트림 프로세서 : 데이터를 처리하는 역할
싱크 프로세서 : 처리된 데이터를 다시 카프카 토픽으로 내보내는 역할
2. 코틀린에서 카프카 스트림즈 토폴로지 작성
코틀린에서 카프카 스트림즈를 사용하면 함수형 프로그래밍 스타일을 활용하여 더욱 간결하고 읽기 쉬운 코드를 작성할 수 있음.
StreamsBuilder를 사용하여 토폴로지를 구성함.
KStream과 KTable을 통해 데이터의 흐름을 제어할 수 있음.
간단한 예시는 다음과 같음.
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.KStream
import java.util.Properties
fun main() {
// Kafka Streams 설정 구성
val props = Properties().apply {
put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app")
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes\$StringSerde")
put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes\$StringSerde")
}
// 토폴로지 빌더 생성
val builder = StreamsBuilder()
// Source: Kafka에서 input-topic으로부터 데이터 읽기
val inputStream: KStream<String, String> = builder.stream("input-topic")
// Stream Processor: 데이터 변환 및 필터링
val processedStream: KStream<String, String> = inputStream
.mapValues { value -> value.uppercase() } // 모든 값을 대문자로 변환
.filter { _, value -> value.length > 5 } // 문자열 길이가 5 초과인 값만 통과
// Sink: 처리된 데이터를 output-topic에 쓰기
processedStream.to("output-topic")
// Kafka Streams 인스턴스 시작
val streams = KafkaStreams(builder.build(), props)
streams.start()
// 종료 후 클린업
Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
}
이 예시는 간단한 카프카 스트림즈 애플리케이션을 코틀린으로 구현한 것임.
input-topic 에서 데이터를 읽어와서 대문자로 변환하고 길이가 5자를 초과하는 메시지만 필터링하여 output-topic 으로 내보냄.
3. 주요 토폴로지 연산
카프카 스트림즈에서 자주 사용하는 연산을 몇 가지 설명하겠음.
1. mapValues - 값 변환
mapValues는 각 메시지의 값을 변환하는 데 사용됨.
키는 그대로 두고 값만 변경하고 싶을 때 유용함.
inputStream.mapValues { value -> value.uppercase() }
2. filter - 데이터 필터링
filter는 주어진 조건에 맞는 데이터만을 통과시킴.
예를 들어, 메시지의 값이 길이가 5 이상인 경우에만 필터를 통과하게 할 수 있음.
inputStream.filter { _, value -> value.length > 5 }
3. groupByKey 및 aggregate - 집계 처리
스트림 데이터를 그룹화하고 집계할 때는 groupByKey와 aggregate를 사용할 수 있음.
예를 들어, 특정 키를 기준으로 데이터를 집계하는 과정은 다음과 같이 구현할 수 있음.
val aggregatedStream = inputStream
.groupByKey()
.aggregate(
{ 0 }, // 초기값
{ key, value, aggregate -> aggregate + value.length }, // 집계 로직
Materialized.with(Serdes.String(), Serdes.Integer())
)
이 코드는 각 키에 대해 메시지 값의 길이를 누적하는 집계 연산을 수행함.
4. join - 스트림 간 조인
카프카 스트림즈에서는 두 개 이상의 스트림을 조인하여 복합적인 데이터를 처리할 수 있음.
KStream 이나 KTable 을 서로 조인하여 복잡한 데이터 처리 로직을 구현할 수 있음.
val joinedStream = stream1.join(stream2,
{ value1, value2 -> "$value1-$value2" }, // 조인 결과 처리
JoinWindows.of(Duration.ofMinutes(5)) // 5분 윈도우로 조인
)
5. 상태 저장 처리
카프카 스트림즈는 상태 저장 처리를 지원함.
상태를 저장하면, 집계나 조인과 같은 복잡한 연산을 효율적으로 처리할 수 있음.
상태 저장 처리시 상태 저장소가 생성되며, 로컬 저장소나 카프카의 체인지 로그를 통해 상태를 유지함.
val countTable = inputStream
.groupByKey()
.count()
count 는 각 키에 대한 메시지 수를 세는 상태 저장 연산임.
4. 서브 토폴로지 및 병렬 처리
카프카 스트림즈는 서브 토폴로지를 생성하여 스트림 처리의 병렬성을 최적화함.
코틀린에서 카프카 스트림즈 애플리케이션은 각 파티션에 대해 별도의 쓰레드에서 처리되며, 토폴로지는 각 파티션 단위로 병렬 처리가 가능함.
아래 예시는 두 개의 스트림을 병렬 처리하는 서브 토폴로지임.
val builder = StreamsBuilder()
// 첫 번째 서브 토폴로지
val stream1: KStream<String, String> = builder.stream("topic1")
stream1.mapValues { value -> value.uppercase() }
.to("output1")
// 두 번째 서브 토폴로지
val stream2: KStream<String, String> = builder.stream("topic2")
stream2.filter { _, value -> value.startsWith("A") }
.to("output2")
val streams = KafkaStreams(builder.build(), props)
streams.start()
이 예시는 두 개의 서로 다른 토픽에서 데이터를 읽어와 각각 별도의 서브 토폴로지로 처리하는 예시임.
각 스트림은 서로 다른 연산을 적용한 후 서로 다른 토픽으로 결과를 내보냄.
5. 윈도우 처리
카프카 스트림즈는 윈도우 처리를 통해 특정 시간 범위 내에서 데이터를 그룹화하고 처리할 수 있는 기능을 제공함.
이는 시간 기반 집계에 매우 유용함.
val windowedCounts = inputStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count()
이 코드는 5분 간격으로 데이터를 집계하는 윈도우 처리의 예임.
6. 글로벌 토폴로지
카프카 스트림즈에서는 서브 토폴로지 외에도 글로벌 토폴로지를 구성할 수 있음.
글로벌 토폴로지는 애플리케이션 전체에서 사용할 수 있는 공통 데이터를 유지하고, 각 인스턴스에서 동일한 데이터 상태를 공유함.
주로 글로벌 KTable을 사용할 때 유용함.
val globalTable = builder.globalTable("global-topic")
결론
카프카 스트림즈에서의 토폴로지 개념은 실시간 스트리밍 애플리케이션의 중요한 부분임.
코틀린의 함수형 스타일과 잘 결합하여 강력한 데이터 파이프라인을 구축할 수 있음.
카프카 스트림즈의 다양한 연산과 상태 저장, 윈도우 처리 등을 적절히 사용하면 복잡한 스트리밍 데이터를 효율적으로 처리할 수 있음.
'Data Engineering > Kafka' 카테고리의 다른 글
[Kafka] 아파치 카프카란? (0) | 2023.05.03 |
---|---|
[Kafka] 카프카의 장단점 (0) | 2023.04.17 |
[Kafka] log.cleanup.policy 이란 (0) | 2022.12.25 |
[Zookeeper] 주키퍼 도커 컴포즈로 실행시 가상볼륨 하나로 만드는 방법 (0) | 2022.08.14 |
[Zookeeper] 주키퍼 도커 컴포즈 실행시 가상볼륨 생성되는 문제 (0) | 2022.08.14 |