从接口拿行数
This commit is contained in:
@@ -43,8 +43,6 @@ public class TencentDocConfigController extends BaseController {
|
||||
// Redis key前缀(用于存储文档配置)
|
||||
private static final String REDIS_KEY_PREFIX = "tencent:doc:auto:config:";
|
||||
|
||||
/** Redis key前缀,用于存储上次处理的最大行数(与 TencentDocController 一致) */
|
||||
private static final String LAST_PROCESSED_ROW_KEY_PREFIX = "tendoc:last_row:";
|
||||
|
||||
/**
|
||||
* 获取当前配置
|
||||
@@ -98,43 +96,33 @@ public class TencentDocConfigController extends BaseController {
|
||||
config.put("appId", tencentDocConfig.getAppId());
|
||||
config.put("apiBaseUrl", tencentDocConfig.getApiBaseUrl());
|
||||
|
||||
// 获取当前同步进度(如果有配置)
|
||||
// 注意:使用与 TencentDocController 相同的 Redis key 前缀
|
||||
// 从接口获取 rowCount(匹配 sheetId 的表格实际有数据的行数),替代 Redis 上次记录的行数
|
||||
if (fileId != null && !fileId.isEmpty() && sheetId != null && !sheetId.isEmpty()) {
|
||||
String syncProgressKey = LAST_PROCESSED_ROW_KEY_PREFIX + fileId + ":" + sheetId;
|
||||
Integer currentProgress = redisCache.getCacheObject(syncProgressKey);
|
||||
log.debug("读取同步进度 - key: {}, value: {}", syncProgressKey, currentProgress);
|
||||
if (currentProgress != null) {
|
||||
config.put("currentProgress", currentProgress);
|
||||
|
||||
// 根据回溯机制计算下次起始行
|
||||
int threshold = startRow + 100;
|
||||
int nextStartRow;
|
||||
String progressHint;
|
||||
|
||||
if (currentProgress <= (startRow + 49)) {
|
||||
// 进度较小,下次从配置起始行开始
|
||||
nextStartRow = startRow;
|
||||
progressHint = String.format("已读取到第 %d 行,下次将从第 %d 行重新开始(进度较小)",
|
||||
currentProgress, nextStartRow);
|
||||
} else if (currentProgress > threshold) {
|
||||
// 进度较大,下次回溯100行(但不能小于起始行)
|
||||
nextStartRow = Math.max(startRow, currentProgress - 100);
|
||||
progressHint = String.format("已读取到第 %d 行,下次将从第 %d 行开始(回溯100行,防止遗漏)",
|
||||
currentProgress, nextStartRow);
|
||||
try {
|
||||
String accessToken = tencentDocTokenService.getValidAccessToken();
|
||||
if (accessToken != null && !accessToken.isEmpty()) {
|
||||
int rowCount = tencentDocService.getSheetRowTotal(accessToken, fileId, sheetId);
|
||||
if (rowCount > 0) {
|
||||
config.put("currentProgress", rowCount);
|
||||
int effectiveStart = startRow != null ? startRow : 3;
|
||||
int nextStartRow = Math.max(effectiveStart, rowCount - 200);
|
||||
config.put("nextStartRow", nextStartRow);
|
||||
config.put("progressHint", String.format("表格当前有 %d 行数据(从接口获取),下次将从第 %d 行开始", rowCount, nextStartRow));
|
||||
} else {
|
||||
config.put("currentProgress", null);
|
||||
config.put("nextStartRow", startRow);
|
||||
config.put("progressHint", String.format("未获取到 rowCount,将从第 %d 行开始", startRow));
|
||||
}
|
||||
} else {
|
||||
// 进度在阈值范围内,下次从配置起始行开始
|
||||
nextStartRow = startRow;
|
||||
progressHint = String.format("已读取到第 %d 行,下次将从第 %d 行重新开始",
|
||||
currentProgress, nextStartRow);
|
||||
config.put("currentProgress", null);
|
||||
config.put("nextStartRow", startRow);
|
||||
config.put("progressHint", "未授权,无法获取表格行数");
|
||||
}
|
||||
|
||||
config.put("nextStartRow", nextStartRow);
|
||||
config.put("progressHint", progressHint);
|
||||
} else {
|
||||
} catch (Exception e) {
|
||||
log.warn("获取 rowCount 失败: {}", e.getMessage());
|
||||
config.put("currentProgress", null);
|
||||
config.put("nextStartRow", startRow);
|
||||
config.put("progressHint", String.format("尚未开始同步,将从第 %d 行开始", startRow));
|
||||
config.put("progressHint", "获取表格行数失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,13 +200,8 @@ public class TencentDocConfigController extends BaseController {
|
||||
redisCache.setCacheObject(REDIS_KEY_PREFIX + "headerRow", headerRow, 180, TimeUnit.DAYS);
|
||||
redisCache.setCacheObject(REDIS_KEY_PREFIX + "startRow", startRow, 180, TimeUnit.DAYS);
|
||||
|
||||
// 清除该文档的同步进度(配置更新时重置进度,从新的startRow重新开始)
|
||||
// 注意:使用与 TencentDocController 相同的 Redis key 前缀
|
||||
String syncProgressKey = LAST_PROCESSED_ROW_KEY_PREFIX + fileId.trim() + ":" + sheetId.trim();
|
||||
String configVersionKey = "tencent:doc:sync:config_version:" + fileId.trim() + ":" + sheetId.trim();
|
||||
redisCache.deleteObject(syncProgressKey);
|
||||
redisCache.deleteObject(configVersionKey);
|
||||
log.info("配置已更新,已清除同步进度 - key: {}, 将从第 {} 行重新开始同步", syncProgressKey, startRow);
|
||||
// 不再使用 Redis 存储进度,配置更新后下次从接口获取 rowCount 决定范围
|
||||
log.info("配置已更新,将从第 {} 行开始(下次从接口获取 rowCount)", startRow);
|
||||
|
||||
// 同时更新TencentDocConfig对象(内存中)
|
||||
tencentDocConfig.setFileId(fileId.trim());
|
||||
|
||||
@@ -54,9 +54,6 @@ public class TencentDocController extends BaseController {
|
||||
@Autowired
|
||||
private com.ruoyi.jarvis.service.ITencentDocDelayedPushService delayedPushService;
|
||||
|
||||
/** Redis key前缀,用于存储上次处理的最大行数 */
|
||||
private static final String LAST_PROCESSED_ROW_KEY_PREFIX = "tendoc:last_row:";
|
||||
|
||||
/** 单次请求最大行数(腾讯文档 API:行数≤1000) */
|
||||
private static final int API_MAX_ROWS_PER_REQUEST = 200;
|
||||
/** 用 rowTotal 时接口实际单次只能读 200 行 */
|
||||
@@ -989,36 +986,12 @@ public class TencentDocController extends BaseController {
|
||||
log.info("同步物流配置 - fileId: {}, sheetId: {}, batchId: {}, 配置起始行: {}, 表头行: {}",
|
||||
fileId, sheetId, batchId, configStartRow, headerRow);
|
||||
|
||||
// 生成Redis key,用于存储该文件的工作表的动态处理进度
|
||||
String redisKey = LAST_PROCESSED_ROW_KEY_PREFIX + fileId + ":" + sheetId;
|
||||
// 生成配置版本key,用于检测配置是否被重新设置
|
||||
String configVersionKey = "tencent:doc:sync:config_version:" + fileId + ":" + sheetId;
|
||||
String currentConfigVersion = configStartRow + "_" + headerRow; // 配置版本:起始行_表头行
|
||||
|
||||
// 检查配置是否被更新(如果配置版本变化,则重置动态进度)
|
||||
String savedConfigVersion = redisCache.getCacheObject(configVersionKey);
|
||||
if (savedConfigVersion == null || !savedConfigVersion.equals(currentConfigVersion)) {
|
||||
// 配置已更新,清除旧的处理进度
|
||||
redisCache.deleteObject(redisKey);
|
||||
redisCache.setCacheObject(configVersionKey, currentConfigVersion, 180, TimeUnit.DAYS);
|
||||
log.info("检测到配置更新,重置同步进度 - 新配置版本: {}", currentConfigVersion);
|
||||
}
|
||||
|
||||
// 获取动态处理进度(上次处理到的最大行数)
|
||||
Integer lastMaxRow = null;
|
||||
if (!forceStart && redisCache.hasKey(redisKey)) {
|
||||
Object cacheObj = redisCache.getCacheObject(redisKey);
|
||||
if (cacheObj != null) {
|
||||
lastMaxRow = Integer.valueOf(cacheObj.toString());
|
||||
log.info("读取到上次处理进度 - 最大行: {}", lastMaxRow);
|
||||
}
|
||||
}
|
||||
|
||||
// 不再使用 Redis 存储进度,改为每次从接口获取 rowCount(匹配 sheetId 的表格实际有数据的行数)
|
||||
int effectiveStartRow = configStartRow != null ? configStartRow : (headerRow + 1);
|
||||
int startRow;
|
||||
int endRow;
|
||||
|
||||
// 直接拉取文档信息得到 rowTotal,用「最大行 - 200」作为起始(接口实际只能读 200 行),且保证 -200 之后 > 2
|
||||
// 从接口获取 rowCount(匹配 sheetId 的表格实际有数据的行数),替代 Redis 上次记录的行数
|
||||
int rowTotal = tencentDocService.getSheetRowTotal(accessToken, fileId, sheetId);
|
||||
if (rowTotal > 0) {
|
||||
if (forceStartRow != null) {
|
||||
@@ -1030,22 +1003,18 @@ public class TencentDocController extends BaseController {
|
||||
Math.max(effectiveStartRow, rowTotal - READ_ROWS_WHEN_USE_ROW_TOTAL));
|
||||
endRow = Math.min(rowTotal, startRow + READ_ROWS_WHEN_USE_ROW_TOTAL - 1);
|
||||
}
|
||||
log.info("使用文档 rowTotal={},本次范围: 第 {} ~ {} 行(共 {} 行,单次最多 {} 行)",
|
||||
log.info("使用接口 rowCount={},本次范围: 第 {} ~ {} 行(共 {} 行,单次最多 {} 行)",
|
||||
rowTotal, startRow, endRow, endRow - startRow + 1, READ_ROWS_WHEN_USE_ROW_TOTAL);
|
||||
} else {
|
||||
// 未取到 rowTotal 时退回原逻辑
|
||||
// 未取到 rowCount 时使用配置起始行(不再从 Redis 读取进度)
|
||||
if (forceStartRow != null) {
|
||||
startRow = forceStartRow;
|
||||
endRow = startRow + API_MAX_ROWS_PER_REQUEST - 1;
|
||||
log.info("使用强制指定的起始行: {}, 结束行: {}(未取到 rowTotal)", startRow, endRow);
|
||||
} else if (lastMaxRow != null && lastMaxRow >= effectiveStartRow) {
|
||||
startRow = Math.max(effectiveStartRow, lastMaxRow + 1 - BACKTRACK_ROWS);
|
||||
endRow = startRow + API_MAX_ROWS_PER_REQUEST - 1;
|
||||
log.info("上次最大行: {},回溯后从第 {} 行到第 {} 行(未取到 rowTotal)", lastMaxRow, startRow, endRow);
|
||||
log.info("使用强制指定的起始行: {}, 结束行: {}(未取到 rowCount)", startRow, endRow);
|
||||
} else {
|
||||
startRow = effectiveStartRow;
|
||||
endRow = startRow + API_MAX_ROWS_PER_REQUEST - 1;
|
||||
log.info("首次同步,从第 {} 行到第 {} 行(未取到 rowTotal)", startRow, endRow);
|
||||
log.info("从配置起始行开始: {} ~ {}(未取到 rowCount)", startRow, endRow);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1064,10 +1033,7 @@ public class TencentDocController extends BaseController {
|
||||
// 配置/缓存的起始行超出表尾时,必须忽略 effectiveStartRow,按 rowTotal 回溯
|
||||
startRow = Math.max(MIN_START_ROW_WHEN_USE_ROW_TOTAL, rowTotal - READ_ROWS_WHEN_USE_ROW_TOTAL);
|
||||
endRow = Math.min(rowTotal, startRow + READ_ROWS_WHEN_USE_ROW_TOTAL - 1);
|
||||
log.info("按 rowTotal={} 修正范围(忽略超出表尾的配置起始行),避免超出: 第 {} ~ {} 行", rowTotal, startRow, endRow);
|
||||
// 将修正后的起始行写回配置,避免下次仍使用过大的值
|
||||
redisCache.setCacheObject(CONFIG_KEY_PREFIX + "startRow", startRow, 180, TimeUnit.DAYS);
|
||||
log.info("已自动修正 Redis 配置 startRow 为 {}", startRow);
|
||||
log.info("按 rowCount={} 修正范围(忽略超出表尾的配置起始行): 第 {} ~ {} 行", rowTotal, startRow, endRow);
|
||||
} else if (endRow > rowTotal) {
|
||||
endRow = rowTotal;
|
||||
log.info("按 rowTotal={} 截断结束行: endRow={}", rowTotal, endRow);
|
||||
@@ -1276,19 +1242,17 @@ public class TencentDocController extends BaseController {
|
||||
log.warn("sheetData内容预览: {}", sheetData.toJSONString().substring(0, Math.min(500, sheetData.toJSONString().length())));
|
||||
}
|
||||
|
||||
// 无数据时也推进进度,使用较小回溯尽快扫到后续行
|
||||
// 无数据时计算下次起始行(不再写入 Redis,下次从接口获取 rowCount)
|
||||
int emptyCurrentMax = effectiveEndRow;
|
||||
int emptyBacktrack = BACKTRACK_ROWS_WHEN_ALL_SKIPPED;
|
||||
int emptyNextStart = Math.max(effectiveStartRow,
|
||||
Math.max(startRow + MIN_ADVANCE_ROWS, emptyCurrentMax + 1 - emptyBacktrack));
|
||||
int emptyProgressToSave = Math.max(emptyCurrentMax, startRow + MIN_ADVANCE_ROWS + emptyBacktrack - 1);
|
||||
redisCache.setCacheObject(redisKey, emptyProgressToSave, 30, TimeUnit.DAYS);
|
||||
log.info("本批无数据,已推进进度: nextStartRow={}, Redis 进度={}", emptyNextStart, emptyProgressToSave);
|
||||
log.info("本批无数据,下次起始行: {}(不再保存进度,下次从接口获取 rowCount)", emptyNextStart);
|
||||
|
||||
JSONObject result = new JSONObject();
|
||||
result.put("startRow", startRow);
|
||||
result.put("endRow", effectiveEndRow);
|
||||
result.put("lastMaxRow", lastMaxRow);
|
||||
result.put("lastMaxRow", null); // 不再使用 Redis 存储,改为从接口获取 rowCount
|
||||
result.put("nextStartRow", emptyNextStart);
|
||||
result.put("rowTotal", rowTotal > 0 ? rowTotal : null);
|
||||
result.put("currentMaxRow", emptyCurrentMax);
|
||||
@@ -1776,29 +1740,24 @@ public class TencentDocController extends BaseController {
|
||||
successUpdates, skippedCount, errorCount, maxSuccessRow > 0 ? maxSuccessRow : "无");
|
||||
}
|
||||
|
||||
// 完美推送回溯:根据本次扫描/写入结果决定下次起始行与 Redis 进度
|
||||
// 根据本次扫描/写入结果计算下次起始行(不再写入 Redis,下次从接口获取 rowCount 决定范围)
|
||||
// currentMaxRow = 本批实际涉及的最大行(有写入用最大成功行,否则用本批扫描终点)
|
||||
int currentMaxRow = (maxSuccessRow > 0) ? maxSuccessRow : effectiveEndRow;
|
||||
// 整批都跳过时用较小回溯,尽快扫到后续行(如 308);有写入时用完整回溯覆盖物流变更
|
||||
// 整批都跳过时用较小回溯,尽快扫到后续行;有写入时用完整回溯覆盖物流变更
|
||||
int backtrack = (successUpdates > 0) ? BACKTRACK_ROWS : BACKTRACK_ROWS_WHEN_ALL_SKIPPED;
|
||||
int nextStartRow = Math.max(effectiveStartRow,
|
||||
Math.max(startRow + MIN_ADVANCE_ROWS, currentMaxRow + 1 - backtrack));
|
||||
// 进度写入 Redis:至少推进到 (startRow + 最小前进 + 回溯 - 1),保证下次不会重复扫同一段
|
||||
int progressToSave = Math.max(currentMaxRow, startRow + MIN_ADVANCE_ROWS + backtrack - 1);
|
||||
redisCache.setCacheObject(redisKey, progressToSave, 30, TimeUnit.DAYS);
|
||||
|
||||
String nextSyncHint = String.format(
|
||||
"下次将从第 %d 行开始(本批最大行: %d,回溯 %d 行;已写入 %d 条,跳过 %d 条)",
|
||||
"下次将从第 %d 行开始(本批最大行: %d,回溯 %d 行;已写入 %d 条,跳过 %d 条;下次从接口获取 rowCount)",
|
||||
nextStartRow, currentMaxRow, backtrack, successUpdates, skippedCount);
|
||||
log.info("========== 更新Redis进度 ==========");
|
||||
log.info("本批范围: {}~{},实际最大行: {},下次起始: {},Redis 进度: {}", startRow, effectiveEndRow, currentMaxRow, nextStartRow, progressToSave);
|
||||
log.info("Redis Key: {}", redisKey);
|
||||
log.info("本批范围: {}~{},实际最大行: {},下次起始: {}(不再保存 Redis 进度)", startRow, effectiveEndRow, currentMaxRow, nextStartRow);
|
||||
|
||||
JSONObject result = new JSONObject();
|
||||
result.put("startRow", startRow);
|
||||
result.put("endRow", effectiveEndRow);
|
||||
result.put("currentMaxRow", currentMaxRow);
|
||||
result.put("lastMaxRow", lastMaxRow);
|
||||
result.put("lastMaxRow", null); // 不再使用 Redis 存储,改为从接口获取 rowCount
|
||||
result.put("nextStartRow", nextStartRow);
|
||||
result.put("rowTotal", rowTotal > 0 ? rowTotal : null);
|
||||
result.put("filledCount", filledCount);
|
||||
|
||||
@@ -99,12 +99,13 @@ public interface ITencentDocService {
|
||||
JSONObject getSheetList(String accessToken, String fileId);
|
||||
|
||||
/**
|
||||
* 获取指定工作表的 rowTotal(文档信息接口返回,直接用于 range 上限)
|
||||
* 获取指定工作表的行数(优先返回 rowCount-实际有数据的行数,其次 rowTotal)
|
||||
* 用于替代 Redis 上次记录的行数,决定批量填充的操作范围
|
||||
*
|
||||
* @param accessToken 访问令牌
|
||||
* @param fileId 文件ID
|
||||
* @param sheetId 工作表ID
|
||||
* @return rowTotal,未找到或解析失败返回 0
|
||||
* @return rowCount/rowTotal,未找到或解析失败返回 0
|
||||
*/
|
||||
int getSheetRowTotal(String accessToken, String fileId, String sheetId);
|
||||
|
||||
|
||||
@@ -349,42 +349,7 @@ public class TencentDocDelayedPushServiceImpl implements ITencentDocDelayedPushS
|
||||
Object result = method.invoke(controller, params);
|
||||
|
||||
log.info("✓ 批量同步执行完成,结果: {}", result);
|
||||
|
||||
// 将接口返回的 nextStartRow 写回 Redis,下次定时执行从新起始行开始(否则会一直从配置的 99 开始)
|
||||
if (result instanceof java.util.Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
java.util.Map<String, Object> resultMap = (java.util.Map<String, Object>) result;
|
||||
Object data = resultMap.get("data");
|
||||
if (data instanceof java.util.Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
java.util.Map<String, Object> dataMap = (java.util.Map<String, Object>) data;
|
||||
Object nextStartRowObj = dataMap.get("nextStartRow");
|
||||
if (nextStartRowObj != null) {
|
||||
int nextStartRow;
|
||||
if (nextStartRowObj instanceof Number) {
|
||||
nextStartRow = ((Number) nextStartRowObj).intValue();
|
||||
} else {
|
||||
try {
|
||||
nextStartRow = Integer.parseInt(nextStartRowObj.toString());
|
||||
} catch (NumberFormatException e) {
|
||||
nextStartRow = -1;
|
||||
}
|
||||
}
|
||||
if (nextStartRow >= 1) {
|
||||
// 若 nextStartRow 超出 rowTotal, capped 避免下次仍超出表尾
|
||||
Object rowTotalObj = dataMap.get("rowTotal");
|
||||
int rowTotal = rowTotalObj instanceof Number ? ((Number) rowTotalObj).intValue() : 0;
|
||||
int toWrite = nextStartRow;
|
||||
if (rowTotal > 0 && nextStartRow > rowTotal) {
|
||||
toWrite = Math.max(MIN_START_ROW_WHEN_USE_ROW_TOTAL, rowTotal - READ_ROWS_WHEN_USE_ROW_TOTAL);
|
||||
log.info("nextStartRow={} 超出 rowTotal={},capped 为 {}", nextStartRow, rowTotal, toWrite);
|
||||
}
|
||||
redisCache.setCacheObject(CONFIG_KEY_PREFIX + "startRow", toWrite, 180, TimeUnit.DAYS);
|
||||
log.info("✓ 已更新下次起始行到 Redis: startRow={}(下次定时同步从第 {} 行开始)", toWrite, toWrite);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// 不再将 nextStartRow 写入 Redis,下次定时执行时从接口获取 rowCount 决定范围
|
||||
|
||||
} catch (Exception ex) {
|
||||
log.error("批量同步调用失败", ex);
|
||||
|
||||
@@ -603,17 +603,17 @@ public class TencentDocServiceImpl implements ITencentDocService {
|
||||
if (sid == null || !sheetId.equals(sid)) {
|
||||
continue;
|
||||
}
|
||||
// rowTotal 可能为 rowTotal、row_total 或 rowCount(腾讯文档 API 多种返回格式)
|
||||
// 优先使用 rowCount(表格实际有数据的行数,用于操作范围),其次 rowTotal(表格容量上限)
|
||||
if (props.containsKey("rowCount")) {
|
||||
return Math.max(0, props.getIntValue("rowCount"));
|
||||
}
|
||||
if (props.containsKey("rowTotal")) {
|
||||
return Math.max(0, props.getIntValue("rowTotal"));
|
||||
}
|
||||
if (props.containsKey("row_total")) {
|
||||
return Math.max(0, props.getIntValue("row_total"));
|
||||
}
|
||||
if (props.containsKey("rowCount")) {
|
||||
return Math.max(0, props.getIntValue("rowCount"));
|
||||
}
|
||||
log.debug("getSheetRowTotal 找到 sheetId 但无 rowTotal/rowCount - fileId: {}, sheetId: {}, propsKeys: {}",
|
||||
log.debug("getSheetRowTotal 找到 sheetId 但无 rowCount/rowTotal - fileId: {}, sheetId: {}, propsKeys: {}",
|
||||
fileId, sheetId, props.keySet());
|
||||
}
|
||||
return 0;
|
||||
|
||||
Reference in New Issue
Block a user