1
This commit is contained in:
@@ -0,0 +1,33 @@
|
||||
package com.ruoyi.jarvis.service;
|
||||
|
||||
/**
|
||||
* 腾讯文档延迟推送服务接口
|
||||
*
|
||||
* @author system
|
||||
*/
|
||||
public interface ITencentDocDelayedPushService {
|
||||
|
||||
/**
|
||||
* 触发延迟推送
|
||||
* 录单时调用此方法,会重置10分钟倒计时
|
||||
*/
|
||||
void triggerDelayedPush();
|
||||
|
||||
/**
|
||||
* 立即执行推送(用于手动触发)
|
||||
*/
|
||||
void executePushNow();
|
||||
|
||||
/**
|
||||
* 获取下次推送的剩余时间(秒)
|
||||
*
|
||||
* @return 剩余秒数,如果没有待推送返回-1
|
||||
*/
|
||||
long getRemainingSeconds();
|
||||
|
||||
/**
|
||||
* 取消待推送任务
|
||||
*/
|
||||
void cancelPendingPush();
|
||||
}
|
||||
|
||||
@@ -48,6 +48,8 @@ public class InstructionServiceImpl implements IInstructionService {
|
||||
private com.ruoyi.jarvis.config.TencentDocConfig tencentDocConfig;
|
||||
@Resource
|
||||
private com.ruoyi.common.core.redis.RedisCache redisCache;
|
||||
@Resource(required = false)
|
||||
private com.ruoyi.jarvis.service.ITencentDocDelayedPushService tencentDocDelayedPushService;
|
||||
|
||||
// 录单模板(与 jd/JDUtil 中 WENAN_D 保持一致)
|
||||
private static final String WENAN_D = "单:\n" + "{单号} \n备注:{单的备注}\n" + "分销标记:{分销标记}\n" + "第三方单号:{第三方单号}\n" + "型号:\n" + "{型号}\n" + "链接:\n" + "{链接}\n" + "下单付款:\n" + "\n" + "后返金额:\n" + "\n" + "地址:\n" + "{地址}\n" + "物流链接:\n" + "\n" + "订单号:\n" + "\n" + "下单人:\n" + "\n" + "京粉实际价格:\n" + "\n";
|
||||
@@ -1233,11 +1235,20 @@ private String handleTF(String input) {
|
||||
jdOrderService.insertJDOrder(order);
|
||||
}
|
||||
|
||||
// 注意:H-TF订单不再自动写入腾讯文档,需通过订单列表手动触发
|
||||
// 原因:防止并发写入和数据覆盖,需要人工确认
|
||||
// if ("H-TF".equals(order.getDistributionMark())) {
|
||||
// asyncWriteToTencentDoc(order);
|
||||
// }
|
||||
// H-TF订单触发延迟推送机制
|
||||
// 录单后,重置10分钟倒计时,10分钟内无新录单则自动推送
|
||||
if ("H-TF".equals(order.getDistributionMark())) {
|
||||
try {
|
||||
if (tencentDocDelayedPushService != null) {
|
||||
tencentDocDelayedPushService.triggerDelayedPush();
|
||||
System.out.println("✓ H-TF订单已触发延迟推送 - 单号: " + order.getRemark() +
|
||||
", 第三方单号: " + order.getThirdPartyOrderNo());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 触发延迟推送失败不影响录单结果
|
||||
System.err.println("✗ 触发延迟推送失败 - 单号: " + order.getRemark() + ", 错误: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 返回完整的表单格式,使用原始输入保留完整物流链接
|
||||
return formatOrderForm(order, originalInput);
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
package com.ruoyi.jarvis.service.impl;
|
||||
|
||||
import com.ruoyi.common.core.redis.RedisCache;
|
||||
import com.ruoyi.jarvis.service.ITencentDocDelayedPushService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 腾讯文档延迟推送服务实现
|
||||
*
|
||||
* 功能说明:
|
||||
* 1. 录单时触发10分钟倒计时
|
||||
* 2. 10分钟内有新录单,重置倒计时
|
||||
* 3. 10分钟到期后自动执行推送
|
||||
* 4. 推送执行期间有录单,推送完成后重新开始倒计时
|
||||
* 5. 使用分布式锁防止并发推送
|
||||
*
|
||||
* @author system
|
||||
*/
|
||||
@Service
|
||||
public class TencentDocDelayedPushServiceImpl implements ITencentDocDelayedPushService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TencentDocDelayedPushServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private RedisCache redisCache;
|
||||
|
||||
/**
|
||||
* 延迟时间(分钟),可通过配置文件修改
|
||||
*/
|
||||
@Value("${tencent.doc.delayed.push.minutes:10}")
|
||||
private int delayMinutes;
|
||||
|
||||
/**
|
||||
* Redis Key - 存储下次推送的时间戳
|
||||
*/
|
||||
private static final String REDIS_KEY_NEXT_PUSH_TIME = "tendoc:delayed_push:next_time";
|
||||
|
||||
/**
|
||||
* Redis Key - 推送执行锁
|
||||
*/
|
||||
private static final String REDIS_KEY_PUSH_LOCK = "tendoc:delayed_push:lock";
|
||||
|
||||
/**
|
||||
* Redis Key - 推送期间有新录单标记
|
||||
*/
|
||||
private static final String REDIS_KEY_NEW_ORDER_FLAG = "tendoc:delayed_push:new_order_flag";
|
||||
|
||||
/**
|
||||
* 定时任务执行器
|
||||
*/
|
||||
private ScheduledExecutorService scheduler;
|
||||
|
||||
/**
|
||||
* 初始化定时任务
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 创建单线程的定时任务执行器
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread thread = new Thread(r, "TencentDoc-DelayedPush-Thread");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
});
|
||||
|
||||
// 每30秒检查一次是否需要推送
|
||||
scheduler.scheduleWithFixedDelay(this::checkAndExecutePush, 30, 30, TimeUnit.SECONDS);
|
||||
|
||||
log.info("腾讯文档延迟推送服务已启动,延迟时间: {} 分钟", delayMinutes);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭定时任务
|
||||
*/
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (scheduler != null && !scheduler.isShutdown()) {
|
||||
scheduler.shutdown();
|
||||
try {
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
scheduler.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
log.info("腾讯文档延迟推送服务已关闭");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerDelayedPush() {
|
||||
try {
|
||||
// 计算下次推送时间 = 当前时间 + 延迟分钟数
|
||||
long nextPushTime = System.currentTimeMillis() + (delayMinutes * 60 * 1000L);
|
||||
|
||||
// 检查是否正在执行推送
|
||||
String lockValue = redisCache.getCacheObject(REDIS_KEY_PUSH_LOCK);
|
||||
if (lockValue != null && "locked".equals(lockValue)) {
|
||||
// 正在推送中,标记有新订单,推送完成后会重新触发
|
||||
redisCache.setCacheObject(REDIS_KEY_NEW_ORDER_FLAG, "true", 1, TimeUnit.HOURS);
|
||||
log.info("推送执行中,标记有新订单,推送完成后将重新开始倒计时");
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新下次推送时间
|
||||
redisCache.setCacheObject(REDIS_KEY_NEXT_PUSH_TIME, nextPushTime, delayMinutes + 5, TimeUnit.MINUTES);
|
||||
|
||||
log.info("触发延迟推送,{}分钟后执行({})", delayMinutes,
|
||||
new java.text.SimpleDateFormat("HH:mm:ss").format(new java.util.Date(nextPushTime)));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("触发延迟推送失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executePushNow() {
|
||||
log.info("手动触发立即推送");
|
||||
// 清除待推送标记
|
||||
redisCache.deleteObject(REDIS_KEY_NEXT_PUSH_TIME);
|
||||
// 执行推送
|
||||
doExecutePush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRemainingSeconds() {
|
||||
try {
|
||||
Long nextPushTime = redisCache.getCacheObject(REDIS_KEY_NEXT_PUSH_TIME);
|
||||
if (nextPushTime == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
long remaining = (nextPushTime - System.currentTimeMillis()) / 1000;
|
||||
return remaining > 0 ? remaining : 0;
|
||||
} catch (Exception e) {
|
||||
log.error("获取剩余时间失败", e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelPendingPush() {
|
||||
redisCache.deleteObject(REDIS_KEY_NEXT_PUSH_TIME);
|
||||
log.info("已取消待推送任务");
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时检查并执行推送
|
||||
*/
|
||||
private void checkAndExecutePush() {
|
||||
try {
|
||||
// 获取下次推送时间
|
||||
Long nextPushTime = redisCache.getCacheObject(REDIS_KEY_NEXT_PUSH_TIME);
|
||||
if (nextPushTime == null) {
|
||||
// 没有待推送任务
|
||||
return;
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
if (now < nextPushTime) {
|
||||
// 还没到推送时间
|
||||
long remainingSeconds = (nextPushTime - now) / 1000;
|
||||
log.debug("距离下次推送还有 {} 秒", remainingSeconds);
|
||||
return;
|
||||
}
|
||||
|
||||
// 时间到了,执行推送
|
||||
log.info("倒计时结束,开始执行推送");
|
||||
doExecutePush();
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("检查推送任务失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行推送
|
||||
*/
|
||||
private void doExecutePush() {
|
||||
String lockValue = null;
|
||||
try {
|
||||
// 1. 尝试获取分布式锁
|
||||
lockValue = redisCache.getCacheObject(REDIS_KEY_PUSH_LOCK);
|
||||
if (lockValue != null && "locked".equals(lockValue)) {
|
||||
log.warn("推送任务已在执行中,跳过本次推送");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 加锁(30分钟超时,防止死锁)
|
||||
redisCache.setCacheObject(REDIS_KEY_PUSH_LOCK, "locked", 30, TimeUnit.MINUTES);
|
||||
log.info("✓ 获取推送锁成功,开始执行推送");
|
||||
|
||||
// 3. 清除待推送标记
|
||||
redisCache.deleteObject(REDIS_KEY_NEXT_PUSH_TIME);
|
||||
|
||||
// 4. 清除新订单标记
|
||||
redisCache.deleteObject(REDIS_KEY_NEW_ORDER_FLAG);
|
||||
|
||||
// 5. 调用批量同步接口
|
||||
// 注意:这里需要通过HTTP调用Controller的接口,或者注入Controller的方法
|
||||
// 为了避免循环依赖,这里使用Spring的ApplicationContext来获取Bean
|
||||
executeBatchSync();
|
||||
|
||||
log.info("✓ 推送执行完成");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("❌ 推送执行失败", e);
|
||||
} finally {
|
||||
// 6. 释放锁
|
||||
try {
|
||||
redisCache.deleteObject(REDIS_KEY_PUSH_LOCK);
|
||||
log.info("✓ 释放推送锁");
|
||||
} catch (Exception e) {
|
||||
log.error("释放推送锁失败", e);
|
||||
}
|
||||
|
||||
// 7. 检查是否有新订单标记
|
||||
String newOrderFlag = redisCache.getCacheObject(REDIS_KEY_NEW_ORDER_FLAG);
|
||||
if (newOrderFlag != null && "true".equals(newOrderFlag)) {
|
||||
log.info("推送期间有新订单,重新开始倒计时");
|
||||
redisCache.deleteObject(REDIS_KEY_NEW_ORDER_FLAG);
|
||||
triggerDelayedPush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行批量同步
|
||||
*
|
||||
* 说明:这里通过HTTP调用本地接口,避免复杂的依赖注入
|
||||
*/
|
||||
private void executeBatchSync() {
|
||||
try {
|
||||
log.info("开始执行批量同步...");
|
||||
|
||||
// 使用RestTemplate或HttpClient调用本地接口
|
||||
// 这里简化处理,直接发送HTTP请求到本地
|
||||
java.net.URL url = new java.net.URL("http://localhost:30313/jarvis-api/jarvis/tendoc/fillLogisticsByOrderNo");
|
||||
java.net.HttpURLConnection conn = (java.net.HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod("POST");
|
||||
conn.setRequestProperty("Content-Type", "application/json");
|
||||
conn.setDoOutput(true);
|
||||
|
||||
// 发送空JSON对象
|
||||
try (java.io.OutputStream os = conn.getOutputStream()) {
|
||||
byte[] input = "{}".getBytes("utf-8");
|
||||
os.write(input, 0, input.length);
|
||||
}
|
||||
|
||||
int responseCode = conn.getResponseCode();
|
||||
log.info("批量同步调用完成,响应码: {}", responseCode);
|
||||
|
||||
if (responseCode == 200) {
|
||||
// 读取响应
|
||||
try (java.io.BufferedReader br = new java.io.BufferedReader(
|
||||
new java.io.InputStreamReader(conn.getInputStream(), "utf-8"))) {
|
||||
StringBuilder response = new StringBuilder();
|
||||
String responseLine;
|
||||
while ((responseLine = br.readLine()) != null) {
|
||||
response.append(responseLine.trim());
|
||||
}
|
||||
log.info("批量同步结果: {}", response.toString());
|
||||
}
|
||||
} else {
|
||||
log.error("批量同步调用失败,响应码: {}", responseCode);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("执行批量同步失败", e);
|
||||
throw new RuntimeException("执行批量同步失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user