# Files
# jd_wl_python/.py
# 2026-04-26 13:55:54 +08:00

# 480 lines
# 17 KiB
# Python
# Raw Permalink Blame History

# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import json
import re
import os
import platform
from DrissionPage import ChromiumPage, ChromiumOptions
# Install locations where Chrome/Chromium is commonly found on Ubuntu,
# listed in the order they are probed.
UBUNTU_CHROME_PATHS = [
    "/usr/bin/google-chrome",
    "/usr/bin/google-chrome-stable",
    "/usr/bin/chromium-browser",
    "/usr/bin/chromium",
    "/snap/bin/chromium",
    "/opt/google/chrome/chrome",
]

# Whether to run the browser headless.
#   True:  no UI, suitable for server environments.
#   False: visible window, requires X11 or Wayland.
USE_HEADLESS = True  # adjust as needed

# Shared browser page instance; stays None until the browser is first launched.
global_page = None
def find_chrome_path():
    """Locate a Chrome/Chromium executable on an Ubuntu system.

    The install locations in ``UBUNTU_CHROME_PATHS`` are probed first. If
    none of them exists, the usual executable names are looked up on
    ``PATH``.

    Returns:
        str: Path of the first browser found. When nothing is found,
        ``'/usr/bin/google-chrome'`` is returned as a last-resort default
        (it may not exist) and installation hints are printed.
    """
    # Function-scope import, as the original did for its lookup helper.
    import shutil

    print("正在查找 Chrome/Chromium 浏览器...")

    # 1. Well-known install locations.
    for path in UBUNTU_CHROME_PATHS:
        if os.path.exists(path):
            print(f"✅ 找到浏览器: {path}")
            return path

    # 2. PATH lookup. shutil.which does what the external `which` command
    #    does without spawning a process, so no blanket exception handler
    #    is needed around it.
    for name in ('google-chrome', 'google-chrome-stable',
                 'chromium-browser', 'chromium'):
        path = shutil.which(name)
        if path:
            print(f"✅ 通过 which 找到浏览器: {path}")
            return path

    # 3. Nothing found: fall back to the most common location and explain
    #    how to install a browser.
    default_path = '/usr/bin/google-chrome'
    print(f"⚠️ 未找到浏览器,将使用默认路径: {default_path}")
    print("请确保已安装 Google Chrome 或 Chromium:")
    print(" sudo apt update")
    print(" sudo apt install -y google-chrome-stable")
    print(" 或者")
    print(" sudo apt install -y chromium-browser")
    return default_path
