Spring Boot + Kafka

Spring Boot + Kafka

2021, Sep 08    

Hướng dẫn tích hợp Kafka trong Spring Boot application

Configuration

  • Thêm dependency trong pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
  • Thêm cấu hình kafa trong file application.yml
spring:
  kafka:
    bootstrap-servers: "localhost:9093"
    listener:
      missing-topics-fatal: false

Write a producer

  • Sử dụng bean KafkaTemplate để gửi một message lên topic UserMessage của Kafka
    @Autowired
    KafkaTemplate<String, String> userKafka;
  
    @Override
    public void sendUserMesage(UserMessage message) {
      userKafka.send("UserMessage", GsonUtils.toJson(message))
          .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  
            @Override
            public void onSuccess(SendResult<String, String> result) {
              log.info("Kafka sent message='{}' with offset={}", message,
                  result.getRecordMetadata().offset());
            }
  
            @Override
            public void onFailure(Throwable ex) {
              log.error("Kafka unable to send message='{}'", message, ex);
            }
          });
    }

Write a consumer

  @Autowired
  UserUseCase userUseCase;
  
  @KafkaListener(topics = "UserMessage", groupId = "example")
  public void consume(ConsumerRecord<String, String> record) {
    try {
      log.info(
          "Consumed - Partition: {} - Offset: {} - Value: {}",
          record.partition(),
          record.offset(),
          record.value());
  
      UserMessage userMessage = GsonUtils.fromJson(record.value(), UserMessage.class);
      userUseCase.goodbye(userMessage);
  
    } catch (Exception ex) {
      log.error("Exception - Reason:", ex);
    }
  }    

Source code ở đây