import json
import random
import threading
import time
from datetime import datetime

from flask import Flask, request, jsonify
from DrissionPage import ChromiumPage, ChromiumOptions
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker

# =================== Configuration ===================
# Browser path (adjust to your local installation)
CHROME_PATH = r'C:\Program Files\Google\Chrome\Application\chrome.exe'

# MySQL configuration
db_config = {
    "host": "192.168.8.88",
    "port": 3306,
    "user": "root",
    "password": "mysql_7sjTXH",  # change to your password
    "database": "jd"
}

# Initialize the Flask app
app = Flask(__name__)

# Lock guarding the crawler control flags
fetch_lock = threading.Lock()

# Global crawler control flags
crawler_running = False
crawler_thread = None
current_product_id = None

# product_id of the fetch task that is currently allowed to run. A new request
# overwrites it; an old thread that sees a mismatch exits on its own.
active_fetch_product_id = None

# Initialize the database connection
db_url = (
    f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
    f"@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4"
)
engine = create_engine(db_url, echo=False)
Session = sessionmaker(bind=engine)
Base = declarative_base()


# Comment model
class Comment(Base):
    __tablename__ = 'comments'
    id = Column(Integer, primary_key=True)
    product_id = Column(String(50), nullable=False)
    user_name = Column(String(100))
    comment_text = Column(Text)
    comment_id = Column(String(100))
    picture_urls = Column(Text)  # stored as a JSON string
    # Pass a callable so the timestamp is computed per row, not once at import time
    created_at = Column(DateTime, default=datetime.now)
    comment_date = Column(DateTime, default=datetime.now)


# Create the table if it does not exist
Base.metadata.create_all(engine)
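
# Illustrative sketch only (not part of the original service): a helper showing
# how rows written through the Comment model above can be read back. The name
# `get_saved_comments` and its parameters are assumptions for this example.
def get_saved_comments(product_id, limit=20):
    """Return up to `limit` of the most recently inserted comments for a product."""
    session = Session()
    try:
        rows = (session.query(Comment)
                .filter_by(product_id=product_id)
                .order_by(Comment.id.desc())
                .limit(limit)
                .all())
        return [{
            "user": row.user_name,
            "text": row.comment_text,
            "pictures": json.loads(row.picture_urls or "[]"),
        } for row in rows]
    finally:
        session.close()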
# =================== Core crawler ===================

# Global browser instance (initialized only once)
global_page = None


def get_global_browser():
    global global_page
    if global_page is None:
        options = ChromiumOptions()
        options.set_browser_path(CHROME_PATH)
        global_page = ChromiumPage(options)
    return global_page


def _is_fetch_cancelled(product_id):
    """True when this task is no longer the active one.

    A new request overwrites active_fetch_product_id; stop_crawler resets it
    to None. Either way the mismatch tells the old thread to exit.
    """
    return active_fetch_product_id != product_id


def fetch_jd_comments(product_id):
    page = get_global_browser()  # use the shared browser
    try:
        # Open the product page
        page.get(f'https://item.jd.com/{product_id}.html#crumb-wrap')
        time.sleep(random.uniform(5, 8))
        if _is_fetch_cancelled(product_id):
            return 0

        # Scroll the main page down
        page.scroll.down(150)
        time.sleep(random.uniform(3, 5))
        if _is_fetch_cancelled(product_id):
            return 0

        # Click "买家赞不绝口" ("buyers rave about it"); fall back to "好评率" (positive-rating rate).
        # The Chinese strings must stay as-is: they match text on the JD page.
        element1 = page.ele('xpath=//div[contains(text(), "买家赞不绝口")]')
        if element1:
            element1.click()
            time.sleep(random.uniform(3, 5))
        else:
            element1 = page.ele('xpath=//div[contains(text(), "好评率")]')
            if element1:
                element1.click()
                time.sleep(random.uniform(3, 5))
        if _is_fetch_cancelled(product_id):
            return 0

        # Click "当前商品" (current product)
        element2 = page.ele('xpath=//div[contains(text(), "当前商品")]')
        if element2:
            element2.click()
            time.sleep(random.uniform(3, 5))
        if _is_fetch_cancelled(product_id):
            return 0

        # Locate the comment popup
        popup = page.ele('xpath=//*[@id="rateList"]/div/div[3]')
        if not popup:
            return 0

        # Click "视频" (video)
        element3 = page.ele('xpath=//div[contains(text(), "视频")]')
        if element3:
            element3.click()
            time.sleep(random.uniform(3, 5))
        if _is_fetch_cancelled(product_id):
            return 0

        # Listen for comment-list requests
        page.listen.start('https://api.m.jd.com/client.action')

        retry_count = 0          # consecutive rounds without new data (diagnostic only)
        new_comments = []        # all new comments collected in this run
        seen_ids = set()         # comment_ids already handled in this run
        total_comments_saved = 0

        # Keep fetching until cancelled by a newer request or stopped manually
        while True:
            if _is_fetch_cancelled(product_id):
                print(f"[fetch_jd_comments] product {product_id} cancelled by a newer request, exiting")
                break

            scroll_amount = random.randint(10000, 100000)
            popup.scroll.down(scroll_amount)
            print(f"Popup scrolled down by {scroll_amount} px")
            time.sleep(random.uniform(3, 5))
            if _is_fetch_cancelled(product_id):
                break

            resp = page.listen.wait(timeout=5)
            # postData can be None for requests without a body, so guard before `in`
            if resp and resp.request.postData and 'getCommentListPage' in resp.request.postData:
                json_data = resp.response.body
                if 'result' in json_data and 'floors' in json_data['result']:
                    # The comment list has been observed in the third "floor" of the payload
                    floors = json_data['result']['floors']
                    comment_floor = floors[2] if len(floors) > 2 else {}
                    if 'data' in comment_floor and isinstance(comment_floor['data'], list):
                        batch_comments = comment_floor['data']

                        # Pick out the comments we have not seen before
                        fresh_comments = []
                        for comment in batch_comments:
                            comment_info = comment.get('commentInfo', {})
                            comment_id = comment_info.get('commentId', '')
                            comment_score = comment_info.get('commentScore', '')
                            if not comment_id:
                                continue
                            # Keep five-star reviews only
                            if comment_score != '5':
                                print(f"Skipping non-five-star comment {comment_id}, score {comment_score}")
                                continue
                            # Skip comments already collected in this run or already in the database
                            already_seen = comment_id in seen_ids
                            if not already_seen:
                                session = Session()
                                already_seen = session.query(Comment).filter_by(comment_id=comment_id).first() is not None
                                session.close()
                            if already_seen:
                                print(f"Comment already exists: {comment_id}")
                                continue
                            seen_ids.add(comment_id)
                            fresh_comments.append(comment)

                        if fresh_comments:
                            print(f"Got {len(fresh_comments)} new comments this round")
                            new_comments.extend(fresh_comments)
                            retry_count = 0  # new data, reset the retry counter
                            # Persist this batch immediately
                            save_comments_to_db(product_id, fresh_comments)
                            total_comments_saved += len(fresh_comments)
                            print(f"Saved {len(fresh_comments)} comments, {total_comments_saved} in total")
                        else:
                            print("No new comments this round, keep scrolling...")
                            retry_count += 1
                    else:
                        print("No valid comment list found")
                        retry_count += 1
                else:
                    print("Unexpected response structure")
                    retry_count += 1
            else:
                print("No new comment data captured, keep scrolling...")
                retry_count += 1

            if _is_fetch_cancelled(product_id):
                break

        print(f"Crawler stopped, {total_comments_saved} comments fetched in total")
        return total_comments_saved
    except Exception as e:
        print("Error:", e)
        return 0


# =================== Continuous background crawler ===================
def continuous_crawler(product_id):
    """Background loop that keeps crawling comments."""
    global crawler_running
    try:
        print(f"Starting continuous crawl for product {product_id}...")
        while crawler_running:
            fetch_jd_comments(product_id)
            if not crawler_running:
                break
            # Wait a bit before the next round
            time.sleep(10)
        print(f"Continuous crawl for product {product_id} stopped")
    except Exception as e:
        print(f"Continuous crawler error: {e}")
        crawler_running = False


# =================== Extract comments and save them to the database ===================
def save_comments_to_db(product_id, comments):
    session = Session()
    try:
        for comment in comments:
            comment_info = comment.get('commentInfo', {})
            comment_id = comment_info.get('commentId', '')
            # Skip comments without a comment_id
            if not comment_id:
                print("Skipping comment without comment_id")
                continue

            # Skip comments that are already stored
            exists = session.query(Comment).filter_by(comment_id=comment_id).first()
            if exists:
                print(f"Comment already exists: {comment_id}")
                continue

            # Extract the remaining fields; use a local name so the fallback for
            # later rows is still the function argument, not a previous row's id
            user_name = comment_info.get('userNickName', 'anonymous user')
            comment_text = comment_info.get('commentData', 'no comment text')
            row_product_id = comment_info.get('productId', product_id)
            picture_list = comment_info.get('pictureInfoList', [])
            comment_date = comment_info.get('commentDate', '')
            picture_urls = [pic.get('largePicURL') for pic in picture_list if pic.get('largePicURL')]

            new_comment = Comment(
                product_id=row_product_id,
                user_name=user_name,
                comment_text=comment_text,
                comment_id=comment_id,
                picture_urls=json.dumps(picture_urls, ensure_ascii=False),
                # An empty date string would break the DateTime column; fall back to now
                comment_date=comment_date if comment_date else datetime.now()
            )
            session.add(new_comment)
        session.commit()
    except Exception as e:
        session.rollback()
        print("Save failed:", e)
    finally:
        session.close()
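
# Illustrative only: the approximate shape of one element of `comments` that
# save_comments_to_db expects, reconstructed from the fields it reads above.
# Every value is a made-up placeholder, not real JD data.
_EXAMPLE_COMMENT = {
    "commentInfo": {
        "commentId": "1234567890",      # required; rows without it are skipped
        "commentScore": "5",            # only "5" passes the filter in fetch_jd_comments
        "userNickName": "j***n",
        "commentData": "example comment text",
        "productId": "100012345678",
        "commentDate": "2024-01-01 12:00:00",
        "pictureInfoList": [{"largePicURL": "https://example.com/pic1.jpg"}],
    }
}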
# =================== Flask API ===================
@app.route('/start_crawler', methods=['POST'])
def start_crawler():
    """Start the continuous crawler."""
    global crawler_running, crawler_thread, current_product_id, active_fetch_product_id
    product_id = request.args.get('product_id')
    if not product_id:
        return jsonify({"error": "missing product_id"}), 400

    if crawler_running:
        return jsonify({
            "message": f"Crawler already running, current product id: {current_product_id}",
            "status": "already_running"
        }), 200

    try:
        with fetch_lock:
            crawler_running = True
            current_product_id = product_id
            # Mark this product as the active task so fetch_jd_comments is not
            # cancelled by a stale active_fetch_product_id from an earlier request
            active_fetch_product_id = product_id
            crawler_thread = threading.Thread(target=continuous_crawler, args=(product_id,))
            crawler_thread.daemon = True
            crawler_thread.start()

        return jsonify({
            "message": f"Continuous crawler started, product id: {product_id}",
            "status": "started",
            "product_id": product_id
        }), 200
    except Exception as e:
        crawler_running = False
        return jsonify({"error": str(e)}), 500


@app.route('/stop_crawler', methods=['POST'])
def stop_crawler():
    """Stop the continuous crawler."""
    global crawler_running, crawler_thread, current_product_id, active_fetch_product_id
    if not crawler_running:
        return jsonify({
            "message": "Crawler is not running",
            "status": "not_running"
        }), 200

    try:
        with fetch_lock:
            crawler_running = False
            stopped_product_id = current_product_id
            current_product_id = None
            # Clearing the active task id makes the inner fetch loop, which only
            # checks _is_fetch_cancelled, exit as well
            active_fetch_product_id = None

        # Wait for the thread to finish
        if crawler_thread and crawler_thread.is_alive():
            crawler_thread.join(timeout=10)

        return jsonify({
            "message": f"Continuous crawler stopped, product id: {stopped_product_id}",
            "status": "stopped",
            "product_id": stopped_product_id
        }), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/crawler_status', methods=['GET'])
def crawler_status():
    """Report the crawler status."""
    return jsonify({
        "running": crawler_running,
        "product_id": current_product_id,
        "status": "running" if crawler_running else "stopped"
    }), 200


@app.route('/test', methods=['GET'])
def test():
    """Health-check endpoint."""
    print("Test endpoint hit")
    return jsonify({"message": "Server is up", "status": "ok"}), 200


@app.route('/fetch_comments', methods=['GET', 'POST'])
def fetch_comments():
    """One-shot comment fetch (runs in the background, returns immediately).

    A new request interrupts all earlier fetch threads; only the latest
    request keeps running.
    """
    global crawler_running, active_fetch_product_id
    print(f"[fetch_comments] request received, method: {request.method}, args: {request.args}")

    product_id = request.args.get('product_id')
    if not product_id:
        print("[fetch_comments] error: missing product_id")
        return jsonify({"error": "missing product_id"}), 400

    print(f"[fetch_comments] handling product id {product_id}, interrupting all earlier requests")
    try:
        # Interrupt everything that came before: stop the continuous crawler and
        # mark the new product_id as the active task; old threads notice the
        # mismatch inside their loops and exit on their own
        with fetch_lock:
            crawler_running = False
            active_fetch_product_id = product_id

        def run_fetch():
            try:
                print(f"[background thread] fetching comments for product {product_id}...")
                result = fetch_jd_comments(product_id)
                print(f"[background thread] done, result: {result}")
            except Exception as e:
                import traceback
                print(f"[background thread] error while fetching comments: {e}\n{traceback.format_exc()}")

        fetch_thread = threading.Thread(target=run_fetch)
        fetch_thread.daemon = True
        fetch_thread.start()
        print("[fetch_comments] background thread started (earlier requests marked as cancelled)")

        response_data = {
            "message": f"Started fetching comments for product {product_id} in the background (earlier requests interrupted)",
            "status": "started",
            "product_id": product_id,
            "note": "Fetching runs in the background; check the database later or poll /crawler_status"
        }
        print(f"[fetch_comments] response: {response_data}")
        return jsonify(response_data), 200
    except Exception as e:
        import traceback
        print(f"[fetch_comments] error while handling the request: {e}\n{traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500


# =================== Start the service ===================
if __name__ == '__main__':
    try:
        # debug=True normally enables Flask's reloader, which imports this module
        # twice; disable it so the browser and control flags live in one process
        app.run(host='0.0.0.0', port=5008, debug=True, use_reloader=False)
    finally:
        if global_page:
            global_page.quit()
            print("Browser closed")
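
# ---------------------------------------------------------------------------
# Illustrative usage only (not part of the service): how a client might drive
# the endpoints above once the server is listening on port 5008. The product
# id is a made-up placeholder.
#
#   curl -X POST "http://127.0.0.1:5008/start_crawler?product_id=100012345678"
#   curl "http://127.0.0.1:5008/crawler_status"
#   curl "http://127.0.0.1:5008/fetch_comments?product_id=100012345678"
#   curl -X POST "http://127.0.0.1:5008/stop_crawler"
# ---------------------------------------------------------------------------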