def get_global_browser():
    """Return the shared browser page, launching the browser on first use.

    On the first call a ``ChromiumOptions`` object is configured (browser
    path from ``find_chrome_path()``, headless or windowed according to
    ``USE_HEADLESS``, plus Linux-specific flags) and a ``ChromiumPage`` is
    created and stored in the module-level ``global_page``. Later calls
    return that same instance.

    Returns:
        The page object stored in ``global_page``.

    Raises:
        Exception: Any error raised while creating ``ChromiumPage`` is
        re-raised after troubleshooting hints and a traceback are printed.
    """
    global global_page

    # Reuse the existing instance if the browser is already running.
    if global_page is not None:
        print("使用已存在的浏览器实例")
        return global_page

    print("="*60)
    print("Ubuntu 浏览器初始化")
    print("="*60)

    # Warn (but continue) when not running on Linux.
    if platform.system() != 'Linux':
        print(f"⚠️ 警告: 当前系统是 {platform.system()},此脚本专为 Ubuntu 设计")

    chrome_path = find_chrome_path()
    options = ChromiumOptions()
    options.set_browser_path(chrome_path)

    if USE_HEADLESS:
        print("配置为无头模式headless...")
        try:
            options.headless(True)
        except Exception:
            # headless() is missing or failed: fall back to raw
            # command-line flags. Still best-effort, but no longer silent
            # and no longer swallowing KeyboardInterrupt/SystemExit.
            try:
                options.set_argument('--headless=new')
                options.set_argument('--no-sandbox')
                options.set_argument('--disable-dev-shm-usage')
            except Exception as e:
                print(f"⚠️ 设置无头模式参数失败: {e}")
    else:
        print("配置为有界面模式...")
        # A windowed browser needs a display server.
        display = os.environ.get('DISPLAY')
        if not display:
            print("⚠️ 警告: 未检测到 DISPLAY 环境变量")
            print("如果无法显示浏览器,请:")
            print(" 1. 设置 USE_HEADLESS = True")
            print(" 2. 或者设置 DISPLAY 环境变量(如 DISPLAY=:0")
            print(" 3. 或者使用 Xvfb虚拟显示")

    # Linux-specific flags (best-effort).
    try:
        options.set_argument('--no-sandbox')  # needed in some environments
        options.set_argument('--disable-dev-shm-usage')  # avoid running out of /dev/shm
        options.set_argument('--disable-gpu')  # optional; useful in headless mode
    except Exception as e:
        print(f"⚠️ 设置 Linux 启动参数失败: {e}")

    print("正在启动浏览器...")
    print(f"浏览器路径: {chrome_path}")
    if USE_HEADLESS:
        print("模式: 无头模式(后台运行)")
    else:
        print("模式: 有界面模式")

    try:
        global_page = ChromiumPage(options)
        print("✅ 浏览器已成功启动!")
        time.sleep(2)  # give the browser time to finish starting
    except Exception as e:
        print(f"❌ 浏览器启动失败: {e}")
        print("\n可能的解决方案:")
        print("1. 确保已安装 Chrome/Chromium:")
        print(" sudo apt update")
        print(" sudo apt install -y google-chrome-stable")
        print("2. 如果使用无头模式失败,尝试设置 USE_HEADLESS = False")
        print("3. 确保有足够的权限")
        print("4. 检查是否缺少依赖:")
        print(" sudo apt install -y libnss3 libatk-bridge2.0-0 libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 libxrandr2 libgbm1 libasound2")
        import traceback
        traceback.print_exc()
        raise

    return global_page
def extract_logistics_info(tracking_url):
    """Extract waybill number, carrier and tracking details from a JD logistics page.

    Three strategies are tried in order:

    1. Listen for network requests triggered by scrolling and parse any
       logistics-looking JSON response; if one yields data, return at once.
    2. Run regular expressions over the page HTML.
    3. Inspect DOM elements whose text mentions the waybill, then collect
       timeline-like elements as tracking entries.

    A failure in strategy 1 no longer aborts the extraction; strategies 2
    and 3 still run.

    Args:
        tracking_url: URL of the tracking page, e.g. https://3.cn/2t-Iibig

    Returns:
        dict | None: Dict with keys ``waybill_no``, ``carrier``,
        ``carrier_phone``, ``tracking_info`` and ``raw_html``; ``None`` if
        an unexpected error occurred after the browser was obtained.
    """
    page = get_global_browser()
    try:
        print(f"\n正在打开物流追踪页面: {tracking_url}")
        page.get(tracking_url)
        print("页面加载中,请稍候...")
        time.sleep(5)  # give the page time to load

        _report_page_status(page)

        result = {
            "waybill_no": None,      # waybill number
            "carrier": None,         # domestic carrier
            "carrier_phone": None,   # domestic carrier phone
            "tracking_info": [],     # list of tracking entries
            "raw_html": None,        # leading part of the HTML, for debugging
        }

        # Strategy 1: network responses.
        print("\n方法1: 监听网络请求...")
        try:
            api_data = _extract_from_network(page)
        except Exception as e:
            # Keep going: the DOM-based strategies do not depend on the listener.
            print(f"监听网络请求时出错: {e}")
            api_data = None
        if api_data:
            result.update(api_data)
            print("成功从 API 响应中提取数据")
            return result

        # Strategy 2: regular expressions over the HTML source.
        print("\n方法2: 从页面 DOM 提取数据...")
        html = page.html
        result['raw_html'] = html[:5000]  # keep only a prefix for debugging
        _extract_fields_from_html(html, result)

        # Strategy 3: DOM elements; only fills fields still missing.
        print("\n方法3: 从 DOM 元素提取数据...")
        _extract_fields_from_elements(page, result)

        # Tracking timeline.
        print("\n提取物流跟踪信息...")
        result['tracking_info'] = _extract_tracking_timeline(page)
        return result
    except Exception as e:
        print(f"提取物流信息时出错: {e}")
        import traceback
        traceback.print_exc()
        return None


