第十章:服务器消费者管理模块

news/2025/2/26 20:19:03

目录

第一节:代码实现

        1-1.Consumer类

        1-2.QueueConsumer类

         1-3.QueueConsumerManger类

第二节:单元测试

下期预告:


        服务器的消费者管理模块在mqserver目录下实现。

第一节:代码实现

        创建一个名为mq_consumer.hpp的文件,打开并做好前置工作:

#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
// 以消息队列为单元管理消费者
#include <iostream>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <functional> 

namespace zd
{};

#endif

        1-1.Consumer类

        要管理消费者,首先要有消费者,定义class Consumer类,它的实现如下:

    // 消费者的回调函数类型
    using ConsumerCallback = std::function<void(const std::string,const BasicProperties*,const std::string)>;

    class Consumer
    {
        public:
            using ptr = std::shared_ptr<Consumer>;

            std::string tag;            // 消费者唯一标识
            std::string qname;          // 消费者订阅的队列名称
            bool auto_ack;              // 自动确认标志
            ConsumerCallback callback;  // 订阅队列收到消息后调用,作用是推送消息给消费者

            Consumer(){}
            Consumer(const std::string& ctag,const std::string& queue_name,bool ack_flag,const ConsumerCallback& cb):
            tag(ctag),
            qname(queue_name),
            auto_ack(ack_flag),
            callback(cb)
            {}
    };

        tag:消费者的唯一标识,由用户设置

        qname:消费者订阅的队列

        auto_ack:自动确认标志,如果设置为true,服务器再推送完消息后会直接删除消息,不等待消费者的确认请求。

        callback:消费者对消息的处理函数,在服务端它的功能是固定的:将消息发送给对应的客户端消费者,因为服务器的消费者并不是真正的消费者,客户端消费者才是真正的消费者。而客户端消费者的消息处理函数才由用户自己定义。

        1-2.QueueConsumer类

        这个类用来管理一个队列的所有订阅者, 而且当一条消息到来时,不是所有队列的订阅者都能获得,只有队列当前轮询的一个消费者可以获得这条信息,这种叫做队列模型。

        还有每个订阅者都能获得消息的订阅/发布模型,现在实现的是队列模型,项目基本完成后也会实现一下订阅/发布模型。

        先实现一下构造函数和成员变量:

    // 一个队列的消费者管理
    class QueueConsumer
    {
        public:
            using ptr = std::shared_ptr<QueueConsumer>;

            QueueConsumer(const std::string& qname):
            _qname(qname),
            _rr_ser(0)
            {}
        private:
            std::mutex _mtx;
            std::string _qname; // 当前管理的队列名称
            size_t _rr_ser; // 轮转序号:决定当前把消息推送给哪个消费者
            std::vector<Consumer::ptr> _consumers; // 该队列的所有消费者
    };

        消费者管理接口:

            // 新增消费者
            Consumer::ptr create(const std::string& ctag,bool ack_flag,const ConsumerCallback& cb)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断消费者重复添加
                for(const auto& consumer:_consumers)
                {
                    if(consumer->tag == ctag)
                        return nullptr;
                }
                // 2.构造消费者对象并添加
                Consumer::ptr consumer = std::make_shared<Consumer>(ctag,_qname,ack_flag,cb);
                _consumers.push_back(consumer);

                return consumer;
            }

            // 移除消费者
            void remove(const std::string& ctag)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                for(auto it = _consumers.begin();it != _consumers.end();it++)
                {
                    if((*it)->tag == ctag)
                    {
                        _consumers.erase(it);
                        return;
                    }
                }
            }

        获取当前轮询的消费者,决定消息的去向:

            // 获取当前轮询的消费者
            Consumer::ptr choose()
            {
                std::unique_lock<std::mutex> lock(_mtx);
                if(_consumers.size() == 0)
                    return nullptr;
                return _consumers[_rr_ser++%_consumers.size()];;
            }

        取模是为了不会越界访问。

        其他的功能函数:

            // 判空
            bool empty()
            {
                std::unique_lock<std::mutex> lock(_mtx);
                return _consumers.empty();
            }

            // 判断消费者是否存在
            bool exists(const std::string& ctag)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                for(const auto& consumer:_consumers)
                {
                    if(consumer->tag == ctag)
                        return true;
                }
                return false;
            }

            // 清理所有消费者
            void clear()
            {
                std::unique_lock<std::mutex> lock(_mtx);
                _consumers.clear();
                _rr_ser = 0;
            }

        析构函数调用clear()接口,这样当队列被删除时,也会删除队列的QueueConsumer对象,析构函数就自动清理数据了:

            ~QueueConsumer()
            {
                clear();
            }

 

         1-3.QueueConsumerManger类

        这个类用来管理服务器的所有队列的消费者。

        基本上就是对class QueueConsumer的封装,但是在队列执行自己的函数的时候不要上锁,因为每个队列是独立的。

    // 所有队列的消费者管理
    class QueueConsumerManger
    {
        public:
            using ptr = std::shared_ptr<QueueConsumerManger>;

            QueueConsumerManger(){}
            // 插入一个消费者管理队列
            void initQueueConsumer(const std::string& qname)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断重复
                auto it = _queue_consumers.find(qname);
                if(it != _queue_consumers.end())
                {
                    return;
                }
                // 2.构造并插入
                QueueConsumer::ptr queueConsumer = std::make_shared<QueueConsumer>(qname);
                _queue_consumers.insert(std::make_pair(qname,queueConsumer));
            }

            // 移除一个消费者管理队列
            void destoryQueueConsumer(const std::string& qname)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断存在
                auto it = _queue_consumers.find(qname);
                if(it == _queue_consumers.end())
                {
                    return;
                }
                // 2.移除
                _queue_consumers.erase(it);
            }

            // 向指定队列新增一个消费者
            Consumer::ptr create(const std::string& qname,const std::string& ctag,bool ack_flag,const ConsumerCallback& cb)
            {
                QueueConsumer::ptr queue;
                {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断队列存在
                auto qit = _queue_consumers.find(qname);
                if(qit == _queue_consumers.end())
                {
                    LOG("没有找到消费者管理队列 %s",qname.c_str());
                    return nullptr;
                }
                queue = qit->second;
                }
                // 2.调用队列管理的插入
                return queue->create(ctag,ack_flag,cb);
            }

            // 从指定队列中移除一个消费者
            void remove(const std::string& qname,const std::string& ctag)
            {
                QueueConsumer::ptr queue;
                {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断队列存在
                auto qit = _queue_consumers.find(qname);
                if(qit == _queue_consumers.end())
                {
                    LOG("没有找到消费者管理队列 %s",qname.c_str());
                    return;
                }
                queue = qit->second;
                }
                // 调用队列管理的移除
                queue->remove(ctag);
            }

            // 获取一个消费者管理队列的当前轮询消费者
            Consumer::ptr choose(const std::string& qname)
            {
                QueueConsumer::ptr queue;
                {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断队列存在
                auto qit = _queue_consumers.find(qname);
                if(qit == _queue_consumers.end())
                {
                    LOG("没有找到消费者管理队列 %s",qname.c_str());
                    return nullptr;
                }
                queue = qit->second;
                }   
                return queue->choose();
            }

            // 判空
            bool empty(const std::string& qname)
            {
                QueueConsumer::ptr queue;
                {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断队列存在
                auto qit = _queue_consumers.find(qname);
                if(qit == _queue_consumers.end())
                {
                    LOG("没有找到消费者管理队列 %s",qname.c_str());
                    return false;
                }
                queue = qit->second;
                }
                return queue->empty();
            }

            // 判断某个队列的某个消费者是否存在
            bool exists(const std::string& qname,const std::string& ctag)
            {
                QueueConsumer::ptr queue;
                {
                std::unique_lock<std::mutex> lock(_mtx);
                // 1.判断队列存在
                auto qit = _queue_consumers.find(qname);
                if(qit == _queue_consumers.end())
                {
                    LOG("没有找到消费者管理队列 %s",qname.c_str());
                    return false;
                }
                queue = qit->second;
                }
                return queue->exists(ctag);
            }

            // 清理
            void clear()
            {
                std::unique_lock<std::mutex> lock(_mtx);
                _queue_consumers.clear();
            }

            size_t size()
            {
                return _queue_consumers.size();
            }
        private:
            std::unordered_map<std::string,QueueConsumer::ptr> _queue_consumers; 
            std::mutex _mtx;
    };

        

