Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- saga pattern
- Parallel Old GC
- spring cloud
- JPA
- Action Pattern
- 디자인패턴
- Spring Cloud Netfilx Eureka
- zipkin
- Resilinece4j
- Transaction Pattern
- 생산자 소비자 패턴
- 스레드
- MSA
- Spring Boot Actuator
- 배치
- Java
- 디자인 패턴
- The law of Demeter
- TypeScript
- 알고리즘
- java 정렬
- 스프링 배치
- Serial GC
- 타입스크립트
- thread
- spring batch
- 멀티스레드
- 사가 패턴
- 키클락
- 체인 패턴
Archives
- Today
- Total
PSD( Private-Self-Development )
Kafka( 카프카 ) 본문
Kafka?
rabbit MQ 와 같은 메시지 브로커
- 분산형 스트리밍 플랫폼
- 대용량의 데이터를 빠르게 처리 가능한 메시징 시스템
- 초당 100k+ 이상의 이벤트 처리
- Pub/Sub, Topic에 메시지 전달
- Ack를 기다리지 않고 전달 가능
- 생산자 중심
Kafka 의 구성
- Zookeeper
- 메타데이터(Broker Id, Controller Id 등) 저장
- 브로커들을 중재한다
- 어떠한 브로커 가 있는지 역할은 무엇인지 에 대한 정보를
- 보통 3개 이상의 Broker 로 클러스터 구성
- 1대는 Controller 기능 수행
- Contoller
- 각 Broker 에게 담담 파티션 할당
- Broker 정상 동작 모니터링 관리
사용
- kafka 설치
- Zookeeper 구동
- $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
- kafka 구동
- $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
- 기본 9092 포트번호로 설정된다
- Topic 생성
- $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
- -- create : 생성하겠다
- --topic quickstart-events : quickstart-events 으로 토픽 이름 지정
- --bootstrap-server localhost:9092 : kafka 서버 포트 설정
- --partitions 1 : 멀티 클러스터링 설정 시, 토픽 내부에 몇 개로 나누어 저장 하려는가 를 의미하는 파티션 설정
- $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
- Topic 목록 확인
- $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- 해당 Kafka 서버가 가지고 있는 토픽의 리스트를 확인
- $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- Topic 정보 확인
- $KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
- 설명을 확인한다 어떤 서버의 어떤 토픽에 대한
- $KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
- 메시지 생산
- $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
- 메시지 보낼 주소 설정
- $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
- 메시지 소비
- $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
- 메시지 받을 주소 설정
- --from-beginning : 이전 데이터도 다 받아옴
- $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
Kafka Connect
- 자유롭게 Data를 Import/Export 하도록 해준다
- 단일, 분산 모드 지원
- Restful Api 통해 지원
- 사용 할 수 있는, 사용중인 커넥터를 조회
- 새로운 커넥터 생성, 커넥터 변경, 삭제
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector 를 통한 다양한 Plugin 제공 (FILE, S3, Mysql 등...)
- Rest API
- [POST]/connectors : 생성
- [GET]/connectors : Connect 리스트 조회
- [GET]/connectors/{CONNECT_NAME}/status : Connect 상태 조회
- [DELETE]/connectors/{CONNECT_NAME} : Connect 삭제
구성
- Kafka Connect Source
- 특정 리소스 (mysql 등) 에서 데이터를 가져오는 쪽
- 가져와서 kafka 클러스터에 저장한다.
- Kafka Connect Sink
- 데이터를 보내는 쪽
- kafka 클러스터의 데이터를 기타 시스템에(s3 등) 에 보낸다
사용
- 카프카 커넥터 설치
- jdbc Connector 설치
- 카프카 커넥터의 프로퍼티 파일(./etc/kafka/connect-distributed.properties)에 jdbc Connector Lib 경로 추가
- 카프카 커넥터에 mariadb-java-client.jar 파일 ./share/java/kafka 에 복사
- 구동
- Kafka 서버가 먼저 구동 되어 있어야 한다.
- $KAFKA_CONNECT_HOME/bin/connect-distributed $KAFKA_CONNECT_HOME/etc/kafka/connect-distributed.properties
- 기본 8083 포트번호로 설정된다
- Kafka Connect Source 추가
- echo '
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mariadb://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"mydb.users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"- connector.class : DB 대상용 이다
- mode : incrementing => 데이터 추가 할 때 상승 시키겠다
- incrementing.column.name : 상승 시킬 컬럼의 이름
- table.whitelist : 테이블 명을 설정하면 해당 테이블에 변경사항이 있을 경우 가져와 kafka 클러스터에 저장하겠다
- echo '
- Kafka Connect Sink 추가
- echo '
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mariadb://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1234",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
- topics 에 설정된 토픽명으로 데이터를 보낸다
- echo '
JAVA 사용
1. 디펜던시 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. Config 구현
// Producer
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 카프카 서버 IP
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키값 데이터 직렬화 뭐로 할래
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 값 데이터 직렬화 뭐로 할래
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
// Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 카프카 서버 IP
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // 속할 컨슈머 그룹의 ID
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 키값 데이터 역직렬화 뭐로 할래
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 값 데이터 역직렬화 뭐로 할래
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
3. 서비스 구현
// Producer
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String,String> kafkaTemplate;
public OrderDto send(String kafkaTopic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e){
log.error("json parse error", e);
}
kafkaTemplate.send(kafkaTopic, jsonInString);
log.info("kafka producer send data from the Order ms : " + orderDto);
return orderDto;
}
}
// Producer (Kafka Connect SINK 에 DB 이벤트 발생 시키기)
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
// Consumer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
@KafkaListener(topics = "example-order-topic")
public void processMessage(String kafkaMessage){
....
}
}
'Backend > MSA' 카테고리의 다른 글
MSA 모니터링 (0) | 2024.04.29 |
---|---|
MSA 장애 처리 및 분산 추적 (0) | 2024.04.23 |
Feign Client (1) | 2024.04.19 |
Spring Cloud Config (0) | 2024.04.16 |
API Gateway (0) | 2024.03.26 |