三个主流消息中间件区别

上传人:小** 文档编号:46633652 上传时间:2021-12-14 格式:DOC 页数:18 大小:483.50KB
收藏 版权申诉 举报 下载
三个主流消息中间件区别_第1页
第1页 / 共18页
三个主流消息中间件区别_第2页
第2页 / 共18页
三个主流消息中间件区别_第3页
第3页 / 共18页
资源描述:

《三个主流消息中间件区别》由会员分享,可在线阅读,更多相关《三个主流消息中间件区别(18页珍藏版)》请在装配图网上搜索。

1、市场上的消息中间件:mom4jmom4j是一个完全实现 JMS1.1规范的消息中间件并且向下兼容JMS1.0与1.02它提供了自己的消息处理存储使它独立于关系数据与语言,所以它的客户端可以用任何语言开发Ope nJMSOpenJMS 是一个开源的 Java Message Service API 1.0.2规范的实现,它包含有以下特性:*.它既支持点到点(point-to-point )(PTP)模型和发布/订阅(Pub/Sub )模型。*.支持同步与异步消息发送*. JDBC持久性管理使用数据库表来存储消息*.可视化管理界面。*. Applet 支持。*.能够与Jakarta Tomcat

2、这样的Servlet 容器结合。*.支持 RMI, TCP , HTTP 与 SSL 协议。*.客户端验证*.提供可靠消息传输、事务和消息过滤UberMQUberMQ 完全实现了 Java Message Service 规范。UberMQ 是因为现有的许多 JMS提 供商已经违背了分布式计算的核心原则:快速与简单而开发的。Hermes JMS利用它提供的 Swing UI 可以很好的实现监控 JMS providers 。ActiveMQActiveMQ 是一个开放源码基于 Apache 2.0 lice need发布并实现了 JMS 1.1。它能够与Geronimo ,轻量级容器和任 Ja

3、va应用程序无缝的给合。Somn ifugiSomnifugi 使得工作在同一个java虚拟机中的线程能实现消息互发。Man taRayMantaRay 基于peer-2-peer技术。它具有以下特性 :1. 它既支持点对点(point-to-point)的域,又支持发布/订阅(publish/subscribe)类型的域。2. 并且提供对下列类型的支持:经认可的消息传递,事务型消息的传递,一致性消息和具有持久性的订阅者支持。3. 消息过滤体制。4. 能与 WebLogic and WebSphere 给合。5. 支持TCP, UDP 与HTTP传输协。PresumoPresumo 也是一个实

4、现 Java Message Service API 的JMS消息中间件。JORAMJORAM 一个类似于 openJMS 分布在 ObjectWeb 之下的JMS消息中间件。JMS4SpreadJMS4Spread是一个消息系统.它部分地实现了 Java消息服务(JMS) API.Open Message QueueOpen Message Queue 是 Sun Java System Message Queue的一个开源版本。Openmessage queue 是一个企业级,可升级,非常成熟的消息服务器。它为面向消息的系统 集成提供一套完整的JMS ( Java Message Serv

5、ice )实现。由于 Open MQ源自Sun的 Java Message Queue,所以其具有 Java System Message Queue拥有的所有特性,功能和性能。FFMQFFMQ是一个轻量级,高性能,快速的 Native JMS1.1开源实现。支持 SSL远程连接, 自动防故障的持久化机制,基于模板定义目的地(Destination ),采用模式匹配自动创建目的地(Destination )MQSSave/MQSLoadMQSSave 是一个简单的 Java程序,能够读取 MQSeries 队列的消息保存至文件中。而 MQSLoad 是一相反的Java程序,能够读取文件中的消息

6、然后加载至MQSeries 队列中。HornetQHornetQ 是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ 完全支持JMS,HornetQ 不但支持JMS1.1 API同时也定义属于自己的消息 API,这可以最大限 度的提升HornetQ 的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。HornetQ 拥有超高的性能,HornetQ 在持久化消息方面的性能可以轻易的超于其它常 见的非持久化消息引擎的性能。当然,HornetQ 的非持久化消息的性能会表现的更好!* HornetQ 完全使用POJO,纯POJO的设计让HornetQ 可以尽可能少的以来第

