稳定版。没重构之前。稳定的限流。

This commit is contained in:
Leo
2025-03-04 14:29:48 +08:00
parent 72b7a125e0
commit 645b025172
16 changed files with 182 additions and 130 deletions

52
pom.xml
View File

@@ -15,9 +15,9 @@
<properties>
<java.version>17</java.version>
<spring-boot.version>3.1.5</spring-boot.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<spring-boot.version>3.1.5</spring-boot.version> <!-- 推荐使用最新3.1.x -->
<rocketmq.version>2.3.2</rocketmq.version>
<maven.compiler.release>17</maven.compiler.release>
</properties>
@@ -54,16 +54,14 @@
<artifactId>mysql-connector-j</artifactId>
<version>8.2.0</version>
</dependency>
<!-- 修改RocketMQ依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.3.1-jre</version>
<version>${rocketmq.version}</version>
</dependency>
<!-- Spring Boot Starter Test测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -108,29 +106,38 @@
<artifactId>jdk</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>
<!-- Maven 插件 -->
<build>
<plugins>
<!-- 核心配置:必须启用 spring-boot-maven-plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.1.5</version> <!-- 与你的Spring Boot版本一致 -->
<configuration>
<!-- 显式指定主类(可选,建议添加) -->
<mainClass>cn.van.Application</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal> <!-- 关键生成可执行JAR -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
@@ -140,5 +147,12 @@
<name>Local Repository</name>
<url>http://192.168.8.88:8081/repository/maven-local88/</url>
</repository>
<!-- 在pom.xml的repositories节点中添加RocketMQ官方仓库 -->
<repository>
<id>rocketmq-repo</id>
<name>RocketMQ Repository</name>
<url>https://repo1.maven.org/maven2/org/apache/rocketmq/</url>
</repository>
</repositories>
</project>

View File

@@ -1,8 +1,10 @@
package cn.van;
import jakarta.annotation.PostConstruct;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -16,6 +18,7 @@ import java.util.Arrays;
*/
@SpringBootApplication
@EnableScheduling
@Import(RocketMQAutoConfiguration.class)
public class Application {
private final Environment env;

View File

@@ -1,15 +0,0 @@
package cn.van.business.config;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Bean
public RocketMQTemplate rocketMQTemplate() {
System.out.println("RocketMQTemplate init");
return new RocketMQTemplate();
}
}

View File

@@ -1,12 +1,12 @@
package cn.van.business.config;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jakarta.annotation.PreDestroy; // 使用 jakarta.annotation 包
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

View File

