0


揭秘数据抓取:用Selenium+Requests打造高效并发爬虫!

自动化Web数据抓取与处理

引言

自动化Web数据抓取在现代信息化系统中具有重要意义,能够高效、批量地获取所需数据,极大减少人工成本。它的应用场景广泛,比如:

  1. 数据聚合:比如电商平台上商品价格的波动、社交媒体的用户评论,通过自动化抓取可以实现快速的数据收集与聚合,帮助企业进行分析与决策。
  2. 竞争情报:实时跟踪竞争对手的网站变动,获取最新的产品、价格、促销等信息,帮助制定更具竞争力的策略。
  3. 内容监控:在金融、新闻等领域,自动化抓取技术可以实时监控特定网站内容更新,帮助决策者及时掌握动态信息。
  4. 学术和研究:研究人员可以通过抓取公开数据,获取社会网络关系、经济走势等大规模数据,进行进一步分析。
  5. 系统集成:在内网或者其他受限的环境中,自动化抓取可以代替人工访问,定时将信息采集并同步到本地数据库,实现自动化的数据更新。

本文将介绍一个自动化脚本,能够解决从目标网站中高效抓取数据的问题。从而是这些数据得到应用,该脚本的功能和目标包括:

  1. 模拟登录:通过Selenium模拟用户在网页上的登录操作,绕过手动输入验证码、账号密码等繁琐过程,自动化获取访问权限。
  2. 获取关键参数:在成功登录后,抓取目标请求所需的参数ID,该ID是后续请求中必需的,通常由登录成功后的页面或接口响应中提供。
  3. 并发请求获取数据:使用requests库结合线程池机制,通过获取的登录信息和参数ID,批量发送请求,获取目标数据。线程池的引入大幅提升了请求的效率,确保在短时间内完成大量数据的抓取任务。
  4. 数据存储:将通过requests抓取到的目标信息格式化处理后,存储为CSV文件,方便后续的数据分析与处理。

该脚本的目标是实现自动化、并发、稳定的数据抓取流程,减少人工操作,同时确保数据的及时性与准确性。

环境准备

在实现该自动化数据抓取脚本时,所需的关键Python库和工具如下:

  1. Selenium 4.0.0:用于模拟浏览器操作,执行自动化登录和页面交互。 安装方式:pip install selenium==4.0.0
  2. Selenium-Wire 4.5.6:用于捕获和修改浏览器请求的库,方便抓取请求参数和ID。 安装方式:pip install selenium-wire==4.5.6
  3. Pandas 1.3.5:用于数据处理和存储,最终将抓取到的数据导出为CSV文件。 安装方式:pip install pandas==1.3.5
  4. Requests 2.28.1:用于在登录成功后发送HTTP请求,获取目标数据。 安装方式:pip install requests==2.28.1
  5. 谷歌浏览器 79.0.3945.36:配合Selenium进行自动化操作,需要确保浏览器版本与驱动版本匹配。
  6. Chrome驱动 79.0.3945.36:Chrome浏览器的驱动程序,用于Selenium与浏览器交互。需要下载对应版本的驱动,并配置在系统路径中,或者指定驱动路径。

因为我的目标运行机器是一台win7,所以计划开发完成后打包为。exe,因此为了兼容win7,此处Python选择了3.7,并使用上述版本的第三方库,在使用这些工具时,确保浏览器版本、驱动版本与Selenium版本一致,避免兼容性问题。关于如何配置selenium驱动和浏览器版本的方式再次不多赘述,详情可参考之前的博客。

配置文件读取

在自动化数据抓取过程中,使用配置文件来管理常用参数(如登录信息、请求URL、保存路径等)是一种非常方便的方式。Python的

json

模块可以帮助读取和解析配置文件,保证代码的灵活性与可维护性。通过以下步骤,我们可以使用

json

模块读取配置文件:

配置文件的格式

首先,假设有一个

config.json

文件,用于存储一些关键参数,例如:

{"login_url":"http://example.com/login","request_url":"http://example.com/data","save_path":"output/data.csv","username":"admin","password":"password123"}
示例代码

以下是一个读取配置文件的示例函数``:

import json
import time

defread_config():# 尝试读取config.json文件,不指定编码try:withopen('config.json','r')as f:
            config_info = json.load(f)except UnicodeDecodeError:# 如果默认编码读取失败,则尝试使用utf-8编码withopen('config.json','r', encoding='utf-8')as f:
            config_info = json.load(f)# 处理JSON, 将save_path加上时间后缀
    config_info["save_path"]= config_info["save_path"][:-4]+"_"+ time.strftime("%m-%d-%H-%M-%S", time.localtime())+".csv"return config_info

