IT/Spring Cloud

데이터 동기화 - Kafka 설정

김 정 환 2022. 1. 8. 12:21
반응형

앞으로 구현할 데이터 동기화를 위한 Kafka 설정 개요입니다.

 

운영의 안정성을 위해서 어떤 서비스는 다중화해 놓습니다. 다중화된 서비스에서 데이터베이스는 어떻게 해야할까요. 만약에 아래와 같이 각각의 서비스가 데이터베이스를 가지고 있다면 데이터가 동기화되지 않을 것입니다.

 

하나의 데이터베이스를 두고 동기화하는 것이 쉬워보입니다. 이럴 때에 Messaging Queuing을 사용합니다. 대표적인 솔루션으로 Kafka가 있습니다. 이번 장에서 데이터 동기화를 위한 Kafka 설정을 설명하겠습니다.

 

Kafka로 데이터를 보내고, Kafka에서 데이터를 받기 위해서 Kafka Connect를 설치하겠습니다.

Kafka Connect Source는 Source System의 데이터를 Kafka으로 보냅니다.

Kafka Connect Sink는 Kafka의 데이터를 Target System으로 보냅니다.

 

 

요약

Kafka 설치
MariaDB 설치
Kafka Connect 설치

 

 

Kafka

kafka를 다운로드합니다. 저는 2.8.1을 선택했습니다.

작업하는 폴더에 옮기시고 압출을 해제합니다.

tar 파일일 경우, tar xcf 파일명 명령어로 해제할 수 있습니다.

 

 

콘솔에서 테스트해보겠습니다. (윈도우 환경)

 

Mac일 경우

더보기

Zookeeper 및 kafka 서버 구동

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

./bin/kafka-server-start.sh ./config/server.properties

 

Topic 생성

./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1

 

Topic 목록 확인

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

 

Topic 정보 확인

./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-sever localhost:9092

 

메시지 생산 테스트

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events

 

메시지 소비 테스트 

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

 

Zookeeper 및 kafka 서버 구동

./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties

./bin/windows/kafka-server-start.bat ./config/server.properties

 

Topic 생성

./bin/windows/kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1

 

Topic 목록 확인

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list

 

Topic 정보 확인

./bin/windows/kafka-topics.bat --describe --topic quickstart-events --bootstrap-sever localhost:9092

 

메시지 생산 테스트

./bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic quickstart-events

 

메시지 소비 테스트 

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

 

 

MariaDB

 

Mac일 경우

더보기

설치) $ brew install mariadb

시작) $ mysql.server start

종료) $ mysql.server stop

상태) $ mysql.server status

접속) $ mysql -u root -p

 

MariaDB를 설치합니다.

 

mariaDB를 다운로드합니다. 저는 10.6.5를 선택했습니다. 인스톨러를 사용했습니다.

비밀번호는 test1357로 설정합니다.

 

DB를 실행하겠습니다. mariaDB Prompt를 실행하고 비밀번호를 입력합니다.

DB에 접속했으면, 데이터베이스를 하나 만들겠습니다.

데이터베이스 리스트) show databases;
데이터베이스 생성) create database mydb;
데이터베이스 사용) use mydb;

 

 

이제 user-service에 mariaDB를 연결하겠습니다.

 

user-service에 Dependency 추가

<!-- MariaDB -->
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.3</version>
</dependency>

 

user service를 구동해주고 DB에 접속해봅니다. 설정 내용을 아래와 같이 변경합니다.

 

테이블 생성을 생성해줍니다.

 

 

Kafka와 mariaDB 연결

Mac

더보기

(상세 내용은 윈도우 참조)

 

Kafka Connect 다운로드는 여기. 작업 폴더로 옮깁니다.

압축 해제 : tar xvf confluent-community-6.2.2.tar.gz

 

kafka connect 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

JDBC connector 다운로드는 여기. 작업 폴더로 옮깁니다.

JDBC connect를 사용하기 위해서 플러그인 경로를 설정해야 합니다.

Kafka connect 폴더에서 ./etc/kafka/connect-distributed.properties 를 열고 맨 밑으로 가서 아래 경로로 변경합니다.

plugin.path=\C:\\Users\\ASUS\\IdeaProjects\\confluentinc-kafka-connect-jdbc-10.2.6\\lib

 

JDBC connector가 마리아DB를 사용하기 위해서 현재 사용중인 마리아DB 드라이버 정보가 필요합니다.

./share/java/kafka 폴더에 mariadb-java-client-2.7.2.jar 파일 복사

 

 

카프카와 통신하기 위해서 Kafka Connect를 설치하겠습니다. 여기에서 커뮤니티 버전을 다운로드 합니다.