第二节:单元测试

        打开mqtest目录,创建mq_consumer_test.cc,添加以下代码进行测试:

#include "../mqserver/mq_consumer.hpp"
#include <gtest/gtest.h>
#include <iostream>
#include <unordered_map>

zd::QueueConsumerManger::ptr qcmp;
void cb(const std::string,const zd::BasicProperties*,const std::string){};
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class ConsumerTest :public testing::Environment
{
public:
    // 全部单元测试之前调用一次
    virtual void SetUp() override
    {
        // std::cout << "单元测试执行前的环境初始化" << std::endl;
        qcmp = std::make_shared<zd::QueueConsumerManger>();
    }   

    // 全部单元测试之后调用一次
    virtual void TearDown() override
    {
        // std::cout << "单元测试执行后的环境清理" << std::endl;
        // emp->clear();
    }
};

// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
// 测试队列的新增和移除
TEST(ConsumerTest,ConsumerTest_test1_Test)
{  
    std::cout << "单元测试-1" << std::endl;
    // 新增队列
    qcmp->initQueueConsumer("q1");
    ASSERT_EQ(qcmp->size(),1);
    qcmp->initQueueConsumer("q2");
    ASSERT_EQ(qcmp->size(),2);
    qcmp->initQueueConsumer("q3");
    ASSERT_EQ(qcmp->size(),3);
    qcmp->initQueueConsumer("q4");
    ASSERT_EQ(qcmp->size(),4);
    qcmp->initQueueConsumer("q5");
    ASSERT_EQ(qcmp->size(),5);
    qcmp->initQueueConsumer("q1");
    ASSERT_EQ(qcmp->size(),5);

    // 移除队列
    qcmp->destoryQueueConsumer("q2");
    ASSERT_EQ(qcmp->size(),4);
    qcmp->destoryQueueConsumer("q6");
    ASSERT_EQ(qcmp->size(),4); 

    // q1 q3 q4 q5
}
// 测试消费者的新增和移除
TEST(ConsumerTest,ConsumerTest_test2_Test)
{
    std::cout << "单元测试-2" << std::endl;
    // 向队列新增消费者
    qcmp->create("q1","consumer-1",false,cb);
    qcmp->create("q1","consumer-2",false,cb);
    qcmp->create("q1","consumer-3",false,cb);
    qcmp->create("q1","consumer-4",false,cb);
    qcmp->create("q1","consumer-5",false,cb);
    ASSERT_EQ(qcmp->exists("q1","consumer-1"),true);
    ASSERT_EQ(qcmp->exists("q1","consumer-6"),false);

    // 从队列移除消费者
    qcmp->remove("q1","consumer-2");
    ASSERT_EQ(qcmp->exists("q1","consumer-2"),false);
    ASSERT_EQ(qcmp->exists("q2","consumer-2"),false);  // q2之前已经被移除了
    // q1:c1 c3 c4 c5
}
// 测试当前轮询接口
TEST(ConsumerTest,ConsumerTest_test3_Test)
{
    std::cout << "单元测试-3" << std::endl;
    zd::Consumer::ptr cp1 = qcmp->choose("q1");
    zd::Consumer::ptr cp2 = qcmp->choose("q1");
    zd::Consumer::ptr cp3 = qcmp->choose("q1");
    zd::Consumer::ptr cp4 = qcmp->choose("q1");
    std::cout << std::endl;
    std::cout << cp1->tag << " " << cp2->tag << " " << cp3->tag << " " << cp4->tag << std::endl;
    std::cout << std::endl;

    zd::Consumer::ptr cp5 = qcmp->choose("q3");
    ASSERT_EQ(cp5.get(),nullptr);
}
// 单元测试全部结束后调用TearDown

