implement doc_fns

This commit is contained in:
binary-husky
2025-06-04 00:20:09 +08:00
parent 725f60fba3
commit f42aad5093
21 changed files with 5828 additions and 0 deletions

View File

@@ -0,0 +1,812 @@
import os
import time
from abc import ABC, abstractmethod
from datetime import datetime
from docx import Document
from docx.enum.style import WD_STYLE_TYPE
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING
from docx.oxml.ns import qn
from docx.shared import Inches, Cm
from docx.shared import Pt, RGBColor, Inches
from typing import Dict, List, Tuple
import markdown
from crazy_functions.doc_fns.conversation_doc.word_doc import convert_markdown_to_word
class DocumentFormatter(ABC):
    """Abstract base class defining the interface shared by all report formatters.

    The constructor stores its three arguments unchanged:

    * ``final_summary`` - the overall summary text.
    * ``file_summaries_map`` - mapping whose keys are file paths (subclasses
      split them with ``os.path``) and whose values are the per-file summaries.
    * ``failed_files`` - sequence of ``(file_path, reason)`` pairs.
    """

    def __init__(self, final_summary: str, file_summaries_map: Dict, failed_files: List[Tuple]):
        self.final_summary = final_summary
        self.file_summaries_map = file_summaries_map
        self.failed_files = failed_files

    @abstractmethod
    def format_failed_files(self) -> str:
        """Format the list of files that failed processing."""

    @abstractmethod
    def format_file_summaries(self) -> str:
        """Format the per-file summaries."""

    @abstractmethod
    def create_document(self) -> object:
        """Build and return the complete document.

        The return type is format-specific, so it is annotated as ``object``:
        text-based formatters return a string, while a formatter may also
        return a document object.
        """
class WordFormatter(DocumentFormatter):
    """Word (.docx) report generator.

    Page geometry and fonts are modelled on the Chinese official-document
    layout standard GB/T 9704-2012, with some adjustments.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.doc = Document()
        self._setup_document()
        self._create_styles()
        # Running counters for the three numbered heading levels.
        self.numbers = {
            1: 0,  # level-1 heading counter
            2: 0,  # level-2 heading counter
            3: 0,  # level-3 heading counter
        }

    def _setup_document(self):
        """Apply page size, margins and a right-aligned header to every section."""
        for section in self.doc.sections:
            # A4 page size.
            section.page_width = Cm(21)
            section.page_height = Cm(29.7)
            # Page margins.
            section.top_margin = Cm(3.7)     # 37 mm
            section.bottom_margin = Cm(3.5)  # 35 mm
            section.left_margin = Cm(2.8)    # 28 mm
            section.right_margin = Cm(2.6)   # 26 mm
            # Header and footer distances.
            section.header_distance = Cm(2.0)
            section.footer_distance = Cm(2.0)
            # Header text.
            header_para = section.header.paragraphs[0]
            header_para.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            header_run = header_para.add_run("该文档由GPT-academic生成")
            header_run.font.name = '仿宋'
            # Also set the East Asian font attribute so CJK text uses the same typeface.
            header_run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
            header_run.font.size = Pt(9)

    def _create_styles(self):
        """Register the custom body style and the title/heading styles."""
        # Body text style.
        style = self.doc.styles.add_style('Normal_Custom', WD_STYLE_TYPE.PARAGRAPH)
        style.font.name = '仿宋'
        style._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
        style.font.size = Pt(14)
        style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.ONE_POINT_FIVE
        style.paragraph_format.space_after = Pt(0)
        style.paragraph_format.first_line_indent = Pt(28)
        # Title and heading styles.
        self._create_heading_style('Title_Custom', '方正小标宋简体', 32, WD_PARAGRAPH_ALIGNMENT.CENTER)
        self._create_heading_style('Heading1_Custom', '黑体', 22, WD_PARAGRAPH_ALIGNMENT.LEFT)
        self._create_heading_style('Heading2_Custom', '黑体', 18, WD_PARAGRAPH_ALIGNMENT.LEFT)
        self._create_heading_style('Heading3_Custom', '黑体', 16, WD_PARAGRAPH_ALIGNMENT.LEFT)

    def _create_heading_style(self, style_name: str, font_name: str, font_size: int, alignment):
        """Create a bold paragraph style for a heading and return it.

        Args:
            style_name: Name under which the style is registered.
            font_name: Typeface, applied to both the Latin and East Asian slots.
            font_size: Font size in points.
            alignment: Paragraph alignment constant.
        """
        style = self.doc.styles.add_style(style_name, WD_STYLE_TYPE.PARAGRAPH)
        style.font.name = font_name
        style._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)
        style.font.size = Pt(font_size)
        style.font.bold = True
        style.paragraph_format.alignment = alignment
        style.paragraph_format.space_before = Pt(12)
        style.paragraph_format.space_after = Pt(12)
        style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.ONE_POINT_FIVE
        return style

    def _get_heading_number(self, level: int) -> str:
        """Advance the counter for ``level`` and return the formatted number.

        Args:
            level: Heading level (0-3). Level 0 is the unnumbered main title.

        Returns:
            ``"N. "``, ``"N.M "`` or ``"N.M.K "`` for levels 1-3, and an empty
            string for level 0.
        """
        if level == 0:  # the main title carries no number
            return ""
        self.numbers[level] += 1
        # Starting a new heading restarts the numbering of all deeper levels.
        for deeper in range(level + 1, 4):
            self.numbers[deeper] = 0
        if level == 1:
            return f"{self.numbers[1]}. "
        if level == 2:
            return f"{self.numbers[1]}.{self.numbers[2]} "
        if level == 3:
            return f"{self.numbers[1]}.{self.numbers[2]}.{self.numbers[3]} "
        return ""

    def _add_heading(self, text: str, level: int):
        """Append a numbered heading paragraph and return it.

        For the main title (level 0) a centred date line is appended as well.

        Args:
            text: Heading text.
            level: Heading level (0-3).
        """
        style_map = {
            0: 'Title_Custom',
            1: 'Heading1_Custom',
            2: 'Heading2_Custom',
            3: 'Heading3_Custom',
        }
        # Point sizes per level; any other level falls back to 16.
        font_sizes = {0: 32, 1: 22, 2: 18}
        font_size = font_sizes.get(level, 16)

        number = self._get_heading_number(level)
        paragraph = self.doc.add_paragraph(style=style_map[level])
        if number:
            self._get_run_style(paragraph.add_run(number), '黑体', font_size, True)
        # NOTE(review): the run-level font overrides the typeface configured on
        # 'Title_Custom' for the main title - confirm this is intended.
        self._get_run_style(paragraph.add_run(text), '黑体', font_size, True)

        if level == 0:
            date_paragraph = self.doc.add_paragraph()
            date_paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
            # The format previously lacked the trailing day character, yielding e.g. "2025年06月04".
            date_run = date_paragraph.add_run(datetime.now().strftime('%Y年%m月%d日'))
            self._get_run_style(date_run, '仿宋', 16, False)
        return paragraph

    def _get_run_style(self, run, font_name: str, font_size: int, bold: bool = False):
        """Apply typeface, size and weight to a run (despite the name, this is a setter)."""
        run.font.name = font_name
        run._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)
        run.font.size = Pt(font_size)
        run.font.bold = bold

    def format_failed_files(self) -> str:
        """Append the failed-file section to the document.

        Returns:
            A plain-text rendering of the same list, or an empty string when
            there are no failed files (nothing is added to the document then).
        """
        if not self.failed_files:
            return ""
        result = ["处理失败文件:"]
        self._add_heading("处理失败文件", 1)
        for fp, reason in self.failed_files:
            entry = f"{os.path.basename(fp)}: {reason}"
            result.append(entry)
            self._add_content(entry, indent=False)
        self.doc.add_paragraph()
        return "\n".join(result)

    def _add_content(self, text: str, indent: bool = True):
        """Append a body paragraph and return it.

        ``text`` is passed through ``convert_markdown_to_word`` here, so callers
        must hand in the raw text rather than pre-converted text.

        Args:
            text: Raw (markdown) text of the paragraph.
            indent: When false, the first-line indent of the body style is removed.
        """
        processed_text = convert_markdown_to_word(text)
        paragraph = self.doc.add_paragraph(processed_text, style='Normal_Custom')
        if not indent:
            paragraph.paragraph_format.first_line_indent = Pt(0)
        return paragraph

    def format_file_summaries(self) -> str:
        """Append all per-file summaries to the document, grouped by directory.

        Files without a directory come first as level-2 headings. Each
        directory then becomes a level-2 heading with its files as level-3
        headings.

        Returns:
            A plain-text rendering of the same content.
        """
        result = []
        # Group the paths by their directory.
        file_groups = {}
        for path in sorted(self.file_summaries_map.keys()):
            file_groups.setdefault(os.path.dirname(path), []).append(path)

        # Files without a directory.
        for path in sorted(file_groups.get("", [])):
            file_name = os.path.basename(path)
            summary = self.file_summaries_map[path]
            result.append(f"\n📄 {file_name}")
            result.append(summary)
            self._add_heading(f"📄 {file_name}", 2)
            # _add_content converts the markdown itself; converting here as well
            # would process the text twice.
            self._add_content(summary)
            self.doc.add_paragraph()

        # Files inside directories.
        for dir_path in sorted(file_groups.keys()):
            if dir_path == "":  # already handled above
                continue
            result.append(f"\n📁 {dir_path}")
            self._add_heading(f"📁 {dir_path}", 2)
            for path in sorted(file_groups[dir_path]):
                file_name = os.path.basename(path)
                summary = self.file_summaries_map[path]
                result.append(f"\n📄 {file_name}")
                result.append(summary)
                self._add_heading(f"📄 {file_name}", 3)
                self._add_content(summary)
                self.doc.add_paragraph()
        return "\n".join(result)

    def create_document(self):
        """Populate the Word document and return the document object.

        NOTE(review): content is appended to the document created in
        ``__init__``, so calling this more than once on one instance adds the
        report again.
        """
        # Restart heading numbering.
        for level in self.numbers:
            self.numbers[level] = 0
        # Main title.
        self._add_heading("文档总结报告", 0)
        self.doc.add_paragraph()
        # Overall summary; _add_content performs the markdown conversion.
        self._add_heading("总体摘要", 1)
        self._add_content(self.final_summary)
        self.doc.add_paragraph()
        # Failed files, if any.
        if self.failed_files:
            self.format_failed_files()
        # Per-file details.
        self._add_heading("各文件详细总结", 1)
        self.format_file_summaries()
        return self.doc

    def save_as_pdf(self, word_path, pdf_path=None):
        """Convert a saved Word document to PDF.

        Args:
            word_path: Path of the Word document.
            pdf_path: Optional output path, forwarded to the converter unchanged.

        Returns:
            The path returned by the converter, or ``None`` if conversion raised.
        """
        from crazy_functions.doc_fns.conversation_doc.word2pdf import WordToPdfConverter
        try:
            pdf_path = WordToPdfConverter.convert_to_pdf(word_path, pdf_path)
            return pdf_path
        except Exception as e:
            # Best effort: report the failure and let the caller continue without a PDF.
            print(f"PDF转换失败: {str(e)}")
            return None
class MarkdownFormatter(DocumentFormatter):
    """Markdown report generator."""

    def format_failed_files(self) -> str:
        """Return the failed-file section, or an empty string if nothing failed."""
        if not self.failed_files:
            return ""
        formatted_text = ["\n## ⚠️ 处理失败的文件"]
        formatted_text.extend(
            f"- {os.path.basename(fp)}: {reason}" for fp, reason in self.failed_files
        )
        formatted_text.append("\n---")
        return "\n".join(formatted_text)

    def format_file_summaries(self) -> str:
        """Return all per-file summaries grouped by directory.

        Files without a directory are listed first, without a directory
        heading. Every directory then gets exactly one ``##`` heading followed
        by its files. Grouping (rather than walking the sorted paths) prevents
        a root-level file from appearing under an unrelated directory heading
        and prevents a directory heading from being repeated.
        """
        groups = {}
        for path in sorted(self.file_summaries_map):
            groups.setdefault(os.path.dirname(path), []).append(path)

        formatted_text = []
        # The empty directory name sorts first, so root-level files lead.
        for dir_path in sorted(groups):
            if dir_path:
                formatted_text.append(f"\n## 📁 {dir_path}")
            for path in groups[dir_path]:
                formatted_text.append(f"\n### 📄 {os.path.basename(path)}")
                formatted_text.append(self.file_summaries_map[path])
                formatted_text.append("\n---")
        return "\n".join(formatted_text)

    def create_document(self) -> str:
        """Return the complete Markdown report."""
        document = [
            "# 📑 文档总结报告",
            "\n## 总体摘要",
            self.final_summary,
        ]
        if self.failed_files:
            document.append(self.format_failed_files())
        document.extend([
            "\n# 📚 各文件详细总结",
            self.format_file_summaries(),
        ])
        return "\n".join(document)
class HtmlFormatter(DocumentFormatter):
    """HTML report generator with an embedded stylesheet."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One shared converter; _render_markdown resets it before each use.
        self.md = markdown.Markdown(extensions=['extra','codehilite', 'tables','nl2br'])
        # Stylesheet embedded verbatim into the generated page.
        self.css_styles = """
@keyframes fadeIn {
from { opacity: 0; transform: translateY(20px); }
to { opacity: 1; transform: translateY(0); }
}
@keyframes slideIn {
from { transform: translateX(-20px); opacity: 0; }
to { transform: translateX(0); opacity: 1; }
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.05); }
100% { transform: scale(1); }
}
:root {
/* Enhanced color palette */
--primary-color: #2563eb;
--primary-light: #eff6ff;
--secondary-color: #1e293b;
--background-color: #f8fafc;
--text-color: #334155;
--text-light: #64748b;
--border-color: #e2e8f0;
--error-color: #ef4444;
--error-light: #fef2f2;
--success-color: #22c55e;
--warning-color: #f59e0b;
--card-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
--hover-shadow: 0 20px 25px -5px rgb(0 0 0 / 0.1), 0 8px 10px -6px rgb(0 0 0 / 0.1);
/* Typography */
--heading-font: "Plus Jakarta Sans", system-ui, sans-serif;
--body-font: "Inter", system-ui, sans-serif;
}
body {
font-family: var(--body-font);
line-height: 1.8;
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
color: var(--text-color);
background-color: var(--background-color);
font-size: 16px;
-webkit-font-smoothing: antialiased;
}
.container {
background: white;
padding: 3rem;
border-radius: 24px;
box-shadow: var(--card-shadow);
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
animation: fadeIn 0.6s ease-out;
border: 1px solid var(--border-color);
}
.container:hover {
box-shadow: var(--hover-shadow);
transform: translateY(-2px);
}
h1, h2, h3 {
font-family: var(--heading-font);
font-weight: 600;
}
h1 {
color: var(--primary-color);
font-size: 2.8em;
text-align: center;
margin: 2rem 0 3rem;
padding-bottom: 1.5rem;
border-bottom: 3px solid var(--primary-color);
letter-spacing: -0.03em;
position: relative;
display: flex;
align-items: center;
justify-content: center;
gap: 1rem;
}
h1::after {
content: '';
position: absolute;
bottom: -3px;
left: 50%;
transform: translateX(-50%);
width: 120px;
height: 3px;
background: linear-gradient(90deg, var(--primary-color), var(--primary-light));
border-radius: 3px;
transition: width 0.3s ease;
}
h1:hover::after {
width: 180px;
}
h2 {
color: var(--secondary-color);
font-size: 1.9em;
margin: 2.5rem 0 1.5rem;
padding-left: 1.2rem;
border-left: 4px solid var(--primary-color);
letter-spacing: -0.02em;
display: flex;
align-items: center;
gap: 1rem;
transition: all 0.3s ease;
}
h2:hover {
color: var(--primary-color);
transform: translateX(5px);
}
h3 {
color: var(--text-color);
font-size: 1.5em;
margin: 2rem 0 1rem;
padding-bottom: 0.8rem;
border-bottom: 2px solid var(--border-color);
transition: all 0.3s ease;
display: flex;
align-items: center;
gap: 0.8rem;
}
h3:hover {
color: var(--primary-color);
border-bottom-color: var(--primary-color);
}
.summary {
background: var(--primary-light);
padding: 2.5rem;
border-radius: 16px;
margin: 2.5rem 0;
box-shadow: 0 4px 6px -1px rgba(37, 99, 235, 0.1);
position: relative;
overflow: hidden;
transition: transform 0.3s ease, box-shadow 0.3s ease;
animation: slideIn 0.5s ease-out;
}
.summary:hover {
transform: translateY(-3px);
box-shadow: 0 8px 12px -2px rgba(37, 99, 235, 0.15);
}
.summary::before {
content: '';
position: absolute;
top: 0;
left: 0;
width: 4px;
height: 100%;
background: linear-gradient(to bottom, var(--primary-color), rgba(37, 99, 235, 0.6));
}
.summary p {
margin: 1.2rem 0;
line-height: 1.9;
color: var(--text-color);
transition: color 0.3s ease;
}
.summary:hover p {
color: var(--secondary-color);
}
.details {
margin-top: 3.5rem;
padding-top: 2.5rem;
border-top: 2px dashed var(--border-color);
animation: fadeIn 0.8s ease-out;
}
.failed-files {
background: var(--error-light);
padding: 2rem;
border-radius: 16px;
margin: 3rem 0;
border-left: 4px solid var(--error-color);
position: relative;
transition: all 0.3s ease;
animation: slideIn 0.5s ease-out;
}
.failed-files:hover {
transform: translateX(5px);
box-shadow: 0 8px 15px -3px rgba(239, 68, 68, 0.1);
}
.failed-files h2 {
color: var(--error-color);
border-left: none;
padding-left: 0;
}
.failed-files ul {
margin: 1.8rem 0;
padding-left: 1.2rem;
list-style-type: none;
}
.failed-files li {
margin: 1.2rem 0;
padding: 1.2rem 1.8rem;
background: rgba(239, 68, 68, 0.08);
border-radius: 12px;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
}
.failed-files li:hover {
transform: translateX(8px);
background: rgba(239, 68, 68, 0.12);
}
.directory-section {
margin: 3.5rem 0;
padding: 2rem;
background: var(--background-color);
border-radius: 16px;
position: relative;
transition: all 0.3s ease;
animation: fadeIn 0.6s ease-out;
}
.directory-section:hover {
background: white;
box-shadow: var(--card-shadow);
}
.file-summary {
background: white;
padding: 2rem;
margin: 1.8rem 0;
border-radius: 16px;
box-shadow: var(--card-shadow);
border-left: 4px solid var(--border-color);
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
position: relative;
overflow: hidden;
}
.file-summary:hover {
border-left-color: var(--primary-color);
transform: translateX(8px) translateY(-2px);
box-shadow: var(--hover-shadow);
}
.icon {
display: inline-flex;
align-items: center;
justify-content: center;
width: 32px;
height: 32px;
border-radius: 8px;
background: var(--primary-light);
color: var(--primary-color);
font-size: 1.2em;
transition: all 0.3s ease;
}
.file-summary:hover .icon,
.directory-section:hover .icon {
transform: scale(1.1);
background: var(--primary-color);
color: white;
}
/* Smooth scrolling */
html {
scroll-behavior: smooth;
}
/* Selection style */
::selection {
background: var(--primary-light);
color: var(--primary-color);
}
/* Print styles */
@media print {
body {
background: white;
}
.container {
box-shadow: none;
padding: 0;
}
.file-summary, .failed-files {
break-inside: avoid;
box-shadow: none;
}
.icon {
display: none;
}
}
/* Responsive design */
@media (max-width: 768px) {
body {
padding: 1rem;
font-size: 15px;
}
.container {
padding: 1.5rem;
}
h1 {
font-size: 2.2em;
margin: 1.5rem 0 2rem;
}
h2 {
font-size: 1.7em;
}
h3 {
font-size: 1.4em;
}
.summary, .failed-files, .directory-section {
padding: 1.5rem;
}
.file-summary {
padding: 1.2rem;
}
.icon {
width: 28px;
height: 28px;
}
}
/* Dark mode support */
@media (prefers-color-scheme: dark) {
:root {
--primary-light: rgba(37, 99, 235, 0.15);
--background-color: #0f172a;
--text-color: #e2e8f0;
--text-light: #94a3b8;
--border-color: #1e293b;
--error-light: rgba(239, 68, 68, 0.15);
}
.container, .file-summary {
background: #1e293b;
}
.directory-section {
background: #0f172a;
}
.directory-section:hover {
background: #1e293b;
}
}
"""

    def _render_markdown(self, text: str) -> str:
        """Convert markdown text to HTML with the shared converter.

        The converter is reset first so state left over from a previous
        conversion does not carry into this one.
        """
        self.md.reset()
        return self.md.convert(text)

    def format_failed_files(self) -> str:
        """Return the failed-file section as HTML, or an empty string if nothing failed.

        File names and failure reasons are HTML-escaped because they are
        arbitrary text (for example exception messages containing ``<``).
        """
        from html import escape

        if not self.failed_files:
            return ""
        failed_files_html = ['<div class="failed-files">']
        failed_files_html.append('<h2><span class="icon">⚠️</span> 处理失败的文件</h2>')
        failed_files_html.append("<ul>")
        for fp, reason in self.failed_files:
            name = escape(os.path.basename(fp))
            detail = escape(str(reason))
            failed_files_html.append(
                f'<li><strong>📄 {name}</strong><br><span style="color: var(--text-light)">{detail}</span></li>'
            )
        failed_files_html.append("</ul></div>")
        return "\n".join(failed_files_html)

    def format_file_summaries(self) -> str:
        """Return all per-file summaries as HTML, grouped by directory.

        Files without a directory are emitted first, without a directory
        heading. Every directory then gets exactly one heading block followed
        by its files. Directory and file names are HTML-escaped; the summaries
        themselves are rendered from markdown.
        """
        from html import escape

        groups = {}
        for path in sorted(self.file_summaries_map):
            groups.setdefault(os.path.dirname(path), []).append(path)

        formatted_html = []
        # The empty directory name sorts first, so root-level files lead.
        for dir_path in sorted(groups):
            if dir_path:
                formatted_html.append('<div class="directory-section">')
                formatted_html.append(f'<h2><span class="icon">📁</span> {escape(dir_path)}</h2>')
                formatted_html.append('</div>')
            for path in groups[dir_path]:
                file_name = escape(os.path.basename(path))
                formatted_html.append('<div class="file-summary">')
                formatted_html.append(f'<h3><span class="icon">📄</span> {file_name}</h3>')
                formatted_html.append(self._render_markdown(self.file_summaries_map[path]))
                formatted_html.append('</div>')
        return "\n".join(formatted_html)

    def create_document(self) -> str:
        """Return the complete HTML page as a string."""
        # The markdown output is already block-level HTML, so it is inserted
        # directly instead of being wrapped in another <p> element.
        summary_html = self._render_markdown(self.final_summary)
        failed_html = self.format_failed_files()
        details_html = self.format_file_summaries()
        return f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>文档总结报告</title>
