C++11实现线程池

蒸汽
蒸汽
发布于 2024-11-07 / 15 阅读
0
0

C++11实现线程池

本文使用C++11实现实现线程池,涉及的技术如下:

可变参数
std::future
decltype
packaged_task
bind
支持可变参数列表
支持获取任务返回值 

线程池的设计流程

设计线程池类。

  • 线程池的初始化,工作线程个数。

  • 启动线程池,创建线程。

  • 线程池运行线程。

  • 用线程池启用任务。

  • 停止所有线程。

  • 等待当前任务队列中, 所有工作全部结束(队列无任务)。

    1.设计线程池类

    线程池类,采用c++11来实现了

class ZERO_ThreadPool
{
protected:
    struct TaskFunc
    {
        TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
        { }
​
        std::function<void()>   _func;
        int64_t                _expireTime = 0; //超时的绝对时间
    };
    typedef shared_ptr<TaskFunc> TaskFuncPtr;
    /**
    * @brief 获取任务
    * @return TaskFuncPtr
    */
    bool get(TaskFuncPtr&task);
    /**
    * @brief 线程池是否退出
    */
    bool isTerminate() { return bTerminate_; }
    /**
    * @brief 线程运行态
    */
    void run();
​
public:
    /**
​
   * @brief 构造函数
     */
         ZERO_ThreadPool();
         /**
        * @brief 析构, 会停止所有线程
          */
              virtual ~ZERO_ThreadPool();
              /**
             * @brief 初始化.
               * @param num 工作线程个数
                 */
                 bool init(size_t num);
                 /**
                 * @brief 获取线程个数.
                 * @return size_t 线程个数
                   */
                   size_t getThreadNum()
                   {
                   std::unique_lock<std::mutex> lock(mutex_);
                   return threads_.size();
                   }
                   /**
                 * @brief 获取当前线程池的任务数
                 * @return size_t 线程池的任务数
                   */
                   size_t getJobNum()
                   {
                   std::unique_lock<std::mutex> lock(mutex_);
                   return tasks_.size();
                   }
                   /**
                 * @brief 停止所有线程, 会等待所有线程结束
                   */
                   void stop();
                   /**
                 * @brief 启动所有线程
                   */
                   bool start(); // 创建线程
                   /**
                 * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
                   *
                 * @param millsecond 等待的时间(ms), -1:永远等待
                 * @return           true, 所有工作都处理完毕
                 * false,超时退出
                   */
                   bool waitForAllDone(int millsecond = -1);
                   /**
                 * @brief 用线程池启用任务(F是function, Args是参数)
                   *
                 * @param ParentFunctor
                 * @param tf
                 * @return 返回任务的future对象, 可以通过这个对象来获取返回值
                   */
                   template <class F, class... Args>
                   auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
                   {
                   return exec(0,f,args...);
                   }
                   /**
                 * @brief 用线程池启用任务(F是function, Args是参数)
                   *
                 * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
                 * @param bind function
                 * @return 返回任务的future对象, 可以通过这个对象来获取返回值
                   */
                   /*
                   template <class F, class... Args>
                   它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
                   auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
                   std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
                   返回值后置
                   */
                   template <class F, class... Args>
                   auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
                   {
                   int64_t expireTime =  (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 获取现在时间
                   //定义返回值类型
                   using RetType = decltype(f(args...));  // 推导返回值
                   // 封装任务
                   auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
                   TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封装任务指针,设置过期时间
                   fPtr->_func = [task]() {  // 具体执行的函数
                   (*task)();
                   };
                   std::unique_lock<std::mutex> lock(mutex_);
                   tasks_.push(fPtr);              // 插入任务
                   condition_.notify_one();        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
                   return task->get_future();;
                   }
                   protected:
                   queue<TaskFuncPtr>          tasks_;       //任务队列
                   std::vector<std::thread*>   threads_;     //工作线程,vector 大致可以认为是一个可变数组
                   std::mutex                  mutex_;       //互斥锁
                   std::condition_variable     condition_;   //条件变量
                   size_t                      threadNum_;   //线程数量
                   bool                        bTerminate_;  //判定是否终止线程池
                   std::atomic<int>            atomic_{ 0 }; //原子变量
                   };

使用说明

/**
​
 * @file zero_thread_pool.h
 * @brief 线程池类,采用c++11来实现了,
 * 使用说明:
 * ZERO_ThreadPool tpool;
 * tpool.init(5);   //初始化线程池线程数
 * //启动线程方式
 * tpool.start();
 * //将任务丢到线程池中
 * tpool.exec(testFunction, 10);    //参数和start相同
 * //等待线程池结束
 * tpool.waitForAllDone(1000);      //参数<0时, 表示无限等待(注意有人调用stop也会推出)
 * //此时: 外部需要结束线程池是调用
 * tpool.stop();
   */
​
/* 注意:
​
 * ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
 * int testInt(int i)
 * {
 * return i;
 * }
 * auto f = tpool.exec(testInt, 5);
 * cout << f.get() << endl;   //当testInt在线程池中执行后, f.get()会返回数值5
   *
 * class Test
 * {
 * public:
 * int test(int i);
 * };
 * Test t;
 * auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
 * //返回的future对象, 可以检查是否执行
 * cout << f.get() << endl;
   */

2.线程池的初始化

线程池初始化,首先加锁对资源进行保护,判断线程池为空,然后记录线程池线程的数量。

bool ZERO_ThreadPool::init(size_t num)
{
    std::unique_lock<std::mutex> lock(mutex_);
​
    if (!threads_.empty())  //空则返回ture
    {
        return false;
    }
    threadNum_ = num; // 设置线程数量
    return true;
​
}

3.启动线程池

启动线程池,首先加锁对资源进行保护,判断线程池为空。创建线程,并放入vector数组中。

bool ZERO_ThreadPool::start()
{
    std::unique_lock<std::mutex> lock(mutex_);
​
    if (!threads_.empty())
    {
        return false;
    }
    for (size_t i = 0; i < threadNum_; i++)
    {
        threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
    }
    return true;
​
}

4.线程池运行线程

判定是否终止线程池

从任务队列获取任务,判断是否任务存在。如果任务队列为空,睡眠等待条件变量唤醒。如果任务队列不为空,从队列中获取任务,释放队列中的任务。

返回的任务抛入线程池执行。

void ZERO_ThreadPool::run()  // 执行任务的线程
{
    //调用处理部分
    while (!isTerminate()) // 判断是不是要停止
    {
        TaskFuncPtr task;
        bool ok = get(task);        // 1. 读取任务
        if (ok)
        {
            ++atomic_;
            try
            {
                if (task->_expireTime != 0 && task->_expireTime  < TNOWMS )
                {
                    //超时任务,是否需要处理?
                }
                else
                {
                    task->_func();  // 2. 执行任务
                }
            }
            catch (...)
            {
            }
​
            --atomic_;
            //任务都执行完毕了
            std::unique_lock<std::mutex> lock(mutex_);
            if (atomic_ == 0 && tasks_.empty()) // 3.检测是否所有任务都运行完毕
            {
                condition_.notify_all();  // 这里只是为了通知waitForAllDone
            }
        }
    }
​
}

bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
    std::unique_lock<std::mutex> lock(mutex_); // 也要等锁
​
    if (tasks_.empty()) // 判断是否任务存在
    {
        condition_.wait(lock, [this] { return bTerminate_   // 要终止线程池 bTerminate_设置为true,外部notify后
                    || !tasks_.empty();                     // 任务队列不为空
        });                                                 // notify ->  1. 退出线程池; 2.任务队列不为空
    }
    if (bTerminate_)
        return false;
    if (!tasks_.empty()) //不为空
    {
        task = std::move(tasks_.front());  // 使用了移动语义
        tasks_.pop(); // 释放一个任务
        return true;
    }
    return false;
​
}

