In distributed system development, one question comes up again and again: if a direct service call can get the job done, why introduce a message queue at all?
That doubt is reasonable. MQ is not something you add just for architectural neatness. Its value becomes obvious only when a system starts running into real pressure: high concurrency, rigid service dependencies, unstable downstream calls, traffic spikes, or cross-service consistency problems. In those situations, MQ stops being an extra middle layer and becomes a practical tool for keeping the system responsive and resilient.
What follows is a close look at ten common business scenarios where MQ proves useful, moving from the underlying problem to the solution pattern, implementation examples, and the technical details that matter in practice.
Why MQ exists in the first place
When services call each other directly, four problems tend to surface quickly:
- Tight coupling: the upstream service must know every downstream dependency. Adding or removing one means changing upstream code.
- Performance bottlenecks: synchronous calls stack downstream latency onto the main request, making response times longer and longer.
- Single-point failures: if one downstream service goes down, the upstream call chain may fail and break the core business flow.
- Poor scalability: when downstream services scale or change behavior, the caller often has to adapt as well.
MQ changes that dynamic by providing several core capabilities:
- Decoupling: producers send messages without caring how consumers process them.
- Asynchronous execution: non-critical work can move off the main request path.
- Traffic buffering: bursts can be absorbed by queues instead of crashing databases or services.
- Persistent storage: messages can be retained durably instead of disappearing on transient failures.
- Consistency support: transaction messages, retries, and compensation help maintain eventual consistency in distributed systems.
1. Decoupling services that should not depend on each other directly
The problem
Take an e-commerce order service. After an order is created, it may need to update inventory, grant loyalty points, send an email, and push data to analytics. In a synchronous design, the order service ends up depending on every downstream service. That means two things: every new feature expands the dependency graph, and a failure in any non-core downstream call can still break order creation.
// Tightly coupled example
@Service
public class OrderService {
    // Depends on several downstream services
    @Autowired private OrderDao orderDao;
    @Autowired private InventoryService inventoryService;
    @Autowired private PointsService pointsService;
    @Autowired private EmailService emailService;
    public void createOrder(Order order) {
        // 1. Save the order (core flow)
        orderDao.save(order);
        // 2. Synchronous call to the inventory service (non-core, but a failure blocks the order)
        inventoryService.updateInventory(order);
        // 3. Synchronous call to the points service (non-core)
        pointsService.addPoints(order.getUserId(), order.getAmount());
        // 4. Synchronous email send (non-core, high timeout risk)
        emailService.sendConfirmation(order);
    }
}
The MQ approach
Once MQ is introduced, the order service only publishes an order created event. Inventory, points, email, and any future service subscribe independently and handle the event on their own.
The flow becomes:
Order service -> publish message -> MQ -> inventory / points / email consumers
Code example
// 1. Order service (producer): handles only the core flow, then publishes a message
@Service
public class OrderService {
    @Autowired private OrderDao orderDao;
    @Autowired private RabbitTemplate rabbitTemplate;
    public void createOrder(Order order) {
        // Core step: save the order
        orderDao.save(order);
        // Publish to MQ; arguments are exchange, routing key, message body
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(), order.getUserId(), order.getAmount()
        );
        rabbitTemplate.convertAndSend(
            "order.exchange", // exchange name
            "order.created",  // routing key (matched by downstream queue bindings)
            event
        );
    }
}
// 2. Inventory service (consumer): subscribes and processes independently
@Component
public class InventoryConsumer {
    @Autowired private InventoryService inventoryService;
    // Listen on the bound queue
    @RabbitListener(queues = "inventory.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        // Inventory is updated independently; a failure here does not affect the order service
        inventoryService.updateInventory(event.getOrderId());
    }
}
// 3. Email service (consumer): subscribes in the same way
@Component
public class EmailConsumer {
    @Autowired private EmailService emailService;
    @RabbitListener(queues = "email.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        emailService.sendConfirmation(event.getUserId(), event.getOrderId());
    }
}
What matters technically
- Use exchange-to-queue bindings and routing keys such as order.created to send messages to the correct downstream queues.
- If a consumer fails, retries or a dead-letter queue should catch the message instead of losing it.
- Standardize the message format with JSON or Protobuf so services in different languages can consume it consistently.
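To make the binding idea concrete, here is a minimal in-memory sketch of how a direct exchange copies one message into every queue bound to its routing key. The class `DirectExchangeSketch` and its methods are hypothetical illustration names, not RabbitMQ API; a real broker adds persistence, acknowledgments, and network transport on top of this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for a direct exchange: routing key -> bound queues.
class DirectExchangeSketch {
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Bind a queue to a routing key; several queues may share the same key.
    void bind(String routingKey, Queue<String> queue) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copy the message into every queue bound to the key and return how many queues received it.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> inventoryQueue = new ArrayDeque<>();
        Queue<String> emailQueue = new ArrayDeque<>();
        exchange.bind("order.created", inventoryQueue);
        exchange.bind("order.created", emailQueue);
        int delivered = exchange.publish("order.created", "{\"orderId\":\"o-1\"}");
        System.out.println("queues reached: " + delivered);
    }
}
```

Adding a new downstream service then amounts to one more `bind` call; the publisher does not change, which is the decoupling this scenario is after.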
2. Offloading slow work with asynchronous processing
The problem
Video upload is a classic example. After a user uploads a file, the system may need to transcode it, generate thumbnails, and run content review. Each step can take seconds. If that work is done synchronously, the user waits far too long for a response.
The MQ approach
The upload service stores the original video, returns immediately, and publishes a processing event. Background workers consume the event and perform the heavy work asynchronously. After processing finishes, the user can be notified.
The flow looks like this:
User -> video service (save file + send message) -> immediate success response -> MQ -> processing service -> user notification
Code example
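// 1. Video service (producer): responds to the user quickly
@Service
public class VideoService {
    @Autowired private KafkaTemplate<String, Object> kafkaTemplate;
    public UploadResponse uploadVideo(MultipartFile file, String userId) {
        // 1. Save the original video and generate a unique video ID
        String videoId = saveOriginalVideo(file);
        // 2. Publish a processing event to Kafka (asynchronous, does not block the response)
        VideoProcessingEvent event = new VideoProcessingEvent(videoId, userId);
        kafkaTemplate.send("video-processing-topic", event);
        // 3. Return immediately; the user does not wait for processing
        return new UploadResponse(videoId, "Upload succeeded, processing in progress");
    }
}
// 2. Video processing service (consumer): background asynchronous processing
@Service
@Slf4j
public class VideoProcessingConsumer {
    @Autowired private VideoProcessor videoProcessor;
    @Autowired private NotificationService notificationService;
    // The original used contentAuditService without declaring it; the type name is assumed
    @Autowired private ContentAuditService contentAuditService;
    // Listen on the Kafka topic
    @KafkaListener(topics = "video-processing-topic")
    public void processVideo(VideoProcessingEvent event) {
        try {
            // Slow operations: transcoding, thumbnail generation, content review
            videoProcessor.transcode(event.getVideoId());
            videoProcessor.generateThumbnails(event.getVideoId());
            contentAuditService.check(event.getVideoId());
            // Processing finished: notify the user
            notificationService.notifyUser(event.getUserId(), "Video processing complete");
        } catch (Exception e) {
            log.error("Video processing failed, videoId:{}", event.getVideoId(), e);
            // Failure handling: the event can be forwarded to a retry topic
        }
    }
}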
What matters technically
- For large volumes of video processing events, Kafka is usually a better fit because of its high throughput and strong persistence.
- The user-facing API should return a processing state and usually provide a query endpoint keyed by videoId.
- If demand increases, consumer instances can be scaled horizontally and MQ will distribute work among them.
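The processing state behind such a query endpoint can be sketched as a small status store. This is a minimal in-memory version under stated assumptions: `VideoStatusStore` and its status names are hypothetical, and a real deployment would keep this state in a shared database or cache so that every API instance sees the same value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical status store backing a "GET status by videoId" endpoint.
class VideoStatusStore {
    enum Status { PROCESSING, READY, FAILED, UNKNOWN }

    private final Map<String, Status> statuses = new ConcurrentHashMap<>();

    // Called by the upload path right after the processing event is published.
    void markProcessing(String videoId) {
        statuses.put(videoId, Status.PROCESSING);
    }

    // Called by the background consumer when processing ends.
    void markDone(String videoId, boolean success) {
        statuses.put(videoId, success ? Status.READY : Status.FAILED);
    }

    // Called by the query endpoint; unknown IDs map to UNKNOWN instead of null.
    Status query(String videoId) {
        return statuses.getOrDefault(videoId, Status.UNKNOWN);
    }

    public static void main(String[] args) {
        VideoStatusStore store = new VideoStatusStore();
        store.markProcessing("v-1");
        System.out.println(store.query("v-1"));
        store.markDone("v-1", true);
        System.out.println(store.query("v-1"));
    }
}
```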
3. Smoothing traffic spikes in flash-sale scenarios
The problem
Flash sales can drive traffic far beyond normal load. A service that usually handles moderate traffic may suddenly receive 100x more requests, such as 100,000 requests in one second. If all of them hit the database directly, connection pools can be exhausted, queries time out, and the database may fail entirely.
The MQ approach
MQ turns an instantaneous burst into controlled downstream consumption:
- Rate limiting at the gateway rejects traffic beyond what the system can carry.
- The queue buffers incoming purchase requests instead of letting them strike the database directly.
- Consumers process requests at a stable pace the database can sustain.
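The three steps above can be illustrated with a bounded in-process buffer. This is only a sketch of the shape of the idea, not a broker: `SpikeBufferSketch`, its capacity, and its batch size are hypothetical, and a `BlockingQueue` stands in for the message queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for "reject overflow, buffer the rest, drain at a fixed pace".
class SpikeBufferSketch {
    private final BlockingQueue<String> buffer;

    SpikeBufferSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false when the buffer is full, so the caller can answer "busy".
    boolean accept(String requestId) {
        return buffer.offer(requestId);
    }

    // One consumer tick: process at most batchSize requests, however many are waiting.
    int drain(int batchSize) {
        int processed = 0;
        while (processed < batchSize && buffer.poll() != null) {
            processed++;
        }
        return processed;
    }

    int backlog() {
        return buffer.size();
    }

    public static void main(String[] args) {
        SpikeBufferSketch sketch = new SpikeBufferSketch(100);
        int accepted = 0;
        for (int i = 0; i < 250; i++) {
            if (sketch.accept("req-" + i)) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted + ", backlog=" + sketch.backlog());
        System.out.println("processed this tick=" + sketch.drain(30));
    }
}
```

The database only ever sees the drain rate, while the burst is absorbed by the buffer or rejected up front.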
Code example
// 1. Flash-sale service (producer): pre-deduct stock, then publish a message
@Service
public class SecKillService {
    @Autowired private RedisTemplate<String, Object> redisTemplate;
    @Autowired private RabbitTemplate rabbitTemplate;
    public SecKillResponse secKill(SecKillRequest request) {
        String userId = request.getUserId();
        String itemId = request.getItemId();
        // 1. Mark the user as a participant atomically. A hasKey check followed by a later set
        //    would let two concurrent requests from the same user both pass.
        String userKey = "sec_kill:user:" + userId + ":" + itemId;
        Boolean firstAttempt = redisTemplate.opsForValue().setIfAbsent(userKey, "1", 24, TimeUnit.HOURS);
        if (!Boolean.TRUE.equals(firstAttempt)) {
            return SecKillResponse.failed("Already participated in this flash sale");
        }
        // 2. Pre-deduct stock in Redis (atomic operation, prevents overselling)
        String stockKey = "sec_kill:stock:" + itemId;
        Long remainingStock = redisTemplate.opsForValue().decrement(stockKey);
        if (remainingStock == null || remainingStock < 0) {
            // Out of stock: restore the Redis counter and clear the participation mark
            redisTemplate.opsForValue().increment(stockKey);
            redisTemplate.delete(userKey);
            return SecKillResponse.failed("Sold out");
        }
        // 3. Publish a success event to MQ; the order is created asynchronously
        SecKillSuccessEvent event = new SecKillSuccessEvent(userId, itemId);
        rabbitTemplate.convertAndSend("sec_kill.exchange", "sec_kill.success", event);
        return SecKillResponse.success("Purchase accepted, order is being created");
    }
}
// 2. Order processing service (consumer): creates orders at a steady pace
@Component
@Slf4j
public class SecKillOrderConsumer {
    @Autowired private OrderService orderService;
    @Autowired private RedisTemplate<String, Object> redisTemplate;
    @Autowired private NotificationService notificationService;
    // A single consumer instance caps the processing rate; a thread pool can tune it
    @RabbitListener(queues = "sec_kill.order.queue")
    public void createSecKillOrder(SecKillSuccessEvent event) {
        try {
            // Steady-pace processing: create the order (database operation)
            orderService.createSecKillOrder(event.getUserId(), event.getItemId());
        } catch (Exception e) {
            log.error("Failed to create flash-sale order, userId:{}, itemId:{}", event.getUserId(), event.getItemId(), e);
            // Failure handling: restore stock and notify the user
            restoreStock(event.getItemId());
            notificationService.notifyUser(event.getUserId(), "Flash-sale order creation failed");
        }
    }
    // Restore stock (compensation logic)
    private void restoreStock(String itemId) {
        redisTemplate.opsForValue().increment("sec_kill:stock:" + itemId);
    }
}
What matters technically
- Prevent overselling with Redis atomic pre-deduction using decrement.
- Limit traffic at the gateway with tools such as Nginx or Sentinel so the queue itself does not get overwhelmed.
- After the campaign, reconcile Redis stock against database stock to correct any drift.
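As a rough illustration of pre-deduction and the post-campaign reconciliation, the sketch below uses an `AtomicLong` as a stand-in for the Redis counter. The class `StockPreDeductSketch` and the `drift` calculation are assumptions made for illustration; the real check would compare the Redis value with order rows in the database.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in: AtomicLong plays the role of the Redis stock counter.
class StockPreDeductSketch {
    private final AtomicLong stock;

    StockPreDeductSketch(long initialStock) {
        this.stock = new AtomicLong(initialStock);
    }

    // Decrement first, then check: a negative result means sold out, so undo the decrement.
    boolean tryReserve() {
        long remaining = stock.decrementAndGet();
        if (remaining < 0) {
            stock.incrementAndGet();
            return false;
        }
        return true;
    }

    // Compensation when order creation fails downstream.
    void release() {
        stock.incrementAndGet();
    }

    long remaining() {
        return stock.get();
    }

    // Reconciliation: counter value minus what the order table implies; 0 means no drift.
    long drift(long initialStock, long ordersCreated) {
        return remaining() - (initialStock - ordersCreated);
    }

    public static void main(String[] args) {
        StockPreDeductSketch sketch = new StockPreDeductSketch(10);
        int reserved = 0;
        for (int i = 0; i < 25; i++) {
            if (sketch.tryReserve()) {
                reserved++;
            }
        }
        System.out.println("reserved=" + reserved + ", remaining=" + sketch.remaining());
    }
}
```

A non-zero drift after the campaign points at a reservation that was neither turned into an order nor released.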
4. Synchronizing data across microservices
The problem
In a microservice architecture, each service often owns its own database. If a user account is disabled in the user service, the order service still needs to update its local cache or related state. Otherwise, a disabled user might still be able to place orders.
The MQ approach
Whenever user data changes, the user service publishes a user updated event. Services that depend on user information—orders, payments, or others—consume the message and update local caches or tables. The result is eventual consistency across the system.
Code example
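// 1. User service (producer): sends a transactional message; the database update is the local transaction
@Service
public class UserService {
    @Autowired private RocketMQTemplate rocketMQTemplate;
    @Autowired private UserDao userDao;
    public void updateUserStatus(String userId, String status) {
        // Send a half message. RocketMQ then invokes the registered @RocketMQTransactionListener,
        // whose executeLocalTransaction should call applyUserStatus (same pattern as scenario 10),
        // so the message becomes visible only if the database update succeeds.
        UserStatusEvent event = new UserStatusEvent(userId, status);
        rocketMQTemplate.sendMessageInTransaction(
            "user-status-topic", // topic
            MessageBuilder.withPayload(event).build(),
            event // transaction argument handed to the listener
        );
    }
    // Local transaction: update the user status in the database
    @Transactional
    public void applyUserStatus(String userId, String status) {
        User user = userDao.findById(userId).orElseThrow(() -> new RuntimeException("User not found"));
        user.setStatus(status);
        userDao.save(user);
    }
}
// 2. Order service (consumer): keeps its local cache in sync
@Component
@RocketMQMessageListener(
    topic = "user-status-topic",
    consumerGroup = "order-service-group" // consumer group; instances in one group share the load
)
public class UserStatusConsumer implements RocketMQListener<UserStatusEvent> {
    @Autowired private LocalCacheManager localCacheManager;
    @Autowired private OrderService orderService;
    @Override
    public void onMessage(UserStatusEvent event) {
        // Update the local user cache (for a disabled user, later order placement checks it)
        localCacheManager.updateUserStatus(event.getUserId(), event.getStatus());
        // Optional: mark the user's related orders as not operable
        orderService.markOrderByUserStatus(event.getUserId(), event.getStatus());
    }
}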
What matters technically
- RocketMQ transaction messages help keep database updates and message publication aligned.
- Consumers must be idempotent, typically by deduplicating with a message ID.
- If dependent data is not ready yet downstream, a delayed retry can help rather than failing permanently.
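Deduplicating by message ID can be sketched as a thin wrapper around the business handler. This is a minimal in-memory version: `IdempotentHandler` is a hypothetical name, and in production the set of seen IDs would normally live in a shared store (for example a database unique key or a Redis set) so that it survives restarts and is shared across consumer instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs the business handler at most once per message ID.
class IdempotentHandler<T> {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the payload was processed, false if the ID was already handled.
    boolean handle(String messageId, T payload) {
        if (!seen.add(messageId)) {
            return false; // duplicate delivery: skip silently
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            seen.remove(messageId); // processing failed: allow a later retry
            throw e;
        }
    }

    public static void main(String[] args) {
        int[] updates = {0};
        IdempotentHandler<String> handler = new IdempotentHandler<>(status -> updates[0]++);
        handler.handle("msg-1", "DISABLED");
        handler.handle("msg-1", "DISABLED"); // redelivery of the same message
        System.out.println("updates applied: " + updates[0]);
    }
}
```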
5. Centralized log collection in distributed deployments
The problem
In distributed applications, logs are scattered across machines. One node might write to /var/log/app1/, another to /var/log/app2/. Troubleshooting means logging into servers one by one, which is slow and painful.
The MQ approach
MQ can serve as the backbone of a centralized log pipeline:
- Application nodes send logs into MQ, typically Kafka.
- Different consumers read the same stream and push logs into Elasticsearch for search, HDFS for archiving, and monitoring systems for alerting.
Code example
// 1. Log collector (producer): embedded in the application, ships logs to Kafka
@Component
public class LogCollector {
    @Autowired private KafkaTemplate<String, String> kafkaTemplate;
    // Collect a log entry and send it to Kafka
    public void collect(String appId, String level, String message, Map<String, Object> context) {
        // Build the log object
        LogEvent logEvent = new LogEvent();
        logEvent.setAppId(appId);
        logEvent.setLevel(level);
        logEvent.setMessage(message);
        logEvent.setContext(context);
        logEvent.setTimestamp(System.currentTimeMillis());
        // Send to Kafka, keyed by appId (makes per-application filtering easier)
        kafkaTemplate.send(
            "app-logs-topic",
            appId, // partition key: logs of one application land in one partition, preserving order
            JsonUtils.toJson(logEvent)
        );
    }
}
// 2. Log processing service (consumer 1): writes to Elasticsearch for search
@Service
public class EsLogConsumer {
    @Autowired private ElasticsearchRestTemplate esTemplate;
    @KafkaListener(topics = "app-logs-topic", groupId = "log-es-group")
    public void writeToEs(String logJson) {
        LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);
        // One index per day (for example app-logs-20251017), based on the event's own timestamp
        String index = "app-logs-" + DateUtils.format(new Date(logEvent.getTimestamp()), "yyyyMMdd");
        esTemplate.save(logEvent, IndexCoordinates.of(index));
    }
}
// 3. Log alerting service (consumer 2): error logs trigger alerts
@Service
public class AlertLogConsumer {
    @Autowired private AlertService alertService;
    @KafkaListener(topics = "app-logs-topic", groupId = "log-alert-group")
    public void checkAndAlert(String logJson) {
        LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);
        // ERROR-level logs trigger an alert
        if ("ERROR".equals(logEvent.getLevel())) {
            AlertMessage alert = new AlertMessage();
            alert.setTitle("Application error log");
            alert.setContent("appId:" + logEvent.getAppId() + ", message:" + logEvent.getMessage());
            alertService.sendAlert(alert); // send an email/SMS alert
        }
    }
}
What matters technically
- Kafka is usually the best fit for large-scale log ingestion.
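- Partition logs by application ID or service name so each application's logs stay ordered, and watch for hot partitions: one very noisy application can overload the partition its key maps to.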
- Turn on message compression such as GZIP to save bandwidth.
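To get a feel for why compression pays off on log traffic, the sketch below gzips a batch of repetitive log lines with the JDK's `java.util.zip` classes. With Kafka itself, compression is normally switched on through the producer setting `compression.type=gzip` rather than done by hand; the class `LogCompressionSketch` is a hypothetical helper used only to show the size effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper showing GZIP on a batch of log lines.
class LogCompressionSketch {
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static String gunzip(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String line = "2025-10-17 ERROR order-service timeout calling inventory\n";
        String batch = line.repeat(200);
        byte[] packed = gzip(batch);
        System.out.println("raw bytes=" + batch.length() + ", gzip bytes=" + packed.length);
    }
}
```

Logs compress well because neighbouring lines share most of their text, which is also why compressing a batch works better than compressing single messages.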
6. Broadcasting configuration changes to every node
The problem
When runtime configuration changes—rate limits, feature flags, toggles—every service node needs to see the new value quickly. If some nodes keep stale local cache entries, behavior becomes inconsistent across the cluster.
The MQ approach
A configuration service publishes a config updated broadcast, and every application node subscribes and refreshes its local cache. RabbitMQ fanout exchanges are one option; Redis Pub/Sub is a lighter one.
Code example (Redis Pub/Sub)
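// 1. Config service (publisher): broadcasts configuration updates
@Service
public class ConfigService {
    @Autowired private RedisTemplate<String, Object> redisTemplate;
    @Autowired private ConfigDao configDao;
    public void updateConfig(String configKey, String configValue) {
        // 1. Persist the configuration to the database/Redis
        configDao.updateConfig(configKey, configValue);
        // 2. Broadcast the update (Redis Pub/Sub)
        ConfigUpdateEvent event = new ConfigUpdateEvent(configKey, configValue);
        redisTemplate.convertAndSend("config-update-channel", event);
    }
}
// 2. Application node (subscriber): receives the message and refreshes its local cache.
// Spring Data Redis has no listener annotation: implement MessageListener and register this bean
// with a RedisMessageListenerContainer for new ChannelTopic("config-update-channel").
@Component
@Slf4j
public class ConfigSubscriber implements MessageListener {
    @Autowired private LocalConfigCache localConfigCache;
    @Autowired private RedisTemplate<String, Object> redisTemplate;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Deserialize with the same value serializer the publisher used
        ConfigUpdateEvent event = (ConfigUpdateEvent) redisTemplate.getValueSerializer().deserialize(message.getBody());
        if (event == null) return;
        // Update the local in-memory config cache (for example Caffeine)
        localConfigCache.update(event.getConfigKey(), event.getConfigValue());
        log.info("Config updated: {}={}", event.getConfigKey(), event.getConfigValue());
    }
}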
What matters technically
- Redis Pub/Sub is simple, but it does not persist messages. If a node is offline, it can miss the update.
- If reliable broadcast is required, a RabbitMQ fanout exchange with durable queues is safer.
- Include a config version in the message so an older update cannot overwrite a newer one.
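The version check can be sketched as a cache that only accepts strictly newer versions. `VersionedConfigCache` is a hypothetical class, and the sketch assumes the config service attaches a monotonically increasing version number to each update.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local cache that ignores updates older than what it already holds.
class VersionedConfigCache {
    private static final class Entry {
        final String value;
        final long version;

        Entry(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // Applies the update only if its version is newer; returns whether it was applied.
    boolean apply(String key, String value, long version) {
        boolean[] applied = {false};
        cache.compute(key, (k, current) -> {
            if (current == null || version > current.version) {
                applied[0] = true;
                return new Entry(value, version);
            }
            return current; // stale or duplicate update: keep the existing entry
        });
        return applied[0];
    }

    String get(String key) {
        Entry entry = cache.get(key);
        return entry == null ? null : entry.value;
    }

    public static void main(String[] args) {
        VersionedConfigCache cache = new VersionedConfigCache();
        cache.apply("rate.limit", "200", 2);
        boolean applied = cache.apply("rate.limit", "100", 1); // older update arriving late
        System.out.println("late update applied: " + applied + ", value=" + cache.get("rate.limit"));
    }
}
```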
7. Preserving message order for stateful workflows
The problem
Some business flows must be processed in strict order. Order state transitions are the obvious case: created -> paid -> shipped -> completed. If a shipped message arrives before paid, the business logic breaks.
The MQ approach
Ordering is typically achieved with partitioning plus ordered consumption:
- On the producer side, messages for the same order are routed to the same partition or queue using the order ID.
- On the consumer side, each partition is handled serially so messages within that partition remain ordered.
Code example (RocketMQ)
// 1. Order service (producer): shards by order ID so messages of one order stay in order
@Service
public class OrderStateService {
    @Autowired private RocketMQTemplate rocketMQTemplate;
    public void changeOrderState(String orderId, String oldState, String newState) {
        OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
        // Send an ordered message: the third argument is the hash key (order ID),
        // and messages with the same key go to the same queue
        rocketMQTemplate.syncSendOrderly(
            "order-state-topic", // topic
            event,
            orderId // hash key: keeps messages of the same order in sequence
        );
    }
}
// 2. Order processing service (consumer): ordered consumption
@Service
@RocketMQMessageListener(
    topic = "order-state-topic",
    consumerGroup = "order-state-group",
    consumeMode = ConsumeMode.ORDERLY // ordered mode (each queue is processed by one thread at a time)
)
public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
    @Autowired private OrderService orderService;
    @Override
    public void onMessage(OrderStateEvent event) {
        // Process state changes in sequence (for example "shipped" only after "paid")
        orderService.processStateChange(event.getOrderId(), event.getOldState(), event.getNewState());
    }
}
What matters technically
- Partition counts should be chosen carefully. Too few partitions can cause backlog; too many can add operational overhead.
- If one partition fails during consumption, ideally only that partition should pause rather than blocking all others.
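The routing half of this pattern comes down to a deterministic mapping from order ID to queue. The sketch below uses `hashCode` with `Math.floorMod` as a simplification; `OrderQueueSelector` is a hypothetical name, and real clients such as Kafka or RocketMQ apply their own hashing, so only the principle carries over.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical selector: the same order ID always maps to the same queue index.
class OrderQueueSelector {
    static int queueFor(String orderId, int queueCount) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        String[][] events = {
            {"order-42", "created"}, {"order-7", "created"},
            {"order-42", "paid"}, {"order-42", "shipped"}
        };
        // Events of one order are appended to one queue, so their relative order is kept
        for (String[] event : events) {
            queues.get(queueFor(event[0], queueCount)).add(event[0] + ":" + event[1]);
        }
        System.out.println(queues.get(queueFor("order-42", queueCount)));
    }
}
```

Ordering holds per order, not globally: events of different orders may interleave freely, which is what lets the queues be consumed in parallel.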
8. Using delayed messages instead of polling for scheduled actions
The problem
A common requirement is to cancel unpaid orders after 30 minutes. The traditional way is database polling, but that creates needless load and poor timeliness. If the scheduler runs once per minute, an order may be canceled up to a minute late.
The MQ approach
With delayed messages, the order service sends a message scheduled for 30 minutes later. When the consumer receives it, it checks whether the order has been paid. If not, it cancels the order and restores inventory.
Code example (RabbitMQ)
// 1. Order service (producer): sends a delayed message
@Service
public class OrderService {
    @Autowired private OrderDao orderDao;
    @Autowired private RabbitTemplate rabbitTemplate;
    public void createOrder(Order order) {
        // 1. Save the order (status: unpaid)
        order.setStatus("UNPAID");
        orderDao.save(order);
        // 2. Send a message delayed by 30 minutes
        OrderCreateEvent event = new OrderCreateEvent(order.getId());
        rabbitTemplate.convertAndSend(
            "order.delay.exchange", // delayed exchange
            "order.delay.routing",  // routing key
            event,
            message -> {
                // Delay of 30 minutes in milliseconds; takes effect only if the exchange is
                // declared as delayed and the broker has the delayed-message exchange plugin
                message.getMessageProperties().setDelay(30 * 60 * 1000);
                return message;
            }
        );
    }
}
// 2. Order timeout service (consumer): checks the order 30 minutes later
@Component
public class OrderTimeoutConsumer {
    @Autowired private OrderService orderService;
    @Autowired private OrderDao orderDao;
    @Autowired private InventoryService inventoryService;
    @RabbitListener(queues = "order.delay.queue")
    public void checkOrderPayment(OrderCreateEvent event) {
        // Look up the current order status
        Order order = orderDao.findById(event.getOrderId()).orElse(null);
        if (order != null && "UNPAID".equals(order.getStatus())) {
            // Timed out without payment: cancel the order
            orderService.cancelOrder(order.getId(), "Payment timed out");
            // Restore inventory
            inventoryService.restoreStock(order.getItems());
        }
    }
}
What matters technically
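- RabbitMQ delayed delivery, here through the delayed-message exchange plugin that setDelay relies on, is roughly second-level in precision. If tighter timing is needed, RocketMQ scheduled messages may be more suitable, depending on the broker version.
- Delayed queues should be durable so messages survive broker restarts.
- Compared with database polling, delayed messages avoid repeated table scans and fire close to the deadline instead of waiting for the next scheduler run.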
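The semantics of a delayed message (invisible until its due time, then delivered in due-time order) can be tried out in-process with the JDK's `java.util.concurrent.DelayQueue`. This is a sketch of the behaviour only: `DelayedOrderCheck` is a hypothetical class, the delays are shortened to milliseconds, and unlike a broker nothing here survives a restart.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical delayed task: "check whether this order has been paid" at a future time.
class DelayedOrderCheck implements Delayed {
    final String orderId;
    private final long fireAtNanos;

    DelayedOrderCheck(String orderId, long delay, TimeUnit unit) {
        this.orderId = orderId;
        this.fireAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class OrderTimeoutSketch {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedOrderCheck> queue = new DelayQueue<>();
        queue.add(new DelayedOrderCheck("o-2", 200, TimeUnit.MILLISECONDS));
        queue.add(new DelayedOrderCheck("o-1", 50, TimeUnit.MILLISECONDS));
        // take() blocks until the earliest check is due, regardless of insertion order
        System.out.println("due first: " + queue.take().orderId);
        System.out.println("due second: " + queue.take().orderId);
    }
}
```

The consumer still has to re-read the order status when the check fires, because the order may have been paid in the meantime.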
9. Retrying messages when failures are temporary
The problem
Consumers often fail for reasons that are temporary: a database timeout, a downstream service glitch, or a short-lived network issue. Dropping such a message immediately means losing business data for a recoverable problem.
The MQ approach
A retry mechanism combined with a dead-letter queue works well:
- Temporary failures trigger requeue and retry, usually with a capped number of attempts.
- Permanent failures are moved to a dead-letter queue for investigation and later handling.
Code example (RabbitMQ)
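@Service
@Slf4j
public class RetryableConsumer {
    @Autowired private BusinessService businessService;
    @Autowired private RabbitTemplate rabbitTemplate;
    // Listen on the business queue with manual ACK (the listener container must use manual acknowledge mode)
    @RabbitListener(queues = "business.queue")
    public void processMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Read the retry count (custom message header)
        Integer retryCount = message.getMessageProperties().getHeader("retry_count");
        if (retryCount == null) retryCount = 0;
        // Declared outside the try block so the catch blocks can log it
        String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            // 1. Parse the message
            BusinessEvent event = JsonUtils.fromJson(msgBody, BusinessEvent.class);
            // 2. Business processing
            businessService.process(event);
            // 3. Success: ACK manually (the message is removed from the queue)
            channel.basicAck(deliveryTag, false);
        } catch (TemporaryException e) {
            // Temporary failure (for example a database timeout): retry up to 3 times
            if (retryCount < 3) {
                log.warn("Temporary failure, retry #{}, message: {}", retryCount + 1, msgBody, e);
                // basicNack with requeue=true redelivers the original message unchanged, so a header
                // set locally would be lost. Republish a copy with the new count, then ACK the original.
                message.getMessageProperties().setHeader("retry_count", retryCount + 1);
                rabbitTemplate.send("", "business.queue", message); // default exchange routes by queue name
                channel.basicAck(deliveryTag, false);
            } else {
                // More than 3 retries: send to the dead-letter queue
                log.error("Failed after 3 retries, moving to dead-letter queue, message: {}", msgBody, e);
                channel.basicNack(deliveryTag, false, false); // requeue=false: dead-lettered if a DLX is configured
            }
        } catch (PermanentException e) {
            // Permanent failure (for example a malformed message): dead-letter immediately
            log.error("Permanent failure, moving to dead-letter queue, message: {}", msgBody, e);
            channel.basicNack(deliveryTag, false, false);
        }
    }
}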
What matters technically
- Manual acknowledgment is essential. Do not confirm the message until business processing has actually succeeded.
- Retry intervals are often better when they back off gradually—for example 1 second, then 3, then 5—using delayed queues plus retry count.
- Dead-letter queues should be monitored regularly, and failed messages can be replayed once root causes are fixed.
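The backoff schedule and the retry cap can be captured in one small decision function. The sketch below uses the 1, 3, 5 second steps mentioned above; `RetryPolicySketch` and `nextDelayMs` are hypothetical names, and the returned delay would be used as the delay of the republished message.

```java
import java.util.OptionalLong;

// Hypothetical retry policy: stepped backoff, then give up and dead-letter.
class RetryPolicySketch {
    private static final long[] DELAYS_MS = {1_000, 3_000, 5_000};

    // retryCount is the number of retries already made.
    // Returns the delay before the next retry, or empty when the message should be dead-lettered.
    static OptionalLong nextDelayMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retryCount must not be negative");
        }
        return retryCount < DELAYS_MS.length
            ? OptionalLong.of(DELAYS_MS[retryCount])
            : OptionalLong.empty();
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 3; attempt++) {
            OptionalLong delay = nextDelayMs(attempt);
            System.out.println("retries so far=" + attempt + " -> "
                + (delay.isPresent() ? "retry in " + delay.getAsLong() + " ms" : "dead-letter"));
        }
    }
}
```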
10. Transaction messages for distributed transaction problems
The problem
Some actions span multiple services and still need atomic behavior. Consider order creation and stock deduction:
- If the order is created first and stock deduction fails, the system ends up with an invalid order and potential overselling.
- If stock is deducted first and order creation fails, inventory may stay locked incorrectly.
The MQ approach
RocketMQ transaction messages address this by combining local transaction execution with broker-coordinated message visibility:
- A half message is sent to MQ but not yet made consumable.
- The local transaction runs, such as inserting the order.
- If the local transaction succeeds, the message is committed and consumers can process it.
- If the broker does not receive a clear result, it performs a transaction check to determine the actual local state.
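The four steps can be modelled as a tiny in-memory state machine to show when a message becomes visible. This is a simplified sketch, not RocketMQ's implementation: `HalfMessageBrokerSketch` and its method names are hypothetical, and the check here resolves to commit or rollback in one pass, whereas a real check may stay unresolved and be repeated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical broker model: half messages stay hidden until committed.
class HalfMessageBrokerSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // msgId -> payload
    private final List<String> visible = new ArrayList<>();

    // Step 1: store the half message; consumers cannot see it yet.
    void sendHalf(String msgId, String payload) {
        halfMessages.put(msgId, payload);
    }

    // Steps 2-3: the producer reports the outcome of its local transaction.
    void endTransaction(String msgId, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // stays pending until a later check resolves it
        }
        String payload = halfMessages.remove(msgId);
        if (payload != null && state == TxState.COMMIT) {
            visible.add(payload);
        }
    }

    // Step 4: for every pending half message, ask whether the local transaction committed.
    void checkPending(Predicate<String> localTxCommitted) {
        for (String msgId : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(msgId, localTxCommitted.test(msgId) ? TxState.COMMIT : TxState.ROLLBACK);
        }
    }

    List<String> visibleMessages() {
        return List.copyOf(visible);
    }

    int pendingCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageBrokerSketch broker = new HalfMessageBrokerSketch();
        broker.sendHalf("m1", "order-1 created");
        broker.endTransaction("m1", TxState.UNKNOWN); // producer crashed before reporting
        broker.checkPending(msgId -> true);            // check finds the order row
        System.out.println(broker.visibleMessages());
    }
}
```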
Code example
// 1. Order service (producer): sends the transactional message
@Service
public class OrderTransactionService {
    @Autowired private RocketMQTemplate rocketMQTemplate;
    // No @Transactional here: the local transaction runs inside the listener callback below
    public void createOrderWithTransaction(Order order) {
        // 1. Send the half message (not consumable yet); RocketMQ then calls executeLocalTransaction
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-tx-topic", // transactional message topic
            MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
                .setHeader("orderId", order.getId()) // carries the order ID for the transaction check
                .build(),
            order // transaction argument (passed to the local transaction method)
        );
        // 2. Abort only if the local transaction rolled back; an unknown state is resolved by the check
        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            throw new RuntimeException("Local transaction rolled back, order creation aborted");
        }
    }
}
// 2. Local transaction executor (RocketMQ callback), declared as its own top-level bean
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired private OrderDao orderDao;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // arg is the transaction argument (the order object)
            Order order = (Order) arg;
            // Run the local transaction: create the order
            orderDao.save(order);
            // Local transaction succeeded: commit the message (consumers can now see it)
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // Local transaction failed: roll back the message (it is discarded)
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    // 3. Transaction check (called when the broker has not received a clear result)
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Read the order ID from the message header
        String orderId = msg.getHeaders().get("orderId", String.class);
        // Check the local transaction state: does the order exist?
        Order order = orderDao.findById(orderId).orElse(null);
        if (order != null) {
            // Order exists: commit the message
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // Order does not exist: roll back the message
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
// 4. Inventory service (consumer): consumes the transactional message and deducts stock
@Component
@RocketMQMessageListener(
    topic = "order-tx-topic",
    consumerGroup = "inventory-service-group"
)
public class InventoryTxConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Autowired private InventoryService inventoryService;
    @Override
    public void onMessage(OrderCreatedEvent event) {
        // Deduct stock (must be idempotent to avoid deducting twice)
        inventoryService.deductStock(event.getOrderId());
    }
}
What matters technically
- The inventory consumer must be idempotent, usually by deduplicating with the order ID before deducting stock.
- Transaction check logic has to be reliable, because this is what resolves uncertain broker states.
- If inventory deduction fails even after the message is committed, compensation is still required, such as restoring the order state, notifying the user, or routing the event to a dead-letter queue.
Choosing the right MQ for the job
Different systems need different strengths. In practice, these choices are common:
<table>
<thead>
<tr> <th>Requirement</th> <th>Recommended MQ</th> <th>Why</th> </tr>
</thead>
<tbody>
<tr> <td>High throughput, such as log collection</td> <td>Kafka</td> <td>Very high throughput and strong persistence</td> </tr>
<tr> <td>Transaction messages</td> <td>RocketMQ</td> <td>Native support for transactional messaging and transaction checks</td> </tr>
<tr> <td>Complex routing, such as broadcast and selective delivery</td> <td>RabbitMQ</td> <td>Flexible exchange types including Fanout, Direct, and Topic</td> </tr>
<tr> <td>Delayed delivery</td> <td>RabbitMQ / RocketMQ</td> <td>RabbitMQ supports delayed queues; RocketMQ supports scheduled messages</td> </tr>
<tr> <td>Lightweight broadcast, such as config sync</td> <td>Redis Pub/Sub</td> <td>No separate MQ deployment needed, good for simple cases</td> </tr>
</tbody>
</table>
Practices that matter regardless of product choice
A queue alone does not solve reliability. Several basics have to be in place:
- Idempotent consumers: every consumer should tolerate duplicate delivery by deduplicating with a message ID or a business key.
- Dead-letter queues: each important business queue should have one so failed messages are not silently lost.
- Monitoring and alerts: backlog size, consumption delay, and dead-letter counts should be tracked and alerted on when thresholds are exceeded.
- Performance tuning:
  - Enable compression where appropriate, such as Kafka GZIP.
  - Choose queue or partition counts carefully to avoid hotspots and buildup.
  - Match consumer thread pools to queue/partition structure so workers are neither idle nor overloaded.
MQ is not a silver bullet. It adds complexity, operational overhead, and new failure modes. But in distributed systems, it is one of the most effective tools for handling decoupling, async execution, burst traffic, retries, ordering, and cross-service consistency. The real challenge is not merely adding a queue, but choosing the right messaging pattern for the business problem and backing it with solid guarantees around reliability, idempotency, and observability.