<link href="https://cdnjs.cloudflare.com/ajax/libs/inter/3.19.3/inter.css" rel="stylesheet">
<link href="https://fonts.googleapis.com/css2?family=Plus+Jakarta+Sans:wght@400;600&display=swap" rel="stylesheet">
<style>{self.css_styles}</style>
</head>
<body>
<div class="container">
<h1><span class="icon">📑</span> 文档总结报告</h1>
<div class="summary">
<h2><span class="icon">📋</span> 总体摘要</h2>
{summary_html}
</div>
{failed_html}
<div class="details">
<h2><span class="icon">📚</span> 各文件详细总结</h2>
{details_html}
</div>
</div>
</body>
</html>
"""

View File

View File

@@ -0,0 +1,812 @@
import os
import time
from abc import ABC, abstractmethod
from datetime import datetime
from docx import Document
from docx.enum.style import WD_STYLE_TYPE
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING
from docx.oxml.ns import qn
from docx.shared import Inches, Cm
from docx.shared import Pt, RGBColor, Inches
from typing import Dict, List, Tuple
import markdown
from crazy_functions.doc_fns.conversation_doc.word_doc import convert_markdown_to_word
class DocumentFormatter(ABC):
    """Abstract base class defining the interface shared by all report formatters.

    The constructor stores its three arguments unchanged:

    * ``final_summary`` - the overall summary text.
    * ``file_summaries_map`` - mapping whose keys are file paths (subclasses
      split them with ``os.path``) and whose values are the per-file summaries.
    * ``failed_files`` - sequence of ``(file_path, reason)`` pairs.
    """

    def __init__(self, final_summary: str, file_summaries_map: Dict, failed_files: List[Tuple]):
        self.final_summary = final_summary
        self.file_summaries_map = file_summaries_map
        self.failed_files = failed_files

    @abstractmethod
    def format_failed_files(self) -> str:
        """Format the list of files that failed processing."""

    @abstractmethod
    def format_file_summaries(self) -> str:
        """Format the per-file summaries."""

    @abstractmethod
    def create_document(self) -> object:
        """Build and return the complete document.

        The return type is format-specific, so it is annotated as ``object``:
        text-based formatters return a string, while a formatter may also
        return a document object.
        """
class WordFormatter(DocumentFormatter):
    """Word (.docx) report generator.

    Page geometry and fonts are modelled on the Chinese official-document
    layout standard GB/T 9704-2012, with some adjustments.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.doc = Document()
        self._setup_document()
        self._create_styles()
        # Running counters for the three numbered heading levels.
        self.numbers = {
            1: 0,  # level-1 heading counter
            2: 0,  # level-2 heading counter
            3: 0,  # level-3 heading counter
        }

    def _setup_document(self):
        """Apply page size, margins and a right-aligned header to every section."""
        for section in self.doc.sections:
            # A4 page size.
            section.page_width = Cm(21)
            section.page_height = Cm(29.7)
            # Page margins.
            section.top_margin = Cm(3.7)     # 37 mm
            section.bottom_margin = Cm(3.5)  # 35 mm
            section.left_margin = Cm(2.8)    # 28 mm
            section.right_margin = Cm(2.6)   # 26 mm
            # Header and footer distances.
            section.header_distance = Cm(2.0)
            section.footer_distance = Cm(2.0)
            # Header text.
            header_para = section.header.paragraphs[0]
            header_para.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            header_run = header_para.add_run("该文档由GPT-academic生成")
            header_run.font.name = '仿宋'
            # Also set the East Asian font attribute so CJK text uses the same typeface.
            header_run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
            header_run.font.size = Pt(9)

    def _create_styles(self):
        """Register the custom body style and the title/heading styles."""
        # Body text style.
        style = self.doc.styles.add_style('Normal_Custom', WD_STYLE_TYPE.PARAGRAPH)
        style.font.name = '仿宋'
        style._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
        style.font.size = Pt(14)
        style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.ONE_POINT_FIVE
        style.paragraph_format.space_after = Pt(0)
        style.paragraph_format.first_line_indent = Pt(28)
        # Title and heading styles.
        self._create_heading_style('Title_Custom', '方正小标宋简体', 32, WD_PARAGRAPH_ALIGNMENT.CENTER)
        self._create_heading_style('Heading1_Custom', '黑体', 22, WD_PARAGRAPH_ALIGNMENT.LEFT)
        self._create_heading_style('Heading2_Custom', '黑体', 18, WD_PARAGRAPH_ALIGNMENT.LEFT)
        self._create_heading_style('Heading3_Custom', '黑体', 16, WD_PARAGRAPH_ALIGNMENT.LEFT)

    def _create_heading_style(self, style_name: str, font_name: str, font_size: int, alignment):
        """Create a bold paragraph style for a heading and return it.

        Args:
            style_name: Name under which the style is registered.
            font_name: Typeface, applied to both the Latin and East Asian slots.
            font_size: Font size in points.
            alignment: Paragraph alignment constant.
        """
        style = self.doc.styles.add_style(style_name, WD_STYLE_TYPE.PARAGRAPH)
        style.font.name = font_name
        style._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)
        style.font.size = Pt(font_size)
        style.font.bold = True
        style.paragraph_format.alignment = alignment
        style.paragraph_format.space_before = Pt(12)
        style.paragraph_format.space_after = Pt(12)
        style.paragraph_format.line_spacing_rule = WD_LINE_SPACING.ONE_POINT_FIVE
        return style

    def _get_heading_number(self, level: int) -> str:
        """Advance the counter for ``level`` and return the formatted number.

        Args:
            level: Heading level (0-3). Level 0 is the unnumbered main title.

        Returns:
            ``"N. "``, ``"N.M "`` or ``"N.M.K "`` for levels 1-3, and an empty
            string for level 0.
        """
        if level == 0:  # the main title carries no number
            return ""
        self.numbers[level] += 1
        # Starting a new heading restarts the numbering of all deeper levels.
        for deeper in range(level + 1, 4):
            self.numbers[deeper] = 0
        if level == 1:
            return f"{self.numbers[1]}. "
        if level == 2:
            return f"{self.numbers[1]}.{self.numbers[2]} "
        if level == 3:
            return f"{self.numbers[1]}.{self.numbers[2]}.{self.numbers[3]} "
        return ""

    def _add_heading(self, text: str, level: int):
        """Append a numbered heading paragraph and return it.

        For the main title (level 0) a centred date line is appended as well.

        Args:
            text: Heading text.
            level: Heading level (0-3).
        """
        style_map = {
            0: 'Title_Custom',
            1: 'Heading1_Custom',
            2: 'Heading2_Custom',
            3: 'Heading3_Custom',
        }
        # Point sizes per level; any other level falls back to 16.
        font_sizes = {0: 32, 1: 22, 2: 18}
        font_size = font_sizes.get(level, 16)

        number = self._get_heading_number(level)
        paragraph = self.doc.add_paragraph(style=style_map[level])
        if number:
            self._get_run_style(paragraph.add_run(number), '黑体', font_size, True)
        # NOTE(review): the run-level font overrides the typeface configured on
        # 'Title_Custom' for the main title - confirm this is intended.
        self._get_run_style(paragraph.add_run(text), '黑体', font_size, True)

        if level == 0:
            date_paragraph = self.doc.add_paragraph()
            date_paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
            # The format previously lacked the trailing day character, yielding e.g. "2025年06月04".
            date_run = date_paragraph.add_run(datetime.now().strftime('%Y年%m月%d日'))
            self._get_run_style(date_run, '仿宋', 16, False)
        return paragraph

    def _get_run_style(self, run, font_name: str, font_size: int, bold: bool = False):
        """Apply typeface, size and weight to a run (despite the name, this is a setter)."""
        run.font.name = font_name
        run._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)
        run.font.size = Pt(font_size)
        run.font.bold = bold

    def format_failed_files(self) -> str:
        """Append the failed-file section to the document.

        Returns:
            A plain-text rendering of the same list, or an empty string when
            there are no failed files (nothing is added to the document then).
        """
        if not self.failed_files:
            return ""
        result = ["处理失败文件:"]
        self._add_heading("处理失败文件", 1)
        for fp, reason in self.failed_files:
            entry = f"{os.path.basename(fp)}: {reason}"
            result.append(entry)
            self._add_content(entry, indent=False)
        self.doc.add_paragraph()
        return "\n".join(result)

    def _add_content(self, text: str, indent: bool = True):
        """Append a body paragraph and return it.

        ``text`` is passed through ``convert_markdown_to_word`` here, so callers
        must hand in the raw text rather than pre-converted text.

        Args:
            text: Raw (markdown) text of the paragraph.
            indent: When false, the first-line indent of the body style is removed.
        """
        processed_text = convert_markdown_to_word(text)
        paragraph = self.doc.add_paragraph(processed_text, style='Normal_Custom')
        if not indent:
            paragraph.paragraph_format.first_line_indent = Pt(0)
        return paragraph

    def format_file_summaries(self) -> str:
        """Append all per-file summaries to the document, grouped by directory.

        Files without a directory come first as level-2 headings. Each
        directory then becomes a level-2 heading with its files as level-3
        headings.

        Returns:
            A plain-text rendering of the same content.
        """
        result = []
        # Group the paths by their directory.
        file_groups = {}
        for path in sorted(self.file_summaries_map.keys()):
            file_groups.setdefault(os.path.dirname(path), []).append(path)

        # Files without a directory.
        for path in sorted(file_groups.get("", [])):
            file_name = os.path.basename(path)
            summary = self.file_summaries_map[path]
            result.append(f"\n📄 {file_name}")
            result.append(summary)
            self._add_heading(f"📄 {file_name}", 2)
            # _add_content converts the markdown itself; converting here as well
            # would process the text twice.
            self._add_content(summary)
            self.doc.add_paragraph()

        # Files inside directories.
        for dir_path in sorted(file_groups.keys()):
            if dir_path == "":  # already handled above
                continue
            result.append(f"\n📁 {dir_path}")
            self._add_heading(f"📁 {dir_path}", 2)
            for path in sorted(file_groups[dir_path]):
                file_name = os.path.basename(path)
                summary = self.file_summaries_map[path]
                result.append(f"\n📄 {file_name}")
                result.append(summary)
                self._add_heading(f"📄 {file_name}", 3)
                self._add_content(summary)
                self.doc.add_paragraph()
        return "\n".join(result)

    def create_document(self):
        """Populate the Word document and return the document object.

        NOTE(review): content is appended to the document created in
        ``__init__``, so calling this more than once on one instance adds the
        report again.
        """
        # Restart heading numbering.
        for level in self.numbers:
            self.numbers[level] = 0
        # Main title.
        self._add_heading("文档总结报告", 0)
        self.doc.add_paragraph()
        # Overall summary; _add_content performs the markdown conversion.
        self._add_heading("总体摘要", 1)
        self._add_content(self.final_summary)
        self.doc.add_paragraph()
        # Failed files, if any.
        if self.failed_files:
            self.format_failed_files()
        # Per-file details.
        self._add_heading("各文件详细总结", 1)
        self.format_file_summaries()
        return self.doc

    def save_as_pdf(self, word_path, pdf_path=None):
        """Convert a saved Word document to PDF.

        Args:
            word_path: Path of the Word document.
            pdf_path: Optional output path, forwarded to the converter unchanged.

        Returns:
            The path returned by the converter, or ``None`` if conversion raised.
        """
        from crazy_functions.doc_fns.conversation_doc.word2pdf import WordToPdfConverter
        try:
            pdf_path = WordToPdfConverter.convert_to_pdf(word_path, pdf_path)
            return pdf_path
        except Exception as e:
            # Best effort: report the failure and let the caller continue without a PDF.
            print(f"PDF转换失败: {str(e)}")
            return None
class MarkdownFormatter(DocumentFormatter):
    """Markdown report generator."""

    def format_failed_files(self) -> str:
        """Return the failed-file section, or an empty string if nothing failed."""
        if not self.failed_files:
            return ""
        formatted_text = ["\n## ⚠️ 处理失败的文件"]
        formatted_text.extend(
            f"- {os.path.basename(fp)}: {reason}" for fp, reason in self.failed_files
        )
        formatted_text.append("\n---")
        return "\n".join(formatted_text)

    def format_file_summaries(self) -> str:
        """Return all per-file summaries grouped by directory.

        Files without a directory are listed first, without a directory
        heading. Every directory then gets exactly one ``##`` heading followed
        by its files. Grouping (rather than walking the sorted paths) prevents
        a root-level file from appearing under an unrelated directory heading
        and prevents a directory heading from being repeated.
        """
        groups = {}
        for path in sorted(self.file_summaries_map):
            groups.setdefault(os.path.dirname(path), []).append(path)

        formatted_text = []
        # The empty directory name sorts first, so root-level files lead.
        for dir_path in sorted(groups):
            if dir_path:
                formatted_text.append(f"\n## 📁 {dir_path}")
            for path in groups[dir_path]:
                formatted_text.append(f"\n### 📄 {os.path.basename(path)}")
                formatted_text.append(self.file_summaries_map[path])
                formatted_text.append("\n---")
        return "\n".join(formatted_text)

    def create_document(self) -> str:
        """Return the complete Markdown report."""
        document = [
            "# 📑 文档总结报告",
            "\n## 总体摘要",
            self.final_summary,
        ]
        if self.failed_files:
            document.append(self.format_failed_files())
        document.extend([
            "\n# 📚 各文件详细总结",
            self.format_file_summaries(),
        ])
        return "\n".join(document)
