This commit is contained in:
van
2026-04-01 16:59:29 +08:00
parent 9ae74c999e
commit 75d7c8e6de
6 changed files with 108 additions and 49 deletions

View File

@@ -202,6 +202,8 @@ jarvis:
base-url: http://192.168.8.88:5001 base-url: http://192.168.8.88:5001
fetch-path: /fetch_logistics fetch-path: /fetch_logistics
health-path: /health health-path: /health
# 每次定时任务最多处理多少条企微分享链待队列RPUSH 入队、LPOP 出队)
adhoc-pending-batch-size: 50
# 获取评论接口服务地址(后端转发,避免前端跨域) # 获取评论接口服务地址(后端转发,避免前端跨域)
fetch-comments: fetch-comments:
base-url: http://192.168.8.60:5008 base-url: http://192.168.8.60:5008

View File

@@ -202,6 +202,7 @@ jarvis:
base-url: http://127.0.0.1:5001 base-url: http://127.0.0.1:5001
fetch-path: /fetch_logistics fetch-path: /fetch_logistics
health-path: /health health-path: /health
adhoc-pending-batch-size: 50
# 获取评论接口服务地址(后端转发) # 获取评论接口服务地址(后端转发)
fetch-comments: fetch-comments:
base-url: http://192.168.8.60:5008 base-url: http://192.168.8.60:5008

View File