def _report_page_status(page):
    """Print the current URL, title and HTML size of *page* as load diagnostics."""
    print(f"当前页面 URL: {page.url}")
    try:
        print(f"页面标题: {page.title}")
    except Exception:
        print("无法获取页面标题")
    try:
        html_length = len(page.html)
        print(f"页面 HTML 长度: {html_length} 字符")
        if html_length < 100:
            print("⚠️ 警告: 页面内容可能未完全加载")
    except Exception as e:
        print(f"⚠️ 无法获取页面 HTML: {e}")


def _collect_packets(listener, max_packets=200, timeout=1):
    """Return the packets captured so far by *listener* as a list.

    NOTE(review): the original code called ``listener.get()``. That call is
    kept when the attribute exists; otherwise ``listener.wait(...)`` with
    ``fit_count=False`` is used. Confirm against the installed DrissionPage
    version.
    """
    fetch = getattr(listener, 'get', None)
    if callable(fetch):
        return list(fetch())
    captured = listener.wait(count=max_packets, timeout=timeout, fit_count=False)
    if not captured:
        return []
    if isinstance(captured, (list, tuple)):
        return list(captured)
    return [captured]


def _extract_from_network(page):
    """Scroll the page while listening for requests; return data parsed from the first matching JSON response, or None."""
    url_keywords = (
        'track', 'logistics', 'waybill', 'express',
        'delivery', '3.cn', 'jd.com/logistics',
        'api.m.jd.com', 'mapi.jd.com',
    )

    listener = page.listen
    # NOTE(review): listening starts after the initial page load, so only
    # requests triggered by the scrolling below are observed.
    listener.start()
    try:
        # Scroll to trigger lazy-loaded requests.
        page.scroll.down(300)
        time.sleep(2)
        page.scroll.to_bottom()
        time.sleep(3)
        packets = _collect_packets(listener)
    finally:
        # Stop listening so packets do not accumulate on the shared page.
        try:
            listener.stop()
        except Exception as e:
            print(f"停止网络监听时出错: {e}")

    print(f"监听到 {len(packets)} 个请求")

    for packet in packets:
        url = getattr(packet, 'url', '') or ''
        url_lower = url.lower()
        if not any(keyword in url_lower for keyword in url_keywords):
            continue
        print(f"发现可能的物流 API: {url[:100]}")
        try:
            response = getattr(packet, 'response', None)
            if response is None or not hasattr(response, 'body'):
                continue
            body = response.body
            if isinstance(body, (dict, list)):
                json_data = body
            elif isinstance(body, str):
                try:
                    json_data = json.loads(body)
                except ValueError:
                    continue  # not JSON
            else:
                continue
            extracted = extract_from_json(json_data)
            if extracted:
                return extracted
        except Exception as e:
            print(f"解析 API 响应时出错: {e}")
    return None


def _extract_fields_from_html(html, result):
    """Fill waybill number, carrier and carrier phone in *result* via regexes over *html*."""
    # Chinese labels accept both ASCII and full-width colons. The carrier
    # patterns use a negative lookahead so they do not match the
    # "carrier phone" label.
    field_specs = (
        ('waybill_no', '找到运单号', (
            r'运单号[:：\s]*(\d+)',
            r'waybill[_\s]*no["\']?\s*[:]\s*["\']?(\d+)',
            r'tracking[_\s]*number["\']?\s*[:]\s*["\']?(\d+)',
            r'"waybillNo"\s*[:]\s*["\']?(\d+)',
            r'"trackingNumber"\s*[:]\s*["\']?(\d+)',
        )),
        ('carrier', '找到承运人', (
            r'国内承运人(?!电话)[:：\s]*([^\s<,，]+)',
            r'carrier(?![_\s]*phone)[:\s]*([^\s<,]+)',
            r'"carrier"\s*[:]\s*["\']?([^"\']+)',
        )),
        ('carrier_phone', '找到承运人电话', (
            r'国内承运人电话[:：\s]*(\d+)',
            r'carrier[_\s]*phone[:\s]*(\d+)',
            r'"carrierPhone"\s*[:]\s*["\']?(\d+)',
        )),
    )
    for field, label, patterns in field_specs:
        for pattern in patterns:
            match = re.search(pattern, html, re.IGNORECASE)
            if match:
                result[field] = match.group(1).strip()
                print(f"{label}: {result[field]}")
                break