class HtmlFormatter(DocumentFormatter):
"""HTML格式文档生成器 - 优化版"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.md = markdown.Markdown(extensions=['extra','codehilite', 'tables','nl2br'])
self.css_styles = """
@keyframes fadeIn {
from { opacity: 0; transform: translateY(20px); }
to { opacity: 1; transform: translateY(0); }
}
@keyframes slideIn {
from { transform: translateX(-20px); opacity: 0; }
to { transform: translateX(0); opacity: 1; }
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.05); }
100% { transform: scale(1); }
}
:root {
/* Enhanced color palette */
--primary-color: #2563eb;
--primary-light: #eff6ff;
--secondary-color: #1e293b;
--background-color: #f8fafc;
--text-color: #334155;
--text-light: #64748b;
--border-color: #e2e8f0;
--error-color: #ef4444;
--error-light: #fef2f2;
--success-color: #22c55e;
--warning-color: #f59e0b;
--card-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
--hover-shadow: 0 20px 25px -5px rgb(0 0 0 / 0.1), 0 8px 10px -6px rgb(0 0 0 / 0.1);
/* Typography */
--heading-font: "Plus Jakarta Sans", system-ui, sans-serif;
--body-font: "Inter", system-ui, sans-serif;
}
body {
font-family: var(--body-font);
line-height: 1.8;
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
color: var(--text-color);
background-color: var(--background-color);
font-size: 16px;
-webkit-font-smoothing: antialiased;
}
.container {
background: white;
padding: 3rem;
border-radius: 24px;
box-shadow: var(--card-shadow);
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
animation: fadeIn 0.6s ease-out;
border: 1px solid var(--border-color);
}
.container:hover {
box-shadow: var(--hover-shadow);
transform: translateY(-2px);
}
h1, h2, h3 {
font-family: var(--heading-font);
font-weight: 600;
}
h1 {
color: var(--primary-color);
font-size: 2.8em;
text-align: center;
margin: 2rem 0 3rem;
padding-bottom: 1.5rem;
border-bottom: 3px solid var(--primary-color);
letter-spacing: -0.03em;
position: relative;
display: flex;
align-items: center;
justify-content: center;
gap: 1rem;
}
h1::after {
content: '';
position: absolute;
bottom: -3px;
left: 50%;
transform: translateX(-50%);
width: 120px;
height: 3px;
background: linear-gradient(90deg, var(--primary-color), var(--primary-light));
border-radius: 3px;
transition: width 0.3s ease;
}
h1:hover::after {
width: 180px;
}
h2 {
color: var(--secondary-color);
font-size: 1.9em;
margin: 2.5rem 0 1.5rem;
padding-left: 1.2rem;
border-left: 4px solid var(--primary-color);
letter-spacing: -0.02em;
display: flex;
align-items: center;
gap: 1rem;
transition: all 0.3s ease;
}
h2:hover {
color: var(--primary-color);
transform: translateX(5px);
}
h3 {
color: var(--text-color);
font-size: 1.5em;
margin: 2rem 0 1rem;
padding-bottom: 0.8rem;
border-bottom: 2px solid var(--border-color);
transition: all 0.3s ease;
display: flex;
align-items: center;
gap: 0.8rem;
}
h3:hover {
color: var(--primary-color);
border-bottom-color: var(--primary-color);
}
.summary {
background: var(--primary-light);
padding: 2.5rem;
border-radius: 16px;
margin: 2.5rem 0;
box-shadow: 0 4px 6px -1px rgba(37, 99, 235, 0.1);
position: relative;
overflow: hidden;
transition: transform 0.3s ease, box-shadow 0.3s ease;
animation: slideIn 0.5s ease-out;
}
.summary:hover {
transform: translateY(-3px);
box-shadow: 0 8px 12px -2px rgba(37, 99, 235, 0.15);
}
.summary::before {
content: '';
position: absolute;
top: 0;
left: 0;
width: 4px;
height: 100%;
background: linear-gradient(to bottom, var(--primary-color), rgba(37, 99, 235, 0.6));
}
.summary p {
margin: 1.2rem 0;
line-height: 1.9;
color: var(--text-color);
transition: color 0.3s ease;
}
.summary:hover p {
color: var(--secondary-color);
}
.details {
margin-top: 3.5rem;
padding-top: 2.5rem;
border-top: 2px dashed var(--border-color);
animation: fadeIn 0.8s ease-out;
}
.failed-files {
background: var(--error-light);
padding: 2rem;
border-radius: 16px;
margin: 3rem 0;
border-left: 4px solid var(--error-color);
position: relative;
transition: all 0.3s ease;
animation: slideIn 0.5s ease-out;
}
.failed-files:hover {
transform: translateX(5px);
box-shadow: 0 8px 15px -3px rgba(239, 68, 68, 0.1);
}
.failed-files h2 {
color: var(--error-color);
border-left: none;
padding-left: 0;
}
.failed-files ul {
margin: 1.8rem 0;
padding-left: 1.2rem;
list-style-type: none;
}
.failed-files li {
margin: 1.2rem 0;
padding: 1.2rem 1.8rem;
background: rgba(239, 68, 68, 0.08);
border-radius: 12px;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
}
.failed-files li:hover {
transform: translateX(8px);
background: rgba(239, 68, 68, 0.12);
}
.directory-section {
margin: 3.5rem 0;
padding: 2rem;
background: var(--background-color);
border-radius: 16px;
position: relative;
transition: all 0.3s ease;
animation: fadeIn 0.6s ease-out;
}
.directory-section:hover {
background: white;
box-shadow: var(--card-shadow);
}
.file-summary {
background: white;
padding: 2rem;
margin: 1.8rem 0;
border-radius: 16px;
box-shadow: var(--card-shadow);
border-left: 4px solid var(--border-color);
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
position: relative;
overflow: hidden;
}
.file-summary:hover {
border-left-color: var(--primary-color);
transform: translateX(8px) translateY(-2px);
box-shadow: var(--hover-shadow);
}
.file-summary {
background: white;
padding: 2rem;
margin: 1.8rem 0;
border-radius: 16px;
box-shadow: var(--card-shadow);
border-left: 4px solid var(--border-color);
transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1);
position: relative;
}
.file-summary:hover {
border-left-color: var(--primary-color);
transform: translateX(8px) translateY(-2px);
box-shadow: var(--hover-shadow);
}
.icon {
display: inline-flex;
align-items: center;
justify-content: center;
width: 32px;
height: 32px;
border-radius: 8px;
background: var(--primary-light);
color: var(--primary-color);
font-size: 1.2em;
transition: all 0.3s ease;
}
.file-summary:hover .icon,
.directory-section:hover .icon {
transform: scale(1.1);
background: var(--primary-color);
color: white;
}
/* Smooth scrolling */
html {
scroll-behavior: smooth;
}
/* Selection style */
::selection {
background: var(--primary-light);
color: var(--primary-color);
}
/* Print styles */
@media print {
body {
background: white;
}
.container {
box-shadow: none;
padding: 0;
}
.file-summary, .failed-files {
break-inside: avoid;
box-shadow: none;
}
.icon {
display: none;
}
}
/* Responsive design */
@media (max-width: 768px) {
body {
padding: 1rem;
font-size: 15px;
}
.container {
padding: 1.5rem;
}
h1 {
font-size: 2.2em;
margin: 1.5rem 0 2rem;
}
h2 {
font-size: 1.7em;
}
h3 {
font-size: 1.4em;
}
.summary, .failed-files, .directory-section {
padding: 1.5rem;
}
.file-summary {
padding: 1.2rem;
}
.icon {
width: 28px;
height: 28px;
}
}
/* Dark mode support */
@media (prefers-color-scheme: dark) {
:root {
--primary-light: rgba(37, 99, 235, 0.15);
--background-color: #0f172a;
--text-color: #e2e8f0;
--text-light: #94a3b8;
--border-color: #1e293b;
--error-light: rgba(239, 68, 68, 0.15);
}
.container, .file-summary {
background: #1e293b;
}
.directory-section {
background: #0f172a;
}
.directory-section:hover {
background: #1e293b;
}
}
"""
def format_failed_files(self) -> str:
    """Render the files that could not be processed as an HTML fragment.

    Every entry of ``self.failed_files`` is unpacked as ``(path, reason)``.
    Only the base name of the path is displayed. Both the name and the
    reason are HTML-escaped before interpolation, because they are arbitrary
    text (file names, exception messages) that may contain ``<``, ``>`` or
    ``&`` and would otherwise corrupt the generated page.

    Returns:
        str: A ``<div class="failed-files">`` block, or an empty string when
        ``self.failed_files`` is empty.
    """
    if not self.failed_files:
        return ""

    # Local import keeps this method self-contained.
    from html import escape

    failed_files_html = [
        '<div class="failed-files">',
        '<h2><span class="icon">⚠️</span> 处理失败的文件</h2>',
        "<ul>",
    ]
    for fp, reason in self.failed_files:
        name = escape(os.path.basename(fp))
        detail = escape(str(reason))
        failed_files_html.append(
            f'<li><strong>📄 {name}</strong><br><span style="color: var(--text-light)">{detail}</span></li>'
        )
    failed_files_html.append("</ul></div>")
    return "\n".join(failed_files_html)
def format_file_summaries(self) -> str:
    """Render every per-file summary as HTML, grouped by directory.

    Paths from ``self.file_summaries_map`` are visited in sorted order. A
    directory heading is emitted whenever the directory part of the path
    changes to a non-empty value. Each summary body is converted with
    ``self.md.convert``. Directory and file names are HTML-escaped because
    they are plain text that may legitimately contain ``&`` or ``<``.

    Returns:
        str: The concatenated HTML fragment (empty when there are no files).
    """
    from html import escape

    formatted_html = []
    current_dir = ""
    for path in sorted(self.file_summaries_map.keys()):
        dir_path = os.path.dirname(path)
        if dir_path != current_dir:
            if dir_path:
                # The section only wraps the heading; the file cards follow it.
                formatted_html.append('<div class="directory-section">')
                formatted_html.append(f'<h2><span class="icon">📁</span> {escape(dir_path)}</h2>')
                formatted_html.append('</div>')
            current_dir = dir_path

        file_name = escape(os.path.basename(path))
        formatted_html.append('<div class="file-summary">')
        formatted_html.append(f'<h3><span class="icon">📄</span> {file_name}</h3>')
        formatted_html.append(self.md.convert(self.file_summaries_map[path]))
        formatted_html.append('</div>')
    return "\n".join(formatted_html)
def create_document(self) -> str:
    """Assemble the complete HTML report.

    The page consists of the stylesheet from ``self.css_styles``, the overall
    summary (``self.final_summary`` converted with ``self.md.convert``), the
    failed-files section and the per-file summaries.

    Returns:
        str: The full HTML document as one string.
    """
    # Evaluate the pieces in the same order in which the template uses them.
    styles = self.css_styles
    overall = self.md.convert(self.final_summary)
    failed_section = self.format_failed_files()
    details_section = self.format_file_summaries()
    return f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>文档总结报告</title>
<link href="https://cdnjs.cloudflare.com/ajax/libs/inter/3.19.3/inter.css" rel="stylesheet">
<link href="https://fonts.googleapis.com/css2?family=Plus+Jakarta+Sans:wght@400;600&display=swap" rel="stylesheet">
<style>{styles}</style>
</head>
<body>
<div class="container">
<h1><span class="icon">📑</span> 文档总结报告</h1>
<div class="summary">
<h2><span class="icon">📋</span> 总体摘要</h2>
<p>{overall}</p>
</div>
{failed_section}
<div class="details">
<h2><span class="icon">📚</span> 各文件详细总结</h2>
{details_section}
</div>
</div>
</body>
</html>
"""

View File

@@ -0,0 +1,237 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, TypeVar, Generic, Union
from dataclasses import dataclass
from enum import Enum, auto
import logging
from datetime import datetime
# 设置日志
logger = logging.getLogger(__name__)
# 自定义异常类定义
class FoldingError(Exception):
    """Root of the exception hierarchy used by the content-folding code."""
class FormattingError(FoldingError):
    """Raised when formatting content fails."""
class MetadataError(FoldingError):
    """Raised for problems with metadata."""
class ValidationError(FoldingError):
    """Raised when validation fails."""
class FoldingStyle(Enum):
    """Available folding styles."""

    # Plain fold.
    SIMPLE = auto()
    # Fold that carries additional information.
    DETAILED = auto()
    # Nested fold.
    NESTED = auto()
@dataclass
class FoldingOptions:
    """Configuration for how a folded block is rendered."""

    # Folding style to apply.
    style: FoldingStyle = FoldingStyle.DETAILED
    # Language tag for the fenced code block; None means no fence is added.
    code_language: Optional[str] = None
    # Whether the last-modified timestamp is shown in the summary.
    show_timestamp: bool = False
    # Indentation level applied to the rendered result.
    indent_level: int = 0
    # Optional CSS class name placed on the <details> element.
    custom_css: Optional[str] = None
T = TypeVar('T') # 用于泛型类型
class BaseMetadata(ABC):
    """Abstract base class for metadata objects."""

    @abstractmethod
    def validate(self) -> bool:
        """Return whether this metadata is valid."""

    def _validate_non_empty_str(self, value: Optional[str]) -> bool:
        """Return True when *value* contains at least one non-whitespace character."""
        if not value:
            return False
        return bool(value.strip())
@dataclass
class FileMetadata(BaseMetadata):
    """Metadata describing a single file."""

    rel_path: str
    # Rendered with an "MB" suffix by FileContentFormatter.
    size: float
    last_modified: Optional[datetime] = None
    mime_type: Optional[str] = None
    encoding: str = 'utf-8'

    def validate(self) -> bool:
        """Return True when ``rel_path`` is non-blank and ``size`` is not negative.

        Any exception raised while checking is logged and reported as an
        invalid result rather than propagated.
        """
        try:
            path_ok = self._validate_non_empty_str(self.rel_path)
            # "not size < 0" (rather than "size >= 0") keeps the original
            # outcome for values that compare False both ways.
            return path_ok and not self.size < 0
        except Exception as e:
            logger.error(f"File metadata validation error: {str(e)}")
            return False
class ContentFormatter(ABC, Generic[T]):
    """Abstract base class for content formatters.

    The class is generic so that subclasses can name the concrete metadata
    type they accept.
    """

    @abstractmethod
    def format(self,
               content: str,
               metadata: T,
               options: Optional[FoldingOptions] = None) -> str:
        """Format *content*.

        Args:
            content: The content to format.
            metadata: Typed metadata describing the content.
            options: Folding options.

        Returns:
            str: The formatted content.

        Raises:
            FormattingError: If formatting fails.
        """

    def _create_summary(self, metadata: T) -> str:
        """Build the fold summary; subclasses may override this."""
        return str(metadata)

    def _format_content_block(self,
                              content: str,
                              options: Optional[FoldingOptions]) -> str:
        """Wrap *content* in a fenced code block when a language is configured."""
        if options and options.code_language:
            return f"```{options.code_language}\n{content}\n```"
        return content

    def _add_indent(self, text: str, level: int) -> str:
        """Prefix every line of *text* with the indent for *level*.

        A non-positive level returns the text untouched. Otherwise the text
        is split with ``splitlines`` and re-joined with ``"\\n"``, so a
        trailing newline is not preserved.
        """
        if level <= 0:
            return text
        prefix = " " * level
        return "\n".join(f"{prefix}{line}" for line in text.splitlines())
class FileContentFormatter(ContentFormatter[FileMetadata]):
    """Formatter that renders file content inside an HTML ``<details>`` fold."""

    def format(self,
               content: str,
               metadata: FileMetadata,
               options: Optional[FoldingOptions] = None) -> str:
        """Format file content as a collapsible block.

        Raises:
            MetadataError: If ``metadata.validate()`` returns a falsy value.
            FormattingError: If anything fails while building the output.
        """
        if not metadata.validate():
            raise MetadataError("Invalid file metadata")

        try:
            opts = options or FoldingOptions()

            # Summary line: path and size always, type and timestamp if available.
            pieces = [f"{metadata.rel_path} ({metadata.size:.2f}MB)"]
            if metadata.mime_type:
                pieces.append(f"Type: {metadata.mime_type}")
            if metadata.last_modified and opts.show_timestamp:
                pieces.append(
                    f"Modified: {metadata.last_modified.strftime('%Y-%m-%d %H:%M:%S')}"
                )
            summary = " | ".join(pieces)

            # Optional class attribute on the <details> element.
            css_class = ''
            if opts.custom_css:
                css_class = f' class="{opts.custom_css}"'

            body = self._format_content_block(content, opts)

            folded = (
                f'<details{css_class}><summary>{summary}</summary>\n\n'
                f'{body}\n\n'
                f'</details>\n\n'
            )
            return self._add_indent(folded, opts.indent_level)
        except Exception as e:
            logger.error(f"Error formatting file content: {str(e)}")
            raise FormattingError(f"Failed to format file content: {str(e)}")
