package cn.van.business.mq; import com.alibaba.fastjson2.JSONObject; import jakarta.annotation.PostConstruct; import lombok.SneakyThrows; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; @Service public class MessageProducerService { private static final Logger logger = LoggerFactory.getLogger(MessageProducerService.class); private static final String topic = "wx-message"; private final RocketMQTemplate rocketMQTemplate; public MessageProducerService(RocketMQTemplate rocketMQTemplate ) { this.rocketMQTemplate = rocketMQTemplate; } @PostConstruct public void init() { if (rocketMQTemplate == null) { throw new IllegalStateException("RocketMQTemplate not initialized!"); } } @SneakyThrows public void sendMessage(JSONObject data) { // 消息结构校验 if (!data.containsKey("type") || !data.containsKey("data")) { logger.error("非法消息格式:{}", data); throw new IllegalArgumentException("消息必须包含type和data字段"); } // 新增校验 if (!data.getJSONObject("data").containsKey("wxid")) { throw new IllegalArgumentException("消息必须包含wxid字段"); } // 构建Spring Message Message message = MessageBuilder .withPayload(new String(data.toJSONString().getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8)) .setHeader(RocketMQHeaders.TAGS, "wx") .build(); // 发送消息 rocketMQTemplate.send(topic, message); logger.debug("消息已发送:{}", data); } }