TIL, WIL/TIL📘

03. 07 TIL : 카프카 세팅 및 실행, 테스트

wannaDevelopIt 2025. 3. 7. 17:45
728x90

스프링 카프카 실행하기

1. 아파치 제공 카프카 다운로드 및 압축풀기

2. 압축 푼 폴더로 이동해
2-1) Zookeeper 실행 ~ 기본포트 : 2181

bin>zookeeper-server-start.sh config/zookeeper.properties

 

 

2-2) Kafka 서버 실행 ~ 기본포트 : 9092

bin>zookeeper-server-start.sh config/server.properties

 

(config/server.properties에서 설정 변경 가능 (예: listeners=PLAINTEXT://localhost:9092)

3. 토픽(데이터 관리 단위) 생성
ex) 채팅시스템용 토픽 : 

bin/kafka-topics.sh --create --topic chat-messages --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

 

--partitions: 데이터 병렬 처리 단위.
--replication-factor: 복제본 수 (클러스터 환경에서 장애 복구용).

4. kafka 백엔드 통합
애플리케이션이 메세지를 발행(Produce)하고 소비(Consume)할 수 있어야 한다.
4-1) 의존성 추가

4-2) Producer 구현

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Sent to Kafka: " + message);
    }
}


4-4) 컨슈머 구현

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "chat-messages", groupId = "chat-group")
    public void listen(String message) {
        System.out.println("Received from Kafka: " + message);
        // 메시지 처리 로직 추가
    }
}


4-5) 애플리케이션에 구현

@Controller
public class ChatController {

    private final KafkaProducerService kafkaProducerService;
    private final SimpMessagingTemplate messagingTemplate;

    public ChatController(KafkaProducerService kafkaProducerService, SimpMessagingTemplate messagingTemplate) {
        this.kafkaProducerService = kafkaProducerService;
        this.messagingTemplate = messagingTemplate;
    }

    @MessageMapping("/sendMessage")
    public void sendMessage(@Payload String message) {
        kafkaProducerService.sendMessage("chat-messages", message); // Kafka로 전송
    }

    @KafkaListener(topics = "chat-messages", groupId = "chat-group")
    public void listen(String message) {
        messagingTemplate.convertAndSend("/topic/chatroom", message); // STOMP로 클라이언트에 전달
    }
}


5. Kafka 통합의 이점
확장성: Kafka의 파티션과 클러스터를 활용해 수백만 메시지 처리 가능.
영속성: 메시지가 토픽에 저장되므로 장애 발생 시 복구 가능.
비동기 처리: 백엔드에서 메시지를 Kafka로 보내고, 소비자가 필요할 때 처리.
유연성: 채팅 외에 로그 분석, 알림 등 추가 기능 확장 가능.

6. 고급 설정 (선택)
파티션 증가: 처리량을 높이기 위해 토픽 파티션 수 조정 (--partitions).
복제: 장애 복구를 위해 복제본 수 증가 (--replication-factor).
Consumer Group: 여러 소비자 그룹으로 메시지 분산 처리.
오프셋 관리: auto-offset-reset을 latest로 설정해 최신 메시지만 소비.

7. 테스트
7-1) 토픽 생성 테스트

bin/kafka-topics.sh --list --bootstrap-server localhost:9092


7-2) 메세지 발행 테스트

bin/kafka-console-producer.sh --topic chat-messages --bootstrap-server localhost:9092


7-3) 메세지 소비 테스트

bin/kafka-console-consumer.sh --topic chat-messages --from-beginning --bootstrap-server localhost:9092
728x90