分开京东拉订单的调度类

This commit is contained in:
Leo
2025-03-16 17:09:37 +08:00
parent a0d185da73
commit 85ba779a1c
2 changed files with 434 additions and 391 deletions

View File

@@ -0,0 +1,431 @@
package cn.van.business.util;
import cn.van.business.model.jd.OrderRow;
import cn.van.business.repository.OrderRowRepository;
import cn.van.business.repository.ProductOrderRepository;
import cn.van.business.util.jdReq.*;
import com.alibaba.fastjson2.util.DateUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jd.open.api.sdk.DefaultJdClient;
import com.jd.open.api.sdk.JdClient;
import com.jd.open.api.sdk.domain.kplunion.OrderService.request.query.OrderRowReq;
import com.jd.open.api.sdk.domain.kplunion.OrderService.response.query.OrderRowResp;
import com.jd.open.api.sdk.request.kplunion.UnionOpenOrderRowQueryRequest;
import com.jd.open.api.sdk.response.kplunion.UnionOpenOrderRowQueryResponse;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static cn.van.business.util.JDUtil.DATE_TIME_FORMATTER;
import static cn.van.business.util.WXUtil.super_admins;
/**
* @author Leo
* @version 1.0
* @create 2025/3/16 17:01
* @description
*/
@Component
public class JDScheduleJob {
private static final Logger logger = LoggerFactory.getLogger(JDScheduleJob.class);
private static final String SERVER_URL = "https://api.jd.com/routerjson";
//accessToken
private static final String ACCESS_TOKEN = "";
// 标记是否拉取过小时的订单空订单会set 一个 tag避免重复拉取
private static final String JD_REFRESH_TAG = "jd:refresh:tag:";
private final StringRedisTemplate redisTemplate;
private final OrderRowRepository orderRowRepository;
private final OrderUtil orderUtil;
@Getter
@Value("${isRunning.wx}")
private String isRunning_wx;
@Getter
@Value("${isRunning.jd}")
private String isRunning_jd;
// 构造函数中注入StringRedisTemplate
@Autowired
public JDScheduleJob(StringRedisTemplate redisTemplate, ProductOrderRepository productOrderRepository, OrderRowRepository orderRowRepository, WXUtil wxUtil, OrderUtil orderUtil) {
this.redisTemplate = redisTemplate;
this.orderRowRepository = orderRowRepository;
this.orderUtil = orderUtil;
}
/**
* 将 响应参数转化为 OrderRow并返回
*/
private OrderRow createOrderRow(OrderRowResp orderRowResp) {
OrderRow orderRow = new OrderRow();
orderRow.setOrderId(orderRowResp.getOrderId());
orderRow.setSkuId(orderRowResp.getSkuId());
orderRow.setSkuName(orderRowResp.getSkuName());
orderRow.setItemId(orderRowResp.getItemId());
orderRow.setSkuNum(orderRowResp.getSkuNum());
orderRow.setPrice(orderRowResp.getPrice());
orderRow.setActualCosPrice(orderRowResp.getActualCosPrice());
orderRow.setActualFee(orderRowResp.getActualFee());
orderRow.setEstimateCosPrice(orderRowResp.getEstimateCosPrice());
orderRow.setEstimateFee(orderRowResp.getEstimateFee());
orderRow.setSubSideRate(orderRowResp.getSubSideRate());
orderRow.setSubsidyRate(orderRowResp.getSubsidyRate());
orderRow.setCommissionRate(orderRowResp.getCommissionRate());
orderRow.setFinalRate(orderRowResp.getFinalRate());
orderRow.setOrderTime(DateUtils.parseDate(orderRowResp.getOrderTime()));
orderRow.setFinishTime(DateUtils.parseDate(orderRowResp.getFinishTime()));
orderRow.setOrderTag(orderRowResp.getOrderTag());
orderRow.setOrderEmt(orderRowResp.getOrderEmt());
orderRow.setUnionId(orderRowResp.getUnionId());
orderRow.setUnionRole(orderRowResp.getUnionRole());
orderRow.setUnionAlias(orderRowResp.getUnionAlias());
orderRow.setUnionTag(orderRowResp.getUnionTag());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setValidCode(orderRowResp.getValidCode());
orderRow.setPayMonth(orderRowResp.getPayMonth());
orderRow.setSiteId(orderRowResp.getSiteId());
orderRow.setParentId(orderRowResp.getParentId());
//GoodsInfo goodsInfo = orderRowResp.getGoodsInfo();
//GoodsInfoVO goodsInfoVO = new GoodsInfoVO();
//goodsInfoVO.setShopId(String.valueOf(goodsInfo.getShopId()));
//goodsInfoVO.setShopName(goodsInfo.getShopName());
//goodsInfoVO.setOwner(goodsInfo.getOwner());
//goodsInfoVO.setProductId(String.valueOf(goodsInfo.getProductId()));
//goodsInfoVO.setImageUrl(goodsInfo.getImageUrl());
//orderRow.setGoodsInfo(goodsInfoVO);
orderRow.setCallerItemId(orderRowResp.getCallerItemId());
orderRow.setPid(orderRowResp.getPid());
orderRow.setCid1(orderRowResp.getCid1());
orderRow.setCid2(orderRowResp.getCid2());
orderRow.setCid3(orderRowResp.getCid3());
orderRow.setChannelId(orderRowResp.getChannelId());
orderRow.setProPriceAmount(orderRowResp.getProPriceAmount());
orderRow.setSkuFrozenNum(orderRowResp.getSkuFrozenNum());
orderRow.setSkuReturnNum(orderRowResp.getSkuReturnNum());
orderRow.setSkuTag(orderRowResp.getSkuTag());
orderRow.setPositionId(orderRowResp.getPositionId());
orderRow.setPopId(orderRowResp.getPopId());
orderRow.setRid(orderRowResp.getRid());
orderRow.setPlus(orderRowResp.getPlus());
orderRow.setCpActId(orderRowResp.getCpActId());
orderRow.setGiftCouponKey(orderRowResp.getGiftCouponKey());
orderRow.setModifyTime(new Date());
orderRow.setSign(orderRowResp.getSign());
orderRow.setBalanceExt(orderRowResp.getBalanceExt());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setExt1(orderRowResp.getExt1());
orderRow.setSubUnionId(orderRowResp.getSubUnionId());
orderRow.setGiftCouponOcsAmount(orderRowResp.getGiftCouponOcsAmount());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setId(orderRowResp.getId());
orderRow.setValidCode(orderRowResp.getValidCode());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setTraceType(orderRowResp.getTraceType());
return orderRow;
}
/**
* 根据指定的日期时间拉取订单
*
* @param startTime 开始时间
* isRealTime 是否是实时订单 是的话不会判断是否拉取过
* page 分页页码
* isMinutes 是否是分钟级订单 分钟的每次加10分钟小时每小时加1小时
*/
public UnionOpenOrderRowQueryResponse fetchOrdersForDateTime(LocalDateTime startTime, boolean isRealTime, Integer page, boolean isMinutes, String appKey, String secretKey) {
LocalDateTime endTime = isMinutes ? startTime.plusMinutes(10) : startTime.plusHours(1);
String hourMinuteTag = isRealTime ? "minute" : "hour";
//String oldTimeTag = JD_REFRESH_TAG + startTime.format(DATE_TIME_FORMATTER);
String newTimeTag = JD_REFRESH_TAG + appKey + ":" + startTime.format(DATE_TIME_FORMATTER);
HashOperations<String, String, String> hashOps = redisTemplate.opsForHash();
// 检查这个小时或分钟是否已经被处理过
if (hashOps.hasKey(newTimeTag, hourMinuteTag)) {
if (!isMinutes) {
return null;
}
}
// 调用 API 以拉取订单
try {
UnionOpenOrderRowQueryResponse unionOpenOrderRowQueryResponse = getUnionOpenOrderRowQueryResponse(startTime, endTime, page, appKey, secretKey);
// 历史的订单才进行标记为已拉取,小时分钟的都进行拉取并且标记
if (!isRealTime) {
// 只有没有订单的才进行标记为已拉取
if (unionOpenOrderRowQueryResponse != null && unionOpenOrderRowQueryResponse.getQueryResult() != null && unionOpenOrderRowQueryResponse.getQueryResult().getData() == null) {
hashOps.put(newTimeTag, hourMinuteTag, "done");
}
}
// 打印方法调用和开始结束时间
if (isRealTime && (LocalDateTime.now().getMinute() % 10 == 0)) {
logger.debug(" {} --- 拉取订单, 分钟还是秒 {} , 开始时间:{} --- 结束时间:{}", appKey.substring(appKey.length() - 4), hourMinuteTag, startTime.format(DATE_TIME_FORMATTER), endTime.format(DATE_TIME_FORMATTER));
}
return unionOpenOrderRowQueryResponse;
} catch (Exception e) {
return null;
}
}
// 响应校验方法
private boolean isValidResponse(UnionOpenOrderRowQueryResponse response) {
return response != null && response.getQueryResult() != null && response.getQueryResult().getCode() == 200 && response.getQueryResult().getData() != null;
}
// 订单处理方法
private void processOrderResponse(UnionOpenOrderRowQueryResponse response, WXUtil.SuperAdmin admin) {
Arrays.stream(response.getQueryResult().getData()).parallel().map(this::createOrderRow).forEach(orderRowRepository::save);
}
public int fetchOrders(OrderFetchStrategy strategy, String appKey, String secretKey) {
TimeRange range = strategy.calculateRange(LocalDateTime.now());
int count = 0;
// 复用原有的抓取逻辑
LocalDateTime current = range.getStart();
while (!current.isAfter(range.getEnd())) {
// 调用分页抓取API...
Integer pageIndex = 1;
boolean hasMore = true;
while (hasMore) {
try {
// 30-60 天 ,非实时,非分钟
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(current, false, pageIndex, false, appKey, secretKey);
if (response != null && response.getQueryResult() != null) {
if (response.getQueryResult().getCode() == 200) {
OrderRowResp[] orderRowResps = response.getQueryResult().getData();
if (orderRowResps != null) {
for (OrderRowResp orderRowResp : orderRowResps) {
if (orderRowResp != null) { // Check each orderRowResp is not null
OrderRow orderRow = createOrderRow(orderRowResp);
// Ensure orderRow is not null after creation
orderRowRepository.save(orderRow);
count++;
}
}
}
hasMore = Boolean.TRUE.equals(response.getQueryResult().getHasMore());
} else {
hasMore = false;
}
} else {
hasMore = false;
}
} catch (Exception e) {
hasMore = false; // Optionally break out of the while loop if required
}
if (hasMore) pageIndex++;
}
current = current.plusHours(1);
}
return count;
}
/**
* 实时刷新最近10分钟的订单Resilience4j限流集成
*/
@Scheduled(cron = "0 * * * * ?")
public void fetchLatestOrder() {
if (isRunning_jd.equals("true")) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime startTime = now.minusMinutes(10).withSecond(0).withNano(0);
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(startTime, true, 1, true, admin.getAppKey(), admin.getSecretKey());
if (isValidResponse(response)) {
processOrderResponse(response, admin);
}
} catch (JDUtil.RateLimitExceededException e) {
logger.warn("[限流] {} 请求频率受限", admin.getName());
} catch (Exception e) {
logger.error("{} 订单抓取异常: {}", admin.getName(), e.getMessage());
}
});
}
}
/**
* 扫描订单发送到微信
* 每分钟的30秒执行一次
*/
@Scheduled(cron = "3 * * * * ?")
public void sendOrderToWx() {
if (isRunning_wx.equals("true")) {
//long start = System.currentTimeMillis();
int[] validCodes = {-1};
// 只要三个月的,更多的也刷新不出来的
Date threeMonthsAgo = Date.from(LocalDateTime.now().minusMonths(3).atZone(ZoneId.systemDefault()).toInstant());
List<OrderRow> orderRows = orderRowRepository.findByValidCodeNotInAndOrderTimeGreaterThanOrderByOrderTimeDesc(validCodes, threeMonthsAgo);
for (OrderRow orderRow : orderRows) {
orderUtil.orderToWx(orderRow, true);
}
//logger.info("扫描订单发送到微信耗时:{} ms, 订单数:{} ", System.currentTimeMillis() - start, orderRows.size());
}
}
/**
* 一天拉取三次 30天到60天前的订单
*/
@Scheduled(cron = "0 0 */4 * * ?")
public void fetchHistoricalOrders3090() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days3090Strategy();
for (WXUtil.SuperAdmin admin : super_admins.values()) {
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 3090订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 拉取异常: {}", admin.getName(), e.getMessage());
}
}
} catch (Exception ex) {
logger.error("策略执行异常", ex);
}
}
}
/**
* 一天拉取6次 14天到30天前的订单
*/
@Scheduled(cron = "0 0 * * * ?")
public void fetchHistoricalOrders1430() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days1430Strategy(); // 需补充Days1430Strategy实现
for (WXUtil.SuperAdmin admin : super_admins.values()) {
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 1430订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 拉取异常: {}", admin.getName(), e.getMessage());
}
}
} catch (Exception ex) {
logger.error("1430策略执行异常", ex);
}
}
}
/**
* 每10分钟拉取07-14天的订单
*/
@Scheduled(cron = "0 0 * * * ?")
public void fetchHistoricalOrders0714() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days0714Strategy();
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 0714订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 0714拉取异常: {}", admin.getName(), e.getMessage());
}
});
} catch (Exception ex) {
logger.error("0714策略执行异常", ex);
}
}
}
@Scheduled(cron = "0 */5 * * * ?")
public void fetchHistoricalOrders0007() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days0007Strategy();
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 0007订单拉取完成新增{}条", admin.getName(), count);
} catch (JDUtil.RateLimitExceededException e) {
logger.warn("[限流] {} 0007请求受限", admin.getName());
} catch (Exception e) {
logger.error("账号{}0007拉取异常: {}", admin.getName(), e.getMessage());
}
});
} catch (Exception ex) {
logger.error("0007策略执行异常", ex);
}
}
}
/**
* 获取订单列表
*
* @param start 开始时间
* @param end 结束时间
* @return
* @throws Exception
*/
public UnionOpenOrderRowQueryResponse getUnionOpenOrderRowQueryResponse(LocalDateTime start, LocalDateTime end, Integer pageIndex, String appKey, String secretKey) throws Exception {
String startTime = start.format(DATE_TIME_FORMATTER);
String endTime = end.format(DATE_TIME_FORMATTER);
// 模拟 API 调用
//System.out.println("调用API - 从 " + startTime
// + " 到 " + endTime);
// 实际的 API 调用逻辑应在这里进行
JdClient client = new DefaultJdClient(SERVER_URL, ACCESS_TOKEN, appKey, secretKey);
UnionOpenOrderRowQueryRequest request = new UnionOpenOrderRowQueryRequest();
OrderRowReq orderReq = new OrderRowReq();
orderReq.setPageIndex(pageIndex);
orderReq.setPageSize(200);
orderReq.setStartTime(startTime);
orderReq.setEndTime(endTime);
orderReq.setType(1);
request.setOrderReq(orderReq);
request.setVersion("1.0");
request.setSignmethod("md5");
// 时间戳格式为yyyy-MM-dd HH:mm:ss时区为GMT+8。API服务端允许客户端请求最大时间误差为10分钟
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
request.setTimestamp(simpleDateFormat.format(date));
return client.execute(request);
}
}