// ----------------------------------------------------------
int main(int argc,char** argv)
{
    testing::InitGoogleTest(&argc,argv);

    testing::AddGlobalTestEnvironment(new ConsumerTest); // 注册Test的所有单元测试

    if(RUN_ALL_TESTS() != 0) // 运行所有单元测试
    {
        printf("单元测试失败!\n");
    }
    return 0;
}

        编译:

mq_consumer_test:mq_consumer_test.cc 
	g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf

        执行结果:

                ​​​​​​​        ​​​​​​​        

        没有错误,打印也符合预期。

        服务器消费者管理模块就完成了。

下期预告:

        完成消费者管理模块之后,下一个要完成的是信道管理模块。

        对于服务器来说,一个信道就可以是一个消费者。如果客户端创建了一个信道(c-1),客户端就会发送创建信道的请求给服务器,服务器收到请求后也会创建一个对应的信道(c-1')。当客户端使用信道进行订阅的时候,服务器的对应信道就会承担消费者的角色,服务器会把消息推送给这个信道,这个信道再把消息推送给客户端的信道。

        以上过程中,消息都被推送给了服务器的c-1',所以对于服务器来说,c-1'才是消费者,而不是真正使用消息的客户端信道c-1。


http://www.niftyadmin.cn/n/5869137.html

相关文章

七、Spring Boot:初识与项目搭建

深入解析 Spring Boot&#xff1a;初识与项目搭建 Spring Boot 是基于 Spring Framework 的开源 Java 基础框架&#xff0c;旨在简化 Spring 应用的开发过程。它通过“约定优于配置”的理念&#xff0c;极大地减少了开发中的配置工作&#xff0c;同时提供了“开箱即用”的功能…

【视频2 - 4】初识操作系统,Linux,虚拟机

&#x1f4dd;前言说明&#xff1a; ●本专栏主要记录本人的基础算法学习以及LeetCode刷题记录&#xff0c;主要跟随B站博主灵茶山的视频进行学习&#xff0c;专栏中的每一篇文章对应B站博主灵茶山的一个视频 ●题目主要为B站视频内涉及的题目以及B站视频中提到的“课后作业”。…

【第六节】C++设计模式(结构型模式)-Bridge(桥接)模式

目录 一、问题提出 二、桥接模式的结构与优势 三、桥接模式的实现 四、桥接模式的深入讨论 五、总结 一、问题提出 面向对象设计的核心原则 面向对象设计的核心可以总结为两点&#xff1a;松耦合&#xff08;Coupling&#xff09;和高内聚&#xff08;Cohesion&am…

SSL 证书是 SSL 协议实现安全通信的必要组成部分

SSL证书和SSL/TLS协议有着密切的关系&#xff0c;但它们本质上是不同的概念。下面是两者的区别和它们之间的关系的表格&#xff1a; 属性SSL/TLS 协议SSL证书英文全称SSL&#xff08;Secure Sockets Layer&#xff09;&#xff0c;TLS&#xff08;Transport Layer Security&am…

【音视频】音视频录制、播放原理

一、音视频录制原理 通常&#xff0c;音视频录制的步骤如下图所示&#xff1a; 我们分别从音频和视频开始采样&#xff0c;通过麦克风和摄像头来接受我们的音频信息和图像信息&#xff0c;这通常是同时进行的&#xff0c;不过&#xff0c;通常视频的采集会比音频的采集慢&…

网络原理---HTTP/HTTPS

通过之前的网络编程&#xff0c;我们已经初步了解UDP和TCP的基本实现方法&#xff0c;接下来我们对其进一步的学习。 在网络编程中&#xff1a; 1.读和写数据通过Socket&#xff0c;通过Socket内置的InputStream和OutputStream(读写的基本单位都是字节&#xff09;。2.当在编…

半导体芯片制造中 W CVD(钨化学气相沉积)

半导体芯片制造中 W CVD&#xff08;钨化学气相沉积&#xff09; 的 Nucleation 解析 在钨&#xff08;W&#xff09;化学气相沉积&#xff08;CVD&#xff09;工艺中&#xff0c;Nucleation&#xff08;成核&#xff09; 是沉积过程的初始阶段&#xff0c;指钨原子或分子在基…

Ollama辅助工具在思源笔记中的核心玩法助力实现AI高效创作

前言&#xff1a;在创作的道路上&#xff0c;找到合适的工具就像找到了一位贴心的好伙伴。思源笔记以其强大的编辑功能、灵活的整理方式和丰富的插件支持&#xff0c;成为了许多写作者的心头好。无论是记录闪现的灵感火花&#xff0c;还是撰写复杂的文档&#xff0c;它都能轻松…