@@ -18,6 +18,16 @@ public interface ILogisticsService {
*/ */
boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser); boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser);
/**
* 企微备注提交后仅入队,由 {@link com.ruoyi.jarvis.task.LogisticsScanTask} 与订单扫描一并拉取物流,避免阻塞 HTTP 回调。
*/
void enqueueShareLinkForScan(String trackingUrl, String remark, String touser);
/**
* 定时任务内:依次弹出队列并调用 {@link #fetchLogisticsByShareLinkAndPush}。
*/
void drainPendingShareLinkQueue();
/** /**
* 检查订单是否已处理过Redis中是否有运单号 * 检查订单是否已处理过Redis中是否有运单号
* @param orderId 订单ID * @param orderId 订单ID

View File

@@ -35,6 +35,8 @@ public class LogisticsServiceImpl implements ILogisticsService {
private static final String REDIS_LOCK_KEY_PREFIX = "logistics:lock:order:"; private static final String REDIS_LOCK_KEY_PREFIX = "logistics:lock:order:";
private static final String REDIS_HEALTH_CHECK_ALERT_KEY = "logistics:health:alert:"; private static final String REDIS_HEALTH_CHECK_ALERT_KEY = "logistics:health:alert:";
private static final String REDIS_ADHOC_WAYBILL_PREFIX = "logistics:adhoc:waybill:"; private static final String REDIS_ADHOC_WAYBILL_PREFIX = "logistics:adhoc:waybill:";
/** 企微分享链+备注入队FIFORPUSH 入队、LPOP 出队 */
private static final String REDIS_ADHOC_PENDING_QUEUE = "logistics:adhoc:pending:queue";
private static final String PUSH_URL = "https://wxts.van333.cn/wx/send/pdd"; private static final String PUSH_URL = "https://wxts.van333.cn/wx/send/pdd";
private static final String PUSH_TOKEN = "super_token_b62190c26"; private static final String PUSH_TOKEN = "super_token_b62190c26";
private static final String CONFIG_KEY_PREFIX = "logistics.push.touser."; private static final String CONFIG_KEY_PREFIX = "logistics.push.touser.";
@@ -50,6 +52,9 @@ public class LogisticsServiceImpl implements ILogisticsService {
@Value("${jarvis.server.logistics.health-path:/health}") @Value("${jarvis.server.logistics.health-path:/health}")
private String logisticsHealthPath; private String logisticsHealthPath;
@Value("${jarvis.server.logistics.adhoc-pending-batch-size:50}")
private int adhocPendingBatchSize;
private String externalApiUrlTemplate; private String externalApiUrlTemplate;
private String healthCheckUrl; private String healthCheckUrl;
@@ -423,6 +428,43 @@ public class LogisticsServiceImpl implements ILogisticsService {
} }
} }
@Override
public void enqueueShareLinkForScan(String trackingUrl, String remark, String touser) {
if (!StringUtils.hasText(trackingUrl)) {
logger.warn("adhoc 入队跳过:分享链接为空");
return;
}
JSONObject o = new JSONObject();
o.put("trackingUrl", trackingUrl.trim());
o.put("remark", remark != null ? remark : "");
o.put("touser", touser != null ? touser : "");
stringRedisTemplate.opsForList().rightPush(REDIS_ADHOC_PENDING_QUEUE, o.toJSONString());
logger.info("adhoc 分享链接已入队待定时扫描(与订单物流任务一致) trackingUrl={} queueKey={}", trackingUrl.trim(), REDIS_ADHOC_PENDING_QUEUE);
}
@Override
public void drainPendingShareLinkQueue() {
int n = Math.max(1, adhocPendingBatchSize);
for (int i = 0; i < n; i++) {
String json = stringRedisTemplate.opsForList().leftPop(REDIS_ADHOC_PENDING_QUEUE);
if (json == null || !StringUtils.hasText(json)) {
break;
}
try {
JSONObject o = JSON.parseObject(json);
if (o == null) {
continue;
}
String url = o.getString("trackingUrl");
String remark = o.getString("remark");
String touser = o.getString("touser");
fetchLogisticsByShareLinkAndPush(url, remark, touser);
} catch (Exception e) {
logger.error("adhoc 队列项处理失败 raw={}", json.length() > 200 ? json.substring(0, 200) + "..." : json, e);
}
}
}
@Override @Override
public boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser) { public boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser) {
if (!StringUtils.hasText(trackingUrl)) { if (!StringUtils.hasText(trackingUrl)) {

View File

@@ -82,8 +82,8 @@ public class WeComInboundServiceImpl implements IWeComInboundService {
weComChatSessionService.delete(from); weComChatSessionService.delete(from);
String touser = resolveTouser(row, isSuper); String touser = resolveTouser(row, isSuper);
log.info("企微物流会话提交备注 user={} url={} remarkLen={}", from, url, t.length()); log.info("企微物流会话提交备注 user={} url={} remarkLen={}", from, url, t.length());
boolean ok = logisticsService.fetchLogisticsByShareLinkAndPush(url, content.trim(), touser); logisticsService.enqueueShareLinkForScan(url, content.trim(), touser);
return ok ? "已查询并推送运单(接收人见超级管理员 touser" : "物流查询或推送失败,请稍后重试"; return "已登记备注物流将随定时任务查询并推送与订单物流扫描一致约每10分钟";
} }
} }

View File

