from typing import List, Dict, Tuple
import asyncio
from dataclasses import dataclass
from toolbox import CatchException, update_ui, promote_file_to_downloadzone, get_log_folder, get_user
from toolbox import update_ui, CatchException, report_exception, write_history_to_file
from crazy_functions.paper_fns.auto_git.query_analyzer import QueryAnalyzer, SearchCriteria
from crazy_functions.paper_fns.auto_git.handlers.repo_handler import RepositoryHandler
from crazy_functions.paper_fns.auto_git.handlers.code_handler import CodeSearchHandler
from crazy_functions.paper_fns.auto_git.handlers.user_handler import UserSearchHandler
from crazy_functions.paper_fns.auto_git.handlers.topic_handler import TopicHandler
from crazy_functions.paper_fns.auto_git.sources.github_source import GitHubSource
from crazy_functions.crazy_utils import request_gpt_model_in_new_thread_with_ui_alive
import re
from datetime import datetime
import os
import json
from pathlib import Path
import time
# 导入格式化器
from crazy_functions.paper_fns.file2file_doc import (
TxtFormatter,
MarkdownFormatter,
HtmlFormatter,
WordFormatter
)
from crazy_functions.paper_fns.file2file_doc.word2pdf import WordToPdfConverter
@CatchException
def GitHub项目智能检索(txt: str, llm_kwargs: Dict, plugin_kwargs: Dict, chatbot: List,
history: List, system_prompt: str, user_request: str):
"""GitHub项目智能检索主函数"""
# 初始化GitHub API调用源
github_source = GitHubSource(api_key=plugin_kwargs.get("github_api_key"))
# 初始化处理器
handlers = {
"repo": RepositoryHandler(github_source, llm_kwargs),
"code": CodeSearchHandler(github_source, llm_kwargs),
"user": UserSearchHandler(github_source, llm_kwargs),
"topic": TopicHandler(github_source, llm_kwargs),
}
# 分析查询意图
chatbot.append(["分析查询意图", "正在分析您的查询需求..."])
yield from update_ui(chatbot=chatbot, history=history)
query_analyzer = QueryAnalyzer()
search_criteria = yield from query_analyzer.analyze_query(
txt, chatbot, llm_kwargs
)
# 根据查询类型选择处理器
handler = handlers.get(search_criteria.query_type)
if not handler:
handler = handlers["repo"] # 默认使用仓库处理器
# 处理查询
chatbot.append(["开始搜索", f"使用{handler.__class__.__name__}处理您的请求,正在搜索GitHub..."])
yield from update_ui(chatbot=chatbot, history=history)
final_prompt = asyncio.run(handler.handle(
criteria=search_criteria,
chatbot=chatbot,
history=history,
system_prompt=system_prompt,
llm_kwargs=llm_kwargs,
plugin_kwargs=plugin_kwargs
))
if final_prompt:
# 检查是否是道歉提示
if "很抱歉,我们未能找到" in final_prompt:
chatbot.append([txt, final_prompt])
yield from update_ui(chatbot=chatbot, history=history)
return
# 在 final_prompt 末尾添加用户原始查询要求
final_prompt += f"""
原始用户查询: "{txt}"
重要提示:
- 你的回答必须直接满足用户的原始查询要求
- 在遵循之前指南的同时,优先回答用户明确提出的问题
- 确保回答格式和内容与用户期望一致
- 对于GitHub仓库需要提供链接地址, 回复中请采用以下格式的HTML链接:
* 对于GitHub仓库: 仓库名
- 不要生成参考列表,引用信息将另行处理
"""
# 使用最终的prompt生成回答
response = yield from request_gpt_model_in_new_thread_with_ui_alive(
inputs=final_prompt,
inputs_show_user=txt,
llm_kwargs=llm_kwargs,
chatbot=chatbot,
history=[],
sys_prompt=f"你是一个熟悉GitHub生态系统的专业助手,能帮助用户找到合适的项目、代码和开发者。除非用户指定,否则请使用中文回复。"
)
# 1. 获取项目列表
repos_list = handler.ranked_repos # 直接使用原始仓库数据
# 在新的对话中添加格式化的仓库参考列表
if repos_list:
references = ""
for idx, repo in enumerate(repos_list, 1):
# 构建仓库引用
stars_str = f"⭐ {repo.get('stargazers_count', 'N/A')}" if repo.get('stargazers_count') else ""
forks_str = f"🍴 {repo.get('forks_count', 'N/A')}" if repo.get('forks_count') else ""
stats = f"{stars_str} {forks_str}".strip()
stats = f" ({stats})" if stats else ""
language = f" [{repo.get('language', '')}]" if repo.get('language') else ""
reference = f"[{idx}] **{repo.get('name', '')}**{language}{stats} \n"
reference += f"👤 {repo.get('owner', {}).get('login', 'N/A') if repo.get('owner') is not None else 'N/A'} | "
reference += f"📅 {repo.get('updated_at', 'N/A')[:10]} | "
reference += f"GitHub \n"
if repo.get('description'):
reference += f"{repo.get('description')} \n"
reference += " \n"
references += reference
# 添加新的对话显示参考仓库
chatbot.append(["推荐项目如下:", references])
yield from update_ui(chatbot=chatbot, history=history)
# 2. 保存结果到文件
# 创建保存目录
save_dir = get_log_folder(get_user(chatbot), plugin_name='github_search')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 生成文件名
def get_safe_filename(txt, max_length=10):
# 获取文本前max_length个字符作为文件名
filename = txt[:max_length].strip()
# 移除不安全的文件名字符
filename = re.sub(r'[\\/:*?"<>|]', '', filename)
# 如果文件名为空,使用时间戳
if not filename:
filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
return filename
base_filename = get_safe_filename(txt)
# 准备保存的内容 - 优化文档结构
md_content = f"# GitHub搜索结果: {txt}\n\n"
md_content += f"搜索时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
# 添加模型回复
md_content += "## 搜索分析与总结\n\n"
md_content += response + "\n\n"
# 添加所有搜索到的仓库详细信息
md_content += "## 推荐项目详情\n\n"
if not repos_list:
md_content += "未找到匹配的项目\n\n"
else:
md_content += f"共找到 {len(repos_list)} 个相关项目\n\n"
# 添加项目简表
md_content += "### 项目一览表\n\n"
md_content += "| 序号 | 项目名称 | 作者 | 语言 | 星标数 | 更新时间 |\n"
md_content += "| ---- | -------- | ---- | ---- | ------ | -------- |\n"
for idx, repo in enumerate(repos_list, 1):
md_content += f"| {idx} | [{repo.get('name', '')}]({repo.get('html_url', '')}) | {repo.get('owner', {}).get('login', 'N/A') if repo.get('owner') is not None else 'N/A'} | {repo.get('language', 'N/A')} | {repo.get('stargazers_count', 'N/A')} | {repo.get('updated_at', 'N/A')[:10]} |\n"
md_content += "\n"
# 添加详细项目信息
md_content += "### 项目详细信息\n\n"
for idx, repo in enumerate(repos_list, 1):
md_content += f"#### {idx}. {repo.get('name', '')}\n\n"
md_content += f"- **仓库**: [{repo.get('full_name', '')}]({repo.get('html_url', '')})\n"
md_content += f"- **作者**: [{repo.get('owner', {}).get('login', '') if repo.get('owner') is not None else 'N/A'}]({repo.get('owner', {}).get('html_url', '') if repo.get('owner') is not None else '#'})\n"
md_content += f"- **描述**: {repo.get('description', 'N/A')}\n"
md_content += f"- **语言**: {repo.get('language', 'N/A')}\n"
md_content += f"- **星标**: {repo.get('stargazers_count', 'N/A')}\n"
md_content += f"- **Fork数**: {repo.get('forks_count', 'N/A')}\n"
md_content += f"- **最近更新**: {repo.get('updated_at', 'N/A')[:10]}\n"
md_content += f"- **创建时间**: {repo.get('created_at', 'N/A')[:10]}\n"
md_content += f"- **开源许可**: {repo.get('license', {}).get('name', 'N/A') if repo.get('license') is not None else 'N/A'}\n"
if repo.get('topics'):
md_content += f"- **主题标签**: {', '.join(repo.get('topics', []))}\n"
if repo.get('homepage'):
md_content += f"- **项目主页**: [{repo.get('homepage')}]({repo.get('homepage')})\n"
md_content += "\n"
# 添加查询信息和元数据
md_content += "## 查询元数据\n\n"
md_content += f"- **原始查询**: {txt}\n"
md_content += f"- **查询类型**: {search_criteria.query_type}\n"
md_content += f"- **关键词**: {', '.join(search_criteria.keywords) if hasattr(search_criteria, 'keywords') and search_criteria.keywords else 'N/A'}\n"
md_content += f"- **搜索日期**: {datetime.now().strftime('%Y-%m-%d')}\n\n"
# 保存为多种格式
saved_files = []
failed_files = []
# 1. 保存为TXT
try:
txt_formatter = TxtFormatter()
txt_content = txt_formatter.create_document(md_content)
txt_file = os.path.join(save_dir, f"github_results_{base_filename}.txt")
with open(txt_file, 'w', encoding='utf-8') as f:
f.write(txt_content)
promote_file_to_downloadzone(txt_file, chatbot=chatbot)
saved_files.append("TXT")
except Exception as e:
failed_files.append(f"TXT (错误: {str(e)})")
# 2. 保存为Markdown
try:
md_formatter = MarkdownFormatter()
formatted_md_content = md_formatter.create_document(md_content, "GitHub项目搜索")
md_file = os.path.join(save_dir, f"github_results_{base_filename}.md")
with open(md_file, 'w', encoding='utf-8') as f:
f.write(formatted_md_content)
promote_file_to_downloadzone(md_file, chatbot=chatbot)
saved_files.append("Markdown")
except Exception as e:
failed_files.append(f"Markdown (错误: {str(e)})")
# 3. 保存为HTML
try:
html_formatter = HtmlFormatter(processing_type="GitHub项目搜索")
html_content = html_formatter.create_document(md_content)
html_file = os.path.join(save_dir, f"github_results_{base_filename}.html")
with open(html_file, 'w', encoding='utf-8') as f:
f.write(html_content)
promote_file_to_downloadzone(html_file, chatbot=chatbot)
saved_files.append("HTML")
except Exception as e:
failed_files.append(f"HTML (错误: {str(e)})")
# 4. 保存为Word
word_file = None
try:
word_formatter = WordFormatter()
doc = word_formatter.create_document(md_content, "GitHub项目搜索")
word_file = os.path.join(save_dir, f"github_results_{base_filename}.docx")
doc.save(word_file)
promote_file_to_downloadzone(word_file, chatbot=chatbot)
saved_files.append("Word")
except Exception as e:
failed_files.append(f"Word (错误: {str(e)})")
word_file = None
# 5. 保存为PDF (仅当Word保存成功时)
if word_file and os.path.exists(word_file):
try:
pdf_file = WordToPdfConverter.convert_to_pdf(word_file)
promote_file_to_downloadzone(pdf_file, chatbot=chatbot)
saved_files.append("PDF")
except Exception as e:
failed_files.append(f"PDF (错误: {str(e)})")
# 报告保存结果
if saved_files:
success_message = f"成功保存以下格式: {', '.join(saved_files)}"
if failed_files:
failure_message = f"以下格式保存失败: {', '.join(failed_files)}"
chatbot.append(["部分格式保存成功", f"{success_message}。{failure_message}"])
else:
chatbot.append(["所有格式保存成功", success_message])
else:
chatbot.append(["保存失败", f"所有格式均保存失败: {', '.join(failed_files)}"])
else:
report_exception(chatbot, history, a=f"处理失败", b=f"请尝试其他查询")
yield from update_ui(chatbot=chatbot, history=history)