commit e9823c2261ca86ef22f2e31d71eb06a80ca00e49 Author: van Date: Thu Apr 23 22:06:19 2026 +0800 初始化 diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..4dd1188 --- /dev/null +++ b/demo.py @@ -0,0 +1,31 @@ +""" +wxauto 最简单的 demo 测试 +需要先安装: pip install wxauto +确保微信客户端已登录 +""" + +from wxauto import WeChat + +# 初始化微信实例 +print("正在初始化微信...") +wx = WeChat() + +# 发送消息到文件传输助手 +print("正在发送消息...") +wx.SendMsg("你好,这是一条测试消息!", who="文件传输助手") + +# 获取当前聊天窗口的消息 +print("正在获取消息...") +msgs = wx.GetAllMessage() + +# 打印消息信息 +print(f"\n共获取到 {len(msgs)} 条消息:") +for i, msg in enumerate(msgs, 1): + print(f"消息 {i}:") + print(f" 内容: {msg.content}") + print(f" 类型: {msg.type}") + print(f" 发送者: {msg.sender}") + print() + +print("Demo 测试完成!") + diff --git a/requirements-tg-bridge.txt b/requirements-tg-bridge.txt new file mode 100644 index 0000000..26611a6 --- /dev/null +++ b/requirements-tg-bridge.txt @@ -0,0 +1,7 @@ +# Telegram HTTP 转发桥(与 wxauto 无关,可单独安装) +telethon>=1.36.0 +PySocks>=1.7.1 +fastapi>=0.115.0 +uvicorn[standard]>=0.30.0 +python-dotenv>=1.0.0 +pydantic>=2.0.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2938872 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +wxauto>=3.9.8.15 + diff --git a/run_bridge.bat b/run_bridge.bat new file mode 100644 index 0000000..fd51516 --- /dev/null +++ b/run_bridge.bat @@ -0,0 +1,5 @@ +@echo off +REM 须用 python -m tg_bridge(已强制 Selector 循环);不要裸 uvicorn tg_bridge.app:app +cd /d "%~dp0" +python -m tg_bridge +if errorlevel 1 pause diff --git a/run_login.bat b/run_login.bat new file mode 100644 index 0000000..0defbd4 --- /dev/null +++ b/run_login.bat @@ -0,0 +1,5 @@ +@echo off +REM 必须在「wx_python」根目录运行(与 tg_bridge 文件夹同级) +cd /d "%~dp0" +python -m tg_bridge.login_cli +if errorlevel 1 pause diff --git a/tg_bridge/.env.example b/tg_bridge/.env.example new file mode 100644 index 0000000..46d10e5 --- /dev/null +++ b/tg_bridge/.env.example @@ -0,0 +1,48 @@ +# https://my.telegram.org 申请 +TELEGRAM_API_ID=12345678 +TELEGRAM_API_HASH=你的api_hash + +# 目标 Bot 的用户名(可带或不带 @)。多个用英文逗号分隔;POST /v1/forward 的 bot 须在此列表内(企微「开」→ AJL05_bot,「慢开」→ QingBaoJuXWsgkbot) +TELEGRAM_BOT_USERNAME=AJL05_bot,QingBaoJuXWsgkbot +# TELEGRAM_BOT_USERNAME=YourBotName + +# Session 持久化(SQLite,一般只需 login_cli 一次)。默认:wx_python 根目录下 tg_bridge.session +# TELEGRAM_SESSION_PATH=D:/data/tg_bridge.session + +# HTTP 监听 +BRIDGE_HOST=0.0.0.0 +BRIDGE_PORT=18080 +# Windows + Telethon:若用手动 uvicorn,必须加(否则会 Proactor+121): +# --loop tg_bridge.uvicorn_loop:selector_loop_factory + +# 可选;设置后 POST /v1/forward 必须带 Bearer 或 X-Bridge-Token +# BRIDGE_TOKEN=随机长字符串 + +# Telegram 连接代理(Clash / v2ray 本地 HTTP 或 SOCKS5,填一种即可;不设则直连) +# TELEGRAM_PROXY_TYPE=http +# TELEGRAM_PROXY_TYPE=socks5 +# TELEGRAM_PROXY_HOST=127.0.0.1 +# TELEGRAM_PROXY_PORT=7890 +# TELEGRAM_PROXY_USER= +# TELEGRAM_PROXY_PASSWORD= +# SOCKS5 若仍超时,可显式关闭代理侧解析域名(默认 socks 已为 false,http 默认为 true) +# TELEGRAM_PROXY_RDNS=false +# Telethon 连接超时(秒),经代理建议 60~120 +# TELEGRAM_CONNECT_TIMEOUT=90 +# TELEGRAM_CONNECTION_RETRIES=5 +# TELEGRAM_RETRY_DELAY=3 +# 传输模式:遇 Windows WinError 121 可改为 tcp_obfuscated 或 tcp_intermediate +# TELEGRAM_CONNECTION=tcp_full +# TELEGRAM_CONNECTION=tcp_obfuscated +# /v1/forward 默认 wait_reply=true 时,等 Bot 回复的最长时间(秒) +# TELEGRAM_BOT_REPLY_TIMEOUT=120 +# Bot 先发「查询中」再发结果时,可默认取第 2 条(请求里 reply_take_nth 可覆盖) +# BOT_REPLY_TAKE_NTH=2 +# 业务日志中附带正文/回复预览的最大字符数(0 或未设置则只记录长度,避免日志含敏感全文) +# BRIDGE_LOG_PREVIEW_CHARS=120 +# 文件日志目录(默认 wx_python/logs);当前写入 tg_bridge.log,每日午夜滚动为 tg_bridge.log.YYYY-MM-DD +# BRIDGE_LOG_DIR=D:/data/tg_bridge_logs +# 保留的滚动日志文件个数(默认 30) +# BRIDGE_LOG_BACKUP_COUNT=30 +# tg_bridge 日志级别:DEBUG / INFO / WARNING … +# BRIDGE_LOG_LEVEL=INFO \ No newline at end of file diff --git a/tg_bridge/__init__.py b/tg_bridge/__init__.py new file mode 100644 index 0000000..7201798 --- /dev/null +++ b/tg_bridge/__init__.py @@ -0,0 +1,3 @@ +"""Telegram 转发桥:HTTP 入站,由个人号向指定 Bot 发送消息。""" + +__version__ = "0.1.0" diff --git a/tg_bridge/__main__.py b/tg_bridge/__main__.py new file mode 100644 index 0000000..c9aac09 --- /dev/null +++ b/tg_bridge/__main__.py @@ -0,0 +1,25 @@ +"""启动 HTTP 服务: python -m tg_bridge""" + +from __future__ import annotations + +import uvicorn + +from tg_bridge.config import Settings +from tg_bridge.winloop import apply_windows_selector_policy + + +def main() -> None: + apply_windows_selector_policy() + s = Settings.load() + # Windows 上 Uvicorn 仍优先 Proactor,须显式 loop_factory,否则 lifespan 里 Telethon WinError 121 + uvicorn.run( + "tg_bridge.app:app", + host=s.bridge_host, + port=s.bridge_port, + log_level="info", + loop="tg_bridge.uvicorn_loop:selector_loop_factory", + ) + + +if __name__ == "__main__": + main() diff --git a/tg_bridge/__pycache__/__init__.cpython-312.pyc b/tg_bridge/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..5098b51 Binary files /dev/null and b/tg_bridge/__pycache__/__init__.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/app.cpython-312.pyc b/tg_bridge/__pycache__/app.cpython-312.pyc new file mode 100644 index 0000000..0827048 Binary files /dev/null and b/tg_bridge/__pycache__/app.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/client_factory.cpython-312.pyc b/tg_bridge/__pycache__/client_factory.cpython-312.pyc new file mode 100644 index 0000000..1acce90 Binary files /dev/null and b/tg_bridge/__pycache__/client_factory.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/config.cpython-312.pyc b/tg_bridge/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..ee71c72 Binary files /dev/null and b/tg_bridge/__pycache__/config.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/connection_mode.cpython-312.pyc b/tg_bridge/__pycache__/connection_mode.cpython-312.pyc new file mode 100644 index 0000000..0ed3937 Binary files /dev/null and b/tg_bridge/__pycache__/connection_mode.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/http_logging.cpython-312.pyc b/tg_bridge/__pycache__/http_logging.cpython-312.pyc new file mode 100644 index 0000000..9599bd8 Binary files /dev/null and b/tg_bridge/__pycache__/http_logging.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/logging_setup.cpython-312.pyc b/tg_bridge/__pycache__/logging_setup.cpython-312.pyc new file mode 100644 index 0000000..ec26bda Binary files /dev/null and b/tg_bridge/__pycache__/logging_setup.cpython-312.pyc differ diff --git a/tg_bridge/__pycache__/proxy.cpython-312.pyc b/tg_bridge/__pycache__/proxy.cpython-312.pyc new file mode 100644 index 0000000..8d52955 Binary files /dev/null and b/tg_bridge/__pycache__/proxy.cpython-312.pyc differ diff --git a/tg_bridge/app.py b/tg_bridge/app.py new file mode 100644 index 0000000..43f0e53 --- /dev/null +++ b/tg_bridge/app.py @@ -0,0 +1,256 @@ +from __future__ import annotations + +# 策略对纯 asyncio 有效;Uvicorn 在 win32 上会改用 Proactor 类作为 loop_factory, +# 与下文无关。请用 ``python -m tg_bridge`` 或 ``uvicorn ... --loop tg_bridge.uvicorn_loop:selector_loop_factory``。 +from tg_bridge.winloop import apply_windows_selector_policy + +apply_windows_selector_policy() + +import asyncio +import logging +from contextlib import asynccontextmanager +from typing import Annotated, Any + +from fastapi import Depends, FastAPI, Header, HTTPException +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from pydantic import BaseModel, Field +from telethon import TelegramClient +from telethon.errors import RPCError + +from tg_bridge.client_factory import create_telegram_client +from tg_bridge.config import Settings +from tg_bridge.http_logging import AccessLogMiddleware, log_preview_max_chars, preview_for_log +from tg_bridge.logging_setup import setup_logging + +setup_logging() +logger = logging.getLogger(__name__) + +_settings: Settings | None = None +_client: TelegramClient | None = None +# 与 Bot 一问一答必须串行,否则 Conversation 会串话或 AlreadyInConversationError +_telegram_bridge_lock = asyncio.Lock() +security = HTTPBearer(auto_error=False) + + +def _extract_bot_reply_text(message: Any) -> str: + if message is None: + return "" + raw = getattr(message, "text", None) or getattr(message, "message", None) or "" + return raw.strip() if isinstance(raw, str) else "" + + +def _get_settings() -> Settings: + global _settings + if _settings is None: + _settings = Settings.load() + return _settings + + +async def _ensure_client() -> TelegramClient: + if _client is None: + raise HTTPException(status_code=503, detail="Telegram 客户端未初始化") + return _client + + +def _verify_bridge_auth( + creds: HTTPAuthorizationCredentials | None, + x_bridge_token: str | None, +) -> None: + s = _get_settings() + expected = s.bridge_token + if not expected: + return + if creds and creds.scheme.lower() == "bearer" and creds.credentials == expected: + return + if x_bridge_token and x_bridge_token == expected: + return + raise HTTPException(status_code=401, detail="无效或未提供鉴权") + + +@asynccontextmanager +async def _lifespan(app: FastAPI): + global _client + s = _get_settings() + if s.bridge_token: + logger.info("已启用 BRIDGE_TOKEN,请求需携带 Bearer 或 X-Bridge-Token") + else: + logger.warning("未设置 BRIDGE_TOKEN,/v1/forward 对同网段可达者开放,生产环境请设置") + + if s.proxy_type and s.proxy_host and s.proxy_port: + logger.info( + "Telegram 使用代理: type=%s host=%s port=%s rdns=%s connection=%s timeout=%ss", + s.proxy_type, + s.proxy_host, + s.proxy_port, + s.proxy_rdns, + s.connection_mode, + s.connect_timeout, + ) + _client = create_telegram_client(s) + await _client.connect() + if not await _client.is_user_authorized(): + await _client.disconnect() + _client = None + raise RuntimeError( + "Telegram 会话未授权。请在 wx_python 目录执行: python -m tg_bridge.login_cli" + ) + me = await _client.get_me() + logger.info("Telegram 已连接: user_id=%s username=%s", me.id, me.username) + yield + if _client: + await _client.disconnect() + _client = None + + +app = FastAPI(title="tg_bridge", version="0.1.0", lifespan=_lifespan) +app.add_middleware(AccessLogMiddleware) + + +class ForwardBody(BaseModel): + """转发到 Telegram Bot 的请求体。""" + + text: str = Field(..., min_length=1, description="要发送的文本") + bot: str | None = Field( + None, + description="目标 Bot 用户名(与 .env 中配置的某一个一致);省略则发往列表中的第一个", + ) + context: str | None = Field( + None, + description="可选,如企业微信 UserID,会加在正文前便于区分来源", + ) + wait_reply: bool = Field( + True, + description="为 true 时等待 Bot 下一条文本回复,并放入 reply_text(企业微信桥接常用)", + ) + reply_timeout_sec: int | None = Field( + None, + ge=5, + le=600, + description="等待回复超时秒数,默认使用服务端 TELEGRAM_BOT_REPLY_TIMEOUT", + ) + reply_take_nth: int | None = Field( + None, + ge=1, + le=20, + description="取 Bot 连续回复的第几条(1=第一条,2=第二条)。省略则用服务端 BOT_REPLY_TAKE_NTH", + ) + + +class ForwardResponse(BaseModel): + ok: bool = True + detail: str = "sent" + reply_text: str | None = Field( + None, + description="Bot 回复正文;仅 wait_reply 为 true 且收到消息时可能有值", + ) + + +async def _auth_dep( + creds: Annotated[HTTPAuthorizationCredentials | None, Depends(security)], + x_bridge_token: Annotated[str | None, Header(alias="X-Bridge-Token")] = None, +): + _verify_bridge_auth(creds, x_bridge_token) + + +@app.get("/health") +async def health(): + s = _get_settings() + c = _client + authorized = False + if c: + try: + authorized = await c.is_user_authorized() + except Exception: + authorized = False + return { + "status": "ok", + "telegram_authorized": authorized, + "bot": s.default_bot_username, + "bots": list(s.bot_usernames), + "telegram_proxy": bool(s.proxy_type and s.proxy_host and s.proxy_port), + "telegram_proxy_rdns": s.proxy_rdns, + "telegram_connect_timeout_sec": s.connect_timeout, + "telegram_connection": s.connection_mode, + "bot_reply_timeout_sec": s.bot_reply_timeout, + "default_bot_reply_take_nth": s.bot_reply_take_nth, + } + + +@app.post("/v1/forward", response_model=ForwardResponse) +async def forward_message( + body: ForwardBody, + _: Annotated[None, Depends(_auth_dep)], +): + """将 text 用当前登录的 Telegram 个人号发给配置的 Bot。 + + 若设置 BRIDGE_TOKEN:请求头需 `Authorization: Bearer ` 或 `X-Bridge-Token: `。 + """ + s = _get_settings() + client = await _ensure_client() + try: + target_bot = s.resolve_bot_username(body.bot) + except ValueError as e: + logger.warning("forward 请求被拒绝: %s", e) + raise HTTPException(status_code=400, detail=str(e)) from e + + payload = body.text.strip() + if body.context and body.context.strip(): + payload = f"[wx:{body.context.strip()}]\n{payload}" + + prev_n = log_preview_max_chars() + preview_part = ( + f" text_preview={preview_for_log(payload, prev_n)!r}" if prev_n else "" + ) + logger.info( + "forward 请求: bot=%s wait_reply=%s text_len=%d context=%s timeout_sec=%s take_nth=%s%s", + target_bot, + body.wait_reply, + len(payload), + bool(body.context and body.context.strip()), + body.reply_timeout_sec, + body.reply_take_nth, + preview_part, + ) + + reply_text: str | None = None + try: + if body.wait_reply: + rt = float(body.reply_timeout_sec or s.bot_reply_timeout) + nth = body.reply_take_nth if body.reply_take_nth is not None else s.bot_reply_take_nth + # 多条回复时,每条独立算超时;总会话上限略放大避免 total_timeout 先触发 + total_rt = rt * float(nth) + 15.0 + async with _telegram_bridge_lock: + async with client.conversation( + target_bot, + exclusive=True, + timeout=rt, + total_timeout=total_rt, + ) as conv: + await conv.send_message(payload) + response = None + for _ in range(nth): + response = await conv.get_response(timeout=rt) + reply_text = _extract_bot_reply_text(response) if response is not None else None + reply_text = reply_text or None + else: + await client.send_message(target_bot, payload) + except asyncio.TimeoutError: + logger.warning("forward: bot=%s 等待 Bot 回复超时", target_bot) + raise HTTPException(status_code=504, detail="等待 Bot 回复超时") from None + except RPCError as e: + logger.exception("Telegram RPC 失败") + raise HTTPException(status_code=502, detail=str(e)) from e + except Exception as e: + logger.exception("发送失败") + raise HTTPException(status_code=500, detail=str(e)) from e + out = ForwardResponse(reply_text=reply_text) + rlen = len(out.reply_text or "") + if prev_n and out.reply_text: + logger.info( + "forward 响应: reply_len=%d reply_preview=%r", + rlen, + preview_for_log(out.reply_text, prev_n), + ) + else: + logger.info("forward 响应: reply_len=%d", rlen) + return out diff --git a/tg_bridge/client_factory.py b/tg_bridge/client_factory.py new file mode 100644 index 0000000..78c41c3 --- /dev/null +++ b/tg_bridge/client_factory.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from telethon import TelegramClient + +from tg_bridge.config import Settings +from tg_bridge.connection_mode import resolve_connection_class +from tg_bridge.proxy import telethon_proxy_from_settings + + +def create_telegram_client(s: Settings) -> TelegramClient: + """创建带代理与连接超时的 TelegramClient(Telethon 默认 timeout=10 经代理易超时)。""" + proxy = telethon_proxy_from_settings(s) + conn = resolve_connection_class(s.connection_mode) + return TelegramClient( + str(s.session_path), + s.api_id, + s.api_hash, + proxy=proxy, + connection=conn, + timeout=s.connect_timeout, + connection_retries=s.connection_retries, + retry_delay=s.retry_delay, + ) diff --git a/tg_bridge/config.py b/tg_bridge/config.py new file mode 100644 index 0000000..9c70854 --- /dev/null +++ b/tg_bridge/config.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + +# 优先从包上级目录(wx_python 根)加载 .env,避免工作目录不在项目根时失效 +_ROOT = Path(__file__).resolve().parent.parent +load_dotenv(_ROOT / ".env") +load_dotenv() + + +def _parse_bot_usernames(raw: str) -> tuple[str, ...]: + """从逗号分隔字符串解析 Bot 用户名(去 @、去空)。""" + parts = [p.strip().lstrip("@") for p in raw.split(",")] + bots = tuple(p for p in parts if p) + if not bots: + raise RuntimeError( + "缺少 TELEGRAM_BOT_USERNAME:填写至少一个 Bot 用户名;多个用英文逗号分隔,不要带 @ 也可" + ) + return bots + + +@dataclass(frozen=True) +class Settings: + api_id: int + api_hash: str + session_path: Path + """已配置的 Bot 用户名列表(顺序:默认目标为第一个)。""" + bot_usernames: tuple[str, ...] + bridge_host: str + bridge_port: int + bridge_token: str | None + # Telethon 连接 Telegram 用的代理(与系统「全局代理」无关,需显式配置) + proxy_type: str | None + proxy_host: str | None + proxy_port: int | None + proxy_user: str | None + proxy_password: str | None + # SOCKS 上 rdns=True 会在代理侧解析域名,部分代理会卡死;默认 False 多为本机解析再连 + proxy_rdns: bool + # Telethon 单次连接超时秒数(默认 10 经代理不够) + connect_timeout: int + connection_retries: int + retry_delay: int + # Telethon Connection 类:tcp_obfuscated 在受限网络 + 代理下常比 tcp_full 更稳 + connection_mode: str + # /v1/forward 在 wait_reply 时等待 Bot 回复的上限(秒) + bot_reply_timeout: float + # wait_reply 时默认取 Bot 第几条连续回复(可被请求体 reply_take_nth 覆盖) + bot_reply_take_nth: int + + @property + def default_bot_username(self) -> str: + return self.bot_usernames[0] + + def resolve_bot_username(self, bot: str | None) -> str: + """将请求里的 bot 解析为已配置的用户名;省略则用列表第一个。""" + if bot is None: + return self.bot_usernames[0] + key = bot.strip().lstrip("@").lower() + by_lower = {b.lower(): b for b in self.bot_usernames} + if key not in by_lower: + raise ValueError( + f"未知 Bot「{bot}」,当前允许: {', '.join(self.bot_usernames)}" + ) + return by_lower[key] + + @staticmethod + def load() -> "Settings": + api_id_raw = os.environ.get("TELEGRAM_API_ID", "").strip() + api_hash = os.environ.get("TELEGRAM_API_HASH", "").strip() + if not api_id_raw or not api_hash: + raise RuntimeError( + "缺少 TELEGRAM_API_ID / TELEGRAM_API_HASH。" + "请到 https://my.telegram.org 申请后写入环境变量或 .env" + ) + bot_raw = os.environ.get("TELEGRAM_BOT_USERNAME", "").strip() + bots = _parse_bot_usernames(bot_raw) + + # 默认可写绝对路径:避免「login 在项目目录、服务从别的工作目录启动」找不到同一份 session + _default_session = (_ROOT / "tg_bridge.session").resolve() + session_raw = os.environ.get("TELEGRAM_SESSION_PATH", "").strip() + session = ( + Path(session_raw).expanduser().resolve() + if session_raw + else _default_session + ) + + token = os.environ.get("BRIDGE_TOKEN", "").strip() or None + host = os.environ.get("BRIDGE_HOST", "0.0.0.0").strip() + port = int(os.environ.get("BRIDGE_PORT", "18080")) + + ptype = os.environ.get("TELEGRAM_PROXY_TYPE", "").strip() or None + phost = os.environ.get("TELEGRAM_PROXY_HOST", "").strip() or None + pport_raw = os.environ.get("TELEGRAM_PROXY_PORT", "").strip() + pport: int | None = int(pport_raw) if pport_raw else None + puser = os.environ.get("TELEGRAM_PROXY_USER", "").strip() or None + ppwd_raw = os.environ.get("TELEGRAM_PROXY_PASSWORD") + ppwd: str | None = None + if puser is not None: + ppwd = ppwd_raw if ppwd_raw is not None else "" + + if ptype and (not phost or not pport): + raise RuntimeError( + "已设置 TELEGRAM_PROXY_TYPE 时,须同时设置 TELEGRAM_PROXY_HOST 与 TELEGRAM_PROXY_PORT" + ) + if (phost or pport_raw) and not ptype: + raise RuntimeError("设置 TELEGRAM_PROXY_HOST / TELEGRAM_PROXY_PORT 时须设置 TELEGRAM_PROXY_TYPE(http|socks5|socks4)") + + rdns_raw = os.environ.get("TELEGRAM_PROXY_RDNS", "").strip().lower() + if rdns_raw in ("1", "true", "yes", "on"): + proxy_rdns = True + elif rdns_raw in ("0", "false", "no", "off"): + proxy_rdns = False + elif not rdns_raw: + # 未配置:HTTP 代理常用远程 DNS;SOCKS 默认本机解析更稳 + proxy_rdns = bool(ptype and str(ptype).lower() in ("http", "https")) + else: + raise RuntimeError("TELEGRAM_PROXY_RDNS 请使用 true/false") + + connect_timeout = int(os.environ.get("TELEGRAM_CONNECT_TIMEOUT", "90")) + connection_retries = int(os.environ.get("TELEGRAM_CONNECTION_RETRIES", "5")) + retry_delay = int(os.environ.get("TELEGRAM_RETRY_DELAY", "3")) + connection_mode = os.environ.get("TELEGRAM_CONNECTION", "tcp_full").strip() or "tcp_full" + bot_reply_timeout = float(os.environ.get("TELEGRAM_BOT_REPLY_TIMEOUT", "120")) + bot_reply_take_nth = int(os.environ.get("BOT_REPLY_TAKE_NTH", "1")) + if bot_reply_take_nth < 1 or bot_reply_take_nth > 20: + raise RuntimeError("BOT_REPLY_TAKE_NTH 须在 1~20 之间") + + return Settings( + api_id=int(api_id_raw), + api_hash=api_hash, + session_path=session, + bot_usernames=bots, + bridge_host=host, + bridge_port=port, + bridge_token=token, + proxy_type=ptype, + proxy_host=phost, + proxy_port=pport, + proxy_user=puser, + proxy_password=ppwd, + proxy_rdns=proxy_rdns, + connect_timeout=connect_timeout, + connection_retries=connection_retries, + retry_delay=retry_delay, + connection_mode=connection_mode, + bot_reply_timeout=bot_reply_timeout, + bot_reply_take_nth=bot_reply_take_nth, + ) diff --git a/tg_bridge/connection_mode.py b/tg_bridge/connection_mode.py new file mode 100644 index 0000000..2a479c1 --- /dev/null +++ b/tg_bridge/connection_mode.py @@ -0,0 +1,29 @@ +"""Telethon Connection 模式(与 Telegram Desktop 使用的传输类似可选用 obfuscated)。""" + +from __future__ import annotations + +from typing import Type + +from telethon.network.connection import ( + Connection, + ConnectionTcpAbridged, + ConnectionTcpFull, + ConnectionTcpIntermediate, + ConnectionTcpObfuscated, +) + + +def resolve_connection_class(mode: str) -> Type[Connection]: + m = (mode or "").strip().lower().replace("-", "_") + if not m or m == "tcp_full": + return ConnectionTcpFull + if m == "tcp_obfuscated": + return ConnectionTcpObfuscated + if m == "tcp_intermediate": + return ConnectionTcpIntermediate + if m == "tcp_abridged": + return ConnectionTcpAbridged + raise ValueError( + f"不支持的 TELEGRAM_CONNECTION={mode!r},可用: " + "tcp_full, tcp_obfuscated, tcp_intermediate, tcp_abridged" + ) diff --git a/tg_bridge/http_logging.py b/tg_bridge/http_logging.py new file mode 100644 index 0000000..5150b6c --- /dev/null +++ b/tg_bridge/http_logging.py @@ -0,0 +1,67 @@ +"""HTTP 访问日志与转发内容摘要(可配置是否打出正文预览)。""" + +from __future__ import annotations + +import logging +import os +import time +from collections.abc import Callable +from typing import Any + +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response + +access_logger = logging.getLogger("tg_bridge.access") + + +def log_preview_max_chars() -> int: + """环境变量 BRIDGE_LOG_PREVIEW_CHARS:>0 时在业务日志中附带截断后的正文/回复预览;未设置或 0 则只打长度。""" + raw = os.environ.get("BRIDGE_LOG_PREVIEW_CHARS", "").strip() + if not raw: + return 0 + try: + return max(0, int(raw)) + except ValueError: + return 0 + + +def preview_for_log(text: str | None, max_chars: int) -> str: + if not text or max_chars <= 0: + return "" + s = text.replace("\r", "").replace("\n", "\\n") + if len(s) <= max_chars: + return s + return s[:max_chars] + "..." + + +class AccessLogMiddleware(BaseHTTPMiddleware): + """记录每条 HTTP 请求的方法、路径、状态码与耗时(不含鉴权头与 Body)。""" + + async def dispatch(self, request: Request, call_next: Callable[[Request], Any]) -> Response: + start = time.perf_counter() + client = request.client.host if request.client else "-" + path = request.url.path + method = request.method + try: + response = await call_next(request) + except Exception: + elapsed_ms = (time.perf_counter() - start) * 1000 + access_logger.exception( + '%s "%s %s" 未捕获异常 %.1fms', + client, + method, + path, + elapsed_ms, + ) + raise + elapsed_ms = (time.perf_counter() - start) * 1000 + access_logger.info( + '%s "%s %s" %s %.1fms', + client, + method, + path, + response.status_code, + elapsed_ms, + ) + return response diff --git a/tg_bridge/logging_setup.py b/tg_bridge/logging_setup.py new file mode 100644 index 0000000..ea8b833 --- /dev/null +++ b/tg_bridge/logging_setup.py @@ -0,0 +1,66 @@ +"""tg_bridge 日志:控制台 + 按日切分的本地文件(午夜滚动,历史文件带日期后缀)。""" + +from __future__ import annotations + +import logging +import logging.handlers +import os +from pathlib import Path + +from dotenv import load_dotenv + +_ROOT = Path(__file__).resolve().parent.parent +load_dotenv(_ROOT / ".env") +load_dotenv() + + +def _parse_log_level(name: str) -> int: + return getattr(logging, name.upper(), logging.INFO) + + +def setup_logging() -> None: + """为 logger ``tg_bridge`` 及其子 logger 配置 StreamHandler + TimedRotatingFileHandler。 + + 当前日志文件:``<目录>/tg_bridge.log``;每日午夜滚动后重命名为 ``tg_bridge.log.YYYY-MM-DD``。 + 重复调用不会重复添加 handler。 + """ + root_tg = logging.getLogger("tg_bridge") + if root_tg.handlers: + return + + log_dir_raw = os.environ.get("BRIDGE_LOG_DIR", "").strip() + log_dir = Path(log_dir_raw).expanduser().resolve() if log_dir_raw else (_ROOT / "logs").resolve() + log_dir.mkdir(parents=True, exist_ok=True) + + backup_raw = os.environ.get("BRIDGE_LOG_BACKUP_COUNT", "30").strip() + try: + backup_count = max(1, int(backup_raw)) + except ValueError: + backup_count = 30 + + level = _parse_log_level(os.environ.get("BRIDGE_LOG_LEVEL", "INFO")) + root_tg.setLevel(level) + + fmt = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + file_path = log_dir / "tg_bridge.log" + fh = logging.handlers.TimedRotatingFileHandler( + filename=str(file_path), + when="midnight", + interval=1, + backupCount=backup_count, + encoding="utf-8", + delay=True, + ) + fh.suffix = "%Y-%m-%d" + fh.setFormatter(fmt) + + sh = logging.StreamHandler() + sh.setFormatter(fmt) + + root_tg.addHandler(fh) + root_tg.addHandler(sh) + root_tg.propagate = False diff --git a/tg_bridge/login_cli.py b/tg_bridge/login_cli.py new file mode 100644 index 0000000..60141ce --- /dev/null +++ b/tg_bridge/login_cli.py @@ -0,0 +1,49 @@ +"""首次登录 Telegram:生成 session 文件,供 tg_bridge 服务使用。 + +用法(在 wx_python 目录下): + python -m tg_bridge.login_cli + +需已配置 TELEGRAM_API_ID、TELEGRAM_API_HASH、TELEGRAM_SESSION_PATH(可选),见 tg_bridge/.env.example +""" + +from __future__ import annotations + +import asyncio + +from tg_bridge.client_factory import create_telegram_client +from tg_bridge.config import Settings +from tg_bridge.proxy import telethon_proxy_from_settings +from tg_bridge.winloop import apply_windows_selector_policy + + +async def main() -> None: + s = Settings.load() + try: + proxy = telethon_proxy_from_settings(s) + except ValueError as e: + print(e) + return + if proxy: + safe = {**proxy} + if safe.get("password"): + safe["password"] = "***" + print( + f"使用代理: {safe!r} connection={s.connection_mode} " + f"connect_timeout={s.connect_timeout}s" + ) + else: + print(f"未配置代理(直连) connection={s.connection_mode}") + client = create_telegram_client(s) + await client.start() + if not await client.is_user_authorized(): + print("登录未完成") + return + me = await client.get_me() + print(f"已保存会话: {s.session_path}") + print(f"当前账号: id={me.id} username={me.username!r}") + await client.disconnect() + + +if __name__ == "__main__": + apply_windows_selector_policy() + asyncio.run(main()) diff --git a/tg_bridge/proxy.py b/tg_bridge/proxy.py new file mode 100644 index 0000000..a690450 --- /dev/null +++ b/tg_bridge/proxy.py @@ -0,0 +1,39 @@ +"""Telethon 代理配置(传给 TelegramClient 的 proxy 参数)。""" + +from __future__ import annotations + +from typing import Any + +from tg_bridge.config import Settings + + +def telethon_proxy_from_settings(s: Settings) -> dict[str, Any] | None: + """返回 Telethon ``TelegramClient(..., proxy=...)`` 所用的 dict。 + + 键名与 Telethon ``Connection._parse_proxy`` 一致: + ``proxy_type, addr, port, rdns, username, password``(后两项可选)。 + 使用 dict 可避免 PySocks 整数类型与 ``(host, port, rdns)`` 元组长度的歧义。 + + 参考:``telethon/network/connection/connection.py`` 中 ``_proxy_connect``。 + """ + if not s.proxy_type or not s.proxy_host or not s.proxy_port: + return None + + t = s.proxy_type.lower().strip() + if t == "https": + t = "http" + if t not in ("http", "socks5", "socks4"): + raise ValueError( + f"不支持的 TELEGRAM_PROXY_TYPE={s.proxy_type!r},可用: http, socks5, socks4" + ) + + d: dict[str, Any] = { + "proxy_type": t, + "addr": s.proxy_host, + "port": s.proxy_port, + "rdns": s.proxy_rdns, + } + if s.proxy_user is not None: + d["username"] = s.proxy_user + d["password"] = s.proxy_password if s.proxy_password is not None else "" + return d diff --git a/tg_bridge/uvicorn_loop.py b/tg_bridge/uvicorn_loop.py new file mode 100644 index 0000000..a72cf41 --- /dev/null +++ b/tg_bridge/uvicorn_loop.py @@ -0,0 +1,17 @@ +"""Uvicorn 在 Windows 上默认选用 ``ProactorEventLoop``(见 ``uvicorn/loops/asyncio.py``), +与 Telethon 经 SOCKS 连接 Telegram DC 时常触发 ``WinError 121``;login_cli 使用 Selector 故正常。 + +本模块提供给 Uvicorn 的 ``loop_factory``,强制使用 ``SelectorEventLoop``。 + +命令行示例:: + + uvicorn tg_bridge.app:app --host 0.0.0.0 --port 18080 --loop tg_bridge.uvicorn_loop:selector_loop_factory +""" + +from __future__ import annotations + +import asyncio + + +def selector_loop_factory() -> asyncio.AbstractEventLoop: + return asyncio.SelectorEventLoop() diff --git a/tg_bridge/winloop.py b/tg_bridge/winloop.py new file mode 100644 index 0000000..4440dd4 --- /dev/null +++ b/tg_bridge/winloop.py @@ -0,0 +1,15 @@ +"""Windows:经 SOCKS 连接 Telegram 时,Proactor 事件循环易触发 WinError 121(信号灯超时)。 + +在启动任何 asyncio 代码之前调用 ``apply_windows_selector_policy()``。 +参考:asyncio 在 Windows 上默认 ProactorEventLoop 与部分 socket/代理场景不兼容。 +""" + +from __future__ import annotations + +import asyncio +import sys + + +def apply_windows_selector_policy() -> None: + if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())