package cn.van.business.mq;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpRequest;
import cn.van.business.util.WxtsUtil;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

import static cn.van.business.util.WXUtil.WX_BASE_URL;

/**
 * RocketMQ listener for the {@code wx-message} topic that forwards every consumed
 * JSON payload to {@code WX_BASE_URL} with an HTTP POST.
 *
 * <p>The listener is registered in orderly consume mode with a single consume thread,
 * and outgoing requests are throttled by a shared rate limiter. Any failure (empty
 * response, non-success response code, or an exception while sending) is logged and
 * reported through {@link WxtsUtil#sendNotify}.
 *
 * @author Leo
 * @version 1.0
 * @create 2024/12/1 02:06
 */
@Service
@RocketMQMessageListener(
        topic = "wx-message",
        consumerGroup = "${rocketmq.consumer.group}",
        nameServer = "${rocketmq.name-server}",
        consumeMode = ConsumeMode.ORDERLY, // ordered consumption mode
        consumeThreadNumber = 1)
public class MessageConsumerService implements RocketMQListener<JSONObject> {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);

    /** Value of the {@code code} field that the response must carry to count as success. */
    private static final int SUCCESS_CODE = 200;

    /** Throttles outgoing requests: 4 permits per second, with a warm-up period of 0 seconds. */
    private static final RateLimiter rateLimiter = RateLimiter.create(
            4, // permits per second
            0, // warm-up period (in the unit below)
            TimeUnit.SECONDS);

    private final WxtsUtil wxtsUtil;

    @Autowired
    public MessageConsumerService(WxtsUtil wxtsUtil) {
        this.wxtsUtil = wxtsUtil;
    }

    /**
     * Sends one consumed message to {@code WX_BASE_URL} and checks the response.
     *
     * <p>Failures are handled on a best-effort basis: they are logged and a single
     * notification is sent via {@code wxtsUtil}; exceptions raised while sending or
     * parsing are caught here and are not rethrown to the caller.
     *
     * @param message the JSON payload to forward; its JSON string form is the request body
     */
    @Override
    public void onMessage(JSONObject message) {
        String failure;
        try {
            // Blocks until a permit is available so the send rate stays within the limit.
            rateLimiter.acquire();

            // NOTE(review): no explicit timeout is set on this request — confirm the
            // HTTP client's default is acceptable for a single-threaded consumer.
            String responseStr = HttpRequest.post(WX_BASE_URL)
                    .body(message.toJSONString())
                    .execute()
                    .body();

            failure = describeFailure(responseStr);
            if (failure != null) {
                logger.error("消息处理失败,原始消息:{},原因:{}", message, failure);
            }
        } catch (Exception e) {
            logger.error("消息处理失败,原始消息:{}", message, e);
            failure = "系统异常:" + e.getMessage();
        }

        // Notify exactly once per failed message, outside the try block so that a
        // failure to notify is not itself turned into a second notification.
        if (failure != null) {
            wxtsUtil.sendNotify(failure);
        }
    }

    /**
     * Inspects the raw HTTP response body and describes why it is not a success.
     *
     * @param responseStr the raw response body, possibly {@code null} or empty
     * @return a failure description suitable for notification, or {@code null} when the
     *         response parses to a JSON object whose {@code code} equals {@link #SUCCESS_CODE}
     */
    private static String describeFailure(String responseStr) {
        if (!ObjectUtil.isNotEmpty(responseStr)) {
            return "消息发送失败,没有收到响应";
        }
        JSONObject response = JSONObject.parseObject(responseStr);
        // A missing "code" field must count as failure instead of triggering an
        // unboxing NullPointerException in the comparison.
        Integer code = (response == null) ? null : response.getInteger("code");
        if (code == null || code != SUCCESS_CODE) {
            return "消息发送失败: " + responseStr;
        }
        return null;
    }
}