百度搜索网站怎么做,电子商务网站建设与维护题库,中国商标网官网入口,美丽乡村建设网站C写一个线程池 文章目录 C写一个线程池设计思路测试数据的实现任务类的实现线程池类的实现线程池构造函数线程池入口函数队列中取任务添加任务函数线程池终止函数 源码 之前用C语言写了一个线程池#xff0c;详情请见#xff1a; C语言写一个线程池
这次换成C了#xff01;…C写一个线程池 文章目录 C写一个线程池设计思路测试数据的实现任务类的实现线程池类的实现线程池构造函数线程池入口函数队列中取任务添加任务函数线程池终止函数 源码 之前用C语言写了一个线程池详情请见 C语言写一个线程池
这次换成C了由于C支持泛型编程所以代码的灵活性提高了不知道多少倍
设计思路
线程池的原理就不解释了这次主要来讲一下我使用C进行面向过程、面向对象、泛型设计时的思想。
线程池的工作原理是存在一个具有多个线程的空间我们对这些线程进行一个统一的管理利用C提供的线程类。然后具有一个存放任务的队列这些线程依次从中取出任务然后执行。
从上面的过程中发现可以将线程池作为一个对象来进行设计。这个对象中的成员至少有
存放n个线程对象的空间可以使用 vectorstd::thread 进行管理。标记每一个线程的工作状态的容器这里可以使用 unordered_mapstd::thread::id, bool来进行管理。存放任务的队列等等…在设计的过程中会发现
我希望设计一个很万能的线程池即可以接受任何任务我只需要将对应的函数和参数传入进队列中就可以让线程自动执行。
因此设计一个任务类是必不可少的。因为我们从函数外部传进去的函数和参数不一定相同而且不同功能的任务之间没有一个合适的管理方式因此我们需要设计一个任务类来兼容不同参数并且将参数和工作函数绑定到一块的情况。
测试数据的实现
我个人比较喜欢在设计代码之前假设他已经设计好然后先写出他的测试方法和数据之后一点点来实现这次我选择的测试方法是求1~50000000中的素数个数不使用素数筛
先实现判断素数的功能函数
// 判断素数n是不是素数
bool is_prime(int n) {for (int i 2; i * i n; i) {if (n % i 0) return false;}return true;
}// 求出 start 到 end 返回的素数个数
int prime_count(int start, int end) {int ans 0;for (int i start; i end; i) {ans is_prime(i);}return ans;
}// 需要传入到线程池内的函数求出 l 到 r 的素数个数然后保存在 ret 中
void worker(int l, int r, int ret) {cout this_thread::get_id() : begin endl;ret prime_count(l, r);cout this_thread::get_id() : end endl;return ;
}
下面是主函数
int main() {#define MAX_N 5000000 // 这里假设需要处理10次因此每次处理五百万的数据thread_pool tp(5); // 假设传入的参数是线程池中的线程个数int ret[10]; // 存放10次结果for (int i 0. j 1; i 10; i, j MAX_N) {tp.add_task(worker, j, j MAX_N - 1, ref(ret[i])); // ref是用来传入引用的}tp.stop(); // 停止线程池的运转int ans 0; // 计算出结果for (int i 0; i 10; i) {ans ret[i];}cout ans ans endl;return 0;
}任务类的实现
明确一下目的 实现一个类第一个参数是一个函数地址后面为变参列表该类会将函数与参数打包成一个新的函数作为任务队列中的一个元素当空闲线程将其取出之后可以执行这个新的函数。
这个需要用到模板
template typename FUNC_T, typename ...ARGS
class Task {Task(FUNC_T func, ARGS... args) {...}
private:...
};绑定之后我们需要一个变量来存放这个函数因此需要添加一个函数指针对象 functionvoid() 使用 bind 函数进行绑定。 在给bind函数传入参数列表时需要维持左右值原型因此需要工具类 std::forwardARGS(args)... 来解析参数类型。
template typename FUNC_T, typename ...ARGS
class Task {Task(FUNC_T func, ARGS... args) {this-_func bind(functionn, std::forwardARGS(args)...);}
private:std::functionvoid() _func;
};最后需要一个方法来运行这个函数 别忘了析构函数。
template typename FUNC_T, typename ...ARGS
class Task {Task(FUNC_T func, ARGS... args) {this-_func bind(functionn, std::forwardARGS(args)...);}~Task() {delete _func;}void run() {_func();return ;}
private:std::functionvoid() _func;
};线程池类的实现
根据一开始的测试数据发现线程池的操作对外就支持两个操作
压入任务停止
根据一开始所分析的
存放n个线程对象的空间可以使用 vectorstd::thread 进行管理。标记每一个线程的工作状态的容器这里可以使用 unordered_mapstd::thread::id, bool来进行管理。存放任务的队列等等…在设计的过程中会发现
我们可以先将成员和已知的方法写上
template typename FUNC_T, typename ...ARGS
class thread_pool {
public:thread_pool() {}template typename FUNC_T, typename ...ARGSvoid add_task(FUNC_T func, ARGS... args) {...}void stop() {...}private:std::vectorstd::thread * _threads;unordered_mapstd::thread::id, bool _running;std::queueTask * _tasks;
};线程池构造函数
先来尝试完善一下构造函数我们使用参数来控制线程池中的线程个数默认线程数量我们可以设置成为1
创建出的线程空间放在堆区因此使用 new 关键字来创建
thread_pool(int n 1) {for(int i 0; i n; i) {_threads[i] new thread(thread_pool::worker, this); // 别忘了内部方法的第一个隐藏参数this传入进去}
}线程池入口函数
这个时候我们发现由于thread的构造函数需要传入一个需要运行的函数因此发现又多了一个函数就是工作函数 worker 简单来说这个函数的功能规定了所有线程的行为——从队列中取出任务并执行。 在工作函数内部我们需要将该线程 id 记录下来表示他是否存活然后不断判断这个线程是否存活如果存活就继续等待队列中的任务
这个工作函数的作用就是不断检查队列中是否有可以取出的任务然后执行。
void worker() {auto id this_thread::get_id();_running[id] true; // 表示这个线程被记录下来在运行状态while (_running[id]) {Task *t get_task(); // 从队列中取任务这里又诞生出一个新函数t-run(); // 运行任务delete t;}
}
队列中取任务
可以发现又有了新的函数需求就是从队列中取出一个任务。
这个函数并不对外表现所以应该设置为私有成员方法。
主要逻辑就是检查队列头部是否有任务对象如果有的话就返回这个任务的地址。 由于队列是临界资源所以需要上锁此时不免又多了两个成员变量 std::mutex m_mutex; std::condition_variable m_cond;
Task *get_task() {unique_taskmutex (m_mutex); // 上锁while (_tasks.empty()) { // 如果队列为空则释放锁并等待队列被放入任务的条件m_cond.wait(lock)}Task *t _tasks.front();_tasks.pop();return t;
}这样一来我们就实现了线程的初始化以及任务的取出。
接下来还剩下任务的压入这个操作由 add_task() 实现因此我们先来实现这个函数
添加任务函数
由于他也是访问临界资源因此也需要上锁同时在添加成功之后释放一个信号。
同样的这个函数需要在外部调用因此设置成为共有成员方法
template typename FUNC_T, typename ...ARGS
void add_task(FUNC_T func, ARGS... args) {_tasks.push(new Task(func, std:forwardARGS(args)...));lock.unlock();m_cond.notify_one();return ;
}线程池终止函数
线程不再运行之后可以选择终止他们来节省计算机资源因此这个函数是必不可少的主要的操作方式如下我们想队列中压入等于同于线程数量的特殊任务这个特殊任务会把线程标记为非活动的然后后等到他们全部执行完任务后再依次释放掉他们的资源。
void stop() {for (int i 0; i _thread.size(); i) {this-add_task(thread_pool::stop_running, this); // 毒药方法}for(int i 0; i _thread_size(); i) {_threads[i]-join();}for(int i 0; i _thread.size(); i) {delete _threads[i];_threads[i] nullptr;}return ;
}其中涉及到了 stop_running() 这个毒药方法这个方法只在函数内部调用因此我们把他设计成为私有成员方法。
这个函数的逻辑就是将当前线程标记为非活动的状态。
void stop_running() {auto id this_thread::get_id();_running[id] false;return ;
}到此为止线程池的大部分功能就设计的差不多了之后我又进行了一下细微的调整相信读者应该自己也能读懂这里就不过多解释了。
源码
#include condition_variable
#include iostream
#include algorithm
#include vector
#include queue
#include stack
#include set
#include map
#include cstdio
#include cstdlib
#include ctype.h
#include cmath
#include string
#include sstream
#include functional
#include thread
#include mutex
#include condition_variable#define TEST_BEGINS(x) namespace x {
#define TEST_ENDS(x) } // end of namespace x
using namespace std;TEST_BEGINS(thread_pool)bool is_prime(int n) {for (int i 2; i * i n; i) {if (n % i 0) return false;}return true;
}int prime_count(int start, int end) {int ans 0;for (int i start; i end; i) {ans is_prime(i);}return ans;
}// 多线程入口函数
void worker(int l, int r, int ret) {cout this_thread::get_id() : begin endl;ret prime_count(l, r);cout this_thread::get_id() : done endl;return ;
}class Task {
public:template typename FUNC_T, typename ...ARGSTask(FUNC_T function, ARGS ...args) {this-func bind(function, std::forwardARGS(args)...);}void run() {func();return ;}
private:functionvoid() func;
};class thread_pool {
public:thread_pool(int n 1) : _thread_size(n), _threads(n), starting(false) {this-start();}~thread_pool() {this-stop();while (!_tasks.empty()) {delete _tasks.front();_tasks.pop();}return ;}/** 入口函数不断从队列中取任务然后运行最后释放资源。*/void worker() {auto id this_thread::get_id();_running[id] true;while (_running[id]) {Task *t get_task();t-run();delete t;}return ;}void start() {if (starting true) return ;for (int i 0; i _thread_size; i) {_threads[i] new thread(thread_pool::worker, this);}starting true; // 标记线程池运行return ;}void stop() {if (starting false) return ;for (int i 0; i _threads.size(); i) {this-add_task(thread_pool::stop_running, this);}for (int i 0; i _threads.size(); i) {_threads[i]-join();}for (int i 0; i _threads.size(); i) {delete _threads[i];_threads[i] nullptr;}starting false;return ;}template typename FUNC_T, typename ...ARGSvoid add_task(FUNC_T func, ARGS... args) {unique_lockmutex lock(m_mutex);_tasks.push(new Task(func, std::forwardARGS(args)...));lock.unlock();m_cond.notify_one();return ;}private:Task *get_task() {unique_lockmutex lock(m_mutex);while (_tasks.empty()) { // 避免虚假唤醒m_cond.wait(lock);}Task *t _tasks.front();_tasks.pop();return t;}void stop_running() {auto id this_thread::get_id();_running[id] false; // 毒药方法return ;}bool starting;int _thread_size; // 记录线程个数std::mutex m_mutex; // 互斥锁std::condition_variable m_cond; // 条件变量vectorthread * _threads; // 线程区unordered_mapstd::thread::id, bool _running; // 线程活动标记 queueTask * _tasks; // 任务队列
};int main() {#define MAX_N 5000000thread_pool tp(10);int ret[10];for (int i 0, j 1; i 10; i, j batch) {tp.add_task(worker, j, j MAX_N - 1, ref(ret[i]));}tp.stop();int ans 0;for (auto x : ret) ans x;cout ans endl;return 0;
}TEST_ENDS(thread_pool)int main() {thread_pool::main();return 0;
}运行结果