从nsq队列研究中取数据时会报java.lang.IllegalStateException: Queue full

NSQ源码分析(2)- nsqd消息的推送与订阅 - 简书
NSQ源码分析(2)- nsqd消息的推送与订阅
NSQ针对消费者采取消息推送的方式,因为NSQ本身基于内存和diskq,并不能容忍太大的消息的堆积,使用推模式也合情合理。
前一篇我们已经看到了针对一个发送到给定topic后,这个message被复制了多份,发送到了这个topic下的每一个channel中,存在在channel的memeoryMsgChan或者backend中。
消息的订阅与推送
关于消息的推送最重要的是两个文件:nsqd/protocol_v2.go和nsqd/client_v2.go。
当一个客户端与nsqd进程建立了一个tcp链接时,代码会调用protocolV2.IOLoop方法,并新建一个clientV2结构体对象。IOLoop方法会启动一个协程执行messagePump方法。
对于每一个tcp连接,都会有两个协程:运行IOLoop的协程用于接收客户端的请求;运行messagePump的负责处理数据,把数据给客户端clientV2推送给客户端。
整个protocol_v2就是一个比较经典的tcp协议的实现。每当建立一个新的tcp连接,服务器都会建立一个client_v2对象,和启动protocol_v2.messagePump协程,一个client只会订阅一个channel。IOLoop用于接收客户端传来的指令,并进行回复,并通过各个channel和其它的组件通信(包括protocol_v2.messagePump)。详情可以看源代码:
我们想要关注的消息的推送可以看messagePump的实现,如下:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var buf bytes.Buffer
var memoryMsgChan chan *Message
var backendMsgChan chan []byte
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with &1 clients having &1 RDY counts
var flusherChan &-chan time.Time
var sampleRate int32
subEventChan := client.SubEventChan
identifyEventChan := client.IdentifyEventChan
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
msgTimeout := client.MsgTimeout
// v2 opportunistically buffers data to clients to reduce write system calls
// we force flush in two cases:
1. when the client is not ready to receive messages
2. we're buffered and the channel has nothing left to send us
(ie. we would block in this loop anyway)
flushed := true
// signal to the goroutine that started the messagePump
// that we've started up
close(startedChan)
//IsReadyForMessages会检查InFlightMessages的数目是否超过了客户端设置的RDY,超过后,不再取消息推送,而是强制做flush。
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
case &-flusherChan: //ticker chan,保证定期flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
flushed = true
case &-client.ReadyStateChan://continue to next iteration:check ready state
case subChannel = &-subEventChan://收到client的SUB的topic的channel后,更新内存中的subChannel开始推送;只会SUB一个channel
// you can't SUB anymore
subEventChan = nil
case identifyData := &-identifyEventChan:
case &-heartbeatChan://heartbeat check
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
case b := &-backendMsgChan:
if sampleRate & 0 && rand.Int31n(100) & sampleRate {
msg, err := decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage() //add mem count
err = p.SendMessage(client, msg, &buf)
if err != nil {
flushed = false
case msg := &-memoryMsgChan:
if sampleRate & 0 && rand.Int31n(100) & sampleRate {
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg, &buf)
if err != nil {
flushed = false
case &-client.ExitChan:
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
heartbeatTicker.Stop()
outputBufferTicker.Stop()
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
首先,客户端发送一个SUB消息来订阅一个topic下的Channel。protocol_v2.go/protocolV2.SUB中,会往clientV2.go/client.SubEventChan发送一个channel。这里的messagePump便更新了内存中的subChannel开始推送这个订阅的channel的消息。
在循环的开头,messageMsgChan和backendChan都会用这个subChannel对应的channel select它们的消息。每当有一个消息来的时候,首先(1)会调用channel的StartInFlightTimeout,channel会把这个消息加到InFlightPqueue里,这个是以timeout时间作为优先级的优先级队列(最小堆),用于保存发送给客户端但是还没有被确认的消息。
(2)还有会更新client的一些counter信息,如InFlightMessageCount等,根据InFlightMessageCount和RDY比较决定是否继续推送消息。
客户端成功消费一条消息后,会发送一个FIN消息,带上message ID,client会-1 InFlightMessageCount,从channel的InflightMessage中取出这个消息,并向ReadStateChan发送一个消息;如果服务端因为RDY限制停止推送消息,收到这个消息后,也会重新查看是否可以继续推送消息。
或者客户端如果消费失败,也会发送一个REQ的请求,channel会把这个消息从channel的InflightMessage中取出这个消息,重新放入channel。
那如果客户端没有对消息做回复呢?
消息超时的设计与实现
在nsqd.go中,还有一部分重要的实现,queueScanLoop方法中,每隔QueueScanInterval的时间,会从方法cache的channels list中随机选择QueueScanSelectionCount个channel,然后去执行resizePool。这个实现参考了redis的probabilistic expiration algorithm.
参考《Redis设计与实现》9.6 Redis的过期键删除策略,结合了两种策略:
惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。
定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。
对于nsqd的channel,它有两个队列需要定时检查,一个是InFlightQueue,一个是DeferredQueue。任何一个有工作做,这个channel就被视为dirty的。
每隔default 100ms(QueueScanInterval),nsqd会随机选择20(QueueScanSelectionCount)个channel扔到workerCh chan之中。
每隔5s,queueScanLoop都会调用resizePool。resizePool可以看做是一个fixed pool size的协程池,idealPoolSize= min(AllChannelNum * 0.25, QueueScanWorkerPoolMax)。这么多的协程的工作就是,对于从workerCh收到的每一个channel,都会调用它的channel.go/channel.processInFlightQueue方法和channel.go/channel.processDeferredQueue方法,任何的变动都会把这次queueScan行为标记为dirty。
每次这20个channel全部都scan完毕后,会统计dirtyNum / QueueScanSelectionNum的比例,如果大于某个预设的阈值QueueScanDirtyPercent,将不会间隔时间,直接开始下一轮的QueueScan。
那么为什么每隔5s要重新调用resizePool呢?这是为了根据最新的allChannelNum给予机会去更新resizePool协程池的协程数。因为PoolSize是NSQD的数据域,是全局的状态,每次调用并不会另外新建一个协程池,而是根据idealSize调整它的大小。这部分代码实现也比较经典,可以学习一下“如何使用Golang实现一个协程池的经典实现,尤其是需要动态调整池大小的需求”。
// resizePool adjusts the size of the pool of queueScanWorker goroutines
1 &= pool &= min(num * 0.25, QueueScanWorkerPoolMax)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize & 1 {
idealPoolSize = 1
} else if idealPoolSize & n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
if idealPoolSize == n.poolSize {
} else if idealPoolSize & n.poolSize {
// contract
closeCh &- 1
n.poolSize--
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
n.poolSize++
// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
case c := &-workCh:
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
if c.processDeferredQueue(now) {
dirty = true
responseCh &- dirty
case &-closeCh:
// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
// If either of the queues had work to do the channel is considered "dirty".
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
func (n *NSQD) queueScanLoop() {
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
case &-workTicker.C:
if len(channels) == 0 {
case &-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
case &-n.exitChan:
num := n.getOpts().QueueScanSelectionCount
if num & len(channels) {
num = len(channels)
for _, i := range util.UniqRands(num, len(channels)) {
workCh &- channels[i]
numDirty := 0
for i := 0; i & i++ {
if &-responseCh {
numDirty++
if float64(numDirty)/float64(num) & n.getOpts().QueueScanDirtyPercent {
n.logf(LOG_INFO, "QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
channel.go/channel.processInFlightQueue的实现比较简单,把channel的InflightPQueue中的message按照超时时间由早到晚把超时时间小于给定时间的消息依次取出,做一些一致性的数据操作后,重新放入channel之中(也会发送TryUpdateReadyState)。processDeferredQueue也是类似的。
这里通过了一定的概率加受控制的并发协程池,清理内存中timeout未被客户端所确认的消息,重新放入队列,保证了消息的可达性。(存在重复消费消息的可能)
经典的GO并发
我们其实可以发现,同一个channel,可能会有很多的client从它的memoryMsgChan和backendChan里select监听消息,因为同一个消息对于Golang的channel来说只会被一个监听者收到,所以,通过这样的机制实现了一定程度上的消费者的负载均衡。
NSQ的代码很适合用Golang的Goroutine, Channel, & Mutex并发的good practice来学习。
Don't repeat yourself.
Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式...
一. 什么是Tao Tao,在英文中的意思是“The ultimate principle of universe”,即“道”,它是宇宙的终极奥义。 “道生一,一生二,二生三,三生无穷。” ——《道德经》 Tao同时也是我用Go语言开发的一个异步的TCP服务器框架(TCP ...
实时消息协议---流的分块 版权声明: 版权(c)2009 Adobe系统有限公司。全权所有。 摘要: 本备忘录描述实时消息协议块流。块流是一种应用层协议,主要用于通过一种合适的传输层协议(例如TCP)复用、打包多媒体数据流(音频,视频和交互数据)。 目录: o 简介 o ...
本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对照 指南:Guide 教程:Tutorial 代理(即RabbitMQ服务端):Broker、RabbitMQ server 客户端:Client 发布者:Pu...
用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金Cover 有什么料? 从这篇文章中你能获得这些料: 知道setContentView()之后发生了什么? ... Android 获取 View 宽高的常用正确方式,避免为零 - 掘金相信有很多朋友...
出差多日,昨日被提前放回家,终于可以有个懒洋洋的休息日。一进门,行李箱门口一扔,把自己扔进沙发,看最近火爆的电视剧人民的名义。晚些时妈妈带了食物过来,可是完全没食欲,妈妈喋喋不休一阵回去了。妈妈唠唠叨叨的时候就觉得烦躁,可是又每次都后悔反省检讨自己,怎么对阿爸阿妈总是没有耐...
为了摆脱经济学一些概念的束缚,曾经有过许多次努力的尝试,但是到现在,虽然付出了那么多的努力,这些概念依然保留着,这是一个事实,我们只是将这些概念另取了名字,但是我们并不能去使用它,当这些概念被我们的主流搜索抛弃,但它却一直不曾离去,它从我们的偏僻领域又回到了我们的研究中。在...
诗/耳风 我是长河边的荒野 你是肆虐天际的藤蔓 在同一个日落的时候 我消失在黑夜里 你成长在晚霞中 我们都会,狂野生长 却不会同样奔向死亡 或许,下个黎明 消失的我 被另一片草野代替 但燃烧的你 将成为下一个太阳 难道你会记住我? 就算你我 同样灿烂与沉默 放肆与孤独 曾同...消息队列各种比较
我的图书馆
消息队列各种比较
http://blog.csdn.net/archimedes_zht/article/details/7401210
最近的工作需要用到MessageQueue来做“任务分发”,自己写了一个简单的,但是感觉不够满意。主要还是感觉消息队列持久化要做的好很难,还有各种异常情况等等,短时间开发一个,挑战很大,还是找一个开源的实现吧。
先是找到了RabbitMQ,是erlang写的,而我们的系统是用Go写的,目前它还没有Go的绑定,部署系统的时候也要装erlang虚拟机。考察了之后决定放弃。
然后是最近大名鼎鼎的ZeroMQ,第一次听说是在云风大哥的博客上(http://blog.codingnow.com/2011/02/zeromq_message_patterns.html),我后来在还没有用它写代码之前就分析过它的源码了(http://blog.csdn.net/archimedes_zht/article/details/7342937,http://www.zeromq.org/docs:omq-2-1-class-diagram)。:D
分析代码的时候就知道它没有分发任务、执行任务时候的各种确认,任务的持久化以防止任务丢失。不过ZeroMQ可以把内存中放不下的消息先写到磁盘上去,然后再读进内存(见swap_t类)。这次再大致看了下它的文档,可以确认它的名字虽然是MQ,但是它提倡的是“结构化网络编程”,它建立了非常灵活的编写网络程序的基础设施,可以用在各种场合,不过这些场合都需要你自己利用它提供的基础设施来定制(比如我现在急切需要的各种任务分发相关的特性)。目前知道的是它可以创建一个queue的device。不过这个目前和我的“任务分发”需求相差很远。以后有空再研究这个精彩的、雄心勃勃的项目吧。
然后是Gearman(http://gearman.org/),它是专门做任务分发的。任务可以分为前台任务(同步任务)和后台任务(异步任务),而且提供了任务的持久化机制(配置一个数据库就行了,支持MySQL,Sqlite3等),非常符合我的胃口。:) 而且 mikespook大侠已经提供好了Gearman协议的Golang实现(http://www.mikespook.com/2011/05/go-%E8%AF%AD%E8%A8%80%E7%9A%84-gearman-api/)代码也实现的非常漂亮,学习了。而且我听说有Gearman这个好东东也是看了mikespook大侠的Go语言介绍才知道的(http://ecug.googlecode.com/svn/trunk/ecug-con/2011/mikespook/)。
我个人觉得Gearman的文档最值得看的就是它的协议交互了(&),另外就是它的C接口使用(&),分为Client和Worker两部分。
不过在下载安装Gearman的时候,发现竟然要先装Boost,唉。。。&另外如果Gearman仍然是C开发的就好了,为什么要转向C++呢。。。搞不懂,不明白。。&
最后以查找资料的时候,国外牛人对各种“MQ”的总结结束。
RabbitMQ、ZeroMq和ActiveMQ的比较:&
RabbitMQ is one of the leading implementation of the AMQP protocol (along with Apache Qpid). Therefore, it implements a broker architecture, meaning that messages are queued on a central node before being sent to clients. This approach makes RabbitMQ very easy to use and deploy, because advanced scenarios like routing, load balancing or persistent message queuing are supported in just a few lines of code. However, it also makes it less scalable and “slower” because the central node adds latency and message envelopes are quite big.
ZeroMq is a very lightweight messaging system specially designed for high throughput/low latency scenarios like the one you can find in the financial world. Zmq supports many advanced messaging scenarios but contrary to RabbitMQ, you’ll have to implement most of them yourself by combining various pieces of the framework (e.g : sockets and devices). Zmq is very flexible but you’ll have to study the 80 pages or so of the guide (which I recommend reading for anybody writing distributed system, even if you don’t use Zmq) before being able to do anything more complicated that sending messages between 2 peers.
ActiveMQ is in the middle ground. Like Zmq, it can be deployed with both broker and P2P topologies. Like RabbitMQ, it’s easier to implement advanced scenarios but usually at the cost of raw performance. It’s the Swiss army knife of messaging :-).&Finally, all 3 products:&(1)have client apis for the most common languages (C++, Java, .Net, Python, Php, Ruby, …)&(2)have strong documentation&(3)are actively supported&Gearman与RabbitMQ的比较:&
I would say that Gearman is better for queuing "jobs" and RabbitMQ is better for queuing "data". Of course, they are both really the same thing, but the way it works out for me is that if you are trying to "fan out" work to be done, and the workers can work independently, Gearman is the better way to do it. But if you are trying to feed data from a lot of sources down into fewer data consumers, RabbitMQ is the better solution.
The history of RabbitMQ, as something that allowed Twitter to take bursty loads of messages, and feed them into crusty old SMS gateways that could keep only one connection open, were rate limited, and didnt have retries, is illustrative of the kind of problems that RabbitMQ is good at solving.
一篇文章:
常见的开源消息系统——网站公告、通知、消息的实现方式
的作用:异步处理、削减峰值、减少组件之间的耦合。
选择消息系统根据业务需要需要考虑以下几个方面:
分布式扩展能力
兼容现有协议
其他,如消息丢失和重复的处理
避免单点故障
常见消息系统协议:
类似 MEMCACHE 的协议
下述1、2 是不错的可选开源组件:
1. Kafka/MetaQ: 广泛用于&&内部 (类似有 Java 版本的国产 MetaQ)
由于优先考虑吞吐,更加适合大数据量的消息收集和处理,比如日志分析、用户行为信息实时报表、集群状态信息收集和分析。
优先考虑持久化的设计,依靠 page cache 管理内存
高吞吐 112MB/s 11K msgs/s (比 beanstalkd &70x 吞吐能力)
支持异步复制
高可用、基于 Zookeeper 的集群设计、支持消费者失效后重新负载均衡
Kafka 提供&&类库
支持 ganglia JMX 监控
需要策略避免重复消息, 消费者更新 Zookeeper 的 offset 的方式 (MetaQ 已经提供了几种方式避免消息重复)
MetaQ 提供 HTTP 接口
2. NSQ – Golang
无中心设计、节点自动注册和发现。可以考虑作为内部通讯框架的基础。
* 追求简单部署* 追求高可用、避免单点故障、无中心设计* 确保消息送达* 生产者消费者自动发现、消费者连接所有生产者、向消费者推的模式* 提供 HTTP 接口
3. Beanstalkd
支持持久化 binlog 设计,重启消息不丢失
无高可用设计
和 memcached 一样的分布式扩展方式
有 Web 管理工具
支持同步调用,等待返回
只有类似 Memcache TCP ASCII 协议, 单文件部署
支持消息优先级
9K jobs/s 入队列 5K jobs/s 出队列
无主从同步复制机制
最好单机多实例部署
需要自己封装 Pub/Sub
基于 Redis 的复制高可用
其他常见开源消息系统:
ZeroMQ: 轻量级基础消息库
只适合不需要持久化的场景、需要自己封装
不支持持久化,只提供消息分发, 性能最好
无 Broker 设计, 无中心故障
2500 job/s 入队列 1300 job/s 出队列
适合小消息
分布式无单点设计
底层为 Erlang 实现有评论: RabbitMQ could not enqueue/dequeue fast enough.
性能差不考虑
3800 jobs/s 入队列 300 jobs/s 出队列基于 Redis 的消息队列
SquirrelMQ
Sparrow – Ruby
Apache ActiveMQ
ActiveMQ crashed constantly under load.
STOMP HTTP 协议
喜欢该文的人也喜欢寂寂轻舞,静待秋风
nsq之基本用法
nsq的基本用法
安装参考路径
http://nsq.io/deployment/installing.html
nsq安装脚本
`brew install nsq`
`nohup /usr/local/bin/nsqlookupd & /dev/null &`
`nohup nsqd --lookupd-tcp-address=127.0.0.1:4160
& /dev/null &`
`nohup nsq_to_file --topic=test --output-dir=/Users/bikang1/Documents/GoProject/go-go-go/标准库的使用/src/weibo.com/uverd/kproject/nsq-use/tmp --lookupd-http-address=127.0.0.1:4161
& /dev/null &`
`nohup nsqadmin --lookupd-http-address=127.0.0.1:4161
& /dev/null &`
nsq的详细说明
http://127.0.0.1:4171/
通过命令行发布一条消息
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
一直报错~~因为nsqlookup没有指定域名。我绑定了本机名称到本地解决了这个问题
通过go-nsq发布消息
https://godoc.org/github.com/nsqio/go-nsq#Producer.SetLogger
package main
"github.com/nsqio/go-nsq"
func main() {
config := nsq.NewConfig()
produce, err := nsq.NewProducer("127.0.0.1:4152", config)
defer produce.Stop()
if err != nil {
panic(err)
topicName := "test"
err = produce.Publish(topicName, []byte("a-msg"))
if err != nil {
log.Printf("%v\n", err)
package main
"github.com/nsqio/go-nsq"
func main() {
config := nsq.NewConfig()
config.LookupdPollInterval = time.Second * 3
topicName := "test"
consumer, err := nsq.NewConsumer(topicName, "tt", config)
if err != nil {
panic(err)
consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
log.Println("receive", m.NSQDAddress, "message:", string(m.Body))
return nil
address := "0.0.0.0:4161"
if err = consumer.ConnectToNSQLookupd(address); err != nil {
panic(err)
time.Sleep(time.Second * 10)
nsq 优秀的消息队列
NSQ研究与实践
nsq 快速入门经验分享
NSQ 安装指南
golang实战-nsq集群入门与坑
kafka和nsq的对比
golang实时消息平台NSQ的使用
Linux安装及部署NSQ消息队列
NSQ的设计特点:
没有更多推荐了,

我要回帖

更多关于 nsq kafka 比较 的文章

 

随机推荐