반응형
앞으로 구현할 데이터 동기화를 위한 Kafka 개요입니다.
복수의 같은 서비스가 하나의 DB를 사용하기 위해서 Kafka로 동기화 하는 방법을 알아보겠습니다.
- Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
요약
MariaDB 설정
Order service에 Producer 생성
테스트
소스코드
MariaDB 설정
Order service에서 사용할 테이블을 만들겠습니다.
create table orders (
id int auto_increment primary key,
product_id varchar(20) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
Order service
pom.xml에 mariaDB dependency를 추가하겠습니다.
<!-- MariaDB -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.3</version>
</dependency>
application.yaml에 mariaDB 설정을 추가하겠습니다.
spring:
application:
name: order-service
h2:
console:
enabled: true
settings:
web-allow-others: true
path: /h2-console
jpa:
hibernate:
ddl-auto: update #변경된 내용이 있을 때 업데이트
datasource:
# driver-class-name: org.h2.Driver
# url: jdbc:h2:mem:testdb
url: jdbc:mariadb://localhost:3306/mydb
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: test1357
연결이 되었는지 확인해 보겠습니다.
mariaDB에 적재된 것을 확인했습니다.
Kafka를 통해서 DB에 저장하기 위해서는 스키마를 지켜야 합니다.
consumer console를 실행하여 어떤 스키마에 맞춰서 데이터가 전송되는지 확인합니다.
C:\Users\ASUS\IdeaProjects\kafka> ./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic orders --from-beginning
실행하면 아래와 같이 스키마를 볼 수 있습니다.
{
"schema":{
"type":"struct",
"fields":[
{"type":"string", "optional":true,"field":"order_id"},
{"type":"string", "optional":true, "field":"user_id"},
{"type":"string", "optional":true, "field":"product_id"},
{"type":"string", "optional":true, "field":"qty"},
{"type":"string", "optional":true, "field":"unit_price"},
{"type":"string", "optional":true, "field":"total_price"}
],
"optional":false,
"name":"orders"
},
"payload":{
"order_id":"e1dc8bf6-9429-4073-b6b9-ec3cf0505356",
"user_id":"a33a7345-9151-4ed9-a34c-7a52038abd8e",
"product_id":"CATALOG-005",
"qty":15,
"unit_price":1700,
"total_price":25500
}
}
위 내용을 바탕으로 3개의 클래스를 만들겠습니다. Schema, field, payload 입니다.
Field.java
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Payload.java
@Data
@Builder // 생성자, setter 없이 필드 값을 직관적으로 생성가능
public class Payload {
// DB column 내용
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
Schema
@Data
@Builder // 생성자, setter 없이 필드 값을 직관적으로 생성가능 -> OrderProducer에서 확인 가능
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
이제 Producer를 만들겠습니다. OrderProducer.java
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")); // 생성자로 필드 값 채우기
Schema schema = Schema.builder() // builder로 필드 값 채우기
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
/* kafka를 이용해서 단일 DB에 동기화 */
public OrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Prducer sent data from Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
OrderController.java에 주문 메소드를 수정하겠습니다.
// 주문
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder order){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* jpa */
OrderDto orderDto = mapper.map(order, OrderDto.class);
orderDto.setUserId(userId);
// order service의 로컬 DB에 저장하는 방법
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* kafka를 통해 단일 DB에 저장하는 방법 */
// orderService.createOrder에서 가져오기
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto); // 주문정보 단일 DB에 저장
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
mariaDB에 데이터를 저장하기 위해서 connector를 실행하겠습니다.
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
order 정보를 저장하기 위한 sink connector를 생성하겠습니다.
더보기
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
connector가 생성되었습니다.
테스트
- order service 2개를 실행합니다.
- Postman으로 주문을 하면 2개의 애플이케이션에서 번갈아가며 Producer가 동작합니다.
- 다른 order service에서 데이터가 보내졌지만, 하나의 DB에 적재된 것을 확인했습니다.
끝.
반응형