5.向线程池抛入任务

使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
​
返回任务的future对象, 可以通过这个对象来获取返回值。
​
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
​
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。)
/**
    * @brief 用线程池启用任务(F是function, Args是参数)
    *
    * @param ParentFunctor
    * @param tf
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值
    */
    template <class F, class... Args>
    auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

    /**
    * @brief 用线程池启用任务(F是function, Args是参数)
    *
    * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
    * @param bind function
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值
    */
    /*
    template <class F, class... Args>
    它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
    auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
    std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
    返回值后置
    */
    template <class F, class... Args>
    auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        int64_t expireTime =  (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 获取现在时间
        //定义返回值类型
        using RetType = decltype(f(args...));  // 推导返回值
        // 封装任务
        auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封装任务指针,设置过期时间
        fPtr->_func = [task]() {  // 具体执行的函数
            (*task)();
        };
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(fPtr);              // 插入任务
        condition_.notify_one();        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
        return task->get_future();;
    }

6.停止所有线程。

停止线程池,数组线程池停止标志,唤醒所有条件变量等待线程退出,清理线程vector数组。

void ZERO_ThreadPool::stop()
{
    {
        std::unique_lock<std::mutex> lock(mutex_);  //加锁

        bTerminate_ = true;         // 触发退出
        condition_.notify_all();    //唤醒所有等待线程
    }
    for (size_t i = 0; i < threads_.size(); i++)
    {
        //joinable判断线程是否可以加入等待
        if(threads_[i]->joinable())
        {
            threads_[i]->join(); // 等线程推出
        }
        delete threads_[i];
        threads_[i] = NULL;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    threads_.clear();

}

7.等待当前任务队列中, 所有工作全部结束

如果当前任务队列为空,直接退出。

当前队列不为空,等待条件变量,线程睡眠。满足条件后退出。

bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
    std::unique_lock<std::mutex> lock(mutex_);

    if (tasks_.empty())
        return true;
    if (millsecond < 0)
    {
        //当被唤醒后 && 第二的参数为false
        condition_.wait(lock, [this] { return tasks_.empty(); });
        return true;
    }
    else
    {
        //在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似
        return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });
    }

}

