// // thread_pool.hpp // // exercise solution - chapter 7 // modern cpp tutorial // // created by changkun at changkun.de // https://github.com/changkun/modern-cpp-tutorial/ // #ifndef THREAD_POOL_H #define THREAD_POOL_H #include // std::vector #include // std::queue #include // std::make_shared #include // std::thread #include // std::mutex, std::unique_lock #include // std::condition_variable #include // std::future, std::packaged_task #include // std::function, std::bind #include // std::runtime_error #include // std::move, std::forward class ThreadPool { public: // initialize the number of concurrency threads ThreadPool(size_t); // enqueue new thread task template decltype(auto) enqueue(F&& f, Args&&... args); // destroy thread pool and all created threads ~ThreadPool(); private: // thread list, stores all threads std::vector< std::thread > workers; // queue task, the type of queue elements are functions with void return type std::queue< std::function > tasks; // for synchonization std::mutex queue_mutex; // std::condition_variable is a new feature from c++11, // it's a synchronization primitives. it can be used // to block a thread or threads at the same time until // all of them modified condition_variable. std::condition_variable condition; bool stop; }; // constructor initialize a fixed size of worker inline ThreadPool::ThreadPool(size_t threads): stop(false) { // initialize worker for(size_t i = 0;i task; // critical section { // get mutex std::unique_lock lock(this->queue_mutex); // block current thread this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); // return if queue empty and task finished if(this->stop && this->tasks.empty()) return; // otherwise execute the first element of queue task = std::move(this->tasks.front()); this->tasks.pop(); } // execution task(); } } ); } // Enqueue a new thread // use variadic templates and tail return type template decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) { // deduce return type using return_type = typename std::result_of::type; // fetch task auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) ); std::future res = task->get_future(); // critical section { std::unique_lock lock(queue_mutex); // avoid add new thread if theadpool is destroyed if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); // add thread to queue tasks.emplace([task]{ (*task)(); }); } // notify a wait thread condition.notify_one(); return res; } // destroy everything inline ThreadPool::~ThreadPool() { // critical section { std::unique_lock lock(queue_mutex); stop = true; } // wake up all threads condition.notify_all(); // let all processes into synchronous execution, use c++11 new for-loop: for(value:values) for(std::thread &worker: workers) worker.join(); } #endif