This commit is contained in:
van
2026-05-09 23:38:10 +08:00
parent 7582d22d2a
commit 217ea3afdc
4 changed files with 119 additions and 31 deletions

View File

@@ -12,7 +12,7 @@ public class PhoneForwardActivePushImpl implements IPhoneForwardActivePush {
private WxSendWeComPushClient wxSendWeComPushClient; private WxSendWeComPushClient wxSendWeComPushClient;
@Override @Override
public void schedulePushChunks(String toUser, List<String> chunks) { public boolean schedulePushChunks(String toUser, List<String> chunks) {
wxSendWeComPushClient.scheduleActivePushes(toUser, chunks); return wxSendWeComPushClient.pushAfterPassiveDelaySync(toUser, chunks);
} }
} }

View File

@@ -13,6 +13,7 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@@ -33,29 +34,76 @@ public class WxSendWeComPushClient {
private String pushSecret; private String pushSecret;
/** /**
* 在被动回复返回后延迟再发,保证企微侧先出现首条被动消息 * 与 {@link #scheduleActivePushes(String, List)} 相同顺序与延迟,但在当前线程同步执行,并返回是否全部成功
* 供「开/慢开」异步 TG 查询结束后立刻知晓推送结果并打错误日志。
*/
public boolean pushAfterPassiveDelaySync(String toUser, List<String> contents) {
final String userId = toUser != null ? toUser.trim() : "";
final List<String> list = contents != null ? new ArrayList<>(contents) : Collections.emptyList();
if (!StringUtils.hasText(wxsendBaseUrl)) {
log.error("企微主动推送未执行:未配置 jarvis.wecom.wxsend-base-url用户会话中收不到查询结果或报错");
return false;
}
if (!StringUtils.hasText(pushSecret)) {
log.error("企微主动推送未执行:未配置 jarvis.wecom.push-secret");
return false;
}
if (!StringUtils.hasText(userId)) {
log.error("企微主动推送未执行:目标成员 UserID 为空");
return false;
}
if (list.isEmpty()) {
log.warn("企微主动推送未执行:推送内容为空 userId={}", userId);
return false;
}
try {
Thread.sleep(450);
String base = normalizeBase(wxsendBaseUrl);
String url = base + "/wecom/active-push";
boolean anySent = false;
boolean allOk = true;
for (String c : list) {
if (!StringUtils.hasText(c)) {
continue;
}
anySent = true;
if (!postJson(url, userId, c.trim())) {
allOk = false;
}
Thread.sleep(120);
}
if (!anySent) {
log.error("企微主动推送:无有效正文段 userId={}", userId);
return false;
}
if (!allOk) {
log.error("企微主动推送部分失败 userId={}(请检查 wxSend /wecom/active-push 与密钥)", userId);
}
return allOk;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("企微主动推送被中断 userId={}", userId, e);
return false;
} catch (Exception e) {
log.error("企微主动推送异常 userId={}", userId, e);
return false;
}
}
/**
* 在被动回复返回后再发,保证企微侧先出现首条被动消息。
*/ */
public void scheduleActivePushes(String toUser, List<String> contents) { public void scheduleActivePushes(String toUser, List<String> contents) {
if (!StringUtils.hasText(wxsendBaseUrl) || !StringUtils.hasText(pushSecret) final String userId = toUser != null ? toUser.trim() : "";
|| !StringUtils.hasText(toUser) || contents == null || contents.isEmpty()) { final List<String> list = contents != null ? new ArrayList<>(contents) : Collections.emptyList();
return;
}
final String userId = toUser.trim();
final List<String> list = new ArrayList<>(contents);
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { boolean ok = pushAfterPassiveDelaySync(userId, list);
Thread.sleep(450); if (!ok) {
String base = normalizeBase(wxsendBaseUrl); log.error(
String url = base + "/wecom/active-push"; "scheduleActivePushes 未完全成功 userId={}(用户可能未收到会话内的后续分段)",
for (String c : list) { userId);
if (!StringUtils.hasText(c)) {
continue;
}
postJson(url, userId, c.trim());
Thread.sleep(120);
}
} catch (Exception e) {
log.warn("企微主动推送任务异常 userId={} msg={}", userId, e.toString());
} }
}); });
} }
@@ -68,7 +116,8 @@ public class WxSendWeComPushClient {
return b; return b;
} }
private void postJson(String url, String toUser, String content) { /** @return HTTP 2xx 且无异常时为 true */
private boolean postJson(String url, String toUser, String content) {
JSONObject body = new JSONObject(); JSONObject body = new JSONObject();
body.put("toUser", toUser); body.put("toUser", toUser);
body.put("content", content); body.put("content", content);
@@ -89,12 +138,14 @@ public class WxSendWeComPushClient {
InputStream is = code >= 200 && code < 300 ? conn.getInputStream() : conn.getErrorStream(); InputStream is = code >= 200 && code < 300 ? conn.getInputStream() : conn.getErrorStream();
String resp = readAll(is); String resp = readAll(is);
if (code < 200 || code >= 300) { if (code < 200 || code >= 300) {
log.warn("wxSend active-push HTTP {} body={}", code, resp); log.error("wxSend active-push HTTP {} url={} body={}", code, url, resp);
} else { return false;
log.debug("wxSend active-push OK http={} resp={}", code, resp);
} }
log.debug("wxSend active-push OK http={} resp={}", code, resp);
return true;
} catch (Exception e) { } catch (Exception e) {
log.warn("wxSend active-push 请求失败 url={} err={}", url, e.toString()); log.error("wxSend active-push 请求失败 url={} err={}", url, e.toString(), e);
return false;
} finally { } finally {
if (conn != null) { if (conn != null) {
conn.disconnect(); conn.disconnect();

View File

@@ -9,10 +9,11 @@ import java.util.List;
public interface IPhoneForwardActivePush { public interface IPhoneForwardActivePush {
/** /**
* 异步排队推送到企微成员 * 在被动回复已发出后,按序推送到企微成员(首条前短暂延迟,避免次序错乱)。
* *
* @param toUser 成员 UserIDFromUserName * @param toUser 成员 UserIDFromUserName
* @param chunks 每段不超过 UTF-8 2048 字节的正文 * @param chunks 每段不超过 UTF-8 2048 字节的正文
* @return {@code true} 表示配置齐全且每一段非空正文均收到 wxSend 2xx 响应
*/ */
void schedulePushChunks(String toUser, List<String> chunks); boolean schedulePushChunks(String toUser, List<String> chunks);
} }

View File

@@ -258,20 +258,56 @@ public class OpenPhoneForwardService {
private void runForwardAndPush(String toUser, String phone, String bot, String cKey) { private void runForwardAndPush(String toUser, String phone, String bot, String cKey) {
try { try {
log.info("phone-forward 异步 TG 查询开始 phone={} bot={} toUser={}", phone, bot, toUser);
String reply = doForward(phone, bot, dedupEnabled ? cKey : null); String reply = doForward(phone, bot, dedupEnabled ? cKey : null);
List<String> chunks = WeComUtf8ChunkUtil.splitUtf8Chunks(reply, WeComUtf8ChunkUtil.WE_COM_TEXT_MAX_UTF8_BYTES); List<String> chunks = WeComUtf8ChunkUtil.splitUtf8Chunks(reply, WeComUtf8ChunkUtil.WE_COM_TEXT_MAX_UTF8_BYTES);
chunks.removeIf(s -> !StringUtils.hasText(s)); chunks.removeIf(s -> !StringUtils.hasText(s));
if (chunks.isEmpty()) { if (chunks.isEmpty()) {
chunks = Collections.singletonList("(无返回内容)"); chunks = Collections.singletonList("(无返回内容)");
} }
phoneForwardActivePush.schedulePushChunks(toUser, chunks); boolean pushed = phoneForwardActivePush.schedulePushChunks(toUser, chunks);
if (!pushed) {
log.error(
"phone-forward 结果未能推送到企微 user={} phone={}(可能为 TG/转发错误或服务端返回的正文,请检查 wxSend 与 jarvis.wecom",
toUser, phone);
}
} catch (Exception e) { } catch (Exception e) {
log.warn("phone-forward 异步推送异常 phone={} err={}", phone, e.toString()); log.error("phone-forward 异步处理异常 phone={} user={}", phone, toUser, e);
pushForwardFailureNotice(toUser, userVisibleThrowableMessage(e));
} finally { } finally {
clearForwardInflight(cKey); clearForwardInflight(cKey);
} }
} }
private void pushForwardFailureNotice(String toUser, String line) {
if (phoneForwardActivePush == null || !StringUtils.hasText(toUser)) {
return;
}
try {
List<String> parts = WeComUtf8ChunkUtil.splitUtf8Chunks(line, WeComUtf8ChunkUtil.WE_COM_TEXT_MAX_UTF8_BYTES);
if (parts.isEmpty()) {
parts = Collections.singletonList("「转发服务」异常,请稍后重试。");
}
boolean ok = phoneForwardActivePush.schedulePushChunks(toUser.trim(), parts);
if (!ok) {
log.error("phone-forward 异常说明未能推送到企微 user={}", toUser);
}
} catch (Exception e2) {
log.warn("phone-forward 异常说明推送过程失败 err={}", e2.toString());
}
}
private static String userVisibleThrowableMessage(Throwable e) {
String m = e.getMessage();
if (!StringUtils.hasText(m)) {
m = e.getClass().getSimpleName();
}
if (m.length() > 500) {
m = m.substring(0, 500) + "";
}
return "「转发服务」异常:" + m + "。请稍后重试。";
}
private String dedupGet(String key) { private String dedupGet(String key) {
if (!dedupEnabled || key == null) { if (!dedupEnabled || key == null) {
return null; return null;