This commit is contained in:
van
2026-04-30 17:35:50 +08:00
parent a88600788a
commit 5da74a155c
3 changed files with 105 additions and 0 deletions

View File

@@ -239,6 +239,11 @@ jarvis:
# wait_reply 时服务端会等多条 Bot 回复,宜适当加大
read-timeout-ms: 120000
wait-reply: true
# 多台企微线程同时触发时串行调用 tg_bridge排队超过该毫秒则提示「正忙」0 表示一直等到上一条结束)
lock-acquire-timeout-ms: 180000
# 连续失败后熔断,不再发起 HTTP与 tg_bridge 侧熔断互不替代)
circuit-failure-threshold: 5
circuit-open-ms: 120000
# reply_take_nth仅「开」用 2「慢开」由 tg_bridge reply_adaptive_skip_middle_ad 在 2/3 条间自适应
# Ollama 大模型服务(监控健康度调试用)
ollama:

View File

@@ -228,6 +228,9 @@ jarvis:
connect-timeout-ms: 8000
read-timeout-ms: 120000
wait-reply: true
lock-acquire-timeout-ms: 180000
circuit-failure-threshold: 5
circuit-open-ms: 120000
# 「开」取第 2 条;「慢开」由桥接自适应第 2/3 条
# Ollama 大模型服务(监控健康度调试用)
ollama:

View File

@@ -10,10 +10,15 @@ import org.springframework.util.StringUtils;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -80,6 +85,24 @@ public class OpenPhoneForwardService {
@Value("${jarvis.phone-forward.wait-reply:true}")
private boolean waitReply;
/** 与 tg_bridge 串行:多线程同时「开」时排队,避免 Python 端会话串话0 表示无限等待 */
@Value("${jarvis.phone-forward.lock-acquire-timeout-ms:180000}")
private long lockAcquireTimeoutMs;
/** 连续失败达到阈值后,在 openDurationMs 内直接拒绝调用(不发起 HTTP */
@Value("${jarvis.phone-forward.circuit-failure-threshold:5}")
private int circuitFailureThreshold;
@Value("${jarvis.phone-forward.circuit-open-ms:120000}")
private long circuitOpenMs;
/** 仅允许单飞:所有 phone-forward 请求串行 */
private final ReentrantLock tgBridgeCallLock = new ReentrantLock(true);
private final AtomicInteger circuitFailureCount = new AtomicInteger(0);
/** 熔断恢复时间epoch ms0 表示未熔断 */
private final AtomicLong circuitOpenUntilMs = new AtomicLong(0);
/**
* @return 非 null 表示本条消息已由本服务处理含错误提示null 表示不匹配规则
*/
@@ -103,6 +126,37 @@ public class OpenPhoneForwardService {
return doForward(phone, bot);
}
private boolean isCircuitOpen() {
long until = circuitOpenUntilMs.get();
return until > 0L && System.currentTimeMillis() < until;
}
private void recordSuccess() {
circuitFailureCount.set(0);
circuitOpenUntilMs.set(0L);
}
/** HTTP/网络类失败、5xx、超时记入达到阈值则熔断一段时间 */
private void recordFailure() {
long now = System.currentTimeMillis();
long until = circuitOpenUntilMs.get();
if (until > now) {
return;
}
int n = circuitFailureCount.incrementAndGet();
int thr = Math.max(1, circuitFailureThreshold);
if (n >= thr) {
long openUntil = now + Math.max(1000L, circuitOpenMs);
circuitOpenUntilMs.set(openUntil);
circuitFailureCount.set(0);
log.warn("phone-forward 熔断开启至 epochMs={}(连续失败 ≥ {}", openUntil, thr);
}
}
private static boolean shouldTripCircuit(int httpCode) {
return httpCode == 504 || httpCode >= 500;
}
private static String extractFirstMobile11(String text) {
Matcher m = MOBILE_11.matcher(text);
if (m.find()) {
@@ -112,6 +166,38 @@ public class OpenPhoneForwardService {
}
private String doForward(String phone, String bot) {
if (isCircuitOpen()) {
log.warn("phone-forward 熔断拒绝 phone={} bot={}", phone, bot);
return "「转发服务」暂时不可用(连续失败保护中),请一分钟后再试。";
}
boolean locked = false;
try {
if (lockAcquireTimeoutMs <= 0) {
tgBridgeCallLock.lock();
locked = true;
} else {
locked = tgBridgeCallLock.tryLock(lockAcquireTimeoutMs, TimeUnit.MILLISECONDS);
if (!locked) {
log.warn(
"phone-forward 排队超时上一条未完成phone={} bot={} ms={}",
phone, bot, lockAcquireTimeoutMs);
return "「转发服务」正忙(上一条查询尚未结束),请稍后再试。";
}
}
return doForwardUnsynchronized(phone, bot);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("phone-forward 获取锁被打断 phone={} bot={}", phone, bot);
return "「转发服务」被中断,请稍后再试。";
} finally {
if (locked) {
tgBridgeCallLock.unlock();
}
}
}
private String doForwardUnsynchronized(String phone, String bot) {
try {
String base = baseUrl.trim();
if (base.endsWith("/")) {
@@ -156,27 +242,38 @@ public class OpenPhoneForwardService {
String resp = readAll(is);
if (code < 200 || code >= 300) {
log.warn("phone-forward HTTP {} url={} body={}", code, urlStr, resp);
if (shouldTripCircuit(code)) {
recordFailure();
}
return "「转发服务」请求失败HTTP " + code + "),请稍后再试。";
}
JSONObject jo = JSONObject.parseObject(resp);
if (jo == null) {
recordFailure();
return "「转发服务」返回异常,请稍后再试。";
}
String reply = jo.getString("reply_text");
if (!StringUtils.hasText(reply)) {
recordFailure();
return "「转发服务」未返回 reply_text。";
}
if (BOT_SLOW_OPEN.equals(bot)) {
reply = filterQingBaoAdLines(reply);
}
recordSuccess();
return reply;
} finally {
if (conn != null) {
conn.disconnect();
}
}
} catch (SocketTimeoutException e) {
log.warn("phone-forward 超时 phone={} bot={} err={}", phone, bot, e.toString());
recordFailure();
return "「转发服务」超时,请稍后再试。";
} catch (Exception e) {
log.warn("phone-forward 异常 phone={} bot={} err={}", phone, bot, e.toString());
recordFailure();
return "「转发服务」连接失败,请确认 Jarvis 与局域网服务可达。";
}
}