View File

@@ -5,13 +5,10 @@ import cn.van.business.model.jd.OrderRow;
import cn.van.business.model.jd.ProductOrder;
import cn.van.business.repository.OrderRowRepository;
import cn.van.business.repository.ProductOrderRepository;
import cn.van.business.util.jdReq.*;
import com.alibaba.fastjson2.util.DateUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jd.open.api.sdk.DefaultJdClient;
import com.jd.open.api.sdk.JdClient;
import com.jd.open.api.sdk.domain.kplunion.OrderService.request.query.OrderRowReq;
import com.jd.open.api.sdk.domain.kplunion.OrderService.response.query.OrderRowResp;
import com.jd.open.api.sdk.domain.kplunion.promotionbysubunioni.PromotionService.request.get.PromotionCodeReq;
import com.jd.open.api.sdk.request.kplunion.UnionOpenOrderRowQueryRequest;
import com.jd.open.api.sdk.request.kplunion.UnionOpenPromotionBysubunionidGetRequest;
@@ -23,8 +20,6 @@ import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -68,18 +63,14 @@ public class JDUtil {
private static final String LL_SECRET_KEY_DG = "3ceddff403e544a8a2eacc727cf05dab";
/**
* 实际业务处理
*/
// 标记是否拉取过小时的订单空订单会set 一个 tag避免重复拉取
private static final String JD_REFRESH_TAG = "jd:refresh:tag:";
private static final String SERVER_URL = "https://api.jd.com/routerjson";
//accessToken
private static final String ACCESS_TOKEN = "";
private static final Logger logger = LoggerFactory.getLogger(JDUtil.class);
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final String INTERACTION_STATE_PREFIX = "interaction_state:";
private static final long TIMEOUT_MINUTES = 1;
private final StringRedisTemplate redisTemplate;
@@ -90,13 +81,7 @@ public class JDUtil {
// 添加ObjectMapper来序列化和反序列化UserInteractionState
private final ObjectMapper objectMapper = new ObjectMapper();
// Getter 方法,方便外部访问
@Getter
@Value("${isRunning.wx}")
private String isRunning_wx;
@Getter
@Value("${isRunning.jd}")
private String isRunning_jd;
// 构造函数中注入StringRedisTemplate
@@ -125,338 +110,6 @@ public class JDUtil {
return todayOrders.stream().filter(orderRow -> orderRow.getValidCode() == 13 || orderRow.getValidCode() == 25 || orderRow.getValidCode() == 26 || orderRow.getValidCode() == 27 || orderRow.getValidCode() == 28 || orderRow.getValidCode() == 29);
}
/**
* 将 响应参数转化为 OrderRow并返回
*/
private OrderRow createOrderRow(OrderRowResp orderRowResp) {
OrderRow orderRow = new OrderRow();
orderRow.setOrderId(orderRowResp.getOrderId());
orderRow.setSkuId(orderRowResp.getSkuId());
orderRow.setSkuName(orderRowResp.getSkuName());
orderRow.setItemId(orderRowResp.getItemId());
orderRow.setSkuNum(orderRowResp.getSkuNum());
orderRow.setPrice(orderRowResp.getPrice());
orderRow.setActualCosPrice(orderRowResp.getActualCosPrice());
orderRow.setActualFee(orderRowResp.getActualFee());
orderRow.setEstimateCosPrice(orderRowResp.getEstimateCosPrice());
orderRow.setEstimateFee(orderRowResp.getEstimateFee());
orderRow.setSubSideRate(orderRowResp.getSubSideRate());
orderRow.setSubsidyRate(orderRowResp.getSubsidyRate());
orderRow.setCommissionRate(orderRowResp.getCommissionRate());
orderRow.setFinalRate(orderRowResp.getFinalRate());
orderRow.setOrderTime(DateUtils.parseDate(orderRowResp.getOrderTime()));
orderRow.setFinishTime(DateUtils.parseDate(orderRowResp.getFinishTime()));
orderRow.setOrderTag(orderRowResp.getOrderTag());
orderRow.setOrderEmt(orderRowResp.getOrderEmt());
orderRow.setUnionId(orderRowResp.getUnionId());
orderRow.setUnionRole(orderRowResp.getUnionRole());
orderRow.setUnionAlias(orderRowResp.getUnionAlias());
orderRow.setUnionTag(orderRowResp.getUnionTag());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setValidCode(orderRowResp.getValidCode());
orderRow.setPayMonth(orderRowResp.getPayMonth());
orderRow.setSiteId(orderRowResp.getSiteId());
orderRow.setParentId(orderRowResp.getParentId());
//GoodsInfo goodsInfo = orderRowResp.getGoodsInfo();
//GoodsInfoVO goodsInfoVO = new GoodsInfoVO();
//goodsInfoVO.setShopId(String.valueOf(goodsInfo.getShopId()));
//goodsInfoVO.setShopName(goodsInfo.getShopName());
//goodsInfoVO.setOwner(goodsInfo.getOwner());
//goodsInfoVO.setProductId(String.valueOf(goodsInfo.getProductId()));
//goodsInfoVO.setImageUrl(goodsInfo.getImageUrl());
//orderRow.setGoodsInfo(goodsInfoVO);
orderRow.setCallerItemId(orderRowResp.getCallerItemId());
orderRow.setPid(orderRowResp.getPid());
orderRow.setCid1(orderRowResp.getCid1());
orderRow.setCid2(orderRowResp.getCid2());
orderRow.setCid3(orderRowResp.getCid3());
orderRow.setChannelId(orderRowResp.getChannelId());
orderRow.setProPriceAmount(orderRowResp.getProPriceAmount());
orderRow.setSkuFrozenNum(orderRowResp.getSkuFrozenNum());
orderRow.setSkuReturnNum(orderRowResp.getSkuReturnNum());
orderRow.setSkuTag(orderRowResp.getSkuTag());
orderRow.setPositionId(orderRowResp.getPositionId());
orderRow.setPopId(orderRowResp.getPopId());
orderRow.setRid(orderRowResp.getRid());
orderRow.setPlus(orderRowResp.getPlus());
orderRow.setCpActId(orderRowResp.getCpActId());
orderRow.setGiftCouponKey(orderRowResp.getGiftCouponKey());
orderRow.setModifyTime(new Date());
orderRow.setSign(orderRowResp.getSign());
orderRow.setBalanceExt(orderRowResp.getBalanceExt());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setExt1(orderRowResp.getExt1());
orderRow.setSubUnionId(orderRowResp.getSubUnionId());
orderRow.setGiftCouponOcsAmount(orderRowResp.getGiftCouponOcsAmount());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setTraceType(orderRowResp.getTraceType());
orderRow.setId(orderRowResp.getId());
orderRow.setValidCode(orderRowResp.getValidCode());
orderRow.setExpressStatus(orderRowResp.getExpressStatus());
orderRow.setTraceType(orderRowResp.getTraceType());
return orderRow;
}
public int fetchOrders(OrderFetchStrategy strategy, String appKey, String secretKey) {
TimeRange range = strategy.calculateRange(LocalDateTime.now());
int count = 0;
// 复用原有的抓取逻辑
LocalDateTime current = range.getStart();
while (!current.isAfter(range.getEnd())) {
// 调用分页抓取API...
Integer pageIndex = 1;
boolean hasMore = true;
while (hasMore) {
try {
// 30-60 天 ,非实时,非分钟
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(current, false, pageIndex, false, appKey, secretKey);
if (response != null && response.getQueryResult() != null) {
if (response.getQueryResult().getCode() == 200) {
OrderRowResp[] orderRowResps = response.getQueryResult().getData();
if (orderRowResps != null) {
for (OrderRowResp orderRowResp : orderRowResps) {
if (orderRowResp != null) { // Check each orderRowResp is not null
OrderRow orderRow = createOrderRow(orderRowResp);
if (orderRow != null) { // Ensure orderRow is not null after creation
orderRowRepository.save(orderRow);
count++;
}
}
}
}
hasMore = Boolean.TRUE.equals(response.getQueryResult().getHasMore());
} else {
hasMore = false;
}
} else {
hasMore = false;
}
} catch (Exception e) {
hasMore = false; // Optionally break out of the while loop if required
}
if (hasMore) pageIndex++;
}
current = current.plusHours(1);
}
return count;
}
/**
* 实时刷新最近10分钟的订单Resilience4j限流集成
*/
@Scheduled(cron = "0 * * * * ?")
public void fetchLatestOrder() {
if (isRunning_jd.equals("true")) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime startTime = now.minusMinutes(10).withSecond(0).withNano(0);
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(startTime, true, 1, true, admin.getAppKey(), admin.getSecretKey());
if (isValidResponse(response)) {
processOrderResponse(response, admin);
}
} catch (RateLimitExceededException e) {
logger.warn("[限流] {} 请求频率受限", admin.getName());
} catch (Exception e) {
logger.error("{} 订单抓取异常: {}", admin.getName(), e.getMessage());
}
});
}
}
// 响应校验方法
private boolean isValidResponse(UnionOpenOrderRowQueryResponse response) {
return response != null && response.getQueryResult() != null && response.getQueryResult().getCode() == 200 && response.getQueryResult().getData() != null;
}
// 订单处理方法
private void processOrderResponse(UnionOpenOrderRowQueryResponse response, WXUtil.SuperAdmin admin) {
Arrays.stream(response.getQueryResult().getData()).parallel().map(this::createOrderRow).forEach(orderRowRepository::save);
}
/**
* 扫描订单发送到微信
* 每分钟的30秒执行一次
*/
@Scheduled(cron = "3 * * * * ?")
public void sendOrderToWx() {
if (isRunning_wx.equals("true")) {
//long start = System.currentTimeMillis();
int[] validCodes = {-1};
// 只要三个月的,更多的也刷新不出来的
Date threeMonthsAgo = Date.from(LocalDateTime.now().minusMonths(3).atZone(ZoneId.systemDefault()).toInstant());
List<OrderRow> orderRows = orderRowRepository.findByValidCodeNotInAndOrderTimeGreaterThanOrderByOrderTimeDesc(validCodes, threeMonthsAgo);
for (OrderRow orderRow : orderRows) {
orderUtil.orderToWx(orderRow, true);
}
//logger.info("扫描订单发送到微信耗时:{} ms, 订单数:{} ", System.currentTimeMillis() - start, orderRows.size());
}
}
/**
* 一天拉取三次 30天到60天前的订单
*/
@Scheduled(cron = "0 0 */4 * * ?")
public void fetchHistoricalOrders3090() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days3090Strategy();
for (WXUtil.SuperAdmin admin : super_admins.values()) {
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 3090订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 拉取异常: {}", admin.getName(), e.getMessage());
}
}
} catch (Exception ex) {
logger.error("策略执行异常", ex);
}
}
}
/**
* 一天拉取6次 14天到30天前的订单
*/
@Scheduled(cron = "0 0 * * * ?")
public void fetchHistoricalOrders1430() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days1430Strategy(); // 需补充Days1430Strategy实现
for (WXUtil.SuperAdmin admin : super_admins.values()) {
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 1430订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 拉取异常: {}", admin.getName(), e.getMessage());
}
}
} catch (Exception ex) {
logger.error("1430策略执行异常", ex);
}
}
}
/**
* 每10分钟拉取07-14天的订单
*/
@Scheduled(cron = "0 0 * * * ?")
public void fetchHistoricalOrders0714() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days0714Strategy();
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 0714订单拉取完成新增{}条", admin.getName(), count);
} catch (Exception e) {
logger.error("账号 {} 0714拉取异常: {}", admin.getName(), e.getMessage());
}
});
} catch (Exception ex) {
logger.error("0714策略执行异常", ex);
}
}
}
@Scheduled(cron = "0 */5 * * * ?")
public void fetchHistoricalOrders0007() {
if (isRunning_jd.equals("true")) {
try {
OrderFetchStrategy strategy = new Days0007Strategy();
super_admins.values().parallelStream().forEach(admin -> {
if (Util.isAnyEmpty(admin.getAppKey(), admin.getSecretKey())) return;
try {
int count = fetchOrders(strategy, admin.getAppKey(), admin.getSecretKey());
logger.info("账号{} 0007订单拉取完成新增{}条", admin.getName(), count);
} catch (RateLimitExceededException e) {
logger.warn("[限流] {} 0007请求受限", admin.getName());
} catch (Exception e) {
logger.error("账号{}0007拉取异常: {}", admin.getName(), e.getMessage());
}
});
} catch (Exception ex) {
logger.error("0007策略执行异常", ex);
}
}
}
/**
* 根据指定的日期时间拉取订单
*
* @param startTime 开始时间
* isRealTime 是否是实时订单 是的话不会判断是否拉取过
* page 分页页码
* isMinutes 是否是分钟级订单 分钟的每次加10分钟小时每小时加1小时
*/
public UnionOpenOrderRowQueryResponse fetchOrdersForDateTime(LocalDateTime startTime, boolean isRealTime, Integer page, boolean isMinutes, String appKey, String secretKey) {
LocalDateTime endTime = isMinutes ? startTime.plusMinutes(10) : startTime.plusHours(1);
String hourMinuteTag = isRealTime ? "minute" : "hour";
//String oldTimeTag = JD_REFRESH_TAG + startTime.format(DATE_TIME_FORMATTER);
String newTimeTag = JD_REFRESH_TAG + appKey + ":" + startTime.format(DATE_TIME_FORMATTER);
HashOperations<String, String, String> hashOps = redisTemplate.opsForHash();
// 检查这个小时或分钟是否已经被处理过
if (hashOps.hasKey(newTimeTag, hourMinuteTag)) {
if (!isMinutes) {
return null;
}
}
// 调用 API 以拉取订单
try {
UnionOpenOrderRowQueryResponse unionOpenOrderRowQueryResponse = getUnionOpenOrderRowQueryResponse(startTime, endTime, page, appKey, secretKey);
// 历史的订单才进行标记为已拉取,小时分钟的都进行拉取并且标记
if (!isRealTime) {
// 只有没有订单的才进行标记为已拉取
if (unionOpenOrderRowQueryResponse != null && unionOpenOrderRowQueryResponse.getQueryResult() != null && unionOpenOrderRowQueryResponse.getQueryResult().getData() == null) {
hashOps.put(newTimeTag, hourMinuteTag, "done");
}
}
// 打印方法调用和开始结束时间
if (isRealTime && (LocalDateTime.now().getMinute() % 10 == 0)) {
logger.debug(" {} --- 拉取订单, 分钟还是秒 {} , 开始时间:{} --- 结束时间:{}", appKey.substring(appKey.length() - 4), hourMinuteTag, startTime.format(DATE_TIME_FORMATTER), endTime.format(DATE_TIME_FORMATTER));
}
return unionOpenOrderRowQueryResponse;
} catch (Exception e) {
return null;
}
}
public void sendOrderToWxByOrderDefault(String order, String fromWxid) {
logger.info("执行 sendOrderToWxByOrderDefault 方法order: {}, fromWxid: {}", order, fromWxid);
handleUserInteraction(fromWxid, order);
@@ -665,47 +318,6 @@ public class JDUtil {
}
break;
}
case "刷新7天": {
content = new StringBuilder();
long start = System.currentTimeMillis();
int count = 0;
LocalDateTime startDate = LocalDateTime.now().minusDays(7).withMinute(0).withSecond(0).withNano(0);
LocalDateTime lastHour = LocalDateTime.now().minusHours(1).withMinute(0).withSecond(0).withNano(0);
String appKey = superAdmin.getAppKey();
String secretKey = superAdmin.getSecretKey();
if (Util.isAnyEmpty(appKey, secretKey)) {
return;
}
while (!startDate.isEqual(lastHour)) {
startDate = startDate.plusHours(1);
UnionOpenOrderRowQueryResponse response = fetchOrdersForDateTime(startDate, false, 1, false, appKey, secretKey);
if (response != null) {
int code = response.getQueryResult().getCode();
if (code == 200) {
if (response.getQueryResult().getCode() == 200) {
OrderRowResp[] orderRowResps = response.getQueryResult().getData();
if (orderRowResps == null) {
continue;
}
for (OrderRowResp orderRowResp : orderRowResps) {
// 固化到数据库
OrderRow orderRow = createOrderRow(orderRowResp);
// 订单号不存在就保存,存在就更新订单状态
orderRowRepository.save(orderRow);
count++;
}
}
}
}
}
content.append("刷新7天成功,耗时").append((System.currentTimeMillis() - start) / 1000).append("\r").append("刷新订单数:").append(count);
contents.add(content);
break;
}
default:
sendOrderToWxByOrderJDAdvanced(order, fromWxid);
}