"""Flask service that scrapes five-star JD.com product comments into MySQL.

A single shared Chromium instance (driven by DrissionPage) opens the product
page, opens the review popup, scrolls it and captures the comment-list API
responses. New five-star comments are stored through SQLAlchemy.
"""

import json
import random
import threading
import time
from datetime import datetime
from urllib.parse import quote_plus

from flask import Flask, request, jsonify
from DrissionPage import ChromiumPage, ChromiumOptions
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# =================== Configuration ===================

# Browser executable path (adjust to the local installation).
CHROME_PATH = r'C:\Program Files\Google\Chrome\Application\chrome.exe'

# MySQL settings.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
db_config = {
    "host": "192.168.8.88",
    "port": 3306,
    "user": "root",
    "password": "mysql_7sjTXH",  # change to your own password
    "database": "jd",
}

# URL whose traffic is captured while the review popup is scrolled.
COMMENT_API_URL = 'https://api.m.jd.com/client.action'
# Marker in the request body that identifies a comment-list request.
COMMENT_API_MARKER = 'getCommentListPage'
# Index of the entry in result.floors that is read for the comment list.
COMMENT_FLOOR_INDEX = 2
# Stop once this many new comments have been collected.
TARGET_COMMENT_COUNT = 10
# Give up after this many consecutive scrolls that yield nothing new.
MAX_IDLE_SCROLLS = 10

# Flask application.
app = Flask(__name__)

# Serialises scraping: there is only one shared browser page.
fetch_lock = threading.Lock()

# Database connection. User and password are URL-quoted so that special
# characters (e.g. '@', ':', '/') cannot corrupt the connection URL.
db_url = (
    f"mysql+pymysql://{quote_plus(str(db_config['user']))}:"
    f"{quote_plus(str(db_config['password']))}"
    f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    f"?charset=utf8mb4"
)
engine = create_engine(db_url, echo=False)
Session = sessionmaker(bind=engine)
Base = declarative_base()


class Comment(Base):
    """One stored product comment (table ``comments``)."""

    __tablename__ = 'comments'

    id = Column(Integer, primary_key=True)
    product_id = Column(String(50), nullable=False)
    user_name = Column(String(100))
    comment_text = Column(Text)
    comment_id = Column(String(100))
    picture_urls = Column(Text)  # JSON-encoded list of picture URLs
    # The defaults are callables so they are evaluated per INSERT; a
    # pre-formatted string would freeze the module import time into every row.
    created_at = Column(DateTime, default=datetime.now)
    comment_date = Column(DateTime, default=datetime.now)


# Create the table if it does not exist yet.
Base.metadata.create_all(engine)

# =================== Core scraping functions ===================

# Shared browser page, created lazily on first use.
global_page = None


def get_global_browser():
    """Return the shared ``ChromiumPage``, creating it on first call."""
    global global_page
    if global_page is None:
        options = ChromiumOptions()
        options.set_browser_path(CHROME_PATH)
        global_page = ChromiumPage(options)
    return global_page


def _pause(low, high):
    """Sleep for a random duration between ``low`` and ``high`` seconds."""
    time.sleep(random.uniform(low, high))


def _click_if_present(page, locator):
    """Click the element matching ``locator``; return whether it was found."""
    element = page.ele(locator)
    if not element:
        return False
    element.click()
    _pause(3, 5)
    return True


def _comment_exists(comment_id):
    """Return True when a row with ``comment_id`` is already in the database."""
    session = Session()
    try:
        found = session.query(Comment).filter_by(comment_id=comment_id).first()
        return found is not None
    finally:
        # Always release the connection, even if the query raised.
        session.close()


def _extract_comment_batch(body):
    """Extract the raw comment list from a captured response body.

    Returns a ``(batch, problem)`` pair: ``batch`` is the list of comment
    dicts, or ``None`` when the body does not have the expected layout, in
    which case ``problem`` is the message to log.
    """
    if not isinstance(body, dict):
        return None, "返回数据结构异常"
    result = body.get('result')
    if not isinstance(result, dict) or 'floors' not in result:
        return None, "返回数据结构异常"

    floors = result['floors']
    # Guard the fixed index: a shorter floors list must count as an empty
    # attempt rather than abort the whole scrape with an IndexError.
    if not isinstance(floors, (list, tuple)) or len(floors) <= COMMENT_FLOOR_INDEX:
        return None, "未找到有效的评论列表"
    floor = floors[COMMENT_FLOOR_INDEX]
    if not isinstance(floor, dict) or not isinstance(floor.get('data'), list):
        return None, "未找到有效的评论列表"
    return floor['data'], None