def _extract_fields_from_elements(page, result):
    """Fill fields still missing in *result* from elements whose text mentions the waybill."""
    elements = page.eles('xpath=//*[contains(text(), "运单号") or contains(text(), "运单")]')
    for elem in elements:
        text = elem.text
        parent = elem.parent()
        parent_text = parent.text if parent else ""
        full_text = text + " " + parent_text

        # A run of 8+ digits near the label is taken as the waybill number.
        numbers = re.findall(r'\d{8,}', full_text)
        if numbers and not result['waybill_no']:
            result['waybill_no'] = numbers[0]
            print(f"从元素文本中找到运单号: {result['waybill_no']}")

        if '承运人' in text and not result['carrier']:
            carrier_match = re.search(r'承运人(?!电话)[:：\s]*([^\s<,，]+)', full_text)
            if carrier_match:
                result['carrier'] = carrier_match.group(1).strip()
                print(f"从元素文本中找到承运人: {result['carrier']}")

        if '电话' in text and not result['carrier_phone']:
            phone_match = re.search(r'电话[:：\s]*(\d+)', full_text)
            if phone_match:
                result['carrier_phone'] = phone_match.group(1)
                print(f"从元素文本中找到电话: {result['carrier_phone']}")


def _extract_tracking_timeline(page):
    """Return up to 10 tracking entries (``{'text', 'time'}`` dicts) found on *page*."""
    elements = page.eles('xpath=//*[contains(@class, "track") or contains(@class, "logistics") or contains(@class, "timeline")]')
    if not elements:
        # Fall back to elements that look like timestamped entries. The
        # year is taken from the clock (current and previous) instead of a
        # hard-coded "2025".
        year = int(time.strftime('%Y'))
        elements = page.eles(
            f'xpath=//*[contains(text(), "{year}") or contains(text(), "{year - 1}") '
            f'or contains(text(), "货物") or contains(text(), "到达")]'
        )

    keywords = ('货物', '到达', '揽收', '运输', '配送', '签收')
    tracking_info = []
    for elem in elements[:20]:  # cap the number of elements inspected
        text = elem.text
        if not text or len(text) <= 5:
            continue
        time_match = re.search(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', text)
        if time_match or any(keyword in text for keyword in keywords):
            tracking_info.append({
                'text': text.strip(),
                'time': time_match.group(1) if time_match else None,
            })
    return tracking_info[:10]  # keep at most 10 entries
def extract_from_json(json_data):
    """Extract logistics fields from a decoded JSON structure.

    The structure is searched depth-first, in document order, for keys
    whose names match the field's patterns (case-insensitive). A key only
    counts as a hit if its value has a usable type: a non-empty
    string/number for waybill number, carrier and phone, and a non-empty
    list/dict for tracking information. Matching keys with unusable values
    are skipped and the search continues, including inside that value.

    Carrier patterns exclude phone keys (``carrierPhone``, ``承运人电话``),
    and carrier-specific phone keys are preferred over generic ``*phone*``
    keys.

    Args:
        json_data: Decoded JSON (dict or list); other types yield ``None``.

    Returns:
        dict | None: Any of ``waybill_no``, ``carrier``, ``carrier_phone``
        (strings) and ``tracking_info`` (list); ``None`` if nothing found.
    """
    def compile_all(*patterns):
        return [re.compile(p, re.IGNORECASE) for p in patterns]

    def is_scalar(value):
        # bool is an int subclass but is never a meaningful field value here.
        return (isinstance(value, (str, int, float))
                and not isinstance(value, bool)
                and bool(value))

    def is_container(value):
        return isinstance(value, (list, dict)) and bool(value)

    def search(node, patterns, accept):
        """Return the first acceptable value under a matching key, else None."""
        if isinstance(node, dict):
            for key, value in node.items():
                if accept(value) and any(p.search(str(key)) for p in patterns):
                    return value
                if isinstance(value, (dict, list)):
                    found = search(value, patterns, accept)
                    if found is not None:
                        return found
        elif isinstance(node, list):
            for item in node:
                found = search(item, patterns, accept)
                if found is not None:
                    return found
        return None

    result = {}

    # Waybill number.
    waybill = search(
        json_data,
        compile_all(r'waybill', r'tracking.*number', r'运单号'),
        is_scalar,
    )
    if waybill is not None:
        result['waybill_no'] = str(waybill)

    # Carrier: the lookaheads keep "carrierPhone" / "承运人电话" out.
    carrier = search(
        json_data,
        compile_all(r'carrier(?!.*phone)', r'承运人(?!.*电话)'),
        is_scalar,
    )
    if carrier is not None:
        result['carrier'] = str(carrier)

    # Carrier phone: carrier-specific keys first, then any "*phone*" key.
    phone = search(
        json_data,
        compile_all(r'carrier.*phone', r'承运人.*电话'),
        is_scalar,
    )
    if phone is None:
        phone = search(json_data, compile_all(r'phone'), is_scalar)
    if phone is not None:
        result['carrier_phone'] = str(phone)

    # Tracking information: only containers qualify, so a scalar such as
    # "trackingNumber" cannot shadow the real list.
    tracking = search(
        json_data,
        compile_all(r'track', r'logistics', r'物流', r'轨迹', r'history'),
        is_container,
    )
    if tracking is not None:
        result['tracking_info'] = tracking if isinstance(tracking, list) else [tracking]

    return result or None
def print_result(result):
    """Print an extraction result to stdout in a readable layout.

    Args:
        result: Dict as returned by ``extract_logistics_info`` (keys
            ``waybill_no``, ``carrier``, ``carrier_phone``,
            ``tracking_info``), or a falsy value when extraction failed.

    Fields that are missing or ``None`` are shown as "未找到". The previous
    ``dict.get(key, default)`` form printed "None" because the keys exist
    with a ``None`` value.
    """
    if not result:
        print("未能提取到物流信息")
        return

    not_found = '未找到'

    print("\n" + "="*50)
    print("物流信息提取结果:")
    print("="*50)
    # `or` covers both a missing key and a key present with a None/empty value.
    print(f"运单号: {result.get('waybill_no') or not_found}")
    print(f"国内承运人: {result.get('carrier') or not_found}")
    print(f"国内承运人电话: {result.get('carrier_phone') or not_found}")

    tracking = result.get('tracking_info')
    if tracking:
        print(f"\n物流跟踪信息 (共 {len(tracking)} 条):")
        for idx, info in enumerate(tracking, 1):
            if isinstance(info, dict):
                # Entries from the DOM carry 'text'/'time'; other dicts are
                # shown via their string form.
                text = info.get('text', str(info))
                time_str = info.get('time', '')
                print(f" {idx}. {text}")
                if time_str:
                    print(f" 时间: {time_str}")
            else:
                print(f" {idx}. {info}")
    else:
        print("\n物流跟踪信息: 未找到")
    print("="*50)
# Script entry point: extract logistics info for a test URL, print it and
# save it as JSON.
if __name__ == '__main__':
    # Test URL
    tracking_url = "https://3.cn/2t-Iibig"

    print("="*60)
    print("京东物流信息提取工具 (Ubuntu 版本)")
    print("="*60)
    print(f"目标 URL: {tracking_url}")
    # Both branches of this expression were empty strings, so the banner
    # showed nothing; print an explicit yes/no instead.
    print(f"无头模式: {'是' if USE_HEADLESS else '否'}")
    print("开始提取物流信息...\n")

    try:
        try:
            result = extract_logistics_info(tracking_url)
        except Exception as e:
            print(f"\n❌ 执行过程中出错: {e}")
            import traceback
            traceback.print_exc()
            result = None

        if result:
            print_result(result)
            # Save the result to a file.
            output_file = "logistics_result.json"
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            print(f"\n结果已保存到: {output_file}")
        else:
            print("提取失败")
    finally:
        # Close the browser so a headless Chrome process is not left behind.
        if global_page is not None:
            try:
                global_page.quit()
            except Exception as e:
                print(f"⚠️ 关闭浏览器时出错: {e}")

    print("\n脚本执行完成")