This commit is contained in:
van
2026-04-03 17:18:27 +08:00
parent d4fdf076e9
commit fa7e26cf6e
6 changed files with 85 additions and 10 deletions

View File

@@ -79,15 +79,16 @@ public class WeComShareLinkLogisticsJobController extends BaseController {
job.getTrackingUrl(), remark, touser);
data.put("jobKey", jobKey);
int nextAttempts = job.getScanAttempts() == null ? 1 : job.getScanAttempts() + 1;
int successAttempts = job.getScanAttempts() == null ? 1 : job.getScanAttempts() + 1;
String adhocNote = data.get("adhocNote") != null ? data.get("adhocNote").toString() : "";
String note = "manual:" + adhocNote;
if (Boolean.TRUE.equals(data.get("terminalSuccess"))) {
String wb = data.get("waybillNo") != null ? data.get("waybillNo").toString() : null;
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "PUSHED", note, nextAttempts,
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "PUSHED", note, successAttempts,
StringUtils.hasText(wb) ? wb : null);
} else {
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "WAITING", note, nextAttempts, null);
/* 失败仍走自动队列时,不得垫高 scan_attempts否则 Redis attempts 与定时 drain 上限错位,未超限也会被放弃 */
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "WAITING", note, job.getScanAttempts(), null);
WeComShareLinkLogisticsJob refreshed = weComShareLinkLogisticsJobService.selectByJobKey(jobKey);
if (refreshed != null) {
logisticsService.pushShareLinkJobToRedis(refreshed);

View File

@@ -18,4 +18,10 @@ public interface WeComShareLinkLogisticsJobMapper {
WeComShareLinkLogisticsJob selectByJobKey(String jobKey);
List<WeComShareLinkLogisticsJob> selectWeComShareLinkLogisticsJobList(WeComShareLinkLogisticsJob query);
/**
 * Selects jobs that should be auto-scanned but may be absent from the Redis pending
 * queue (e.g. rows back-filled via plain SQL as IMPORTED, or rows left PENDING after
 * a failed enqueue).
 * Only rows whose {@code create_time} falls within the last month are returned, to
 * avoid sweeping up stale history.
 *
 * @param limit maximum number of rows to return
 * @return jobs eligible for queue reconciliation, oldest first
 */
List<WeComShareLinkLogisticsJob> selectJobsNeedingQueueReconcile(@Param("limit") int limit);
}

View File

@@ -30,10 +30,17 @@ public interface ILogisticsService {
/**
* 将已落库任务按与 {@link #enqueueShareLinkForScan} 相同的 JSON 格式推入 Redis补录、手动失败后重入队等
* {@code attempts} 取自 {@link WeComShareLinkLogisticsJob#getScanAttempts()},与 drain 重试计数对齐
* Redis 项中的 attempts 与定时 drain 重试上限对齐,取自 {@link WeComShareLinkLogisticsJob#getScanAttempts()}(仅由 drain 回写递增;管理端手动拉取失败不会垫高该计数)
*/
void pushShareLinkJobToRedis(WeComShareLinkLogisticsJob job);
/**
 * Scans the persisted job table for still-pending tasks ({@code create_time} within
 * one month) and, subject to a per-job rate limit, re-pushes them into the Redis
 * pending queue. Covers rows back-filled via plain SQL and entries previously lost
 * from Redis, so enqueueing does not depend solely on the WeCom callback path.
 *
 * @return the number of jobs that acquired the rate-limit lock and were successfully pushed this round
 */
int reconcileShareLinkJobsIntoPendingQueue();
/**
* 定时任务内:依次弹出队列并调用 {@link #fetchLogisticsByShareLinkAndPush}。
*

View File

@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,10 @@ public class LogisticsServiceImpl implements ILogisticsService {
private static final String REDIS_ADHOC_WAYBILL_PREFIX = "logistics:adhoc:waybill:";
/** 企微分享链+备注入队FIFORPUSH 入队、LPOP 出队 */
private static final String REDIS_ADHOC_PENDING_QUEUE = "logistics:adhoc:pending:queue";
/** 对账补入队限频:避免每轮定时任务对同一 jobKey 重复 RPUSH */
private static final String REDIS_ADHOC_RECONCILE_ENQUEUE_PREFIX = "logistics:adhoc:reconcile:enqueue:";
private static final int ADHOC_RECONCILE_ROW_LIMIT = 200;
private static final long ADHOC_RECONCILE_ENQUEUE_LOCK_SECONDS = 8 * 60;
/** 单次队列项最多重新入队次数(约对应多天 × 每 10 分钟一轮) */
private static final int ADHOC_MAX_REQUEUE_ATTEMPTS = 500;
private static final String PUSH_URL = "https://wxts.van333.cn/wx/send/pdd";
@@ -472,6 +477,42 @@ public class LogisticsServiceImpl implements ILogisticsService {
job.getUserRemark(), job.getTouserPush(), job.getFromUserName());
}
@Override
public int reconcileShareLinkJobsIntoPendingQueue() {
    // Re-push persisted-but-unqueued jobs (SQL back-fills, lost Redis entries) into
    // the pending queue, at most ADHOC_RECONCILE_ROW_LIMIT rows per round.
    int pushed = 0;
    try {
        List<WeComShareLinkLogisticsJob> rows = weComShareLinkLogisticsJobMapper.selectJobsNeedingQueueReconcile(
                ADHOC_RECONCILE_ROW_LIMIT);
        if (rows == null || rows.isEmpty()) {
            return 0;
        }
        for (WeComShareLinkLogisticsJob job : rows) {
            if (job == null || !StringUtils.hasText(job.getJobKey())) {
                continue;
            }
            // Per-jobKey rate-limit lock: prevents repeated RPUSH of the same job on
            // every scheduler round. The lock is deliberately kept on success so the
            // job stays throttled for the full window.
            String lockKey = REDIS_ADHOC_RECONCILE_ENQUEUE_PREFIX + job.getJobKey().trim();
            Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1",
                    ADHOC_RECONCILE_ENQUEUE_LOCK_SECONDS, TimeUnit.SECONDS);
            if (!Boolean.TRUE.equals(gotLock)) {
                continue;
            }
            try {
                pushShareLinkJobToRedis(job);
                pushed++;
            } catch (Exception e) {
                // Release the lock so the next round can retry this job immediately.
                try {
                    stringRedisTemplate.delete(lockKey);
                } catch (Exception ignored) {
                    // Best effort: if the delete fails the lock simply expires on its own.
                }
                // Pass the exception as the last argument so SLF4J logs the stack trace
                // (e.toString() alone would drop it).
                logger.warn("adhoc 对账入队失败 jobKey={}", job.getJobKey(), e);
            }
        }
    } catch (Exception e) {
        logger.error("adhoc 分享链对账入队异常", e);
    }
    return pushed;
}
/** 与 drain 解析字段一致jobKey、attempts、trackingUrl、remark、touser、fromWecom */
private void rightPushAdhocQueueJson(String jobKey, int attempts, String trackingUrl, String remark,
String touser, String fromWecom) {
@@ -508,18 +549,20 @@ public class LogisticsServiceImpl implements ILogisticsService {
int attempts = o.getIntValue("attempts");
AdhocTryResult tr = tryAdhocShareLinkOnce(url, remark, touser, null);
if (tr.needsRequeue) {
if (attempts >= ADHOC_MAX_REQUEUE_ATTEMPTS) {
int nextAttempts = attempts + 1;
if (nextAttempts > ADHOC_MAX_REQUEUE_ATTEMPTS) {
logger.warn("adhoc 已达最大重试次数 {},放弃 jobKey={} url={} note={}",
ADHOC_MAX_REQUEUE_ATTEMPTS, jobKey, url, tr.note);
touchShareLinkJobRow(jobKey, "ABANDONED", tr.note, attempts + 1, null);
touchShareLinkJobRow(jobKey, "ABANDONED", tr.note, nextAttempts, null);
continue;
}
o.put("attempts", attempts + 1);
o.put("attempts", nextAttempts);
stringRedisTemplate.opsForList().rightPush(REDIS_ADHOC_PENDING_QUEUE, o.toJSONString());
touchShareLinkJobRow(jobKey, "WAITING", tr.note, attempts + 1, null);
touchShareLinkJobRow(jobKey, "WAITING", tr.note, nextAttempts, null);
} else {
int doneAttempts = attempts + 1;
touchShareLinkJobRow(jobKey, "PUSHED",
tr.note, attempts + 1, StringUtils.hasText(tr.waybillNo) ? tr.waybillNo : null);
tr.note, doneAttempts, StringUtils.hasText(tr.waybillNo) ? tr.waybillNo : null);
}
} catch (Exception e) {
logger.error("adhoc 队列项处理失败 raw={}", json.length() > 200 ? json.substring(0, 200) + "..." : json, e);

View File

@@ -38,6 +38,7 @@ public class LogisticsScanTask {
int orderSuccess = 0;
int orderFailed = 0;
int adhocQueuePopped = 0;
int adhocReconciled = 0;
logger.info("========== 物流扫描定时任务开始(订单 F/PDD + 企微分享链队列) ==========");
@@ -83,6 +84,13 @@ public class LogisticsScanTask {
} catch (Exception e) {
logger.error("订单扫描阶段异常", e);
} finally {
try {
logger.info("---------- 企微分享链 adhoc落库对账入队IMPORTED/PENDING/WAITING 等) ----------");
adhocReconciled = logisticsService.reconcileShareLinkJobsIntoPendingQueue();
logger.info("企微分享链 adhoc对账入队 {} 条(余者可能尚在限频窗口内)", adhocReconciled);
} catch (Exception e) {
logger.error("企微分享链 adhoc 对账入队异常", e);
}
try {
logger.info("---------- 企微分享链 adhoc 队列:开始 drain ----------");
adhocQueuePopped = logisticsService.drainPendingShareLinkQueue();
@@ -95,7 +103,8 @@ public class LogisticsScanTask {
logger.info("========== 物流扫描任务统计(本轮) ==========");
logger.info("订单:候选 {} 条 | 跳过(Redis已处理) {} | 本次已拉取处理 {} | 成功 {} | 失败或未推送 {}",
orderCandidates, orderSkipped, orderProcessed, orderSuccess, orderFailed);
logger.info("企微分享链队列Redis 弹出并处理 {} 条(未出单等情况会重入队,与逐条日志一致)", adhocQueuePopped);
logger.info("企微分享链队列:对账入队 {} 条 | Redis 弹出并处理 {} 条(未出单等情况会重入队,与逐条日志一致)",
adhocReconciled, adhocQueuePopped);
logger.info("总耗时 {} ms", elapsed);
logger.info("==========================================");
}

View File

@@ -69,4 +69,13 @@
</where>
order by id desc
</select>
<!-- Jobs that should be auto-scanned but may be missing from the Redis pending queue
     (SQL back-filled IMPORTED rows, PENDING rows whose enqueue failed, WAITING retries).
     Restricted to the last month of create_time; oldest first; capped by #{limit}. -->
<select id="selectJobsNeedingQueueReconcile" resultMap="WeComShareLinkLogisticsJobResult">
<include refid="selectVo"/>
where status in ('PENDING', 'WAITING', 'IMPORTED')
and tracking_url is not null and trim(tracking_url) != ''
and create_time >= date_sub(now(), interval 1 month)
order by id asc
limit #{limit}
</select>
</mapper>