Files
tb_pl/jd/jd.py
2025-08-13 16:02:21 +08:00

263 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import threading
import time
from urllib.parse import quote_plus

from DrissionPage import ChromiumPage, ChromiumOptions
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# =================== Configuration ===================
# Chrome executable used by DrissionPage; override with JD_CHROME_PATH.
CHROME_PATH = os.environ.get(
    'JD_CHROME_PATH',
    r'C:\Program Files\Google\Chrome\Application\chrome.exe',
)
# MySQL connection settings. Every value can be overridden through an
# environment variable so credentials need not be kept in source control.
# NOTE(review): the fallback password below has been committed to the
# repository; rotate it and drop the fallback once JD_DB_PASSWORD is set
# in every deployment.
db_config = {
    "host": os.environ.get("JD_DB_HOST", "192.168.8.88"),
    "port": int(os.environ.get("JD_DB_PORT", "3306")),
    "user": os.environ.get("JD_DB_USER", "root"),
    "password": os.environ.get("JD_DB_PASSWORD", "mysql_7sjTXH"),
    "database": os.environ.get("JD_DB_NAME", "jd"),
}
# Flask application.
app = Flask(__name__)
# Serializes /fetch_comments handling, since every request drives the same
# shared browser instance.
fetch_lock = threading.Lock()
# Database connection. User name and password are percent-encoded so that
# characters such as "@", ":" or "/" in the credentials cannot corrupt the URL.
db_url = (
    f"mysql+pymysql://{quote_plus(db_config['user'])}:{quote_plus(db_config['password'])}"
    f"@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
)
# pool_pre_ping tests each pooled connection before handing it out, so this
# long-running service recovers from connections the server closed while idle.
engine = create_engine(db_url, echo=False, pool_pre_ping=True)
Session = sessionmaker(bind=engine)
Base = declarative_base()
# =================== Data model ===================
def _now_str():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime('%Y-%m-%d %H:%M:%S')


class Comment(Base):
    """One scraped JD product comment, stored in the ``comments`` table."""

    __tablename__ = 'comments'

    id = Column(Integer, primary_key=True)
    product_id = Column(String(50), nullable=False)
    user_name = Column(String(100))
    comment_text = Column(Text)
    comment_id = Column(String(100))
    # JSON-encoded list of picture URLs.
    picture_urls = Column(Text)
    # The default is the callable itself (not its result) so SQLAlchemy
    # evaluates it for each INSERT instead of once at import time.
    created_at = Column(DateTime, default=_now_str)
    comment_date = Column(DateTime, default=_now_str)


# Create the table if it does not exist yet.
Base.metadata.create_all(engine)
# =================== Core scraper ===================
# Process-wide browser handle: created lazily on first use, then reused by
# every later call.
global_page = None


def get_global_browser():
    """Return the shared ChromiumPage, launching Chrome from CHROME_PATH on first call."""
    global global_page
    if global_page is not None:
        return global_page
    chrome_options = ChromiumOptions()
    chrome_options.set_browser_path(CHROME_PATH)
    global_page = ChromiumPage(chrome_options)
    return global_page
def _comment_exists_in_db(comment_id):
    """Return True if a row with ``comment_id`` is already in the comments table."""
    session = Session()
    try:
        return session.query(Comment).filter_by(comment_id=comment_id).first() is not None
    finally:
        # Release the session even when the query raises.
        session.close()


def _click_if_present(page, xpath):
    """Click the element matching ``xpath`` and pause 3-5 s; return whether it was found."""
    element = page.ele(xpath)
    if not element:
        return False
    element.click()
    time.sleep(random.uniform(3, 5))
    return True


def _open_comment_popup(page, product_id):
    """Load the product page and open its comment popup.

    Returns the popup element to scroll, or None when it cannot be located.
    """
    page.get(f'https://item.jd.com/{product_id}.html#crumb-wrap')
    time.sleep(random.uniform(5, 8))
    # Scroll the main page a little before looking for the review entry points.
    page.scroll.down(150)
    time.sleep(random.uniform(3, 5))
    # Fall back to the "好评率" entry when "买家赞不绝口" is not on the page.
    if not _click_if_present(page, 'xpath=//div[contains(text(), "买家赞不绝口")]'):
        _click_if_present(page, 'xpath=//div[contains(text(), "好评率")]')
    _click_if_present(page, 'xpath=//div[contains(text(), "当前商品")]')
    popup = page.ele('xpath=//*[@id="rateList"]/div/div[3]')
    if not popup:
        return None
    # Switch to the "视频" (video) tab when it exists.
    _click_if_present(page, 'xpath=//div[contains(text(), "视频")]')
    return popup


