package com.ruoyi.jarvis.task; import com.ruoyi.jarvis.domain.JDOrder; import com.ruoyi.jarvis.service.IJDOrderService; import com.ruoyi.jarvis.service.ILogisticsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * 物流信息扫描定时任务 * 按配置周期(默认每 20 分钟)扫描分销标记为 F/PDD 等的订单(最近 30 天),拉物流并推送; * 结束后处理企微分享链 adhoc 队列。 */ @Component public class LogisticsScanTask { private static final Logger logger = LoggerFactory.getLogger(LogisticsScanTask.class); @Resource private IJDOrderService jdOrderService; @Resource private ILogisticsService logisticsService; /** 每条订单处理后的间隔(毫秒),减轻下游物流 HTTP 压力;0 表示不睡眠 */ @Value("${jarvis.server.logistics.scan.order-delay-ms:250}") private long orderDelayMs; /** 单轮最多处理的订单数,0 表示不限制(候选很多时可限制单轮耗时) */ @Value("${jarvis.server.logistics.scan.max-orders-per-round:0}") private int maxOrdersPerRound; /** * 只扫描最近 30 天的订单(SQL 固定);周期与单轮上限见 jarvis.server.logistics.scan.* */ @Scheduled(cron = "${jarvis.server.logistics.scan.cron:0 */20 * * * ?}") public void scanAndFetchLogistics() { long t0 = System.currentTimeMillis(); int orderCandidates = 0; int orderProcessed = 0; int orderSkipped = 0; int orderSuccess = 0; int orderFailed = 0; int adhocQueuePopped = 0; int adhocReconciled = 0; logger.info("========== 物流扫描定时任务开始(订单 F/PDD + 企微分享链队列) =========="); try { List orders = jdOrderService.selectJDOrderListByDistributionMarkFOrPDD(); if (orders == null || orders.isEmpty()) { logger.info("订单扫描:候选 0 条(最近30天 F/PDD 有物流链)"); } else { int totalFromDb = orders.size(); if (maxOrdersPerRound > 0 && orders.size() > maxOrdersPerRound) { logger.info("订单扫描:库中候选 {} 条,本轮按 max-orders-per-round={} 仅处理前 {} 条(余下轮次继续)", totalFromDb, maxOrdersPerRound, maxOrdersPerRound); orders = new ArrayList<>(orders.subList(0, maxOrdersPerRound)); } orderCandidates = orders.size(); logger.info("订单扫描:本轮处理列表 {} 条(最近30天 F/PDD 有物流链)", orderCandidates); for (JDOrder order : orders) { try { if (logisticsService.isOrderProcessed(order.getId())) { orderSkipped++; continue; } logger.debug("订单扫描:处理中 id={} orderId={} mark={}", order.getId(), order.getOrderId(), order.getDistributionMark()); boolean success = logisticsService.fetchLogisticsAndPush(order); orderProcessed++; if (success) { orderSuccess++; logger.info("订单扫描:成功 id={} orderId={}", order.getId(), order.getOrderId()); } else { orderFailed++; logger.warn("订单扫描:未成功 id={} orderId={}", order.getId(), order.getOrderId()); } if (orderDelayMs > 0) { Thread.sleep(orderDelayMs); } } catch (InterruptedException e) { logger.error("定时任务被中断", e); Thread.currentThread().interrupt(); break; } catch (Exception e) { orderFailed++; logger.error("订单扫描:异常 id={} err={}", order.getId(), e.getMessage(), e); } } } } catch (Exception e) { logger.error("订单扫描阶段异常", e); } finally { try { logger.info("---------- 企微分享链 adhoc:落库对账入队(含 ABANDONED 复位入队,一月内 PENDING/WAITING/IMPORTED 等) ----------"); adhocReconciled = logisticsService.reconcileShareLinkJobsIntoPendingQueue(); logger.info("企微分享链 adhoc:对账入队 {} 条(余者可能尚在限频窗口内)", adhocReconciled); } catch (Exception e) { logger.error("企微分享链 adhoc 对账入队异常", e); } try { logger.info("---------- 企微分享链 adhoc 队列:开始 drain ----------"); adhocQueuePopped = logisticsService.drainPendingShareLinkQueue(); } catch (Exception e) { logger.error("企微分享链 adhoc 队列 drain 异常", e); } } long elapsed = System.currentTimeMillis() - t0; logger.info("========== 物流扫描任务统计(本轮) =========="); logger.info("订单:候选 {} 条 | 跳过(Redis已处理) {} | 本次已拉取处理 {} | 成功 {} | 失败或未推送 {}", orderCandidates, orderSkipped, orderProcessed, orderSuccess, orderFailed); logger.info("企微分享链队列:对账入队 {} 条 | Redis 弹出并处理 {} 条(未出单等情况会重入队,与逐条日志一致)", adhocReconciled, adhocQueuePopped); logger.info("总耗时 {} ms", elapsed); logger.info("=========================================="); } }