如何做资源论坛网站,买2g 空间做下载网站,软件项目管理书籍推荐,北京网聘信息技术有限公司概述
封装代码仓库： https://gitee.com/liudegui/my_thread
尝试封装一个基于C++11的多线程控制与调度类，适配QNX和Linux平台，且代码符合Misra标准。它提供了以下主要功能：
线程的创建与销毁管理。线程的优先级调度。线程的CPU亲…
概述
封装代码仓库： https://gitee.com/liudegui/my_thread
尝试封装一个基于C++11的多线程控制与调度类，适配QNX和Linux平台，且代码符合Misra标准。它提供了以下主要功能：
线程的创建与销毁管理。线程的优先级调度。线程的CPU亲和性设置。线程的等待与唤醒机制。
类的结构及主要成员函数如下：
构造函数与析构函数：负责初始化线程相关参数，并在析构时确保线程安全退出。thread_start()：启动线程，创建指定数量的线程实例。thread_shutdown()：关闭线程，释放资源并等待所有线程退出。timed_wait()：线程的超时等待机制，防止线程无限等待。set_self_thread_priority()：设置当前线程的优先级。
实现细节
MyThread类内部实现主要涉及以下几个方面：
使用std::thread来创建线程实例。利用std::mutex和std::condition_variable来实现线程同步。使用pthread库来设置线程的优先级和CPU亲和性。
/**
* my_thread.h
*/#ifndef UTILS_MY_THREAD_H_
#define UTILS_MY_THREAD_H_#include atomic
#include cstdint
#include functional
#include mutex
#include string
#include thread
#include vector
#include condition_variable
#include memorynamespace MyTest {
class MyThread {public:explicit MyThread(const std::string in_name, int32_t in_priority, uint32_t in_worker_num,const std::functionvoid() in_func, int32_t in_cpusetsize, const cpu_set_t*const in_cpuset);~MyThread();void thread_start();void thread_shutdown();bool has_shutdown() const {return is_shutdown_;}void timed_wait(uint32_t useconds);static void set_self_thread_priority(int32_t in_priority);private:static void my_thread_func_(MyThread* thread_ins);private:std::string thread_name_;int32_t thread_priority_;uint32_t thread_worker_num_;std::vectorstd::shared_ptrstd::thread my_threads_;std::functionvoid() func_main_;int32_t thread_cpusetsize_;const cpu_set_t* thread_cpuset_ nullptr;std::mutex thread_mutex_;std::condition_variable thread_cond_;bool is_shutdown_;
};} // namespace MyTest#endif // UTILS_MY_THREAD_H_/**
* my_thread.cpp
*/#include my_thread.h#include ctime
#include sstream
#ifndef _QNX_
#include sys/syscall.h
#endif
#include sys/types.h
#include fake_log.hnamespace MyTest {MyThread::MyThread(const std::string in_name, int32_t in_priority, uint32_t in_worker_num,const std::functionvoid() in_func, int32_t in_cpusetsize, const cpu_set_t* const in_cpuset): thread_name_(in_name),thread_priority_(in_priority),thread_worker_num_(in_worker_num),func_main_(in_func),thread_cpusetsize_(in_cpusetsize),thread_cpuset_(in_cpuset),is_shutdown_(true) {
}MyThread::~MyThread() {if (!is_shutdown_) {thread_shutdown();}
}void MyThread::my_thread_func_(MyThread* thread_ins) {if (thread_ins nullptr) {MY_LOG_ERROR(thread_ins is nullptr);} else {
#ifndef _QNX_if ((thread_ins-thread_cpuset_ ! nullptr) (thread_ins-thread_cpusetsize_ 0)) {const pthread_t tid pthread_self();const auto np_return pthread_setaffinity_np(tid, static_castsize_t(thread_ins-thread_cpusetsize_), thread_ins-thread_cpuset_);if (np_return ! 0) {MY_LOG_ERROR(pthread_setaffinity_np failed. return%d, np_return);}}
#else// qnx ...
#endifstd::stringstream thread_id_stream;thread_id_stream std::this_thread::get_id();MY_LOG_INFO(thread %s starts. pid%s target_priority%d, thread_ins-thread_name_.c_str(),thread_id_stream.str().c_str(), thread_ins-thread_priority_);MyThread::set_self_thread_priority(thread_ins-thread_priority_);try {thread_ins-func_main_();} catch (...) {MY_LOG_ERROR(Exception occurred in thread %s, thread_ins-thread_name_.c_str());}}
}void MyThread::thread_start() {std::lock_guardstd::mutex lck(thread_mutex_);is_shutdown_ false;my_threads_.resize(thread_worker_num_);uint32_t thread_idx 0;for (; thread_idx thread_worker_num_; thread_idx) {my_threads_[thread_idx] std::make_sharedstd::thread(my_thread_func_, this);}
}void MyThread::thread_shutdown() {std::lock_guardstd::mutex lck(thread_mutex_);if (!is_shutdown_) {is_shutdown_ true;thread_cond_.notify_all();for (const auto my_t : my_threads_) {if (my_t-joinable()) {my_t-join();}}}
}void MyThread::timed_wait(uint32_t useconds) {std::unique_lockstd::mutex lck(thread_mutex_);const auto timeout_val std::chrono::microseconds(useconds);do {const auto return_val thread_cond_.wait_for(lck, timeout_val);if (return_val std::cv_status::timeout) {MY_LOG_ERROR(thread timed_wait timeout);}} while (false);
}void MyThread::set_self_thread_priority(int32_t in_priority) {bool go_on true;struct sched_param params;struct sched_param current_params;int32_t set_policy{0};int32_t current_policy{0};const pthread_t this_thread pthread_self();int32_t status_ret pthread_getschedparam(this_thread, current_policy, current_params);if (status_ret ! 0) {MY_LOG_ERROR(getschedparam %d, status_ret);go_on false;} else {MY_LOG_DEBUG(thread current priority is %d (%d), target is %d, current_params.sched_priority, current_policy,in_priority); // MY_LOG_TRACEif (in_priority 0) {go_on false;} else if (in_priority 0) {set_policy SCHED_FIFO;params.sched_priority current_params.sched_priority in_priority;} else {set_policy SCHED_IDLE;params.sched_priority 0;}}if (go_on) {if (params.sched_priority 99) {params.sched_priority 99;}if (params.sched_priority 0) {params.sched_priority 0;}status_ret pthread_setschedparam(this_thread, set_policy, params);if (status_ret ! 0) {MY_LOG_WARN(setschedparam(%d), params.sched_priority);go_on false;}}if (go_on) {status_ret pthread_getschedparam(this_thread, current_policy, current_params);if (status_ret ! 0) {MY_LOG_ERROR(getschedparam 2 %d, status_ret);} else {if (current_params.sched_priority ! params.sched_priority) {MY_LOG_ERROR(current priority%d (%d), target is %d, current_params.sched_priority, current_policy,params.sched_priority);} else {MY_LOG_INFO(set thread priority to %d (%d), current_params.sched_priority, current_policy);}}}
}} // namespace MyTest测试代码
#include my_thread.h#include chrono
#include iostream
#include stringnamespace MyTest {
static void worker_function() {std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout Thread ID: std::this_thread::get_id() is working. std::endl;
}
} // namespace MyTestint32_t main() {int32_t main_res{0};try {const uint32_t num_threads 4;cpu_set_t cpuset1;CPU_ZERO(cpuset1);CPU_SET(0, cpuset1);std::vectorstd::shared_ptrMyTest::MyThread test_threads;uint32_t thread_idx 0;for (; thread_idx num_threads; thread_idx) {const std::string thread_name1 std::string(Thread_) std::to_string(thread_idx);const std::shared_ptrMyTest::MyThread my_t std::make_sharedMyTest::MyThread(thread_name1, 1, 1, MyTest::worker_function, sizeof(cpuset1), cpuset1);test_threads.push_back(my_t);}for (const auto my_t : test_threads) {my_t-thread_start();}std::this_thread::sleep_for(std::chrono::seconds(2));for (const auto my_t : test_threads) {my_t-thread_shutdown();}for (const auto my_t : test_threads) {while (!my_t-has_shutdown()) {// std::this_thread::yield();my_t-timed_wait(100);}}std::cout All threads have been shutdown. std::endl;} catch (...) {std::cerr Exception occurred std::endl;main_res 1;}return main_res;
}