def _extract_comment_batch(resp):
    """Return the comment list carried by a captured packet, or None.

    ``resp`` is whatever ``page.listen.wait`` returned (falsy on timeout).
    Each failure mode prints its own diagnostic before returning None.
    """
    # postData may be None (or not a str), so normalise it before searching.
    if not resp or 'getCommentListPage' not in str(resp.request.postData or ''):
        print("未捕获到新的评论数据,继续滚动...")
        return None
    json_data = resp.response.body
    result = json_data.get('result') if isinstance(json_data, dict) else None
    floors = result.get('floors') if isinstance(result, dict) else None
    if not isinstance(floors, list):
        print("返回数据结构异常")
        return None
    # The comment list is read from the third floor; guard its existence
    # instead of letting an IndexError abort the whole scrape.
    comment_floor = floors[2] if len(floors) > 2 else None
    if not isinstance(comment_floor, dict) or not isinstance(comment_floor.get('data'), list):
        print("未找到有效的评论列表")
        return None
    return comment_floor['data']


def _select_fresh_comments(batch, seen_ids):
    """Return the five-star comments in ``batch`` not seen before; records them in ``seen_ids``."""
    fresh_comments = []
    for comment in batch:
        if not isinstance(comment, dict):
            continue
        comment_info = comment.get('commentInfo') or {}
        comment_id = comment_info.get('commentId', '')
        comment_score = comment_info.get('commentScore', '')
        if not comment_id:
            continue
        # Keep five-star comments only; str() accepts both "5" and 5.
        if str(comment_score) != '5':
            print(f"跳过非五星评论:{comment_id},评分为 {comment_score}")
            continue
        # Skip comments already collected in this run or already stored.
        if comment_id in seen_ids or _comment_exists_in_db(comment_id):
            print(f"评论已存在:{comment_id}")
            continue
        seen_ids.add(comment_id)
        fresh_comments.append(comment)
    return fresh_comments


def fetch_jd_comments(product_id, max_comments=10, max_retries=10):
    """Scrape up to ``max_comments`` new five-star comments for a JD product.

    Opens the product page in the shared browser, opens the comment popup,
    then repeatedly scrolls the popup while listening for
    ``getCommentListPage`` responses from ``https://api.m.jd.com/client.action``.
    Comments whose ``commentScore`` is not 5, or whose ``commentId`` was
    already collected in this call or exists in the database, are skipped.

    Args:
        product_id: JD product id inserted into the item page URL.
        max_comments: Stop once this many new comments have been collected.
        max_retries: Stop after this many consecutive scrolls that yield no
            new comments.

    Returns:
        A list of at most ``max_comments`` raw comment dicts. ``[]`` when the
        popup cannot be found. If an unexpected error occurs mid-scrape, it is
        printed and the comments collected so far are returned. Errors from
        launching the browser itself propagate to the caller.
    """
    page = get_global_browser()
    new_comments = []
    listening = False
    try:
        popup = _open_comment_popup(page, product_id)
        if popup is None:
            return []
        page.listen.start('https://api.m.jd.com/client.action')
        listening = True
        retry_count = 0
        seen_ids = set()
        while retry_count < max_retries and len(new_comments) < max_comments:
            scroll_amount = random.randint(10000, 100000)
            popup.scroll.down(scroll_amount)
            print(f"弹窗向下滚动了 {scroll_amount} 像素")
            time.sleep(random.uniform(3, 5))
            batch = _extract_comment_batch(page.listen.wait(timeout=5))
            if batch is None:
                retry_count += 1
                continue
            fresh_comments = _select_fresh_comments(batch, seen_ids)
            if fresh_comments:
                print(f"本次获取到 {len(fresh_comments)} 条新评论")
                new_comments.extend(fresh_comments)
                retry_count = 0  # new data arrived: reset the retry budget
            else:
                print("本次无新评论,继续滚动...")
                retry_count += 1
        print(f"共抓取到 {len(new_comments)} 条新评论最多需要{max_comments}条")
    except Exception as e:
        # Keep what was already collected rather than discarding it.
        print("发生错误:", e)
    finally:
        # The browser is shared across calls; do not leave the listener running.
        if listening:
            try:
                page.listen.stop()
            except Exception as e:
                print("停止监听失败:", e)
    return new_comments[:max_comments]
