์คํ๋ง ์นดํ์นด ์คํํ๊ธฐ
1. ์ํ์น ์ ๊ณต ์นดํ์นด ๋ค์ด๋ก๋ ๋ฐ ์์ถํ๊ธฐ
2. ์์ถ ํผ ํด๋๋ก ์ด๋ํด
2-1) Zookeeper ์คํ ~ ๊ธฐ๋ณธํฌํธ : 2181
bin>zookeeper-server-start.sh config/zookeeper.properties
2-2) Kafka ์๋ฒ ์คํ ~ ๊ธฐ๋ณธํฌํธ : 9092
bin>zookeeper-server-start.sh config/server.properties
(config/server.properties์์ ์ค์ ๋ณ๊ฒฝ ๊ฐ๋ฅ (์: listeners=PLAINTEXT://localhost:9092)
3. ํ ํฝ(๋ฐ์ดํฐ ๊ด๋ฆฌ ๋จ์) ์์ฑ
ex) ์ฑํ
์์คํ
์ฉ ํ ํฝ :
bin/kafka-topics.sh --create --topic chat-messages --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
--partitions: ๋ฐ์ดํฐ ๋ณ๋ ฌ ์ฒ๋ฆฌ ๋จ์.
--replication-factor: ๋ณต์ ๋ณธ ์ (ํด๋ฌ์คํฐ ํ๊ฒฝ์์ ์ฅ์ ๋ณต๊ตฌ์ฉ).
4. kafka ๋ฐฑ์๋ ํตํฉ
์ ํ๋ฆฌ์ผ์ด์
์ด ๋ฉ์ธ์ง๋ฅผ ๋ฐํ(Produce)ํ๊ณ ์๋น(Consume)ํ ์ ์์ด์ผ ํ๋ค.
4-1) ์์กด์ฑ ์ถ๊ฐ
4-2) Producer ๊ตฌํ
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent to Kafka: " + message);
}
}
4-4) ์ปจ์๋จธ ๊ตฌํ
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "chat-messages", groupId = "chat-group")
public void listen(String message) {
System.out.println("Received from Kafka: " + message);
// ๋ฉ์์ง ์ฒ๋ฆฌ ๋ก์ง ์ถ๊ฐ
}
}
4-5) ์ ํ๋ฆฌ์ผ์ด์
์ ๊ตฌํ
@Controller
public class ChatController {
private final KafkaProducerService kafkaProducerService;
private final SimpMessagingTemplate messagingTemplate;
public ChatController(KafkaProducerService kafkaProducerService, SimpMessagingTemplate messagingTemplate) {
this.kafkaProducerService = kafkaProducerService;
this.messagingTemplate = messagingTemplate;
}
@MessageMapping("/sendMessage")
public void sendMessage(@Payload String message) {
kafkaProducerService.sendMessage("chat-messages", message); // Kafka๋ก ์ ์ก
}
@KafkaListener(topics = "chat-messages", groupId = "chat-group")
public void listen(String message) {
messagingTemplate.convertAndSend("/topic/chatroom", message); // STOMP๋ก ํด๋ผ์ด์ธํธ์ ์ ๋ฌ
}
}
5. Kafka ํตํฉ์ ์ด์
ํ์ฅ์ฑ: Kafka์ ํํฐ์
๊ณผ ํด๋ฌ์คํฐ๋ฅผ ํ์ฉํด ์๋ฐฑ๋ง ๋ฉ์์ง ์ฒ๋ฆฌ ๊ฐ๋ฅ.
์์์ฑ: ๋ฉ์์ง๊ฐ ํ ํฝ์ ์ ์ฅ๋๋ฏ๋ก ์ฅ์ ๋ฐ์ ์ ๋ณต๊ตฌ ๊ฐ๋ฅ.
๋น๋๊ธฐ ์ฒ๋ฆฌ: ๋ฐฑ์๋์์ ๋ฉ์์ง๋ฅผ Kafka๋ก ๋ณด๋ด๊ณ , ์๋น์๊ฐ ํ์ํ ๋ ์ฒ๋ฆฌ.
์ ์ฐ์ฑ: ์ฑํ
์ธ์ ๋ก๊ทธ ๋ถ์, ์๋ฆผ ๋ฑ ์ถ๊ฐ ๊ธฐ๋ฅ ํ์ฅ ๊ฐ๋ฅ.
6. ๊ณ ๊ธ ์ค์ (์ ํ)
ํํฐ์
์ฆ๊ฐ: ์ฒ๋ฆฌ๋์ ๋์ด๊ธฐ ์ํด ํ ํฝ ํํฐ์
์ ์กฐ์ (--partitions).
๋ณต์ : ์ฅ์ ๋ณต๊ตฌ๋ฅผ ์ํด ๋ณต์ ๋ณธ ์ ์ฆ๊ฐ (--replication-factor).
Consumer Group: ์ฌ๋ฌ ์๋น์ ๊ทธ๋ฃน์ผ๋ก ๋ฉ์์ง ๋ถ์ฐ ์ฒ๋ฆฌ.
์คํ์
๊ด๋ฆฌ: auto-offset-reset์ latest๋ก ์ค์ ํด ์ต์ ๋ฉ์์ง๋ง ์๋น.
7. ํ
์คํธ
7-1) ํ ํฝ ์์ฑ ํ
์คํธ
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
7-2) ๋ฉ์ธ์ง ๋ฐํ ํ
์คํธ
bin/kafka-console-producer.sh --topic chat-messages --bootstrap-server localhost:9092
7-3) ๋ฉ์ธ์ง ์๋น ํ
์คํธ
bin/kafka-console-consumer.sh --topic chat-messages --from-beginning --bootstrap-server localhost:9092