本文源于朱忠华的《RabbitMQ实战指南》
RabbitMQ简介
消息队列中间件有两种传递模式:点对点 和 发布/订阅
点对点依靠队列的原理;发布/订阅则可以用于一对多的广播消息中间件的作用:解耦、冗余(存储)、扩展性、削峰、可恢复性、顺序保证、缓冲、异步通信
RabbitMQ的具体特点:
可靠性,持久化、传输确认、发布确认 灵活的路由,多个交换器可以绑定 扩展性,可以组成集群 高可用性,队列镜像 多种协议,AMPQ,STOMP,MQTT 多语言客户端,支持所有常用语言 管理界面 插件机制编译安装Erlang
下载页面:http://www.erlang.org/downloads
下载地址:http://erlang.org/download/otp_src_20.3.tar.gz [84M] 安装Erlang到/opt/erlang yum install gcc perl ncurses-devel wget http://erlang.org/download/otp_src_20.3.tar.gz tar zxf otp_src_20.3.tar.gz cd otp_src_20.3 ./configure --prefix=/opt/erlang ##APPLICATIONS DISABLED: crypto : No usable OpenSSL found jinterface : No Java compiler found odbc : ODBC library - link check failed orber : No C++ compiler found ssh : No usable OpenSSL found ssl : No usable OpenSSL found ##APPLICATIONS INFORMATION: wx : wxWidgets not found, wx will NOT be usable ##DOCUMENTATION INFORMATION: documentation : xsltproc is missing. fop is missing. The documentation can not be built 可以看到,还缺少一些包,须要补齐 yum install openssl-devel gcc-c++ unixODBC-devel ./configure --prefix=/opt/erlang --without-javac make make install添加环境变量 vi /etc/profile 在末尾添加 ERLANG_HOME=/opt/erlang export PATH=$PATH:$ERLANG_HOME/bin export ERLANG_HOME 让配置生效 source /etc/profile验证安装 erl安装RabbitMQ
直接将下载的安装包解压到相应的目录下即可
下载页面:http://www.rabbitmq.com/releases/rabbitmq-server/ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz [4.6M] tar xJf rabbitmq-server-generic-unix-3.6.15.tar.xz -C /opt cd /opt/ mv rabbitmq_server-3.6.15/ rabbitmq添加环境变量 添加环境变量 vi /etc/profile 在末尾添加 export PATH=$PATH:/opt/rabbitmq/sbin export RABBITMQ_HOME=/opt/rabbitmq 让配置生效 source /etc/profile运行RabbitMQ
rabbitmq-server -detached #detached以守护进程方式在后台运行
rabbitmqctl status #查看服务状态 rabbitmqctl cluster_status #查看集群信息用户和权限
默认用户用户名及密码均为guest 该帐户只能通过本地网络访问,远程网络访问受限
#添加新用户root:root123 rabbitmqctl add_user root root123 #为root用户配置所有权限 rabbitmqctl set_permissions -p / root ".*" ".*" ".*" #提升root为管理员 rabbitmqctl set_user_tags root administrator消息发送:客户端与MQ服务器建立一个连接connection->在连接上创建一个信道channel->创建一个交换器exchange和一个队列queue,并通过路由键进行绑定->发送消息->关闭资源
RabbitMQ入门
生产者/消费者模型,类似于交换机
发布到MQ的消息包含两部分:消息体(payload)和标签(label),消息体是实际负载,而标签是MQ处理该消息的依据,如一个交换器的名称和一个路由键
broker:消息中间件的服务节点,可以看成是一个MQ服务器
多个消费者可以订阅同一个队列,这时消息会平均分摊,轮询给多个消费者,这样一个消费者不会得到全部消息。RabbitMQ不支持队列层面的广播消费
Exchange交换器,共有四种类型,不同的类型对应不同的路由策略
RoutingKey路由键,生产者将消息发给交换器时,会指定RoutingKey,用来指定该消息的路由规则,这个RoutingKeyf需要与交换器类型和绑定键BindingKey联合使用才能最终生效
在交换器类型和绑定键固定的情况下,生产者在发送消息时,通过指定RoutingKey来决定消息流向哪里
Binding绑定,通过绑定将交换器与队列关联起来,在绑定时一般会指定绑定键BindingKey,这样,MQ就可以正确将消息路由到队列
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。绑定多个队列到同一个交换器时,允许使用相同的BindingKey。BindingKey并不是在所有情况都生效,它依赖于交换器类型,fanout类型的交换器会无视BindingKey,fanout会将消息发送到所有与自己绑定的队列中。
交换器相当于投递包裹的邮箱,RoutingKey相当于填写在包裹上的地址,BindingKey相当于包裹的目的地,当RoutingKey与BindingKey相匹配时,消息就会正确送达
大多数情况下,习惯性地将BindingKey写成RoutingKey,两者合称为路由键,特别是在使用direct类型的交换机时
RabbitMQ常用的交换器类型有fanout、direct、topic、header这四种
fanout 广播
发送到该交换器的消息将路由到所有与该交换器绑定的队列中direct 直投 将消息路由到BindingKey和RoutingKey完全匹配的队列中headers 比对headers属性 不依赖路由键的匹配来路由消息,而是根据发送的消息内容中的headers属性(键值对)进行匹配,在绑定队列和交换器时也指定一组键值对,两者匹配时,即路由 headers类型性能会很差,也不实用,很少用到topic 模式匹配 将消息路由到BindingKey和RoutingKey相匹配的队列中,可以使用模式匹配规则 匹配规则: 1、RoutingKey和BindingKey均为一个点号.分隔的字符,分隔开的叫单词 2、BindingKey可以有*#用于模糊匹配,*匹配一个单词,#匹配0个或多个单词 3、没有匹配上的将丢弃,或返回发送者(需设置mandatory参数)生产者发送消息流程:
1、生产者连接到RabbitMQ Broker,建立一个连接connection,开启一个信道channel
2、生产者声明一个交换器,并设置相关属性,如交换机类型、是否持久化等 3、生产者声明一个队列并设置相关属性,如是否排他、是否持久化、是否自动删除等 4、生产者通过路由键将交换器和队列绑定起来 5、生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息 6、相应的交换器根据接收到的路由键查找相匹配的队列 7、如果找到,将消息存入相应的队列中 8、如果没有找到,根据生产者配置的属性选择丢弃或者退回给生产者 9、关闭信道 10、关闭连接消费者接收消息的过程
1、消费者连接到RabbitMQ Broker,建立一个连接connection,开启一个信道channel
2、消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作 3、等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息 4、消费者确认ack接收到的消息 5、RabbitMQ从队列中删除相应已经被确认的消息 6、关闭信道 7、关闭连接客户端与MQ Broker建立的connection是一条tcp连接,amqp信道channel是connection上的虚拟连接。引入channel主要是为了复用tcp连接。当每个channel流量不是很大时,复用connection可以有效节省tcp连接资源;当每个channel流量很大时,复用connection会产生性能瓶颈,此时需要开辟多个connection
消费者接收到消息并正确消费后,会向Broker发送确认,即Basic.Ack命令
客户端开发向导
amqp://userName:password@ipAddress:portNumber/virtualHost
channel.exchangeDeclare(exchangeName,"direct",true);
#声明一个名为exchangeName,类型为direct,持久化非自动删除的交换器 channel.queueDeclare(queueName,true,false,false,null); #声明一个名为queueName,持久化,非排他、非自动删除的队列 channel.queueBind(queueName,exchangeName,routingKey) #将队列与交换器使用routingKey绑定声名交换器exchangeDeclare的参数说明:
exchange,交换器名称 type,交换器类型,如fanout,direct,topic durable,是否持久化,持久化可以将交换器存盘,在服务器重启后不会丢失相关信息 autoDelete,是否自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑,并不是与此连接的客户端都断开 internal,是否是内置的,内置的交换器客户端无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式 argument,其他一些结构化参数交换器删除
exchangeDelete(交换器名称,isUnused):isUnused用来设置在交换器没有被使用时删除,为false时,强制删除队列queue声明参数说明
不带任何参数时,queueDeclare方法默认创建一个由RabbitMQ命名的队列,属性为排他、自动删除、非持久化 带参数时: queue,队列名称 durable,是否持久化。持久化的队列会存盘,在服务器重启时可以保证不丢失相关信息 exclusive,是否排他,如果一个队列声明为排他,该队列仅对首声声明它的连接可见,并在连接断开后自动删除 #排他队列基于连接可见,同一个连接的不同信道可以同时访问同一连接创建的排他队列;如果一个连接已声明一个排他队列,其他连接不允许建立同名排他队列;即使该队列是持久化的,一旦连接关闭或客户端退出,该排他队列都会被自动删除;该队列适用于同一个客户端同时发送和读取消息的场景 autoDelete,是否自动删除。自动删除前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才自动删除 arguments,其他参数,如x-message-ttl,x-expires,x-max-length,x-max-length-bytes,x-dead-letter-exchange,x-dead-letter-routing-key,x-max-priority等队列删除
queueDelete(queue,ifUnused,ifEmpty)清空队列
queuePurge队列与交换器绑定
queueBind(queue,exchange,routingKey) 解绑 queueUnbind(queue,exchange,routingKey)交换器绑定交换器
exchangeBind(destination,source,routingKey) 绑定后,消息从source交换器转发到destination交换器RabbitMQ的消息存储在队列中,交换器的使用并不会耗费服务器性能,但是队列会。衡量MQ的QPS看队列的即可,性能指标:流量、内存、网卡占用
消息的投递模式delivery mode为2时,消息会被持久化(存入磁盘);消息的优化级priority;消息的类型content-type:text/plain;消息还可以带headers头;带过期时间expiration
发送消息的参数:
exchange,交换器名称,若设置为空,则消息会发送至默认交换器中 routingKey,路由键,交换器根据路由键将消息存储到相应队列中 props,消息基本属性集,14个属性:contentType、contentEncodeing、headers、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId body,消息体 mandatory和immediate消费消息:推模式Push、拉模式Pull
推模式中,可以通过持续订阅方式消费消息。不同的订阅采用不同的消费者标签consumerTag来区分,同一个channel中的消费者也需要通过消费者标签作区分 basicConsume参数: queue,队列名称 qutoAck,是否自动确认,建议为false,不自动确认 consumerTag,消费者标签,区分多个消费者 noLocal,为true时表示不能将同一个connection中生产者发送的消息传送给这个connection中的消费者 exclusive,是否排他 arguments,其他参数 callback,消费者的回调函数拉模式中,参数仅需要queue和autoAck两个参数 如果只想从队列获得单条消息而不是持续订阅,建议使用拉模式进行消费,但不能将拉模式放入一个循环里替代推模式,因为这样做会严重影响RabbitMQ的性能为保证消息从队列可靠达到消费者,MQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为false时,MQ会等待消费者显式回复确认信号后才从内存中移去消息。当autoAck设置为true时,MQ会自动把发送出去的消息置为确认,然后从内存中删除,而不管消费者是否真的消费到了这些消息
采用消息确认机制后,只要设置了autoAck为false,消费者就有足够时间处理消息,不用担心处理过程中消费者进程挂掉后消息的丢失,因为MQ会一直等待直到消费者显式调用Basic.Ack命令为止
查看消息状态命令:rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息的拒绝:在消费者接收到消息后,如果想拒绝,可以调用channel.basickReject方法,basicReject(deliveryTag,requeue),deliveryTag可以看作是消息的编号,是64位长整型值;requeue为true时,MQ会将该消息存入队列,为false时,消息会从队列中移除
RabbitMQ进阶
mandatory和immediate以及备份交换器Alternate Exchange
当mandatory为true时,交换器无法根据自身类型和路由键打开符合条件的队列时,会调用Basic.Return将消息返回给生产者;为false时将不会返回给生产者。生产者获取被退回的消息可以使用channel.addReturnListener添加ReturnListener监听器实现
当immediate参数设置为true时,如果交换器将消息路由到队列时,发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当于路由键匹配的所有队列都没有消费者时,消息会通过Basic.Return返回至生产者
总结:mandatory告诉服务器至少将消息路由到一个队列中,否则将消息返回生产者;immediate告诉服务器,如果该消息关联的队列上有消费者,立即投递,没有消费者,则返回消息给生产者,不用将消息存入队列等待(immediate参数在3.0版本中已去掉,建议采用TTL和DLX的方法替代)
备份交换器Alternate Exchange,简称AE
在设置了madatory参数后,未被路由的消息会存入备份交换器,这样可以简化生产者代码逻辑,不用添加ReturnListener,而是在需要的时候处理这些消息。在声明交换器时添加alternate-exchange参数启用,也可以通过策略policy的方法实现如果备份交换器不存在,客户端和服务端都不会有异常出现,消息丢失
如果备份交换器没有绑定队列,客户端和服务端都不会有异常出现,消息丢失 如果备份交换器没有任何匹配队列,客户端和服务端都不会有异常出现,消息丢失 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效过期时间TTL
设置方法一:通过队列属性设置,队列中所有消息都有相同过期时间 设置方法二:对消息本身单独设置 若两者同时设置,以TTL较小者的值为准。时间超过TTL值时,会变成死信Dead Message通过队列属性设置TTL的方法是,channel。queueDeclare方法中加入x-message-ttl参数实现,单位毫秒
也可以通过Policy的方式设置TTL rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues 还可以通过调用HTTP API接口设置 curl -i -u root:root -H "content-type:application/json" -X PUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl":60000}}' http://localhost:15672/api/queues/{vhost}/{queuename}若不设置TTL,则消息永不过期;若设置为0,表示除非可以直接将消息投递到消费者,否则消息立即丢弃
针对单条消息的TTL方法为,在channel.basicPublish中加入expiration属性,单位毫秒
也可以用HTTP API接口 curl -i -u root:root -H "content-type:application/json" -X POST -d '{"properties":{"expiration":"60000"},"routing_key":"routingKey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish第一种设置方法的消息到期后,会直接从队列中抹去;第二种方法设置的,在投递前进行判断
队列的TTL
队列声明时通过x-expires参数设置,当队列未使用(没有任何消费者、没有被重新声明、过期时间段内未调用过Basic.Get命令)时,会被删除。服务器重启后,持久化的队列过期时间会重新计算,x-expires单位为毫秒,不能设置为0死信队列DLX
DLX,Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信后,能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就是死信队列 声明队列时通过x-dead-letter-exchange参数为该队列添加DLX 也可以通过Policy设置 rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlx_exchange"}' --apply-to queues 消息变成死信的情况:消息被拒,且设置requeue参数为false、消息过期、队列达到最大长度DLX可以用来分析异常,或者配合TTL实现延迟队列的功能
延迟队列
延迟队列存储的是延迟消息,MQ本身没有直接支持延迟队列的功能,但可以通过DLX和TTL模拟优化级队列
优化级高的消息具备优先被消费的特权。通过设置队列的x-max-priority参数实现,配置一个队列的最大优先级,然后在发送消息时,为每条消息指定当前的优先级priorityRPC实现
远程过程调用,客户端发送请求消息,服务端回复响应消息。为了接收响应消息,需要在请求中发送一个回调队列(replyTo) replyTo,设置一个回调队列 correlationId,关联请求request和其调用RPC之后的回复response处理流程:
1、客户端启动时,创建一个匿名的回调队列,名称由MQ自动创建 2、客户端为RPC请求设置2个属性,replyTo用来告知RPC服务端回复请求时的目的队列,即回调队列;correlationId用来标记一个请求 3、请求发送到rpc_queue队列中 4、RPC服务端监听rpc_queue队列中的请求,当请求到来时,处理请求且把带有结果的消息发送给客户端,接收队列就是replyTo设定的回调队列 5、客户端监听回调队列,当有消息时,检查correlationId属性,若与请求匹配,即说明为结果持久化
提高可靠性,在异常(重启、关闭、宕机)时的数据防丢失。MQ持久化包括:交换器持久化、队列持久化、消息持久化交换器持久化通过声明时将durable置为true实现,可以防止重启后交换器元数据的丢失,但消息不会丢失,只是不能再将消息发送到该交换器中
队列持久化通过声明时将durable置为true实现,若不设置持久化,重启后,队列会丢失,队列中的消息也会丢失。队列持久化可以保证本身元数据不丢失,但并不保证内部存储的消息不丢失
消息持久化通过将投递模式BasicProperties中deliveryMode属性设置为2实现
设置了队列和消息持久化,重启后,消息依旧存在
单单只设置队列持久化,重启后,消息会丢失 单单只设置消息持久化,重启后,队列消失,继而消息也丢失 所以,单单设置消息持久化而不设置队列持久化毫无意义消息持久化会严重影响RabbitMQ的性能
持久化还需要autoAck配合,同时,缓存落盘也有风险,为真正确保消息不丢失,可以使用RabbitMQ的镜像队列机制,实际生产环境中关键业务队列一般都会设置镜像队列
生产者确认
事务机制确认;发送方确认publisher confirm事务机制:channel.txSelect用于将当前信道设置成事务模式;channel.txCommit用于提交事务;channel.txRollback用于事务回滚
事务机制能够解决消息发送方和MQ之间消息确认的问题,只有消息成功被MQ接收,事务才能提交成功 采用事务机制会严重降低MQ消息吞吐量发送方确认机制:生产者将信道设置成confirm模式,消息被投递到匹配的队列后,会发送确认给生产者,包含消息的唯一ID;若消息和队列是可持久化的,那么确认消息会在消息写入磁盘后发出,要提高效率,需要使用批量confirm或异步conifrm,否则QPS只有2000左右(低7倍)
事务机制和publisher confirm机制是互斥的;事务机制和publisher confirm机制确认的是消息能够正确的发送至MQ,如果找不到匹配的队列,消息依然会被丢弃
消息分发
队列有多个消费者时,消息将以轮询roud-robin分发方式发送给消费者。channel.basicQos允许限制信道上的消费者所能保持的最大未确认消息数量,当达到设置的值时,MQ将不再发送消息给对应的客户端,直到消费者确认值小于Qos设置的值时 Basic.Qos的使用对拉模式的消费方式无效无特殊需要,最好只使用global为false的设置,默认
消息的顺序性
严格意义上讲,消息不能保证顺序性。若要保证消息顺序性,需要业务方做进一步处理,如在消息体内添加全局有序标识ID来实现目前大多数主流消息中间件都没有消息去重机制,也不保证"恰好一次",去重处理通常是在业务端实现,如引入GUID
提升数据可靠性的途径:设置mandatory参数或者备份交换器(immediate参数已被淘汰);设置publisher confirm机制或者事务机制;设置交换器、队列和消息都为持久化;设置消费端对应的autoAck参数为false,并在消费完消息后再进行消息确认
RabbitMQ管理
多租户与权限
虚拟主机virtual host,简称vhost vhost间是绝对隔离的创建vhost
rabbitmqctl add_vhost {vhost} rabbitmqctl add_vhost dong显示vhost rabbitmqctl list_vhosts [vhostinfoitem...] #可选值为name tracing rabbitmqctl list_vhosts rabbitmqctl list_vhosts name tracing rabbitmqctl trace_on #开启/的trace删除vhost #删除一个vhost同时会删除其下的所有队列、交换器、绑定关系、用户权限、参数和策略 rabbitmqctl delete_vhost vhost1MQ中权限以vhost为单位
授权命令
rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read} #conf,write,read分别为匹配用户可配置,可写,可读权限的正则表达式 #可配置:队列和交换器的创建、删除 #可写:发布消息 #可读:与消息有关的操作,读取消息、清空队列等 rabbitmqctl list_users rabbitmqctl set_permissions -p dong root "^queue.*" ".*" ".*"显示用户权限 rabbitmqctl list_permissions [-p vhost] rabbitmqctl list_user_permissions {username} rabbitmqctl -p dong list_permissions rabbitmqctl list_permissions -p dong清除权限 rabbitmqctl clear_permissions [-p vhost] {username} rabbitmqctl clear_permissions -p dong rootrabbitmqctl工具标准语法:[]表示可选参数 {}表示必选参数
rabbitmqctl [-n node] [-t timeout] [-q] {command} [command options...] [-n node],默认节点是rabbit@hostname,hostname是主机名 [-q],启用quiet模式 [-t timeout],超时时间,秒,只适用list_xxx类型的命令用户管理
单个用户可以跨多个vhost进行授权创建用户
rabbitmqctl add_user root root123更新密码 rabbitmqctl change_password {username} {newpassword}清除密码 rabbitmqctl clear_password {username} #清除后用户不能使用密码登录通过密码验证用户 rabbitmqctl authenticate_user {username} {password}删除用户 rabbitmqctl delete_user {username}查询用户 rabbitmqctl list_users #会显示用户角色tags #none,无任何角色,新创建的用户默认为none #management,可以访问web管理页面 #policymaker,包含management权限,并可以管理策略policy和参数parameter #monitoring,包含management权限,并可看到所有连接、信道及节点相关信息 #administrator,最高权限,可以管理用户、虚拟主机、权限、策略、参数等设置用户角色 rabbitmqctl set_user_tags {username} {tag ...} #设置后任何之前的身份都会被删除 rabbitmqctl set_user_tags admin policymaket monitoringWeb端管理
启用RabbitMQ management插件,RabbitMQ提供很多插件,默认存放于$RABBITMQ_HOME/plugins目录下rabbitmq-plugins语法:
rabbitmq-plugins [-n node] {command} [command options...] 启用插件: rabbitmq-plugins enable [plugin-name] 关闭插件: rabbitmq-plugins disable [plugin-name]启用management插件 rabbitmq-plugins enable rabbitmq_management查看插件启用情况 rabbitmq-plugins list #[E*]为显示启动 [e*]为隐式启动应用与集群管理
应用管理
rabbitmqctl stop [pid_file]
若使用rabbitmq-server -detach这个命令启动RabbitMQ,则不会生成pid_file rabbitmqctl shutdown #阻塞直到erlang虚拟机进程退出 rabbitmqctl stop_app 停止MQ服务,但不停止Erlang虚拟机 rabbitmqctl reset #重置MQ,删除所有,恢复到初始状态 #从原来所在集群中删除此节点,从管理数据库中删除所有的配置数据,如用户、vhost,删除所有持久化消息 #执行reset前密码停止MQ应用 rabbitmqctl force_reset 强制重置还原到最初状态,执行前需要停止MQ应用 rabbitmqctl start_app rabbitmqctl wait [pid_file] 等待MQ应用的启动,会等到pid_file的创建,然后等待pid_file中所代表的进程启动 rabbitmqctl rotate_logs {suffix} 指示轮换日志文件,suffix为日志文件后缀集群管理
#将节点加入指定集群,执行前需要停止MQ应用并重置节点
rabbitmqctl join_cluster {cluster_node} [--ram] #显示集群状态 rabbitmqctl cluster_status #修改集群节点类型,执行前需要停止MQ应用 rabbitmqctl change_cluster_node_type {disc|ram} #将节点从集群中删除,允许离线执行 rabbitmqctl forget_cluster_node [--offline] #在集群节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息,但不加入集群 rabbitmqctl update_cluster_nodes {clusternode} #无条件的启动节点 rabbitmqctl force_boot #指示未同步队列queue的slave镜像可以同步master镜像的内容,同步期间队列会被阻塞,直到同步完成 rabbitmqctl sync_queue [-p vhost] {queue} #取消队列queue同步镜像的操作 rabbitmqctl cancel_sync_queue [-p vhost] {queue} #设置集群名称 rabbitmqctl set_cluster_name {name}服务端状态
#显示队列详细信息
rabbitmqctl list_queues [-p vhost] [queueinfoitem...] 其中queueinfoitem包含的值如下: name,队列名称 durable,队列是否持久化 auto_delete,是否自动删除 arguments,队列参数 policy,应用到队列上的策略名称 pid,队列关联的Erlang进程ID owner_pid,处理排他队列连接的Erlang进程ID,若此队列是非排他的,则值为空 exclusive,队列是否排他 exclusive_sonsumer_pid,订阅到此排他队列的消费者相关的信道关联的Erlang进程ID,若为非排他,此值为空 exclusive_consumer_tag,订阅到此排他队列的消费者的consumerTag,若为非排他,此值为空 messages_ready,准备发送给客户端的消息个数 messages_unacknowledged,发送给客户端但尚未应答的消息个数 messages,准备发送给客户端和末应答消息的总和 messages_ready_ram,驻留内存的messages_ready消息个数 messages_unacknowledged_ram,驻留内存的messages_unacknowledged消息个数 messages_ram,驻留内存中的消息总数 messages_persistent,队列中持久化消息的个数,若为非持久化队列,则为0 messages_bytes,队列中所有消息大小总和(不包括消息属性和其他开销) messages_bytes_ready messages_bytes_unacknowledged messages_bytes_ram messages_bytes_persistent disk_reads,从队列启动开始,已从磁盘中读取该队列消息总次数 disk_writes,从队列启动开始,已向磁盘写入消息总次数 consumer,消费者数目 consumer_utilisation,能立刻投递给消费者的比率 memory,与队列相关的Erlang进程所消耗内存字节数,包括栈、堆及内部结构 slave_pids,镜像队列,列出所有slave镜像的pid synchronised_slave_pids,镜像队列,列出所有已同步的slave镜像pid state,队列状态 默认无参数显示队列名称和消息个数#显示交换器详细信息 rabbitmqctl list_exchanges [-p vhost] [exchangeinfoitem...] name,交换器名称 type,类型 durable,是否持久化 auto_delete,是否自动删除 internal,是否是内置的 arguments,其他一些结构化参数,如alternate-exchange policy,应用到交换器上的策略#显示绑定关系 rabbitmqctl list_bindings [-p vhost] [bindinginfoitem...] source_name,绑定中消息来源 source_kind,来源的类型 destination_name,目的名称 destination_kind,目的类型 routing_key,绑定的路由键,默认没有名称的交换器会绑定所有队列,且路由键为队列名称 arguments,绑定的参数#显示连接信息 rabbitmqctl list_connections [connectioninfoitem...] 返回TCP/IP连接统计信息 pid,与连接相关的Erlang进程ID name,连接的名称 port,服务器的端口 host,反向DNS获取的服务器主机名称或者IP地址,或者未启用 peer_port,对端端口,客户端的端口 peer_host,客户端IP,或者未启用 ssl,是否启用SSL ssl_protocol,SSL协议,如tlsv1 ssl_key_exchange,SSL密钥交换算法,如rsa ssl_cipher,SSL加密算法,如aes_256_cbc ssl_hash,SSL哈希算法,如sha peer_cert_subject,对端SSL安全证书主题,基于RFC4514形式 peer_cert_issuer,对端SSL证书发行者,基于RFC4514 peer_cert_validity,对端SSL证书有效期 state,连接状态,包括starting/tuning/opening/running/flow/blocking/blocked/closing/closed channels,该连接的信道个数 protocol,使用的AMQP协议版本,如{0,9,1} auth_mechanism,使用的SASL认证机制,如PLAIN/AMQPLAIN/EXTERNAL/BABBIT-CR-DEMO等 user,与连接相关的用户 vhost,与连接相关的vhost timeout,连接超时/协商的心跳间隔,秒 frame_max,最大传输的帧大小,单位B channel_max,此连接上信道的最大数量,若为0表示无上限 client_properties,在建立连接期间由客户端发送的信息属性 recv_oct,收到的字节数 recv_cnt,收到的数据包个数 send_oct,发送的字节数 send_cnt,发送的数据包个数 send_pend,发送队列大小 connected_at,连接建立的时间戳 默认显示user、peer_host、peer_port、state这几项#显示信道信息 rabbitmqctl list_channels [channelinfiitem...] pid,与连接相关的Erlang进程ID connection,信道所属的连接的Erlang进程ID name,信道名称 number,信道序号 user,与信道相关的用户名称 vhost,与信道相关的vhost transactional,信道是否处于事务模式 consumer_count,信道中消费者个数 messages_unacknowledged,已投递但未ack的消息个数 messages_uncommitted,已接收但还未提交事务的消息个数 acks_uncommitted,已ack收到,但还未提交事务的消息个数 messages_unconfirmed,已发送但未确认消息个数,若信道未处于publisher confirm模式下,此值为0 perfetch_count,新消费者的Qos个数限制,0表示不限 global_prefetch_count,整个信道的Qos个数限制,0表示不限 默认显示pid,user,consumer_count、messages_unacknowledged这几项#显示消费者信息 rabbitmqctl list_consumers [-p vhost] 已订阅队列名称、相关信道进程标识、consumerTag、是否需要消费端确认、prefetch_count、参数列表#显示Broker状态 rabbitmqctl status 当前Erlang节点上运行的应用程序、版本、OS名称、内存及文件描述符等信息#节点健康检查 rabbitmqctl node_health_check#运行环境 rabbitmqctl environment 显示程序环境中每个变量的名称和值#服务器状态报告 rabbitmqctl report 为所有服务器状态生成状态报告#执行任意表达式 rabbitmqctl eval {expr} 执行Erlang表达式,如rabbitmqctl eval 'node().' #返回rabbitmqctl连接的节点的名称用户、Parameter、vhost、权限等要以通过rabbitmqctl创建和删除;交换器、队列、绑定关系的创建和删除没有相关的rabbitmqctl命令
曲线救国方案:
#创建名称为exchange2的交换器 rabbitmqctl eval 'rabbit_exchange:declare({resource,<<"/">>,exchange,<<"exchange2">>},direct,true,false,false,[]).' ##declare(XName,Type,Durable,AutoDelete,Internal,Args). ##XName,交换器命名细节,{resource,VHost,exchange,Name} ##Type,类型,direct、headers、topic、fanout ##Durable,持久化;AutoDelete,自动删除;Internal,是否为内置交换器;Args,参数 ##删除rabbitmqctl eval 'rabbit_exchange:delete({resource,<<"/">>,exchange,<<"exchange2">>},false).'#创建名称为queue2的队列 rabbitmqctl eval 'rabbit_amqqueue:declare({resource,<<"/">>,queue,<<"queue2">>},true,false,[],none).' ##declare(QueueName,Durable,AutoDelete,Args,Owner). ##QueueName,队列命名细节,{resource,VHost,queue,Name} ##Owner,队列的独占模式,一般为none,排他 ##删除rabbitmqctl eval 'rabbit_amqqueue:internal_delete({resource,<<"/">>,queue,<<"queue2">>}).'#将交换器exchange2与队列queue2绑定,路由键为rk2 rabbitmqctl eval 'rabbit_binding:add({binding,{resource,<<"/">>,exchange,<<"exchange2">>},<<"rk2">>,{resource,<<"/">>,queue,<<"queue2">>},[]}).' ##add({binding,Source,Key,Destination,Args}). ##删除rabbitmqctl eval 'rabbit_binding:remove({binding,{resource,<<"/">>,exchange,<<"exchange2">>},<<"rk2">>,{resource,<<"/">>,queue,<<"queue2">>},[]}).' 若要删除所有的交换器、队列及绑定关系,只需要删除对应的vhost就可以一键搞定HTTP API接口管理
RabbitMQ Management插件不仅提供Web管理界面,还提供了HTTP API接口
POST方法创建的是无法用具体名称的资源,如绑定和发布消息等创建名为queue33的队列 %2F是默认vhost:/的转义 curl -i -u admin:admin -H"content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"node":"rabbit@localhost"}' http://127.0.0.1:15672/api/queues/%2F/queue33获取队列queue33的信息 curl -i -u admin:admin -XGET http://127.0.0.1:15672/api/queues/%2F/queue33删除队列queue33 curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/queues/%2F/queue33向交换器exchange2发送一条消息 curl -i -u admin:admin -XPOST -d'{"properties":{},"routing_key":"rk2","payload":"my_body","payload_encoding":"string"}' http://127.0.0.1:15672/api/exchanges/%2F/exchange2/publish清空队列queue2 curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/queues/%2F/queue2/contents从队列中消费消息 curl -i -u admin:admin -XPOST -d'{"count":5,"requeue":false,"encoding":"auto","truncate":50000}' http://127.0.0.1:15672/api/queues/%2F/queue2/get #获取到的结果[{"payload_bytes":7,"redelivered":false,"exchange":"exchange2","routing_key":"rk2","message_count":0,"properties":[],"payload":"my_body","payload_encoding":"string"}] #count表示最大能获取的消息个数 #requeue表示获取到消息后,消息是否从队列中删除,为true不删除,但会设置redelivered标识会被设置 #encoding表示编码格式,auto或base64 #truncate,可选参数,如果消息payload超过指定大小会被截断健康检查 GET /api/healthchecks/node #正常返回{"status":"ok"},异常返回{"status":"failed","reason":"string"} 返回码均为200获取HTTP API接口列表 GET /api/ 类似帮助文档rabbitmqadmin工具
rabbitmqadmin是management插件提供的功能,包装了HTTP API接口,更易于使用
位置:/opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand/rabbitmq_management-3.6.15/priv/www/cli/rabbitmqadmin 下载:wget http://127.0.0.1:15672/cli/rabbitmqadmin #rabbitmqadmin需要python的支持 chmod +x rabbitmqadmin #创建队列queue1 ./rabbitmqadmin -u admin -p admin declare queue name=queue1 #显示队列 ./rabbitmqadmin list queues #删除队列 ./rabbitmqadmin -u admin -p admin delete queue name=queue1 #其它常用命令 ./rabbitmqadmin -V / list exchanges #列出所有交换器 ./rabbitmqadmin list queues / name queue2 messages message_stats.publish_details.rate ./rabbitmqadmin declare exchange name=my-new-exchange type=fanout #创建交换器 ./rabbitmqadmin publish exchange=exchange2 routing_key=rk2 payload="hello world" #发送消息 ./rabbitmqadmin get queue=queue2 requeue=false #接收消息 ./rabbitmqadmin export rabbit.definitions.json #导出配置 ./rabbitmqadmin -q import rabbit.definitions.json #导入配置 #关闭所有连接 ./rabbitmqadmin -f tsv -q list connections name | while read conn ; do ./rabbitmqadmin -q close connection name="${conn}" ; doneRabbitMQ配置
通常默认配置就可以
三种配置方案: 环境变量,可以配置节点名称、RabbitMQ配置文件地址、节点内部通信端口等 配置文件,定义RabbitMQ服务和插件,如TCP监听端口、网络相关设置、内存限制、磁盘限制等 运行时参数和策略,集群层面的服务设置环境变量
以RABBITMQ_开头,可以在环境中设置,也可以在rabbitmq-env.conf中定义。在非shell环境中配置,需要将RABBITMQ_前缀去除 优先级:Shell环境>rabbitmq-env.conf>默认配置 RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detachedrabbitmq-env.conf默认位于$RABBITMQ_HOME/etc/rabbitmq/目录下
配置示例:NODENAME=rabbit@node1
NODE_PORT=5672 #rabbitmq.conf不需要添加.conf后缀 CONFIG_FILE=/opt/rabbitmq/etc/rabbitmq/rabbitmq如果没有特殊需求,不建议更改RabbitMQ的环境变量,在实际生产环境中,若对配置和日志的目录有特殊要求,可以参考下面的配置
#配置文件的地址
CONFIG_FILE=/apps/conf/rabbitmq/rabbitmq #环境变量的配置文件地址 CONF_ENV_FILE=/apps/conf/rabbitmq/rabbitmq-env.conf #服务日志的地址 LOG_BASE=/apps/logs/rabbitmq #Mnesia的路径 MNESIA_BASE=/apps/dbdat/rabbitmq/mnesia配置文件
可以从日志中获取相关的文件位置信息,如下
=INFO REPORT==== 1-Jun-2018::15:30:12 === node : rabbit@localhost home dir : /root config file(s) : /opt/rabbitmq/etc/rabbitmq/rabbitmq.config (not found) cookie hash : EX4yjV7OOErONodYs7ez4w== log : /opt/rabbitmq/var/log/rabbitmq/rabbit@localhost.log sasl log : /opt/rabbitmq/var/log/rabbitmq/rabbit@localhost-sasl.log database dir : /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@localhost也可以从进程信息中查看相关路径信息 ps -ef|grep rabbit官方配置文件示例:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.config.example
官方高级配置示例:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/advanced.config.example配置加密
配置文件中的敏感配置项可以被加密,格式为:{encrypted,加密的值}形式包裹
加密需要设置口令,config_entry_decoder中的passphrase配置就是口令 config_entry_decoder,[{passphrase,<<"zzhpassphrase">>}] 生成加密字符串 rabbitmqctl encode '<<"guest">>' zzhpassphrase 得到{encrypted,<<"dRrG+PV2Jxu+yoQh6EKzGiv6U6cFxnFWvMx23g8ij17Tp5Ng5fdHFJ4Va754h2iI5DMi2ZN+wgnniPWXHt44XA==">>} 解密:rabbitmqctl encode --decode '{encrypted,<<"dRrG+PV2Jxu+yoQh6EKzGiv6U6cFxnFWvMx23g8ij17Tp5Ng5fdHFJ4Va754h2iI5DMi2ZN+wgnniPWXHt44XA==">>}' zzhpassphrase 得到:<<"guest">> 默认,加密机制PBKDF2从口令中派生了密钥,默认的hash算法是SHA512,默认的迭代次数是1000,默认的加密算法为AES_256_CBC优化网络配置
MQ支持的所有协议都是基于TCP层面的,默认MQ会在所有接口的5672端口下监听
提高吞吐量可以禁用Nagle算法、增大TCP缓冲区大小。每个TCP连接都有缓冲区,缓冲区越大,吞吐量也越高,但每个连接上消费的内存也就越多,从而使整体服务内存增大。linux中,默认会自动调节TCP缓冲区的大小,一般为80KB到120KB之间 文件句柄数,每个节点上连接的数目乘以1.5可以粗略估算,如要支撑10万个TCP连接,文件句柄数城要15万参数及策略
运行时参数可以通过rabbitmqctl工具或者management插件的http api接口进行修改
vhost级别的Parameter由一个组件名称component name、名称name、值value组成;global级别的参数,由名称和值组成。二者的值都是JSON类型的 vhost级别的参数对应的rabbitmqctl命令有三种:set_parameters、list_parameters、clear_patameter#设置参数
rabbitmqctl set_parameters [-p vhost] {component_name} {name} {value} rabbitmqctl list_parameters rabbitmqctl set_parameters federation-upstream f1 '{"uri":"amqp://root:root123@192.168.0.2:5672\",\"ack-mode\":\"on-confirm\"}'#显示参数
rabbitmqctl list_parameters [-p vhost]#消除参数
rabbitmqctl clear_parameter [-p vhost] {componenet_name} {key} rabbitmqctl clear_parameter -p / federation-upstream f1HTTP API接口:
#设置参数 PUT /api/parameters/{component_name}/vhost/name#清除参数 DELETE /api/parameters/{componenet_name}/vhost/name#显示参数 GET /api/parameters客户端创建交换器或者队列时可设置一些参数,如x-message-ttl、x-expires、x-max-length等,这些参数一但创建就不能删除,只能删除交换器或队列重新创建。为解决这个问题,可以引入Policy,它是一种特殊的Parameter用法,Policy是vhost级别的,一个Policy可以匹配一个或多个队列,或者交换器。Policy支持动态修改一些属性参数。通常Policy用来配置Federation、镜像、备份交换器、死信等功能。
pattern,正则匹配,匹配相关的队列或交换器 apply to,指定policy作用于哪一方,可以是队列、交换器、或同时 priority,定义优先级,若有多个policy作用于同一个交换品茶或者队列,那么priority最大的那个policy才起作用#设置policy
rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition} 其中name pattern definition是必填项 #设置默认vhost中所有以^amp.开头的交换器为联邦交换器 rabbitmqctl set_policy --apply-to exchanges --priority 1 p1 "^amp." '{"federation-upstream":"f1"}' curl -i -u admin:admin -XPUT -d'{"pattern":"^amq\.","definition":{"federation-upstream":"f11"},"priority":1,"apply-to":"exchanges"}' http://127.0.0.1:15672/api/policies/%2F/p12#显示policy
rabbitmqctl list_policies [-p vhost] curl -i -u admin:admin -XGET http://127.0.0.1:15672/api/policies/%2F#消除policy
rabbitmqctl clear_policy [-p vhost] {name} rabbitmqctl clear_policy p12 curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/policies/%2F/p12 若两个或者多个policy都作用到同一交换器或队列上,且这些policy优先级都一样,则参数项最多的policy具有决定权;若参数一样多,则最后添加的policy具有决定权