这里我必须先提一提RabbitMQ的消息协议——AMQP(Advanced Message Queuing Protocol,高级消息队列协议),在面试时我经常问候选人一个问题:RabbitMQ用的是什么消息协议?大部分候选人是回答不出来AMQP的,更不用说AMQP模型是如何设计的了。
由于MQ是基础网络通讯的中间件,网络通讯必然因丢包、网络抖动等原因产生数据丢失,MQ组件本身也会由于宕机或软件崩溃而中止服务,从而造成数据丢失,那么我们就需要从这两个根本原因着手补偿,这里科普一下RabbitMQ和Kafka是怎么解决的。
MQ作为异步通讯的消息中间件,其功能除了解耦生产者与消费者,还能用于大流量的削峰填谷,解决业务的最终一致性问题,那么消息的“”就显得尤为重要了,比如说商品出库后的库存数据通过MQ同步到财务系统,如果消息的可靠性没有保障,那财务系统的存货成本分析数据就无法有效支撑财务团队。
在RabbitMQ中对数据的持久化有三方面:交换机持久化、队列持久化、消息持久化。
上面只是说了API层的实现,那RabbitMQ底层又是怎么做消息持久化的呢?
通过API将信道(channel)设置为confirm模式,则每条消息会被分配一个唯—ID
意味着生产者不需要等待,可以继续发送新消息ack和nack只有一个被触发,只触发一次,而且是异步执行,
通常来说,消息是在内存中存储通讯的,而基于内存的都是会有数据丢失的问题产生,服务一重启,数据就随之销毁。
如果消息投递成功,也就是说消息已经到达broker了,信道会发送ack给生产者,回调ConfirmCallback接口,带上唯一ID
如果指定了持久化参数,它们会以append的方式写文件,会根据文件大小(默认16M)自动切割,生成新的文件,RabbitMQ启动时会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(当内存不够时会用到)。
准确来说,我们需要保障MQ消息的可靠性,需要从三个层面/维度解决:生产者100%投递、MQ持久化、消费者100%消费,这里的100%消费指的是消息不少消费,也不多消费。
声明队列时,指定noack=false, 表示消费者不会自动提交ack,broker会等待消费者手动返回ack、才会删除消息,否则立刻删除
此外,RabbitMQ除了消息确认机制,还有另一种方式——使用事务消息:消息生产端发送commit命令,MQ同步返回commit ok命令,这种方式由于需要同步阻塞等待MQ返回是否投递成功,才能执行别的操作,性能较差,因此不推荐使用。
如果发生错误导致消息丢失,比如通过某个RoutingKey无法路由到某个Queue,则会发送nack给生产者,回调ReturnCallback接口,并带上唯一ID和异常信息
Exchange:接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到消息队列
使用这个模型我们可以很容易地模拟出存储转发队列和主题订阅这些典型的消息中间件概念。
broker的ack没有超时机制,只会判断链接是否断开,如果断开了(比如消费者处理消息过程中宕机),消息会被重新发送,所以消费者要做好消息幂等性处理
消息存储时,会在一个叫ets的表中记录消息在文件