This commit is contained in:
van
2026-04-26 13:55:54 +08:00
commit 83c48dbed9
24 changed files with 1955 additions and 0 deletions

479
.py Normal file
View File

@@ -0,0 +1,479 @@
import time
import json
import re
import os
import platform
from DrissionPage import ChromiumPage, ChromiumOptions
# Ubuntu 上常见的 Chrome/Chromium 路径
UBUNTU_CHROME_PATHS = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
'/opt/google/chrome/chrome',
]
# 是否使用无头模式headless
# True: 无界面模式,适合服务器环境
# False: 有界面模式,需要 X11 或 Wayland
USE_HEADLESS = True # 可以根据需要修改
# 全局浏览器实例
global_page = None
def find_chrome_path():
"""自动查找 Ubuntu 系统中的 Chrome/Chromium 路径"""
print("正在查找 Chrome/Chromium 浏览器...")
# 首先尝试常见的路径
for path in UBUNTU_CHROME_PATHS:
if os.path.exists(path):
print(f"✅ 找到浏览器: {path}")
return path
# 尝试使用 which 命令查找
import subprocess
try:
result = subprocess.run(['which', 'google-chrome'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and os.path.exists(result.stdout.strip()):
path = result.stdout.strip()
print(f"✅ 通过 which 找到浏览器: {path}")
return path
except:
pass
try:
result = subprocess.run(['which', 'chromium-browser'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and os.path.exists(result.stdout.strip()):
path = result.stdout.strip()
print(f"✅ 通过 which 找到浏览器: {path}")
return path
except:
pass
# 如果都找不到,返回最常见的路径
default_path = '/usr/bin/google-chrome'
print(f"⚠️ 未找到浏览器,将使用默认路径: {default_path}")
print("请确保已安装 Google Chrome 或 Chromium:")
print(" sudo apt update")
print(" sudo apt install -y google-chrome-stable")
print(" 或者")
print(" sudo apt install -y chromium-browser")
return default_path
def get_global_browser():
"""获取全局浏览器实例Ubuntu 版本)"""
global global_page
if global_page is None:
print("="*60)
print("Ubuntu 浏览器初始化")
print("="*60)
# 检查操作系统
if platform.system() != 'Linux':
print(f"⚠️ 警告: 当前系统是 {platform.system()},此脚本专为 Ubuntu 设计")
# 查找 Chrome 路径
chrome_path = find_chrome_path()
options = ChromiumOptions()
options.set_browser_path(chrome_path)
# Ubuntu 服务器环境通常使用无头模式
if USE_HEADLESS:
print("配置为无头模式headless...")
try:
options.headless(True)
except:
# 如果 headless 方法不存在,使用参数
try:
options.set_argument('--headless=new')
options.set_argument('--no-sandbox')
options.set_argument('--disable-dev-shm-usage')
except:
pass
else:
print("配置为有界面模式...")
# 检查是否有显示环境
display = os.environ.get('DISPLAY')
if not display:
print("⚠️ 警告: 未检测到 DISPLAY 环境变量")
print("如果无法显示浏览器,请:")
print(" 1. 设置 USE_HEADLESS = True")
print(" 2. 或者设置 DISPLAY 环境变量(如 DISPLAY=:0")
print(" 3. 或者使用 Xvfb虚拟显示")
# Linux 特定参数
try:
options.set_argument('--no-sandbox') # 在某些环境下需要
options.set_argument('--disable-dev-shm-usage') # 避免 /dev/shm 空间不足
options.set_argument('--disable-gpu') # 禁用 GPU可选在 headless 模式下有用)
except:
pass
print(f"正在启动浏览器...")
print(f"浏览器路径: {chrome_path}")
if USE_HEADLESS:
print("模式: 无头模式(后台运行)")
else:
print("模式: 有界面模式")
try:
global_page = ChromiumPage(options)
print("✅ 浏览器已成功启动!")
time.sleep(2) # 等待浏览器完全启动
except Exception as e:
print(f"❌ 浏览器启动失败: {e}")
print("\n可能的解决方案:")
print("1. 确保已安装 Chrome/Chromium:")
print(" sudo apt update")
print(" sudo apt install -y google-chrome-stable")
print("2. 如果使用无头模式失败,尝试设置 USE_HEADLESS = False")
print("3. 确保有足够的权限")
print("4. 检查是否缺少依赖:")
print(" sudo apt install -y libnss3 libatk-bridge2.0-0 libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 libxrandr2 libgbm1 libasound2")
import traceback
traceback.print_exc()
raise
else:
print("使用已存在的浏览器实例")
return global_page
def extract_logistics_info(tracking_url):
"""
从京东物流追踪页面提取运单号、承运人等信息Ubuntu 版本)
Args:
tracking_url: 物流追踪页面 URL例如 https://3.cn/2t-Iibig
Returns:
dict: 包含运单号、承运人、承运人电话、物流跟踪信息等的字典
"""
page = get_global_browser()
try:
print(f"\n正在打开物流追踪页面: {tracking_url}")
page.get(tracking_url)
print("页面加载中,请稍候...")
time.sleep(5) # 等待页面加载
# 检查页面是否成功加载
current_url = page.url
print(f"当前页面 URL: {current_url}")
# 检查页面标题
try:
title = page.title
print(f"页面标题: {title}")
except:
print("无法获取页面标题")
# 检查页面是否有内容
try:
html_length = len(page.html)
print(f"页面 HTML 长度: {html_length} 字符")
if html_length < 100:
print("⚠️ 警告: 页面内容可能未完全加载")
except Exception as e:
print(f"⚠️ 无法获取页面 HTML: {e}")
result = {
"waybill_no": None, # 运单号
"carrier": None, # 国内承运人
"carrier_phone": None, # 国内承运人电话
"tracking_info": [], # 物流跟踪信息列表
"raw_html": None # 原始 HTML用于调试
}
# 方法1: 监听网络请求,查找物流数据 API
print("\n方法1: 监听网络请求...")
page.listen.start()
# 滚动页面触发可能的请求
page.scroll.down(300)
time.sleep(2)
page.scroll.to_bottom()
time.sleep(3)
# 检查监听到的请求
responses = page.listen.get()
print(f"监听到 {len(responses)} 个请求")
# 查找可能的物流数据接口
possible_urls = [
'track', 'logistics', 'waybill', 'express',
'delivery', '3.cn', 'jd.com/logistics',
'api.m.jd.com', 'mapi.jd.com'
]
for resp in responses:
url = resp.url if hasattr(resp, 'url') else ''
url_lower = url.lower()
# 检查是否可能是物流相关的 API
if any(keyword in url_lower for keyword in possible_urls):
print(f"发现可能的物流 API: {url[:100]}")
try:
if hasattr(resp, 'response') and hasattr(resp.response, 'body'):
body = resp.response.body
# 处理 JSON 响应
if isinstance(body, dict):
json_data = body
elif isinstance(body, str):
try:
json_data = json.loads(body)
except:
continue
else:
continue
# 尝试从 JSON 中提取运单号等信息
extracted = extract_from_json(json_data)
if extracted:
result.update(extracted)
print("成功从 API 响应中提取数据")
return result
except Exception as e:
print(f"解析 API 响应时出错: {e}")
# 方法2: 从页面 HTML/DOM 中提取
print("\n方法2: 从页面 DOM 提取数据...")
html = page.html
result['raw_html'] = html[:5000] # 保存部分 HTML 用于调试
# 从 HTML 文本中提取运单号
waybill_patterns = [
r'运单号[:\s]*(\d+)',
r'waybill[_\s]*no["\']?\s*[:]\s*["\']?(\d+)',
r'tracking[_\s]*number["\']?\s*[:]\s*["\']?(\d+)',
r'"waybillNo"\s*[:]\s*["\']?(\d+)',
r'"trackingNumber"\s*[:]\s*["\']?(\d+)',
]
for pattern in waybill_patterns:
matches = re.findall(pattern, html, re.IGNORECASE)
if matches:
result['waybill_no'] = matches[0]
print(f"找到运单号: {result['waybill_no']}")
break
# 提取承运人
carrier_patterns = [
r'国内承运人[:\s]*([^\s<,]+)',
r'carrier[:\s]*([^\s<,]+)',
r'"carrier"\s*[:]\s*["\']?([^"\']+)',
]
for pattern in carrier_patterns:
matches = re.findall(pattern, html, re.IGNORECASE)
if matches:
result['carrier'] = matches[0].strip()
print(f"找到承运人: {result['carrier']}")
break
# 提取承运人电话
phone_patterns = [
r'国内承运人电话[:\s]*(\d+)',
r'carrier[_\s]*phone[:\s]*(\d+)',
r'"carrierPhone"\s*[:]\s*["\']?(\d+)',
]
for pattern in phone_patterns:
matches = re.findall(pattern, html, re.IGNORECASE)
if matches:
result['carrier_phone'] = matches[0]
print(f"找到承运人电话: {result['carrier_phone']}")
break
# 方法3: 从 DOM 元素中提取
print("\n方法3: 从 DOM 元素提取数据...")
# 尝试查找运单号元素
waybill_elements = page.eles('xpath=//*[contains(text(), "运单号") or contains(text(), "运单")]')
for elem in waybill_elements:
text = elem.text
parent_text = elem.parent().text if elem.parent() else ""
full_text = text + " " + parent_text
# 从文本中提取数字作为运单号
numbers = re.findall(r'\d{8,}', full_text)
if numbers and not result['waybill_no']:
result['waybill_no'] = numbers[0]
print(f"从元素文本中找到运单号: {result['waybill_no']}")
# 提取承运人
if '承运人' in text and not result['carrier']:
carrier_match = re.search(r'承运人[:\s]*([^\s<,]+)', full_text)
if carrier_match:
result['carrier'] = carrier_match.group(1).strip()
print(f"从元素文本中找到承运人: {result['carrier']}")
# 提取电话
if '电话' in text and not result['carrier_phone']:
phone_match = re.search(r'电话[:\s]*(\d+)', full_text)
if phone_match:
result['carrier_phone'] = phone_match.group(1)
print(f"从元素文本中找到电话: {result['carrier_phone']}")
# 提取物流跟踪信息(时间线)
print("\n提取物流跟踪信息...")
tracking_elements = page.eles('xpath=//*[contains(@class, "track") or contains(@class, "logistics") or contains(@class, "timeline")]')
if not tracking_elements:
# 尝试查找包含时间戳的元素
tracking_elements = page.eles('xpath=//*[contains(text(), "2025") or contains(text(), "货物") or contains(text(), "到达")]')
tracking_info = []
for elem in tracking_elements[:20]: # 限制数量
text = elem.text
if text and len(text) > 5:
# 尝试提取时间戳
time_match = re.search(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', text)
if time_match or any(keyword in text for keyword in ['货物', '到达', '揽收', '运输', '配送', '签收']):
tracking_info.append({
'text': text.strip(),
'time': time_match.group(1) if time_match else None
})
result['tracking_info'] = tracking_info[:10] # 最多保存10条
return result
except Exception as e:
print(f"提取物流信息时出错: {e}")
import traceback
traceback.print_exc()
return None
def extract_from_json(json_data):
"""
从 JSON 数据中提取物流信息
Args:
json_data: JSON 字典
Returns:
dict: 提取到的物流信息
"""
result = {}
def search_dict(d, key_patterns):
"""递归搜索字典中的值"""
if isinstance(d, dict):
for k, v in d.items():
# 检查键名
for pattern in key_patterns:
if re.search(pattern, k, re.IGNORECASE):
return v
# 递归搜索值
if isinstance(v, (dict, list)):
found = search_dict(v, key_patterns)
if found:
return found
elif isinstance(d, list):
for item in d:
found = search_dict(item, key_patterns)
if found:
return found
return None
# 搜索运单号
waybill = search_dict(json_data, [r'waybill', r'tracking.*number', r'运单号', r'waybillNo'])
if waybill:
result['waybill_no'] = str(waybill)
# 搜索承运人
carrier = search_dict(json_data, [r'carrier', r'承运人', r'carrierName'])
if carrier:
result['carrier'] = str(carrier)
# 搜索承运人电话
phone = search_dict(json_data, [r'carrier.*phone', r'承运人电话', r'carrierPhone', r'phone'])
if phone:
result['carrier_phone'] = str(phone)
# 搜索物流跟踪信息
tracking = search_dict(json_data, [r'track', r'logistics', r'物流', r'轨迹', r'history'])
if tracking:
if isinstance(tracking, list):
result['tracking_info'] = tracking
elif isinstance(tracking, dict):
result['tracking_info'] = [tracking]
return result if result else None
def print_result(result):
"""打印提取结果"""
if not result:
print("未能提取到物流信息")
return
print("\n" + "="*50)
print("物流信息提取结果:")
print("="*50)
print(f"运单号: {result.get('waybill_no', '未找到')}")
print(f"国内承运人: {result.get('carrier', '未找到')}")
print(f"国内承运人电话: {result.get('carrier_phone', '未找到')}")
if result.get('tracking_info'):
print(f"\n物流跟踪信息 (共 {len(result['tracking_info'])} 条):")
for idx, info in enumerate(result['tracking_info'], 1):
if isinstance(info, dict):
text = info.get('text', str(info))
time_str = info.get('time', '')
print(f" {idx}. {text}")
if time_str:
print(f" 时间: {time_str}")
else:
print(f" {idx}. {info}")
else:
print("\n物流跟踪信息: 未找到")
print("="*50)
# 主程序
if __name__ == '__main__':
# 测试 URL
tracking_url = "https://3.cn/2t-Iibig"
print("="*60)
print("京东物流信息提取工具 (Ubuntu 版本)")
print("="*60)
print(f"目标 URL: {tracking_url}")
print(f"无头模式: {'' if USE_HEADLESS else ''}")
print("开始提取物流信息...\n")
try:
result = extract_logistics_info(tracking_url)
except Exception as e:
print(f"\n❌ 执行过程中出错: {e}")
import traceback
traceback.print_exc()
result = None
if result:
print_result(result)
# 保存结果到文件
output_file = "logistics_result.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"\n结果已保存到: {output_file}")
else:
print("提取失败")
print("\n脚本执行完成")