SpringBoot集成kafka
SpringBoot集成Kafka
Apache Kafka 是一个分布式流处理平台,广泛应用于实时数据处理、日志聚合、流式分析等场景。Spring Boot 提供了简便的方式来集成 Kafka,使得我们可以快速构建 Kafka 生产者和消费者应用。
基本使用
导入依赖:
1
2
3
4<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>编写配置文件:
1
2
3
4
5
6
7
8
9
10spring:
kafka:
bootstrap-servers: ip1:9092
consumer:
group-id: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer创建生产者:
1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerController {
KafkaTemplate<String, String> kafkaTemplate;
public String data(String msg){
// 发送消息
kafkaTemplate.send("test",msg);
return "ok";
}
}创建具有回调函数的生产者:
方式一:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void sendMessage2( String callbackMessage){
kafkaTemplate.send("test", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}方式二:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void sendMessage3( String callbackMessage){
kafkaTemplate.send("test", callbackMessage).addCallback(
(ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
创建消费者:
1
2
3
4
5
6
7
8
9
10
11
public class KafkaConfiguration{
// 监听kafka的test主题
public void message1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("点对点消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}总结
通过上述步骤,我们已经成功地在 Spring Boot 项目中集成了 Kafka,并实现了基本的生产和消费功能。这只是 Kafka 集成 Spring Boot 的基础,实际应用中可能需要根据业务需求进行更复杂的配置和处理。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 goMars的学习随记!
评论