def _select_fresh_comments(batch, seen_ids):
    """Return the five-star comments in ``batch`` that are not yet known.

    A comment is known when its id is in ``seen_ids`` (collected during this
    run) or already stored in the database. Accepted ids are added to
    ``seen_ids``.
    """
    fresh = []
    for comment in batch:
        if not isinstance(comment, dict):
            continue
        comment_info = comment.get('commentInfo') or {}
        comment_id = comment_info.get('commentId', '')
        comment_score = comment_info.get('commentScore', '')
        if not comment_id:
            continue

        # Keep five-star reviews only. str() tolerates the score arriving as
        # either "5" or 5.
        if str(comment_score) != '5':
            print(f"跳过非五星评论:{comment_id},评分为 {comment_score}")
            continue

        # Skip anything already collected in this run or already stored.
        if comment_id in seen_ids or _comment_exists(comment_id):
            print(f"评论已存在:{comment_id}")
            continue

        seen_ids.add(comment_id)
        fresh.append(comment)
    return fresh


def fetch_jd_comments(product_id):
    """Scrape up to ``TARGET_COMMENT_COUNT`` new five-star comments.

    Opens the product page in the shared browser, opens the review popup,
    then scrolls it while listening for comment-list API responses.

    Args:
        product_id: JD product id used to build the item page URL.

    Returns:
        A list of raw comment dicts (at most ``TARGET_COMMENT_COUNT``). An
        empty list is returned when the popup cannot be located or when any
        error occurs; this function does not raise.
    """
    page = get_global_browser()
    listening = False
    try:
        # Open the product page.
        page.get(f'https://item.jd.com/{product_id}.html#crumb-wrap')
        _pause(5, 8)

        # Scroll the main page down a little.
        page.scroll.down(150)
        _pause(3, 5)

        # Open the reviews via the "买家赞不绝口" entry, falling back to the
        # "好评率" entry when the first one is not present.
        if not _click_if_present(page, 'xpath=//div[contains(text(), "买家赞不绝口")]'):
            _click_if_present(page, 'xpath=//div[contains(text(), "好评率")]')

        # Switch to "当前商品" (current product) when that tab exists.
        _click_if_present(page, 'xpath=//div[contains(text(), "当前商品")]')

        # Locate the scrollable popup area.
        popup = page.ele('xpath=//*[@id="rateList"]/div/div[3]')
        if not popup:
            return []

        # Click the "视频" (video) filter when it exists.
        _click_if_present(page, 'xpath=//div[contains(text(), "视频")]')

        # Start capturing the comment API traffic.
        page.listen.start(COMMENT_API_URL)
        listening = True

        idle_scrolls = 0   # consecutive scrolls without new comments
        new_comments = []  # collected new comments
        seen_ids = set()   # comment ids handled during this run

        while idle_scrolls < MAX_IDLE_SCROLLS and len(new_comments) < TARGET_COMMENT_COUNT:
            scroll_amount = random.randint(10000, 100000)
            popup.scroll.down(scroll_amount)
            print(f"弹窗向下滚动了 {scroll_amount} 像素")
            _pause(3, 5)

            resp = page.listen.wait(timeout=5)
            # postData can be missing; fall back to '' so the membership
            # test cannot raise on None.
            post_data = (resp.request.postData or '') if resp else ''
            if not resp or COMMENT_API_MARKER not in post_data:
                print("未捕获到新的评论数据,继续滚动...")
                idle_scrolls += 1
                continue

            batch, problem = _extract_comment_batch(resp.response.body)
            if batch is None:
                print(problem)
                idle_scrolls += 1
                continue

            fresh_comments = _select_fresh_comments(batch, seen_ids)
            if fresh_comments:
                print(f"本次获取到 {len(fresh_comments)} 条新评论")
                new_comments.extend(fresh_comments)
                idle_scrolls = 0  # progress was made, reset the idle counter
            else:
                print("本次无新评论,继续滚动...")
                idle_scrolls += 1

        print(f"共抓取到 {len(new_comments)} 条新评论(最多需要10条)")
        return new_comments[:TARGET_COMMENT_COUNT]

    except Exception as e:
        # Top-level boundary: callers rely on a list being returned.
        print("发生错误:", e)
        return []
    finally:
        # Stop the listener only if it was started, so the shared page does
        # not keep buffering packets between requests.
        if listening:
            try:
                page.listen.stop()
            except Exception as e:
                print("发生错误:", e)


