Files
Jarvis_java/src/main/java/cn/van/business/mq/MessageConsumerService.java
雷欧(林平凡) 727b1a5200 菜单优化
2025-03-12 17:10:20 +08:00

81 lines
3.0 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cn.van.business.mq;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpRequest;
import cn.van.business.util.WxtsUtil;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
import static cn.van.business.util.WXUtil.WX_BASE_URL;
/**
* @author Leo
* @version 1.0
* @create 2024/12/1 上午2:06
* @description
*/
@Service
@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}", nameServer = "${rocketmq.name-server}"
)
public class MessageConsumerService implements RocketMQListener<JSONObject> {
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
private static final RateLimiter rateLimiter = RateLimiter.create(2, // 1 QPS
0, // 预热期 5 秒
TimeUnit.SECONDS);
private final WxtsUtil wxtsUtil;
@Autowired
public MessageConsumerService(WxtsUtil wxtsUtil) {
this.wxtsUtil = wxtsUtil;
}
@Override
public void onMessage(JSONObject message) {
try {
//logger.info("[RateLimiter] 开始处理消息,当前时间:{}", System.currentTimeMillis());
rateLimiter.acquire();
//logger.info("[RateLimiter] 获得令牌,当前时间:{}", System.currentTimeMillis());
//logger.debug("构造完成的消息结构:{}", requestBody.toJSONString());
// 4. 发送请求(保持原有)
String responseStr;
responseStr = HttpRequest.post(WX_BASE_URL).body(message.toJSONString()).execute().body();
// ... [保持原有响应处理逻辑]
if (ObjectUtil.isNotEmpty(responseStr)) {
JSONObject response = JSONObject.parseObject(responseStr);
//logger.info("消息成功发送并得到响应:{}", response);
if (response.getInteger("code") != 200) {
// TODO: 如果需要处理错误,您可以在这里添加逻辑
wxtsUtil.sendNotify("消息发送失败: " + responseStr);
throw new RuntimeException("消息发送失败: " + responseStr);
}
//logger.info("消息成功发送并得到响应:{}", response);
} else {
wxtsUtil.sendNotify("消息发送失败,没有收到响应");
throw new RuntimeException("消息发送失败,没有收到响应");
}
} catch (Exception e) {
logger.error("消息处理失败,原始消息:{}", message, e);
wxtsUtil.sendNotify("系统异常:" + e.getMessage());
}
}
}