@@ -13,7 +13,7 @@ import java.util.List;
/** /**
* 物流信息扫描定时任务 * 物流信息扫描定时任务
* 每20分钟扫描一次分销标记为F或PDD的订单最近30天获取物流信息并推送 * 每10分钟扫描一次分销标记为F或PDD的订单最近30天获取物流信息并推送;结束后处理企微分享链 adhoc 队列
*/ */
@Component @Component
public class LogisticsScanTask { public class LogisticsScanTask {
@@ -26,8 +26,7 @@ public class LogisticsScanTask {
private ILogisticsService logisticsService; private ILogisticsService logisticsService;
/** /**
* 定时任务:每20分钟执行一次 * 定时任务:每10分钟执行一次(与 @Scheduled 中 cron 一致)
* Cron表达式格式0 每N分钟 * * * ? 表示每N分钟执行一次
* 只扫描最近30天的订单 * 只扫描最近30天的订单
*/ */
@Scheduled(cron = "0 */10 * * * ?") @Scheduled(cron = "0 */10 * * * ?")
@@ -40,62 +39,67 @@ public class LogisticsScanTask {
if (orders == null || orders.isEmpty()) { if (orders == null || orders.isEmpty()) {
logger.info("未找到需要处理的订单"); logger.info("未找到需要处理的订单");
return; } else {
} logger.info("找到 {} 个需要处理的订单", orders.size());
logger.info("找到 {} 个需要处理的订单", orders.size()); int processedCount = 0;
int skippedCount = 0;
int successCount = 0;
int failedCount = 0;
int processedCount = 0; // 串行处理订单(避免并发调用接口)
int skippedCount = 0; for (JDOrder order : orders) {
int successCount = 0; try {
int failedCount = 0; // 检查Redis中是否已处理过避免重复处理
if (logisticsService.isOrderProcessed(order.getId())) {
//logger.debug("订单已处理过,跳过 - 订单ID: {}", order.getId());
skippedCount++;
continue;
}
// 串行处理订单(避免并发调用接口) logger.info("开始处理订单 - 订单ID: {}, 订单号: {}, 分销标识: {}",
for (JDOrder order : orders) { order.getId(), order.getOrderId(), order.getDistributionMark());
try {
// 检查Redis中是否已处理过避免重复处理
if (logisticsService.isOrderProcessed(order.getId())) {
//logger.debug("订单已处理过,跳过 - 订单ID: {}", order.getId());
skippedCount++;
continue;
}
logger.info("开始处理订单 - 订单ID: {}, 订单号: {}, 分销标识: {}", // 获取物流信息并推送(串行执行,不并发)
order.getId(), order.getOrderId(), order.getDistributionMark()); boolean success = logisticsService.fetchLogisticsAndPush(order);
// 获取物流信息并推送(串行执行,不并发) if (success) {
boolean success = logisticsService.fetchLogisticsAndPush(order); successCount++;
logger.info("订单处理成功 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId());
} else {
failedCount++;
logger.warn("订单处理失败 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId());
}
if (success) { processedCount++;
successCount++;
logger.info("订单处理成功 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); // 添加短暂延迟,避免请求过于频繁
} else { Thread.sleep(500); // 每次请求间隔500毫秒
} catch (InterruptedException e) {
logger.error("定时任务被中断", e);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
failedCount++; failedCount++;
logger.warn("订单处理失败 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); logger.error("处理订单时发生异常 - 订单ID: {}, 错误: {}", order.getId(), e.getMessage(), e);
// 继续处理下一个订单
} }
processedCount++;
// 添加短暂延迟,避免请求过于频繁
Thread.sleep(500); // 每次请求间隔500毫秒
} catch (InterruptedException e) {
logger.error("定时任务被中断", e);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
failedCount++;
logger.error("处理订单时发生异常 - 订单ID: {}, 错误: {}", order.getId(), e.getMessage(), e);
// 继续处理下一个订单
} }
logger.info("========== 物流信息扫描定时任务执行完成 ==========");
logger.info("总订单数: {}, 已处理: {}, 跳过: {}, 成功: {}, 失败: {}",
orders.size(), processedCount, skippedCount, successCount, failedCount);
} }
logger.info("========== 物流信息扫描定时任务执行完成 ==========");
logger.info("总订单数: {}, 已处理: {}, 跳过: {}, 成功: {}, 失败: {}",
orders.size(), processedCount, skippedCount, successCount, failedCount);
} catch (Exception e) { } catch (Exception e) {
logger.error("执行物流信息扫描定时任务时发生异常", e); logger.error("执行物流信息扫描定时任务时发生异常", e);
} finally {
try {
logger.info("---------- 处理企微分享链物流待队列adhoc ----------");
logisticsService.drainPendingShareLinkQueue();
} catch (Exception e) {
logger.error("企微分享链 adhoc 队列处理异常", e);
}
} }
} }
} }