40 lines
1.3 KiB
Python
40 lines
1.3 KiB
Python
"""Telethon 代理配置(传给 TelegramClient 的 proxy 参数)。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from tg_bridge.config import Settings
|
|
|
|
|
|
def telethon_proxy_from_settings(s: Settings) -> dict[str, Any] | None:
    """Build the dict passed to Telethon as ``TelegramClient(..., proxy=...)``.

    The keys match Telethon's ``Connection._parse_proxy``:
    ``proxy_type, addr, port, rdns, username, password`` (the last two are
    optional). Using a dict avoids the ambiguity between PySocks integer
    type constants and the length of a ``(host, port, rdns)`` tuple.

    Reference: ``_proxy_connect`` in
    ``telethon/network/connection/connection.py``.

    Args:
        s: Settings object. The attributes read are ``proxy_type``,
            ``proxy_host``, ``proxy_port``, ``proxy_rdns``, ``proxy_user``
            and ``proxy_password``.

    Returns:
        The proxy dict, or ``None`` when any of ``proxy_type``,
        ``proxy_host`` or ``proxy_port`` is falsy (proxy not configured).

    Raises:
        ValueError: If the proxy type, after lower-casing and stripping
            surrounding whitespace, is not one of ``http``, ``https``,
            ``socks5`` or ``socks4``.
    """
    # A proxy is only configured when type, host and port are all set.
    if not s.proxy_type or not s.proxy_host or not s.proxy_port:
        return None

    proxy_type = s.proxy_type.lower().strip()
    # "https" is accepted as an alias and passed on as "http".
    if proxy_type == "https":
        proxy_type = "http"
    if proxy_type not in ("http", "socks5", "socks4"):
        raise ValueError(
            f"不支持的 TELEGRAM_PROXY_TYPE={s.proxy_type!r},可用: http, socks5, socks4"
        )

    proxy: dict[str, Any] = {
        "proxy_type": proxy_type,
        "addr": s.proxy_host,
        "port": s.proxy_port,
        "rdns": s.proxy_rdns,
    }
    # Credentials are attached only when a user name is configured; a missing
    # password is sent as an empty string rather than None.
    if s.proxy_user is not None:
        proxy["username"] = s.proxy_user
        proxy["password"] = s.proxy_password if s.proxy_password is not None else ""
    return proxy
|