
Spring Boot + Kafka
2021, Sep 08
A guide to integrating Kafka into a Spring Boot application
Configuration
- Add the dependency to pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
- Add the Kafka configuration to application.yml
spring:
  kafka:
    bootstrap-servers: "localhost:9093"
    listener:
      missing-topics-fatal: false
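For orientation, the spring.kafka.bootstrap-servers entry ends up as the Kafka client property bootstrap.servers, and Spring Boot falls back to String serializers when none are configured, which is why a KafkaTemplate<String, String> can be injected without further setup. The JDK-only sketch below simply spells out that mapping as a plain map. The class and method names are hypothetical, and this is not how Spring Boot assembles its configuration internally. The missing-topics-fatal flag is a listener container setting rather than a client property, so it does not appear here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ProducerPropsSketch {

    // Hypothetical helper: lists the Kafka client properties that the YAML above
    // roughly corresponds to for a String/String producer.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each resolved property for the broker address used in this post
        producerProps("localhost:9093")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```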
Write a producer
- Use the KafkaTemplate bean to send a message to the Kafka topic UserMessage
@Autowired
KafkaTemplate<String, String> userKafka;

@Override
public void sendUserMesage(UserMessage message) {
    // Serialize the message to JSON and send it asynchronously to the UserMessage topic
    userKafka.send("UserMessage", GsonUtils.toJson(message))
        .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // The broker acknowledged the record; log the offset it was written at
                log.info("Kafka sent message='{}' with offset={}", message,
                    result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Kafka unable to send message='{}'", message, ex);
            }
        });
}
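The callback above relies on the ListenableFuture that KafkaTemplate.send returns in spring-kafka 2.x. From spring-kafka 3.0 onward, send returns a CompletableFuture instead, so the same success and failure handling would be attached with whenComplete. The sketch below shows only that callback shape using the JDK. The fakeSend method, the fixed offset it yields, and the in-memory log list are hypothetical stand-ins for the real send call, SendResult, and logger, so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SendCallbackSketch {

    // Stand-in for a logger so the outcome can be inspected
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for KafkaTemplate.send(...): completes with an offset,
    // or completes exceptionally when the payload is empty.
    static CompletableFuture<Long> fakeSend(String topic, String payload) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (payload == null || payload.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty payload"));
        } else {
            future.complete(42L);
        }
        return future;
    }

    // One whenComplete handler covers both outcomes: ex is null on success
    static void send(String topic, String payload) {
        fakeSend(topic, payload).whenComplete((offset, ex) -> {
            if (ex == null) {
                log.add("sent to " + topic + " with offset=" + offset);
            } else {
                log.add("unable to send: " + ex.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        send("UserMessage", "{\"name\":\"Ann\"}");
        send("UserMessage", "");
        log.forEach(System.out::println);
    }
}
```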
Write a consumer
@Autowired
UserUseCase userUseCase;

@KafkaListener(topics = "UserMessage", groupId = "example")
public void consume(ConsumerRecord<String, String> record) {
    try {
        log.info(
            "Consumed - Partition: {} - Offset: {} - Value: {}",
            record.partition(),
            record.offset(),
            record.value());
        // Deserialize the JSON payload and hand it to the use case
        UserMessage userMessage = GsonUtils.fromJson(record.value(), UserMessage.class);
        userUseCase.goodbye(userMessage);
    } catch (Exception ex) {
        // The exception is logged and swallowed, so a failing record is not retried
        log.error("Exception - Reason:", ex);
    }
}
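Because the listener catches every exception and only logs it, a record that fails to parse or process is treated as handled and the consumer moves on to the next one. The sketch below isolates that behaviour in plain Java so it can be exercised without Kafka. Everything in it is hypothetical: the single-field UserMessage, the parse method standing in for GsonUtils.fromJson, the Consumer standing in for userUseCase.goodbye, and the errors list standing in for the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ListenerLogicSketch {

    // Hypothetical payload type; the real UserMessage fields are not shown in this post
    static final class UserMessage {
        final String name;
        UserMessage(String name) { this.name = name; }
    }

    private final Function<String, UserMessage> parser; // stands in for GsonUtils.fromJson
    private final Consumer<UserMessage> useCase;        // stands in for userUseCase.goodbye
    final List<String> errors = new ArrayList<>();      // stands in for log.error

    ListenerLogicSketch(Function<String, UserMessage> parser, Consumer<UserMessage> useCase) {
        this.parser = parser;
        this.useCase = useCase;
    }

    // Hypothetical parser: rejects blank payloads, otherwise wraps the trimmed text
    static UserMessage parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("blank payload");
        }
        return new UserMessage(value.trim());
    }

    // Mirrors the listener's try/catch: returns false instead of rethrowing,
    // so a bad record is noted as an error and processing continues.
    boolean handle(String value) {
        try {
            useCase.accept(parser.apply(value));
            return true;
        } catch (Exception ex) {
            errors.add(ex.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> greeted = new ArrayList<>();
        ListenerLogicSketch sketch =
                new ListenerLogicSketch(ListenerLogicSketch::parse, msg -> greeted.add(msg.name));
        System.out.println(sketch.handle("Ann"));
        System.out.println(sketch.handle(" "));
        System.out.println(greeted + " " + sketch.errors);
    }
}
```

If failed records should be retried or routed elsewhere rather than skipped, the exception would need to propagate out of the listener instead of being caught there.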
The source code is available here