class ContentFoldingManager:
    """Registry that dispatches content to named formatters."""

    def __init__(self):
        """Create the registry and install the default formatters."""
        self._formatters: Dict[str, ContentFormatter] = {}
        self._register_default_formatters()

    def _register_default_formatters(self) -> None:
        """Register the built-in ``'file'`` formatter."""
        self.register_formatter('file', FileContentFormatter())

    def register_formatter(self, name: str, formatter: ContentFormatter) -> None:
        """Register *formatter* under *name*, replacing any previous entry.

        Raises:
            TypeError: If *formatter* is not a ``ContentFormatter`` instance.
        """
        if isinstance(formatter, ContentFormatter):
            self._formatters[name] = formatter
            return
        raise TypeError("Formatter must implement ContentFormatter interface")

    def _guess_language(self, extension: str) -> Optional[str]:
        """Map a file extension to a code-fence language tag.

        The extension is lower-cased and leading dots are removed before the
        lookup. Unknown extensions, and ``txt``, yield None.
        """
        key = extension.lower().lstrip('.')
        known_languages = {
            'py': 'python',
            'js': 'javascript',
            'java': 'java',
            'cpp': 'cpp',
            'cs': 'csharp',
            'html': 'html',
            'css': 'css',
            'md': 'markdown',
            'json': 'json',
            'xml': 'xml',
            'sql': 'sql',
            'sh': 'bash',
            'yaml': 'yaml',
            'yml': 'yaml',
            'txt': None,  # plain text needs no language tag
        }
        return known_languages.get(key)

    def format_content(self,
                       content: str,
                       formatter_type: str,
                       metadata: Union[FileMetadata],
                       options: Optional[FoldingOptions] = None) -> str:
        """Format *content* with the formatter registered as *formatter_type*.

        Raises:
            KeyError: If no formatter is registered under *formatter_type*.
            TypeError: If *metadata* is not a ``FileMetadata`` instance.
        """
        chosen = self._formatters.get(formatter_type)
        if not chosen:
            raise KeyError(f"No formatter registered for type: {formatter_type}")
        if not isinstance(metadata, FileMetadata):
            raise TypeError("Invalid metadata type")
        return chosen.format(content, metadata, options)

View File

