From 5da74a155cdd7d5427c320664788b61823e6a281 Mon Sep 17 00:00:00 2001 From: van Date: Thu, 30 Apr 2026 17:35:50 +0800 Subject: [PATCH] 1 --- .../src/main/resources/application-dev.yml | 5 + .../src/main/resources/application-prod.yml | 3 + .../service/impl/OpenPhoneForwardService.java | 97 +++++++++++++++++++ 3 files changed, 105 insertions(+) diff --git a/ruoyi-admin/src/main/resources/application-dev.yml b/ruoyi-admin/src/main/resources/application-dev.yml index 7ee6b5b..b580ad1 100644 --- a/ruoyi-admin/src/main/resources/application-dev.yml +++ b/ruoyi-admin/src/main/resources/application-dev.yml @@ -239,6 +239,11 @@ jarvis: # wait_reply 时服务端会等多条 Bot 回复,宜适当加大 read-timeout-ms: 120000 wait-reply: true + # 多台企微线程同时触发时串行调用 tg_bridge;排队超过该毫秒则提示「正忙」(0 表示一直等到上一条结束) + lock-acquire-timeout-ms: 180000 + # 连续失败后熔断,不再发起 HTTP(与 tg_bridge 侧熔断互不替代) + circuit-failure-threshold: 5 + circuit-open-ms: 120000 # reply_take_nth:仅「开」用 2;「慢开」由 tg_bridge reply_adaptive_skip_middle_ad 在 2/3 条间自适应 # Ollama 大模型服务(监控健康度调试用) ollama: diff --git a/ruoyi-admin/src/main/resources/application-prod.yml b/ruoyi-admin/src/main/resources/application-prod.yml index 49e5257..2d505b2 100644 --- a/ruoyi-admin/src/main/resources/application-prod.yml +++ b/ruoyi-admin/src/main/resources/application-prod.yml @@ -228,6 +228,9 @@ jarvis: connect-timeout-ms: 8000 read-timeout-ms: 120000 wait-reply: true + lock-acquire-timeout-ms: 180000 + circuit-failure-threshold: 5 + circuit-open-ms: 120000 # 「开」取第 2 条;「慢开」由桥接自适应第 2/3 条 # Ollama 大模型服务(监控健康度调试用) ollama: diff --git a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/OpenPhoneForwardService.java b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/OpenPhoneForwardService.java index 5664913..08d314d 100644 --- a/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/OpenPhoneForwardService.java +++ b/ruoyi-system/src/main/java/com/ruoyi/jarvis/service/impl/OpenPhoneForwardService.java @@ -10,10 +10,15 @@ import org.springframework.util.StringUtils; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.net.URL; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -80,6 +85,24 @@ public class OpenPhoneForwardService { @Value("${jarvis.phone-forward.wait-reply:true}") private boolean waitReply; + /** 与 tg_bridge 串行:多线程同时「开」时排队,避免 Python 端会话串话;0 表示无限等待 */ + @Value("${jarvis.phone-forward.lock-acquire-timeout-ms:180000}") + private long lockAcquireTimeoutMs; + + /** 连续失败达到阈值后,在 openDurationMs 内直接拒绝调用(不发起 HTTP) */ + @Value("${jarvis.phone-forward.circuit-failure-threshold:5}") + private int circuitFailureThreshold; + + @Value("${jarvis.phone-forward.circuit-open-ms:120000}") + private long circuitOpenMs; + + /** 仅允许单飞:所有 phone-forward 请求串行 */ + private final ReentrantLock tgBridgeCallLock = new ReentrantLock(true); + + private final AtomicInteger circuitFailureCount = new AtomicInteger(0); + + /** 熔断恢复时间(epoch ms),0 表示未熔断 */ + private final AtomicLong circuitOpenUntilMs = new AtomicLong(0); /** * @return 非 null 表示本条消息已由本服务处理(含错误提示);null 表示不匹配规则 */ @@ -103,6 +126,37 @@ public class OpenPhoneForwardService { return doForward(phone, bot); } + private boolean isCircuitOpen() { + long until = circuitOpenUntilMs.get(); + return until > 0L && System.currentTimeMillis() < until; + } + + private void recordSuccess() { + circuitFailureCount.set(0); + circuitOpenUntilMs.set(0L); + } + + /** HTTP/网络类失败、5xx、超时记入;达到阈值则熔断一段时间 */ + private void recordFailure() { + long now = System.currentTimeMillis(); + long until = circuitOpenUntilMs.get(); + if (until > now) { + return; + } + int n = circuitFailureCount.incrementAndGet(); + int thr = Math.max(1, circuitFailureThreshold); + if (n >= thr) { + long openUntil = now + Math.max(1000L, circuitOpenMs); + circuitOpenUntilMs.set(openUntil); + circuitFailureCount.set(0); + log.warn("phone-forward 熔断开启至 epochMs={}(连续失败 ≥ {})", openUntil, thr); + } + } + + private static boolean shouldTripCircuit(int httpCode) { + return httpCode == 504 || httpCode >= 500; + } + private static String extractFirstMobile11(String text) { Matcher m = MOBILE_11.matcher(text); if (m.find()) { @@ -112,6 +166,38 @@ public class OpenPhoneForwardService { } private String doForward(String phone, String bot) { + if (isCircuitOpen()) { + log.warn("phone-forward 熔断拒绝 phone={} bot={}", phone, bot); + return "「转发服务」暂时不可用(连续失败保护中),请一分钟后再试。"; + } + + boolean locked = false; + try { + if (lockAcquireTimeoutMs <= 0) { + tgBridgeCallLock.lock(); + locked = true; + } else { + locked = tgBridgeCallLock.tryLock(lockAcquireTimeoutMs, TimeUnit.MILLISECONDS); + if (!locked) { + log.warn( + "phone-forward 排队超时(上一条未完成)phone={} bot={} ms={}", + phone, bot, lockAcquireTimeoutMs); + return "「转发服务」正忙(上一条查询尚未结束),请稍后再试。"; + } + } + return doForwardUnsynchronized(phone, bot); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("phone-forward 获取锁被打断 phone={} bot={}", phone, bot); + return "「转发服务」被中断,请稍后再试。"; + } finally { + if (locked) { + tgBridgeCallLock.unlock(); + } + } + } + + private String doForwardUnsynchronized(String phone, String bot) { try { String base = baseUrl.trim(); if (base.endsWith("/")) { @@ -156,27 +242,38 @@ public class OpenPhoneForwardService { String resp = readAll(is); if (code < 200 || code >= 300) { log.warn("phone-forward HTTP {} url={} body={}", code, urlStr, resp); + if (shouldTripCircuit(code)) { + recordFailure(); + } return "「转发服务」请求失败(HTTP " + code + "),请稍后再试。"; } JSONObject jo = JSONObject.parseObject(resp); if (jo == null) { + recordFailure(); return "「转发服务」返回异常,请稍后再试。"; } String reply = jo.getString("reply_text"); if (!StringUtils.hasText(reply)) { + recordFailure(); return "「转发服务」未返回 reply_text。"; } if (BOT_SLOW_OPEN.equals(bot)) { reply = filterQingBaoAdLines(reply); } + recordSuccess(); return reply; } finally { if (conn != null) { conn.disconnect(); } } + } catch (SocketTimeoutException e) { + log.warn("phone-forward 超时 phone={} bot={} err={}", phone, bot, e.toString()); + recordFailure(); + return "「转发服务」超时,请稍后再试。"; } catch (Exception e) { log.warn("phone-forward 异常 phone={} bot={} err={}", phone, bot, e.toString()); + recordFailure(); return "「转发服务」连接失败,请确认 Jarvis 与局域网服务可达。"; } }