文档章节

linux producer consumer sources

romalin99
 romalin99
发布于 2013/07/07 14:40
字数 1178
阅读 63
收藏 0

除了提供互斥之外,信号量的另外一个作用是调度对共享资源的访问。在这种情景中,一个线程用信号量来通知另一个线程,程序状态中的某个条件已经为真了。

下图给出了生产者-------消费者问题。生产者和消费者线程共享一个有n个槽的有限缓冲区。

生产者线程反复的生成新的项目(item),并把它们插入到缓冲区中。消费者线程不断的从缓冲区中取出这些项目,然后消费( 使用它们)
 

 

因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问时互斥的。

但是只保证互斥访问还是不够的,我们还需要调度  对缓冲区的访问 (如果缓冲区是满的即没有可用的槽,那么生产者必须等待直到有一个空的槽变为可用为止。与之相似,若缓冲区是空的即没有可取用的槽, 那么消费者必须等待直到有一个项目变为可用。

接下来我们为生产者----消费者  定义一个结构体: 来存储数据

Typedef struct {

       Int * buf;      //  item存放在一个动态分配的n项整数buf

       Int n;         //

       Int front;     //  索引值记录第一项 和最后一项

       Int rear;     //

       Sem_t mutex;  //  三个信号量同步对缓冲区的访问。提供互斥的缓冲区的访问。

       Sem_t slots; //   分别记录空槽和可用item的数量

       Sem_t items; //

}sbuf_t;

/*

Buffer array

Maximum number of slots

Buf[(front+1)%n]  is first item

Buf[rear%n] is last item

Protects accessed to buf

Counts available slots

Counts available items

*/

此结构体包含使用的有限缓冲区

我们使用一个函数sbuf_init来初始化此缓冲区,并设置front rear 表示一个空的缓冲区,并为三个信号量赋予初始值。使用sbuf_deinit函数来删除缓冲区(当程序使用完之后)

Sbuf_insert函数等待一个可用的槽,对互斥锁加锁,添加项目item。对互斥锁解锁,然后宣布有一个新的item可用。

Sbuf_remove函数式与上一个函数对应的。在等待一个可用的缓冲区之后,对互斥锁加锁,从缓冲区的前面取出该项目,对互斥锁解锁。然后发信号通知一个新的槽可供使用。

 

 

void  sbuf_init(sbuf_t *sp,int n)

{

       If((Sp->buf=calloc(n,sizeof(int))==NULL)

              Printf(“calloc  错误\n”);

       Sp->n=n; //Buffer holds max of n items

       Sp->front=sp->rear=0;  //Empty buffer iff front==rear

       Sem_init(&sp->mutex,0,1);//binary semaphore for locking

       Sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots

       Sem_init(&sp->items,0,0); //initially, buf has zero data items

}

Void sbuf_deinit(sbuf_t *sp)

{

       Free(sp->buf);

}

Void sbuf_insert(sbuf_t *sp,int item)

{

       Sem_wait(&sp->slots);//wait for available slot

       Sem_wait(&sp->mutex);//lock the buffer

       Sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item

       Sem_post(&sp->mutex);//unlock the buffer

       Sem_post(&sp->items);//announce available item

}

 

Int sbuf_remove(sbuf_t *sp)

{

       Int item;

       Sem_wait(&sp->items);//wait for available item

       Sem_wait(&sp->mutex);//lock the buffer

       Item=sp->buf[(++sp->front)%(sp->n)];// remove the item

       Sem_post(&sp->mutex);

       Sem_post(&sp->slots);

       Return item;

}

#include  <stdio.h>

#include  <stdlib.h>
#include  <string.h>
#include  <pthread.h>
#include  <unistd.h>
#include  <semaphore.h>

typedef struct {
    int * buf;      //  item存放在一个动态分配的n项整数buf中
    int n;         //
    int front;     //  索引值记录第一项 和最后一项
    int rear;     //
    sem_t mutex;  //  三个信号量同步对缓冲区的访问。提供互斥的缓冲区的访问。
    sem_t slots; //   分别记录空槽和可用item的数量
    sem_t items; //
}sbuf_t;
sbuf_t g_sp;
void  sbuf_init(sbuf_t *sp,int n)
{
    sp->buf=(int*)calloc(n,sizeof(int));
    if( NULL == sp->buf)
        printf("calloc  错误\n");
    sp->n=n; //Buffer holds max of n items
    sp->front=sp->rear=0;  //Empty buffer iff front==rear
    sem_init(&sp->mutex,0,1);//binary semaphore for locking
    sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots
    sem_init(&sp->items,0,0); //initially, buf has zero data items
}
void sbuf_deinit(sbuf_t *sp)
{
    free(sp->buf);
}
void sbuf_insert(sbuf_t *sp,int item)
{
    sem_wait(&sp->slots);//wait for available slot
    sem_wait(&sp->mutex);//lock the buffer
    sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item
    sem_post(&sp->mutex);//unlock the buffer
    sem_post(&sp->items);//announce available item
}

int sbuf_remove(sbuf_t *sp)
{
    int item;
    sem_wait(&sp->items);//wait for available item
    sem_wait(&sp->mutex);//lock the buffer
    item=sp->buf[(++sp->front)%(sp->n)];// remove the item
    sem_post(&sp->mutex);
    sem_post(&sp->slots);
    return item;
}

void* producer(void* arg)
{
    int i,item,index;
    //index 是线程编号
    index=*(int*)arg;
    for(int i=0;i<g_sp.n;i++)
    {
        item =(i*2+1);
        sbuf_insert(&g_sp,item);
        printf("[P%d] Producing %d ...\n",index,item);
        fflush(stdout);
        sleep(1);
    }
}
void* consumer(void* arg)
{
    int i,item,index;
    index=*(int*)arg;
    for(int i=0;i<g_sp.n;i++)
    {
        item =sbuf_remove(&g_sp);
        printf("------>[C%d] Consuming %d ...\n",index,item);
        sleep(1);
    }
}

int main(int argc,char* argv[])
{
    sbuf_init(&g_sp,10);
    int NP=3,NC=3;
    pthread_t idp[NP],idc[NC];
    //1
    for(int i=0;i<NP;i++)
    {
            pthread_create(&idp[i],NULL,producer,(void*)&i);
    }
    //2
    for(int i=0;i<NC;i++)
    {
            pthread_create(&idc[i],NULL,consumer,(void*)&i);
    }
    //3
    for(int i=0;i<NP;i++)
        pthread_join(idp[i],NULL);
    for(int i=0;i<NC;i++)
        pthread_join(idc[i],NULL);
        
    sbuf_deinit(&g_sp);
    return 0;
}

© 著作权归作者所有

共有 人打赏支持
romalin99
粉丝 8
博文 153
码字总数 76769
作品 0
浦东
高级程序员
circular buffer in Linux kernel

/ See Documentation/circular-buffers.txt for more information. */ #ifndef LINUXCIRCBUFH #define LINUXCIRCBUFH 1 struct circ_buf { char *buf; int head; int tail; }; / Return coun......

ChenQi
2012/03/23
0
0
Using Kafka with Flume

这个文档是 Cloudera Distribution of Apache Kafka 1.3.x. 其他版本的文档在Cloudera Documentation. Using Kafka with Flume 在CDH 5.2.0 及更高的版本中, Flume 包含一个Kafka source an......

晨磊
2015/08/29
852
0
flume传递到kafka的消息,consumer接收不到

做的是flume+kafka,把flume接收到的消息传递到kafka,后台程序监控消息队列,取出flume传递过来的消息。现在的问题是: 1、通过Producer生产的消息,Consumer程序能接收到,但是flume生产的消...

wanghu1983
2017/05/10
241
0
Apache Samza 0.14.0 发布,分布式流处理框架

Apache Samza 是一个分布式流处理框架,专用于实时数据的处理,目前已经在几个大公司(包括 LinkedIn、Netflix、Uber、Slack、Redfin、TripAdvisor)实际生产中使用。 0.14.0 版本包含以下备...

王练
01/07
450
0
协程技术研究

先记住一句话,子程序是协程的一种特例。 Python yield Python 中的 yield 保存一个 generator 函数的状态,generator 是一个特殊类型的迭代器(iterator) 运行结果 yield 相当于 return,主...

兔之
2016/11/24
114
0

没有更多内容

加载失败,请刷新页面

加载更多

【大福利】极客时间专栏返现二维码大汇总

我已经购买了如下专栏,大家通过我的二维码你可以获得一定额度的返现! 然后,再给大家来个福利,只要你通过我的二维码购买,并且关注了【飞鱼说编程】公众号,可以加我微信或者私聊我,我再...

飞鱼说编程
今天
1
0
Spring5对比Spring3.2源码之容器的基本实现

最近看了《Spring源码深度解析》,该书是基于Spring3.2版本的,其中关于第二章容器的基本实现部分,目前spring5的实现方式已有较大改变。 Spring3.2的实现: public void testSimpleLoad(){...

Ilike_Java
今天
1
0
【王阳明心学语录】-001

1.“破山中贼易,破心中贼难。” 2.“夫万事万物之理不外于吾心。” 3.“心即理也。”“心外无理,心外无物,心外无事。” 4.“人心之得其正者即道心;道心之失其正者即人心。” 5.“无...

卯金刀GG
今天
2
0
OSChina 周三乱弹 —— 我们无法成为野兽

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @ _刚刚好: 霸王洗发水这波很骚 手机党少年们想听歌,请使劲儿戳(这里) hahahahahahh @嘻酱:居然忘了喝水。 让你喝可乐的话, 你准忘不了...

小小编辑
今天
11
0
vm GC 日志 配置及查看

-XX:+PrintGCDetails 打印 gc 日志 -XX:+PrintTenuringDistribution 监控晋升分布 -XX:+PrintGCTimeStamps 包含时间戳 -XX:+printGCDateStamps 包含时间 -Xloggc:<filename> 可以将数据保存为......

Canaan_
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部