1、prefetch消息堵塞问题
mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。
ConnectionFactory:创建connection的工厂类
Connection: 简单理解为socket
Channel:和mq交互的接口,定义queue、exchange和绑定queue、exhange等接口都是它。
接下来就是和mq的交互类
exchange:简单地看成路由,类型不是重点,看看官网即可
queue:客户端监听的是queue,而不是exchange,但是使用queue的前提要先将exchange和queue绑定。用过java queue工具类应该很容易上手,queue分为写和读,各自可以有自己频率,写得快读得慢,容易堵塞;写得慢读得快又容易造成消费者的空闲。
Prefetc:一个重要却容易被忽略的指标,也是这次遇到的问题。
prefetch与消息投递
prefetch是指单一消费者最多能消费的unacked messages数目。
如何理解呢?
mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。
更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。
我遇到的问题是一个粗心的程序员,在编写代码的时候,他对某些消息处理方式是这样的
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.error("######### The message is not delete from queue : {}", body);
}
首先他讲ack机制设置为手动的,然后他的理解是如果处理成功的消息,就ack给MQ,期望MQ就可以删除完成的数据。不然,保留数据再次被处理。
这里的误区就是就是对ack的理解,失败的时候,如果需要让程序继续处理,应该使用basicNack,并告诉mq将消息再次放入队列
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
对于客户端意外宕机的情况,没有ack服务器确实不会删除掉数据,但是consumer重启以后,对于服务器就是一个新的消费者了,也就是它的缓冲区又被重置为原来的n-prefetch,所以这个问题被粗心的小哥想当然地测试通过了。
prefetch的大小应该为多少
这篇文章给了很好的建议,我简单地说一下我的理解。
理想状况下,计算MQ SERVER 从缓冲区中拿到消息并推送到消费端,加上消费端处理完ack消息到MQ server,的时间,假设为100ms,其中消费端处理业务话费了10ms。
这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能毛姑姑的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。
所以如果你保证所有的消息都ack了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。一开始建议采用或者,使用一个线程池来处理这些业务逻辑。
2、qos内存溢出—-队列超长QueueingConsumer导致JVM内存溢出
我们的服务器使用RabbitMQ作为消息中转的容器。某天我怀疑RabbitMQ队列是否都能及时消化。于是用命令查询了下:rabbitmqctllist_vhosts | grep -P “.*.host” | xargs -i rabbitmqctl list_queues-p {} | grep “queue”。
不查不知道,一查吓一跳:大多数服务器的队列基本上都是空的,但是有些服务器的某个队列竟然有超过500W条的记录。一般RabbitMQ进程占用内存不过100M-200M,这些队列超长的RabbitMQ进程可以占用超过2G的内存。
显然消息队列的消费者出现了问题。开发查看日志发现作为该队列消费者的Java服务的日志也卡住了,重启服务后(这点做得不对,应该用jstat、jstack进行排查,而不是直接重启)又很快卡住。这时候他才想起来用jstat,通过jstat发现JVM内存都耗尽了,之后进入无尽的FullGC,所以当然不会处理队列消息和输出日志信息了。jstat的输出如下:
————————————————————————–
[root@mail ~]# jstat -gcutil 29389
S0 S1 E O P YGC YGCT FGC FGCT GCT
100.00 0.00 100.00 100.00 59.59 1639 2.963 219078 99272.246 99275.209
————————————————————————–
使用jmap导出这时候的Java堆栈,命令:jmap-dump:format=b,file=29389.hprof 29389。将得到的dump文件放到MAT(EclipseMemory Analyzer)里进行分析,发现很明显是QueueingConsumer持有大量对象导致JVM内存溢出,截图如下:
上网搜索了下,发现有人遇到了类似的问题:RabbitMQ QueueingConsumer possible memoryleak 。解决办法是调用Channel的basicQos方法,设置临时内存中最多保存的消息数。这个数值的设置建议参考 《Some queuing theory: throughput, latency andbandwidth》 权衡决定。
拍脑袋将basicQos设置为16后重启服务,队列终于开始消化了。用jstat观察JVM内存使用情况,也没有再出现剧增溢出的现象。
总结:使用RabbitMQ的时候,一定要合理地设置QoS参数。我感觉RabbitMQ的默认做法其实是很脆弱的,容易导致雪崩。“Youhave a queue in Rabbit. You have some clients consuming from thatqueue. If you don’t set a QoS setting at all (basic.qos), thenRabbit will push all the queue’s messages to the clients as fast asthe network and the clients will allow.“。这样如果由于某些原因,队列中堆积了比较多的消息,就可能导致Comsumer内存溢出卡死,于是发生恶性循环,队列消息不断堆积得不到消化,彻底地悲剧了。