@@ -0,0 +1,211 @@
import re
import os
import pandas as pd
from datetime import datetime
from openpyxl import Workbook
class ExcelTableFormatter:
    """Convert Markdown tables found in chat answers into Excel worksheets."""

    def __init__(self):
        """Create the workbook that receives one sheet per table."""
        self.workbook = Workbook()
        self._table_count = 0
        self._current_sheet = None

    def _normalize_table_row(self, row):
        """Split one table line into stripped cells, ignoring outer pipes."""
        row = row.strip()
        if row.startswith('|'):
            row = row[1:]
        if row.endswith('|'):
            row = row[:-1]
        return [cell.strip() for cell in row.split('|')]

    def _is_separator_row(self, row):
        """Return True when *row* consists only of ``-``, ``:``, ``|`` and whitespace."""
        clean_row = re.sub(r'[\s|]', '', row)
        return bool(re.match(r'^[-:]+$', clean_row))

    def _extract_tables_from_text(self, text):
        """Collect runs of consecutive pipe-containing lines from *text*.

        A run ends at a blank line or a line without ``|``. Runs shorter than
        two lines are discarded. Non-string input yields an empty list.
        """
        if not isinstance(text, str):
            return []

        tables = []
        current_table = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if '|' in line:
                current_table.append(line)
                continue
            # Blank or pipe-free line: close the run collected so far.
            if len(current_table) >= 2:
                tables.append(current_table)
            current_table = []

        if len(current_table) >= 2:
            tables.append(current_table)
        return tables

    def _parse_table(self, table_lines):
        """Parse a run of table lines into ``{'headers': [...], 'data': [...]}``.

        The header is the line directly above the first separator row, so a
        pipe-containing prose line that precedes the table is not mistaken
        for the header. A run without a separator row, or whose first line is
        the separator, is not treated as a table.

        Returns:
            dict | None: The parsed table, or None when the run is not a
            table, has no data rows, or parsing fails.
        """
        try:
            separator_index = next(
                (i for i, line in enumerate(table_lines) if self._is_separator_row(line)),
                None
            )
            if separator_index is None or separator_index == 0:
                return None

            headers = self._normalize_table_row(table_lines[separator_index - 1])
            width = len(headers)

            data_rows = []
            for line in table_lines[separator_index + 1:]:
                cells = self._normalize_table_row(line)
                # Pad short rows and truncate long ones to the header width.
                cells = (cells + [''] * width)[:width]
                data_rows.append(cells)

            if headers and data_rows:
                return {
                    'headers': headers,
                    'data': data_rows
                }
        except Exception as e:
            print(f"解析表格时发生错误: {str(e)}")
        return None

    def _create_sheet(self, question_num, table_num):
        """Create a worksheet named ``Q<question>_T<table>``.

        Names longer than 31 characters fall back to ``Table<count>``; a name
        that already exists gets a time suffix.
        """
        sheet_name = f'Q{question_num}_T{table_num}'
        if len(sheet_name) > 31:
            sheet_name = f'Table{self._table_count}'
        if sheet_name in self.workbook.sheetnames:
            sheet_name = f'{sheet_name}_{datetime.now().strftime("%H%M%S")}'
        return self.workbook.create_sheet(title=sheet_name)

    def create_document(self, history):
        """Write every table found in the answers of *history* to the workbook.

        Args:
            history: Chat history list; answers are read from the odd indices.

        Returns:
            Workbook | None: The populated workbook, or None when no table
            was found.
        """
        has_tables = False

        # Drop the default sheet if it is still there. The guard keeps a
        # second call on the same instance from raising KeyError.
        if 'Sheet' in self.workbook.sheetnames:
            self.workbook.remove(self.workbook['Sheet'])

        for i in range(1, len(history), 2):
            answer = history[i]
            for table_lines in self._extract_tables_from_text(answer):
                parsed_table = self._parse_table(table_lines)
                if not parsed_table:
                    continue

                self._table_count += 1
                sheet = self._create_sheet(i // 2 + 1, self._table_count)

                # Header goes to row 1, data starts at row 2.
                for col, header in enumerate(parsed_table['headers'], 1):
                    sheet.cell(row=1, column=col, value=header)
                for row_idx, row_data in enumerate(parsed_table['data'], 2):
                    for col_idx, value in enumerate(row_data, 1):
                        sheet.cell(row=row_idx, column=col_idx, value=value)
                has_tables = True

        return self.workbook if has_tables else None
def save_chat_tables(history, save_dir, base_name):
    """Save the tables found in a chat history to an Excel file.

    Args:
        history: Chat history list.
        save_dir: Directory to write into; created when missing.
        base_name: File name without the ``.xlsx`` extension.

    Returns:
        list: Paths of the files that were written. Empty when the history
        contains no table or when saving failed; failures are printed, not
        raised.
    """
    saved = []
    try:
        workbook = ExcelTableFormatter().create_document(history)
        if workbook is None:
            return saved

        os.makedirs(save_dir, exist_ok=True)
        excel_file = os.path.join(save_dir, base_name + '.xlsx')
        workbook.save(excel_file)
        saved.append(excel_file)
        print(f"已保存表格到Excel文件: {excel_file}")
    except Exception as e:
        print(f"保存Excel格式失败: {str(e)}")
    return saved
# Usage example
if __name__ == "__main__":
    # Answer holding a single table.
    single_table_answer = """这是第一个表格:
| A | B | C |
|---|---|---|
| 1 | 2 | 3 |"""
    # Answer holding two tables separated by a line of prose.
    two_table_answer = """回答包含多个表格:
| Name | Age |
|------|-----|
| Tom | 20 |
第二个表格:
| X | Y |
|---|---|
| 1 | 2 |"""

    # Sample chat history: questions at even indices, answers at odd indices.
    history = [
        "问题1",
        single_table_answer,
        "问题2",
        "这是没有表格的回答",
        "问题3",
        two_table_answer,
    ]

    # Save the tables.
    save_dir = "output"
    base_name = "chat_tables"
    saved_files = save_chat_tables(history, save_dir, base_name)

View File

@@ -0,0 +1,190 @@
class HtmlFormatter:
    """Generate an HTML archive page from a chat record."""

    def __init__(self, chatbot, history):
        """Store the chat data and the stylesheet embedded in the page.

        Args:
            chatbot: Iterable of ``(question, answer)`` pairs.
            history: History entries used by ``format_history_content``.
        """
        self.chatbot = chatbot
        self.history = history
        self.css_styles = """
:root {
--primary-color: #2563eb;
--primary-light: #eff6ff;
--secondary-color: #1e293b;
--background-color: #f8fafc;
--text-color: #334155;
--border-color: #e2e8f0;
--card-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
}
body {
font-family: system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.8;
margin: 0;
padding: 2rem;
color: var(--text-color);
background-color: var(--background-color);
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 2rem;
border-radius: 16px;
box-shadow: var(--card-shadow);
}
::selection {
background: var(--primary-light);
color: var(--primary-color);
}
@keyframes fadeIn {
from { opacity: 0; transform: translateY(20px); }
to { opacity: 1; transform: translateY(0); }
}
@keyframes slideIn {
from { transform: translateX(-20px); opacity: 0; }
to { transform: translateX(0); opacity: 1; }
}
.container {
animation: fadeIn 0.6s ease-out;
}
.QaBox {
animation: slideIn 0.5s ease-out;
transition: all 0.3s ease;
}
.QaBox:hover {
transform: translateX(5px);
}
.Question, .Answer, .historyBox {
transition: all 0.3s ease;
}
.chat-title {
color: var(--primary-color);
font-size: 2em;
text-align: center;
margin: 1rem 0 2rem;
padding-bottom: 1rem;
border-bottom: 2px solid var(--primary-color);
}
.chat-body {
display: flex;
flex-direction: column;
gap: 1.5rem;
margin: 2rem 0;
}
.QaBox {
background: white;
padding: 1.5rem;
border-radius: 8px;
border-left: 4px solid var(--primary-color);
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
margin-bottom: 1.5rem;
}
.Question {
color: var(--secondary-color);
font-weight: 500;
margin-bottom: 1rem;
}
.Answer {
color: var(--text-color);
background: var(--primary-light);
padding: 1rem;
border-radius: 6px;
}
.history-section {
margin-top: 3rem;
padding-top: 2rem;
border-top: 2px solid var(--border-color);
}
.history-title {
color: var(--secondary-color);
font-size: 1.5em;
margin-bottom: 1.5rem;
text-align: center;
}
.historyBox {
background: white;
padding: 1rem;
margin: 0.5rem 0;
border-radius: 6px;
border: 1px solid var(--border-color);
}
@media (prefers-color-scheme: dark) {
:root {
--background-color: #0f172a;
--text-color: #e2e8f0;
--border-color: #1e293b;
}
.container, .QaBox {
background: #1e293b;
}
}
"""

    def format_chat_content(self) -> str:
        """Render every ``(question, answer)`` pair as a ``QaBox`` block.

        ``None`` on either side becomes an empty string; any other value is
        converted with ``str``.
        """
        boxes = []
        for q, a in self.chatbot:
            question = "" if q is None else str(q)
            answer = "" if a is None else str(a)
            box = f'''
<div class="QaBox">
<div class="Question">{question}</div>
<div class="Answer">{answer}</div>
</div>
'''
            boxes.append(box)
        return "\n".join(boxes)

    def format_history_content(self) -> str:
        """Render the history entries; returns "" when there is no history."""
        if not self.history:
            return ""
        return "\n".join(
            f'''
<div class="historyBox">
<div class="entry">{entry}</div>
</div>
'''
            for entry in self.history
        )

    def create_document(self) -> str:
        """Build the complete HTML page.

        Returns:
            str: The full HTML document as one string.
        """
        styles = self.css_styles
        chat_html = self.format_chat_content()
        return f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>对话存档</title>
<style>{styles}</style>
</head>
<body>
<div class="container">
<h1 class="chat-title">对话存档</h1>
<div class="chat-body">
{chat_html}
</div>
</div>
</body>
</html>
"""

View File

@@ -0,0 +1,39 @@
class MarkdownFormatter:
    """Markdown document generator for conversation records."""

    def __init__(self):
        self.content = []

    def _add_content(self, text: str):
        """Append *text* as a body paragraph; empty text is skipped."""
        if text:
            self.content.append(f"\n{text}\n")

    def create_document(self, history: list) -> str:
        """Build the complete Markdown document.

        Args:
            history: History list with questions at even indices and answers
                at odd indices. A trailing question without an answer is
                rendered without an answer section instead of raising
                IndexError.

        Returns:
            str: The generated Markdown text.
        """
        self.content = []

        for i in range(0, len(history), 2):
            number = i // 2 + 1

            self.content.append(f"\n### 问题 {number}")
            self._add_content(history[i])

            # An odd-length history ends with an unanswered question.
            if i + 1 < len(history):
                self.content.append(f"\n### 回答 {number}")
                self._add_content(history[i + 1])

            # Separator between question/answer pairs.
            self.content.append("\n---\n")

        return "\n".join(self.content)

View File

@@ -0,0 +1,172 @@
from datetime import datetime
import os
import re
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
def convert_markdown_to_pdf(markdown_text):
    """Reduce Markdown to plain text suitable for the PDF body.

    Heading markers, bold/italic markers, list markers and link targets are
    removed. Runs of newlines are collapsed, then single newlines are turned
    into paragraph breaks.

    Args:
        markdown_text: Markdown source; a falsy value yields "".

    Returns:
        str: The plain text with leading and trailing whitespace stripped.
    """
    if not markdown_text:
        return ""

    # Normalise line endings.
    markdown_text = markdown_text.replace('\r\n', '\n').replace('\r', '\n')

    # Headings (all six levels), bold, italic.
    markdown_text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', markdown_text, flags=re.MULTILINE)
    markdown_text = re.sub(r'\*\*(.+?)\*\*', r'\1', markdown_text)
    markdown_text = re.sub(r'\*(.+?)\*', r'\1', markdown_text)

    # Lists.
    markdown_text = re.sub(r'^\s*[-*+]\s+(.+?)(?=\n|$)', r'\1', markdown_text, flags=re.MULTILINE)
    markdown_text = re.sub(r'^\s*\d+\.\s+(.+?)(?=\n|$)', r'\1', markdown_text, flags=re.MULTILINE)

    # Links: keep only the link text.
    markdown_text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', markdown_text)

    # Paragraphs.
    markdown_text = re.sub(r'\n{2,}', '\n', markdown_text)
    markdown_text = re.sub(r'(?<!\n)(?<!^)(?<!•\s)(?<!\d\.\s)\n(?![\s•\d])', '\n\n', markdown_text, flags=re.MULTILINE)

    # Whitespace cleanup. Only spaces and tabs are trimmed from line ends:
    # "\s" would also match the newlines inserted above and fuse lines.
    markdown_text = re.sub(r' +', ' ', markdown_text)
    markdown_text = re.sub(r'(?m)^[ \t]+|[ \t]+$', '', markdown_text)

    return markdown_text.strip()
class PDFFormatter:
    """Chat-record PDF generator that prefers the Noto Sans CJK font."""

    def __init__(self):
        self._init_reportlab()
        self._register_fonts()
        self.styles = self._get_reportlab_lib()['getSampleStyleSheet']()
        self._create_styles()

    def _init_reportlab(self):
        """Import the ReportLab components and cache them in lookup dicts."""
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

        self._lib = {
            'A4': A4,
            'getSampleStyleSheet': getSampleStyleSheet,
            'ParagraphStyle': ParagraphStyle,
            'cm': cm
        }
        self._platypus = {
            'SimpleDocTemplate': SimpleDocTemplate,
            'Paragraph': Paragraph,
            'Spacer': Spacer
        }

    def _get_reportlab_lib(self):
        """Return the cached ``reportlab.lib`` objects."""
        return self._lib

    def _get_reportlab_platypus(self):
        """Return the cached ``reportlab.platypus`` classes."""
        return self._platypus

    def _register_fonts(self):
        """Register Noto Sans CJK from the first usable candidate path.

        Sets ``self.font_name`` to ``'NotoSansCJK'`` on success and to
        ``'Helvetica'`` (with a printed warning) otherwise.
        """
        possible_font_paths = [
            '/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc',
            '/usr/share/fonts/noto-cjk/NotoSansCJK-Regular.ttc',
            '/usr/share/fonts/noto/NotoSansCJK-Regular.ttc'
        ]

        font_registered = False
        for path in possible_font_paths:
            if not os.path.exists(path):
                continue
            try:
                pdfmetrics.registerFont(TTFont('NotoSansCJK', path))
            except Exception:
                # This candidate could not be loaded; try the next one.
                # (A bare "except:" would also swallow KeyboardInterrupt.)
                continue
            font_registered = True
            break

        if not font_registered:
            print("Warning: Could not find Noto Sans CJK font. Using fallback font.")
            self.font_name = 'Helvetica'
        else:
            self.font_name = 'NotoSansCJK'

    def _create_styles(self):
        """Add the title, date, question and answer paragraph styles."""
        ParagraphStyle = self._lib['ParagraphStyle']
        # alignment=1 is ReportLab's TA_CENTER.
        style_specs = [
            ('Title_Custom', dict(fontSize=24, leading=38, alignment=1, spaceAfter=32)),
            ('Date_Style', dict(fontSize=16, leading=20, alignment=1, spaceAfter=20)),
            ('Question_Style', dict(fontSize=12, leading=18, leftIndent=28, spaceAfter=6)),
            ('Answer_Style', dict(fontSize=12, leading=18, leftIndent=28, spaceAfter=12)),
        ]
        for name, attrs in style_specs:
            self.styles.add(ParagraphStyle(name=name, fontName=self.font_name, **attrs))

    @staticmethod
    def _to_markup(text):
        """Turn plain text into markup that is safe to hand to ``Paragraph``.

        ``Paragraph`` parses its text as XML-like markup, so ``&``, ``<`` and
        ``>`` are escaped, and newlines become ``<br/>`` so that line and
        paragraph breaks are kept in the PDF.
        """
        from html import escape

        return escape(str(text), quote=False).replace('\n', '<br/>')

    def create_document(self, history, output_path):
        """Generate the PDF at *output_path* from *history*.

        Args:
            history: List with questions at even indices and answers at odd
                indices; a missing final answer is treated as empty.
            output_path: Destination passed to ``SimpleDocTemplate``.

        Returns:
            The built ``SimpleDocTemplate`` instance.
        """
        doc = self._platypus['SimpleDocTemplate'](
            output_path,
            pagesize=self._lib['A4'],
            rightMargin=2.6 * self._lib['cm'],
            leftMargin=2.8 * self._lib['cm'],
            topMargin=3.7 * self._lib['cm'],
            bottomMargin=3.5 * self._lib['cm']
        )

        story = []
        Paragraph = self._platypus['Paragraph']

        for i in range(0, len(history), 2):
            question = history[i]
            answer = convert_markdown_to_pdf(history[i + 1]) if i + 1 < len(history) else ""

            if question:
                q_text = f'问题 {i // 2 + 1}{self._to_markup(question)}'
                story.append(Paragraph(q_text, self.styles['Question_Style']))

            if answer:
                a_text = f'回答 {i // 2 + 1}{self._to_markup(answer)}'
                story.append(Paragraph(a_text, self.styles['Answer_Style']))

        doc.build(story)
        return doc

View File

@@ -0,0 +1,79 @@
import re
def convert_markdown_to_txt(markdown_text):
    """Flatten Markdown into plain text through an ordered list of regex rewrites.

    Heading markers are kept with their spacing normalised, bold/italic and
    bullet markers are removed, links become ``text (url)``, three or more
    newlines collapse to two, and runs of spaces collapse to one. The result
    is stripped.
    """
    text = markdown_text.replace('\r\n', '\n').replace('\r', '\n')

    # (pattern, replacement, flags), applied strictly in this order.
    rewrites = (
        # 1. Headings: keep the marker, normalise the spacing after it.
        (r'^#\s+(.+)$', r'# \1', re.MULTILINE),
        (r'^##\s+(.+)$', r'## \1', re.MULTILINE),
        (r'^###\s+(.+)$', r'### \1', re.MULTILINE),
        # 2. Bold and italic: drop the markers.
        (r'\*\*(.+?)\*\*', r'\1', 0),
        (r'\*(.+?)\*', r'\1', 0),
        # 3. Unordered list items: drop the bullet.
        (r'^\s*[-*+]\s+(.+?)(?=\n|$)', r'\1', re.MULTILINE),
        # 4. Markdown links: text followed by the URL in parentheses.
        (r'\[([^\]]+)\]\(([^)]+)\)', r'\1 (\2)', 0),
        # 5. HTML links: same output shape.
        (r'<a href=[\'"]([^\'"]+)[\'"](?:\s+target=[\'"][^\'"]+[\'"])?>([^<]+)</a>', r'\2 (\1)', 0),
        # 6. Three or more newlines become one blank line.
        (r'\n{3,}', '\n\n', 0),
        # 7. Collapse runs of spaces.
        (r' +', ' ', 0),
    )
    for pattern, replacement, flags in rewrites:
        text = re.sub(pattern, replacement, text, flags=flags)

    return text.strip()
class TxtFormatter:
    """Plain-text document generator for chat history."""

    def __init__(self):
        self.content = []
        self._setup_document()

    def _setup_document(self):
        """Start the document with the banner: a rule, the title, a rule."""
        rule = "=" * 50
        self.content.extend([rule, "GPT-Academic对话记录".center(48), rule])

    def _format_header(self):
        """Return the centred current date followed by a newline entry."""
        from datetime import datetime

        today = datetime.now().strftime('%Y年%m月%d')
        # The "\n" entry yields extra blank space once the lines are joined.
        return [today.center(48), "\n"]

    def create_document(self, history):
        """Append the date and all question/answer pairs, then join the lines.

        Args:
            history: List with questions at even indices and answers at odd
                indices; a missing final answer is treated as empty.

        Returns:
            str: The accumulated content joined with newlines.
        """
        self.content.extend(self._format_header())

        for number, start in enumerate(range(0, len(history), 2), 1):
            question = history[start]
            if start + 1 < len(history):
                answer = convert_markdown_to_txt(history[start + 1])
            else:
                answer = ""

            if question:
                self.content.append(f"问题 {number}{str(question)}")
                self.content.append("")  # blank line

            if answer:
                self.content.append(f"回答 {number}{str(answer)}")
                self.content.append("")  # blank line

        return "\n".join(self.content)

View File

@@ -0,0 +1,155 @@
from docx2pdf import convert
import os
import platform
import subprocess
from typing import Union
from pathlib import Path
from datetime import datetime
class WordToPdfConverter:
    """Word-to-PDF converter."""

    @staticmethod
    def convert_to_pdf(word_path: Union[str, Path], pdf_path: Union[str, Path] = None) -> str:
        """Convert a Word document to PDF.

        On Linux the conversion runs ``libreoffice --headless``; on every
        other platform it calls ``docx2pdf.convert``.

        Args:
            word_path: Path of the Word document.
            pdf_path: Optional output path. Defaults to the Word document's
                path with a ``.pdf`` suffix.

        Returns:
            str: Path of the generated PDF.

        Raises:
            Exception: If conversion fails; the original error is chained.
        """
        try:
            word_path = Path(word_path)
            pdf_path = word_path.with_suffix('.pdf') if pdf_path is None else Path(pdf_path)

            if platform.system() == 'Linux':
                # LibreOffice must be installed on Linux.
                which_result = subprocess.run(['which', 'libreoffice'], capture_output=True, text=True)
                if which_result.returncode != 0:
                    raise RuntimeError("请先安装LibreOffice: sudo apt-get install libreoffice")

                print(f"开始转换Word文档: {word_path} 到 PDF")

                result = subprocess.run(
                    ['libreoffice', '--headless', '--convert-to', 'pdf:writer_pdf_Export',
                     str(word_path), '--outdir', str(pdf_path.parent)],
                    capture_output=True, text=True
                )

                if result.returncode != 0:
                    error_msg = result.stderr or "未知错误"
                    print(f"LibreOffice转换失败错误信息: {error_msg}")
                    raise RuntimeError(f"LibreOffice转换失败: {error_msg}")

                print(f"LibreOffice转换输出: {result.stdout}")

                # LibreOffice writes "<stem>.pdf" into --outdir, so look for
                # the file there rather than beside the Word document.
                default_pdf = pdf_path.parent / f"{word_path.stem}.pdf"
                if default_pdf != pdf_path and default_pdf.exists():
                    os.rename(default_pdf, pdf_path)
                    print(f"已将PDF从 {default_pdf} 重命名为 {pdf_path}")
            else:
                # Windows and macOS use docx2pdf.
                print(f"使用docx2pdf转换 {word_path} -> {pdf_path}")
                convert(word_path, pdf_path)

            # Verify that a non-empty PDF was produced.
            if not pdf_path.exists() or pdf_path.stat().st_size == 0:
                raise RuntimeError("PDF生成失败或文件为空")
            print(f"PDF转换成功文件大小: {pdf_path.stat().st_size} 字节")

            return str(pdf_path)

        except Exception as e:
            print(f"PDF转换异常: {str(e)}")
            raise Exception(f"转换PDF失败: {str(e)}") from e

    @staticmethod
    def batch_convert(word_dir: Union[str, Path], pdf_dir: Union[str, Path] = None) -> list:
        """Convert every ``*.docx`` file directly inside *word_dir*.

        Args:
            word_dir: Directory containing the Word documents.
            pdf_dir: Optional output directory (created when missing).
                Defaults to writing each PDF next to its Word document.

        Returns:
            list: Paths of the PDFs that were generated. Files that fail to
            convert are reported with ``print`` and skipped.
        """
        word_dir = Path(word_dir)
        if pdf_dir:
            pdf_dir = Path(pdf_dir)
            pdf_dir.mkdir(parents=True, exist_ok=True)

        converted_files = []
        for word_file in word_dir.glob("*.docx"):
            try:
                if pdf_dir:
                    pdf_path = pdf_dir / word_file.with_suffix('.pdf').name
                else:
                    pdf_path = word_file.with_suffix('.pdf')

                pdf_file = WordToPdfConverter.convert_to_pdf(word_file, pdf_path)
                converted_files.append(pdf_file)

            except Exception as e:
                print(f"转换 {word_file} 失败: {str(e)}")

        return converted_files

    @staticmethod
    def convert_doc_to_pdf(doc, output_dir: Union[str, Path] = None) -> str:
        """Convert an in-memory docx object to PDF.

        The document is saved to a timestamped temporary ``.docx`` inside
        *output_dir*, converted, and the temporary file is removed whether or
        not the conversion succeeds.

        Args:
            doc: Object with a ``save(path)`` method (a python-docx Document).
            output_dir: Optional output directory; defaults to the current
                working directory.

        Returns:
            str: Path of the generated PDF.

        Raises:
            Exception: If any step fails; the original error is chained.
        """
        # Bound before the try block so that cleanup never hits an unbound
        # name when the failure happens before the temp path is computed.
        temp_docx = None
        try:
            output_dir = Path(output_dir) if output_dir else Path.cwd()
            output_dir.mkdir(parents=True, exist_ok=True)

            temp_docx = output_dir / f"temp_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
            doc.save(temp_docx)

            pdf_path = temp_docx.with_suffix('.pdf')
            WordToPdfConverter.convert_to_pdf(temp_docx, pdf_path)

            return str(pdf_path)

        except Exception as e:
            raise Exception(f"转换PDF失败: {str(e)}") from e
        finally:
            if temp_docx is not None and temp_docx.exists():
                temp_docx.unlink()

View File

@@ -0,0 +1,177 @@
import re
from docx import Document
from docx.shared import Cm, Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_LINE_SPACING
from docx.enum.style import WD_STYLE_TYPE
from docx.oxml.ns import qn
from datetime import datetime
def convert_markdown_to_word(markdown_text):
    """Simplify Markdown into text suitable for insertion into a Word run.

    Heading markers are kept (normalised), emphasis markers are removed, code
    is wrapped in bracketed tags, bullets become "•", links become
    ``text (url)`` and images become a bracketed placeholder.

    Ordering matters: double markers are handled before single ones, and
    images before links, because the shorter pattern would otherwise consume
    part of the longer construct.
    """
    # 0. Normalise all line endings to "\n".
    markdown_text = markdown_text.replace('\r\n', '\n').replace('\r', '\n')

    # 1. Headings: keep the marker so the level stays recognisable; drop any
    #    closing run of "#".
    markdown_text = re.sub(r'^(#{1,6})\s+(.+?)(?:\s+#+)?$', r'\1 \2', markdown_text, flags=re.MULTILINE)

    # 2. Bold, italic and bold-italic. Underscore markers only count when not
    #    glued to a word character, so identifiers such as snake_case_name
    #    survive.
    # NOTE(review): code spans are not protected from this step; e.g.
    # "__init__" inside a code block is still treated as emphasis.
    markdown_text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', markdown_text)  # bold italic
    markdown_text = re.sub(r'\*\*(.+?)\*\*', r'\1', markdown_text)  # bold
    markdown_text = re.sub(r'\*(.+?)\*', r'\1', markdown_text)  # italic
    markdown_text = re.sub(r'(?<!\w)__(.+?)__(?!\w)', r'\1', markdown_text)  # underscore bold
    markdown_text = re.sub(r'(?<!\w)_(.+?)_(?!\w)', r'\1', markdown_text)  # underscore italic

    # 3. Code: not removed, only simplified.
    # Fenced code blocks.
    markdown_text = re.sub(r'```(?:\w+)?\n([\s\S]*?)```', r'[代码块]\n\1[/代码块]', markdown_text)
    # Inline code.
    markdown_text = re.sub(r'`([^`]+)`', r'[代码]\1[/代码]', markdown_text)

    # 4. Unordered lists: keep the structure, swap the marker for a bullet.
    markdown_text = re.sub(r'^(\s*)[-*+]\s+(.+?)$', r'\1• \2', markdown_text, flags=re.MULTILINE)

    # 5. Images, before links, since "![alt](url)" contains a link pattern.
    markdown_text = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'[图片:\1]', markdown_text)

    # 6. Markdown links (an optional quoted title is dropped).
    markdown_text = re.sub(r'\[([^\]]+)\]\(([^)]+?)\s*(?:"[^"]*")?\)', r'\1 (\2)', markdown_text)

    # 7. HTML links.
    markdown_text = re.sub(r'<a href=[\'"]([^\'"]+)[\'"](?:\s+target=[\'"][^\'"]+[\'"])?>([^<]+)</a>', r'\2 (\1)',
                           markdown_text)

    return markdown_text
class WordFormatter:
    """Chat-record Word generator following the GB/T 9704-2012 layout."""

    def __init__(self):
        self.doc = Document()
        self._setup_document()
        self._create_styles()

    def _setup_document(self):
        """Apply the page setup and the header to every section."""
        for section in self.doc.sections:
            # A4 page size.
            section.page_width = Cm(21)
            section.page_height = Cm(29.7)
            # Margins: top 37mm, bottom 35mm, left 28mm, right 26mm.
            section.top_margin = Cm(3.7)
            section.bottom_margin = Cm(3.5)
            section.left_margin = Cm(2.8)
            section.right_margin = Cm(2.6)
            # Header and footer distance.
            section.header_distance = Cm(2.0)
            section.footer_distance = Cm(2.0)

            # Right-aligned page header.
            header_para = section.header.paragraphs[0]
            header_para.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            header_run = header_para.add_run("GPT-Academic对话记录")
            header_run.font.name = '仿宋'
            header_run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
            header_run.font.size = Pt(9)

    def _add_paragraph_style(self, name, font_name, size_pt, *, bold=False,
                             line_spacing=None, **paragraph_attrs):
        """Create one paragraph style and return it.

        Args:
            name: Style name.
            font_name: Font used for both the Latin and East-Asian slots.
            size_pt: Font size in points.
            bold: Set the font bold when True.
            line_spacing: A ``WD_LINE_SPACING`` rule; None means 1.5 lines.
            **paragraph_attrs: Extra ``paragraph_format`` attributes to set.
        """
        style = self.doc.styles.add_style(name, WD_STYLE_TYPE.PARAGRAPH)
        # font.name must be set first: it creates the rPr element that the
        # East-Asian font attribute is written to on the next line.
        style.font.name = font_name
        style._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)
        style.font.size = Pt(size_pt)
        if bold:
            style.font.bold = True

        fmt = style.paragraph_format
        fmt.line_spacing_rule = (
            WD_LINE_SPACING.ONE_POINT_FIVE if line_spacing is None else line_spacing
        )
        for attr, value in paragraph_attrs.items():
            setattr(fmt, attr, value)
        return style

    def _create_styles(self):
        """Create the custom paragraph styles used by the document."""
        # Body text.
        self._add_paragraph_style('Normal_Custom', '仿宋', 12, space_after=Pt(0))
        # Question (no left indent).
        self._add_paragraph_style(
            'Question_Style', '黑体', 14, bold=True,
            space_before=Pt(12), space_after=Pt(6), left_indent=Pt(0))
        # Answer (no left indent).
        self._add_paragraph_style(
            'Answer_Style', '仿宋', 12,
            space_before=Pt(6), space_after=Pt(12), left_indent=Pt(0))
        # Document title, centred.
        self._add_paragraph_style(
            'Title_Custom', '黑体', 22, bold=True,
            alignment=WD_PARAGRAPH_ALIGNMENT.CENTER,
            space_before=Pt(0), space_after=Pt(24))
        # Reference entries: smaller font, single spacing, hanging indent.
        self._add_paragraph_style(
            'Reference_Style', '宋体', 10.5,
            line_spacing=WD_LINE_SPACING.SINGLE,
            space_before=Pt(3), space_after=Pt(3),
            left_indent=Pt(21), first_line_indent=Pt(-21))
        # Reference section title.
        self._add_paragraph_style(
            'Reference_Title_Style', '黑体', 16, bold=True,
            space_before=Pt(24), space_after=Pt(12))

    def create_document(self, history):
        """Write the chat history into the document and return it.

        Args:
            history: List with questions at even indices and answers at odd
                indices. A trailing question without an answer is written
                without an answer paragraph instead of raising IndexError.

        Returns:
            The populated python-docx document.
        """
        # Title.
        title_para = self.doc.add_paragraph(style='Title_Custom')
        title_para.add_run('GPT-Academic 对话记录')

        # Date.
        date_para = self.doc.add_paragraph()
        date_para.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        date_run = date_para.add_run(datetime.now().strftime('%Y年%m月%d'))
        date_run.font.name = '仿宋'
        date_run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
        date_run.font.size = Pt(16)

        self.doc.add_paragraph()  # blank line

        # Conversation content.
        for i in range(0, len(history), 2):
            question = history[i]
            answer = convert_markdown_to_word(history[i + 1]) if i + 1 < len(history) else ""

            if question:
                q_para = self.doc.add_paragraph(style='Question_Style')
                q_para.add_run(f'问题 {i//2 + 1}').bold = True
                q_para.add_run(str(question))

            if answer:
                a_para = self.doc.add_paragraph(style='Answer_Style')
                a_para.add_run(f'回答 {i//2 + 1}').bold = True
                a_para.add_run(str(answer))

        return self.doc

View File

@@ -0,0 +1,6 @@
import os

import nltk

# A literal "~" is not expanded when passed as a data path or download
# directory, so it would be treated as a folder named "~" relative to the
# current working directory.  Resolve the home directory explicitly.
_NLTK_DATA_DIR = os.path.expanduser('~/nltk_data')

if _NLTK_DATA_DIR not in nltk.data.path:
    nltk.data.path.append(_NLTK_DATA_DIR)

# Resources fetched at import time of this module.
for _resource in ('averaged_perceptron_tagger', 'punkt'):
    nltk.download(_resource, download_dir=_NLTK_DATA_DIR)

View File

@@ -0,0 +1,286 @@
from __future__ import annotations
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Optional, List, Set, Dict, Union, Iterator, Tuple
from dataclasses import dataclass, field
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import chardet
from functools import lru_cache
import os
@dataclass
class ExtractorConfig:
    """Settings for the spreadsheet text extractor."""

    # 'auto' asks the extractor to detect the encoding; any other value is
    # used as the encoding name directly.
    encoding: str = 'auto'
    # Forwarded to the pandas readers.
    na_filter: bool = True
    skip_blank_lines: bool = True
    # Rows per chunk for chunked CSV reads; also scaled by 1024 to form the
    # file-size threshold above which chunked reading is used.
    chunk_size: int = 10000
    # Thread-pool size used when processing CSV chunks.
    max_workers: int = 4
    # True: tab/newline separated table text; False: flat space-joined text.
    preserve_format: bool = True
    # True: read every worksheet; False: only the first one.
    read_all_sheets: bool = True
    # Text clean-up switches (a fresh dict per instance).
    text_cleanup: Dict[str, bool] = field(
        default_factory=lambda: dict(
            remove_extra_spaces=True,
            normalize_whitespace=False,
            remove_special_chars=False,
            lowercase=False,
        )
    )
class ExcelTextExtractor:
    """Extract text from spreadsheet-like files (Excel, OpenDocument, CSV, TSV)."""

    SUPPORTED_EXTENSIONS: Set[str] = {
        '.xlsx', '.xls', '.csv', '.tsv', '.xlsm', '.xltx', '.xltm', '.ods'
    }

    # Error log written next to the current working directory.
    _LOG_FILE = 'excel_extractor.log'

    def __init__(self, config: "Optional[ExtractorConfig]" = None):
        """Create an extractor.

        Args:
            config: Extraction settings; a default ``ExtractorConfig`` is
                used when omitted.
        """
        self.config = config or ExtractorConfig()
        self._setup_logging()
        # Per-instance memoisation of encoding detection, keyed on the path.
        self._detect_encoding = lru_cache(maxsize=128)(self._detect_encoding)

    def _setup_logging(self) -> None:
        """Configure logging and attach the error-file handler exactly once."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # The module logger is shared by all instances; adding a handler on
        # every construction would duplicate each error line in the log file.
        log_path = os.path.abspath(self._LOG_FILE)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
            for handler in self.logger.handlers
        )
        if not already_attached:
            fh = logging.FileHandler(self._LOG_FILE)
            fh.setLevel(logging.ERROR)
            self.logger.addHandler(fh)

    def _detect_encoding(self, file_path: Path) -> str:
        """Return the configured encoding, or guess it from the first 10000 bytes.

        Falls back to ``'utf-8'`` when detection yields nothing or fails.
        """
        if self.config.encoding != 'auto':
            return self.config.encoding
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read(10000)
            result = chardet.detect(raw_data)
            return result['encoding'] or 'utf-8'
        except Exception as e:
            self.logger.warning(f"Encoding detection failed: {e}. Using utf-8")
            return 'utf-8'

    def _validate_file(self, file_path: Union[str, Path]) -> Path:
        """Resolve *file_path* and check that it can be processed.

        Raises:
            ValueError: The path is missing, is not a file, or has an
                unsupported extension.
            PermissionError: The file is not readable.
        """
        path = Path(file_path).resolve()
        if not path.exists():
            raise ValueError(f"File not found: {path}")
        if not path.is_file():
            raise ValueError(f"Not a file: {path}")
        if not os.access(path, os.R_OK):
            raise PermissionError(f"No read permission: {path}")
        if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"Unsupported format: {path.suffix}. "
                f"Supported: {', '.join(sorted(self.SUPPORTED_EXTENSIONS))}"
            )
        return path

    def _format_value(self, value: object) -> str:
        """Render one cell as text; missing values become an empty string."""
        if value is None or pd.isna(value):
            return ''
        if isinstance(value, (int, float)):
            return str(value)
        return str(value).strip()

    def _process_chunk(self, chunk: pd.DataFrame, columns: Optional[List[str]] = None, sheet_name: str = '') -> str:
        """Convert one DataFrame (a sheet or a CSV chunk) to text.

        Args:
            chunk: Data to render.
            columns: Optional subset of columns to keep.
            sheet_name: When given, emitted as a ``[Sheet: name]`` heading
                (format-preserving mode only).

        Returns:
            Tab/newline separated rows with a header line when
            ``preserve_format`` is set, otherwise all non-empty cell values
            joined by single spaces.
        """
        try:
            if columns:
                chunk = chunk[columns]

            if not self.config.preserve_format:
                flat_values = (
                    chunk.astype(str)
                    .replace({'nan': '', 'None': '', 'NaN': ''})
                    .values.flatten()
                )
                return ' '.join(v for v in flat_values if v)

            # DataFrame.applymap was renamed to DataFrame.map in pandas 2.1.
            # Look the method up on the class so a column called "map" on an
            # older pandas cannot be mistaken for it.
            frame_map = getattr(pd.DataFrame, 'map', None)
            if frame_map is not None:
                formatted_chunk = frame_map(chunk, self._format_value)
            else:
                formatted_chunk = chunk.applymap(self._format_value)

            rows = []
            if sheet_name:
                rows.append(f"[Sheet: {sheet_name}]")
            rows.append('\t'.join(str(col) for col in formatted_chunk.columns))
            rows.extend(
                '\t'.join(row)
                for row in formatted_chunk.itertuples(index=False, name=None)
            )
            return '\n'.join(rows)
        except Exception as e:
            self.logger.error(f"Error processing chunk: {e}")
            raise

    def _read_file(self, file_path: Path) -> Union[pd.DataFrame, Iterator[pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Load *file_path* with pandas.

        Returns:
            * CSV/TSV: a DataFrame, or a chunk iterator when the file is
              larger than ``chunk_size * 1024`` bytes.
            * Workbook: a ``{sheet_name: DataFrame}`` dict when
              ``read_all_sheets`` is set, otherwise the first sheet only.
        """
        try:
            suffix = file_path.suffix.lower()
            if suffix in {'.csv', '.tsv'}:
                # Encoding only matters for text formats, so detect it here.
                csv_options = dict(
                    encoding=self._detect_encoding(file_path),
                    na_filter=self.config.na_filter,
                    skip_blank_lines=self.config.skip_blank_lines,
                    sep='\t' if suffix == '.tsv' else ',',
                )
                if file_path.stat().st_size > self.config.chunk_size * 1024:
                    return pd.read_csv(
                        file_path,
                        chunksize=self.config.chunk_size,
                        on_bad_lines='warn',
                        **csv_options
                    )
                return pd.read_csv(file_path, **csv_options)

            # No explicit engine: pandas chooses a reader that matches the
            # file type.  Forcing openpyxl made the advertised .xls and .ods
            # formats fail, because openpyxl cannot read them.
            return pd.read_excel(
                file_path,
                na_filter=self.config.na_filter,
                keep_default_na=self.config.na_filter,
                sheet_name=None if self.config.read_all_sheets else 0,
            )
        except Exception as e:
            self.logger.error(f"Error reading file {file_path}: {e}")
            raise

    def extract_text(
        self,
        file_path: Union[str, Path],
        columns: Optional[List[str]] = None,
        separator: str = '\n'
    ) -> str:
        """Extract the text of a spreadsheet file.

        Args:
            file_path: File to read.
            columns: Optional subset of columns to keep.
            separator: Joins the text of individual sheets (and of chunks
                when ``preserve_format`` is off).

        Returns:
            The extracted text.

        Raises:
            ValueError, PermissionError: Validation failed.
            Exception: Any read/processing error is logged and re-raised.
                Failures of individual CSV chunks are logged and that chunk
                is skipped.
        """
        try:
            path = self._validate_file(file_path)
            self.logger.info(f"Processing: {path}")
            reader = self._read_file(path)
            texts = []

            # Workbook with several sheets.
            if isinstance(reader, dict):
                for sheet_name, df in reader.items():
                    sheet_text = self._process_chunk(df, columns, sheet_name)
                    if sheet_text:
                        texts.append(sheet_text)
                return separator.join(texts)

            # Single table.
            if isinstance(reader, pd.DataFrame):
                return self._process_chunk(reader, columns)

            # Chunk iterator: render chunks concurrently, then restore order.
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                futures = {
                    executor.submit(self._process_chunk, chunk, columns): i
                    for i, chunk in enumerate(reader)
                }
                chunk_texts = []
                for future in as_completed(futures):
                    try:
                        text = future.result()
                        if text:
                            chunk_texts.append((futures[future], text))
                    except Exception as e:
                        self.logger.error(f"Error in chunk {futures[future]}: {e}")
                chunk_texts.sort(key=lambda x: x[0])
                texts = [text for _, text in chunk_texts]

            if texts and self.config.preserve_format:
                # Every chunk starts with the header row; keep only the first.
                result = texts[0]
                for text in texts[1:]:
                    result += '\n' + '\n'.join(text.split('\n')[1:])
                return result
            return separator.join(texts)
        except Exception as e:
            self.logger.error(f"Extraction failed: {e}")
            raise

    @staticmethod
    def get_supported_formats() -> List[str]:
        """Return the supported file extensions, sorted."""
        return sorted(ExcelTextExtractor.SUPPORTED_EXTENSIONS)
def main():
    """Demo: extract two columns from ``example.xlsx`` if it is present."""
    cleanup_flags = {
        'remove_extra_spaces': True,
        'normalize_whitespace': False,
        'remove_special_chars': False,
        'lowercase': False
    }
    extractor = ExcelTextExtractor(
        ExtractorConfig(
            encoding='auto',
            preserve_format=True,
            read_all_sheets=True,  # read every worksheet
            text_cleanup=cleanup_flags,
        )
    )

    try:
        sample_file = 'example.xlsx'
        if not Path(sample_file).exists():
            print(f"示例文件 {sample_file} 不存在")
        else:
            text = extractor.extract_text(sample_file, columns=['title', 'content'])
            print("提取的文本:")
            print(text)
        print("\n支持的格式:", extractor.get_supported_formats())
    except Exception as e:
        # Demo script: report the problem instead of a traceback.
        print(f"错误: {e}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,359 @@
from __future__ import annotations
from pathlib import Path
from typing import Optional, Set, Dict, Union, List
from dataclasses import dataclass, field
import logging
import os
import re
import subprocess
import tempfile
import shutil
@dataclass
class MarkdownConverterConfig:
    """Settings for the PDF-to-Markdown converter.

    Attributes:
        extract_images: Whether to extract images.
        extract_tables: Whether to try to keep table structure.
        extract_code_blocks: Whether to recognise code blocks.
        extract_math: Whether to convert mathematical formulas.
        output_dir: Output directory path.
        image_dir: Directory (relative to the output) for saved images.
        paragraph_separator: Separator placed between paragraphs.
        text_cleanup: Text clean-up switches.
        docintel_endpoint: Document Intelligence endpoint URL (optional).
        enable_plugins: Whether plugins are enabled.
        llm_client: LLM client object (for example an OpenAI client).
        llm_model: Name of the LLM model to use.
    """

    # --- extraction switches ---
    extract_images: bool = True
    extract_tables: bool = True
    extract_code_blocks: bool = True
    extract_math: bool = True

    # --- output layout ---
    output_dir: str = ""
    image_dir: str = "images"
    paragraph_separator: str = '\n\n'

    # --- post-processing (fresh dict per instance) ---
    text_cleanup: Dict[str, bool] = field(
        default_factory=lambda: dict(
            remove_extra_spaces=True,
            normalize_whitespace=True,
            remove_special_chars=False,
            lowercase=False,
        )
    )

    # --- optional back-ends ---
    docintel_endpoint: str = ""
    enable_plugins: bool = False
    llm_client: Optional[object] = None
    llm_model: str = ""
class MarkdownConverter:
    """Convert PDF files to Markdown using the ``markitdown`` library."""

    SUPPORTED_EXTENSIONS: Set[str] = {
        '.pdf',
    }

    # Error log written next to the current working directory.
    _LOG_FILE = 'markdown_converter.log'

    def __init__(self, config: "Optional[MarkdownConverterConfig]" = None):
        """Create a converter.

        Args:
            config: Converter settings; defaults are used when omitted.
        """
        self.config = config or MarkdownConverterConfig()
        self._setup_logging()
        # Sets ``self.markitdown_available``.
        self._check_markitdown_installation()

    def _setup_logging(self) -> None:
        """Configure logging and attach the error-file handler exactly once."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # The module logger is shared; adding a handler per instance would
        # duplicate every error line in the log file.
        log_path = os.path.abspath(self._LOG_FILE)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
            for handler in self.logger.handlers
        )
        if not already_attached:
            fh = logging.FileHandler(self._LOG_FILE)
            fh.setLevel(logging.ERROR)
            self.logger.addHandler(fh)

    def _check_markitdown_installation(self) -> None:
        """Record in ``self.markitdown_available`` whether markitdown is usable.

        When the import fails, one installation attempt is made with pip.
        """
        import importlib
        import sys

        try:
            importlib.import_module('markitdown')
            self.logger.info("markitdown 库已安装")
        except ImportError:
            self.logger.warning("markitdown 库未安装,尝试安装...")
            try:
                # Use the running interpreter's pip so the package lands in
                # the environment this process actually imports from.
                subprocess.check_call([sys.executable, "-m", "pip", "install", "markitdown"])
                self.logger.info("markitdown 库安装成功")
                importlib.invalidate_caches()
                importlib.import_module('markitdown')
            except (subprocess.SubprocessError, OSError, ImportError):
                # OSError covers a missing/unlaunchable interpreter or pip.
                self.logger.error("无法安装 markitdown 库,请手动安装")
                self.markitdown_available = False
                return
        self.markitdown_available = True

    def _validate_file(self, file_path: Union[str, Path], max_size_mb: int = 100) -> Path:
        """Resolve *file_path* and check that it can be converted.

        Args:
            file_path: File to check.
            max_size_mb: Largest accepted file size in MB.

        Returns:
            The resolved path.

        Raises:
            ValueError: Missing path, not a file, too large, or unsupported
                extension.
            PermissionError: The file is not readable.
        """
        path = Path(file_path).resolve()
        if not path.exists():
            raise ValueError(f"文件不存在: {path}")
        if not path.is_file():
            raise ValueError(f"不是一个文件: {path}")
        if not os.access(path, os.R_OK):
            raise PermissionError(f"没有读取权限: {path}")
        file_size_mb = path.stat().st_size / (1024 * 1024)
        if file_size_mb > max_size_mb:
            raise ValueError(
                f"文件大小 ({file_size_mb:.1f}MB) 超过限制 {max_size_mb}MB"
            )
        if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"不支持的格式: {path.suffix}. "
                f"支持的格式: {', '.join(sorted(self.SUPPORTED_EXTENSIONS))}"
            )
        return path

    def _cleanup_text(self, text: str) -> str:
        """Tidy whitespace while keeping the Markdown line structure.

        Newlines and leading indentation are preserved: collapsing them (as
        a plain ``' '.join(text.split())`` does) would merge headings, list
        items, tables and code blocks into a single line.

        Args:
            text: Raw Markdown.

        Returns:
            The cleaned Markdown, stripped at both ends.
        """
        options = self.config.text_cleanup
        if options['normalize_whitespace']:
            # Unify line endings first so "\r\n" does not become two newlines.
            text = text.replace('\r\n', '\n').replace('\r', '\n').replace('\t', ' ')
        if options['remove_extra_spaces']:
            # Collapse runs of blanks inside a line (not leading indentation).
            text = re.sub(r'(?<=\S)[ \t]{2,}', ' ', text)
            # Drop trailing blanks on every line.
            text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE)
            # At most one empty line between blocks.
            text = re.sub(r'\n{3,}', '\n\n', text)
        if options['lowercase']:
            text = text.lower()
        return text.strip()

    @staticmethod
    def get_supported_formats() -> List[str]:
        """Return the supported file extensions, sorted."""
        return sorted(MarkdownConverter.SUPPORTED_EXTENSIONS)

    def convert_to_markdown(
        self,
        file_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None
    ) -> str:
        """Convert a PDF to Markdown.

        Args:
            file_path: PDF to convert.
            output_path: Where to save the Markdown.  When omitted nothing is
                written to disk and the content is only returned.

        Returns:
            The converted (and cleaned) Markdown text.

        Raises:
            ImportError: markitdown is not available.
            Exception: Any validation or conversion error is logged and
                re-raised.
        """
        try:
            path = self._validate_file(file_path)
            self.logger.info(f"处理: {path}")
            if not self.markitdown_available:
                raise ImportError("markitdown 库未安装,无法进行转换")
            from markitdown import MarkItDown

            # Only touch the file system when the caller asked for a file;
            # the former temporary output directory was never cleaned up.
            target = Path(output_path) if output_path else None
            if target is not None:
                target.parent.mkdir(parents=True, exist_ok=True)
                image_dir = target.parent / self.config.image_dir
                image_dir.mkdir(parents=True, exist_ok=True)

            # Pick the MarkItDown flavour from the configuration.
            if self.config.docintel_endpoint:
                md = MarkItDown(docintel_endpoint=self.config.docintel_endpoint)
            elif self.config.llm_client and self.config.llm_model:
                md = MarkItDown(
                    enable_plugins=self.config.enable_plugins,
                    llm_client=self.config.llm_client,
                    llm_model=self.config.llm_model
                )
            else:
                md = MarkItDown(enable_plugins=self.config.enable_plugins)

            result = md.convert(str(path))
            markdown_content = self._cleanup_text(result.text_content)

            if target is not None:
                with open(target, 'w', encoding='utf-8') as f:
                    f.write(markdown_content)
                self.logger.info(f"转换成功,输出到: {target}")
            return markdown_content
        except Exception as e:
            self.logger.error(f"转换失败: {e}")
            raise

    def convert_to_markdown_and_save(
        self,
        file_path: Union[str, Path],
        output_path: Union[str, Path]
    ) -> Path:
        """Convert a PDF and save the Markdown to *output_path*.

        Returns:
            *output_path* as a ``Path``.

        Raises:
            Exception: Propagated from ``convert_to_markdown``.
        """
        self.convert_to_markdown(file_path, output_path)
        return Path(output_path)

    def batch_convert(
        self,
        file_paths: List[Union[str, Path]],
        output_dir: Union[str, Path]
    ) -> List[Path]:
        """Convert several PDFs into *output_dir*.

        Each input becomes ``<stem>.md``.  A failing file is logged and
        skipped; it does not abort the batch.

        Returns:
            Paths of the files that were converted successfully.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        output_paths = []
        for file_path in file_paths:
            path = Path(file_path)
            output_path = output_dir / f"{path.stem}.md"
            try:
                self.convert_to_markdown(file_path, output_path)
                output_paths.append(output_path)
                self.logger.info(f"成功转换: {path} -> {output_path}")
            except Exception as e:
                self.logger.error(f"转换失败 {path}: {e}")
        return output_paths
def main():
    """Demo: convert a sample PDF, print a preview and save the result."""
    cleanup_flags = {
        'remove_extra_spaces': True,
        'normalize_whitespace': True,
        'remove_special_chars': False,
        'lowercase': False
    }
    converter = MarkdownConverter(
        MarkdownConverterConfig(
            extract_images=True,
            extract_tables=True,
            extract_code_blocks=True,
            extract_math=True,
            enable_plugins=False,
            text_cleanup=cleanup_flags,
        )
    )

    try:
        # Replace with a real file path.
        sample_file = './crazy_functions/doc_fns/read_fns/paper/2501.12599v1.pdf'
        if not Path(sample_file).exists():
            print(f"示例文件 {sample_file} 不存在")
        else:
            # Convert in memory and show the first 500 characters.
            markdown_content = converter.convert_to_markdown(sample_file)
            print("转换后的 Markdown 内容:")
            print(markdown_content[:500] + "...")

            # Convert again, this time writing the result to a file.
            output_file = f"./output_{Path(sample_file).stem}.md"
            output_path = converter.convert_to_markdown_and_save(sample_file, output_file)
            print(f"\n已保存到: {output_path}")

            # An LLM-assisted run is configured the same way, by passing
            # ``llm_client`` and ``llm_model`` to MarkdownConverterConfig.
        print("\n支持的格式:", converter.get_supported_formats())
    except Exception as e:
        # Demo script: report the problem instead of a traceback.
        print(f"错误: {e}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,493 @@
from __future__ import annotations
from pathlib import Path
from typing import Optional, Set, Dict, Union, List
from dataclasses import dataclass, field
import logging
import os
import re
from unstructured.partition.auto import partition
from unstructured.documents.elements import (
Text, Title, NarrativeText, ListItem, Table,
Footer, Header, PageBreak, Image, Address
)
@dataclass
class PaperMetadata:
    """Bibliographic metadata collected for one paper.

    Every field starts out empty (``''`` or ``[]``) and is filled in by the
    extractor when a value is found.
    """

    # --- who / what ---
    title: str = ''
    authors: List[str] = field(default_factory=list)
    affiliations: List[str] = field(default_factory=list)

    # --- where / when it was published ---
    journal: str = ''
    volume: str = ''
    issue: str = ''
    year: str = ''
    doi: str = ''
    date: str = ''
    publisher: str = ''
    conference: str = ''

    # --- content summary ---
    abstract: str = ''
    keywords: List[str] = field(default_factory=list)
@dataclass
class ExtractorConfig:
    """Settings for the paper metadata extractor."""

    # Joins the collected abstract paragraphs.
    paragraph_separator: str = '\n\n'
    # Text clean-up switches (a fresh dict per instance).
    text_cleanup: Dict[str, bool] = field(
        default_factory=lambda: dict(
            remove_extra_spaces=True,
            normalize_whitespace=True,
            remove_special_chars=False,
            lowercase=False,
        )
    )
class PaperMetadataExtractor:
    """Heuristic extractor for paper metadata (title, authors, abstract, ...).

    Documents are split into elements with the ``unstructured`` library and
    the first elements are then scanned with pattern-based rules.
    """

    SUPPORTED_EXTENSIONS: Set[str] = {
        '.pdf', '.docx', '.doc', '.txt', '.ppt', '.pptx',
        '.xlsx', '.xls', '.md', '.org', '.odt', '.rst',
        '.rtf', '.epub', '.html', '.xml', '.json'
    }

    # Heading patterns that mark the abstract and keyword sections.
    SECTION_PATTERNS = {
        'abstract': r'\b(摘要|abstract|summary|概要|résumé|zusammenfassung|аннотация)\b',
        'keywords': r'\b(关键词|keywords|key\s+words|关键字|mots[- ]clés|schlüsselwörter|ключевые слова)\b',
    }

    # Error log written next to the current working directory.
    _LOG_FILE = 'paper_metadata_extractor.log'

    def __init__(self, config: "Optional[ExtractorConfig]" = None):
        """Create an extractor.

        Args:
            config: Extractor settings; defaults are used when omitted.
        """
        self.config = config or ExtractorConfig()
        self._setup_logging()

    def _setup_logging(self) -> None:
        """Configure logging and attach the error-file handler exactly once."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # The module logger is shared; adding a handler per instance would
        # duplicate every error line in the log file.
        log_path = os.path.abspath(self._LOG_FILE)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
            for handler in self.logger.handlers
        )
        if not already_attached:
            fh = logging.FileHandler(self._LOG_FILE)
            fh.setLevel(logging.ERROR)
            self.logger.addHandler(fh)

    def _validate_file(self, file_path: Union[str, Path], max_size_mb: int = 100) -> Path:
        """Resolve *file_path* and check that it can be processed.

        Args:
            file_path: File to check.
            max_size_mb: Largest accepted file size in MB.

        Returns:
            The resolved path.

        Raises:
            ValueError: Missing path, not a file, too large, or unsupported
                extension.
            PermissionError: The file is not readable.
        """
        path = Path(file_path).resolve()
        if not path.exists():
            raise ValueError(f"文件不存在: {path}")
        if not path.is_file():
            raise ValueError(f"不是文件: {path}")
        if not os.access(path, os.R_OK):
            raise PermissionError(f"没有读取权限: {path}")
        file_size_mb = path.stat().st_size / (1024 * 1024)
        if file_size_mb > max_size_mb:
            raise ValueError(
                f"文件大小 ({file_size_mb:.1f}MB) 超过限制 {max_size_mb}MB"
            )
        if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"不支持的文件格式: {path.suffix}. "
                f"支持的格式: {', '.join(sorted(self.SUPPORTED_EXTENSIONS))}"
            )
        return path

    def _cleanup_text(self, text: str) -> str:
        """Apply the clean-up switches from ``config.text_cleanup`` to *text*."""
        if self.config.text_cleanup['remove_extra_spaces']:
            text = ' '.join(text.split())
        if self.config.text_cleanup['normalize_whitespace']:
            # Handle "\r\n" first so a Windows line ending stays one newline.
            text = text.replace('\t', ' ').replace('\r\n', '\n').replace('\r', '\n')
        if self.config.text_cleanup['lowercase']:
            text = text.lower()
        return text.strip()

    @staticmethod
    def get_supported_formats() -> List[str]:
        """Return the supported file extensions, sorted."""
        return sorted(PaperMetadataExtractor.SUPPORTED_EXTENSIONS)

    def extract_metadata(self, file_path: Union[str, Path], strategy: str = "fast") -> "PaperMetadata":
        """Extract metadata from a paper.

        Args:
            file_path: Document to analyse.
            strategy: Partitioning strategy forwarded to ``partition``
                ("fast" or "accurate").

        Returns:
            The populated ``PaperMetadata``.

        Raises:
            Exception: Any validation or extraction error is logged and
                re-raised.
        """
        try:
            path = self._validate_file(file_path)
            self.logger.info(f"正在处理: {path}")
            elements = partition(
                str(path),
                strategy=strategy,
                include_metadata=True,
                nlp=False,
            )
            metadata = PaperMetadata()
            self._extract_title_and_authors(elements, metadata)
            self._extract_abstract_and_keywords(elements, metadata)
            self._extract_additional_metadata(elements, metadata)
            return metadata
        except Exception as e:
            self.logger.error(f"元数据提取失败: {e}")
            raise

    def _extract_title_and_authors(self, elements, metadata: "PaperMetadata") -> None:
        """Fill ``title``, ``authors`` and ``affiliations`` on *metadata*.

        Only the leading elements of the document are inspected.  Title
        candidates are scored and the highest score wins.
        """
        title_candidates = []  # (text, score, position)
        all_text = []

        # Text of the first 30 elements, used by the heuristics below.
        for element in elements[:30]:
            if isinstance(element, (Text, Title, NarrativeText)):
                text = str(element).strip()
                if text:
                    all_text.append(text)

        # Diagnostic output goes to the debug log, not to stdout.
        self.logger.debug("原始文本前10行: %s", all_text[:10])

        # 1. Merge a title that was split over two lines: a short line
        #    ending in ':' followed by a capitalised continuation.
        i = 0
        while i < len(all_text) - 1:
            current = all_text[i]
            next_text = all_text[i + 1]
            if current.endswith(':') and len(current) < 50 and len(next_text) > 5 and next_text[0].isupper():
                combined_title = f"{current} {next_text}"
                all_text[i] = combined_title
                all_text.pop(i + 1)
                title_candidates.append((combined_title, 15, i))
                # i is not advanced, so the merged line may merge again.
            else:
                i += 1

        # 2. Title elements among the first 15 elements.
        for i, element in enumerate(elements[:15]):
            if isinstance(element, Title):
                title_text = str(element).strip()
                if title_text.lower() not in ['abstract', '摘要', 'introduction', '引言']:
                    score = self._evaluate_title_candidate(title_text, i, element)
                    title_candidates.append((title_text, score, i))

        # 3. Common title layouts in the leading text lines.
        for i, text in enumerate(all_text[:15]):
            # Upper-case prefix ending in a colon, e.g. "KIMI K1.5:".
            if re.match(r'^[A-Z][A-Z0-9\s\.]+(\s+K\d+(\.\d+)?)?:', text):
                title_candidates.append((text, 12, i))
                # An all-caps following line is probably the continuation.
                if i+1 < len(all_text) and all_text[i+1].isupper() and len(all_text[i+1]) > 10:
                    combined_title = f"{text} {all_text[i+1]}"
                    title_candidates.append((combined_title, 15, i))
            # A line written entirely in capitals; earlier lines score higher.
            elif text.isupper() and len(text) > 10 and len(text) < 100:
                title_candidates.append((text, 10 - i * 0.5, i))

        if title_candidates:
            title_candidates.sort(key=lambda x: x[1], reverse=True)
            metadata.title = title_candidates[0][0]
            self.logger.debug("所有标题候选: %s", title_candidates[:3])
        else:
            # Fallback: first all-caps line of reasonable length.
            for text in all_text[:10]:
                if text.isupper() and len(text) > 10 and len(text) < 200:
                    metadata.title = text
                    break

        # --- authors ---
        author_candidates = []  # (text, score)

        # 1. The line after "TECHNICAL REPORT ..." when it names a team.
        for i, text in enumerate(all_text):
            if "TECHNICAL REPORT" in text.upper() and i+1 < len(all_text):
                team_text = all_text[i+1].strip()
                if re.search(r'\b(team|group|lab)\b', team_text, re.IGNORECASE):
                    author_candidates.append((team_text, 15))

        # 2. Short lines that contain "Team".
        for text in all_text[:20]:
            if "Team" in text and len(text) < 30:
                author_candidates.append((text, 12))

        if author_candidates:
            author_candidates.sort(key=lambda x: x[1], reverse=True)
            seen_authors = set()
            for author, _ in author_candidates:
                if author.lower() not in seen_authors and not author.isdigit():
                    seen_authors.add(author.lower())
                    metadata.authors.append(author)

        # 3. Last resort: first short line that mentions a team/group/lab.
        if not metadata.authors:
            for text in all_text[:20]:
                if re.search(r'\b(team|group|lab|laboratory|研究组|团队)\b', text, re.IGNORECASE):
                    if len(text) < 50:
                        metadata.authors.append(text.strip())
                        break

        # --- affiliations ---
        for element in elements[:30]:
            element_text = str(element).strip()
            if re.search(r'(university|institute|department|school|laboratory|college|center|centre|\d{5,}|^[a-zA-Z]+@|学院|大学|研究所|研究院)', element_text, re.IGNORECASE):
                if element_text not in metadata.affiliations and len(element_text) > 10:
                    metadata.affiliations.append(element_text)

    def _evaluate_title_candidate(self, text, position, element):
        """Score how likely *text* is the paper title (higher is better).

        Args:
            text: Candidate text.
            position: Index of the element in the document.
            element: The source element; ``font_size`` / ``font_weight`` on
                its ``metadata`` attribute are used when present.
        """
        score = 0

        # Position: earlier elements are more likely to be the title.
        score += max(0, 10 - position) * 0.5

        # Length: neither very short nor very long.
        if 10 <= len(text) <= 150:
            score += 3
        elif len(text) < 10:
            score -= 2
        elif len(text) > 150:
            score -= 3

        # Formatting hints.
        if text.isupper():
            score += 2
        if re.match(r'^[A-Z]', text):
            score += 1
        if ':' in text:
            score += 1.5

        # Vocabulary typical for paper titles.
        if re.search(r'\b(scaling|learning|model|approach|method|system|framework|analysis)\b', text.lower()):
            score += 2

        # Penalties for things that are clearly not titles.
        if re.match(r'^\d+$', text):
            score -= 10
        if re.search(r'^(http|www|doi)', text.lower()):
            score -= 5
        if len(text.split()) <= 2 and len(text) < 15:
            score -= 3

        # Typography, when the element carries such metadata.
        element_metadata = getattr(element, 'metadata', None)
        if element_metadata:
            font_size = getattr(element_metadata, 'font_size', None)
            try:
                if font_size is not None and font_size > 14:
                    score += 3
            except TypeError:
                # Non-numeric font size: ignore it, keep evaluating the rest.
                pass
            if getattr(element_metadata, 'font_weight', None) == 'bold':
                score += 2
        return score

    @staticmethod
    def _split_keywords(text: str) -> List[str]:
        """Split a keyword line into individual keywords.

        A leading "Keywords:" / "关键词:" label is removed, then the text is
        split on the first separator found among ';', ';', ',' and ','.  A
        text without separators is returned as a single keyword.
        """
        cleaned_text = re.sub(
            r'^\s*(关键词|keywords|key\s+words)\s*[::]\s*', '', text, flags=re.IGNORECASE
        )
        keywords: List[str] = []
        for separator in (';', ';', ',', ','):
            if separator in cleaned_text:
                keywords = [k.strip() for k in cleaned_text.split(separator) if k.strip()]
                break
        if not keywords and cleaned_text:
            keywords = [cleaned_text]
        return keywords

    def _extract_abstract_and_keywords(self, elements, metadata: "PaperMetadata") -> None:
        """Fill ``abstract`` and ``keywords`` on *metadata*.

        The abstract is the text between an "abstract" heading and the next
        heading / keyword line / introduction.  Text is stored with its
        original capitalisation.
        """
        abstract_found = False
        keywords_found = False
        abstract_text = []

        for element in elements:
            original_text = str(element).strip()
            # Lower-cased copy is used for pattern matching only.
            element_text = original_text.lower()

            # Start of the abstract section.
            if not abstract_found and (
                isinstance(element, Title) and
                re.search(self.SECTION_PATTERNS['abstract'], element_text, re.IGNORECASE)
            ):
                abstract_found = True
                continue

            # Inside the abstract: collect until a section boundary.
            if abstract_found and not keywords_found:
                if (
                    isinstance(element, Title) or
                    re.search(self.SECTION_PATTERNS['keywords'], element_text, re.IGNORECASE) or
                    re.match(r'\b(introduction|引言|method|方法)\b', element_text, re.IGNORECASE)
                ):
                    keywords_found = bool(
                        re.search(self.SECTION_PATTERNS['keywords'], element_text, re.IGNORECASE)
                    )
                    abstract_found = False
                elif isinstance(element, (Text, NarrativeText)) and original_text:
                    abstract_text.append(original_text)

            # Keyword line (may be the same element that ended the abstract).
            if keywords_found and not abstract_found and not metadata.keywords:
                if isinstance(element, (Text, NarrativeText)):
                    keywords = self._split_keywords(original_text)
                    if keywords:
                        metadata.keywords = keywords
                    keywords_found = False

        if abstract_text:
            metadata.abstract = self.config.paragraph_separator.join(abstract_text)

    def _extract_additional_metadata(self, elements, metadata: "PaperMetadata") -> None:
        """Fill DOI, date, year, journal and conference from the first 30 elements.

        For each field the first match wins.
        """
        for element in elements[:30]:
            element_text = str(element).strip()

            doi_match = re.search(r'(doi|DOI):\s*(10\.\d{4,}\/[a-zA-Z0-9.-]+)', element_text)
            if doi_match and not metadata.doi:
                metadata.doi = doi_match.group(2)

            date_match = re.search(r'(published|received|accepted|submitted):\s*(\d{1,2}\s+[a-zA-Z]+\s+\d{4}|\d{4}[-/]\d{1,2}[-/]\d{1,2})', element_text, re.IGNORECASE)
            if date_match and not metadata.date:
                metadata.date = date_match.group(2)

            year_match = re.search(r'\b(19|20)\d{2}\b', element_text)
            if year_match and not metadata.year:
                metadata.year = year_match.group(0)

            journal_match = re.search(r'(journal|conference):\s*([^,;.]+)', element_text, re.IGNORECASE)
            if journal_match:
                if "journal" in journal_match.group(1).lower() and not metadata.journal:
                    metadata.journal = journal_match.group(2).strip()
                elif not metadata.conference:
                    metadata.conference = journal_match.group(2).strip()