config = read_config()
代码说明:
  1. 读取文件with open('config.json', 'r') 读取配置文件,如果文件的编码格式默认无法识别,则捕获UnicodeDecodeError异常,并改用utf-8编码重读文件。
  2. 处理数据:在成功读取后,解析出的JSON对象会存储为字典config_info
  3. 动态生成保存路径:脚本会将save_path后缀(即.csv前)加上当前的时间戳,确保每次运行时保存的文件不被覆盖。
  4. 返回配置:最后,返回处理后的配置字典,供后续代码使用。

通过

read_config()

函数,可以轻松地获取配置文件中的参数,并根据需要灵活使用。例如,登录时可以通过

config["username"]

config["password"]

获取用户名和密码。

日志记录

logging

模块用于记录程序的运行信息,便于调试和跟踪。通过以下代码创建一个日志记录器:

import logging

defcreat_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.WARNING)# 设置日志级别为WARNING
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    
    file_handler = logging.FileHandler('new_spider_power.log')# 日志写入文件
    file_handler.setFormatter(formatter)
    file_handler.encoding ='utf-8'# 设置日志文件编码为utf-8
    logger.addHandler(file_handler)return logger

logger = creat_logger()

该日志器将

WARNING

级别以上的日志写入

new_spider_power.log

文件,帮助记录运行中的重要事件和错误信息。

浏览器自动化

使用Selenium-Wire和WebDriver模拟浏览器操作

在自动化数据抓取任务中,Selenium与Selenium-Wire结合使用,能够模拟用户操作浏览器登录、导航和抓取数据。Selenium负责模拟浏览器行为,Selenium-Wire则可以拦截、查看HTTP请求和响应,从而帮助我们进一步分析网页的数据交互。本模块将详细介绍如何实现这些操作。

1. 初始化WebDriver

在进行任何浏览器操作之前,首先需要初始化WebDriver,用于控制浏览器。

from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait as wait
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

defcreat_driver():# 创建 ChromeOptions 对象
    chrome_options = Options()# 添加实验性选项,排除 "enable-automation" 开关
    chrome_options.add_experimental_option("excludeSwitches",["enable-automation"])# 启用无头模式# chrome_options.add_argument("--headless")# 禁用自动化扩展
    chrome_options.add_experimental_option('useAutomationExtension',False)# 禁用 Blink 引擎中的 AutomationControlled 特性
    chrome_options.add_argument("disable-blink-features=AutomationControlled")# 创建 Service 对象
    ser = Service(executable_path=chrome_driver_path)# 初始化selenium-wire WebDriver,使用之前创建的 Service 对象和 ChromeOptions 对象
    driver = webdriver.Chrome(service=ser, options=chrome_options)

    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined});")return driver
2. 使用Selenium模拟登录操作

模拟用户登录是许多抓取任务中的第一步,通过定位用户名、密码输入框及登录按钮,实现自动化登录。

from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time, random

deflogin(driver):global username, password
    driver.get('http://front.emss.js.sgcc.com.cn/cmn/login')# 访问登录页面try:# 切换到登录iframe
        iframe_element = wait.until(EC.frame_to_be_available_and_switch_to_it((By.ID,'loginFrame')))# 输入用户名
        username_input = wait.until(EC.visibility_of_element_located((By.XPATH,'//*[@id="yx_user_id"]')))
        actions = ActionChains(driver)
        actions.move_to_element(username_input).perform()for char in username:
            username_input.send_keys(char)
            time.sleep(random.uniform(0.1,0.3))# 随机延迟模拟真人输入# 输入密码
        password_input = driver.find_element(By.XPATH,'//*[@id="yx_pwd"]')for char in password:
            password_input.send_keys(char)
            time.sleep(random.uniform(0.1,0.3))# 点击登录按钮
        login_button = driver.find_element(By.XPATH,'****************')
        actions.move_to_element(login_button).click().perform()except TimeoutException as e:
        logger.error(f"登录超时: {e}")except Exception as e:
        logger.error(f"登录失败: {e}")

关键步骤

  • driver.get():访问目标登录页面。
  • EC.frame_to_be_available_and_switch_to_it():等待并切换到登录的iframe。
  • ActionChains:模拟鼠标移动到输入框,逐字符输入用户名和密码,并点击登录按钮。
