카테고리 없음

Apache Kafka 활용 (2)

김 정 환 2022. 1. 23. 17:58
반응형

앞으로 구현할 데이터 동기화를 위한 Kafka 개요입니다.

 

복수의 같은 서비스가 하나의 DB를 사용하기 위해서 Kafka로 동기화 하는 방법을 알아보겠습니다.

  • Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer

 

 

 

요약

MariaDB 설정
Order service에 Producer 생성
테스트

 

 

소스코드

 

MariaDB 설정

 

Order service에서 사용할 테이블을 만들겠습니다.

create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);

 

 

Order service

pom.xml에 mariaDB dependency를 추가하겠습니다.

<!-- MariaDB -->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.3</version>
</dependency>

 

application.yamlmariaDB 설정을 추가하겠습니다.

spring:
  application:
    name: order-service
  h2:
    console:
      enabled: true
      settings:
        web-allow-others: true
      path: /h2-console

  jpa:
    hibernate:
      ddl-auto: update #변경된 내용이 있을 때 업데이트

  datasource:
#    driver-class-name: org.h2.Driver
#    url: jdbc:h2:mem:testdb
    url: jdbc:mariadb://localhost:3306/mydb
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: test1357

 

연결이 되었는지 확인해 보겠습니다.

mariaDB에 적재된 것을 확인했습니다.

 

 

 

Kafka를 통해서 DB에 저장하기 위해서는 스키마를 지켜야 합니다.

consumer console를 실행하여 어떤 스키마에 맞춰서 데이터가 전송되는지 확인합니다.

C:\Users\ASUS\IdeaProjects\kafka> ./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic orders --from-beginning

실행하면 아래와 같이 스키마를 볼 수 있습니다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {"type":"string", "optional":true,"field":"order_id"},
         {"type":"string", "optional":true, "field":"user_id"},
         {"type":"string", "optional":true, "field":"product_id"},
         {"type":"string", "optional":true, "field":"qty"},
         {"type":"string", "optional":true, "field":"unit_price"},
         {"type":"string", "optional":true, "field":"total_price"}
      ],
      "optional":false,
      "name":"orders"
   },
   "payload":{
      "order_id":"e1dc8bf6-9429-4073-b6b9-ec3cf0505356",
      "user_id":"a33a7345-9151-4ed9-a34c-7a52038abd8e",
      "product_id":"CATALOG-005",
      "qty":15,
      "unit_price":1700,
      "total_price":25500
   }
}

 

위 내용을 바탕으로 3개의 클래스를 만들겠습니다. Schema, field, payload 입니다. 

Field.java

@Data
@AllArgsConstructor
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

 

Payload.java

@Data
@Builder // 생성자, setter 없이 필드 값을 직관적으로 생성가능
public class Payload {
    // DB column 내용
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

 

Schema

@Data
@Builder // 생성자, setter 없이 필드 값을 직관적으로 생성가능 -> OrderProducer에서 확인 가능
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

 

이제 Producer를 만들겠습니다. OrderProducer.java

@Service
@Slf4j
public class OrderProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price")); // 생성자로 필드 값 채우기

    Schema schema = Schema.builder() // builder로 필드 값 채우기
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();


    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    /* kafka를 이용해서 단일 DB에 동기화 */
    public OrderDto send(String topic, OrderDto orderDto){

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Prducer sent data from Order microservice: " + kafkaOrderDto);

        return orderDto;
    }
}

 

 

OrderController.java에 주문 메소드를 수정하겠습니다.

    // 주문
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder order){

        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        /* jpa */
        OrderDto orderDto = mapper.map(order, OrderDto.class);
        orderDto.setUserId(userId);
        // order service의 로컬 DB에 저장하는 방법
//        OrderDto createdOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* kafka를 통해 단일 DB에 저장하는 방법 */
        //  orderService.createOrder에서 가져오기
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());


        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);
        orderProducer.send("orders", orderDto); // 주문정보 단일 DB에 저장


        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

 

 

 

mariaDB에 데이터를 저장하기 위해서 connector를 실행하겠습니다.

./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties

 

order 정보를 저장하기 위한 sink connector를 생성하겠습니다.

더보기
{
    "name":"my-order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}

 

connector가 생성되었습니다.

 

 

 

 

 

테스트

  • order service 2개를 실행합니다.
  • Postman으로 주문을 하면 2개의 애플이케이션에서 번갈아가며 Producer가 동작합니다.

  • 다른 order service에서 데이터가 보내졌지만, 하나의 DB에 적재된 것을 확인했습니다.

 

 

끝.

반응형