This commit is contained in:
van
2026-04-03 00:42:53 +08:00
parent 2e5540904f
commit 6b88e5376e
3 changed files with 143 additions and 5 deletions

View File

@@ -4,15 +4,20 @@ import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.jarvis.domain.WeComShareLinkLogisticsJob;
import com.ruoyi.jarvis.mapper.WeComShareLinkLogisticsJobMapper;
import com.ruoyi.jarvis.service.ILogisticsService;
import com.ruoyi.jarvis.service.IWeComShareLinkLogisticsJobService;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -22,6 +27,10 @@ public class WeComShareLinkLogisticsJobController extends BaseController {
@Resource
private IWeComShareLinkLogisticsJobService weComShareLinkLogisticsJobService;
@Resource
private ILogisticsService logisticsService;
@Resource
private WeComShareLinkLogisticsJobMapper weComShareLinkLogisticsJobMapper;
@PreAuthorize("@ss.hasPermi('jarvis:wecom:shareLinkLog:list')")
@GetMapping("/list")
@@ -43,4 +52,56 @@ public class WeComShareLinkLogisticsJobController extends BaseController {
Map<String, Object> r = weComShareLinkLogisticsJobService.backfillImportedFromInboundTrace();
return success(r);
}
/**
* 与订单列表「获取物流」一致:立即请求物流接口,有运单则推送分享链模板,并回写任务行。
*/
@PreAuthorize("@ss.hasPermi('jarvis:wecom:shareLinkLog:list')")
@PostMapping("/fetchShareLinkManually")
public AjaxResult fetchShareLinkManually(@RequestBody Map<String, Object> body) {
if (body == null || body.get("jobKey") == null) {
return AjaxResult.error("jobKey 不能为空");
}
String jobKey = body.get("jobKey").toString().trim();
if (!StringUtils.hasText(jobKey)) {
return AjaxResult.error("jobKey 不能为空");
}
WeComShareLinkLogisticsJob job = weComShareLinkLogisticsJobService.selectByJobKey(jobKey);
if (job == null) {
return AjaxResult.error("任务不存在");
}
if (!StringUtils.hasText(job.getTrackingUrl())) {
return AjaxResult.error("该任务无物流短链");
}
String remark = job.getUserRemark() != null ? job.getUserRemark() : "";
String touser = job.getTouserPush() != null ? job.getTouserPush() : "";
Map<String, Object> data = logisticsService.adminFetchShareLinkLogisticsDebug(
job.getTrackingUrl(), remark, touser);
data.put("jobKey", jobKey);
int nextAttempts = job.getScanAttempts() == null ? 1 : job.getScanAttempts() + 1;
String adhocNote = data.get("adhocNote") != null ? data.get("adhocNote").toString() : "";
String note = "manual:" + adhocNote;
if (Boolean.TRUE.equals(data.get("terminalSuccess"))) {
String wb = data.get("waybillNo") != null ? data.get("waybillNo").toString() : null;
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "PUSHED", note, nextAttempts,
StringUtils.hasText(wb) ? wb : null);
} else {
weComShareLinkLogisticsJobMapper.updateByJobKey(jobKey, "WAITING", note, nextAttempts, null);
}
return AjaxResult.success(data);
}
/**
* 手动执行一轮与定时任务相同的 Redis 待队列弹出(条数上限同 adhoc-pending-batch-size
*/
@PreAuthorize("@ss.hasPermi('jarvis:wecom:shareLinkLog:list')")
@PostMapping("/drainPendingQueueOnce")
public AjaxResult drainPendingQueueOnce() {
int n = logisticsService.drainPendingShareLinkQueue();
Map<String, Object> r = new LinkedHashMap<>();
r.put("processedFromQueue", n);
r.put("hint", "为单次弹栈处理条数;每项内部仍可能因未出单重新入队");
return AjaxResult.success(r);
}
}

View File