def main():
    """Demo: extract metadata from a sample paper and print every field."""
    extractor = PaperMetadataExtractor()

    try:
        # Replace with a real file path.
        sample_file = '/Users/boyin.liu/Documents/示例文档/论文/3.pdf'
        if not Path(sample_file).exists():
            print(f"示例文件 {sample_file} 不存在")
        else:
            metadata = extractor.extract_metadata(sample_file)
            print("提取的元数据:")
            # List-valued fields are shown comma separated.
            authors = ', '.join(metadata.authors)
            print(f"标题: {metadata.title}")
            print(f"作者: {authors}")
            affiliations = ', '.join(metadata.affiliations)
            print(f"机构: {affiliations}")
            # Only the first 200 characters of the abstract are shown.
            abstract_preview = metadata.abstract[:200]
            print(f"摘要: {abstract_preview}...")
            keywords = ', '.join(metadata.keywords)
            print(f"关键词: {keywords}")
            print(f"DOI: {metadata.doi}")
            print(f"日期: {metadata.date}")
            print(f"年份: {metadata.year}")
            print(f"期刊: {metadata.journal}")
            print(f"会议: {metadata.conference}")
        print("\n支持的格式:", extractor.get_supported_formats())
    except Exception as e:
        # Demo script: report the problem instead of a traceback.
        print(f"错误: {e}")