# =================== Persisting comments ===================

def save_comments_to_db(product_id, comments):
    """Insert the given raw comments into the ``comments`` table.

    Comments without an id, or whose id is already stored, are skipped. All
    inserts share one transaction; on any error it is rolled back and the
    error is printed (it is not re-raised).

    Args:
        product_id: Fallback product id for comments lacking ``productId``.
        comments: Iterable of raw comment dicts, each with a ``commentInfo``
            mapping.

    Returns:
        The number of rows inserted (0 when the transaction failed).
    """
    session = Session()
    saved = 0
    try:
        for comment in comments:
            comment_info = comment.get('commentInfo') or {}
            comment_id = comment_info.get('commentId', '')

            # Skip comments that have no id.
            if not comment_id:
                print("跳过无 comment_id 的评论")
                continue

            # Skip comments that are already stored.
            if session.query(Comment).filter_by(comment_id=comment_id).first():
                print(f"评论已存在:{comment_id}")
                continue

            # Use a separate local for the product id: overwriting the
            # parameter would leak one comment's productId into the fallback
            # used for the following comments.
            item_product_id = comment_info.get('productId', product_id)
            picture_list = comment_info.get('pictureInfoList') or []
            picture_urls = [
                pic.get('largePicURL')
                for pic in picture_list
                if pic.get('largePicURL')
            ]

            new_comment = Comment(
                product_id=item_product_id,
                user_name=comment_info.get('userNickName', '匿名用户'),
                comment_text=comment_info.get('commentData', '无评论内容'),
                comment_id=comment_id,
                picture_urls=json.dumps(picture_urls, ensure_ascii=False),
            )
            # Set the date only when one was supplied; an empty string is not
            # a valid DateTime value, so the column default applies instead.
            comment_date = comment_info.get('commentDate', '')
            if comment_date:
                new_comment.comment_date = comment_date

            session.add(new_comment)
            saved += 1

        session.commit()
        return saved
    except Exception as e:
        session.rollback()
        print("保存失败:", e)
        return 0
    finally:
        session.close()


# =================== Flask API ===================

@app.route('/fetch_comments', methods=['POST'])
def fetch_comments():
    """Scrape and store comments for the product given by ``product_id``.

    ``product_id`` is read from the query string, falling back to form data
    or a JSON body. Responds with 400 when it is missing, 404 when no
    comments were obtained, 500 on an unexpected error and 200 on success.
    """
    product_id = request.values.get('product_id')
    if not product_id:
        payload = request.get_json(silent=True)
        if isinstance(payload, dict):
            product_id = payload.get('product_id')
    if not product_id:
        return jsonify({"error": "缺少 product_id"}), 400

    try:
        with fetch_lock:  # one scrape at a time on the shared browser
            comments = fetch_jd_comments(product_id)

        if not comments:
            return jsonify({"message": "未获取到评论数据"}), 404

        saved = save_comments_to_db(product_id, comments)

        return jsonify({
            "message": f"成功保存 {saved} 条评论",
            "product_id": product_id
        }), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500


# =================== Entry point ===================

if __name__ == '__main__':
    try:
        # NOTE(review): debug=True together with host='0.0.0.0' exposes the
        # Werkzeug debugger on the network; disable debug outside local
        # development.
        app.run(host='0.0.0.0', port=5000, debug=True)
    finally:
        if global_page is not None:
            global_page.quit()
            print("浏览器已关闭")