文档章节

nsqd 源码,写入数据

昏鸦
 昏鸦
发布于 2014/03/27 16:52
字数 375
阅读 120
收藏 1

切记: chanel 有有自己的持久化 队列

topic 也有自己的持久化队列,两个是相互独立

给一个topic put 数据的:

// PutMessage writes to the appropriate incoming message channel

func (t *Topic) PutMessage(msg *nsq.Message) error {

    t.RLock()

    defer t.RUnlock()

    if atomic.LoadInt32(&t.exitFlag) == 1 {

        return errors.New("exiting")

    }

    t.incomingMsgChan <- msg

    atomic.AddUint64(&t.messageCount, 1)

    return nil

}


incomingMsgChan  是:incomingMsgChan:   make(chan *nsq.Message, 1),
        memoryMsgChan:     make(chan *nsq.Message, context.nsqd.options.MemQueueSize),
        
 初始化topic 会执行:
 
 t.waitGroup.Wrap(func() { t.router() })
 t.waitGroup.Wrap(func() { t.messagePump() })
 
 func (t *Topic) router() {
    var msgBuf bytes.Buffer
    for msg := range t.incomingMsgChan { //当有数据的时候执行
        select {
        case t.memoryMsgChan <- msg://memoryMsgChan 可以写入
        default:
            err := WriteMessageToBackend(&msgBuf, msg, t.backend)// 默认持久化到硬盘
            if err != nil {
                log.Printf("ERROR: failed to write message to backend - %s", err.Error())
                // theres not really much we can do at this point, you're certainly
                // going to lose messages...
            }
        }
    }

    log.Printf("TOPIC(%s): closing ... router", t.name)
}
 
 WriteMessageToBackend  会调用:
 func (d *DiskQueue) Put(data []byte) error {
    d.RLock()
    defer d.RUnlock()

    if d.exitFlag == 1 {
        return errors.New("exiting")
    }

    d.writeChan <- data 
    return <-d.writeResponseChan
}
 
 初始化一个:NewDiskQueue 会定时执行:
 func (d *DiskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte

    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        count++
        // dont sync all the time :)
        if count == d.syncEvery {
            count = 0
            d.needSync = true
        }

        if d.needSync {
            err = d.sync()
            if err != nil {
                log.Printf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err.Error())
            }
        }

        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    log.Printf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err.Error())
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        case r <- dataRead:
            d.moveForward()
        case <-d.emptyChan:
            d.emptyResponseChan <- d.deleteAllFiles()
        case dataWrite := <-d.writeChan:  //writeChan   刚刚写入数据的chan
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            d.needSync = true
        case <-d.exitChan:
            goto exit
        }
    }

exit:
    log.Printf("DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}


© 著作权归作者所有

共有 人打赏支持
昏鸦
粉丝 6
博文 182
码字总数 59043
作品 0
程序员
两个nsqd间传输数据时,如果有一方或者双方都在LAN中,数据怎样传输?

@昏鸦 你好,想跟你请教个问题: 按照我的学习了解:客户端通过查询nsqlookup得知自己想要的topic的新数据在哪些nsqd上,得到一个nsqd列表,然后客户端去连接这些nsqd,拿取它要的数据。 但是...

天台道人
2015/04/02
448
1
nsq源码分析之nsqlookup实现

nsqlookup服务为nsqd的服务发现,分析和借鉴下服务发现的源码: 再来看看nsqlookup是怎么去存储管理这些Producer: 分别看看怎么新增Registration和Producer: 再看看Find相应的Registration和P...

smart_w
07/31
0
0
游戏服务器骨架--gonet2

欢迎使用 gonet/2是新一代游戏服务器骨架,基于go语言开发,采用了先进的http/2作为服务器端主要通信协议,以microservice作为主要思想进行架构,采用docker作为服务发布手段。相比第一代gon...

xtaci
2015/10/15
3.2K
0
nsq在docker部署后nsqadmin无法resolve所有nsqd的host的解决方法

nsq在docker部署后nsqadmin无法resolve所有nsqd的host的解决方法 疯子的自留地2017-12-173 阅读 docker方法部署 问题 最开始部署 nsq 的时候是直接在云主机上直接部署的,但是作为一个牛逼的程...

疯子的自留地
2017/12/17
0
0
去中心化分布式服务实时消息平台-NSQ

NSQ是一个基于Go语言的开源的分布式实时消息平台 NSQ可用于大规模系统的实时消息服务,它的设计目标是为在分布式环境下提供一个强大的去除中心化的分布式服务架构,可以每天处理数以亿计的实...

kiwisoft
2016/10/27
22
0

没有更多内容

加载失败,请刷新页面

加载更多

如何通过 J2Cache 实现分布式 session 存储

做 Java Web 开发的人多数都会需要使用到 session (会话),我们使用 session 来保存一些需要在两个不同的请求之间共享数据。一般 Java 的 Web 容器像 Tomcat、Resin、Jetty 等等,它们会在...

红薯
今天
3
0
C++ std::thread

C++11提供了std::thread类来表示一个多线程对象。 1,首先介绍一下std::this_thread命名空间: (1)std::this_thread::get_id():返回当前线程id (2)std::this_thread::yield():用户接口...

yepanl
今天
3
0
Nignx缓存文件与动态文件自动均衡的配置

下面这段nginx的配置脚本的作用是,自动判断是否存在缓存文件,如果有优先输出缓存文件,不经过php,如果没有,则回到php去处理,同时生成缓存文件。 PHP框架是ThinkPHP,最后一个rewrite有关...

swingcoder
今天
1
0
20180920 usermod命令与用户密码管理

命令 usermod usermod 命令的选项和 useradd 差不多。 一个用户可以属于多个组,但是gid只有一个;除了gid,其他的组(groups)叫做扩展组。 usermod -u 1010 username # 更改用户idusermod ...

野雪球
今天
3
0
Java网络编程基础

1. 简单了解网络通信协议TCP/IP网络模型相关名词 应用层(HTTP,FTP,DNS等) 传输层(TCP,UDP) 网络层(IP,ICMP等) 链路层(驱动程序,接口等) 链路层:用于定义物理传输通道,通常是对...

江左煤郎
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部