@@ -44,7 +44,7 @@ public enum WXReqType {
*/
GET_WX_LIST("getWeChatList", "获取微信列表"),
GET_WX_STATUS("checkWeChat", "微信状态检测"),
SEND_TEXT_MESSAGE("sendText", "发送文本消息"),
SEND_TEXT_MESSAGE("sendText2", "发送文本消息"),
UPDATE_DOWNLOAD_IMAGE("Q0002", "修改下载图片"),
GET_USER_INFO("Q0003", "获取个人信息"),
QUERY_OBJECT_INFO("Q0004", "查询对象信"),

View File

@@ -8,7 +8,6 @@ package cn.van.business.model.jd;
*/
import jakarta.persistence.*;
import jakarta.persistence.TemporalType;
import java.util.Date;

View File

@@ -1,6 +1,7 @@
package cn.van.business.model.jd;
import jakarta.persistence.*;
import java.util.Date;
/**

View File

@@ -2,8 +2,6 @@ package cn.van.business.model.jd;
import jakarta.persistence.*;
import java.util.Set;
/**
* sku对应的商品信息
*/

View File

@@ -2,9 +2,6 @@ package cn.van.business.model.jd;
import jakarta.persistence.*;
import java.util.List;
import java.util.Set;
/**
* 实体类,用于存储商品类型信息。
*/

View File

@@ -2,10 +2,11 @@ package cn.van.business.mq;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import cn.van.business.util.WxtsUtil;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
@@ -13,10 +14,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
import static cn.hutool.core.thread.ThreadUtil.sleep;
import static cn.van.business.util.WXUtil.WX_BASE_URL;
/**
@@ -26,14 +25,17 @@ import static cn.van.business.util.WXUtil.WX_BASE_URL;
* @description
*/
@Service
@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}", nameServer = "${rocketmq.name-server}")
@RocketMQMessageListener(topic = "wx-message", consumerGroup = "${rocketmq.consumer.group}", nameServer = "${rocketmq.name-server}", consumeMode = ConsumeMode.ORDERLY // 顺序消费(单线程)
)
public class MessageConsumerService implements RocketMQListener<JSONObject> {
private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
private static final RateLimiter rateLimiter = RateLimiter.create(0.5, // 1 QPS
5, // 预热期 5 秒
TimeUnit.SECONDS);
private final WxtsUtil wxtsUtil;
// create a rate limiter of 1 qps
RateLimiter rateLimiter = RateLimiter.create(0.5);
@Autowired
public MessageConsumerService(WxtsUtil wxtsUtil) {
@@ -41,23 +43,23 @@ public class MessageConsumerService implements RocketMQListener<JSONObject> {
}
@Override
public void onMessage(JSONObject jsonObject) {
// 处理消息
public void onMessage(JSONObject message) {
try {
rateLimiter.acquire(); // 请求许可。如果超过速率,则此方法会阻塞
String body = jsonObject.getString("body");
byte[] decodedBody = Base64.getDecoder().decode(body);
String decodedBodyStr = new String(decodedBody, StandardCharsets.UTF_8);
JSONObject decodedBodyJson = JSONObject.parseObject(decodedBodyStr);
String jsonStr = JSONUtil.toJsonStr(decodedBodyJson);
String responseStr = HttpRequest.post(WX_BASE_URL)
.body(jsonStr)
.execute()
.body();
logger.info("消费消息:{}", jsonStr);
//logger.info("[RateLimiter] 开始处理消息,当前时间:{}", System.currentTimeMillis());
rateLimiter.acquire();
//logger.info("[RateLimiter] 获得令牌,当前时间:{}", System.currentTimeMillis());
//logger.debug("构造完成的消息结构:{}", requestBody.toJSONString());
// 4. 发送请求(保持原有)
String responseStr;
responseStr = HttpRequest.post(WX_BASE_URL).body(message.toJSONString()).execute().body();
// ... [保持原有响应处理逻辑]
if (ObjectUtil.isNotEmpty(responseStr)) {
JSONObject response = JSONObject.parseObject(responseStr);
logger.info("消息成功发送并得到响应:{}", response);
//logger.info("消息成功发送并得到响应:{}", response);
if (response.getInteger("code") != 200) {
// TODO: 如果需要处理错误,您可以在这里添加逻辑
wxtsUtil.sendNotify("消息发送失败: " + responseStr);
@@ -69,9 +71,10 @@ public class MessageConsumerService implements RocketMQListener<JSONObject> {
throw new RuntimeException("消息发送失败,没有收到响应");
}
} catch (Exception e) {
//logger.error("处理消息时发生错误", e);
throw e; // 重抛异常使得 RocketMQ 可以捕获到这个异常
logger.error("消息处理失败,原始消息:{}", message, e);
wxtsUtil.sendNotify("系统异常:" + e.getMessage());
}
}
}
}

View File

@@ -1,36 +1,56 @@
package cn.van.business.mq;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import lombok.SneakyThrows;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* @author Leo
* @version 1.0
* @create 2024/12/1 上午2:06
* @description
*/
@Service
public class MessageProducerService {
private static final Logger logger = LoggerFactory.getLogger(MessageProducerService.class);
private static final String topic = "wx-message";
private final RocketMQTemplate rocketMQTemplate;
public MessageProducerService(RocketMQTemplate rocketMQTemplate) {
public MessageProducerService(RocketMQTemplate rocketMQTemplate
) {
this.rocketMQTemplate = rocketMQTemplate;
}
@PostConstruct
public void init() {
if (rocketMQTemplate == null) {
throw new IllegalStateException("RocketMQTemplate not initialized!");
}
}
@SneakyThrows
public void sendMessage(JSONObject jsonObject) {
Message message = new Message(topic, jsonObject.toJSONString().getBytes());
message.setTags("wx");
rocketMQTemplate.convertAndSend(topic, message);
public void sendMessage(JSONObject data) {
// 消息结构校验
if (!data.containsKey("type") || !data.containsKey("data")) {
logger.error("非法消息格式:{}", data);
throw new IllegalArgumentException("消息必须包含type和data字段");
}
// 新增校验
if (!data.getJSONObject("data").containsKey("wxid")) {
throw new IllegalArgumentException("消息必须包含wxid字段");
}
// 构建Spring Message
Message<String> message = MessageBuilder
.withPayload(data.toJSONString())
.setHeader(RocketMQHeaders.TAGS, "wx")
.build();
// 发送消息
rocketMQTemplate.send(topic, message);
logger.debug("消息已发送:{}", data);
}
}

View File

@@ -6,7 +6,9 @@ import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
/**
* 日期工具类

View File

@@ -16,11 +16,15 @@ import com.jd.open.api.sdk.request.kplunion.UnionOpenOrderRowQueryRequest;
import com.jd.open.api.sdk.request.kplunion.UnionOpenPromotionBysubunionidGetRequest;
import com.jd.open.api.sdk.response.kplunion.UnionOpenOrderRowQueryResponse;
import com.jd.open.api.sdk.response.kplunion.UnionOpenPromotionBysubunionidGetResponse;
import io.github.resilience4j.core.functions.CheckedRunnable;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
@@ -85,7 +89,9 @@ public class JDUtil {
// 构造函数中注入StringRedisTemplate
@Autowired
public JDUtil(StringRedisTemplate redisTemplate,
ProductOrderRepository productOrderRepository,OrderRowRepository orderRowRepository, WXUtil wxUtil, OrderUtil orderUtil) {
ProductOrderRepository productOrderRepository,
OrderRowRepository orderRowRepository, WXUtil wxUtil,
OrderUtil orderUtil) {
this.redisTemplate = redisTemplate;
this.orderRowRepository = orderRowRepository;
this.productOrderRepository = productOrderRepository;
@@ -122,7 +128,7 @@ private static class UserInteractionState {
/**
* 将 响应参数转化为 OrderRow并返回
*/
private static OrderRow createOrderRow(OrderRowResp orderRowResp) {
private OrderRow createOrderRow(OrderRowResp orderRowResp) {
OrderRow orderRow = new OrderRow();
orderRow.setOrderId(orderRowResp.getOrderId());
orderRow.setSkuId(orderRowResp.getSkuId());
@@ -211,49 +217,46 @@ private static class UserInteractionState {
}
/**
* 实时刷新最近10分钟的订单
* 实时刷新最近10分钟的订单Resilience4j限流集成
*/
@Scheduled(cron = "0 * * * * ?")
public void fetchLatestOrder() {
try {
LocalDateTime now = LocalDateTime.now();
LocalDateTime lastMinute = now.minusMinutes(10).withSecond(0).withNano(0);
/**临时代码*/
/**下面是原先的代码*/
for (Map.Entry<String, WXUtil.SuperAdmin> entry : super_admins.entrySet()) {
//String wxid = entry.getKey();
WXUtil.SuperAdmin admin = entry.getValue();
String appKey = admin.getAppKey();
String secretKey = admin.getSecretKey();
if (Util.isAnyEmpty(appKey, secretKey)) {
continue;
}
// 如果当前分钟数刚好是整10就打印
if (now.getMinute() % 10 == 0) {
logger.info("实时订单 {} ", appKey.substring(appKey.length() - 4));
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(lastMinute, true, 1, true, appKey, secretKey); // 真实代表实时订单
if (response != null) {
}
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(
lastMinute, true, 1, true, appKey, secretKey
);
int code = response.getQueryResult().getCode();
if (code == 200) {
if (response.getQueryResult().getCode() == 200) {
if (response != null && response.getQueryResult() != null
&& response.getQueryResult().getCode() == 200) {
OrderRowResp[] orderRowResps = response.getQueryResult().getData();
if (orderRowResps == null) {
continue;
}
for (OrderRowResp orderRowResp : orderRowResps) {
// 固化到数据库
OrderRow orderRow = createOrderRow(orderRowResp);
// 订单号不存在就保存,存在就更新订单状态
orderRowRepository.save(orderRow);
}
if (orderRowResps == null) continue;
Arrays.stream(orderRowResps)
.map(this::createOrderRow)
.forEach(orderRowRepository::save);
}
}
} catch (Exception e) {
logger.error("调度任务异常", e);
}
}
}
public void test01() {
@@ -291,7 +294,7 @@ private static class UserInteractionState {
*/
@Scheduled(cron = "10 * * * * ?")
public void sendOrderToWx() {
long start = System.currentTimeMillis();
//long start = System.currentTimeMillis();
int[] validCodes = {-1};
// 只要三个月的,更多的也刷新不出来的
Date threeMonthsAgo = Date.from(LocalDateTime.now().minusMonths(3).atZone(ZoneId.systemDefault()).toInstant());
@@ -303,7 +306,7 @@ private static class UserInteractionState {
}
logger.info("扫描订单发送到微信耗时:{} ms, 订单数:{} ", System.currentTimeMillis() - start, orderRows.size());
//logger.info("扫描订单发送到微信耗时:{} ms, 订单数:{} ", System.currentTimeMillis() - start, orderRows.size());
}
@@ -620,7 +623,8 @@ private static class UserInteractionState {
}
// 打印方法调用和开始结束时间
if (isRealTime) {
if (isRealTime && (LocalDateTime.now().getMinute() % 10 == 0)) {
logger.info(" {} --- 拉取订单, 分钟还是秒 {} , 开始时间:{} --- 结束时间:{}", appKey.substring(appKey.length() - 4), hourMinuteTag, startTime.format(DATE_TIME_FORMATTER), endTime.format(DATE_TIME_FORMATTER));
}

View File

@@ -145,7 +145,7 @@ public class WXUtil {
//if (wxid.equals(super_admin_wxid) || fromwxid.equals(super_admin_wxid)) {
// content = "超管: 凡神 \r\n" + content;
//}
List<String> strings = splitStringByLength(content, 2048);
List<String> strings = splitStringByLength(content, 4096);
int count = 1;
for (String string : strings) {

View File

@@ -2,7 +2,6 @@ package cn.van.business.util;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson2.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +32,7 @@ public class WxtsUtil {
content = common + content + "<br><br>";
paramMap.put("text", content);
HttpResponse execute = HttpRequest.post(url).header("vanToken", TOKEN).header("source", "XZJ_UBUNTU").body(JSON.toJSONString(paramMap)).execute();
logger.info("企业微信推送结果:{}", execute);
//logger.info("企业微信推送结果:{}", execute);
} catch (Exception e) {
logger.error("企业微信推送失败:{}", e.getMessage());
}
@@ -41,4 +40,13 @@ public class WxtsUtil {
}
// 添加分级告警方法
public void sendCriticalAlert(String title, String content) {
String formattedMsg = String.format("[CRITICAL] %s\n%s", title, content);
// 这里调用实际的通知渠道,例如:
// - 发送邮件
// - 调用企业微信机器人
// - 触发短信通知
sendNotify(formattedMsg); // 复用原有通知方法
}
}

View File

@@ -61,3 +61,21 @@ rocketmq:
consume-thread-min: 20 # 消费线程池最小线程数
consume-thread-max: 64 # 消费线程池最大线程数
consume-message-batch-max-size: 64 # 批量消费最大消息数
management:
endpoints:
web:
exposure:
include: health,metrics,resilience4j
prometheus:
metrics:
export:
enabled: true
resilience4j.ratelimiter:
instances:
wxMsgLimiter:
limitForPeriod: 10 # 根据业务吞吐量调整
limitRefreshPeriod: 1s # 固定1秒周期
timeoutDuration: 0 # 立即失败模式
registerHealthIndicator: true