IT/Spring Cloud

Apache Kafka 활용 (1)

김 정 환 2022. 1. 23. 14:57
반응형

앞으로 구현할 데이터 동기화를 위한 Kafka 개요입니다.

 

 

Orders Service에 요청된 주문 수량 정보를 catalogs Service에 반영

  • Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
  • Catalogs Service에서 Kafka Topic에 전송된 메시지 취득 -> Comsumer

 

 

 

요약

Catalog Service에 Consumer 추가
Order Servie에 Producer 추가
테스트

 

 

소스코드

 

Catalog Service

pom.xmldependecy 추가합니다.

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

Kafka 서비스를 다룰 패키지를 만들어 줍니다.

 

 

Catalog 서비스는 물류를 기록하기 때문에 Consumer를 만들어 주겠습니다. 먼저 Config를 만들겠습니다.

KafkaConsumerConfig.java


@EnableKafka // Kafka 활성화
@Configuration // Configuration으로 사용할 것이기 때문에
public class KafkaConsumerConfig {

    // Consumer Config
    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // consumer들을 묶은 ID
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // Consumer Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

 

KafkaConsumer.java를 만들겠습니다.


@Service
@Slf4j
public class KafkaConsumer {

    // Catalog Service가 가지고 있는 DB
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository){
        this.repository = repository;
    }

    // 수량 업데이트
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("Kafka Message : ->" + kafkaMessage);

        // 가져온 직렬화 데이터를 역직렬화 해준다.
        // Kafka에서 들어오는 데이터를 JSON 형태로 바꿔서 사용
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex){ // Json으로 바꾸는 과정에 에러가 날 수 있으므로 에러 처리
            ex.clearLocation();
        }

        CatalogEntity entity = repository.findByProductId((String)map.get("productId")); // map.get("productId")으로 반환되는 Object 타입을 String 형태로 변환
        if(entity != null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    }
}

 

 

 

Order Service

Catalog 서비스와 마찬가지고 Kafka를 사용할 패키지를 만들어 줍니다.

 

Order 서비스에는 주문을 받습니다. 주문 받은 수량 만큼 catalog에서 감소해야합니다. Order 서비스에서 Producer로 주문받은 수량을 Kafka에게 보냅니다. 먼저 Config를 만들어 줍니다. 

KafkaProducerConfig.java

@EnableKafka // Kafka 활성화
@Configuration // Configuration으로 사용할 것이기 때문에
public class KafkaProducerConfig {

    // Producer Config
    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    // 보내주기 위한 instance
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

KafkaProducer.java를 만들어 줍니다.


@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    // Kafka로 보내기
    public OrderDto send(String topic, OrderDto orderDto){

        // 주문 내용을 json 형태로 변경
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Prducer sent data from Order microservice: " + orderDto);

        return orderDto;
    }
}

 

테스트

  • zookeeper와 kafka를 켜줍니다.
  • order 서비스에서 상품을 주문합니다. 

 

  • catalog 서비스에서 상품의 수량이 줄어든 것을 확인합니다. 120에서 105로 변경되었습니다.

 

 

 

끝.

반응형

'IT > Spring Cloud' 카테고리의 다른 글

Zipkin - 분산 추적  (0) 2022.01.30
CircuitBreaker  (0) 2022.01.30
데이터 동기화 - Kafka 설정  (0) 2022.01.08
Microservice간 통신 - FeignClient 방식  (0) 2022.01.07
Microservice간 통신 - RestTemplate 방식  (0) 2022.01.06