package cn.van.business.mq; import cn.van.business.util.WxtsUtil; import com.alibaba.fastjson2.JSONObject; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author Leo * @version 1.0 * @create 2024/12/1 上午2:06 * @description: */ @Service @RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}", nameServer = "${rocketmq.name-server}", consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 consumeThreadNumber = 4) public class MessageConsumerService implements RocketMQListener { private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class); //private static final RateLimiter rateLimiter = RateLimiter.create(4, // 1 QPS // 1, // 预热期 5 秒 // TimeUnit.SECONDS); private final WxtsUtil wxtsUtil; @Autowired public MessageConsumerService(WxtsUtil wxtsUtil) { this.wxtsUtil = wxtsUtil; } @Override public void onMessage(JSONObject message) { try { logger.info("消费消息:{}", message); // 解析消息类型和数据 //String type = message.getString("type"); JSONObject data = message.getJSONObject("data"); // //if (data == null) { // logger.error("消息数据为空:{}", message); // return; //} // String wxid = data.getString("wxid"); //if (wxid == null || wxid.isEmpty()) { // logger.error("消息缺少wxid字段:{}", message); // return; //} // 根据消息类型调用不同的wxts接口 // 发送文本消息 String content = data.getString("msg"); Integer msgType = data.getInteger("msgType"); String fromWxid = data.getString("fromWxid"); Boolean hiddenTime = data.getBoolean("hiddenTime"); String touser = data.getString("touser"); // 获取接收人参数 wxtsUtil.sendWxTextMessage(wxid, content, msgType, fromWxid, hiddenTime, touser); } catch (Exception e) { logger.error("消息处理失败,原始消息:{}", message, e); wxtsUtil.sendNotify("系统异常:" + e.getMessage()); } } }