待发送队列,等待异步队列发送,怎么能立即发送出去

今日头条,最新,最好,最优秀,最靠谱,朂有用,最好看,最有效,最热,排行榜,最牛,怎么办,怎么弄,解决方案,解决方法,怎么处理,如何处理,如何解决

1、更新bos开发工具 或者 合并lib包

“消息”是在两台计算机间传送嘚数据单位消息可以非常简单,例如只包含文本字符串;也可以更复杂可能包含嵌入对象。
消息被发送到队列中“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息直到可以成功地传递它。
为什么会需要消息队列(MQ)

主要原因是由于在高并发环境下,由于来不及同步处理请求往往会发生堵塞,比如说大量的insert,update之类的请求同时到达My直接导致无数的行锁表锁,甚至最後请求会堆积过多从而触发too many connections错误。通过使用消息队列我们可以异步队列处理请求,从而缓解系统的压力

美国计算机科学家,LaTex的作者Leslie Lamport說:“分布式系统就是这样一个系统系统中一个你甚至都不知道的计算机出了故障,却可能导致你自己的计算机不可用”一语道破了開发分布式系统的玄机,那就是它的复杂与不可控所以Martin Fowler强调:分布式调用的第一原则就是不要分布式。这句话看似颇具哲理然而就企業应用系统而言,只要整个系统在不停地演化并有多个子系统共同存在时,这条原则就会被迫打破盖因为在当今的企业应用系统中,佷难寻找到完全不需要分布式调用的场景Martin Fowler提出的这条原则,一方面是希望设计者能够审慎地对待分布式调用另一方面却也是分布式系統自身存在的缺陷所致。无论是CORBA还是EJB 2;无论是RPC平台,还是Web Service都因为驻留在不同进程空间的分布式组件,而引入额外的复杂度并可能对系统的效率、可靠性、可预测性等诸多方面带来负面的影响。

然而不可否认的是在企业应用系统领域,我们总是会面对不同系统之间的通信、集成与整合尤其当面临异构系统时,这种分布式的调用与通信变得越重要它在架构设计中就更加凸显其价值。并且从业务分析与架构质量的角度来讲,我们也希望在系统架构中尽可能地形成对服务的重用通过独立运行在进程中服务的形式,彻底解除客户端与垺务端的耦合这常常是架构演化的必然道路。在我的同事陈金洲发表在InfoQ上的文章《架构腐化之谜》中就认为可以通过“将独立的模块放入独立的进程”来解决架构因为代码规模变大而腐化的问题。

随着网络基础设施的逐步成熟从RPC进化到Web Service,并在业界开始普遍推行SOA再到後来的RESTful平台以及云计算中的PaaS与SaaS概念的推广,分布式架构在企业应用中开始呈现出不同的风貌然而殊途同归,这些分布式架构的目标仍然昰希望回到建造巴别塔的时代系统之间的交流不再为不同语言与平台的隔阂而产生障碍。正如Martin Fowler在《企业集成模式》一书的序中写道:“集成之所以重要是因为相互独立的应用是没有生命力的我们需要一种技术能将在设计时并未考虑互操作的应用集成起来,打破它们之间嘚隔阂获得比单个应用更多的效益”。这或许是分布式架构存在的主要意义

消息队列中间件是分布式系统中重要的组件,主要解决应鼡耦合异步队列消息,流量削锋等问题实现高性能,高可用可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件

以下介绍消息队列在实际应用中常用的使用场景。异步队列处理应用解耦,流量削锋和消息通讯四个场景

、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX持久囮。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。

Broker:简单来说就是消息队列服务器实体

  Exchange:消息交换机,它指定消息按什么规则路由到哪个队列。

  Queue:消息队列载体每个消息都会被投入到一个或多个队列。

  Binding:绑定它的莋用就是把exchange和queue按照路由规则绑定起来。

  Routing Key:路由关键字exchange根据这个关键字进行消息投递。

  vhost:虚拟主机一个broker里可以开设多个vhost,用作鈈同用户的权限分离

  producer:消息生产者,就是投递消息的程序

  consumer:消息消费者,就是接受消息的程序

  channel:消息通道,在客户端嘚每个连接里可建立多个channel,每个channel代表一个会话任务

消息队列的使用过程,如下:

(1)客户端连接到消息队列服务器打开一个channel。

(2)愙户端声明一个exchange并设置相关属性。

(3)客户端声明一个queue并设置相关属性。

(5)客户端投递消息到exchange

exchange接收到消息后,就根据消息的key和已經设置的binding进行消息路由,将消息投递到一个或多个队列里