@@ -2,6 +2,8 @@ package com.ruoyi.jarvis.service;
import com.ruoyi.jarvis.domain.JDOrder;
import java.util.Map;
/**
* 物流信息服务接口
*/
@@ -27,8 +29,15 @@ public interface ILogisticsService {
/**
* 定时任务内:依次弹出队列并调用 {@link #fetchLogisticsByShareLinkAndPush}。
*
* @return 本轮从 Redis 弹出并尝试处理的条数(与定时任务尾段逻辑一致)
*/
void drainPendingShareLinkQueue();
int drainPendingShareLinkQueue();
/**
* 管理端调试立刻请求物流接口并尝试推送分享链模板返回字段与订单「手动获取物流」相近requestUrl、responseRaw、responseData、pushSent 等)。
*/
Map<String, Object> adminFetchShareLinkLogisticsDebug(String trackingUrl, String remark, String touser);
/** 测试清理:删除分享链待扫描队列键 */
void clearAdhocPendingQueue();

View File

@@ -22,6 +22,8 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -468,13 +470,15 @@ public class LogisticsServiceImpl implements ILogisticsService {
}
@Override
public void drainPendingShareLinkQueue() {
public int drainPendingShareLinkQueue() {
int processed = 0;
int n = Math.max(1, adhocPendingBatchSize);
for (int i = 0; i < n; i++) {
String json = stringRedisTemplate.opsForList().leftPop(REDIS_ADHOC_PENDING_QUEUE);
if (json == null || !StringUtils.hasText(json)) {
break;
}
processed++;
try {
JSONObject o = JSON.parseObject(json);
if (o == null) {
@@ -485,7 +489,7 @@ public class LogisticsServiceImpl implements ILogisticsService {
String touser = o.getString("touser");
String jobKey = o.getString("jobKey");
int attempts = o.getIntValue("attempts");
AdhocTryResult tr = tryAdhocShareLinkOnce(url, remark, touser);
AdhocTryResult tr = tryAdhocShareLinkOnce(url, remark, touser, null);
if (tr.needsRequeue) {
if (attempts >= ADHOC_MAX_REQUEUE_ATTEMPTS) {
logger.warn("adhoc 已达最大重试次数 {},放弃 jobKey={} url={} note={}",
@@ -504,6 +508,7 @@ public class LogisticsServiceImpl implements ILogisticsService {
logger.error("adhoc 队列项处理失败 raw={}", json.length() > 200 ? json.substring(0, 200) + "..." : json, e);
}
}
return processed;
}
@Override
@@ -518,10 +523,22 @@ public class LogisticsServiceImpl implements ILogisticsService {
logger.warn("分享物流链接为空");
return false;
}
AdhocTryResult tr = tryAdhocShareLinkOnce(trackingUrl, remark, touser);
AdhocTryResult tr = tryAdhocShareLinkOnce(trackingUrl, remark, touser, null);
return !tr.needsRequeue;
}
@Override
public Map<String, Object> adminFetchShareLinkLogisticsDebug(String trackingUrl, String remark, String touser) {
Map<String, Object> debug = new LinkedHashMap<>();
AdhocTryResult tr = tryAdhocShareLinkOnce(trackingUrl, remark, touser, debug);
debug.put("terminalSuccess", !tr.needsRequeue);
debug.put("adhocNote", tr.note);
if (StringUtils.hasText(tr.waybillNo)) {
debug.put("waybillNo", tr.waybillNo);
}
return debug;
}
private void touchShareLinkJobRow(String jobKey, String status, String lastNote, int scanAttempts, String waybillNo) {
if (!StringUtils.hasText(jobKey)) {
return;
@@ -535,36 +552,74 @@ public class LogisticsServiceImpl implements ILogisticsService {
/**
* 处理一条分享链:需要重新入队时 {@link #needsRequeue} 为 true未发货、接口异常、推送失败等
*
* @param debug 非空时写入与订单手动接口相近的调试字段requestUrl、responseRaw、responseData、pushSent 等)
*/
private AdhocTryResult tryAdhocShareLinkOnce(String trackingUrl, String remark, String touser) {
private AdhocTryResult tryAdhocShareLinkOnce(String trackingUrl, String remark, String touser, Map<String, Object> debug) {
if (debug != null) {
debug.put("pushSent", false);
}
if (!StringUtils.hasText(trackingUrl)) {
if (debug != null) {
debug.put("error", "分享物流链接为空");
}
return AdhocTryResult.requeue("empty_url");
}
String url = trackingUrl.trim();
if (debug != null) {
debug.put("trackingUrl", url);
debug.put("logisticsLink", url);
debug.put("userRemark", remark != null ? remark : "");
}
String dedupeKey = REDIS_ADHOC_WAYBILL_PREFIX + DigestUtils.md5DigestAsHex(url.getBytes(StandardCharsets.UTF_8));
try {
ILogisticsService.HealthCheckResult healthResult = checkHealth();
if (!healthResult.isHealthy()) {
logger.error("物流服务不可用adhoc 将重试: {}", healthResult.getMessage());
if (debug != null) {
debug.put("healthOk", false);
debug.put("healthMessage", healthResult.getMessage());
}
return AdhocTryResult.requeue("health:" + healthResult.getMessage());
}
if (debug != null) {
debug.put("healthOk", true);
}
String externalUrl = externalApiUrlTemplate + URLEncoder.encode(url, "UTF-8");
if (debug != null) {
debug.put("requestUrl", externalUrl);
}
String result = HttpUtils.sendGet(externalUrl);
if (!StringUtils.hasText(result)) {
logger.warn("物流接口空响应 adhoc将重试");
if (debug != null) {
debug.put("responseRaw", result);
}
return AdhocTryResult.requeue("empty_http_body");
}
if (debug != null) {
debug.put("responseRaw", result);
}
JSONObject parsedData;
try {
Object parsed = JSON.parse(result);
if (!(parsed instanceof JSONObject)) {
if (debug != null) {
debug.put("responseData", parsed);
}
return AdhocTryResult.requeue("not_json_object");
}
parsedData = (JSONObject) parsed;
} catch (Exception e) {
logger.warn("物流响应非JSON adhoc: {}", e.getMessage());
if (debug != null) {
debug.put("jsonParseError", e.getMessage());
}
return AdhocTryResult.requeue("json_parse:" + e.getMessage());
}
if (debug != null) {
debug.put("responseData", parsedData);
}
JSONObject dataObj = parsedData.getJSONObject("data");
if (dataObj == null) {
return AdhocTryResult.requeue("no_data_field");
@@ -578,6 +633,10 @@ public class LogisticsServiceImpl implements ILogisticsService {
String existing = stringRedisTemplate.opsForValue().get(dedupeKey);
if (existing != null && existing.equals(waybillNo)) {
logger.info("adhoc 该链接已推送过运单 {}", waybillNo);
if (debug != null) {
debug.put("pushSent", false);
debug.put("skipPushReason", "already_pushed_same_waybill");
}
return AdhocTryResult.done("already_pushed", waybillNo);
}
StringBuilder pushContent = new StringBuilder();
@@ -595,6 +654,12 @@ public class LogisticsServiceImpl implements ILogisticsService {
}
String pushResult = sendPostWithHeaders(PUSH_URL, pushParam.toJSONString(), PUSH_TOKEN);
boolean success = isPushResponseSuccess(pushResult);
if (debug != null) {
debug.put("pushSent", success);
if (!success && pushResult != null) {
debug.put("pushError", pushResult.length() > 500 ? pushResult.substring(0, 500) + "..." : pushResult);
}
}
if (success) {
stringRedisTemplate.opsForValue().set(dedupeKey, waybillNo, 30, TimeUnit.DAYS);
logger.info("adhoc 物流推送成功 waybill={}", waybillNo);
@@ -605,6 +670,9 @@ public class LogisticsServiceImpl implements ILogisticsService {
return AdhocTryResult.requeue("push_failed");
} catch (Exception e) {
logger.error("adhoc 物流推送异常,将重新入队", e);
if (debug != null) {
debug.put("exception", e.getMessage());
}
return AdhocTryResult.requeue("exception:" + e.getMessage());
}
}