개요
https://anythingis.tistory.com/193
이전 포스팅에서 Zookeeper, Kafka, Kafka Connect를 활용해봤습니다.
그중에서도 단일 DB를 활용하고, 서비스가 DB에 INSERT를 직접 하는게 아닌 카프카에 위임하는 방법을 알아보겠습니다.
장점 : 서비스가 DB Lock, Connection에 대해 자유로움, 동시성 처리를 Kafka가, 비즈니스 로직을 서비스가 분담
단점 : 서비스가 직접 커넥션을 물고 있는게 아니라 예외 발생시 보상처리를 위한 사가패턴이 필요
환경설정
maria db create orders
create table orders (
id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
)
application.yml
인메모리 h2가 아닌 실제 DB에 연결해 다수의 서비스에서 동일한 DB에 접근하게 합니다.
INSERT 쿼리는 카프카가 전담하기에 SELECT 쿼리를 위해 연결합니다.
spring:
datasource:
#driver-class-name: org.h2.Driver
#url: jdbc:h2:mem:testdb
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/mydb
username: root
password: 1q2w3e4r
generate-unique-name: false
Order Service INSERT 쿼리 카프카에 위임
Controller + Service
서비스가 직접 JPA를 활용해 DB에 접근하는 과정을 카프카 Produce 과정으로 변환합니다.
아래는 기존 order service의 serviceImpl 구현로직입니다.
order service에서 담당하던 total Price를 구하고 DB에 저장하는 로직을
@Override
public OrderDto createOrder(OrderDto orderDetails) {
orderDetails.setOrderId(UUID.randomUUID().toString());
orderDetails.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderEntity orderEntity = mapper.map(orderDetails, OrderEntity.class);
orderRepository.save(orderEntity);
return mapper.map(orderEntity, OrderDto.class);
}
컨트롤러에서 구현하고 INSERT는 Kafka Producer로 처리합니다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@RequestBody RequestOrder order,
@PathVariable String userId) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(order, OrderDto.class);
orderDto.setUserId(userId);
/*
jpa
OrderDto returnUserDto = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(returnUserDto, ResponseOrder.class);
*/
//kafka
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
kafkaProducer.send("example-catalog-topic", orderDto);
// kafka
orderProducer.send("orders", orderDto);
return new ResponseEntity(HttpStatus.CREATED).ok(responseOrder);
}
OrderProducerKafka
INSERT 쿼리를 카프카에 Produce하기 위한 Producer를 구현합니다.
DTO 타입과 DB타입이 매칭 되지 않으면 DB에 null 값이 들어감으로 주의하세요.
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"), // DTO 타입 - DB 타입 매칭 중요
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
Schema schema = Schema.builder()
.type("struct")
.optional(false)
.name("orders")
.fields(fields).build();
public OrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder().
order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
}catch (JsonProcessingException e){
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer send data from Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
실행
Zookeeper 실행
cd /kafka_2.12-3.4.0
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties # windows 기준
Kafka 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
Kafka Connect 실행
cd confluent-7.3.1
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties #windows 기준
Order Sink Connect 추가
POST http://localhost:8083/connectors
{
"name": "my-order-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "1q2w3e4r",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "orders"
}
}
order service 실행
두개의 서비스를 기동합니다.
ORDER POST 요청
POST http://localhost:8000/order-service/7fc99a15-44e0-456e-acdc-2d9cd00abda7/orders
{
"productId" : "CATALOG-005",
"qty" : 10,
"unitPrice" : 200
}
RESPONSE BODY
{
"productId": "CATALOG-005",
"qty": 10,
"unitPrice": 200,
"totalPrice": 2000,
"orderId": "bfb5a83f-9224-4e42-b252-79894c2e9215"
}
POST 요청 서비스1, 2에 번갈아서 전달됩니다.
order service 1
order service 2
order service에서 OrderProducerKafka로 Produce 됩니다.
DB 확인
단일 DB에 다수의 서비스를 에서 Kafka Connect Sink에 의해 DB에 Consume된 것을 확인할 수 있습니다.