背景痛点:自建代充值系统的三座大山
接口超时导致用户重复点击
直接调用 OpenAI 充值接口平均 RT 1.8 s,99 分位 4.5 s。前端 5 s 超时即提示“失败”,用户刷新再次点击,后端产生两条扣款请求,造成重复扣款率 3.7 %。并发竞争引发库存超卖
账号池采用 MySQLupdate ... set status=1 where status=0 limit 1抢占,高并发下间隙锁冲突,TPS 峰值 120 时 CPU 飙至 90 %,出现“同一账号被分配两次”的线上事故。支付回调幂等缺失
微信/支付宝回调 5 s 重试一次,业务层未做幂等,同一订单被重复加款 2~3 次,月度差错金额最高达 1.3 万元。
技术选型:为什么不是直接调 API
| 维度 | 同步直调 | 消息队列方案 |
|---|---|---|
| 接口超时容错 | 无,需客户端重试 | 队列兜底,自动重试 |
| 并发能力 | 受限于 Tomcat 线程池 | 消费端水平扩展 |
| 幂等控制 | 依赖数据库唯一索引 | 分布式锁 + 消息去重 |
| 削峰填谷 | 无 | 队列缓冲突发流量 |
选择 RabbitMQ 而非 Kafka 的原因:
- 单队列百万级 TPS 已满足当前 5 k 订单/日峰值
- 原生支持死信队列(DLQ),方便隔离异常订单
- 管理界面完备,中小团队运维成本低
Redis 选用 6.2 集群模式,支持 Redlock 与 Lua 脚本,保障分布式锁性能 < 1 ms。
核心实现
Spring Boot 异步任务框架
# application.yml spring: rabbitmq: host: rmq-cluster.internal port: 5672 username: ${RMQ_USER} password: ${RMQ_PWD} publisher-confirm-type: correlated listener: simple: prefetch: 16 concurrency: 8 max-concurrency: 32 acknowledge-mode: manual@Component @Slf4j public class RechargeConsumer { @Autowired private RedisLock redisLock; @Autowired private OpenAiService openAiService; @RabbitListener(queues = "q.recharge", containerFactory = "manualContainerFactory") public void handle(RechargeMsg msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("consume orderId={}", msg.getOrderId()); try { if (!redisLock.tryLock("order:" + msg.getOrderId(), 10)) { // 消息去重 channel.basicAck(tag, false); return; } openAiService.recharge(msg); channel.basicAck(tag, false); } catch (Exception e) { log.error("consume error", e); channel.basicNack(tag, false, false); // 进入 DLQ } finally { redisLock.unlock("order:" +.getOrderId()); } } }Redis 分布式锁(可重入优化)
@Component public class RedisLock { private final StringRedisTemplate tpl; private final ThreadLocal<Map<String, Integer>> holdCount = ThreadLocal.withInitial(HashMap::new); public boolean tryLock(String key, int seconds) { String val = UUID.fastUUID().toString(); Boolean ok = tpl.opsForValue().setIfAbsent(key, val, seconds, TimeUnit.SECONDS); if (Boolean.TRUE.equals(ok)) { holdCount.get().put(key, 1); return true; } // 可重入逻辑 String curr = tpl.opsForValue().get(key); if (val.equals(curr)) { holdCount.get().merge(key, 1, Integer::sum); return true; } return false; } public void unlock(String key) { int cnt = holdCount.get().getOrDefault(key, 0); if (cnt > 1) { holdCount.get().put(key, cnt - 1); return; } String lua = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else return 0 end"; tpl.execute(lua, Collections.singletonList(key), UUID.fastUUID().toString()); holdCount.get().remove(key); } }支付回调幂等处理
@RestController @RequestMapping("/cb") @Slf4j public class PayCallbackController { @Autowired private OrderService orderService; @PostMapping("/alipay") public String alipay(HttpServletRequest req) throws UnsupportedEncodingException { Map<String, String> params = convertParams(req); String orderId = params.get("out_trade_no"); // 1. 幂等校验 String key = "alipay:cb:" + orderId; Boolean exist = RedisUtils.setIfAbsent(key, "1", 300, TimeUnit.SECONDS); if (!exist) { return "success"; } // 2. 验签 boolean signOk = AlipaySignature.rsaCheckV1(params, ALIPUBKEY, "UTF-8", "RSA2"); if (!signOk) throw new IllegalArgumentException("sign fail"); // 3. 发货 orderService.deliver(orderId); return "success"; } }性能测试:优化前后对比
JMeter 5.5,200 线程,循环 50 次,压测环境 4C8G × 3 台。
| 指标 | 同步直调 | 队列+锁优化 |
|---|---|---|
| 平均 TPS | 124 | 580 |
| 99 % RT | 4.2 s | 380 ms |
| 重复扣款 | 3.7 % | 0 % |
| 充值成功率 | 92.3 % | 97.8 % |
避坑指南
支付证书管理
将alipayCertPublicKey_RSA2.crt与wechatpay_cert.pem存入 K8s Secret,挂载为只读卷;每 90 天通过定时任务拉取新证书并滚动更新,防止因证书过期导致回调验签失败。DLQ 死信队列配置
在 RabbitMQ 中声明q.recharge.dlq,设置x-message-length=3,超过 3 次投递进入 DLQ,配合钉钉群机器人告警,平均 5 分钟内可感知异常订单。时钟同步
分布式锁依赖 TTL,节点时钟漂移 > 1 s 时可能出现“锁提前过期”。使用 Chrony 同步 NTP,每日漂移控制在 10 ms 以内;同时把锁 TTL 设置成业务最大耗时的 3 倍,留足 buffer。
延伸思考:Kafka 更高并发场景
当订单量突破 5 w/日,RabbitMQ 单队列成为瓶颈。可平滑迁移至 Kafka:
- 按
orderId哈希到 32 分区,提高并行度 - 利用 Kafka 幂等生产者(
enable.idempotence=true)去重,替代 Redis 锁 - 采用事务消息(Transaction)实现“充值 + 订单状态变更”原子写入,避免分布式事务
读者可基于本文代码骨架,将@RabbitListener替换为 Spring-Kafka@KafkaListener,压测数据显示 TPS 可再提升 2.8 倍。
如果希望亲手搭建一套可运行的实时语音交互系统,体验“ASR→LLM→TTS”完整链路,可尝试火山引擎提供的从0打造个人豆包实时通话AI动手实验。实验内置 Web 模板与免费额度,本地 30 分钟即可跑通,对理解异步消息与流式响应同样具有借鉴意义。