文档章节

C++11实现跨平台线程池

初雪之音
 初雪之音
发布于 2016/03/19 11:19
字数 1210
阅读 436
收藏 5

生产者消费者

转自:http://www.cnblogs.com/sanjin/archive/2012/08/09/2629890.html

std::condition_variable的使用方法如下:
·当持有锁(mutex)之后,线程调用wait
·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中
·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)
·线程被唤醒之后继续持有锁运行。

以下一个例子转自:
http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

template<typename Data>  
class concurrent_queue  
{  
private:  
    std::queue<Data> the_queue;    // 任务队列  
    mutable boost::mutex the_mutex;    // mutex类是一个CriticalSection(临界区)封装类  
    boost::condition_variable the_condition_variable;  
public:  
    void push(Data const& data)  
    {  
        boost::mutex::scoped_lock lock(the_mutex);    // lock在构造时自动上锁,保证任务队列的线程安全  
        the_queue.push(data);  
        lock.unlock();    // lock解锁  
        the_condition_variable.notify_one();    // 唤醒被阻塞的线程  
    }  
    bool empty() const 
    {  
        boost::mutex::scoped_lock lock(the_mutex);    // lock构造时自动上锁,析构时自动解锁  
        return the_queue.empty();  
    }  
    bool try_pop(Data& popped_value)  
    {  
        boost::mutex::scoped_lock lock(the_mutex);    // lock构造时自动上锁,析构时自动解锁  
        if(the_queue.empty())  
        {  
            return false;  
        }  
           
        popped_value=the_queue.front();  
        the_queue.pop();  
        return true;  
    }  
    void wait_and_pop(Data& popped_value)  
    {  
        boost::mutex::scoped_lock lock(the_mutex);    // lock构造时自动上锁,析构时自动解锁  
        while(the_queue.empty())  
        {  
            the_condition_variable.wait(lock);    // 阻塞本线程,并添加到唤醒队列中  
        }  
           
        popped_value=the_queue.front();  
        the_queue.pop();  
    }  
};

跨平台线程池

转自:http://www.cnblogs.com/linbc/p/5041648.html

以下一个例子转自:https://github.com/linbc/worker_pool.git

#ifndef _WORKER_POOL_H_    
#define _WORKER_POOL_H_    
  
#include <vector>    
#include <queue>    
#include <thread>    
#include <mutex>    
#include <condition_variable>    
   
template<typename T>    
class WorkerPool    
{    
public:    
	typedef WorkerPool<T> THIS_TYPE;    
	typedef std::function<void(T*)> WorkerProc;    
	typedef std::vector< std::thread* > ThreadVec;    // 线程池任务队列    
	WorkerPool()    
	{		    
		active_ = false;    
	}    
	virtual ~WorkerPool()    
	{    
		for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it)    
			delete *it;    
		all_thread_.clear();    
	}    
	void Start(WorkerProc f, int worker_num = 1)    
	{    
		active_ = true;		    
		all_thread_.resize(worker_num);    
		for (int i = 0; i < worker_num;i++ )    
		{    
			all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this));    
		}    
		proc_ = f;    
	}    
	//生产者    
	void Push(T *t)    
	{    
		std::unique_lock < std::mutex > lck(mutex_);    // lock构造时自动上锁,析构时自动解锁    
		task_.push(t);    
		cv_.notify_one();    // 若唤醒队列中有阻塞线程,则唤醒它    
	}    
	void Stop()    
	{    
		//等待所有的任务执行完毕    
		mutex_.lock();    
		while (!task_.empty())    
		{	    
			mutex_.unlock();    
			std::this_thread::sleep_for(std::chrono::milliseconds(1000));    
			cv_.notify_one();    
			mutex_.lock();    
		}    
		mutex_.unlock();    
		//关闭连接后,等待线程自动退出    
		active_ = false;    
		cv_.notify_all();    
		for(ThreadVec::iterator it = all_thread_.begin();    
			it != all_thread_.end();++it)    
			(*it)->join();    
	}    
