PSD( Private-Self-Development )

Kafka( 카프카 ) 본문

Backend/MSA

Kafka( 카프카 )

chjysm 2024. 4. 22. 17:01

Kafka?

rabbit MQ 와 같은 메시지 브로커  

  • 분산형 스트리밍 플랫폼
  • 대용량의 데이터를 빠르게 처리 가능한 메시징 시스템 
  • 초당 100k+ 이상의 이벤트 처리
  • Pub/Sub,  Topic에 메시지 전달
  • Ack를 기다리지 않고 전달 가능 
  • 생산자 중심 

 

Kafka 의 구성

  • Zookeeper
    • 메타데이터(Broker Id, Controller Id 등) 저장
    • 브로커들을 중재한다
    • 어떠한 브로커 가 있는지 역할은 무엇인지 에 대한 정보를 
  • 보통 3개 이상의 Broker 로 클러스터 구성
    • 1대는 Controller 기능 수행 
    • Contoller 
      • 각 Broker 에게 담담 파티션 할당 
      • Broker 정상 동작 모니터링 관리

사용 

  1. kafka 설치 
  2. Zookeeper 구동
    1. $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
  3. kafka 구동 
    1. $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
    2. 기본 9092 포트번호로 설정된다 
  4. Topic 생성 
    1. $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 
      1. -- create : 생성하겠다
      2. --topic quickstart-events : quickstart-events 으로 토픽 이름 지정
      3.  --bootstrap-server localhost:9092 : kafka 서버 포트 설정 
      4. --partitions 1 : 멀티 클러스터링 설정 시, 토픽 내부에 몇 개로 나누어 저장 하려는가 를 의미하는 파티션 설정 
  5. Topic 목록 확인 
    1. $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
      1. 해당 Kafka 서버가 가지고 있는 토픽의 리스트를 확인 
  6. Topic 정보 확인 
    1. $KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events  --bootstrap-server localhost:9092
      1. 설명을 확인한다 어떤 서버의 어떤 토픽에 대한 
  7. 메시지 생산 
    1. $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
      1. 메시지 보낼 주소 설정
  8. 메시지 소비
    1. $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
      1. 메시지 받을 주소 설정
      2. --from-beginning : 이전 데이터도 다 받아옴

 

Kafka Connect

  • 자유롭게 Data를 Import/Export 하도록 해준다
  • 단일, 분산 모드 지원
  • Restful Api 통해 지원
    • 사용 할 수 있는,  사용중인 커넥터를 조회
    • 새로운 커넥터 생성, 커넥터 변경, 삭제 
  • Stream 또는 Batch 형태로 데이터 전송 가능 
  • 커스텀 Connector 를 통한 다양한 Plugin 제공 (FILE, S3, Mysql 등...)
  • Rest API 
    1. [POST]/connectors : 생성 
    2. [GET]/connectors : Connect 리스트 조회
    3. [GET]/connectors/{CONNECT_NAME}/status : Connect 상태 조회
    4. [DELETE]/connectors/{CONNECT_NAME} : Connect 삭제

구성 

  • Kafka Connect Source 
    • 특정 리소스 (mysql 등) 에서 데이터를 가져오는 쪽
    • 가져와서 kafka 클러스터에 저장한다.
  • Kafka Connect Sink
    • 데이터를 보내는 쪽
    • kafka 클러스터의 데이터를 기타 시스템에(s3 등) 에 보낸다

사용 

  1. 카프카 커넥터 설치
    1. curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
  2.  jdbc Connector 설치 
  3. 카프카 커넥터의 프로퍼티 파일(./etc/kafka/connect-distributed.properties)에  jdbc Connector Lib 경로 추가 
  4. 카프카 커넥터에 mariadb-java-client.jar 파일 ./share/java/kafka 에 복사 
  5. 구동 
    1. Kafka 서버가 먼저 구동 되어 있어야 한다.
    2. $KAFKA_CONNECT_HOME/bin/connect-distributed $KAFKA_CONNECT_HOME/etc/kafka/connect-distributed.properties
    3. 기본 8083 포트번호로 설정된다
  6. Kafka Connect Source 추가 
    1. echo '
      {
      "name" : "my-source-connect",
      "config" : {
      "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url":"jdbc:mariadb://localhost:3306/mydb",
      "connection.user":"root",
      "connection.password":"test1234",
      "mode": "incrementing",
      "incrementing.column.name" : "id",
      "table.whitelist":"mydb.users",
      "topic.prefix" : "my_topic_",
      "tasks.max" : "1"
      }
      }
      ' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
      1. connector.class : DB 대상용 이다
      2. mode : incrementing => 데이터 추가 할 때 상승 시키겠다
      3. incrementing.column.name : 상승 시킬 컬럼의 이름
      4. table.whitelist : 테이블 명을 설정하면 해당 테이블에 변경사항이 있을 경우 가져와 kafka 클러스터에 저장하겠다
  7. Kafka Connect Sink 추가 
    1. echo '
      {
      "name":"my-sink-connect",
      "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url":"jdbc:mariadb://localhost:3306/mydb",
      "connection.user":"root",
      "connection.password":"test1234",
      "auto.create":"true",
      "auto.evolve":"true",
      "delete.enabled":"false",
      "tasks.max":"1",
      "topics":"my_topic_users"
      }
      }
      '| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
      1. topics 에 설정된 토픽명으로 데이터를 보낸다

 

JAVA 사용

1.  디펜던시 추가

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

2. Config 구현

// Producer
@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 카프카 서버 IP
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키값 데이터 직렬화 뭐로 할래
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 값 데이터 직렬화 뭐로 할래

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

// Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 카프카 서버 IP
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // 속할 컨슈머 그룹의 ID
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 키값 데이터 역직렬화 뭐로 할래
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 값 데이터 역직렬화 뭐로 할래

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

 

3. 서비스 구현

// Producer
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String,String> kafkaTemplate;

    public OrderDto send(String kafkaTopic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e){
            log.error("json parse error", e);
        }

        kafkaTemplate.send(kafkaTopic, jsonInString);
        log.info("kafka producer send data from the Order ms : " + orderDto);

        return orderDto;
    }
}

// Producer (Kafka Connect SINK 에 DB 이벤트 발생 시키기)
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price"));
    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    public OrderDto send(String topic, OrderDto orderDto) {
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);

        return orderDto;
    }
}


// Consumer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
    private final CatalogRepository catalogRepository;

    @KafkaListener(topics = "example-order-topic")
    public void processMessage(String kafkaMessage){
        ....
    }
}

 

'Backend > MSA' 카테고리의 다른 글

MSA 모니터링  (0) 2024.04.29
MSA 장애 처리 및 분산 추적  (0) 2024.04.23
Feign Client  (1) 2024.04.19
Spring Cloud Config  (0) 2024.04.16
API Gateway  (0) 2024.03.26