스프링 카프카 실행하기
1. 아파치 제공 카프카 다운로드 및 압축풀기
2. 압축 푼 폴더로 이동해
2-1) Zookeeper 실행 ~ 기본포트 : 2181
bin>zookeeper-server-start.sh config/zookeeper.properties
2-2) Kafka 서버 실행 ~ 기본포트 : 9092
bin>zookeeper-server-start.sh config/server.properties
(config/server.properties에서 설정 변경 가능 (예: listeners=PLAINTEXT://localhost:9092)
3. 토픽(데이터 관리 단위) 생성
ex) 채팅시스템용 토픽 :
bin/kafka-topics.sh --create --topic chat-messages --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
--partitions: 데이터 병렬 처리 단위.
--replication-factor: 복제본 수 (클러스터 환경에서 장애 복구용).
4. kafka 백엔드 통합
애플리케이션이 메세지를 발행(Produce)하고 소비(Consume)할 수 있어야 한다.
4-1) 의존성 추가
4-2) Producer 구현
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent to Kafka: " + message);
}
}
4-4) 컨슈머 구현
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "chat-messages", groupId = "chat-group")
public void listen(String message) {
System.out.println("Received from Kafka: " + message);
// 메시지 처리 로직 추가
}
}
4-5) 애플리케이션에 구현
@Controller
public class ChatController {
private final KafkaProducerService kafkaProducerService;
private final SimpMessagingTemplate messagingTemplate;
public ChatController(KafkaProducerService kafkaProducerService, SimpMessagingTemplate messagingTemplate) {
this.kafkaProducerService = kafkaProducerService;
this.messagingTemplate = messagingTemplate;
}
@MessageMapping("/sendMessage")
public void sendMessage(@Payload String message) {
kafkaProducerService.sendMessage("chat-messages", message); // Kafka로 전송
}
@KafkaListener(topics = "chat-messages", groupId = "chat-group")
public void listen(String message) {
messagingTemplate.convertAndSend("/topic/chatroom", message); // STOMP로 클라이언트에 전달
}
}
5. Kafka 통합의 이점
확장성: Kafka의 파티션과 클러스터를 활용해 수백만 메시지 처리 가능.
영속성: 메시지가 토픽에 저장되므로 장애 발생 시 복구 가능.
비동기 처리: 백엔드에서 메시지를 Kafka로 보내고, 소비자가 필요할 때 처리.
유연성: 채팅 외에 로그 분석, 알림 등 추가 기능 확장 가능.
6. 고급 설정 (선택)
파티션 증가: 처리량을 높이기 위해 토픽 파티션 수 조정 (--partitions).
복제: 장애 복구를 위해 복제본 수 증가 (--replication-factor).
Consumer Group: 여러 소비자 그룹으로 메시지 분산 처리.
오프셋 관리: auto-offset-reset을 latest로 설정해 최신 메시지만 소비.
7. 테스트
7-1) 토픽 생성 테스트
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
7-2) 메세지 발행 테스트
bin/kafka-console-producer.sh --topic chat-messages --bootstrap-server localhost:9092
7-3) 메세지 소비 테스트
bin/kafka-console-consumer.sh --topic chat-messages --from-beginning --bootstrap-server localhost:9092
'TIL, WIL > TIL📘' 카테고리의 다른 글
02. 28 TIL : Statc 상태 값 선언(직접 참조) vs RDB 데이터 쿼리를 통한 조회 (0) | 2025.02.28 |
---|---|
02. 26 TIL : Collectors.groupingBy() 외 ~ List<Object> list를 특정 요소로 mapping (0) | 2025.02.26 |
02. 24 TIL : RDB 설계 간 인덱스를 위한 PK 순서 (0) | 2025.02.24 |
02. 19 TIL : GPT API 도입 간 참고할 점 : 웹 검색에 대해 (1) | 2025.02.19 |
01. 06 TIL : YEARWEEK()의 연말 연초 값 이슈 (0) | 2025.01.06 |