if __name__ == "__main__":
    main()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,86 @@
from pathlib import Path
from crazy_functions.doc_fns.read_fns.unstructured_all.paper_structure_extractor import PaperStructureExtractor
def extract_and_save_as_markdown(paper_path, output_path=None):
    """Extract a paper's structure and write it out as Markdown.

    Args:
        paper_path: Path of the paper to process.
        output_path: Destination Markdown file.  Defaults to the input path
            with its extension replaced by ``.md``.

    Returns:
        The path of the written Markdown file, or ``None`` when processing
        failed (the error and traceback are printed).
    """
    extractor = PaperStructureExtractor()

    paper_path = Path(paper_path)
    output_path = paper_path.with_suffix('.md') if output_path is None else Path(output_path)

    # Make sure the destination directory exists.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"正在处理论文: {paper_path}")

    try:
        paper = extractor.extract_paper_structure(paper_path)
        markdown_content = extractor.generate_markdown(paper)

        with open(output_path, 'w', encoding='utf-8') as md_file:
            md_file.write(markdown_content)
        print(f"已成功保存Markdown文件: {output_path}")

        # Short summary.  Values are looked up lazily, one line at a time,
        # so lines before a failing lookup are still printed.
        print("\n论文摘要信息:")
        summary = (
            ("标题", lambda: paper.metadata.title),
            ("作者", lambda: ', '.join(paper.metadata.authors)),
            ("关键词", lambda: ', '.join(paper.keywords)),
            ("章节数", lambda: len(paper.sections)),
            ("图表数", lambda: len(paper.figures)),
            ("表格数", lambda: len(paper.tables)),
            ("公式数", lambda: len(paper.formulas)),
            ("参考文献数", lambda: len(paper.references)),
        )
        for label, get_value in summary:
            print(f"{label}: {get_value()}")
        return output_path
    except Exception as e:
        print(f"处理论文时出错: {e}")
        import traceback
        traceback.print_exc()
        return None