完整代码

zero_threadpool.h

//zero_threadpool.h
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H

#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
//#include <winsock.h>

#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif

using namespace std;

void getNow(timeval *tv);
int64_t getNowMs();

#define TNOW      getNow()
#define TNOWMS    getNowMs()

/
/**

 * @file zero_thread_pool.h
 * @brief 线程池类,采用c++11来实现了,
 * 使用说明:
 * ZERO_ThreadPool tpool;
 * tpool.init(5);   //初始化线程池线程数
 * //启动线程方式
 * tpool.start();
 * //将任务丢到线程池中
 * tpool.exec(testFunction, 10);    //参数和start相同
 * //等待线程池结束
 * tpool.waitForAllDone(1000);      //参数<0时, 表示无限等待(注意有人调用stop也会推出)
 * //此时: 外部需要结束线程池是调用
 * tpool.stop();
 * 注意:
 * ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
 * int testInt(int i)
 * {
 * return i;
 * }
 * auto f = tpool.exec(testInt, 5);
 * cout << f.get() << endl;   //当testInt在线程池中执行后, f.get()会返回数值5
   *
 * class Test
 * {
 * public:
 * int test(int i);
 * };
 * Test t;
 * auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
 * //返回的future对象, 可以检查是否执行
 * cout << f.get() << endl;
   */

class ZERO_ThreadPool
{
protected:
    struct TaskFunc
    {
        TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
        { }

        std::function<void()>   _func;
        int64_t                _expireTime = 0;	//超时的绝对时间
    };
    typedef shared_ptr<TaskFunc> TaskFuncPtr;
     
    /**
    * @brief 获取任务
    * @return TaskFuncPtr
    */
    bool get(TaskFuncPtr&task);
     
    /**
    * @brief 线程池是否退出
    */
    bool isTerminate() { return bTerminate_; }
     
    /**
    * @brief 线程运行态
    */
    void run();


​	
public:
​    /**
​    * @brief 构造函数
​    */
​    ZERO_ThreadPool();

    /**
    * @brief 析构, 会停止所有线程
    */
    virtual ~ZERO_ThreadPool();
     
    /**
    * @brief 初始化.
    * @param num 工作线程个数
    */
    bool init(size_t num);
     
    /**
    * @brief 获取线程个数.
    * @return size_t 线程个数
    */
    size_t getThreadNum()
    {
        std::unique_lock<std::mutex> lock(mutex_);
     
        return threads_.size();
    }
     
    /**
    * @brief 获取当前线程池的任务数
    * @return size_t 线程池的任务数
    */
    size_t getJobNum()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return tasks_.size();
    }
     
    /**
    * @brief 停止所有线程, 会等待所有线程结束
    */
    void stop();
     
    /**
    * @brief 启动所有线程
    */
    bool start(); // 创建线程
     
    /**
    * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
    *
    * @param millsecond 等待的时间(ms), -1:永远等待
    * @return           true, 所有工作都处理完毕
    *                   false,超时退出
    */
    bool waitForAllDone(int millsecond = -1);

 

    /**
    * @brief 用线程池启用任务(F是function, Args是参数)
    *
    * @param ParentFunctor
    * @param tf
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值
    */
    template <class F, class... Args>
    auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

 

    /**
    * @brief 用线程池启用任务(F是function, Args是参数)
    *
    * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
    * @param bind function
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值
    */
    /*
    template <class F, class... Args>
    它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
    auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
    std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
    返回值后置
    */
    template <class F, class... Args>
    auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        int64_t expireTime =  (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 获取现在时间
        //定义返回值类型
        using RetType = decltype(f(args...));  // 推导返回值
        // 封装任务
        auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封装任务指针,设置过期时间
        fPtr->_func = [task]() {  // 具体执行的函数
            (*task)();
        };
     
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(fPtr);              // 插入任务
        condition_.notify_one();        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
     
        return task->get_future();;
    }

