文档章节

[直播一揽子]一个简单的无所队列的C实现

拉风的道长
 拉风的道长
发布于 2016/11/14 16:54
字数 552
阅读 67
收藏 1
/*
 * Copyright (C) 2014 Yuzhong Wen<supermartian@gmail.com>
 *
 * Distributed under terms of the MIT license.
 */

#ifndef PKT_QUEUE_H
#define PKT_QUEUE_H

#include <pthread.h>

#ifdef __cplusplus
extern "C" {
#endif

/*
 * Here we implement a lock-free queue for buffering the
 * packets that come out from multiple filters.
 *
 * */

struct queue_root {
	struct queue_node *head;
	struct queue_node *tail;
	volatile unsigned int size;
};

struct queue_node {
	void *n;
	struct queue_node *next;
};

void init_queue(struct queue_root **root);
int queue_add(struct queue_root *root, void *val);
void *queue_get(struct queue_root *root);

#ifdef __cplusplus
}
#endif

#endif /* !PKT_QUEUE_H */

上面是头文件(pkt_queuq.h)。

下面是实现文件:(pkt_queuq.c)

/*
 * Copyright (C) 2014 Yuzhong Wen<supermartian@gmail.com>
 *
 * Distributed under terms of the MIT license.
 */

#include <memory.h>
#include <stdio.h>
#include <stdlib.h>

#include "pkt_queue.h"

void init_queue(struct queue_root **root)
{
	*root = (struct queue_root*) malloc(sizeof(struct queue_root));
    if (root == NULL) {
        printf("malloc failed");
        exit(1);
    }
	(*root)->head = (struct queue_node*) malloc(sizeof(struct queue_node)); /* Sentinel node */
	(*root)->tail = (*root)->head;
	(*root)->head->n = NULL;
	(*root)->head->next = NULL;
}

int queue_add(struct queue_root *root, void *val)
{
	struct queue_node *n;
	struct queue_node *node = (struct queue_node *)malloc(sizeof(struct queue_node));
	node->n = val; 
	node->next = NULL;
	while (1) {
		n = root->tail;
		if (__sync_bool_compare_and_swap(&(n->next), NULL, node)) {
			break;
		} else {
			__sync_bool_compare_and_swap(&(root->tail), n, n->next);
		}
	}
	__sync_bool_compare_and_swap(&(root->tail), n, node);

	return 1;
}

void *queue_get(struct queue_root *root)
{
	struct queue_node *n;
	void *val;
	while (1) {
		n = root->head;
		if (n->next == NULL) {
            return NULL;
		}

		if (__sync_bool_compare_and_swap(&(root->head), n, n->next)) {
			break;
		}
	}
	val = (void *) n->next->n;
	free(n);
	return val;
}

 

还有一个测试文件(可要可不要):(test.c)

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>

#include "pkt_queue.h"
#define CONSUMER 1
#define PRODUCER 4 
#define TEST_NUM 10000000

struct queue_root *queue;
volatile int total = TEST_NUM;
int complete;

void *producer_thread(void *para)
{
    int c = TEST_NUM / PRODUCER;
    for (; c > 0; c--)
    {
        int *val = (int *)malloc(sizeof(int));
        *val = random();
        queue_add(queue, val);
        /*
         *printf("Adding value %d\n", *val);
         */
    }

    complete++;
    return NULL;
}

void *consumer_thread(void *para)
{
     while (total) {
        int *val;
        val = (int *) queue_get(queue);
        if (val != NULL) {
            /*
             *printf("Getting value %d, %d left\n", *val, queue->size);
             */
            __sync_fetch_and_sub(&total, 1);
        }
     }

    return NULL;
}