3. 导航至目标页面并进行数据抓取

登录后,通常需要继续导航到特定页面,找到我们需要的元素,并抓取相关数据。以下代码展示了如何通过Selenium控制浏览器进行导航,并抓取数据。

defopen_user_search(driver):try:# 等待“客户管理”选项可见并点击
        Customer_management_label = wait.until(EC.visibility_of_element_located((By.XPATH,'//********')))
        actions = ActionChains(driver)
        actions.move_to_element(Customer_management_label).click().perform()# 点击“客户信息”和“客户360视图”选项
        Customer_information_label = driver.find_element(By.XPATH,'//*[@id="app"]********************************')
        actions.move_to_element(Customer_information_label).click().perform()
        Customer_360_view_label = driver.find_element(By.XPATH,'//*[@id="app"]*******************************8')
        actions.move_to_element(Customer_360_view_label).click().perform()# 切换到目标iframe,等待搜索框加载
        iframe_element = wait.until(EC.frame_to_be_available_and_switch_to_it((By.ID,"bf37cba102a5d79b29c9b21660536c7d47bfdb17dcb6c830bba1d76a0abc7a6d")))
        search_input = wait.until(EC.visibility_of_element_located((By.XPATH,'*******************************')))except TimeoutException as e:
        logger.error(f"导航或查找用户超时: {e}")except Exception as e:
        logger.error(f"查找用户失败: {e}")

关键步骤

  • wait.until():显式等待某些元素加载完成,确保页面加载完全再进行操作。
  • ActionChains:用于模拟用户点击页面中的多个选项,逐步导航到需要抓取数据的页面。
  • EC.frame_to_be_available_and_switch_to_it():切换至iframe内的元素,确保正确操作页面中的嵌套内容。

通过Selenium和Selenium-Wire,我们能够自动化执行复杂的浏览器操作,包括模拟登录、页面导航、抓取页面数据等步骤。上述代码展示了如何通过等待元素加载、切换iframe、模拟鼠标操作等手段,使得数据抓取更加稳定、可靠。在接下来的模块中,将会展示如何结合线程池和

requests

库进行高效的数据抓取和存储。

多线程与并发控制

使用
concurrent.futures.ThreadPoolExecutor

提高数据抓取效率

在抓取大量数据时,单线程操作往往效率低下,尤其是每个请求都涉及网络等待或需要处理大量用户数据。通过

concurrent.futures.ThreadPoolExecutor

,我们可以同时执行多个任务,从而显著提升数据抓取的效率。本模块将介绍如何使用线程池并发处理抓取任务。

1. 使用ThreadPoolExecutor启动并发任务
ThreadPoolExecutor

是Python内置的线程池管理工具,它允许我们指定工作线程的数量,并提交任务以便在不同的线程中并发执行。我们可以使用

submit()

方法,将需要并发执行的任务提交到线程池。

import concurrent.futures

defmain():# 创建一个最大工作线程数为5的线程池with concurrent.futures.ThreadPoolExecutor(max_workers=5)as executor:whileTrue:try:ifnot job_queue.empty():# 从任务队列中获取任务参数
                    user_data, date, compositeRate, assetNo = job_queue.get()# 提交任务到线程池进行异步处理
                    executor.submit(process_user_data, user_data, date, compositeRate, assetNo)else:# 如果队列为空,进行等待try:
                        time.sleep(60)ifnot job_queue.empty():continueelse:
                            time.sleep(20)ifnot job_queue.empty():continueexcept Exception as e:
                        logger.error(f"等待任务时出错,错误信息为: {e}")except Exception as e:
                logger.error(f"request主线程出错,错误信息为: {e}")continue

