again
This commit is contained in:
91
toolbox.py
91
toolbox.py
@@ -1,11 +1,12 @@
|
||||
import markdown
|
||||
import importlib
|
||||
import traceback
|
||||
import time
|
||||
import inspect
|
||||
import re
|
||||
import os
|
||||
from latex2mathml.converter import convert as tex2mathml
|
||||
from functools import wraps, lru_cache
|
||||
pj = os.path.join
|
||||
|
||||
"""
|
||||
========================================================================
|
||||
@@ -70,6 +71,17 @@ def update_ui(chatbot, history, msg='正常', **kwargs): # 刷新界面
|
||||
assert isinstance(chatbot, ChatBotWithCookies), "在传递chatbot的过程中不要将其丢弃。必要时,可用clear将其清空,然后用for+append循环重新赋值。"
|
||||
yield chatbot.get_cookies(), chatbot, history, msg
|
||||
|
||||
def update_ui_lastest_msg(lastmsg, chatbot, history, delay=1): # 刷新界面
|
||||
"""
|
||||
刷新用户界面
|
||||
"""
|
||||
if len(chatbot) == 0: chatbot.append(["update_ui_last_msg", lastmsg])
|
||||
chatbot[-1] = list(chatbot[-1])
|
||||
chatbot[-1][-1] = lastmsg
|
||||
yield from update_ui(chatbot=chatbot, history=history)
|
||||
time.sleep(delay)
|
||||
|
||||
|
||||
def trimmed_format_exc():
|
||||
import os, traceback
|
||||
str = traceback.format_exc()
|
||||
@@ -83,7 +95,7 @@ def CatchException(f):
|
||||
"""
|
||||
|
||||
@wraps(f)
|
||||
def decorated(txt, top_p, temperature, chatbot, history, systemPromptTxt, WEB_PORT):
|
||||
def decorated(txt, top_p, temperature, chatbot, history, systemPromptTxt, WEB_PORT=-1):
|
||||
try:
|
||||
yield from f(txt, top_p, temperature, chatbot, history, systemPromptTxt, WEB_PORT)
|
||||
except Exception as e:
|
||||
@@ -210,16 +222,21 @@ def text_divide_paragraph(text):
|
||||
"""
|
||||
将文本按照段落分隔符分割开,生成带有段落标签的HTML代码。
|
||||
"""
|
||||
pre = '<div class="markdown-body">'
|
||||
suf = '</div>'
|
||||
if text.startswith(pre) and text.endswith(suf):
|
||||
return text
|
||||
|
||||
if '```' in text:
|
||||
# careful input
|
||||
return text
|
||||
return pre + text + suf
|
||||
else:
|
||||
# wtf input
|
||||
lines = text.split("\n")
|
||||
for i, line in enumerate(lines):
|
||||
lines[i] = lines[i].replace(" ", " ")
|
||||
text = "</br>".join(lines)
|
||||
return text
|
||||
return pre + text + suf
|
||||
|
||||
@lru_cache(maxsize=128) # 使用 lru缓存 加快转换速度
|
||||
def markdown_convertion(txt):
|
||||
@@ -331,8 +348,11 @@ def format_io(self, y):
|
||||
if y is None or y == []:
|
||||
return []
|
||||
i_ask, gpt_reply = y[-1]
|
||||
i_ask = text_divide_paragraph(i_ask) # 输入部分太自由,预处理一波
|
||||
gpt_reply = close_up_code_segment_during_stream(gpt_reply) # 当代码输出半截的时候,试着补上后个```
|
||||
# 输入部分太自由,预处理一波
|
||||
if i_ask is not None: i_ask = text_divide_paragraph(i_ask)
|
||||
# 当代码输出半截的时候,试着补上后个```
|
||||
if gpt_reply is not None: gpt_reply = close_up_code_segment_during_stream(gpt_reply)
|
||||
# process
|
||||
y[-1] = (
|
||||
None if i_ask is None else markdown.markdown(i_ask, extensions=['fenced_code', 'tables']),
|
||||
None if gpt_reply is None else markdown_convertion(gpt_reply)
|
||||
@@ -380,7 +400,7 @@ def extract_archive(file_path, dest_dir):
|
||||
print("Successfully extracted rar archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("Rar format requires additional dependencies to install")
|
||||
return '\n\n需要安装pip install rarfile来解压rar文件'
|
||||
return '\n\n解压失败! 需要安装pip install rarfile来解压rar文件'
|
||||
|
||||
# 第三方库,需要预先pip install py7zr
|
||||
elif file_extension == '.7z':
|
||||
@@ -391,7 +411,7 @@ def extract_archive(file_path, dest_dir):
|
||||
print("Successfully extracted 7z archive to {}".format(dest_dir))
|
||||
except:
|
||||
print("7z format requires additional dependencies to install")
|
||||
return '\n\n需要安装pip install py7zr来解压7z文件'
|
||||
return '\n\n解压失败! 需要安装pip install py7zr来解压7z文件'
|
||||
else:
|
||||
return ''
|
||||
return ''
|
||||
@@ -420,6 +440,17 @@ def find_recent_files(directory):
|
||||
|
||||
return recent_files
|
||||
|
||||
def promote_file_to_downloadzone(file, rename_file=None, chatbot=None):
|
||||
# 将文件复制一份到下载区
|
||||
import shutil
|
||||
if rename_file is None: rename_file = f'{gen_time_str()}-{os.path.basename(file)}'
|
||||
new_path = os.path.join(f'./gpt_log/', rename_file)
|
||||
if os.path.exists(new_path) and not os.path.samefile(new_path, file): os.remove(new_path)
|
||||
if not os.path.exists(new_path): shutil.copyfile(file, new_path)
|
||||
if chatbot:
|
||||
if 'file_to_promote' in chatbot._cookies: current = chatbot._cookies['file_to_promote']
|
||||
else: current = []
|
||||
chatbot._cookies.update({'file_to_promote': [new_path] + current})
|
||||
|
||||
def on_file_uploaded(files, chatbot, txt, txt2, checkboxes):
|
||||
"""
|
||||
@@ -459,14 +490,20 @@ def on_file_uploaded(files, chatbot, txt, txt2, checkboxes):
|
||||
return chatbot, txt, txt2
|
||||
|
||||
|
||||
def on_report_generated(files, chatbot):
|
||||
def on_report_generated(cookies, files, chatbot):
|
||||
from toolbox import find_recent_files
|
||||
report_files = find_recent_files('gpt_log')
|
||||
if 'file_to_promote' in cookies:
|
||||
report_files = cookies['file_to_promote']
|
||||
cookies.pop('file_to_promote')
|
||||
else:
|
||||
report_files = find_recent_files('gpt_log')
|
||||
if len(report_files) == 0:
|
||||
return None, chatbot
|
||||
# files.extend(report_files)
|
||||
chatbot.append(['报告如何远程获取?', '报告已经添加到右侧“文件上传区”(可能处于折叠状态),请查收。'])
|
||||
return report_files, chatbot
|
||||
file_links = ''
|
||||
for f in report_files: file_links += f'<br/><a href="file={os.path.abspath(f)}" target="_blank">{f}</a>'
|
||||
chatbot.append(['报告如何远程获取?', f'报告已经添加到右侧“文件上传区”(可能处于折叠状态),请查收。{file_links}'])
|
||||
return cookies, report_files, chatbot
|
||||
|
||||
def is_openai_api_key(key):
|
||||
API_MATCH_ORIGINAL = re.match(r"sk-[a-zA-Z0-9]{48}$", key)
|
||||
@@ -728,6 +765,8 @@ def clip_history(inputs, history, tokenizer, max_token_limit):
|
||||
其他小工具:
|
||||
- zip_folder: 把某个路径下所有文件压缩,然后转移到指定的另一个路径中(gpt写的)
|
||||
- gen_time_str: 生成时间戳
|
||||
- ProxyNetworkActivate: 临时地启动代理网络(如果有)
|
||||
- objdump/objload: 快捷的调试函数
|
||||
========================================================================
|
||||
"""
|
||||
|
||||
@@ -762,11 +801,16 @@ def zip_folder(source_folder, dest_folder, zip_name):
|
||||
|
||||
print(f"Zip file created at {zip_file}")
|
||||
|
||||
def zip_result(folder):
|
||||
import time
|
||||
t = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
zip_folder(folder, './gpt_log/', f'{t}-result.zip')
|
||||
return pj('./gpt_log/', f'{t}-result.zip')
|
||||
|
||||
def gen_time_str():
|
||||
import time
|
||||
return time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
|
||||
|
||||
class ProxyNetworkActivate():
|
||||
"""
|
||||
这段代码定义了一个名为TempProxy的空上下文管理器, 用于给一小段代码上代理
|
||||
@@ -775,12 +819,27 @@ class ProxyNetworkActivate():
|
||||
from toolbox import get_conf
|
||||
proxies, = get_conf('proxies')
|
||||
if 'no_proxy' in os.environ: os.environ.pop('no_proxy')
|
||||
os.environ['HTTP_PROXY'] = proxies['http']
|
||||
os.environ['HTTPS_PROXY'] = proxies['https']
|
||||
if proxies is not None:
|
||||
if 'http' in proxies: os.environ['HTTP_PROXY'] = proxies['http']
|
||||
if 'https' in proxies: os.environ['HTTPS_PROXY'] = proxies['https']
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
os.environ['no_proxy'] = '*'
|
||||
if 'HTTP_PROXY' in os.environ: os.environ.pop('HTTP_PROXY')
|
||||
if 'HTTPS_PROXY' in os.environ: os.environ.pop('HTTPS_PROXY')
|
||||
return
|
||||
return
|
||||
|
||||
def objdump(obj, file='objdump.tmp'):
|
||||
import pickle
|
||||
with open(file, 'wb+') as f:
|
||||
pickle.dump(obj, f)
|
||||
return
|
||||
|
||||
def objload(file='objdump.tmp'):
|
||||
import pickle, os
|
||||
if not os.path.exists(file):
|
||||
return
|
||||
with open(file, 'rb') as f:
|
||||
return pickle.load(f)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user