7、三方的 包。从设计模式来说,HornetQ 这样的设计入侵性也最小。HornetQ既可以独立运行,也 可以与其它Java应用程序服务器集成使用。HornetQ 拥有完善的错误处理机制,HornetQ 提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。* HornetQ 提供了灵活的集群功能,通过创建HornetQ 集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的 配置消息路由。* HornetQ 拥有强大的管理功能。HornetQ提供了大量的管理 API和监控服务器。它可以无缝的与应用程序服务器整合

8、,并共同工作在一个HA环境中。Apache QpidApache Qpid 是最新开放企业信息标准 AMQP (Advaneed Message Queuing Protocol)的一个开源实现。Java版实现完全支持 JMS标准,可运行在任意Java平台上。此外Qpid 还提供AMQP Client APIs 的各种语言实现包括:« C+« Java, fully con forma nt with JMS 1.1« C# .NET , 0-10 using WCF« Ruby« Pytho nSpri ng AMQPSpring AMQP

9、是一个用于替换原先 Spri ng JMS 支持的消息解决方案。提供收发消息的模 板,还支持基于消息驱动的POJO。用法和配置与 Spring中对JMS的支持一样。这个项目包含Java和.NET两个版本。KafkaKafka是一个高吞吐量分布式消息系统。linkedin 开源的kafka 。 Kafka就跟这个名字一样,设计非常独特。首先,kafka的开发者们认为不需要在内存里缓存什么数据,操作系统 的文件缓存已经足够完善和强大,只要你不搞随机写,顺序读写的性能是非常高效的。kafka的数据只会顺序append,数据的删除策略是累积到一定程度或者超过一定时间再删除。Kafka另一个独特的地方是

10、将消费者信息保存在客户端而不是MQ服务器,这样服务器就不用记录消息的投递过程,每个客户端都自己知道自己下一次应该从什么地方什么位置读取消息,消息的投递过程也是采用客户端主动pull的模型,这样大大减轻了服务器的负担。Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set 做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java 里的FileChannel.transferTo/transferFrom)这样的高级10函数来减少拷贝开销。可见,kafka是一个精心设计,特定于某些应用的MQ系统,这种偏

11、向特定领域的MQ系统我估计会越来越多,垂直化的产品策略值的考虑。play-rabbitmq这是Play! Framework开发框架的一个扩展模块。用于生产和消费RabbitMQ 消息。队列消息系统 FQueueFQueue是一个高性能、基于磁盘持久存储的队列消息系统。兼容memcached 协议,能用memcached的语言都可以良好的与它通信。FQueue为你提供一个不需要特别优化,高性能的一个消息系统。特性1. 基于磁盘持久化存储。2. 支持memcached 协议。3. 支持多队列,密码验证功能。4. 高性能,能达到数十万qps。5. 低内存消耗。100-300M内存即可工作得很好。6

12、. 高效率IO读写算法,IO效率高。7. 纯JAVA代码。支持进程内 JVM级别的直接调用。8. 在不需要强顺序的场景下,支持多机负载均衡。不支持1. 不支持topic方式的订阅功能。2. 不支持主从复制。主流消息中间件及选型推荐:ActiveMQ:(还有升级版叫 Apollo,由于转向Scala,原来的架构都要改掉。但是只支持storm 协议,不支持 JMS),在网络上别人反映,消息量越来越大时,当出现消息堆积时,性能争 骤下降,主要卡在磁盘写入,用了硬件加速,也还是不能忍受。消息中间件的技术选型心得 RabbitMQ、ActiveMQ 和ZeroMQ作者:chszs,转载需注明。博客主页:

13、RabbitMQ、ActiveMQ 和ZeroMQ 都是极好的消息中间件,但是我们在项目中该选择哪个 更适合呢?很多开发者面临这个烦恼。下面我会对这三个消息中间件做一个比较,看了后你们就心中有数了。RabbitMQ是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得 它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。IbRabbitMi|HQZeroMQ 是一个非常轻量级的消息系统,

