This commit is contained in:
Leo
2024-12-01 15:22:15 +08:00
parent 6ba281b7ed
commit 4ef5ae256b
4 changed files with 22 additions and 33 deletions

View File

@@ -1,6 +1,8 @@
package cn.van.business.controller.jd; package cn.van.business.controller.jd;
import cn.van.business.mq.MessageProducerService;
import cn.van.business.util.JDUtils; import cn.van.business.util.JDUtils;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@@ -20,8 +22,10 @@ public class OrderController {
public static String TOKEN = "cc0313"; public static String TOKEN = "cc0313";
@Resource @Resource
private JDUtils jdUtils; private JDUtils jdUtils;
@Resource
private MessageProducerService messageProducerService;
public boolean checkToken (String token){ public boolean checkToken(String token) {
return TOKEN.equals(token); return TOKEN.equals(token);
} }
@@ -29,11 +33,17 @@ public class OrderController {
@ResponseBody @ResponseBody
public String refreshHistory(String token) throws Exception { public String refreshHistory(String token) throws Exception {
if (checkToken(token)) { if (checkToken(token)) {
jdUtils.fetchHistoricalOrders(); jdUtils.fetchHistoricalOrders();
} }
return "OK"; return "OK";
} }
@RequestMapping("/mq")
@ResponseBody
public String mq() {
JSONObject jsonObject = new JSONObject();
messageProducerService.sendMessage(jsonObject);
return "OK";
}
} }

View File

@@ -19,7 +19,7 @@ import static cn.van.business.util.WXUtil.WX_BASE_URL;
* @description * @description
*/ */
@Service @Service
@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}") @RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}",nameServer = "${rocketmq.name-server}")
public class MessageConsumerService implements RocketMQListener<JSONObject> { public class MessageConsumerService implements RocketMQListener<JSONObject> {
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class); private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);

View File

@@ -1,16 +1,12 @@
package cn.van.business.mq; package cn.van.business.mq;
import cn.van.business.util.WXUtil;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@@ -22,38 +18,21 @@ import org.springframework.stereotype.Service;
@Service @Service
public class MessageProducerService { public class MessageProducerService {
private static final Logger logger = LoggerFactory.getLogger(MessageProducerService.class); private static final Logger logger = LoggerFactory.getLogger(MessageProducerService.class);
private static final String topic = "wx-message"; private static final String topic = "wx-message";
private static volatile DefaultMQProducer shareProducer; private final RocketMQTemplate rocketMQTemplate;
public MQProducer getProducer() throws MQClientException { public MessageProducerService(RocketMQTemplate rocketMQTemplate) {
if (shareProducer == null) { this.rocketMQTemplate = rocketMQTemplate;
synchronized (MQProducer.class) {
if (shareProducer == null) {
shareProducer = createProducer();
}
}
}
return shareProducer;
}
public DefaultMQProducer createProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("wx_producer");
producer.setNamesrvAddr("192.168.8.88:9876");
producer.start();
logger.info("shareProducer[{}|{}]", "wx_producer", producer.getNamesrvAddr());
return producer;
} }
@SneakyThrows @SneakyThrows
public void sendMessage(JSONObject jsonObject) { public void sendMessage(JSONObject jsonObject) {
Message message = new Message(topic, jsonObject.toJSONString().getBytes()); Message message = new Message(topic, jsonObject.toJSONString().getBytes());
getProducer().send(message); message.setTags("wx");
this.rocketMQTemplate.convertAndSend(topic, message);
} }
} }

View File

@@ -65,4 +65,4 @@ rocketmq:
group: wx_consumer # 消费者组名 group: wx_consumer # 消费者组名
consume-thread-min: 20 # 消费线程池最小线程数 consume-thread-min: 20 # 消费线程池最小线程数
consume-thread-max: 64 # 消费线程池最大线程数 consume-thread-max: 64 # 消费线程池最大线程数
consume-message-batch-max-size: 20 # 批量消费最大消息数 consume-message-batch-max-size: 64 # 批量消费最大消息数