1.保证消息不丢失的必要条件
- 生产者发送消息、生产者存储消息、消费者拉取消息,需要保证三大流程消息不丢失,缺一不可。
- 生产者保证消息完整发送并存储至broker。
- broker保证存储的消息不丢失。
- 消费者保证拉取的消息一定被消费,即使重启了,也能确保未消费的消息继续消费。
2.生产+发送消息流程
以下单为例,下单后增加积分,增加积分这个动作放在消息里实现,且要保证该消息一定发送成功。
这里可以引用TCP协议的ack请求确认机制。如果broker收到生产者推送来的消息,就返回ack给生产者,这样就能保证生产者发送消息这个阶段,消息不会丢失。如果生产者一直没收到ack,可能是网络或者其他原因,这时候就会进行重试,重试次数达到上限后抛异常。但这里抛异常不能影响主流程,所以需要对异常进行特殊处理。如果是同步发送消息,那就try-catch,如果是异步,那就在异步方法对应的onException做异常处理,这里可能要做一些补偿机制。
3.存储流程
broker在返回ack给生产者之前要确保消息已经成功存储了,RocketMQ的消息默认是异步刷盘,先刷到cache上,再等操作系统或定时刷盘任务把消息刷到磁盘上。如果此时断电了,消息就丢失了。所以为了保证消息不被丢失,这里可以把刷盘方式改成同步刷盘(flushDiskType=SYNC_FLUSH)。如果broker是集群,那也得保证主从broker的复制方式是同步复制,这样的话消息就更安全了。
4.消费流程
消费者消费后需要上报点位给broker,告诉broker已经消费到第几条消息了。如果消费者在处理消息的时候异步处理,然后直接告知broker消费成功,就很有可能在消费过程中报错,导致再次拉取消息的时候,从之前上报过的点位继续拉消息,消息就“丢失”了。所以这里要确保消费者真正消费完成消息后,再提交点位。