protected:

    queue<TaskFuncPtr> 			tasks_;		  //任务队列
    std::vector<std::thread*> 	threads_; 	  //工作线程,vector 大致可以认为是一个可变数组
    std::mutex                	mutex_;		  //互斥锁
    std::condition_variable   	condition_;	  //条件变量
    size_t                    	threadNum_;	  //线程数量
    bool                      	bTerminate_;  //判定是否终止线程池
    std::atomic<int>          	atomic_{ 0 }; //原子变量

};

#endif // ZERO_THREADPOOL_H

zero_threadpool.cpp

#include "zero_threadpool.h"
​
​
ZERO_ThreadPool::ZERO_ThreadPool()
    :  threadNum_(1), bTerminate_(false)
{
}
​
ZERO_ThreadPool::~ZERO_ThreadPool()
{
    stop();
}
​
bool ZERO_ThreadPool::init(size_t num)
{
    std::unique_lock<std::mutex> lock(mutex_);
​
    if (!threads_.empty())  //空则返回ture
    {
        return false;
    }
     
    threadNum_ = num; // 设置线程数量
    return true;
​
}
​
void ZERO_ThreadPool::stop()
{
    {
        std::unique_lock<std::mutex> lock(mutex_);  //加锁
​
        bTerminate_ = true;         // 触发退出
     
        condition_.notify_all();    //唤醒所有等待线程
    }
     
    for (size_t i = 0; i < threads_.size(); i++)
    {
        //joinable判断线程是否可以加入等待
        if(threads_[i]->joinable())
        {
            threads_[i]->join(); // 等线程推出
        }
        delete threads_[i];
        threads_[i] = NULL;
    }
     
    std::unique_lock<std::mutex> lock(mutex_);
    threads_.clear();
​
}
​
bool ZERO_ThreadPool::start()
{
    std::unique_lock<std::mutex> lock(mutex_);
​
    if (!threads_.empty())
    {
        return false;
    }
     
    for (size_t i = 0; i < threadNum_; i++)
    {
        threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
    }
    return true;
​
}
​
bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
    std::unique_lock<std::mutex> lock(mutex_); // 也要等锁
​
    if (tasks_.empty()) // 判断是否任务存在
    {
        condition_.wait(lock, [this] { return bTerminate_   // 要终止线程池 bTerminate_设置为true,外部notify后
                    || !tasks_.empty();                     // 任务队列不为空
        });                                                 // notify ->  1. 退出线程池; 2.任务队列不为空
    }
     
    if (bTerminate_)
        return false;
     
    if (!tasks_.empty()) //不为空
    {
        task = std::move(tasks_.front());  // 使用了移动语义
     
        tasks_.pop(); // 释放一个任务
     
        return true;
    }
     
    return false;
​
}
​
void ZERO_ThreadPool::run()  // 执行任务的线程
{
    //调用处理部分
    while (!isTerminate()) // 判断是不是要停止
    {
        TaskFuncPtr task;
        bool ok = get(task);        // 1. 读取任务
        if (ok)
        {
            ++atomic_;
            try
            {
                if (task->_expireTime != 0 && task->_expireTime  < TNOWMS )
                {
                    //超时任务,是否需要处理?
                }
                else
                {
                    task->_func();  // 2. 执行任务
                }
            }
            catch (...)
            {
            }
​
            --atomic_;
     
            //任务都执行完毕了
            std::unique_lock<std::mutex> lock(mutex_);
            if (atomic_ == 0 && tasks_.empty()) // 3.检测是否所有任务都运行完毕
            {
                condition_.notify_all();  // 这里只是为了通知waitForAllDone
            }
        }
    }
​
}
// 1000ms
​
bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
    std::unique_lock<std::mutex> lock(mutex_);
​
    if (tasks_.empty())
        return true;
     
    if (millsecond < 0)
    {
        //当被唤醒后 && 第二的参数为false
        condition_.wait(lock, [this] { return tasks_.empty(); });
        return true;
    }
    else
    {
        //在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似
        return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });
    }
