Kafka
Kafka 개요
현재 하나의 서비스는 다양한 서비스와 연동됩니다. 이때 end-to-end 방식으로 연결되어 있으면 속도상 빠르다는 장점이 있을 수 있지만 아래와 같은 단점이 있습니다.
- 데이터 연동의 복잡성이 증가해 하드웨어, 운영체제에 영향을 많이 받고 장애에 영향을 많이 받음
- 서로 다른 데이터 파이프 라인으로 인한 확장에 어려움
이를 해결하고자 중간단계의 추상화를 위해 Apache Kafka가 등장했고, Data를 생성하는 Producer, Data를 소비하는 Cosumer를 통해 end-to-end는 서로 모르지만, 유연한 데이터 파이프라인을 구축할 수 있습니다.
kafka 구성
kafka Broker
실제로 실행되는 Kafka 애플리케이션 서버로 3대 이상의 Broker Cluster를 구성해 브로커에 이상이 있을 시 다른 Broker를 활용하는 등의 클러스터링이 가능합니다.
N개의 Broker 중 한대는 Controller 기능을 수행합니다.
- 각 Broker에게 담당 파티션 할당 수행
- Broker 정상 동작 모니터링 관리
Apache Zookeeper
메타데이터 (Broker ID, Controller Id) 저장, Controller 정보 저장하고, 저장된 정보를 통해 Controller에 이상이 생겼을 때 정상적인 Broker를 Controller로 승격을 관리합니다.
MySQL Duplication 대신 Kafka를 알았더라면...?
이전 프로젝트에서 DB 백업과 INSERT와 SELECT 하는 DB를 나눠 성능향상을 위해 MySQL Duplication을 진행했습니다.
https://anythingis.tistory.com/174
[프로젝트 회고] 시작 프로젝트 회고
https://github.com/kanggeonnim/PerspectiView?tab=readme-ov-file GitHub - kanggeonnim/PerspectiView: 시점으로 보는 작품, 시작 시점으로 보는 작품, 시작. Contribute to kanggeonnim/PerspectiView development by creating an account on GitHub.
anythingis.tistory.com
해당 부분 맡은 팀원이 mysql 복제를 위해 별로 없는 레퍼런스를 찾아가며 설정과 테스트를 위해 일주일정도의 걸렸고, 이 경험이 나중에 프로젝트에서 MySQL를 사용할 때 유의미하다고 생각해 아쉬웠습니다. Kafka를 활용하면 Producer/Consumer를 분리해 운영체제, 서비스에 구애받지 않는 파이프라인을 구축하는 역량을 길러 다른 서비스끼리 연동할 때도 활용할 수 있어 더 좋았을 것 같습니다.
(Kafka Broker, Zookeeper 등으로 인한 기본으로 구동되는 무거운 서버 부하에 대한 부분도 생각해야할 듯 합니다.)
Kafka Connect
Producer와 Consumer로 Kafka Pipeline을 생성할 수 있지만 반복적으로 파이프 라인을 구축해야합니다.
Kafka Connect는 코드 없이 Configuration으로 Kafka Cluster에 Data를 Import/Export 할 수 있게 도와줘 반복적인 파이프라인 구축을 간편하게 도와줍니다.
특히 코드를 짤 수 있는 서비스끼리의 통신은 Producer, Consumer를 구현하기 쉽지만, Topic에 생성된 데이터를 DB에서 Produce, Consume해야 할 경우 따로 서비스를 만들어야하기 때문에 이때 Connect 를 활용해 별도의 서비스 구현 없이 DB에서 Produce, Consume할 수 있습니다.
- Standalone(단일)/Distribution(분산) mode 지원
- RestfulAPI 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 plugin 제공(File, S3, Hive, MySQL...)
Kafka & Kafka Connect 활용이 필요할 때
MSA 아키텍처로 서비스를 구성할 때 동일한 서비스를 스케일 아웃을 통해 부하를 분담할 수 있습니다.
이때의 문제점에 대해 생각해보겠습니다.
APIGateway는 등록된 두개의 orderservice 중에 하나에 API를 넘겨주는데 order service 1에 주문 등록이 되었다고 가정하겠습니다.
그리고 GET을 통해 주문목록을 조회하려고 할때 해당 API가 order service 2에 넘겨준다면 DB가 달라서 주문이 없다고 조회될 수 있습니다.
데이터베이스 동기화의 해결법은 세가지가 있습니다.
첫번째로 N개의 서비스에서 한개의 DB를 사용하는 것입니다.
DB 단에서 부하가 커지고 트랜잭션 관리를 통한 동시성 제어가 필요합니다.
두번째는 N개의 서비스의 DB를 메세지, 이벤트 큐잉 시스템을 통해 동기화 시켜주는 것입니다.
Producer와 Consumer 파이프라인을 양방향으로 구축해줘야합니다.
세번째는 단일 DB 앞단에 메세지 큐잉 시스템을 활용해 동시성에 대한 문제는 큐잉 시스템에 위임해 처리할 수 있습니다.
추후에 두번째, 세번째 방법을 활용하기 위해 Kafka와 Kafka Connect를 활용해보겠습니다.
Kafka & Kafka Connect 환경 설정
Windows 환경에서 kafka, kafka Connect, SpringBoot + gradle, MariaDB를 활용합니다.
Kafka 환경설정에 대한 이해를 키우기 위해 로컬로 진행하고, MariaDB는 Docker를 활용해서 실행합니다.
kafka 다운로드
https://kafka.apache.org/downloads
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
tar xvf kafka_2.12-3.4.0
kafka connect 다운로드
curl -O http://packages.confluent.io/archive/7.3.1/confluent-community-7.3.1.tar.gz
tar xvf confluent-community-7.3.1.tar.gz
Kafka Connect JDBC 다운로드
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
JDBC Connector (Source and Sink)
Discover 200+ expert-built Apache Kafka connectors for seamless, real-time data streaming and integration. Connect with MongoDB, AWS S3, Snowflake, and more.
www.confluent.io
Kafka 폴더(work) 생성
압훅해제한 Kafka, KafaConnect(confluetn-7.3.1), KafkaConnect-JDBC를 work 폴더를 만들어 모으겠습니다.
Kafka Connect JDBC Plugin 추가
confluent-7.3.1/etc/kafka/connect-distributed.properties PlugIn 변경
Kafka Connect MariaDB Connect 의존성 추가
SpringBoot 프로젝트에서 Gradle을 통해 MariaDB Java Client 의존성을 추가하고, 해당 라이브러리를 복사합니다.
windows + gradle 기준 confluent-7.3.1\share\java\kafka 에 gradle로 추가한 mariadb java client 의존성 복사
Users\PC\.gradle\caches\modules-2\files-2.1\org.mariadb.jdbc\mariadb-java-client\${version}\${cache}\mariadb-java-client-${version}
confluent-7.3.1\share\java\kafka
Kafka & Kafka Connect 실행
MariaDB 실행
도커를 활용해 MariaDB를 실행합니다.
docker run --name mariadb -d -p 3306:3306 --restart=always -e MYSQL_ROOT_PASSWORD=1q2w3e4r mariadb
Zookeeper 실행
카프카 클러스터를 관리해줄 주키퍼를 먼저 실행합니다.
cd work/kafka_2.12-3.4.0
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties # windows 기준
Kafka Server 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
Kafka Connect 실행
cd work/confluent-7.3.1
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties #windows 기준
Kafka Connect Source
POST http://localhost:8083/connectors
{
"name": "my-source-connect", # 등록할 Connector 이름
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", # source connector 종류
"connection.url": "jdbc:mariadb://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "1q2w3e4r",
"mode": "incrementing", # 데이터 추가 시 데이터 polling 방식
"incrementing.column.name": "id", # 자동증가 column
"table.whitelist": "mydb.users", # 데이터 변경 감지 table 명, 여러 db에 동일한 테이블이 존재하면 db.table로 명시
"topic.prefix": "my_topic_", # kafka 토픽에 저장될 형식
"tasks.max": "1"
}
}
조회도 등록과 마찬가지로 RestfulAPI를 제공합니다.
Connectors 등록후에는 이전에 없던 connect 관련 토픽들을 조회할 수 있습니다.
Source 연결 테이블 INSERT
Soruce Connect와 연결된 DB에서 Insert를 실행하면 토픽을 통해 데이터 생성되는 것을 볼 수 있습니다.
insert into users(user_id, pwd, name) values ('users1', 'test1111', 'user name');
$ ./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "users1",
"pwd": "test1111",
"name": "user name",
"created_at": 1721536482000
}
}
Kafka Connect Sink
생성된 데이터를 Consume 하기 위해 Sink Connect를 등록합니다.
POST http://localhost:8083/connectors
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb//localhost:3306/mydb",
"connection.user": "root",
"connection.password": "1q2w3e4r",
"auto.create": "true", # 데이터 넣은 테이블 없으면 생성
"auto.evolve": "true", # 특정 데이터 colmun 누락 시 alter로 테이블 변경
"delete.enabled": "false", # null 레코드 값 삭제 여부
"tasks.max": "1",
"topics": "my_topic_users"
}
}
Source 연결 테이블 INSERT
다시 source와 연결된 테이블에 데이터를 produce하면 Sink를 통해 데이터를 consume해 데이터가 복사됩니다.
insert into users(user_id, pwd, name) values ('users2', 'test33', 'user name1');
insert into users(user_id, pwd, name) values ('users3', 'test333', 'user name3');
Table 조회
select * from users;
Sink Table 조회
select * from my_topic_users;
다음 포스팅
세번째 방법에 대한 포스팅입니다.
https://anythingis.tistory.com/195?category=1028453
[MSA] 스프링에서 INSERT Query Kafka에 위임하기
개요https://anythingis.tistory.com/193 [MSA] Apache Kafka & Kafka ConnectKafkaKafka 개요현재 하나의 서비스는 다양한 서비스와 연동됩니다. 이때 end-to-end 방식으로 연결되어 있으면 속도상 빠르다는 장점이 있
anythingis.tistory.com