private:    
	//消费者    
	void consumer()    
	{    
		//第一次上锁    
		std::unique_lock < std::mutex > lck(mutex_);    // lock构造时自动上锁    
		while (active_)    
		{    
			//如果是活动的,并且任务为空则一直等待    
			while (active_ && task_.empty())    
				cv_.wait(lck);    // 阻塞本线程,暂时解锁,并加入到唤醒队列中    
			//如果已经停止则退出    
			if(!active_)    
				break;    
			T *quest = task_.front();    // 被唤醒时,重新持有锁上锁,并继续运行    
			task_.pop();    
			//从任务队列取出后该解锁(任务队列锁)了    
			lck.unlock();    
			//执行任务后释放    
			proc_(quest);    
			//delete quest;   //在proc_已经释放该指针了    
			//重新上锁    
			lck.lock();    
		}    
	}    
	std::mutex mutex_;    // 临界区资源,用于锁    
	std::queue<T*> task_;    // 线程池任务队列    
	std::condition_variable cv_;    
	bool active_;    // 线程池是否工作    
	std::vector< std::thread* > all_thread_;    
	WorkerProc proc_;    
};    
#endif
#include "worker_pool.h"    
#include <iostream>    
//为了多耗点cpu,计算斐波那契数列吧    
static int fibonacci(int a)    
{    
	//ASSERT(a > 0);    
	if (a == 1 || a == 2)    
		return 1;    
	return fibonacci(a-1) + fibonacci(a-2);    
}    
//异步计算任务    
struct AsyncCalcQuest    
{    
	AsyncCalcQuest():num(0),result(0)    
	{}    
	//计算需要用到的变量    
	int num;    
	int result;    
};    
//为了测试方便,引入全局变量用于标识线程池已将所有计算完成    
const int TOTAL_COUNT = 1000000;    
int now_count = 0;    
//继承一下线程池类,在子类处理计算完成的业务,在我们这里,只是打印一下计算结果    
class CalcWorkerPool:public WorkerPool<AsyncCalcQuest>    
{    
public:    
	CalcWorkerPool(){}    
	virtual ~CalcWorkerPool()    
	{    
	}    
	//在工人线程中执行    
	void DoWork(AsyncCalcQuest *quest)    
	{    
		quest->result = fibonacci(quest->num);	    
		//并将已完成任务返回到准备回调的列表    
		std::unique_lock<std::mutex > lck(mutex_callbacks_);    
		callbacks_.push_back(quest);    
	}    
	//在主线程执行    
	void DoCallback()    
	{    
		//组回调任务上锁    
		std::unique_lock<std::mutex > lck(mutex_callbacks_);    
		while (!callbacks_.empty())    
		{    
			auto *quest = callbacks_.back();			    
			{//此处为业务代码打印一下吧    
				std::cout << quest->num << " " << quest->result << std::endl;    
				now_count ++;    
			}    
			delete quest;		//TODO:这里如果采用内存池就更好了    
			callbacks_.pop_back();    
		}    
	}    
private:    
	//这里是准备给回调的任务列表    
	std::vector<AsyncCalcQuest*> callbacks_;    
	std::mutex mutex_callbacks_;    
};    
int main()    
{    
	CalcWorkerPool workers;    
	//工厂开工了 8个工人喔    
	workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8);	    
	//开始产生任务了    
	for (int i=0; i<TOTAL_COUNT; i++)    
	{    
		AsyncCalcQuest *quest = new AsyncCalcQuest;    
		quest->num = i%40+1;    
		workers.Push(quest);    
	}    
	while (now_count != TOTAL_COUNT)    
	{    
		workers.DoCallback();    
	}    
	workers.Stop();    
return 0;    
}

C/C++中的static函数

一个例子:http://my.oschina.net/keyven/blog/639828

C++11中的std::function及std::bind

C++11中的std::function:http://www.jellythink.com/archives/771