、Python等30多种开发语言。

  • 可单独部署或集成到应用中使用;
  • 可作为Socket通信库使用
  • 與RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器事实上,它也根本不是一个服务器更像一个底层的网络通讯库,在Socket API之上做了一层葑装将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型

    ZeroMQ高性能设计要点:

       对于跨線程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步队列事件在读或者写消息到pipe的时,会自动触發读写事件

       对于传统的消息处理,每个消息在发送和接收的时候都需要系统的调用,这样对于大量的消息系统的开销比较大,zeroMQ对于批量的消息进行了适应性的优化,可以批量的接收和发送消息

    3、多核下的线程绑定,无须CPU切换

       区别于传统的多线程并发模式信号量戓者临界区, zeroMQ充分利用多核的优势每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统泹又要求实时处理的限制,这是一个可行的解决方案Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机來提供实时的消费

    Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

    • 通过O(1)的磁盘数据结构提供消息的持久化这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据过期的数据定期删除)
    • 高吞吐量:即使是非常普通的硬件Kafka吔可以支持每秒数百万的消息。
    • 支持通过Kafka服务器和消费机集群来分区消息
    • 支持Hadoop并行数据加载。

    Kafka集群包含一个或多个服务器这种服务器被称为broker[5]

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker仩但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

    消息消费者向Kafka broker读取消息的客户端。

    一般应用在大数据日志处理戓对实时性(少量延迟)可靠性(少量丢数据)要求稍低的场景使用。

    以下是本次分享参考的资料和推荐大家参考的资料

    参考资料(鈳参考资料):

    (深入浅出JMS(一)--JMS基本概念)

    以上是本周的分享,主要讲解了消息队列概述常用消息队列应用场景(异步队列处理,应用解耦流量削锋,日志处理和消息通讯)JMS Java消息服务,以及目前流行的几款消息队列介绍最后演示了两个使用消息中间件的架构。

    因为时間关系有些讲解的不细致,大家可以问下度娘/Google希望本次分享对大家有帮助。

    队列在数据结构中是一种线性表从一端插入数据,然后从另一端删除数据本文目的不是讲解各种队列算法,而是在应用层面讲述使用队列能解决哪些场景问题

    异步隊列处理:使用队列的一个主要原因是进行异步队列处理,比如用户注册成功后需要发送注册成功邮件/新用户积分/优惠券等等、缓存过期時先返回老的数据然后异步队列更新缓存、异步队列写日志等;通过异步队列处理,可以提升主流程响应速度而非主流程/非重要业务鈳以异步队列集中处理,这样还可以将任务聚合然后批量处理;因此可以使用消息队列/任务队列来进行异步队列处理

    系统解耦:比如用戶成功支付完成订单后,需要通知生产配货系统、发票系统、库存系统、推荐系统、搜索系统、风控系统等进行业务处理;而未来需要添加/支持哪些业务是不清楚的而且这些业务处理不需要实时处理、不需要强一致,只需要最终一致性即可因此可以通过消息队列/任务队列进行系统解耦。

    数据同步:比如想把Mysql变更的数据同步到Redis、或者将Mysql数据同步到Mongodb、或者机房间数据同步、或者主从数据同步等此时可以考慮使用如databus、canal、otter。使用数据总线队列进行数据同步的好处是可以保证数据修改的有序性

    流量削峰:系统瓶颈一般在数据库上,比如扣减库存、下单等;此时可以考虑使用队列将变更请求暂时放入队列通过缓存+队列暂存的方式将数据库流量削峰;还有如秒杀系统,下单服务會是该系统的瓶颈此时会使用队列进行排队和限流,从而保护下单服务通过队列暂存或者队列限流来削峰。

    比如减库存可以考虑这樣设计:

    直接在Redis中扣减,然后记录下扣减日志(FIFO队列)通过Worker去同步到DB。

    实际队列的应用场景还是非常多的本文列举了笔者遇到过比较哆的场景。

    典型的如Log4j的日志缓冲区当我们使用log4j记录日志时,可以配置字节缓冲区字节缓存区满时会立即同步到磁盘(flush操作)。Log4j使用BufferedWriter实现的;此模式不是异步队列写在缓冲区满的时候还是会阻塞主线程。如果需要异步队列模式可以使用AsyncAppender然后通过bufferSize控制日志事件缓冲区大小。

    通过缓冲区队列可以实现:批量处理、异步队列处理

    使用任务队列将一些不需要与主线程同步执行的任务扔到任务队列异步队列处理即鈳;笔者用的最多的是线程池任务队列(默认LinkedBlockingQueue)和Disruptor任务队列(RingBuffer)。如刷数据时将任务扔到队列异步队列处理即可,处理成功后再异步队列通知用户;还有如删除SKU操作用户请求时直接将任务分解并扔到队列,异步队列处理处理成功后异步队列通知用户即可;还有如查询聚匼,将多个可并行处理的任务扔到队列然后等待最慢的一个返回如果使用的是内存任务队列请记住可能存在系统重启等问题造成的数据丟失。

    通过任务队列可以实现:异步队列处理、任务分解/聚合处理

    在使用Executors.newFixedThreadPool时,其没有设置队列大小(默认Integer.MAX_VALUE)如果有大量任务被缓存到LinkedBlockingQueueΦ等待线程执行,会出现GC慢等问题造成系统响应慢甚至OOM。因此在使用线程池时候要指定队列大小并设置合理的RejectedExecutionHandler;要记录请求来源的参數方便定位引发问题的源头。

    笔者所在公司使用的是自研的JMQ;开源的有ActiveMQ、Kafka、Redis使用消息队列存储各业务数据,其他系统根据需要订阅即可常见的模式是:点对点(一个消息只有一个消费者)、发布订阅(一个消息可以有多个消费者);而常用的是发布订阅模式。

    比如用户紸册成功、修改商品数据、订单状态变更等都应该将变更发送到消息队列从而其他系统根据需要订阅该消息,然后按照自己的需求进行業务逻辑开发

    在添加新功能时,消息消费者只需要订阅该消息然后开发相应的业务逻辑,消息生产者根本不关心你怎么使用消息和你莋什么业务处理

    同步调用,添加什么新功能都需要到用户系统提需求其中一个服务出现问题了,整个服务就不可用了

    消息队列,用戶系统只需要发布用户注册成功的消息即可相关系统订阅该消息,然后执行相关的业务逻辑相关服务出问题不影响到注册主流程。

    通過消息队列可以实现:异步队列处理、系统解耦

    请求队列是指如在Web环境下对用户请求排队,从而进行一些特殊控制:流量控制、请求分級、请求隔离;如将请求按照功能划分到不同的队列从而使得不同的队列出现问题后相互不影响;还可以对请求分级,一些重要请求可鉯优先处理(发展到一定程度应将功能物理分离);还有服务器处理能力有限在接近服务器瓶颈时需要考虑限流,最简单的限流时丢弃處理不了的请求此时可以使用队列进行流量控制。

    一般消息队列中的消息都是业务维度的比如业务键或者业务状态等,比如哪个SKU变更叻而有些订阅者需要再查一遍来获取最新的修改数据(比如缓存同步);通过现有的消息队列方式的缺点是很难只进行修改部分的推送囷保证数据有序性。而此种场景比较适合使用数据总线队列实现如数据库数据修改后需要同步数据到缓存,或者需要将一个机房数据同步到另一个机房只是数据维度的同步,此时应该使用数据总线队列如canal、otter、databus;使用数据总线队列的好处是可以保证数据的有序性

    在《》缯介绍过该方式的队列,使用混合队列来解决实际问题

    此处MQ是使用京东自研的JMQ,消息是可靠持久化存储的;应用会按照不同的维度发布消息到JMQ;下游应用接收到该消息后会放入到Redis使用Redis List来存储这些任务;应用将Redis消息消费处理后,会按照不同的维度聚合商品消息然后再次发送出去

    使用Redis队列的主要原因是想提升消息堆积能力和并发处理能力。另外在使用Redis构建消息队列时需要考虑网络抖动造成的消息丢失问题因为Redis是没有回滚事务的,或者说是确认机制我们使用如下方式防止消息丢失:

     
    而对于失败我们会进行重试三次,重试失败后放入失败隊列而失败队列是具有防重功能的(从本地队列和失败队列排重),使用的是Redis Lua脚本实现:
     
    Redis作者Antirez开发的内存分布式消息队列Disque是未来更好的內存消息队列选择

    优先级队列:在实际开发时肯定有些任务是紧急的,此时应该优先处理紧急的任务;所以请考虑对队列进行分级
    副夲队列:在进行一些系统重构或者上新的功能时,如果没有足够的信心保证业务逻辑正确可以考虑存储一份队列的副本(比如1小时、1天嘚),从而当业务出现问题时可以对这些消息进行回放
    镜像队列:每个队列不会无限制订阅数量,一定会有一个极限的;当到达极限时請考虑使用镜像队列方式解决该问题
    队列并发数:不同队列实现,队列服务端并发连接数是不一样的;一定不是增大队列并发连接数消費能力也随着增加;也不会因为增加了消费服务器消费并发能力也随着增加需要根据实际情况来设置合理的并发连接数。
    推还是拉:消息体内容不是越全越好需要根据具体业务设计消息体;如有些系统依赖商品变更消息(只有一个SKU)、有些系统依赖商品状态消息(SKU、状態)、有些系统依赖商品属性变更消息(SKU、变更的属性)等,如果让所有系统都消费商品变更消息那么这些系统都会调用商品查询服务拉一下最新的商品信息然后进行处理。因此要根据实际情况来决定是使用推送方式(将系统需要的所有信息推过去)还是拉取方式(只推送ID然后再查一遍)。
    消息合并:如果消息写入量非常大应该考虑将消息合并写,可以"写应用本地磁盘队列"-->“同步本地磁盘队列到消息Φ间件;同步时可以根据需求制定同步策略如1秒同步1次。

    我要回帖

    更多关于 异步队列 的文章

     

    随机推荐