关于KafkaTemplate与 @KafkaListener生产者与消费者功能的实现

news/2024/8/22 0:51:01 标签: kafka, spring boot

1.前言:

1.1关于生产者与消费者的详细介绍请查看另一篇文章:

使用JavaApi实现模拟Kafka的消息生产者与发送者icon-default.png?t=N7T8http://t.csdnimg.cn/ukNSU

 

1.2 本文使用 KafkaTemplate与 @KafkaListener实现生产者与消费者功能:

        Kafka 是一个流行的分布式流处理平台,广泛用于构建实时数据管道和流应用程序。在 Java 应用程序中,Spring Framework 提供了对 Kafka 的集成支持,通过 spring-kafka 模块实现。KafkaTemplate@KafkaListener 是 Spring Kafka 中的两个重要组件,它们分别用于发送消息和接收消息。

所需依赖:

!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

2. KafkaTemplate生产者发送消息到kafka

        KafkaTemplate 是Spring Kafka提供的一个高级抽象,用于简化Kafka消息的生产。它封装了KafkaProducer生产者客户端的复杂性,并提供了一系列发送消息的方法。

2.1 主要特性:

  1. 线程安全:KafkaTemplate 是线程安全的,可以在多个线程中共享使用。
  2. 消息发送:支持同步和异步发送消息。
  3. 消息类型:支持发送键值对(Key-Value)消息和仅值消息。
  4. 事务管理:支持在事务中发送消息。

2.2 常用方法:

  • send(String topic, V data):发送一个仅值消息到指定主题。
  • send(String topic, K key, V data):发送一个键值对消息到指定主题。
  • send(Message<?> message):发送一个Spring Kafka Message对象。
  • send(String topic, Integer partition, K key, V data):发送消息到指定主题的指定分区。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}

3.@KafkaListener消费者从kafka中获取消息进行消费

        @KafkaListener 是Spring Kafka提供的一个注解,它简化了消息的接收和处理。用于标记方法作为 Kafka 消费者来监听特定的主题。当消息到达时,被标记的方法将会被自动调用来处理这些消息。这使得消息的处理能够以事件驱动的方式进行,无需轮询或显式拉取。

3.1主要特性:

  1. 消息监听:可以监听一个或多个主题的消息。
  2. 分区监听:可以指定监听特定分区的消息。
  3. 消息类型:支持接收键值对(Key-Value)消息和仅值消息。
  4. 消息转换:支持将接收到的消息转换为具体的对象。

3.2常用属性:

  • topics:监听的Kafka主题,可以是多个。
  • topicPattern:监听的Kafka主题模式,支持正则表达式。
  • groupId:消费者组ID。
  • containerFactory:指定KafkaListenerContainerFactory,用于自定义消费者配置。
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
    System.out.println("Received message: " + message);
}

        在这个例子中,listen 方法将被自动调用,每当 Kafka 消费者从 "myTopic" 主题的 "myGroup" 消费者组接收到消息时。

4. 代码举例

        最近有个需求,记录不同系统之间的交互日志,使用拦截器将日志拦截,进行前置与后置处理,最终组装成json发送到kafka中,消费者消费kafka中的数据,最终将日志数据进行入库操作,前端进行展示。

        下面只将KafkaTemplate与 @KafkaListener举例出来,其他不过多展示。

4.1 生产者KafkaTemplate

@Slf4j
@Tag(name = "运营平台-系统日志管理")
@RestController
@RequestMapping("/log")
public class SystemInteractionLogController {

    @Autowired
    private SystemInteractionLogService systemInteractionLogService;
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/page")
    @Operation(summary = "日志分页查询")
    //分组设计  根据messageId分类
    public Result<PageData<SystemInteractionLogVO>> page(@RequestBody LogQueryPageDTO dto) {
        return Result.success(systemInteractionLogService.queryPage(dto));
    }


    @PostMapping("/queryInfoById/{id}")
    @Operation(summary = "日志详情查询")
    public Result<SystemInteractionLogInfoVO> queryInfoById(@PathVariable Long id) {
        Validator.validateNotNull(id, "id不能为空");
        return Result.success(systemInteractionLogService.queryInfoById(id));
    }

    @PostMapping ("/testSendLogToKafka")
    @Operation(summary = "(测试使用)往kafka中发送数据")
    public void sendMessage(@RequestBody SystemInteractionLog systemInteractionLog) {
        String topicName = "system-interaction-logs";
        if(ObjectUtils.isEmpty(systemInteractionLog)){
            log.info("数据为空,不操作");
            return;
        }
        try {
            String message = JSONUtil.toJsonStr(systemInteractionLog);
            log.info("消息为:{}",message);
            kafkaTemplate.send(topicName, message);
        }catch (Exception e){
            log.error("消息发送失败:{}",e.getMessage());
        }
    }

}

