diff --git a/ruoyi-admin/src/main/resources/application-dev.yml b/ruoyi-admin/src/main/resources/application-dev.yml index b12a519..842da4a 100644 --- a/ruoyi-admin/src/main/resources/application-dev.yml +++ b/ruoyi-admin/src/main/resources/application-dev.yml @@ -202,6 +202,8 @@ jarvis: base-url: http://192.168.8.88:5001 fetch-path: /fetch_logistics health-path: /health + # 每次定时任务最多处理多少条企微分享链待队列(RPUSH 入队、LPOP 出队) + adhoc-pending-batch-size: 50 # 获取评论接口服务地址(后端转发,避免前端跨域) fetch-comments: base-url: http://192.168.8.60:5008 diff --git a/ruoyi-admin/src/main/resources/application-prod.yml b/ruoyi-admin/src/main/resources/application-prod.yml index daa637f..823d483 100644 --- a/ruoyi-admin/src/main/resources/application-prod.yml +++ b/ruoyi-admin/src/main/resources/application-prod.yml @@ -202,6 +202,7 @@ jarvis: base-url: http://127.0.0.1:5001 fetch-path: /fetch_logistics health-path: /health + adhoc-pending-batch-size: 50 # 获取评论接口服务地址(后端转发) fetch-comments: base-url: http://192.168.8.60:5008 diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java index a86ae34..681712a 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java @@ -17,6 +17,16 @@ public interface ILogisticsService { * 分享类京东物流短链:查运单并通过 wxts 推送到指定 touser(不依赖订单表) */ boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser); + + /** + * 企微备注提交后仅入队,由 {@link com.ruoyi.jarvis.task.LogisticsScanTask} 与订单扫描一并拉取物流,避免阻塞 HTTP 回调。 + */ + void enqueueShareLinkForScan(String trackingUrl, String remark, String touser); + + /** + * 定时任务内:依次弹出队列并调用 {@link #fetchLogisticsByShareLinkAndPush}。 + */ + void drainPendingShareLinkQueue(); /** * 检查订单是否已处理过(Redis中是否有运单号) diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java index 3d3b124..7ab7de0 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java @@ -35,6 +35,8 @@ public class LogisticsServiceImpl implements ILogisticsService { private static final String REDIS_LOCK_KEY_PREFIX = "logistics:lock:order:"; private static final String REDIS_HEALTH_CHECK_ALERT_KEY = "logistics:health:alert:"; private static final String REDIS_ADHOC_WAYBILL_PREFIX = "logistics:adhoc:waybill:"; + /** 企微分享链+备注入队,FIFO:RPUSH 入队、LPOP 出队 */ + private static final String REDIS_ADHOC_PENDING_QUEUE = "logistics:adhoc:pending:queue"; private static final String PUSH_URL = "https://wxts.van333.cn/wx/send/pdd"; private static final String PUSH_TOKEN = "super_token_b62190c26"; private static final String CONFIG_KEY_PREFIX = "logistics.push.touser."; @@ -49,6 +51,9 @@ public class LogisticsServiceImpl implements ILogisticsService { @Value("${jarvis.server.logistics.health-path:/health}") private String logisticsHealthPath; + + @Value("${jarvis.server.logistics.adhoc-pending-batch-size:50}") + private int adhocPendingBatchSize; private String externalApiUrlTemplate; private String healthCheckUrl; @@ -423,6 +428,43 @@ public class LogisticsServiceImpl implements ILogisticsService { } } + @Override + public void enqueueShareLinkForScan(String trackingUrl, String remark, String touser) { + if (!StringUtils.hasText(trackingUrl)) { + logger.warn("adhoc 入队跳过:分享链接为空"); + return; + } + JSONObject o = new JSONObject(); + o.put("trackingUrl", trackingUrl.trim()); + o.put("remark", remark != null ? remark : ""); + o.put("touser", touser != null ? touser : ""); + stringRedisTemplate.opsForList().rightPush(REDIS_ADHOC_PENDING_QUEUE, o.toJSONString()); + logger.info("adhoc 分享链接已入队待定时扫描(与订单物流任务一致) trackingUrl={} queueKey={}", trackingUrl.trim(), REDIS_ADHOC_PENDING_QUEUE); + } + + @Override + public void drainPendingShareLinkQueue() { + int n = Math.max(1, adhocPendingBatchSize); + for (int i = 0; i < n; i++) { + String json = stringRedisTemplate.opsForList().leftPop(REDIS_ADHOC_PENDING_QUEUE); + if (json == null || !StringUtils.hasText(json)) { + break; + } + try { + JSONObject o = JSON.parseObject(json); + if (o == null) { + continue; + } + String url = o.getString("trackingUrl"); + String remark = o.getString("remark"); + String touser = o.getString("touser"); + fetchLogisticsByShareLinkAndPush(url, remark, touser); + } catch (Exception e) { + logger.error("adhoc 队列项处理失败 raw={}", json.length() > 200 ? json.substring(0, 200) + "..." : json, e); + } + } + } + @Override public boolean fetchLogisticsByShareLinkAndPush(String trackingUrl, String remark, String touser) { if (!StringUtils.hasText(trackingUrl)) { diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/WeComInboundServiceImpl.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/WeComInboundServiceImpl.java index f078b72..6daaa78 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/WeComInboundServiceImpl.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/WeComInboundServiceImpl.java @@ -82,8 +82,8 @@ public class WeComInboundServiceImpl implements IWeComInboundService { weComChatSessionService.delete(from); String touser = resolveTouser(row, isSuper); log.info("企微物流会话提交备注 user={} url={} remarkLen={}", from, url, t.length()); - boolean ok = logisticsService.fetchLogisticsByShareLinkAndPush(url, content.trim(), touser); - return ok ? "已查询并推送运单(接收人见超级管理员 touser)" : "物流查询或推送失败,请稍后重试"; + logisticsService.enqueueShareLinkForScan(url, content.trim(), touser); + return "已登记备注,物流将随定时任务查询并推送(与订单物流扫描一致,约每10分钟)"; } } diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java index fc43338..c1319a9 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java @@ -13,7 +13,7 @@ import java.util.List; /** * 物流信息扫描定时任务 - * 每20分钟扫描一次分销标记为F或PDD的订单(最近30天),获取物流信息并推送 + * 每10分钟扫描一次分销标记为F或PDD的订单(最近30天),获取物流信息并推送;结束后处理企微分享链 adhoc 队列 */ @Component public class LogisticsScanTask { @@ -26,8 +26,7 @@ public class LogisticsScanTask { private ILogisticsService logisticsService; /** - * 定时任务:每20分钟执行一次 - * Cron表达式格式:0 每N分钟 * * * ? 表示每N分钟执行一次 + * 定时任务:每10分钟执行一次(与 @Scheduled 中 cron 一致) * 只扫描最近30天的订单 */ @Scheduled(cron = "0 */10 * * * ?") @@ -40,62 +39,67 @@ public class LogisticsScanTask { if (orders == null || orders.isEmpty()) { logger.info("未找到需要处理的订单"); - return; - } + } else { + logger.info("找到 {} 个需要处理的订单", orders.size()); - logger.info("找到 {} 个需要处理的订单", orders.size()); + int processedCount = 0; + int skippedCount = 0; + int successCount = 0; + int failedCount = 0; - int processedCount = 0; - int skippedCount = 0; - int successCount = 0; - int failedCount = 0; + // 串行处理订单(避免并发调用接口) + for (JDOrder order : orders) { + try { + // 检查Redis中是否已处理过(避免重复处理) + if (logisticsService.isOrderProcessed(order.getId())) { + //logger.debug("订单已处理过,跳过 - 订单ID: {}", order.getId()); + skippedCount++; + continue; + } - // 串行处理订单(避免并发调用接口) - for (JDOrder order : orders) { - try { - // 检查Redis中是否已处理过(避免重复处理) - if (logisticsService.isOrderProcessed(order.getId())) { - //logger.debug("订单已处理过,跳过 - 订单ID: {}", order.getId()); - skippedCount++; - continue; - } + logger.info("开始处理订单 - 订单ID: {}, 订单号: {}, 分销标识: {}", + order.getId(), order.getOrderId(), order.getDistributionMark()); - logger.info("开始处理订单 - 订单ID: {}, 订单号: {}, 分销标识: {}", - order.getId(), order.getOrderId(), order.getDistributionMark()); + // 获取物流信息并推送(串行执行,不并发) + boolean success = logisticsService.fetchLogisticsAndPush(order); - // 获取物流信息并推送(串行执行,不并发) - boolean success = logisticsService.fetchLogisticsAndPush(order); + if (success) { + successCount++; + logger.info("订单处理成功 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); + } else { + failedCount++; + logger.warn("订单处理失败 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); + } - if (success) { - successCount++; - logger.info("订单处理成功 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); - } else { + processedCount++; + + // 添加短暂延迟,避免请求过于频繁 + Thread.sleep(500); // 每次请求间隔500毫秒 + + } catch (InterruptedException e) { + logger.error("定时任务被中断", e); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { failedCount++; - logger.warn("订单处理失败 - 订单ID: {}, 订单号: {}", order.getId(), order.getOrderId()); + logger.error("处理订单时发生异常 - 订单ID: {}, 错误: {}", order.getId(), e.getMessage(), e); + // 继续处理下一个订单 } - - processedCount++; - - // 添加短暂延迟,避免请求过于频繁 - Thread.sleep(500); // 每次请求间隔500毫秒 - - } catch (InterruptedException e) { - logger.error("定时任务被中断", e); - Thread.currentThread().interrupt(); - break; - } catch (Exception e) { - failedCount++; - logger.error("处理订单时发生异常 - 订单ID: {}, 错误: {}", order.getId(), e.getMessage(), e); - // 继续处理下一个订单 } + + logger.info("========== 物流信息扫描定时任务执行完成 =========="); + logger.info("总订单数: {}, 已处理: {}, 跳过: {}, 成功: {}, 失败: {}", + orders.size(), processedCount, skippedCount, successCount, failedCount); } - - logger.info("========== 物流信息扫描定时任务执行完成 =========="); - logger.info("总订单数: {}, 已处理: {}, 跳过: {}, 成功: {}, 失败: {}", - orders.size(), processedCount, skippedCount, successCount, failedCount); - } catch (Exception e) { logger.error("执行物流信息扫描定时任务时发生异常", e); + } finally { + try { + logger.info("---------- 处理企微分享链物流待队列(adhoc) ----------"); + logisticsService.drainPendingShareLinkQueue(); + } catch (Exception e) { + logger.error("企微分享链 adhoc 队列处理异常", e); + } } } }