关键点

  • max_workers=5:指定线程池中的最大工作线程数为5,这意味着最多同时处理5个任务。
  • executor.submit():将抓取任务提交到线程池中执行,每个任务在独立的线程中并发运行。
  • 任务队列(job_queue :我们从任务队列中获取任务,任务完成后再继续获取下一个任务,这种机制保证了任务的顺序处理和高效分发。
2. 并发处理用户数据抓取任务
process_user_data

函数是每个线程执行的具体抓取任务。它负责处理单个用户的数据,包括获取用户ID、抓取目标数据、存储结果等操作。如果遇到网络或cookie问题,它还会自动重试。

defprocess_user_data(user_data, date, compositeRate, assetNo):global data_queue, cookies, token
    try:
        logger.info(f"尝试获取{user_data['户号']}的信息")
        retries =0while retries <2:try:# 获取用户ID
                meterid = get_id(assetNo=assetNo)
                sleep_time()# 模拟请求间隔# 获取用户的数据
                aim_data_list = get_data(user_data['户号'], meterid, date)break# 数据获取成功,跳出重试循环except Exception as e:
                logger.error(f"用户获取consid失败,错误信息为: {e}")
                retries +=1
                logger.warning(f"重试获取cookie,当前重试次数: {retries}")try:
                    headers = get_request_headers()if headers:
                        cookies = headers['Cookie']
                        token = headers['token']
                        logger.info("cookie重新获取成功")breakexcept Exception as e:
                    logger.error(f"cookie重新获取失败,错误信息为: {e}")if retries ==2:
                    logger.error("达到最大重试次数,consid获取失败")returnNone# 将抓取到的数据放入数据队列
        aim_data_dict ={item['dataDate']: item['pap']for item in aim_data_list}
        data_queue.put((aim_data_dict, date, user_data['户号'], compositeRate))
        sleep_time()# 模拟请求间隔
        logger.info(f"用户{user_data['户号']}的{date}的数据已获取")try:
            save_data()# 保存抓取到的数据
            logger.info(f"用户{user_data['户号']}的{date}的数据已保存")except Exception as e:
            logger.error(f"用户{user_data['户号']}的{date}的数据保存失败,错误信息为: {e}")except Exception as e:
        logger.error(f"用户{user_data['户号']}的{date}的数据获取失败,错误信息为: {e}")

关键点

  • 重试机制:每次获取用户ID和数据时,如果失败,会重试两次。如果重试仍然失败,会记录错误并跳过该任务。
  • 并发抓取数据:多个线程可以同时调用process_user_data,提高了数据抓取的效率。
  • 保存数据:抓取到的数据在每个线程内被保存至CSV文件,确保数据不会丢失。
3. 线程池的优势

相比单线程,使用

ThreadPoolExecutor

的主要优势在于:

  • 并发处理:通过线程池,可以同时发起多个请求,减少等待时间,极大提高任务处理速度。
  • 资源控制:通过max_workers可以控制并发线程数,避免因同时发起过多请求导致服务器阻塞或本地资源耗尽。
  • 异常捕获:每个线程独立运行,即使某个任务失败,也不会影响其他任务的执行。

通过

ThreadPoolExecutor

,我们可以并发处理大量数据抓取任务,极大提升了数据抓取的效率。在实际应用中,结合重试机制与任务队列管理,可以确保抓取任务的可靠性和稳定性。

数据保存

线程安全与锁的使用
defsave_data():global err_data, data_queue, config_info, save_flag
    if data_queue.qsize()>=50or save_flag:with lock:# 使用 with 语句自动获取和释放锁
            data_list =[]whilenot data_queue.empty():
                data_dict, date, userid, compositeRate = data_queue.get()
                data_dict["date"]= date
                data_dict["userid"]= userid
                data_dict["compositeRate"]= compositeRate
                data_list.append(data_dict)if data_list:
                aim_df = pd.DataFrame(data_list)else:return# 如果队列为空,则直接返回

            logger.debug(f"保存数据{len(aim_df)}条")
            logger.debug(f"{aim_df}")# 从队列中取出数据后直接释放锁,减小锁的颗粒度,以减少其他线程等待该线程io开销的时间try:if os.path.exists(config_info["save_path"]):# 如果文件存在,则以追加模式写入,不写入表头
                aim_df.to_csv(config_info["save_path"], mode='a', header=False, index=False)# save_len += len(aim_df)  # 更新保存的行数else:# 如果文件不存在,创建新的 CSV 文件并写入数据,包含表头
                aim_df.to_csv(config_info["save_path"], mode='w', header=True, index=False)# save_len = len(aim_df)  # 初始化保存的行数except Exception as e:
            logger.error(f"保存数据时出错: {e}")# 将数据保存到异常数据集里if err_data.empty:
                err_data = aim_df
            else:
                err_data = pd.concat([err_data, aim_df], ignore_index=True)else:pass
    data_queue.task_done()

在多线程数据处理场景中,线程安全是确保多个线程可以同时执行而不会产生数据冲突或破坏的数据一致性的关键。Python 的

threading.Lock

是实现线程安全的常用工具,通过使用锁,可以确保某一代码块在同一时刻只被一个线程执行,从而避免多个线程同时修改共享数据时引发的竞态条件(Race Condition)。

在上面的

save_data

函数中,使用

with lock

来确保数据的保存过程是线程安全的。当多个线程尝试向同一个 CSV 文件写入数据时,锁可以避免多个线程同时操作文件,确保数据的完整性和一致性。

锁的重要性:

  1. 避免数据竞争:如果没有锁,多个线程可能同时从 data_queue 中读取数据或同时写入文件,导致数据丢失或覆盖。
  2. 保障数据一致性:在数据保存时,加锁可以确保每次的写入操作是完整的,不会被其他线程打断。
  3. 降低等待时间:通过在锁内仅执行关键操作,尽量减少锁定时间。例如在该例中,数据从 data_queue 读取后就释放锁,其他线程无需等待 I/O 操作完成,从而提高了程序的并发效率。
with lock:# 锁定关键操作,防止竞态条件
    data_list =[]whilenot data_queue.empty():
        data_dict, date, userid, compositeRate = data_queue.get()# 将数据提取到 data_list 中

使用

with

语句管理锁是最佳实践,能够确保在执行完关键代码后自动释放锁,避免死锁或资源浪费的风险。

使用pandas将数据保存到CSV文件

在数据抓取过程中,数据需要高效地存储到文件中,尤其是在多线程环境下。

pandas

提供了强大的数据处理和保存功能,可以方便地将数据转换成

DataFrame

后写入 CSV 文件。这个过程不仅直观,而且性能较高,特别适合处理批量数据。

在代码中,通过

pandas.DataFrame.to_csv()

方法,将已经从队列中取出的数据保存为 CSV 格式。示例代码展示了如何使用此方法:

defsave_data():global err_data, data_queue, config_info, save_flag
    if data_queue.qsize()>=50or save_flag:# 当数据量达到一定规模或手动触发保存with lock:# 确保多线程情况下的数据保存过程是安全的
            data_list =[]whilenot data_queue.empty():
                data_dict, date, userid, compositeRate = data_queue.get()
                data_dict["date"]= date
                data_dict["userid"]= userid
                data_dict["compositeRate"]= compositeRate
                data_list.append(data_dict)if data_list:# 如果数据列表非空
                aim_df = pd.DataFrame(data_list)# 将数据列表转换为pandas DataFrameelse:return# 如果数据队列为空则返回
定期保存数据以避免数据丢失

在长时间的数据抓取任务中,避免数据丢失是非常重要的。可以通过定期保存数据来减少突发问题(如程序崩溃、网络中断)造成的损失。在这个例子中,使用

data_queue.qsize() >= 50

来判断是否需要保存数据,也可以通过设置一个

save_flag

来手动触发保存。

每当数据队列中的数据量达到一定规模,或达到其他预设的条件时,程序会调用

to_csv()

方法将数据写入文件:

try:if os.path.exists(config_info["save_path"]):# 如果文件已经存在# 追加模式写入,避免覆盖之前的数据
        aim_df.to_csv(config_info["save_path"], mode='a', header=False, index=False)else:# 创建新的 CSV 文件并写入数据,包含表头
        aim_df.to_csv(config_info["save_path"], mode='w', header=True, index=False)except Exception as e:
    logger.error(f"保存数据时出错: {e}")

定期保存数据的方式,不仅可以减少数据丢失的风险,还能为后续数据分析和处理提供方便的检查点。

总结

在本文中,我们介绍了如何通过结合 Selenium、requests 和线程池等技术高效地进行数据抓取。关键功能模块包括模拟浏览器登录获取所需的请求参数,通过线程池并发处理数据抓取任务,并使用 pandas 将抓取的数据定期保存为 CSV 文件。还讨论了如何使用线程锁来确保多线程环境下的线程安全,防止数据竞争和不一致问题。定期保存数据的策略则有效防止了突发状况导致的数据丢失风险。

通过这些技术的结合,能够大大提高数据抓取的效率和稳定性,确保在复杂的内网环境中,数据能够持续、可靠地获取并存储。

附录

-关于驱动下载和配置可参考下文:
使用Selenium和bs4进行Web数据爬取和自动化(爬取掘金首页文章列表)引言: Web 数据爬取和自动化已成为许多 - 掘金 (juejin.cn)


本文转载自: https://blog.csdn.net/kilig_CSM/article/details/142364512
版权归原作者 冷月半明 所有, 如有侵权,请联系我们删除。

“揭秘数据抓取:用Selenium+Requests打造高效并发爬虫!”的评论:

还没有评论