0


C++:设计一个线程安全的队列

文章目录

在这里插入图片描述

1. 目的

串行的程序只用到单个 CPU 核心, 希望加速整个程序, 考虑使用多线程加速。典型情况下可以找到生产者、消费者,两个角色之间通过队列进行数据交互:

  • 生产者负责把数据放入队列Q
  • 消费者负责从队列取出数据Q

要求队列是线程安全的,即:不能有读写冲突等。

2. 实现?验证!

这里并不给出具体实现, 主要原因是网络上有太多的“实现”,也许很强大,但是否正确则有待验证,反倒是怎样验证正确性,总是被忽略:

  • 新手小白,或者“算法工程师”们,往往没怎么写过合格的单元测试
  • 验证也许只是粗略跑一下,Thread Sanitizer 这样的有力武器没有被用上

makefile

我是在 Linux 下验证的, 用的 makefile 如下, 重点是 tsan 的设定, 以及 gtest 的配置:

SANITIZER_OPTION=-fsanitize=thread -fno-omit-frame-pointer
#SANITIZER_OPTION=

all:
    clang++ test_queue.cpp -I. -g `pkg-config --cflags --libs gtest gtest_main`${SANITIZER_OPTION}

Queue 类的 public 成员

template<typenameT>classQueue{public:Queue(unsignedint max_size =0);~Queue();voidpush(const T& elem);
    T pop();boolempty();
    size_t size();

其中:

  • Queue是模板类,这样可以支持任意数据类型作为队列元素(但队列中所有元素类型需要相同)
  • 所有成员函数都不能是 const 的, 尤其是 empty 和 size 函数, 原因是当前线程调用它们时,其他线程可能立即改变队列成员,需要 mutex 锁住, 对于 mutex 的操作导致函数不再是 const 的
  • 支持设定队列最大元素数量,如果没指定, 看似用0,实际表示“无限”

单元测试

如下是基于 GoogleTest 和 Queue 的 ADT 给出的单元测试代码。
如果你基于上述 Queue 类的定义, 能通过如下单元测试, 那么程序的正确性应该说比较高了。这部分代码的价值比 Queue 本身的价值要更高, 但往往被人们忽略:

#include<gtest/gtest.h>#include<digimon/queue.hpp>#include<shadow/queue.hpp>#include<unistd.h>usingnamespace digimon;//using namespace Shadow;TEST(Queue, SingleThread){
    Queue<int> q;EXPECT_EQ(q.empty(),true);

    q.push(1);
    q.push(2);EXPECT_EQ(q.empty(),false);int x = q.pop();EXPECT_EQ(x,1);

    x = q.pop();EXPECT_EQ(x,2);}classThreadData{public:ThreadData(){}ThreadData(Queue<int>* _q,int _start,int _end):q(_q),start(_start),end(_end){}public:
    Queue<int>* q;int start;int end;};classConsumerThreadData{public:ConsumerThreadData(Queue<int>* _q,int _start,int _end):q(_q),start(_start),end(_end),sum(0){pthread_mutex_init(&mutex,NULL);}~ConsumerThreadData(){pthread_mutex_destroy(&mutex);}public:
    Queue<int>* q;int start;int end;int sum;
    pthread_mutex_t mutex;};staticvoid*producer(void* _thread_data){
    ThreadData* thread_data =(ThreadData*)_thread_data;for(int i = thread_data->start; i < thread_data->end; i++){
        thread_data->q->push(i);}returnNULL;}TEST(Queue, MultiThread_MultiProducer){
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q,0,10);pthread_create(&t1,NULL, producer,(void*)&thread_data1);

    pthread_t t2;
    ThreadData thread_data2(&q,0,10);pthread_create(&t2,NULL, producer,(void*)&thread_data2);pthread_join(t1,NULL);pthread_join(t2,NULL);EXPECT_EQ(q.empty(),false);EXPECT_EQ(q.size(),20);int sum =0;while(!q.empty()){int x = q.pop();
        sum += x;}int expected_sum =90;EXPECT_EQ(expected_sum, sum);}staticvoid*consumer(void* _thread_data){
    ConsumerThreadData* thread_data =(ConsumerThreadData*)_thread_data;for(int i = thread_data->start; i < thread_data->end; i++){int x = thread_data->q->pop();
        thread_data->sum += x;
        std::cout << x << std::endl;}returnNULL;}TEST(Queue, MultiThread_SingleProducer_SingleConsumer){
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q,0,10);pthread_create(&t1,NULL, producer,(void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q,0,10);pthread_create(&t2,NULL, consumer,(void*)&thread_data2);pthread_join(t1,NULL);pthread_join(t2,NULL);EXPECT_EQ(q.empty(),true);EXPECT_EQ(q.size(),0);}staticvoid*producer_slow(void* _thread_data){
    ThreadData* thread_data =(ThreadData*)_thread_data;for(int i = thread_data->start; i < thread_data->end; i++){sleep(1);
        thread_data->q->push(i);}returnNULL;}TEST(Queue, MultiThread_Consumer_Meaningless_Grab_Mutex){
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q,0,3);pthread_create(&t1,NULL, producer_slow,(void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q,0,3);pthread_create(&t2,NULL, consumer,(void*)&thread_data2);pthread_join(t1,NULL);pthread_join(t2,NULL);EXPECT_EQ(q.empty(),true);EXPECT_EQ(q.size(),0);EXPECT_EQ(thread_data2.sum,3);}staticvoid*consumer_slow(void* _thread_data){
    ConsumerThreadData* thread_data =(ConsumerThreadData*)_thread_data;for(int i = thread_data->start; i < thread_data->end; i++){EXPECT_EQ(thread_data->q->size(),5);int x = thread_data->q->pop();
        thread_data->sum += x;sleep(1);
        std::cout << x << std::endl;}returnNULL;}TEST(Queue, LimitedQueueSize){
    Queue<int>q(5);

    pthread_t t1;
    ThreadData thread_data1(&q,0,10);pthread_create(&t1,NULL, producer,(void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q,0,5);pthread_create(&t2,NULL, consumer_slow,(void*)&thread_data2);pthread_join(t1,NULL);pthread_join(t2,NULL);EXPECT_EQ(q.empty(),false);EXPECT_EQ(q.size(),5);}

3. 实现 Queue 类的方案

可以基于 C++ 11 实现, 不过据说 C++11 的 thread 在华为手机上有问题,传闻中 pthread 能消除问题;
于是乎还有另一个选择: C++03 + pthread 实现 Queue 类。

Windows 平台上可以使用 windows-pthreads 库, 它是基于 Windows threads 模拟实现了 PThread 和 Semaphore 接口。(完)


本文转载自: https://blog.csdn.net/baiyu33/article/details/130663786
版权归原作者 baiyu33 所有, 如有侵权,请联系我们删除。

“C++:设计一个线程安全的队列”的评论:

还没有评论