저는 6.2.2 버전 tar파일을 다운받았습니다. 아래 명령어로 압축 해제하고 해당 폴더(/confluent-6.2.2)로 이동합니다.

tar xvf confluent-community-6.2.2.tar.gz

 

kafka connect 실행합니다.

./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties

 

 

오류 해결

더보기

Classpath 오류

윈도우로 실행하면 아래와 같은 오류가 발생할 수 있습니다.

오류 : classpath is empty. please build the project first.

./bin/windows/kafka-run-class.bat 파일을 열고, 약 95번째 줄에서 rem classpath addtition for core를 찾습니다.

그리고 해당 라인 위에 아래 내용 삽입합니다.

rem classpath addtition for LSB style path
    if exist %BASE_DIR%\share\java\kafka\* (
    call:concat %BASE_DIR%\share\java\kafka\*
)

 

log 경로 오류

참고 : https://www.inflearn.com/questions/199173

 

Connector로는 JDBC connector를 사용하겠습니다. JDBC connector 여기서 다운로드하고 작업 폴더로 옮겨서 압축 해제합니다. JDBC connect를 사용하기 위해서 플러그인 경로를 설정해야 합니다.

Kafka connect 폴더에서 ./etc/kafka/connect-distributed.properties 를 열고 맨 밑으로 가서 아래 경로로 변경합니다.

plugin.path=\C:\\Users\\ASUS\\IdeaProjects\\confluentinc-kafka-connect-jdbc-10.2.6\\lib

 

JDBC connector가 마리아DB를 사용하기 위해서 현재 사용중인 마리아DB 드라이버 정보가 필요합니다. 아래 경로로 이동하여 mariadb-java-client-2.7.3.jar 파일을 복사하여 kafka connect 폴더의 share/java/kafka로 옮겨줍니다.

 

 

 

Kafka source 등록합니다. 카프카 소스는 소스 시스템(예: 마리아DB의) 변경 내용을 카프카 토픽에게 보내줍니다.

 

Postman을 이용해서 kafka connect ( localhost:8083 )에 source를 등록하겠습니다. 아래 생성 내용을 자세히 보시면 어떤 설정을 가지고 있는지 알 수 있습니다.

{
    "name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}

 

등록된 source connect를 확인할 수 있습니다.

 

마리아DB에 변경이 생기면 카프카에 토픽이 생기고 토픽 내부에 데이터가 생깁니다.

마리아DB의 users 테이블에 데이터를 넣어봅니다.

insert into users(user_id, pwd, name) values('user1',' test1111', 'User name');

 

카프카에 토픽을 조회해 보면, my_topic_users가 생겼습니다.

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list

 

토픽에는 무슨 내용이 있는 확인해 보겠습니다. Kafka Console을 이용해서 확인하겠습니다. payload에 마리아DB에 추가한 데이터가 보입니다.

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

 

 

 

Kafka sink 등록합니다. 카프카 싱크는 카프카 토픽에 쌓인 데이터를 사용합니다.

users 데이터베이스에 변경을 주면, my_topic_users 데이터베이스에 자동으로 입력되도록 해보겠습니다.

 

Postman으로 sink connector를 생성하겠습니다. 상세 내용을 보면, my_topic_users 테이블을 만들고 자동으로 생성하겠다는 의미입니다.

{
    "name":"my-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}

 

커넥터 목록을 보니 방금 만든 커넥터가 생성되어 있습니다. 

 

마리아DB를 보니 테이블이 하나 생성되어 있습니다.

 

내용을 보니, users 테이블과 같은 내용으로 채워져 있습니다.

 

새로운 데이터를 users 테이블에 넣어보겠습니다. 그러면 자동으로 my_topic_users 테이블에 데이터가 채워집니다.

 

Consumer console을 보면, 카프카로 보낸 데이터가 보입니다.

 

이번에는 Producer로 데이터를 직접 넣어보겠습니다. Kafka 폴더에서 아래 명령어를 실행하여 Producer console를 실행합니다. 보낼 데이터의 형식은 Consumer Console에서 보여지는 데이터 형식을 그대로 사용합니다.

./bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic my_topic_users

 

마리아DB를 확인해 보겠습니다. Kafka에서 데이터를 받아서 사용하는 my_topic_users 테이블에는 데이터가 입력되었습니다. 하지만, Kafka에서 데이터를 받지 않는 users 테이블에는 데이터가 입력되지 않습니다.

 

 

끝.

반응형

'IT > Spring Cloud' 카테고리의 다른 글

CircuitBreaker  (0) 2022.01.30
Apache Kafka 활용 (1)  (0) 2022.01.23
Microservice간 통신 - FeignClient 방식  (0) 2022.01.07
Microservice간 통신 - RestTemplate 방식  (0) 2022.01.06
비대칭키 암호화  (0) 2022.01.05