14、专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。与RabbitMQ相比,ZeroMQ支持许多高级消息场景,但是你必须实现ZeroMQ框架中的各个块(比如 Socket或Device等)。ZeroMQ非常灵活,但是你必 须学习它的80页的手册(如果你要写一个分布式系统,一定要阅读它)。0MQActiveMQ 居于两者之间,类似于 ZemoMQ,它可以部署于代理模式和 P2P模式。类似于 RabbitMQ ,它易于实现高级场景,而且只需付出低消耗。它被誉为消息中间件的 瑞士军刀ActjypMQ要注意一点,ActiveMQ的下一代产品为 Apollo。ApolloActiveMQ&

15、#39;5 next generation of messaging最终,这三个产品:1. 都有客户端API且支持多种编程语言;2. 都有大量的文档;3. 都提供了积极的支持。ActiveMQRabbitMQZeroMQ遵循规范JMS1.1 及 j2ee1.4AMPQ架构模型消息代理架构 Broker消息代理架构Brokerc/s架构实现语言JavaErla ngC/C+支持消息协议StompAMPQ、Stomp 等主要推动力量Apache 、 RedhatLshift、 Vmware、Sprin gSourceIMatix支持编程语言C、Java、PythonC、Java、PythonC、J

16、ava、Python编程复杂度复杂简单中等持久化支持支持,不支持第三方 数据库发送端缓存性能一般一般高三个经典消息中间件的比较对于消息中间件,绝大多数熟悉的是MQ(IBM公司出品),这是目前使用最广泛的中间件产品。还有两个也比较流行,他们是JMS和RV°JMS即JAVA消息服务(Java Message Service) 应用程序接口是一个 JAVA平台中关于面向消息中间件的 API,用于在两个应用程序之间, 或分布式系统中发送消息,进行异步通信,是一个与具体平台无关的API。TIBCO Rendezvous(或称为 TIBCO RV 也是一种中间件,具有发布/订阅(Publish/

17、Subscribe)、基于主题寻址(Subject-Based Addressing)和自定义数据信息 (Self-Describing Data Messages)等专利技术功能, 使不同应用平台上的信息在一个共享的虚拟总线In formation Bus(TIB)上进行传输交换。先总结一下消息中间件的功能,以上的三类中间件都实现了这些功能。侧实现消息的异步发送接收,发布订阅,使得两端的应用解耦 (减少或解除应用程序之间的耦合度)。H实现消息持久化机制,保证消息可靠性传输。优化网络传输,支持断点续传。1、区别之是否分布式RV和MQ都是分布式结构的,和JMS消息中间件的星型结构不同。分布式消息

18、中间件的Sever在应用环境里都会部署多个,彼此互联,没有主备之分。JMS消息中间件的应用部署一般都是主备两个Server,消息的发送和接收应用平时和主Server相连,有问题时切换到备Server,主备Server共用公共的存储设备来保存消息。2、区别之是否接收端主动MQ和JMS消息中间件都采用消息接收端主动接收消息的方式。消息从发送端发出后, 首先会缓存到Server上,接收端应用发起一个接收消息的请求,Server把消息作为应答返回给接收端。接收端不执行接收动作,消息就会一直在Server上保存。RV和这两种消息中间件都不同,使用的是发送端主动的消息推送模式。消息从发送端发出后,并不在S

19、erver上缓存,Server只做路由把消息推送给消息接收端。消息接收端只要连接上Server,订阅要接收的消息,这些消息就会源源不断地从Server那里推送过来,消息先缓存到接收客户端的队列里,接收端应用再从队列里取消息。RV的最大特点就是把一个数据生产者的数据以最快的速度推送到多个数据消费者那里。RV从金融市场数据系统的需求中产生而来,正是这些特点使得它在证券系统得到最广泛的应用。3、区别之是否便于一对多分布MQ和JMS消息中间件在IP层都使用点对点的一对一传输方式,而RV在IP层使用的是广播或者组播的一对多方式。使用广播或者组播可以直接实现一对多的发布订阅形式, 发布应用(发送端)发布消

20、息到RV网络上,这些消息会广播到网络的每一个节点上,每一个订阅应用(接收端)都会收到这些消息。而MQ和JMS实现一对多发布订阅就要麻烦的很多,都是在 Server按消息的Topic (主题)来缓存消息,为每一个订阅者拷贝每一条消息 的引用。当所有订阅者都从Server上取走某条消息,这条消息才可在Server上删除。4、区别之是否在传输层使用TCPMQ和JMS消息中间件不论是 Server和Server的通信,还是 Server和Client的通信, 在传输层都使用 TCP协议,保证消息传输连接的可靠性。而RV在Server和Server之间的通信使用了 UDP协议,牺牲可靠性来达到高实时性的

21、需求。RV有两种可靠性级别,RV Reliable和RVCM。RV Reliable模式使用基于 UDP增加了一定可靠机制的TRDP协议,在一定范围内具有消息包的检查和重传机制,保证了一定程度的消息可靠性,但不保证消息不丢失。RVCM在RV Reliable基础上更进一步,在消息级别具有消息确认和重传机制,可以保证消 息绝对不丢失。对于长度在1500个字节以下的消息, RV Reliable发布消息能达到150万笔消息每秒,接收也能达到50万笔消息每秒,传输消息的性能非常好。5、区别之是否用 Subject做收发端的匹配RV使用消息的Subject来做消息发送端和接收端的匹配。RV不在Serv

22、er端缓存消息,也没有 Server端的Queue和Topia 每个消息都有 Subject, Subject格式是多个字符串的串接,没有数目或者长度的限制。比如在市场数据系统里,行情数据消息的Subject里包含金融品种的名字,这样的Subject可以有上百万个。消息订阅端可以细到只接收某个市场的某 个品种的行情数据,所以RV能使用细粒度的消息分类。MQ和JMS消息中间件在Server端按Queue和Topic来缓存消息,消息的发送端和接收端按Queue和Topic的名字来匹配。每个Server能创建的Queue和Topic是有限的,这也就限制了使用MQ和JMS消息中间件构建的应用,这些应用

23、在做消息收发处理的时候只能使用粗粒度的消息分类。6、区别之中间件结构IV1< 仇好m JMS结构不意图*. 如朋 JL7、区别之典型应用场景实例MQ已知的典型应用场景是商业银行向人民银行报送监管信息;JMS已知的典型应用场景是异步发送邮件;RV已知的典型应用场景是金融市场数据提供商(如路透、彭博、道琼斯)向银行、大 型企业提供证券、外汇等金融市场信息。淘宝开源框架Metaq:淘宝吸收其它消息中间件(Apache Kafka )而自己开发的一款消息中间件原名 (Metamorphosis )MetaQ作为一个分布式的消息中间件,需要依赖zookeeper,对于一些规模不大、单机应用的场景,

24、我个人并不是特别支持尝试用MetaQ,因为多一个依赖系统,其实就是多一份风险,在这些简单场景下,可能类似memcacheq、kestrel甚至redis等轻量级MQ就非常合适。而MetaQ 开始就是为大规模分布式系统设计的,如果不当使用,可能没有带来好 处,反而多出一堆问题。开发者需要根据自己面对的场景,团队的技术能力,做出一个合适 的选择。MetaQ初探博客分类:MetaQ (全称Metamorphosis )是一个高性能、高可用、可扩展的分布式消息中间件,MetaQ具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,METAQ在阿里巴

25、巴各个子公司被广泛应用,每天转发250亿+条消息。主要应用于异步解耦,Mysql数据复制,收集日志等场景。总体结构ProducerProducerProducerBrokerSlaver主要特点* 生产者、服务器和消费者都可分布式* 消息存储顺序写* 性能极高,吞吐量大* 支持消息顺序zero-copy,批量拉数据* 支持本地和XA事务* 客户端pull,随机读,利用sendfile系统调用,* 支持消费端事务* 支持消息广播模式* 支持异步发送消息* 支持http协议* 支持消息重试和 recover数据迁移、扩容对用户透明«消费状态保存在客户端支持同步和异步复制两种HA主要特性数

26、据完整性消息生产者发送的消息,meta服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成 功之后返回应答给生产者,生产者发送消息返回SendResult,如果isSuccess返回为true,则表示消息已经确认发送到服务器并被服务器接收存储。整个发送过程是一个同步的过程。保证消息送达服务器并返回结果。 因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os, os对写有缓冲。Meta有两个特性来保证数据落到磁盘上:每1000条(可配置),即强制调用一次force来写入磁盘设备。每隔10秒(可配置), 强制调用一次

27、force来写入磁盘设备。因此,Meta通过配置可保证在异常情况下(如磁盘掉电)10秒内最多丢失1000条消息。当然通过参数调整你甚至可以在掉电情况下不丢失任何消息。虽然消息在发送到 broker之后立即写入磁盘才返回客户端告诉消息生产者消息发送成功,通过unflushThreshold 和unflushlnterval两个参数的控制,可以保证单机消息数据的安全性,只要机器的磁盘没有永久损坏,消息总可以在重启后恢复并正常投递给消费者们。但是,如果遇到了磁盘永久损坏或者数 据文件永久损坏的情况,那么该 broker上的消息数据将可能永久丢失。为了防止这种情况的发生,一个可 行的方案就是将消息数据

28、复制到多台机器,类似mysql的主从复制功能(异步复制和同步功能)数据可靠性服务器通常组织为一个集群,一条从生产者过来的消息可能按照路由规则存储到集群中的某台机器。Meta已经实现高可用的 HA方案,类似mysql的同步和异步复制,将一台 meta服务器的数据完整复制到另一台 slave服务器,并且slave服务器还提供消费功能(同步复制不提供消费)。消息的消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消息(默认最大5次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线

29、程继续往后走,消费后续的消息。因此,只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。消费者的另一个可靠性的关键点是offset的存储,也就是拉取数据的偏移量。我们目前提供了以下几种存储方案zookeeper,默认存储在zoopkeeper 上, zookeeper通过集群来保证数据的安全性。mysql,可以连接到您使用的mysql数据库,只要建立一张特定的表来存储。完全由数据库来保证数据的可靠性。file,文件存储,将offset信息存储在消费者的本地文件中。Offset会定期保存,并且在每次重新负载均衡前都会强制保

30、存一次下载、配置、运行首先需要安装配置 Zookeeper,如果不知道怎么配置的,看我的文章,有一章讲解过!https:/code.google.eom/p/meta-queue/downloads/list选择最新版本的服务器并下载到本地解压缩文件,bin目录存放的脚本文件,日志在 logs目录,而配置文件主要是 conf目录下server.ini, lib存放所有的依赖jar 包。进入 bin/env.sh ,修改 JAVA_HOME,JMX等变量。根据需要修改conf/server.ini文件(列出了所有的配置):zk.zkEnable=true是否注册到 zk,默认为truezk.zk

31、Connect=localhost:2181 zk 的服务器列表zk.zkSessionTimeoutMs=30000 zk 心跳超时,单位毫秒,默认30 秒 zk.zkSessionTimeoutMs=30000zk.zkConnectionTimeoutMs=30000 zk连接超时时间,单位毫秒,默认30秒zk.zkSyncTimeMs=5000 zk数据同步时间,单位毫秒,默认5秒brokerId :服务器ID(必须是集群内唯一)serverPort :服务器端口hostName :默认将取本机IP侈机网卡,需要指明)dataLogPath:日志数据文件路径,默认跟 dataPath

32、一样dataPath:于指定默认的数据存储路径(慎重设置,默认在 user.home/meta下)numPartitions:默认topic的分区数目(慎重设置)maxSegmentSize:单个文件的最大大小,实际会超过此值,默认1GmaxTransferSize传输给客户端每次最大的缓冲区大小,默认1MunflushThreshold :最大允许的未flush间隔时间,毫秒,默认10秒putProcessThreadCount:;处理 put 请求线程数,默认 cpus*10 deletePolicy=delete,168(数据删除策略,默认超过7天即删除,这里的168是小时,10s表示1

33、0秒,10m表示 10分钟,10h表示10小时,默认为小时)deleteWhen:何时执行删除策略的cron表达式,默认是0 0 6,18 * * ?,也就是每天的早晚 6点执行处理策略。deleteWhen:删除策略的执行时间,cron表达式maxCheckpoints:最大保存事务 checkpoint数目,默认为 3checkpointinterval:事务checkpoint时间间隔,单位毫秒,默认1小时(3600000)maxTxTimeoutTimerCapacity=30000最大事务超时事件数,用于监控事务超时 maxTxTimeoutinSeconds=60最大事务超时时间,

34、单位秒flushTxLogAtCommit=1事务日志的同步设置,0表示让操作系统决定,1表示每次commit都同步,2表示每 隔1秒同步一次,此参数严重影响事务性能,可根据你需要的性能和可靠性之间权衡做岀一个合理的选择。通常建议设置为 2,表示每隔1秒刷盘一次,也就是最多丢失一秒内的运行时事务。这样的可靠级别对大多数服务是足够的。最安全的当然是设置为1,但是将严重影响事务性能。而0的安全级别最低。安全级别上1>=2>0,而性能则是 0 >= 2 > 1。diamondZKDatald=metamorphosis.zkConfig zk 在 diamond 中配置存储的

35、 dataid diamondZKGroup=DEFAULT_GROUPzk 在 diamond 中配置存储的 groupacceptPublish:是否接收消息,默认为 true ;如果为false,则不会注册发送信息到zookeeper上,客户端当然无法发送消息到该broker。本参数可以被后续的topic配置覆盖。acceptSubscribe:与acceptPublish类似,默认也为true ;如果为false,则不会注册消费信息到 zookeeper 上, 消费者无法发现该 broker,当然无法从该broker消费消息。本参数可以被后续的topic配置覆盖。unflushThre

36、shold:每隔多少条消息做一次磁盘 sync,强制将更改的数据刷入磁盘。默认为1000。也就是说在掉电情况下,最多允许丢失 1000条消息。可设置为0,强制每次写入都sync。在设置为0的情况下,服 务器会自动启用group commit技术,将多个消息合并成一次 sync来提升IO性能。经过测试,group commit 情况下消息发送者的 TPS没有受到太大影响,但是服务端的负载会上升很多。unflushinterval:间隔多少毫秒定期做一次磁盘sync,默认是10秒。也就是说在服务器掉电情况下,最多丢失10秒内发送过来的消息。不可设置为小于或者等于JAVA客户端代码Java 代码-1

37、. package com.metaq.product;2.2. import java.io.BufferedReader;3. import java.i o.ln putStreamReader;5.4. import com.taobao.metamorphosis.Message;5. import com.taobao.metamorphosis.clie nt.MessageSessi onF actory;6. import com.taobao.metamorphosis.clie nt.MetaClie ntCon fig;7. import com.taobao.meta

38、morphosis.clie nt.MetaMessageSessi onF actory;8. import com.taobao.metamorphosis.client.producer.MessageProducer;9. import com.taobao.metamorphosis.client.producer.SendResult;10. import com.taobao.metamorphosis.utils.ZkUtils.ZKC on fig;11. public class Products 12. public static void main( Stri ng a

39、rgs) throws Excepti on 13. final MetaClie ntCo nfig metaClie ntCo nfig = new MetaClie ntCo nfig();14. final ZKC onfig zkConfig = new ZKCo nfig();15. zkCo nfig.zkCo nn ect = "192.168.2.11:2181"16. metaClie ntCo nfig.setZkCo nfig(zkCo nfig);17. /由这个工厂创建生产者或者消费者18. 1.服务的查找和发现,通过diamond和zookee

40、per帮你查找日常的 meta服务器地址列表19. 2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。20. 3.消息消费者的消息存储和恢复,后续我们会谈到这一点。21. 4.协调和管理各种资源,包括创建的生产者和消费者的。22. MessageSessi onF actory sessi onF actory = new MetaMessageSessi onF actory(metaC lie ntCon fig);23. 消息生产者的接口,MessageProducer是线程安全的,MessageProdu

41、cer创建的代价昂贵,每次都需要通过zk24. 查找服务器并创建tcp长连接,通过它来发送消息,每个消息对象都是 Message类的实例,Message表示一个消息对象,它包含这么几个属性:25. /id: Long型的消息id,消息的唯一 id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。26. /topic:消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生 产者通过指定发布的topic查找到需要连接的服务器地址,必须。27. /data:消息的有效载荷,二进制数据,也就是消息内容, meta永远不会修改 消息内容,你发送出去是什么样子,接收到就是什么样

42、子。28. 消息内容通常限制在 1M以内,我的建议是最好不要发送超过上百 K的消息, 必须。数据是否压缩也完全取决于用户。31.过滤。/attribute:消息属性,一个字符串,可选。发送者可设置消息属性来让消费者32.MessageProducer producer = sessi onF actory.createProducer();33.final String topic = "test"34.producer.publish(topic);35.);BufferedReader reader = new BufferedReader( new In putStr

43、eamReader(System.i n)36.String line = "qiujinyong"37.while (li ne = reader.readL in e() != n ull) 38./ send message39.Sen dResult sen dResult = producer.se ndMessage( new Message(topic, li ne.getBytes();40./ check result41.if (!se ndResult.isSuccess() 42.System.err.pri ntl n("Se nd me

44、ssage failed,error message:" + sen dResult.getErrorMessage();43.44.else 45.System.out.println("Send message successfully,sent to " + sendResult.getPartitio n();46.47.48.49.消费者:Java 代码1. package com.metaq.c on sum;2.2. import java.util.con curre nt.Executor;4.3. import com.taobao.metam

45、orphosis.Message;4. import com.taobao.metamorphosis.clie nt.MessageSessi onF actory;5. import com.taobao.metamorphosis.clie nt.MetaClie ntCon fig;6. import com.taobao.metamorphosis.clie nt.MetaMessageSessi onF actory;7. import com.taobao.metamorphosis.clie nt.c on sumer.C on sumerC on fig;8. import

46、com.taobao.metamorphosis.client.consumer.MessageConsumer;9. import com.taobao.metamorphosis.client.consumer.MessageListener;10. import com.taobao.metamorphosis.utils.ZkUtils.ZKC on fig;13.11. public class AsyncConsum 12. public static void main( Stri ng args) throws Exceptio n 13. final MetaClie ntC

47、o nfig metaClie ntCo nfig = new MetaClie ntCo nfig();14. final ZKConfig zkConfig = new ZKConfig();15. zkConfig.zkConnect = "192.168.2.11:2181"16. metaClientConfig.setZkConfig(zkConfig);17. MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);18. final Stri

48、ng topic = "test"19. final String group = "meta-example"20. / 每个消息者都必须有一个ConsumerConfig 配置对象 ,我们这里设置了 group 属性,这是消费者的分组名称21. /Meta 的 Producer 、Consumer 和 Broker 都可以为集群。 消费者可以组成一个集 群共同消费同一个 topic ,22. / 发往这个 topic 的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同23. / 一个分组名称,也就是同一个group。我们

49、这里将分组名称设置为meta-example24. MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfi g(group);25. /topic ,订阅的主题26. /maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为 1M ,默认最大为 1M。27. /MessageListener ,消息监听器,负责消息消息。28. consumer.subscribe(topic, 1024 * 1024, new MessageListener(

50、) 29. public void recieveMessages(Message message) 30. System.out.println("Receive message " + new String(message.getData();31. 32. /消息的消费过程可以是一个并发处理的过程, getExecutor 返回你想设置的线 程池,每次消费都会33. /在这个线程池里进行。recieveMessage 方法用于实际的消息消费处理,message 参数即为消费者收到的消息,它必不为 null。34. /我们这里简单地打印收到的消息内容就完成消费。如果在消

51、费过程中抛出任何异常,该条消息将会35. /在一定间隔后重新尝试提交给MessageListener 消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,36. /并在后台自动恢复重试消费37. public Executor getExecutor() 38. return null;39. 40. );41. / 在调用 subscribe 之后, 我们还调用了 completeSubscribe 方法来完成订阅过 程。请注意,42. /subscribe 仅是将订阅信息保存在本地,并没有实际跟 meta 服务器交互,要 使得订阅关系生效必须调用43. /一次 comple

52、teSubscribe , completeSubscribe 仅能被调用一次, 多次调用将抛 出异常。 为什么需44. 要 completeSubscribe 方法呢,原因有二:45. 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic46. 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高47. 因此completeSubscribe 一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。48. /同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽 量复用49. con pl

53、eteSubscribe();50. 54.51. 启动:bin/metaServer.sh start(更多的命令直接 help 一下。)观察:日志方式(logs/metaServer.log),命令方式(bin/metaServer.sh stats),telnet 方式(telnet IP 端口 stats)启动服务器后执行客户端的生产者和消费者的代码,运行结果如下(生产者随便在控制台输入消息后,进行回车,生产者消息如果成功发送到broker,会返回一条发送成功的消息,同时消费者会接收到该消息):生产者曰 Ccnscle £3 屯 ” asksstcrmiratcd >

54、Products A.ppiicctioH| CiProflram FifesVkl"3皿作"20妇429 112:3140)1j| = WLRR No appendera ssuld be found.L匚召gz* (TamSc酋®口寸皀uk®宜乜砂1 og z : KAPLiJ Please inxt;ia.Lxze t壬详- gys七mr prcperlyfI A r 启冃 Arnessagr successiEu已亡nt; hD O_0xieMadqeaCullypta 0-2|消费者Q 匚 onsole 23 Tasks-酹心品皿怙肛 Myn工

55、onaun Jcvc ApplicatienJjCProgram H 1匕勺/曲甘口1<1石°-21卩品、曲w启ax (2013 V 29 上罕lu2;3W9) lcg4jNo sppeTkdes'JS could b* feunri f-ar Logger (=emB taobao B jseie jse站丄巴 m芷s丄亠吕凰耳巷Rm亡log4j iWURIJ Ele&sethe lsg4j ayatfero properlyReceive n-essage 2222222集群启动metaQ后,它将启动一个内置的 zookeeper,并将broker注册到该z

56、ookeeper。但MetaQ应该是作为一 个分布式集群提供服务。MetaQ的集群管理是利用zookeeper实现的,使用zookeeper发布和订阅服务,并默认使用zookeeper存储消费者offset,你需要首先安装一个 zookeeper到某台机器上,或者使用某个现有 的zk集群,然后配置 zookeeper( zk配置参见我blogzookeeper初探)负载均衡每个broker都可以配置一个topic可以有多少个分区,但是在生产者看来,一个topic在所有broker上的的所有分区组成一个分区列表来使用。在创建producer的时候,客户端会从zookeeper上获取publish

57、的topic对应的broker和分区列表,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,默认的策略是一个轮询的路由规则如果你想实现自己的负载均衡策略,可以实现上文提到过的Partitionselector接口,并在创建producer的时候传入即可对于消费者而言,合理地设置分区数目至关重要。如果分区数目 太小,则有部分消费者可能闲置,如果分区数目太大,则对服务器的性能有影响。在某个消费者故障或者 重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均 衡,保证所有的分区都有消费者进行消费。拷贝brokerl的配置文

58、件conf/server.ini至噺的broker,假设为broker2。修改broker2的server.ini,只要修改brokerId为另一个不同于 broker1的值即可,启动 broker2, 在 这个过程中你不需要重启任何现有的服务,包括生产者、消费者和broker1,他们都将自动感知到新的broker2主从复制先配置负载均衡后(和上面配置一样),然后再配置从机的另一个文件(conf/async_slave.properties)#slave编号,大于等于0表示作为slave启动,同一个master下的slave编号应该设不同值. slaveld=0#作为slave启动时向mast

59、er订阅消息的group,如果没配置则默认为meta-slave-group,不同的slaveId请使用不同的 group slaveGroup=meta-slave-group#slave数据同步的最大延时,单位毫秒slaveMaxDelaylnMills=500#是否自动从master同步server.ini, 1.4.2新增选项#第一次仍然需要自己拷贝server.ini,后续可以通过设置此选项为true来自动同步 autoSyncMasterConfig=true这样主从配置完成,其实metaQ环境搭建以及原理还是较为简单的,MetaQ作为一个分布式的消息中间件,主要依赖zookeeper,对于一些规模不大、单机应用的场景,并不是特别支持尝试用MetaQ,因为多一个依赖系统,其实就是多一份风险,在这些简单场景下,可能类似activeMQ、redis等轻量级MQ就非常合适。而MetaQ 开始就是为大规模分布式系统设计的,如果不当使用,可能没有带来好处,反而多出一堆 问题。开发者需要根据自己面对的场景,团队的技术能力,做岀一个合适的选择。

展开阅读全文
温馨提示:
1: 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
2: 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
3.本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
5. 装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
关于我们 - 网站声明 - 网站地图 - 资源地图 - 友情链接 - 网站客服 - 联系我们

copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!