首页 > 代码库 > 线程池
线程池
main.cpp
#include <iostream>#include <vector>#include <chrono>#include <functional>#include "ThreadPool.h"#include <omp.h>class CA{ int a = 1; int b = 2;public: int Max(int a, int b) { return a > b ? a : b; }#if 0 double Max(int a) { return a; }#endif int Sum(int a, int b) { return a + b; }};int main(){ ThreadPool pool(4); std::vector< std::future<int> > results; for (int i = 0; i < 16; ++i) { results.emplace_back( pool.enqueue([i] { // std::cout << "hello " << i << std::endl; // std::this_thread::sleep_for(std::chrono::seconds(1)); return i*i; }) ); } for (auto && result : results){ //std::cout << result.get() << ‘ ‘; std::cout <<"sync"<< ‘\n‘; }//同步:要等这里结束,才能往下执行, std::cout << "~~main~~" << std::endl; //成员函数不能有重名的(重载)。 CA testA; //mem_fn std::future<int> fe = pool.enqueue(std::mem_fn(&CA::Max), testA, 3, 99999); //bind using namespace std::placeholders; std::future<int> fl = pool.enqueue(std::bind(&CA::Sum,&testA,_1,_2),3,99); std::cout << fl.get() << std::endl; std::cout << fe.get() << std::endl; return 0;}
ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <stdio.h>
#include <chrono> // std::chrono::seconds

// Time-out value in seconds (10 s). Kept so that code referring to the name
// still compiles; the pool itself does not wait on it.
const int THREAD_WAIT_TIME_OUT = 10; // 10 s

/// Fixed-size pool of worker threads that executes queued tasks in FIFO order.
///
/// Tasks are submitted with enqueue(), which hands back a std::future for the
/// task's result. The destructor lets the workers drain the queue and then
/// joins them.
class ThreadPool {
public:
    /// Starts the given number of worker threads.
    ThreadPool(size_t);

    /// Queues `f(args...)` for execution on a worker thread.
    ///
    /// The callable and its arguments are stored via std::bind, so anything
    /// std::bind accepts is allowed: lambdas, function objects, std::mem_fn /
    /// std::bind results and plain pointers to member functions. The result
    /// type is therefore deduced from the bound call itself.
    ///
    /// @return a future that becomes ready once the task has run.
    /// @throws std::runtime_error if the pool is already being destroyed.
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...)())>
    {
        using return_type =
            decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...)());

        // packaged_task is move-only while std::function requires a copyable
        // target, hence the shared_ptr captured by the queued lambda.
        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // don't allow enqueueing after stopping the pool
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task] { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    /// Worker loop executed by every pool thread.
    void threadFun();

    /// Signals the workers to stop, waits for queued tasks to finish and
    /// joins all threads.
    ~ThreadPool();

private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization: queue_mutex guards both `tasks` and `stop`
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(&ThreadPool::threadFun, this);
}

// Pops and runs tasks until the pool is stopped and the queue is empty.
// Declared inline because it is defined in a header.
inline void ThreadPool::threadFun()
{
    for (;;) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // Sleep only while there is nothing to do: the pool is still
            // running and the queue is empty.
            condition.wait(lock, [this] { return stop || !tasks.empty(); });

            // Shutdown was requested and every queued task has been taken:
            // leave right away so the destructor's join() is not delayed.
            if (stop && tasks.empty())
                return;

            task = std::move(tasks.front());
            tasks.pop();
        }
        // Run the task outside the lock so other workers can proceed.
        task();
    }
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers) {
        if (worker.joinable())
            worker.join();
    }
}

#endif
线程池
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。