최근에 아파치 카프카 관련 글들이 많이 보여서 순전히 궁금증 때문에 오프라인 강의를 신청 했었다. 그리고 오늘 처음으로 총 4시간 동안 강의를 들었고 생각을 정리할 필요가 있겠다는 생각이 들었다.
모든 이미지는 아파치 카프카 애플리케이션 프로그래밍 with 자바 에 출처가 있습니다.
이 기술은 작은 스타트업에 어떻게 녹여내야 하는가.
복잡도를 해결한 카프카 그리고 복잡하지 않은 스타트업
아파치 카프카의 탄생 배경은 이러하다. 링크드인 (LinkedIn)은 파편화된 데이터 수집 및 분배 아키텍처를 운영하는데 큰 어려움을 겪었다. 일반적으로 데이터를 생성하는 소스 어플리케이션과 생성된 데이터가 적재되는 타깃 어플리케이션은 당연히 연결되어야 한다. 그런데 이 아키텍쳐들이 점점 거대하고 복잡해지면서 이 데이터 라인이 기하급수적으로 복잡해져버린다.
위 처럼 각각의 어플리케이션이 알아야 할 관계들이 많아짐에 따라 운영 관리가 복잡하게 되었고 이를 정리할 필요성을 느낀 것이다.
링크드인인의 데이터 팀은 이 문제를 해결할 수 있는 신규 시스템을 만들었고 이것이 바로 아파치 카프카(Apache Kafka)이다. 카프카는 어플리케이션의 관계를 연결하는 것이 아닌, 한 곳에 모아서 처리할수 있도록 중앙집중화 하여 문제를 해결하였다.
기존에 1:1 매칭으로 개발하고 운영하던 데이터 파이프라인은 어느 한 쪽의 어플리케이션에 장애가 발생할 시에 다른 쪽의 어플리케이션에 영향을 미친다. 카프카는 대신 메시지 큐 기반(FIFO)의 구조를 활용하여 이러한 커플링을 해결하였다. 소스 어플리케이션은 어떤 타깃 어플리케이션으로 보낼 것인지 하나하나 설정할 필요 없이 무조건 카프카로 넣으면 된다. 그 이후에는 타겟 어플리케이션이 저장된 데이터를 필요할 때 입력된 순서대로 가져가기만 하면 된다.
그런데 문제는 작은 스타트업은 이렇게 복잡한 아키텍쳐를 가지고 있을까 하는 점이다. 우리는 적어도 우리는 그러하다. 위 그림 그대로 따라 그리면 다음과 같다.
우리 같은 경우에는 저장을 위한 타깃 어플리케이션이 하나 이상 있지 않고, 심지어 일반적으로 사용되는 타겟 어플리케이션이라고 불리울만한 것들이 없는 서비스도 많이 보아왔다. 메시지 큐 기반의 구조도 현재로썬 크게 도움이 되어 보이지 않는다. 프로듀서에서 여러 컨슈머들을 등록해야 한다면 어플리케이션 설정 및 관리해야할 엔티티들이 많아짐에 따라 복잡해지겠지만, 현재는 그렇지 않기 때문이다.
이 때문에 스타트업에서 적어도 복잡한 아키텍쳐를 개선하는 이유로써의 도입은 설득력이 없다.
배치 처리를 통한 높은 처리량
매 요청마다 TCP연결을 위해 3-handshake를 하고 자원을 할당하는 것은 분명히 네트워크 비용이다. 동일한 양의 데이터를 묶음으로 처리할 수 있다면 이러한 비용을 절감 할 수 있을 것이다. 아파치 카프카는 많은 량의 데이터를 동일 목적의 데이터를 여러 파티션에 분배하고 이를 병렬 처리할 수 있다.
다만, 소스 어플리케이션과 타겟 어플리케이션이 복잡할 경우에는 1:1처리 시에 중복된 많은 네트워크 통신이 발생하겠지만 아키텍쳐가 위처럼 단순할 때는 네트워크 비용 절감에 드라마틱한 효과를 보긴 어려워보인다. 가령 타겟 어플리케이션이 하나라면 애초에 데이터 저장에 있어 중복된 요청들이 없기 때문이다. 네트워크 통신은 타겟어플리케이션이 부담하는 것들이 단순히 아파치 카프카로 이동한 형태가 되어버린다.
장애 발생을 대비하는 영속성
카프카는 다른 메시징 플랫폼과는 다르게 전송받은 데이터를 메모리에 저장하지 않고 파일 시스템에 저장한다. 일반적으로 파일시스템에 저장하게 되면 메모리에 저장하는 것보다 느리다. 카프카는 이를 파일 I/O 성능 향상을 위한 페이지 캐시(page cache)영역을 메모리에 따로 생성하여 사용하는 운영체제의 방식을 활용하였다. 한번 읽은 파일 내용은 메모리에 저장해두고 사용하기 때문에 파일 시스템에 데이터를 저장하더라도, 전송 처리량이 높은 이유가 이러하다. 이렇게 파일 시스템으로 저장할 시에 급작스럽게 서비스가 장애 발생으로 종료되더라도 프로세스를 재시작하여 안전하게 데이터를 재처리 할 수 있다.
분명 이러한 특징은 우리가 로깅 시스템을 운영하는데 있어서, 크나큰 이점이 될 수 있다. 현재로써는 장애 발생시에는 데이터를 다 손실할 수 밖에 없는 구조지만 카프카에 또 한번 저장함으로써 가용성 높은 데이터를 확보하게 되므로 장애 발생시에도 장애 발생동안 처리하지 못한 데이터를 복구 후에 마저 처리할 수 있음을 기대 할 수 있다.
카프카 내부의 고가용성
일반적으로 3개 이상의 서버들로 카프카 클러스터가 운영이 된다고 한다. 이 경우에는 일부 서버에 장애가 발생하더라도 무중단으로 안전하고 지속적으로 데이터를 처리할 수 있다고 한다. 클러스터로 이루어진 카프카는 데이터의 복제를 통해 고가용성의 특징을 가진다. 프로듀서로 전송받은 데이터는 하나의 브로커에만 저장하는것이 아니라 여러 브로커에 저장하여 복제된 데이터를 기준으로 지속적으로 데이터 처리가 가능하다. 이 때문에 카프카 내부의 장애가 발생하더라도 데이터의 유실에 대비할 수 있다.
좋다고 치자. 그렇다면 도입 비용은 저렴한가.
작은 스타트업에서는 신규 기술 도입 및 유지보수는 엄청난 도입 비용이 발생한다. 때문에 아무리 좋은 기술이라도 유지보수해야 할 범위가 넓으면 아무래도 도입을 고려하는데 망설여진다.
이를 살펴보기 위해서는 아무래도 카프카와 그 주위의 생태계를 이해할 필요가 있다.
데이터 레이크 아키텍처와 카프카 생태계
카프카 기본 생태계
람다 아키텍쳐
람다 아키텍쳐 예제
꽤 이전부터 “데이터 레이크 아키텍처” 즉 빅데이터 처리를 위한 아키텍처로써 람다 아키텍처라는 것이 일반적이였나 보다. 이 아키텍처는 3가지 레이어로 나뉜다.
배치 레이어는 이름 그대로 배치 데이터를 모아서 특정 이벤트마다 일괄 처리한다. 서빙 레이어는 가공된 데이터를 사용자가 사용할 수 있게 데이터를 저장해둔 공간이다. 스피드 레이어는 카프카가 위치한 곳으로 서비스에서 생성되는 원천 대이터를 실시간으로 분석하는 용도로 활용된다. 배치 데이터에 비해 빠르게 분석해야 하는 경우 스피드 레이어를 통해 데이터를 분석한다.
이 때, 배치 처리를 하는 레이어와 실시간 처리를 하는 레이어를 분리함으로써 데이터 처리의 목적에 맞는 방식을 명확히 분리할 수 있었다. 하지만 레이어가 2개로 분리되면서 데이터를 처리하는데 필요한 로직이 각각의 레이어에 별개로 존재해야 하고, 배치 데이터와 실시간 데이터를 섞어 사용해야 할 경우에는 다소 어려움을 겪게 되는 단점이 생기게 되었다.
카파 아키텍처
이러한 단점을 해소하기 위해서 카파 아키텍처가 제안되었다. 람다 아키텍처에서 단점으로 부각되었던 로직의 파편화, 디버깅, 배포, 운영 분리에 대한 이슈를 제거하기 위해 배치 레이어를 제거해버린다. 카파 아키텍처는 스피드 레이어에서 데이터를 모두 처리할 수 있었으므로 이러한 중복 이슈를 해결할 수 있게 된다.
이렇게 될 경우에 기존의 배치 레이어가 하는 역할을 스피드 레이어가 대신해야 한다. 즉, 스피드 레이어의 데이터 처리 방식인 스트림 데이터를 배치 레이어가 사용하는 방식인 배치 데이터로 사용할 수 있어야 한다. 아파치 카프카는 다음과 같은 방법으로 문제를 해결한다.
스트림 데이터를 배치 데이터로 사용하는 방법은 로그에 시간을 남기는 것이다. 로그에 남겨진 시간을 기준으로 데이터를 처리하면 스트림으로 적재된 데이터도 배치로 처리할 수 있게 된다. 2021년 신입생 목록을 배치 데이터로 가져오기 위해서 스트림 데이터로 적재된 1월 1일부터 12월 31일까지의 데이터를 구체화된 뷰로 가져온다면 배치로 처리할 수 있게 된다. 카프카는 로그에 시간(timestamp)을 남기기 때문에 이런 방식의 처리가 가능하다.
카프카 핵심 개념
카프카 브로커는 클라이언트와 데이터를 주고 받기 위해 사용하는 어플리케이션이다. 가장 기본이 되는 서버 어플리케이션이라고 생각하면 좋을듯 하다. 하나의 서버에는 한 개의 카프카 브로커만이 실행된다. 카프카 브로커 서버 1대로도 기능은 동작하나, 일반적으로 데이터를 안전하게 보관하고 처리하기 위해서 3대 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영한다. 그리고 카프카 클러스터는 이 상위 개념으로써 이름 그대로 카프카의 브로커들을 포함하며 프로듀서들이 보낸 데이터를 여러 브로커에 분산 저장하거나 복제하는 역할을 수행한다. 주키퍼는 분산 시스템에서 서버 및 시스템들간의 여러 코디네이팅을 해주는 어플리케이션이라고 한다.
브로커 내에는 다시 토픽, 리더 파티션, 팔로워 파티션, 세그먼트, 레코드와 같은 분류가 존재한다.
토픽은 목적에 따라 분리된 데이터 저장 공간으로써 브로커에 원하는 만큼 위치할 수 있다. 데이터베이스의 테이블과 유사한 역할을 하기 때문에 도메인 별로 분리되거나 할 수 있다. 파티션은 토픽 안에 위치하며 논리적으로 저장되어 있는 데이터 저장 공간 ( 오프셋을 할당 받아 저장됨 ) 이다. 마찬가지로 하나의 토픽안에 여러 파티션들이 있을 수 있다. 파티션들은 또 다른 브로커에 복제가 가능하다. 이렇게 생성한 파티션을 팔로워 파티션이라고 부른다. 이를 통해서 데이터의 가용성을 높일 수 있다는 강력한 장점 때문에 일반적으로는 2이상의 복제 개수를 설정하는 경우가 많다. 다만 너무 많은 팔로워 파티션들은 쓰기 시에 발생하는 오버헤드나, 복제 개수만큼 저장 용량이 증가 하기 때문에 적절한 값이 필요하다.
파티션은 다시 세그먼트라는 나뉘는데, 이는 실제로 메시지가 저장되는 파일시스템 단위이다. 세그먼트의 마지막 파일 즉, 쓰기가 현재 일어나고 있는 파일은 별도로 액티브 세그먼트라고 부른다. 파일 시스템이기 때문에 현재 열려있는 파일에 한에서는 분리해서 작업을 해야하기 때문에 이러한 분류가 있다. 가령 retention 옵션에 따라 얼마만크 세그먼트를 어느 주기로 얼마만큼 보유하고 나머지는 삭제할 지 설정하는데, 이 때 대부분의 경우 액티브 세그먼트는 이 정책에서 제외된다. 그리고 세그먼트들은 오프셋들을 할당받는데, 이를 통해서 컨슈머들이 어디까지 데이터를 가지고 갔는지에 대한 여부를 알 수 있다.
그리고 데이터를 나타내는 가장 작은 단위가 레코드이다. 프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프가 저장되게 된다. 이 떄, 한번 적재된 레코드는 수정할 수 없고 retention 관리 등으로 삭제만 가능하기 때문에 입력 시에 의도한 대로 데이터가 들어오는지 확인할 필요가 있다.
레코드 구성요소로는 타임스탬프, 헤더, 메시지 키, 메시지 값, 오프셋이 있다.
타임 스탬프는 스트림 프로세싱에서 활용하기 위한 시간을 저장하는 용도로 사용된다.
오프셋은 컨슈머에서 중복 처리를 방지하기 위한 구분자 역할을 하기 때문에 어디까지 처리를 완료했고, 앞으로 처리해야할 데이터가 무엇인지를 구분하는 역할을 한다.
헤더는 key/value 데이터를 추가할 수 있으며 일반적으로는 레코드의 스키마 버전이나 포맷과 같은 데이터 프로세싱에 참고할만한 정보를 담아서 사용할 수 있다.
레코드 키는 처리하고자 하는 메시지의 값을 분류하기 위한 용도로 사용할 수 있다. 이를 파티셔닝이라고 부르는데 파티셔닝에서 사용하는 메시지 키는 파티셔너에 따라 토픽의 파티션 번호가 정해진다. 메시지 키는 필수 값이 아니라 지정하지 않으면 null로 설정된다. 메시지 키가 null인 경우에는 키 별로 파티셔닝이 불가능하므로 파티션에 라운드 로빈으로 전달된다. null이 아닌 경우에는 해쉬값에 의해서 특정 파티션에 매핑되어 전달된다.
레코드 값은 실제로 처리할 데이터가 담기는 공간이다. 메시지 값의 포맷은 제네릭으로 사용자가 정할 수 있다. Float, Byte[], String등 다양한 형태로 지정할 수 있으나 일반적으로는 직렬화에 어려움 때문에 String을 가장 많이 쓰고 그 외 JSON이 유용하게 쓰인다고 한다. 또 브로커에 저장된 값의 포맷에 대해 컨슈머는 어떤 포맷으로 지정되어 있는지 알 방법이 없으므로 미리 역직렬화 포맷을 알고 있어야 한다.
위 내용을 정리하면 아래와 같다.
브로커들의 모음. 클러스터는 1대 이상의 브로커로 이루어져있다.
분산 코디네이션 시스템으로 브로커를 코디네이팅하는 역할을 하며, 클러스트 내의 리더 브로커를 발탁하는 방식을 제공
클라이언트와 데이터를 전달하기 위한 어플리케이션. 브로커는 복제, 컨트롤러, 데이터 삭제, 오프셋 저장 역할을 수행
목적에 따라(도메인 별) 분리된 데이터 저장 공간으로써 브로커에 원하는 만큼 위치할 수 있다.
파티션은 토픽 안에 위치하며 논리적으로 저장되어 있는 데이터 저장 공간. 컨슈머의 처리량을 늘리기 위해 파티션의 개수를 늘릴 수 있다.
물리적으로 메시지가 저장되는 파일시스템 단위이다. 세그먼트의 마지막 파일 즉, 쓰기가 현재 일어나고 있는 파일은 별도로 액티브 세그먼트라고 부른다. 파일 시스템이기 때문에 현재 열려있는 파일은 접근이 제한되어야 하기 때문이다.
레코드는 데이터를 포함한 여러가지 정보들을 담고 있다. 이는 timestamp, offset, headers, key, value로 이루어져있다. timestamp는 레코드 적재 시간, offset은 컨슈머가 어디 까지 읽어 갔는지, headers는 메타데이터 정보를 포함한다. 레코드 키는 필수값은 아니나, 사용 시에 구분자로 사용될 수 있으며 같은 key를 가지는 레코드는 같은 파티션에 저장된다. value는 데이터를 포함한다.
기본 사용법
설치하기
1.
위 페이지에 접속 후 카프카 2.5.1 버전의 바이너리 파일 중 하나를 다운 받거나 kafka_2.12-2.5.1.tgz 를 클릭하여 직접 다운 받는다.
2.
압축을 푼다.
카프카 클러스터 ( 주키퍼, 브로커 ) 실행 및 토픽 생성
아래 순서대로 진행하면 된다. 이 때, 브로커는 서버 하나에 하나 이상 실행 시키지 않는다. 실행은 가능하나, 운영상에 포트의 중복, 자원의 분배등의 이슈로 하나의 서버에는 하나만 실행하고 추가적인 브로커가 요구된다면 스케일 아웃 하는 것을 권장한다.
1.
데이터를 보관할 data 폴더를 만든다.
mkdir data
ls
LICENSE README.md config libs site-docs NOTICE bin data logs
Shell
복사
2.
server.properties 내의 log.dirs 값을 현재 데이터의 경로로 변경한다.
# kafka server.properties for localhost broker test
broker.id=0
num.network.threads=3
num.io.threads=8
# Please modify directory
log.dirs=/Users/moon/Documents/kafka/0206/data
num.partitions=3
listeners=PLAINTEXT://localhost:9092
... 이하 생략
Shell
복사
3.
주키퍼, 브로커를 각각 실행한다.
# 주키퍼 실행
bin/zookeeper-server-start.sh config/zookeeper.properties
# 브로커 실행
bin/kafka-server-start.sh config/server.properties
# 브로커 실행 확인
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
##localhost:9092 (id: 0 rack: null) -> (
## Produce(0): 0 to 8 [usable: 8],
## Fetch(1): 0 to 11 [usable: 11],
## ListOffsets(2): 0 to 5 [usable: 5],
Shell
복사
4.
토픽을 생성한다.
# 토픽 생성
bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--topic hello.kafka
# Created topic hello.kafka.
#############################
## 토픽 생성 시 사용할 수 있는 옵션 값이 존재 한다.
## partitions ( 파티션 갯수 )
## config retention.ms ( 메시지 저장 기간 )
## replication-factor (브로커당 토픽의 레플리케이션 개수 )
#############################
# 토픽 상세 정보
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic hello.kafka \
--describe
# Topic: hello.kafka PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
# Topic: hello.kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
# Topic: hello.kafka Partition: 1 Leader: 0 Replicas: 0 Isr: 0
# Topic: hello.kafka Partition: 2 Leader: 0 Replicas: 0 Isr: 0
# 토픽 리스트
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# hello kafka
Shell
복사
프로듀서와 컨슈머 생성
프로듀서와 컨슈머는 여러가지 방법으로 생성할 수 있다.
kafka-console로 생성하기
보통 간단한 테스트를 위해서 사용한다. 아래의 명령어로 생성 가능하다.
프로듀서 생성 (kafka-console-producer)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic hello.kafka \
--property "parse.key=true" \
--property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3
#################################
## 옵션 설명
## topic = 프로듀서가 전송할 토픽 ( 필수 )
## --property "parse.key=true" = 레코드 키를 사용할 건지의 여부 ( 옵션 )
## --property "key.separator=:" = 입력 값 중 레코드의 키와 값을 구분하는 구분자 ( 옵션 )
##################################
Shell
복사
컨슈머 생성 (kafka-console-consumer)
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--property print.key=true \
--property key.separator="-" \
--group hello-group
--from-beginning
#################################
## 옵션 설명
## topic = 컨슈머가 읽을 토픽 ( 필수 )
## --property print.key=true 레코드 키를 출력 할 건지의 여부 ( 옵션 )
## --property key.separator="-" = 출력 값의 레코드의 키와 값을 구분하는 구분자 ( 옵션 )
## --from-beginning 토픽의 가장 처음 레코드부터 읽어들인다. ( 옵션 )
## hello-group 컨슈머 그룹을 설정할 수 있다. 이로 그룹별로 오프셋이 적용된다. ( 옵션 )
##################################
Shell
복사
Producer 및 Consumer 어플리케이션 개발
어플리케이션은 Java로 작성한다. 특히 카프카는 Java 생태계가 가장 크고 많이 사용되기 때문에 다른 언어로 사용하는 것보다 Java 사용이 유리하다.
Producer
프로듀서 어플리케이션의 아키텍쳐는 다음과 같다.
ProducerRecord 는 KafkaProducer 의 send 메서드를 통해 Partitioner 로 전달한다. Partitioner 는 어느 파티셔너로 전달할지를 결정하게 된다. 이후에 Accumlator 에서 토픽의 레코드들을 배치로 처리한다. 마지막으로 Sender 를 통해 카프카 클러스터로 전송된다.
의존성
dependencies {
compile 'org.apache.kafka:kafka-clients:2.5.0'
compile 'org.slf4j:slf4j-simple:1.7.30'
}
Java
복사
코드
public class Producer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
// 필수 설정값은 아래와 같다.
// BOOTSTRAP_SERVERS_CONFIG - 브로커의 주소
// KEY_SERIALIZER_CLASS_CONFIG - 키의 직렬화 포멧
// VALUE_SERIALIZER_CLASS_CONFIG - 값의 직렬화 포맷
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ㅊ
Java
복사
Consumer
컨슈머 어플리케이션의 아키텍쳐는 다음과 같다.
컨슈머 어플리케이션은 카프카 클러스터와 연결 이후에 Fetcher 에서 미리 레코드의 일부 값을 가져온다. 이후에 while(true) 를 통해 지속적으로 poll() 를 통해 레코드를 가져온다.
의존성
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
Java
복사
코드
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
// 필수 속성은 다음과 같다.
// BOOTSTRAP_SERVERS_CONFIG - 브로커의 주소
// KEY_DESERIALIZER_CLASS_CONFIG - 키의 역직렬화 포멧
// VALUE_DESERIALIZER_CLASS_CONFIG - 값의 역직렬화 포맷
// 옵션 속성은 다음과 같다.
// GROUP_ID_CONFIG = 그룹 아이디
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
consumer.commitSync();
}
}
}
Java
복사
Kafka Connect 사용
컨슈머 측에서 자주 사용하는 데이터파이프라인을 추상화 한 “커넥터 모듈"을 설치하여 사용하는 방식.
반복적인 파이프라인을 직접 개발할 필요 없이 오픈소스 모듈을 다운받기만 하면 되서 많이 쓰임.
커넥터는 REST API를 통해 관리됨
커넥트 REST API 인터페이스
FileSinkTask
토픽 내 레코드를 읽어 파일에 작성하는 커넥터 모듈을 예제로 확인한다.
설정 값 적용
vi config/connect-distributed.properties
bootstrap.servers=my-kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
Shell
복사
분산 모드 커넥트 실행, 플러그인 확인
$ bin/connect-distributed.sh config/connect-distributed.properties [2021-12-03 14:01:14,219] INFO WorkerInfo values:
## jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMi
...
$ curl -X GET http://localhost:8083/connector-plugins [
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink",
"version": "2.5.0"
}...
Shell
복사
새로운 커넥터 생성
$ curl -X POST \ http://localhost:8083/connectors \ -H 'Content-Type: application/json' \ -d '{
"name": "file-sink-test", "config":
{
"topics":"test", "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":1,
"file":"/tmp/connect-test.txt"
} }
# /tmp/connect-test.txt 에 토픽 레코드 작성
Shell
복사
커넥터의 동작 및 생성된 파일 확인
# 커넥터 상태 확인
$ curl http://localhost:8083/connectors/file-sink-test/status {
"name": "file-sink-test", "connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083" },
"tasks": [ {
"id": 0,
"state": "RUNNING", "worker_id": "127.0.0.1:8083"
} ],
"type": "sink" }
## 프로듀서로 데이터 입력
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \ --topic connect-test
> a
> b
> c
> d
> e
# 파일 확인
$ cat /tmp/connect-test.txt
> a
> b
> c
> d
> e
Shell
복사
결론
최종 보스. 극한의 퍼포먼스. 거대한 생태계
이벤트 기반의 메시지큐, 데이터 스트림 프로세스 플랫폼은 여러 가지 있다. 이 중에서 카프카는 가장 많은 기능을 가지고 있고, 다양한 설정이 가능하다. 이를 통해서 다양한 전략을 적용할 수 있고 그를 통해 퍼포먼스 향상을 꾀할수 있다. 하지만 반대로 전략에 대한 경험치가 없다면 이에 대한 비용은 상당할 것이라 생각된다. 운영에 대한 여러 설정이나 전략은 이야기는 일체 하지 않고 가장 기본이 되는 이야기만 했는데도 상당한 양이다.
생태계 또한 정말 거대하다. 위에서 이야기 하지 않은 “카프카 스트림즈", “커넥터 활용을 위한 클라이언트”등 무수히 많은 개념과 생태계를 보유하고 있다. 생태계가 넓다는 것은 그 만큼 많은 정보, 많은 기능을 가지고 있다는 것이지만 그만큼 알아야 할 것이 많다는 것이 된다.
이러한 이유로 처음부터 카프카를 도입을 고려할 필요는 없다고 생각한다. 다른 좋은 플랫폼들이 존재하니, 사용하다가 한계에 부딪히면 그 때 바꿔도 좋지 않을까 생각한다.
실제로 쏘카에서는 aws kinesis 를 사용하다가 Consumer 가 계속해서 증가함에 따라 Kinesis Stream 에 병목이 발생했고 이를 극복하기 위해 카프카를 도입한 사례가 있다. 이 때 직접 카프카 클러스터를 관리하기 보다는 AWS MSK 를 사용해서 관리 비용을 줄인 것도 확인 할 수 있다.
이와 같이 스타트업이라면 비용 대비 효율을 생각하여 점차 개선하는 방식으로 고려하는 것이 좋으리라 생각한다.
< 카프카는 데이터 파이프라인을 위한 올인원 최종 보스 >