把上传文件路径和日志路径修改为统一可配置的变量

This commit is contained in:
binary-husky
2023-09-14 00:51:25 +08:00
parent 14de282302
commit ec9d030457
30 changed files with 193 additions and 329 deletions

View File

@@ -5,6 +5,8 @@ import inspect
import re
import os
import gradio
import shutil
import glob
from latex2mathml.converter import convert as tex2mathml
from functools import wraps, lru_cache
pj = os.path.join
@@ -171,7 +173,7 @@ def HotReload(f):
========================================================================
第二部分
其他小工具:
- write_results_to_file: 将结果写入markdown文件中
- write_history_to_file: 将结果写入markdown文件中
- regular_txt_to_markdown: 将普通文本转换为Markdown格式的文本。
- report_execption: 向chatbot中添加简单的意外错误信息
- text_divide_paragraph: 将文本按照段落分隔符分割开生成带有段落标签的HTML代码。
@@ -203,36 +205,6 @@ def get_reduce_token_percent(text):
return 0.5, '不详'
def write_results_to_file(history, file_name=None):
"""
将对话记录history以Markdown格式写入文件中。如果没有指定文件名则使用当前时间生成文件名。
"""
import os
import time
if file_name is None:
# file_name = time.strftime("chatGPT分析报告%Y-%m-%d-%H-%M-%S", time.localtime()) + '.md'
file_name = 'GPT-Report-' + gen_time_str() + '.md'
os.makedirs('./gpt_log/', exist_ok=True)
with open(f'./gpt_log/{file_name}', 'w', encoding='utf8') as f:
f.write('# GPT-Academic Report\n')
for i, content in enumerate(history):
try:
if type(content) != str: content = str(content)
except:
continue
if i % 2 == 0:
f.write('## ')
try:
f.write(content)
except:
# remove everything that cannot be handled by utf8
f.write(content.encode('utf-8', 'ignore').decode())
f.write('\n\n')
res = '以上材料已经被写入:\t' + os.path.abspath(f'./gpt_log/{file_name}')
print(res)
return res
def write_history_to_file(history, file_basename=None, file_fullname=None):
"""
将对话记录history以Markdown格式写入文件中。如果没有指定文件名则使用当前时间生成文件名。
@@ -241,9 +213,9 @@ def write_history_to_file(history, file_basename=None, file_fullname=None):
import time
if file_fullname is None:
if file_basename is not None:
file_fullname = os.path.join(get_log_folder(), file_basename)
file_fullname = pj(get_log_folder(), file_basename)
else:
file_fullname = os.path.join(get_log_folder(), f'GPT-Academic-{gen_time_str()}.md')
file_fullname = pj(get_log_folder(), f'GPT-Academic-{gen_time_str()}.md')
os.makedirs(os.path.dirname(file_fullname), exist_ok=True)
with open(file_fullname, 'w', encoding='utf8') as f:
f.write('# GPT-Academic Report\n')
@@ -519,7 +491,7 @@ def find_recent_files(directory):
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
file_path = pj(directory, filename)
if file_path.endswith('.log'):
continue
created_time = os.path.getmtime(file_path)
@@ -534,7 +506,7 @@ def promote_file_to_downloadzone(file, rename_file=None, chatbot=None):
# 将文件复制一份到下载区
import shutil
if rename_file is None: rename_file = f'{gen_time_str()}-{os.path.basename(file)}'
new_path = os.path.join(get_log_folder(), rename_file)
new_path = pj(get_log_folder(), rename_file)
# 如果已经存在,先删除
if os.path.exists(new_path) and not os.path.samefile(new_path, file): os.remove(new_path)
# 把文件复制过去
@@ -549,44 +521,70 @@ def disable_auto_promotion(chatbot):
chatbot._cookies.update({'files_to_promote': []})
return
def on_file_uploaded(files, chatbot, txt, txt2, checkboxes, cookies):
def is_the_upload_folder(string):
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
pattern = r'^PATH_PRIVATE_UPLOAD/[A-Za-z0-9_-]+/\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}$'
pattern = pattern.replace('PATH_PRIVATE_UPLOAD', PATH_PRIVATE_UPLOAD)
if re.match(pattern, string): return True
else: return False
def del_outdated_uploads(outdate_time_seconds):
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
current_time = time.time()
one_hour_ago = current_time - outdate_time_seconds
# Get a list of all subdirectories in the PATH_PRIVATE_UPLOAD folder
# Remove subdirectories that are older than one hour
for subdirectory in glob.glob(f'{PATH_PRIVATE_UPLOAD}/*/*'):
subdirectory_time = os.path.getmtime(subdirectory)
if subdirectory_time < one_hour_ago:
try: shutil.rmtree(subdirectory)
except: pass
return
def on_file_uploaded(request: gradio.Request, files, chatbot, txt, txt2, checkboxes, cookies):
"""
当文件被上传时的回调函数
"""
if len(files) == 0:
return chatbot, txt
import shutil
import os
import time
import glob
from toolbox import extract_archive
try:
shutil.rmtree('./private_upload/')
except:
pass
# 移除过时的旧文件从而节省空间&保护隐私
outdate_time_seconds = 60
del_outdated_uploads(outdate_time_seconds)
# 创建工作路径
user_name = "default" if not request.username else request.username
time_tag = gen_time_str()
os.makedirs(f'private_upload/{time_tag}', exist_ok=True)
err_msg = ''
PATH_PRIVATE_UPLOAD, = get_conf('PATH_PRIVATE_UPLOAD')
target_path_base = pj(PATH_PRIVATE_UPLOAD, user_name, time_tag)
os.makedirs(target_path_base, exist_ok=True)
# 逐个文件转移到目标路径
upload_msg = ''
for file in files:
file_origin_name = os.path.basename(file.orig_name)
shutil.copy(file.name, f'private_upload/{time_tag}/{file_origin_name}')
err_msg += extract_archive(f'private_upload/{time_tag}/{file_origin_name}',
dest_dir=f'private_upload/{time_tag}/{file_origin_name}.extract')
moved_files = [fp for fp in glob.glob('private_upload/**/*', recursive=True)]
if "底部输入区" in checkboxes:
txt = ""
txt2 = f'private_upload/{time_tag}'
this_file_path = pj(target_path_base, file_origin_name)
shutil.move(file.name, this_file_path)
upload_msg += extract_archive(file_path=this_file_path, dest_dir=this_file_path+'.extract')
# 整理文件集合
moved_files = [fp for fp in glob.glob(f'{target_path_base}/**/*', recursive=True)]
if "底部输入区" in checkboxes:
txt, txt2 = "", target_path_base
else:
txt = f'private_upload/{time_tag}'
txt2 = ""
txt, txt2 = target_path_base, ""
# 输出消息
moved_files_str = '\t\n\n'.join(moved_files)
chatbot.append(['我上传了文件,请查收',
chatbot.append(['我上传了文件,请查收',
f'[Local Message] 收到以下文件: \n\n{moved_files_str}' +
f'\n\n调用路径参数已自动修正到: \n\n{txt}' +
f'\n\n现在您点击任意函数插件时,以上文件将被作为输入参数'+err_msg])
f'\n\n现在您点击任意函数插件时,以上文件将被作为输入参数'+upload_msg])
# 记录近期文件
cookies.update({
'most_recent_uploaded': {
'path': f'private_upload/{time_tag}',
'path': target_path_base,
'time': time.time(),
'time_str': time_tag
}})
@@ -595,11 +593,12 @@ def on_file_uploaded(files, chatbot, txt, txt2, checkboxes, cookies):
def on_report_generated(cookies, files, chatbot):
from toolbox import find_recent_files
PATH_LOGGING, = get_conf('PATH_LOGGING')
if 'files_to_promote' in cookies:
report_files = cookies['files_to_promote']
cookies.pop('files_to_promote')
else:
report_files = find_recent_files('gpt_log')
report_files = find_recent_files(PATH_LOGGING)
if len(report_files) == 0:
return cookies, None, chatbot
# files.extend(report_files)
@@ -909,34 +908,35 @@ def zip_folder(source_folder, dest_folder, zip_name):
return
# Create the name for the zip file
zip_file = os.path.join(dest_folder, zip_name)
zip_file = pj(dest_folder, zip_name)
# Create a ZipFile object
with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Walk through the source folder and add files to the zip file
for foldername, subfolders, filenames in os.walk(source_folder):
for filename in filenames:
filepath = os.path.join(foldername, filename)
filepath = pj(foldername, filename)
zipf.write(filepath, arcname=os.path.relpath(filepath, source_folder))
# Move the zip file to the destination folder (if it wasn't already there)
if os.path.dirname(zip_file) != dest_folder:
os.rename(zip_file, os.path.join(dest_folder, os.path.basename(zip_file)))
zip_file = os.path.join(dest_folder, os.path.basename(zip_file))
os.rename(zip_file, pj(dest_folder, os.path.basename(zip_file)))
zip_file = pj(dest_folder, os.path.basename(zip_file))
print(f"Zip file created at {zip_file}")
def zip_result(folder):
t = gen_time_str()
zip_folder(folder, './gpt_log/', f'{t}-result.zip')
return pj('./gpt_log/', f'{t}-result.zip')
zip_folder(folder, get_log_folder(), f'{t}-result.zip')
return pj(get_log_folder(), f'{t}-result.zip')
def gen_time_str():
import time
return time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
def get_log_folder(user='default', plugin_name='shared'):
_dir = os.path.join(os.path.dirname(__file__), 'gpt_log', user, plugin_name)
PATH_LOGGING, = get_conf('PATH_LOGGING')
_dir = pj(PATH_LOGGING, user, plugin_name)
if not os.path.exists(_dir): os.makedirs(_dir)
return _dir