SpringBoot集成Kafka

Apache Kafka 是一个分布式流处理平台,广泛应用于实时数据处理、日志聚合、流式分析等场景。Spring Boot 提供了简便的方式来集成 Kafka,使得我们可以快速构建 Kafka 生产者和消费者应用。

基本使用

  • 导入依赖:

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
  • 编写配置文件:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:
    kafka:
    bootstrap-servers: ip1:9092
    consumer:
    group-id: test
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 创建生产者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RestController
    public class ProducerController {
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/hi")
    public String data(String msg){
    // 发送消息
    kafkaTemplate.send("test",msg);
    return "ok";
    }
    }
  • 创建具有回调函数的生产者:

    • 方式一:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      @GetMapping("/kafka/callbackOne/{message}")
      public void sendMessage2(@PathVariable("message") String callbackMessage) {
      kafkaTemplate.send("test", callbackMessage).addCallback(success -> {
      // 消息发送到的topic
      String topic = success.getRecordMetadata().topic();
      // 消息发送到的分区
      int partition = success.getRecordMetadata().partition();
      // 消息在分区内的offset
      long offset = success.getRecordMetadata().offset();
      System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
      }, failure -> {
      System.out.println("发送消息失败:" + failure.getMessage());
      });
      }
    • 方式二:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      @GetMapping("/kafka/callbackTwo/{message}")
      public void sendMessage3(@PathVariable("message") String callbackMessage) {
      kafkaTemplate.send("test", callbackMessage).addCallback(
      (ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
      @Override
      public void onFailure(Throwable ex) {
      System.out.println("发送消息失败:"+ex.getMessage());
      }

      @Override
      public void onSuccess(SendResult<String, Object> result) {
      System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
      + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
      }
      });
      }
  • 创建消费者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Configuration
    public class KafkaConfiguration{

    // 监听kafka的test主题
    @KafkaListener(topics = {"test"})
    public void message1(ConsumerRecord<?, ?> record){
    // 消费的哪个topic、partition的消息,打印出消息内容
    System.out.println("点对点消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    }

    总结

通过上述步骤,我们已经成功地在 Spring Boot 项目中集成了 Kafka,并实现了基本的生产和消费功能。这只是 Kafka 集成 Spring Boot 的基础,实际应用中可能需要根据业务需求进行更复杂的配置和处理。