# Usage example
if __name__ == "__main__":
    # Replace with the path of a real paper.
    sample_paper = "crazy_functions/doc_fns/read_fns/paper/2501.12599v1.pdf"
    # An explicit output path may be given, or the default can be used:
    # output_file = "/path/to/output/paper_structure.md"
    # extract_and_save_as_markdown(sample_paper, output_file)
    # Use the default output path (same name as the input, extension .md).
    extract_and_save_as_markdown(sample_paper)
    # # Example: batch-processing several papers
    # paper_dir = Path("/path/to/papers/folder")
    # output_dir = Path("/path/to/output/folder")
    #
    # # Make sure the output directory exists
    # output_dir.mkdir(parents=True, exist_ok=True)
    #
    # # Process every PDF file in the directory
    # for paper_file in paper_dir.glob("*.pdf"):
    #     output_file = output_dir / f"{paper_file.stem}.md"
    #     extract_and_save_as_markdown(paper_file, output_file)

View File

@@ -0,0 +1,275 @@
from __future__ import annotations
from pathlib import Path
from typing import Optional, Set, Dict, Union, List
from dataclasses import dataclass, field
import logging
import os
from unstructured.partition.auto import partition
from unstructured.documents.elements import (
Text, Title, NarrativeText, ListItem, Table,
Footer, Header, PageBreak, Image, Address
)
@dataclass
class TextExtractorConfig:
    """Settings for the general-purpose document text extractor.

    Attributes:
        extract_headers_footers: Whether page headers/footers are extracted.
        extract_tables: Whether table content is extracted.
        extract_lists: Whether list content is extracted.
        extract_titles: Whether titles are extracted.
        paragraph_separator: Separator placed between paragraphs.
        text_cleanup: Text clean-up switches.
    """

    # --- which element kinds to keep ---
    extract_headers_footers: bool = False
    extract_tables: bool = True
    extract_lists: bool = True
    extract_titles: bool = True

    # --- output formatting ---
    paragraph_separator: str = '\n\n'

    # --- post-processing (fresh dict per instance) ---
    text_cleanup: Dict[str, bool] = field(
        default_factory=lambda: dict(
            remove_extra_spaces=True,
            normalize_whitespace=True,
            remove_special_chars=False,
            lowercase=False,
        )
    )
class UnstructuredTextExtractor:
"""通用文档文本内容提取器
使用 unstructured 库支持多种文档格式的文本提取,提供统一的接口和配置选项。
"""
SUPPORTED_EXTENSIONS: Set[str] = {
# 文档格式
'.pdf', '.docx', '.doc', '.txt',
# 演示文稿
'.ppt', '.pptx',
# 电子表格
'.xlsx', '.xls', '.csv',
# 图片
'.png', '.jpg', '.jpeg', '.tiff',
# 邮件
'.eml', '.msg', '.p7s',
# Markdown
".md",
# Org Mode
".org",
# Open Office
".odt",
# reStructured Text
".rst",
# Rich Text
".rtf",
# TSV
".tsv",
# EPUB
'.epub',
# 其他格式
'.html', '.xml', '.json',
}
def __init__(self, config: Optional[TextExtractorConfig] = None):
"""初始化提取器
Args:
config: 提取器配置对象如果为None则使用默认配置
"""
self.config = config or TextExtractorConfig()
self._setup_logging()
def _setup_logging(self) -> None:
"""配置日志记录器"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# 添加文件处理器
fh = logging.FileHandler('text_extractor.log')
fh.setLevel(logging.ERROR)
self.logger.addHandler(fh)
def _validate_file(self, file_path: Union[str, Path], max_size_mb: int = 100) -> Path:
"""验证文件
Args:
file_path: 文件路径
max_size_mb: 允许的最大文件大小(MB)
Returns:
Path: 验证后的Path对象
Raises:
ValueError: 文件不存在、格式不支持或大小超限
PermissionError: 没有读取权限
"""
path = Path(file_path).resolve()
if not path.exists():
raise ValueError(f"File not found: {path}")
if not path.is_file():
raise ValueError(f"Not a file: {path}")
if not os.access(path, os.R_OK):
raise PermissionError(f"No read permission: {path}")
file_size_mb = path.stat().st_size / (1024 * 1024)
if file_size_mb > max_size_mb:
raise ValueError(
f"File size ({file_size_mb:.1f}MB) exceeds limit of {max_size_mb}MB"
)
if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
raise ValueError(
f"Unsupported format: {path.suffix}. "
f"Supported: {', '.join(sorted(self.SUPPORTED_EXTENSIONS))}"
)
return path
def _cleanup_text(self, text: str) -> str:
"""清理文本
Args:
text: 原始文本
Returns:
str: 清理后的文本
"""
if self.config.text_cleanup['remove_extra_spaces']:
text = ' '.join(text.split())
if self.config.text_cleanup['normalize_whitespace']:
text = text.replace('\t', ' ').replace('\r', '\n')
if self.config.text_cleanup['lowercase']:
text = text.lower()
return text.strip()
def _should_extract_element(self, element) -> bool:
    """Decide whether a parsed document element should be extracted.

    The configurable element kinds are tested before the generic text
    check. If Title/ListItem/Table/Header/Footer are subclasses of Text,
    testing Text first would accept every element and the ``extract_*``
    switches would never take effect.
    NOTE(review): assumes the element classes come from
    ``unstructured.documents.elements``, where that subclassing holds;
    confirm against this file's imports. The ordering is harmless if the
    classes are unrelated.

    Args:
        element: Document element.
    Returns:
        bool: True when the element should be extracted.
    """
    if isinstance(element, Title):
        return bool(self.config.extract_titles)
    if isinstance(element, ListItem):
        return bool(self.config.extract_lists)
    if isinstance(element, Table):
        return bool(self.config.extract_tables)
    if isinstance(element, (Header, Footer)):
        return bool(self.config.extract_headers_footers)
    # Plain and narrative text (and any other Text-derived element) is
    # always extracted.
    return isinstance(element, (Text, NarrativeText))
@staticmethod
def get_supported_formats() -> List[str]:
"""获取支持的文件格式列表"""
return sorted(UnstructuredTextExtractor.SUPPORTED_EXTENSIONS)
def extract_text(
    self,
    file_path: Union[str, Path],
    strategy: str = "fast"
) -> str:
    """Extract the cleaned text of a document.

    Args:
        file_path: Path of the file to read.
        strategy: Extraction strategy, forwarded unchanged to
            ``partition`` (earlier notes list "fast" and "accurate";
            TODO confirm which values ``partition`` accepts).
    Returns:
        str: The cleaned text of every extracted element, joined with
            ``config.paragraph_separator``. Header and footer elements
            are prefixed with "[Header] " / "[Footer] ".
    Raises:
        Exception: Any validation or extraction error is logged and
            re-raised unchanged.
    """
    try:
        validated = self._validate_file(file_path)
        self.logger.info(f"Processing: {validated}")
        # NOTE(review): the comment previously attached to this call asked
        # for nlp=False (to disable NLTK), but the call passes nlp=True;
        # confirm which value is intended.
        elements = partition(
            str(validated),
            strategy=strategy,
            include_metadata=True,
            nlp=True,
        )
        pieces = []
        for element in elements:
            if not self._should_extract_element(element):
                continue
            cleaned = self._cleanup_text(str(element))
            if not cleaned:
                # Skip elements that are empty after clean-up.
                continue
            if isinstance(element, Header):
                cleaned = f"[Header] {cleaned}"
            elif isinstance(element, Footer):
                cleaned = f"[Footer] {cleaned}"
            pieces.append(cleaned)
        return self.config.paragraph_separator.join(pieces)
    except Exception as exc:
        self.logger.error(f"Extraction failed: {exc}")
        raise
def main():
    """Demo entry point: extract and print the text of a sample file.

    Prints the extracted text when the sample file exists, otherwise a
    message saying it is missing, followed by the supported formats. Any
    exception is caught and printed.
    """
    # Configuration
    config = TextExtractorConfig(
        extract_headers_footers=True,
        extract_tables=True,
        extract_lists=True,
        extract_titles=True,
        text_cleanup={
            'remove_extra_spaces': True,
            'normalize_whitespace': True,
            'remove_special_chars': False,
            'lowercase': False
        }
    )
    # Build the extractor
    extractor = UnstructuredTextExtractor(config)
    # Usage example
    try:
        # Replace with a real file path
        sample_file = './crazy_functions/doc_fns/read_fns/paper/2501.12599v1.pdf'
        # The former "or True" forced this branch and made the
        # missing-file message below unreachable.
        if Path(sample_file).exists():
            text = extractor.extract_text(sample_file)
            print("提取的文本:")
            print(text)
        else:
            print(f"示例文件 {sample_file} 不存在")
        print("\n支持的格式:", extractor.get_supported_formats())
    except Exception as e:
        print(f"错误: {e}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,219 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Optional, Union
from urllib.parse import urlparse
import logging
import trafilatura
import requests
from pathlib import Path
def _default_text_cleanup() -> Dict[str, bool]:
    """Build a fresh dict of the default text clean-up switches."""
    return {
        'remove_extra_spaces': True,
        'normalize_whitespace': True,
        'remove_special_chars': False,
        'lowercase': False
    }


@dataclass
class WebExtractorConfig:
    """Configuration for the web page content extractor.

    Attributes:
        extract_comments: Whether to extract comments.
        extract_tables: Whether to extract tables.
        extract_links: Whether to keep link information.
        paragraph_separator: Separator placed between paragraphs.
        timeout: Network request timeout, in seconds.
        max_retries: Maximum number of download attempts.
        user_agent: Custom User-Agent string.
        text_cleanup: Text clean-up switches, keyed by option name.
    """
    extract_comments: bool = False
    extract_tables: bool = True
    extract_links: bool = False
    paragraph_separator: str = '\n\n'
    timeout: int = 10
    max_retries: int = 3
    user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    # A factory gives each instance its own dict instead of a shared one.
    text_cleanup: Dict[str, bool] = field(default_factory=_default_text_cleanup)
class WebTextExtractor:
    """Extract the main text content of a web page.

    The page is downloaded with ``requests`` and handed to
    ``trafilatura.extract``; the result is then cleaned according to
    ``config.text_cleanup``.
    """

    def __init__(self, config: Optional[WebExtractorConfig] = None):
        """Create the extractor and attach its logger.

        Args:
            config: Extractor configuration; a default WebExtractorConfig
                is used when None.
        """
        self.config = config or WebExtractorConfig()
        self._setup_logging()

    def _setup_logging(self) -> None:
        """Configure logging and bind ``self.logger`` to the module logger.

        Calls ``logging.basicConfig`` at INFO level and attaches a file
        handler that records ERROR-and-above messages to
        ``web_extractor.log`` in the current working directory.

        The logger is shared by every instance created in this module, so
        the file handler is attached only once; re-adding it on each
        instantiation would write every error line several times and keep
        extra file handles open.
        """
        import os  # local import: the module's import block does not provide os

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # FileHandler stores the absolute path of its target in baseFilename.
        log_file = os.path.abspath('web_extractor.log')
        for handler in self.logger.handlers:
            if getattr(handler, 'baseFilename', None) == log_file:
                return
        # Attach the file handler (first instance only)
        fh = logging.FileHandler('web_extractor.log')
        fh.setLevel(logging.ERROR)
        self.logger.addHandler(fh)

    def _validate_url(self, url: str) -> bool:
        """Check whether ``url`` is an absolute HTTP(S) URL.

        Only ``http`` and ``https`` are accepted: the download goes through
        ``requests.get``, which cannot fetch other schemes, so rejecting
        them here fails fast instead of after every retry.

        Args:
            url: Web page URL.
        Returns:
            bool: True when the URL has an http/https scheme and a host.
        """
        try:
            parsed = urlparse(url)
        except Exception:
            # urlparse raises on some malformed inputs; treat as invalid.
            return False
        return parsed.scheme in ('http', 'https') and bool(parsed.netloc)

    def _download_webpage(self, url: str) -> Optional[str]:
        """Download a web page, retrying on request errors.

        Args:
            url: Web page URL.
        Returns:
            Optional[str]: The response body as text. None is returned only
                when ``config.max_retries`` allows no attempt at all.
        Raises:
            Exception: Every attempt failed; the last request error is
                chained as the cause.
        """
        headers = {'User-Agent': self.config.user_agent}
        for attempt in range(self.config.max_retries):
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    timeout=self.config.timeout
                )
                response.raise_for_status()
                return response.text
            except requests.RequestException as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == self.config.max_retries - 1:
                    raise Exception(
                        f"Failed to download webpage after {self.config.max_retries} attempts: {e}"
                    ) from e
        return None

    def _cleanup_text(self, text: str) -> str:
        """Clean a piece of text according to ``config.text_cleanup``.

        Args:
            text: Raw text.
        Returns:
            str: The cleaned text with leading/trailing whitespace removed;
                an empty string when ``text`` is empty or None.
        """
        if not text:
            return ""
        options = self.config.text_cleanup
        if options['remove_extra_spaces']:
            # Collapses every whitespace run (newlines included) to one space.
            text = ' '.join(text.split())
        if options['normalize_whitespace']:
            # Handle CRLF before lone CR so a Windows line ending becomes
            # one newline instead of two.
            text = (
                text.replace('\r\n', '\n')
                .replace('\r', '\n')
                .replace('\t', ' ')
            )
        if options['lowercase']:
            text = text.lower()
        return text.strip()

    def extract_text(self, url: str) -> str:
        """Download a web page and extract its text content.

        Args:
            url: Web page URL.
        Returns:
            str: The extracted and cleaned text.
        Raises:
            ValueError: The URL is invalid.
            Exception: The download or the extraction failed. Every error
                is logged before being re-raised.
        """
        try:
            if not self._validate_url(url):
                raise ValueError(f"Invalid URL: {url}")
            self.logger.info(f"Processing URL: {url}")
            # Download the page
            html_content = self._download_webpage(url)
            if not html_content:
                raise Exception("Failed to download webpage")
            # Options forwarded to trafilatura.extract
            extract_config = {
                'include_comments': self.config.extract_comments,
                'include_tables': self.config.extract_tables,
                'include_links': self.config.extract_links,
                'no_fallback': False,  # allow the fallback extractors
            }
            # Extract the text
            extracted_text = trafilatura.extract(
                html_content,
                **extract_config
            )
            if not extracted_text:
                raise Exception("No content could be extracted")
            # Clean the text
            cleaned_text = self._cleanup_text(extracted_text)
            return cleaned_text
        except Exception as e:
            self.logger.error(f"Extraction failed: {e}")
            raise
def main():
    """Demo entry point: extract and print the text of a sample URL.

    Any exception raised while extracting or printing is caught and
    printed instead of propagating.
    """
    # Configuration
    cleanup_options = {
        'remove_extra_spaces': True,
        'normalize_whitespace': True,
        'remove_special_chars': False,
        'lowercase': False
    }
    config = WebExtractorConfig(
        extract_comments=False,
        extract_tables=True,
        extract_links=False,
        timeout=10,
        text_cleanup=cleanup_options
    )
    # Build the extractor
    extractor = WebTextExtractor(config)
    # Usage example; replace with a real URL
    sample_url = 'https://arxiv.org/abs/2412.00036'
    try:
        text = extractor.extract_text(sample_url)
        print("提取的文本:")
        print(text)
    except Exception as e:
        print(f"错误: {e}")


if __name__ == "__main__":
    main()