int main()
{
    int p, c;
    complete = 0;
    pthread_attr_t attr;
    pthread_t threads[200];
    struct timeval then, now;
    gettimeofday(&then, NULL);
    srandom((unsigned int)then.tv_usec);

    init_queue(&queue);
    /* Now create some threads */
    pthread_attr_init(&attr);

    for (p = 0; p < PRODUCER; p++) {
        pthread_create(&threads[p], &attr, producer_thread, NULL);
        pthread_join(threads[p], NULL);
    }
    for (c = 0; c < CONSUMER; c++) {
        pthread_create(&threads[p + c], &attr, consumer_thread, NULL);
        pthread_join(threads[p + c], NULL);
    }

    gettimeofday(&now, NULL);

    printf("Executions in %.3g seconds\n", now.tv_sec - then.tv_sec + 1e-6 * (now.tv_usec - then.tv_usec));

    return 0;
}

 

© 著作权归作者所有

共有 人打赏支持
拉风的道长
粉丝 54
博文 110
码字总数 55629
作品 0
昌平
程序员
私信 提问
[直播一揽子]初期调研

这几天在调研直播的技术。虽然现在有很多“开源”的SDK,或者各个厂家的SDK。但是还是想自己去调研一下整个的直播流程/技术,并且通过代码去实现一套这样的功能。 整体规划: 看网上的文章介...

拉风的道长
2016/07/10
89
0
关于程序性能优化基础的一些个人总结

性能点: I/O,系统调用,并发/锁,内存分配,内存拷贝,函数调用消耗,编译优化,算法 I/O性能优化: I/O性能主要耗费点:系统调用,磁盘读写,网络通讯等 优化点:减少系统调用次数,减少磁...

先进哥
2014/07/31
0
0
每秒高达千万分发,如何应对直播互动平台中海量消息挑战?

由于直播平台的特点,对系统功能设计的可靠性要求更高,同时,如何在直播火热的当下快速实现直播平台的构建,成为很多企业的现实需求。本文主要分享融云直播互动系统的设计与实践,详细介绍礼...

雪夜凋零
2018/06/26
0
0
AbstractQueuedSynchronizer浅析——同步

想研究并发编程快一年了,一直断断续续,没有重点,无所收获。写此文,以明志! LockSupport,以后要重点看一下,在parkAndCheckInterrupt用到 conditionObject doReleaseShared 本文主要讲独...

锦语冰
2016/11/25
3
0
开源纯C日志函数库iLOG3快速入门(八、如果你喜欢简单日志函数甚于日志函数库)

开源纯C日志函数库iLOG3快速入门(八、如果你喜欢简单日志函数甚于日志函数库) 很多网友来信坚持表达了在项目中应使用简单日志函数,而不喜欢日志函数库,我与之反复争论无果,不过话说回来...

calvinwilliams
2014/07/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周四乱弹 —— Im fine

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @LuckyXu:分享戴荃的单曲《小荃拳之歌》: 手机党少年们想听歌,请使劲儿戳(这里) 今天遇到倒霉事了, @ FalconChen :电梯宕机了我靠 但是...

小小编辑
20分钟前
15
4
【转载】uclibc和glibc的差别

转载自:http://blog.163.com/huangnan0727@126/blog/static/30626184201042022011225/ CC的标准库,就是glibc这个库,里面有GCC各种标准函数的实现,还有各种unix系的函数在里面。 当初创建...

shzwork
29分钟前
1
0
关于360插件化Replugin Activity动态修改父类的字节码操作

近期在接入360插件化方案Replugin时,发现出现崩溃情况。 大概崩溃内容如下: aused by: java.lang.ClassNotFoundException: Didn't find class "x.x.x.xActivity" on path: 我自己在插件代码......

Gemini-Lin
今天
1
0
mybatis缓存的装饰器模式

一般在开发生产中,对于新需求的实现,我们一般会有两种方式来处理,一种是直接修改已有组件的代码,另一种是使用继承方式。第一种显然会破坏已有组件的稳定性。第二种,会导致大量子类的出现...

算法之名
昨天
21
0
单元测试

右键方法 Go To --> Test,简便快速生成测试方法。 相关注解 @RunWith(SpringRunner.class) 表示要在测试环境中跑,底层实现是 jUnit测试工具。 @SpringBootTest 表示启动整个 Spring工程 @A...

imbiao
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部