diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/jarvis/WeComShareLinkLogisticsJobController.java b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/jarvis/WeComShareLinkLogisticsJobController.java index b1c4fe7..fb5bf49 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/jarvis/WeComShareLinkLogisticsJobController.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/jarvis/WeComShareLinkLogisticsJobController.java @@ -79,15 +79,16 @@ public class WeComShareLinkLogisticsJobController extends BaseController { job.getTrackingUrl(), remark, touser); data.put("jobKey", jobKey); - int nextAttempts = job.getScanAttempts() == null ? 1 : job.getScanAttempts() + 1; + int successAttempts = job.getScanAttempts() == null ? 1 : job.getScanAttempts() + 1; String adhocNote = data.get("adhocNote") != null ? data.get("adhocNote").toString() : ""; String note = "manual:" + adhocNote; if (Boolean.TRUE.equals(data.get("terminalSuccess"))) { String wb = data.get("waybillNo") != null ? data.get("waybillNo").toString() : null; - weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "PUSHED", note, nextAttempts, + weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "PUSHED", note, successAttempts, StringUtils.hasText(wb) ? wb : null); } else { - weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "WAITING", note, nextAttempts, null); + /* 失败仍走自动队列时,不得垫高 scan_attempts,否则 Redis attempts 与定时 drain 上限错位,未超限也会被放弃 */ + weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "WAITING", note, job.getScanAttempts(), null); WeComShareLinkLogisticsJob refreshed = weComShareLinkLogisticsJobService.selectByJobKey(jobKey); if (refreshed != null) { logisticsService.pushShareLinkJobToRedis(refreshed); diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/mapper/WeComShareLinkLogisticsJobMapper.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/mapper/WeComShareLinkLogisticsJobMapper.java index d64f537..cda8778 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/mapper/WeComShareLinkLogisticsJobMapper.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/mapper/WeComShareLinkLogisticsJobMapper.java @@ -18,4 +18,10 @@ public interface WeComShareLinkLogisticsJobMapper { WeComShareLinkLogisticsJob selectByJobKey(String jobKey); List selectWeComShareLinkLogisticsJobList(WeComShareLinkLogisticsJob query); + + /** + * 待自动扫描但可能未在 Redis 队列中的任务(如纯 SQL 补录的 IMPORTED、入队失败后的 PENDING 等)。 + * 仅 {@code create_time} 在最近一个月内的记录,避免扫到过旧历史。 + */ + List selectJobsNeedingQueueReconcile(@Param("limit") int limit); } diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java index 55ddd9c..282051f 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/ILogisticsService.java @@ -30,10 +30,17 @@ public interface ILogisticsService { /** * 将已落库任务按与 {@link #enqueueShareLinkForScan} 相同的 JSON 格式推入 Redis(补录、手动失败后重入队等)。 - * {@code attempts} 取自 {@link WeComShareLinkLogisticsJob#getScanAttempts()},与 drain 重试计数对齐。 + * Redis 项中的 attempts 与定时 drain 重试上限对齐,取自 {@link WeComShareLinkLogisticsJob#getScanAttempts()}(仅由 drain 回写递增;管理端手动拉取失败不会垫高该计数)。 */ void pushShareLinkJobToRedis(WeComShareLinkLogisticsJob job); + /** + * 扫描落库表中仍待处理的任务({@code create_time} 一个月内),在限频下补入 Redis(覆盖纯 SQL 补录、Redis 曾丢失等),避免仅靠企微回调入队。 + * + * @return 本次获得限频锁并成功推入队列的条数 + */ + int reconcileShareLinkJobsIntoPendingQueue(); + /** * 定时任务内:依次弹出队列并调用 {@link #fetchLogisticsByShareLinkAndPush}。 * diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java index c936966..e8607d5 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/LogisticsServiceImpl.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.util.Calendar; import java.util.Date; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -42,6 +43,10 @@ public class LogisticsServiceImpl implements ILogisticsService { private static final String REDIS_ADHOC_WAYBILL_PREFIX = "logistics:adhoc:waybill:"; /** 企微分享链+备注入队,FIFO:RPUSH 入队、LPOP 出队 */ private static final String REDIS_ADHOC_PENDING_QUEUE = "logistics:adhoc:pending:queue"; + /** 对账补入队限频:避免每轮定时任务对同一 jobKey 重复 RPUSH */ + private static final String REDIS_ADHOC_RECONCILE_ENQUEUE_PREFIX = "logistics:adhoc:reconcile:enqueue:"; + private static final int ADHOC_RECONCILE_ROW_LIMIT = 200; + private static final long ADHOC_RECONCILE_ENQUEUE_LOCK_SECONDS = 8 * 60; /** 单次队列项最多重新入队次数(约对应多天 × 每 10 分钟一轮) */ private static final int ADHOC_MAX_REQUEUE_ATTEMPTS = 500; private static final String PUSH_URL = "https://wxts.van333.cn/wx/send/pdd"; @@ -472,6 +477,42 @@ public class LogisticsServiceImpl implements ILogisticsService { job.getUserRemark(), job.getTouserPush(), job.getFromUserName()); } + @Override + public int reconcileShareLinkJobsIntoPendingQueue() { + int pushed = 0; + try { + List rows = weComShareLinkLogisticsJobMapper.selectJobsNeedingQueueReconcile( + ADHOC_RECONCILE_ROW_LIMIT); + if (rows == null || rows.isEmpty()) { + return 0; + } + for (WeComShareLinkLogisticsJob job : rows) { + if (job == null || !StringUtils.hasText(job.getJobKey())) { + continue; + } + String lockKey = REDIS_ADHOC_RECONCILE_ENQUEUE_PREFIX + job.getJobKey().trim(); + Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", + ADHOC_RECONCILE_ENQUEUE_LOCK_SECONDS, TimeUnit.SECONDS); + if (!Boolean.TRUE.equals(gotLock)) { + continue; + } + try { + pushShareLinkJobToRedis(job); + pushed++; + } catch (Exception e) { + try { + stringRedisTemplate.delete(lockKey); + } catch (Exception ignored) { + } + logger.warn("adhoc 对账入队失败 jobKey={} err={}", job.getJobKey(), e.toString()); + } + } + } catch (Exception e) { + logger.error("adhoc 分享链对账入队异常", e); + } + return pushed; + } + /** 与 drain 解析字段一致:jobKey、attempts、trackingUrl、remark、touser、fromWecom */ private void rightPushAdhocQueueJson(String jobKey, int attempts, String trackingUrl, String remark, String touser, String fromWecom) { @@ -508,18 +549,20 @@ public class LogisticsServiceImpl implements ILogisticsService { int attempts = o.getIntValue("attempts"); AdhocTryResult tr = tryAdhocShareLinkOnce(url, remark, touser, null); if (tr.needsRequeue) { - if (attempts >= ADHOC_MAX_REQUEUE_ATTEMPTS) { + int nextAttempts = attempts + 1; + if (nextAttempts > ADHOC_MAX_REQUEUE_ATTEMPTS) { logger.warn("adhoc 已达最大重试次数 {},放弃 jobKey={} url={} note={}", ADHOC_MAX_REQUEUE_ATTEMPTS, jobKey, url, tr.note); - touchShareLinkJobRow(jobKey, "ABANDONED", tr.note, attempts + 1, null); + touchShareLinkJobRow(jobKey, "ABANDONED", tr.note, nextAttempts, null); continue; } - o.put("attempts", attempts + 1); + o.put("attempts", nextAttempts); stringRedisTemplate.opsForList().rightPush(REDIS_ADHOC_PENDING_QUEUE, o.toJSONString()); - touchShareLinkJobRow(jobKey, "WAITING", tr.note, attempts + 1, null); + touchShareLinkJobRow(jobKey, "WAITING", tr.note, nextAttempts, null); } else { + int doneAttempts = attempts + 1; touchShareLinkJobRow(jobKey, "PUSHED", - tr.note, attempts + 1, StringUtils.hasText(tr.waybillNo) ? tr.waybillNo : null); + tr.note, doneAttempts, StringUtils.hasText(tr.waybillNo) ? tr.waybillNo : null); } } catch (Exception e) { logger.error("adhoc 队列项处理失败 raw={}", json.length() > 200 ? json.substring(0, 200) + "..." : json, e); diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java index 4e15718..ad8c893 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/task/LogisticsScanTask.java @@ -38,6 +38,7 @@ public class LogisticsScanTask { int orderSuccess = 0; int orderFailed = 0; int adhocQueuePopped = 0; + int adhocReconciled = 0; logger.info("========== 物流扫描定时任务开始(订单 F/PDD + 企微分享链队列) =========="); @@ -83,6 +84,13 @@ public class LogisticsScanTask { } catch (Exception e) { logger.error("订单扫描阶段异常", e); } finally { + try { + logger.info("---------- 企微分享链 adhoc:落库对账入队(IMPORTED/PENDING/WAITING 等) ----------"); + adhocReconciled = logisticsService.reconcileShareLinkJobsIntoPendingQueue(); + logger.info("企微分享链 adhoc:对账入队 {} 条(余者可能尚在限频窗口内)", adhocReconciled); + } catch (Exception e) { + logger.error("企微分享链 adhoc 对账入队异常", e); + } try { logger.info("---------- 企微分享链 adhoc 队列:开始 drain ----------"); adhocQueuePopped = logisticsService.drainPendingShareLinkQueue(); @@ -95,7 +103,8 @@ public class LogisticsScanTask { logger.info("========== 物流扫描任务统计(本轮) =========="); logger.info("订单:候选 {} 条 | 跳过(Redis已处理) {} | 本次已拉取处理 {} | 成功 {} | 失败或未推送 {}", orderCandidates, orderSkipped, orderProcessed, orderSuccess, orderFailed); - logger.info("企微分享链队列:Redis 弹出并处理 {} 条(未出单等情况会重入队,与逐条日志一致)", adhocQueuePopped); + logger.info("企微分享链队列:对账入队 {} 条 | Redis 弹出并处理 {} 条(未出单等情况会重入队,与逐条日志一致)", + adhocReconciled, adhocQueuePopped); logger.info("总耗时 {} ms", elapsed); logger.info("=========================================="); } diff --git a/ruoyi-system/src/main/resources/mapper/jarvis/WeComShareLinkLogisticsJobMapper.xml b/ruoyi-system/src/main/resources/mapper/jarvis/WeComShareLinkLogisticsJobMapper.xml index c3293ea..5bc829f 100644 --- a/ruoyi-system/src/main/resources/mapper/jarvis/WeComShareLinkLogisticsJobMapper.xml +++ b/ruoyi-system/src/main/resources/mapper/jarvis/WeComShareLinkLogisticsJobMapper.xml @@ -69,4 +69,13 @@ order by id desc + +