# =================== Save extracted comments to the database ===================
def save_comments_to_db(product_id, comments):
    """Insert raw JD comment dicts into the ``comments`` table in one transaction.

    Fields are read from each comment's ``commentInfo`` sub-dict. Comments
    without a ``commentId``, or whose id is already stored, are skipped.

    Args:
        product_id: Fallback product id for comments whose ``commentInfo``
            has no ``productId`` of its own.
        comments: Raw comment dicts as returned by ``fetch_jd_comments``.

    Returns:
        The number of rows committed; 0 when nothing was new or when the
        transaction was rolled back. Errors are printed, not raised.
    """
    session = Session()
    saved = 0
    try:
        for comment in comments:
            comment_info = comment.get('commentInfo') or {}
            comment_id = comment_info.get('commentId', '')
            if not comment_id:
                print("跳过无 comment_id 的评论")
                continue
            if session.query(Comment).filter_by(comment_id=comment_id).first():
                print(f"评论已存在:{comment_id}")
                continue
            picture_list = comment_info.get('pictureInfoList') or []
            picture_urls = [pic.get('largePicURL') for pic in picture_list if pic.get('largePicURL')]
            fields = {
                # Per-row value: never overwrite the ``product_id`` argument,
                # or later comments would inherit an earlier comment's id.
                'product_id': comment_info.get('productId') or product_id,
                'user_name': comment_info.get('userNickName', '匿名用户'),
                'comment_text': comment_info.get('commentData', '无评论内容'),
                'comment_id': comment_id,
                'picture_urls': json.dumps(picture_urls, ensure_ascii=False),
            }
            # An empty string is not a usable DATETIME value; omit the column
            # so its default applies instead of failing the whole batch.
            comment_date = comment_info.get('commentDate')
            if comment_date:
                fields['comment_date'] = comment_date
            session.add(Comment(**fields))
            saved += 1
        session.commit()
    except Exception as e:
        session.rollback()
        print("保存失败:", e)
        saved = 0
    finally:
        session.close()
    return saved
# =================== Flask API ===================
def _request_product_id():
    """Return ``product_id`` from the query string, form body or JSON body, or None."""
    product_id = request.args.get('product_id') or request.form.get('product_id')
    if not product_id:
        payload = request.get_json(silent=True)
        if isinstance(payload, dict):
            product_id = payload.get('product_id')
    return str(product_id).strip() if product_id else None


@app.route('/fetch_comments', methods=['POST'])
def fetch_comments():
    """Scrape new five-star comments for a product and store them.

    ``product_id`` is taken from the query string (``?product_id=...``) and,
    failing that, from a form field or JSON body key of the same name.

    Responses:
        200 with the number of comments handed to the database layer;
        400 when no product id was supplied;
        404 when scraping returned no comments;
        500 with the error text when an exception escaped.
    """
    product_id = _request_product_id()
    if not product_id:
        return jsonify({"error": "缺少 product_id"}), 400
    try:
        # Serialize scraping and saving: all requests share one browser.
        with fetch_lock:
            comments = fetch_jd_comments(product_id)
            if not comments:
                return jsonify({"message": "未获取到评论数据"}), 404
            save_comments_to_db(product_id, comments)
        return jsonify({
            "message": f"成功保存 {len(comments)} 条评论",
            "product_id": product_id,
        }), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# =================== Start the service ===================
if __name__ == '__main__':
    # The Werkzeug debugger can execute arbitrary code sent from a browser,
    # so it must not be on by default for a server bound to 0.0.0.0.
    # Opt in with JD_FLASK_DEBUG=1 for local development only.
    debug_mode = os.environ.get('JD_FLASK_DEBUG') == '1'
    try:
        app.run(host='0.0.0.0', port=5000, debug=debug_mode)
    finally:
        # Close the shared browser if any request created it.
        if global_page is not None:
            try:
                global_page.quit()
                print("浏览器已关闭")
            except Exception as e:
                print("关闭浏览器失败:", e)