0


python+selenium爬虫自动化批量下载文件

一、项目需求

在一个业务网站有可以一个个打开有相关内容的文本,需要逐个保存为TXT,数据量是以千为单位,人工操作会麻木到崩溃。

二、解决方案

目前的基础办法就是使用python+selenium自动化来代替人工去操作,虽然效率比其他爬虫低,但是也防止被封IP的风险。也能满足项目的需求。准备工作,先从网站下载项目清单xls文件,里面会有对应的唯一识别码,就是编号。

三、写代码具体技术路线

  1. init一个初始化函数,把selenium的驱动相关参数设置好。

2.get_html函数,实现打开浏览器登录进入需要爬取的主页面。

3.excel_to_dict函数,实现将前面准备的xls文件转为字典。

4.read_leftover函数,实现将尚未下载和下载失败的清单单独提取出来。

5.text_write函数,实现把页面上的文本内容保存到指定路径下的TXT文件中。

6.download函数,实现从主页面进入需要爬取的页面,完成导出后,再恢复初始状态。

7.run函数,是把所有功能函数驱动起来,实现爬虫过程。

8.讲以上函数定义到一个类里去,实现代码规范化。

四、具体代码如下:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import time
import xlrd
from xlutils.copy import copy
import os

class Zbph_Spider:
    def __init__(self):
        self.url = 'https://******'
        self.options = webdriver.FirefoxOptions()
        # 设置无头模式
        # self.options.add_argument("--headless")
        self.options.add_argument("--disable-gpu")
        # 隐身模式(无痕模式)
        self.driver = webdriver.Firefox(options=self.options)

    def get_html(self):
        # 调用WebDriver 对象的get方法 可以让浏览器打开指定网址
        self.driver.get(self.url)
        # 设置最大等待时长为 3秒,全局的。
        self.driver.implicitly_wait(15)
        # 最大化窗口
        self.driver.maximize_window()
        # 设置为本台电脑最大的分辨率,不然有头模式能运行,无头模式将无法运行。
        self.driver.set_window_rect(x=0, y=0, width=1920, height=1080)
        # 把弹窗点击确认让其消失
        self.driver.find_element(By.XPATH, '//span[contains(text(),"确 定")]').click()
        # 输入账号
        self.driver.find_element(By.XPATH, '//input[@id="txtLoginId"]').send_keys('****')
        # 输入密码
        self.driver.find_element(By.XPATH, '//input[@id="txtPassword"]').send_keys('*****')
        # 输入行政区代码
        self.driver.find_element(By.XPATH, '//input[@id="txtXzqdm"]').send_keys('522700')
        # 点击获取(手机)验证码,然后等待页面提示弹窗出来
        self.driver.find_element(By.XPATH, '//input[@id="btn_getValidCode"]').click()
        time.sleep(2)
        # 切换到alert弹窗并点击确定
        self.driver.switch_to.alert.accept()
        sj_yzm = input('请输入手机验证码:')
        time.sleep(0.5)
        self.driver.find_element(By.XPATH, '//input[@id="CheckCode"]').send_keys(sj_yzm)
        self.driver.find_element(By.XPATH, '//div/input[@value="登  录"]').click()
        time.sleep(3)

    # 把excel内容读取转为字典的函数
    def excel_to_dict(self, path):
        # 打开excel表格
        data_excel = xlrd.open_workbook(path)
        # 获取book中的第一sheet工作表法,返回一个xlrd.sheet.Sheet()对象
        table = data_excel.sheets()[0]  # 通过索引顺序获取sheet
        keys = table.row_values(0)
        # 获取总行数
        rowNum = table.nrows
        # 获取总列数
        colNum = table.ncols
        # 需要读取的列,这里选择读取第二列,和第六列和第17列
        design_col_num = [0, 1, 5, 16]
        if rowNum <= 1:
            print("总行数小于1")
        else:
            r = []
            j = 1
            for i in range(rowNum - 1):
                s = {}
                # 从第二行获取对应的values值,方便下一步建立Keys使用
                values = table.row_values(j)
                for x in design_col_num:
                    s[keys[x]] = values[x]
                r.append(s)
                j += 1
        return r

    # 把下载失败的项目清单写入excel
    def append_to_excel(self, words, row, column, filename):
        # 打开excel
        word_book = xlrd.open_workbook(filename)
        # 把xlrw对象变成xlwt对象,压入内存,使之可编辑
        new_work_book = copy(word_book)
        # 把可编辑的第一张表拿到
        new_sheet = new_work_book.get_sheet(0)
        # 设置开始写入的行数
        # 把文本写入对应的单元格内,参数分别是行、列和值,行列数从0起算。
        new_sheet.write(row + 1, column, words)
        # 从内存中把修改后的数据保存覆盖原来的文件
        new_work_book.save(filename)
        # print('下载状态追加写入成功!')

    # 读取仍需下载的项目清单。通过删掉下载状态为”下载成功“的记录实现,下一步只针对未下载或者下载失败的条目进行操作
    def read_leftover(self, list):
        i = 0
        while i < len(list):
            if list[i]['下载状态'] == '下载成功':
                del list[i]
            else:
                i += 1
        return list

    # 创建txt文件并并写入文本
    def text_write(self, name, msg, directory):
        full_path = directory + '\\TXT\\' + name + '.txt'
        file = open(full_path, 'w')
        file.write(msg)
        file.close()

    # 调用项目清单逐个下载坐标
    def download(self, xmmc, directory):
        # 点击左侧功能列表
        if xmmc['项目阶段'] == '验收项目':
            self.driver.find_element(By.XPATH, '//li/div[@title="项目验收阶段"]').click()
            time.sleep(0.5)
            self.driver.find_element(By.XPATH, '//span[text()="验收项目查询"]').click()
            time.sleep(0.5)
        else:
            self.driver.find_element(By.XPATH, '//li/div[@title="项目立项阶段"]').click()
            time.sleep(0.5)
            self.driver.find_element(By.XPATH, '//span[text()="立项项目查询"]').click()
            time.sleep(0.5)
        # 切入默认框架
        self.driver.switch_to.parent_frame()
        # 切入页面内嵌的第一层框架
        self.driver.switch_to.frame('u0')
        # 清除并输入新的备案编号
        self.driver.find_element(By.XPATH, '//*[@id="inp_xmbh"]').clear()
        self.driver.find_element(By.XPATH, '//*[@id="inp_xmbh"]').send_keys(xmmc['备案编号'])
        time.sleep(1)
        # 点击查询
        self.driver.find_element(By.XPATH, '//*[@id="btn_query"]').click()
        time.sleep(2.5)
        # 显性等待,等查询结果第一条是被查询对象,一直等到后面的条件成立(能定位)。
        WebDriverWait(self.driver, 20).until(
            EC.presence_of_element_located((By.XPATH, "//table/tbody/tr[1]/td[3][text()='" + xmmc['备案编号'] + "']")))
        # 定位到项目名称并双击
        element = self.driver.find_element(By.XPATH, '//*[@id="tlw_list_table"]/tbody/tr[1]/td[4]/input')
        ActionChains(self.driver).double_click(element).perform()
        time.sleep(1)
        # 等待新增耕地坐标按钮出现
        WebDriverWait(self.driver, 20).until(EC.element_to_be_clickable((By.XPATH, '//div[(text()="新增耕地坐标")]')))
        # 点击新增耕地坐标,使用JS脚本来点击,能解决需要点击的要素被遮挡无法点击的情况。
        xzgdzb = self.driver.find_element(By.XPATH, '//div[(text()="新增耕地坐标")]')
        self.driver.execute_script('arguments[0].click()', xzgdzb)
        time.sleep(3)
        # 切入内嵌的第二层框架
        self.driver.switch_to.frame('iframe_tab')
        time.sleep(0.2)
        dc_zz = self.driver.find_element(By.XPATH, '//div[(text()="导出")]')
        self.driver.execute_script('arguments[0].click()', dc_zz)
        time.sleep(0.5)
        # 切换到最新打开的标签页
        self.driver.switch_to.window(self.driver.window_handles[-1])
        try:
            # 获取页面文本
            words = self.driver.find_element(By.XPATH, '/html/body/pre').text
            # 调用写入txt函数把页面文本内容写入txt文件
            self.text_write(xmmc['备案编号'], words, directory)
            time.sleep(0.2)
            # 关闭当前标签页
            self.driver.close()
            print('---下载成功---')
            self.append_to_excel('下载成功', int(xmmc['序号']) - 1, 16, 'demo.xls')
        except:
            print('---下载失败---')
            self.append_to_excel('下载失败', int(xmmc['序号'] - 1), 16, 'demo.xls')

        self.driver.switch_to.window(self.driver.window_handles[0])
        # 切入页面内嵌的第一层框架
        self.driver.switch_to.frame('u0')
        # 点击返回
        self.driver.find_element(By.XPATH, '//div[text()="返回"]').click()
        time.sleep(0.5)
        # 把左侧列表返回默认状态
        if xmmc['项目阶段'] == '验收项目':
            # 退出框架,回到主页面
            self.driver.switch_to.parent_frame()
            self.driver.find_element(By.XPATH, '//li/div[@title="项目验收阶段"]').click()
            time.sleep(0.2)
        else:
            # 退出框架,回到主页面
            self.driver.switch_to.parent_frame()
            self.driver.find_element(By.XPATH, '//li/div[@title="项目立项阶段"]').click()
            time.sleep(0.2)

    # 驱动函数
    def run(self, path):
        # 调用读取xls函数读取项目清单
        list = self.excel_to_dict(path)
        new_list = self.read_leftover(list)
        # 打开、登录进入内部的页面
        self.get_html()
        # 调用下载函数根据项目清单依次下载所有项目坐标
        directory = os.path.dirname(path)  # 提取xls表格所在文件夹
        i = 1
        numbers_sum = len(new_list)
        for Id in range(numbers_sum):
            print("{}/{}  {}".format(i, numbers_sum, new_list[Id]['备案编号']))
            self.download(new_list[Id], directory)
            i += 1
        print(20*"-", '下载完毕', 20*"-")
        # 下载完毕后退出驱动
        self.driver.quit()

# 主函数
if __name__ == '__main__':
    # 调用读取xls函数读取项目清单
    Path = r"D:\demo\批量下载\demo.xls"
    spider = Zbph_Spider()
    spider.run(Path)
标签: python 爬虫 selenium

本文转载自: https://blog.csdn.net/seattle2009/article/details/128738991
版权归原作者 @_简单就好 所有, 如有侵权,请联系我们删除。

“python+selenium爬虫自动化批量下载文件”的评论:

还没有评论