4.2 消费者@KafkaListener

/**跨系统交互日志监听
 * @author ZhaoShuhao
 * @data 2024/7/13 9:46
 */
@Slf4j
@Component
public class SystemInteractionLogListener {
    @Resource
    private OperateApi operateApi;
 @KafkaListener(topics="system-interaction-logs",groupId="system-interaction-logs-groupId")
 public void receiveTask(String message, Acknowledgment ack) {
     try {
         log.info("接收到消息:{}", message);
         if(StringUtils.isNoneBlank(message)){
             SystemInteractionLogDto logDto  = JSONUtil.toBean(message, SystemInteractionLogDto.class);
              //保存入库操作
             Result<Boolean> result  = operateApi.saveLog(logDto);
             if(!ObjectUtils.isEmpty(result)){
                 if(result.getCode() == 200){
                     log.info("消息处理成功:{}", logDto);
                     //只有保存成功后才会确认接收到消息
                     ack.acknowledge();
                 }else {
                     log.info("消息保存失败:{}", logDto);
                 }
             }
         }
     }catch (Exception e) {
         //异常数据不做处理
         log.info("消息保存失败,参数为{}", message);
         log.error(ExceptionUtils.getStackTrace(e));
     }
 }

}


http://www.niftyadmin.cn/n/5556545.html

相关文章

C++函数(函数原型,指标、参考、预设、可变参数)第一部

函数原型 自订函数的定义需要放在main()或呼叫之前&#xff0c;如果放在main()或呼叫之后&#xff0c;例如 #include <iostream>int main() {do_something("Whats truth?");do_something("There is no spoon.");return 0; }void do_something(cha…

【重走编程路】设计模式概述(七) -- 外观模式、组合模式、享元模式

文章目录 前言10. 外观模式&#xff08;Facade&#xff09;定义解决方案为什么使用外观模式应用场景优缺点 11. 组合模式&#xff08;Composite&#xff09;定义解决方案应用场景优缺点 12. 享元模式&#xff08;Flyweight&#xff09;定义解决方案应用场景优缺点 前言 结构型…

智慧园区软件平台设计方案Word(1129页)

1. 智慧园区大数据平台的建设目的是通过智慧化手段提升园区吸引力、促进园区可持续发展、助力园区发展战略性新兴产业&#xff0c;并顺应信息技术创新与应用趋势。平台的基础设施建设包括管委会大楼智能化、园区主干网敷设、应急指挥中心等&#xff0c;以实现对园区各项活动的智…

Android系统上常见的性能优化工具

Android系统上常见的性能优化工具 在Android系统开发中&#xff0c;性能优化是一个重要的任务&#xff0c;有许多工具可以帮助你进行各种方面的性能分析和优化。以下是一些常见的Android性能优化工具及其用途和使用方法&#xff1a; 1. Android Studio Profiler 功能: 提供CP…

小程序-模板与配置

一、WXML模板语法 1.数据绑定 2.事件绑定 什么是事件 小程序中常用的事件 事件对象的属性列表 target和currentTarget的区别 bindtap的语法格式 在事件处理函数中为data中的数据赋值 事件传参 &#xff08;以下为错误示例&#xff09; 3.事件传参与数据同步 4.条件渲染 …

C++ 类和对象(A)

一、类与对象的初步认识 1.类是对象的抽象&#xff0c;而对象是类的具体实例。 类是抽象的&#xff0c;不占用内存&#xff1b;而对象是具体的&#xff0c;占用存储空间。 2.面向过程与面向对象 C语言是面向过程的&#xff0c;关注的是过程中的数据与方法。 C是面向对象的&…

【JVM】JVM实战笔记-随笔

JVM实战笔记-随笔 前言字节码如何查看字节码文件jclasslibJavapArthasArthurs监控面板Arthus查看字节码信息 内存调优内存溢出的常见场景解决内存溢出发现问题Top命令VisualVMArthas使用案例 Prometheus Grafana案例 堆内存情况对比内存泄漏的原因:代码中的内存泄漏并发请求问…

【Linux】Linux操作系统

Linux基本指令 os概念与定位 本节内容&#xff1a; Linux操作系统讲解 os概念与定位 操作系统&#xff08;Operating System&#xff0c;简称OS&#xff09;是管理和控制计算机硬件与软件资源的计算机程序。总的来讲&#xff0c;操作系统是一款做软硬件管理的软件。 了解操作…