C++11 std::bind std::function 高级用法:http://blog.csdn.net/eclipser1987/article/details/24406203

C++11之多线程

C++11之多线程(一、标准库的线程封装类Thread和Future):http://blog.poxiao.me/p/multi-threading-in-cpp11-part-1-thread-and-future/

扩展

曾经转载过一篇用Java编程实现的线程池,该例存在一些bug,看完本篇例子后对其进行调整修改并重新更新到git上(见摘要部分):http://my.oschina.net/keyven/blog/311548

© 著作权归作者所有

共有 人打赏支持
初雪之音
粉丝 47
博文 268
码字总数 150009
作品 0
广州
程序员
私信 提问
macwe/thefoxframework

TheFox Cpp Framework 简介 一个C++的基础开发库,包含包括常见的C++常用工具类(file、mutex、semaphore、thread、线程池等)、mysql封装、net-snmp封装、net库、rpc、log库。 现在主要工作...

macwe
2014/09/22
0
0
基于 libevent 开发的 C++ 11 高性能网络服务器--evpp

evpp是一个基于libevent开发的现代化C++11高性能网络服务器,自带TCP/UDP/HTTP等协议的异步非阻塞式的服务器和客户端库。 特性: 现代版的C++11接口 非阻塞异步接口都是C++11的functional/bi...

zieckey
2017/03/06
6K
16
使用C++11封装线程池ThreadPool

读本文之前,请务必阅读: 使用C++11的function/bind组件封装Thread以及回调函数的使用 Linux组件封装(五)一个生产者消费者问题示例 线程池本质上是一个生产者消费者模型,所以请熟悉这篇文...

inevermore
2015/03/19
0
0
c++11 的Thread测试。

//这是最近单的实现,可以做到了线程跨平台 class CThread{public:CThread():str(""){}CThread(string s){str = s;}void start(){thread th(&CThread::run,this);//th.join();th.detach();};......

阳666
2016/04/13
31
0
收藏的博客 -- 高性能Linux服务器

http://zhuanlan.51cto.com/columnlist/shenj/ --- 58沈剑 http://blog.csdn.net/analogouslove --- 范蠡&张小方 http://blog.csdn.net/column/details/15700.html --- teamtalk https://gi......

libaineu2004
2017/08/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

oh-my-zsh 自定义

GitHub 地址 基于 oh-my-zsh 的自定义配置,增加了一些个人常用插件与皮肤。 采用的是 git submodule 来维护,包括 oh-my-zsh,之所以这么搞,主要是手头有多台 linux 需要维护, 每台机器、...

郁也风
今天
6
0
Docker安装踩坑:E_FAIL 0x80004005的解决

参考 菜鸟教程--Windows Docker 安装 http://www.runoob.com/docker/windows-docker-install.html 官方文档-Install Docker Toolbox on Windows https://docs.docker.com/toolbox/toolbox_in......

karma123
今天
5
0
js垃圾回收机制和引起内存泄漏的操作

JS的垃圾回收机制了解吗? Js具有自动垃圾回收机制。垃圾收集器会按照固定的时间间隔周期性的执行。 JS中最常见的垃圾回收方式是标记清除。 工作原理:是当变量进入环境时,将这个变量标记为“...

Jack088
昨天
17
0
大数据教程(10.1)倒排索引建立

前面博主介绍了sql中join功能的大数据实现,本节将继续为小伙伴们分享倒排索引的建立。 一、需求 在很多项目中,我们需要对我们的文档建立索引(如:论坛帖子);我们需要记录某个词在各个文...

em_aaron
昨天
27
0
"errcode": 41001, "errmsg": "access_token missing hint: [w.ILza05728877!]"

Postman获取微信小程序码的时候报错, errcode: 41001, errmsg: access_token missing hint 查看小程序开发api指南,原来access_token是直接当作parameter的(写在url之后),scene参数一定要...

两广总督bogang
昨天
33
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部