​
}
​
​
int gettimeofday(struct timeval &tv)
{
#if WIN32
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tv.tv_sec = clock;
    tv.tv_usec = wtm.wMilliseconds * 1000;
​
    return 0;
​
#else
    return ::gettimeofday(&tv, 0);
#endif
}
​
void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
​
    int idx = _buf_idx;
    *tv = _t[idx];
    if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc)
    {
        addTimeOffset(*tv, idx);
    }
    else
    {
        TC_Common::gettimeofday(*tv);
    }
​
#else
    gettimeofday(*tv);
#endif
}
​
int64_t getNowMs()
{
    struct timeval tv;
    getNow(&tv);
​
    return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
​
}
​
threadpool.cpp
#include <iostream>
#include "zero_threadpool.h"
using namespace std;
​
void func0()
{
    cout << "func0()" << endl;
}
​
void func1(int a)
{
    cout << "func1 int =" << a << endl;
}
​
//void func1(string a)
//{
//    cout << "func1 string =" << a << endl;
//}
​
void func2(int a, string b)
{
    cout << "func2() a=" << a << ", b=" << b<< endl;
}
​
​
void test1() // 简单测试线程池
{
    ZERO_ThreadPool threadpool;     // 封装一个线程池
    threadpool.init(1);             // 设置线程的数量
    threadpool.start();              // 启动线程池,创建线程, 线程没有start,创建完毕后被调度
    // 假如要执行的任务
//    threadpool.exec(1000,func0); // 1000是超时1000的意思,目前没有起作用
      threadpool.exec(func1, 10);
      threadpool.exec((void(*)(int))func1, 10);   // 插入任务
//     threadpool.exec((void(*)(string))func1, "king");
     threadpool.exec(func2, 20, "darren"); // 插入任务
    threadpool.waitForAllDone(); // 等待都执行完退出 运行函数 插入1000个任务, 等1000个任务执行完毕才退出
    threadpool.stop();      // 这里才是真正执行退出
}
​
int func1_future(int a)
{
    cout << "func1() a=" << a << endl;
    return a;
}
​
string func2_future(int a, string b)
{
    cout << "func1() a=" << a << ", b=" << b<< endl;
    return b;
}
​
void test2() // 测试任务函数返回值
{
    ZERO_ThreadPool threadpool;     //封装一个线程池
    threadpool.init(1);             //设置线程数量
    threadpool.start();             // 启动线程池,创建线程
    // 假如要执行的任务
    std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
    std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");
//  auto result2 = threadpool.exec(func2_future, 20, "darren");
​
    std::cout << "result1: " << result1.get() << std::endl;
    std::cout << "result2: " << result2.get() << std::endl;
    threadpool.waitForAllDone();
    threadpool.stop();
​
}
​
class Test
{
public:
    int test(int i){
        cout << _name << ", i = " << i << endl;
        return i;
    }
//    int test(string str) {
//        cout << _name << ", str = " << str << endl;
//    }
    void setName(string name){
        _name = name;
    }
    string _name;
};
​
void test3() // 测试类对象函数的绑定
{
    ZERO_ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 启动线程池
    Test t1;
    Test t2;
    t1.setName("Test1");
    t2.setName("Test2");
    auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
    auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
    threadpool.waitForAllDone();
    cout << "t1 " << f1.get() << endl;
    cout << "t2 " << f2.get() << endl;
}
​
void func2_1(int a, int b)
{
    cout << "func2_1 a + b = " << a+b << endl;
}
​
int func2_1(string a, string b)
{
    cout << "func2_1 a + b = " << a << b<< endl;
    return 0;
}
void test4() // 简单测试线程池
{
    ZERO_ThreadPool threadpool;     // 封装一个线程池
    threadpool.init(1);             // 设置线程的数量
    threadpool.start();              // 启动线程池,创建线程, 线程没有start,创建完毕后被调度
    // 假如要执行的任务
    threadpool.exec((void(*)(int, int))func2_1, 10, 20);   // 插入任务
    threadpool.exec((int(*)(string, string))func2_1, "king", " and darren");
    threadpool.waitForAllDone(); // 等待都执行完退出 运行函数 插入1000个任务, 等1000个任务执行完毕才退出
    threadpool.stop();      // 这里才是真正执行退出
}
​
int main()
{
//  test1(); // 简单测试线程池
    test2(); // 测试任务函数返回值
//  test3(); // 测试类对象函数的绑定
//  test4();
    cout << "main finish!" << endl;
    return 0;
}



评论