diff --git a/src/main/java/cn/van/business/controller/jd/OrderController.java b/src/main/java/cn/van/business/controller/jd/OrderController.java index 038355c..97f6e01 100644 --- a/src/main/java/cn/van/business/controller/jd/OrderController.java +++ b/src/main/java/cn/van/business/controller/jd/OrderController.java @@ -1,6 +1,8 @@ package cn.van.business.controller.jd; +import cn.van.business.mq.MessageProducerService; import cn.van.business.util.JDUtils; +import com.alibaba.fastjson2.JSONObject; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @@ -20,8 +22,10 @@ public class OrderController { public static String TOKEN = "cc0313"; @Resource private JDUtils jdUtils; + @Resource + private MessageProducerService messageProducerService; - public boolean checkToken (String token){ + public boolean checkToken(String token) { return TOKEN.equals(token); } @@ -29,11 +33,17 @@ public class OrderController { @ResponseBody public String refreshHistory(String token) throws Exception { if (checkToken(token)) { - jdUtils.fetchHistoricalOrders(); - } return "OK"; } + @RequestMapping("/mq") + @ResponseBody + public String mq() { + JSONObject jsonObject = new JSONObject(); + messageProducerService.sendMessage(jsonObject); + return "OK"; + } + } diff --git a/src/main/java/cn/van/business/mq/MessageConsumerService.java b/src/main/java/cn/van/business/mq/MessageConsumerService.java index 3116f9e..695bd8b 100644 --- a/src/main/java/cn/van/business/mq/MessageConsumerService.java +++ b/src/main/java/cn/van/business/mq/MessageConsumerService.java @@ -19,7 +19,7 @@ import static cn.van.business.util.WXUtil.WX_BASE_URL; * @description: */ @Service -@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}") +@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}",nameServer = "${rocketmq.name-server}") public class MessageConsumerService implements RocketMQListener { private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class); diff --git a/src/main/java/cn/van/business/mq/MessageProducerService.java b/src/main/java/cn/van/business/mq/MessageProducerService.java index fc5bab1..20d4380 100644 --- a/src/main/java/cn/van/business/mq/MessageProducerService.java +++ b/src/main/java/cn/van/business/mq/MessageProducerService.java @@ -1,16 +1,12 @@ package cn.van.business.mq; -import cn.van.business.util.WXUtil; import com.alibaba.fastjson2.JSONObject; import lombok.SneakyThrows; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.apache.rocketmq.common.message.Message; import org.springframework.stereotype.Service; /** @@ -22,38 +18,21 @@ import org.springframework.stereotype.Service; @Service public class MessageProducerService { - private static final Logger logger = LoggerFactory.getLogger(MessageProducerService.class); private static final String topic = "wx-message"; - private static volatile DefaultMQProducer shareProducer; + private final RocketMQTemplate rocketMQTemplate; - public MQProducer getProducer() throws MQClientException { - if (shareProducer == null) { - synchronized (MQProducer.class) { - if (shareProducer == null) { - shareProducer = createProducer(); - } - } - } - return shareProducer; - } - - public DefaultMQProducer createProducer() throws MQClientException { - - DefaultMQProducer producer = new DefaultMQProducer("wx_producer"); - producer.setNamesrvAddr("192.168.8.88:9876"); - producer.start(); - - logger.info("shareProducer[{}|{}]", "wx_producer", producer.getNamesrvAddr()); - - return producer; + public MessageProducerService(RocketMQTemplate rocketMQTemplate) { + this.rocketMQTemplate = rocketMQTemplate; } @SneakyThrows public void sendMessage(JSONObject jsonObject) { Message message = new Message(topic, jsonObject.toJSONString().getBytes()); - getProducer().send(message); + message.setTags("wx"); + this.rocketMQTemplate.convertAndSend(topic, message); + } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3ea5800..069675e 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -65,4 +65,4 @@ rocketmq: group: wx_consumer # 消费者组名 consume-thread-min: 20 # 消费线程池最小线程数 consume-thread-max: 64 # 消费线程池最大线程数 - consume-message-batch-max-size: 20 # 批量消费最大消息数 + consume-message-batch-max-size: 64 # 批量消费最大消息数