文章目录
发现宝藏
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。
一、 目标
爬取
https://news.berkeley.edu/
的字段,包含标题、内容,作者,发布时间,链接地址,文章快照 (可能需要翻墙才能访问)
二、简单分析网页
1. 寻找所有新闻
2. 分析模块、版面和文章
我们可以按照新闻模块、版面、和文章对网页信息进行拆分,分别按照步骤进行爬取
三、爬取新闻
1. 爬取模块
由于该新闻只有一个模块,所以直接请求该模块地址即可获取该模块的所有信息,但是为了兼容多模块的新闻,我们还是定义一个数组存储模块地址
classMitnewsScraper:def__init__(self, root_url, model_url, img_output_dir):
self.root_url = root_url
self.model_url = model_url
self.img_output_dir = img_output_dir
self.headers ={'Referer':'https://news.berkeley.edu/','User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ''Chrome/122.0.0.0 Safari/537.36','Cookie':'替换成你自己的',}...defrun():# 根路径
root_url ='https://news.berkeley.edu/'# 模块地址数组
model_urls =['https://news.berkeley.edu/news']# 文章图片保存路径
output_dir ='D://imgs//berkeley-news'for model_url in model_urls:
scraper = MitnewsScraper(root_url, model_url, output_dir)
scraper.catalogue_all_pages()if __name__ =="__main__":
run()
多模块的新闻网站例子如下(4个模块)
2. 爬取版面
- f12打开控制台,点击网络(network),通过切换页面观察接口的参数传递,发现只有一个page参数
- 于是我们可以获取页面下面的页数(page x of xxxx), 然后进行遍历传参,也就遍历获取了所有版面
# 获取一个模块有多少版面defcatalogue_all_pages(self):
response = requests.get(self.model_url, headers=self.headers)
soup = BeautifulSoup(response.text,'html.parser')try:match= re.search(r'of (\d+)', soup.text)
num_pages =int(match.group(1))print('模块一共有'+str(num_pages)+'页版面,')for page inrange(1, num_pages +1):
self.parse_catalogues(page)print(f"========Finished modeles page {page}========")except:returnFalse
- F12打开控制台后按照如下步骤获取版面列表对应的dom结构
catalogue_list = soup.find('div','filtered-items')
catalogues_list = catalogue_list.find_all('article')
- 遍历版面列表,获取版面标题
for index, catalogue inenumerate(catalogues_list):# 版面标题
catalogue_title = catalogue.find('div','news-item__description').find('a').get_text(strip=True)print('第'+str(index +1)+'个版面标题为:'+ catalogue_title)
- 获取版面更新时间和当下的操作时间
# 操作时间
date = datetime.now()# 更新时间
publish_time = catalogue.find('div','news-item__description').find('time').get('datetime')# 将日期字符串转换为datetime对象
updatetime = datetime.strptime(publish_time,'%Y-%m-%d')
- 保存版面url和版面id, 由于该新闻是一个版面对应一篇文章,所以版面url和文章url是一样的,而且文章没有明显的标识,我们把地址后缀作为文章id,版面id则是文章id后面加上个01, 为了避免标题重复也可以把日期前缀也加上去
# 版面url
catalogue_href = catalogue.find('div','news-item__description').find('a').get('href')
catalogue_url = self.root_url + catalogue_href
# 版面id
catalogue_id = catalogue_href[1:]print('第'+str(index +1)+'个版面地址为:'+ catalogue_url)
- 保存版面信息到mogodb数据库(由于每个版面只有一篇文章,所以版面文章数量cardsize的值赋为1)
# 连接 MongoDB 数据库服务器
client = MongoClient('mongodb://localhost:27017/')# 创建或选择数据库
db = client['berkeley-news']# 创建或选择集合
catalogues_collection = db['catalogues']# 插入示例数据到 catalogues 集合
catalogue_data ={'id': catalogue_id +'01','date': date,'title': catalogue_title,'url': catalogue_url,'cardSize':1,'updatetime': updatetime
}
3. 爬取文章
- 由于一个版面对应一篇文章,所以版面url 、更新时间、标题和文章是一样的,并且按照设计版面id和文章id的区别只是差了个01,所以可以传递版面url、版面id、更新时间和标题四个参数到解析文章的函数里面
- 获取文章id,文章url,文章更新时间和当下操作时间
# 解析版面defparse_catalogues(self, page):...
self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title)...# 解析文章defparse_cards_list(self, url, catalogue_id, updatetime, cardtitle):
card_response = requests.get(url, headers=self.headers)
soup = BeautifulSoup(card_response.text,'html.parser')
- 获取文章作者
# 文章作者
author = soup.find('a', href='/author/news').get_text()
- 获取文章原始htmldom结构,并删除无用的部分(以下仅是部分举例),用html_content字段保留原始dom结构
# 原始htmldom结构
html_dom = soup.find('div','single-post cb-section cb-stretch')# 标题上方的冗余
html_cut1 = html_dom.find('div','single-post__heading').find('strong')# 链接冗余
html_cut2 = html_dom.find_all('a','a2a_dd share-link')# 移除元素if html_cut1:
html_cut1.extract()if html_cut2:for item in html_cut2:
item.extract()
html_content = html_dom
- 进行文章清洗,保留文本,去除标签,用content保留清洗后的文本
# 解析文章列表里的文章defparse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):...# 增加保留html样式的源文本
origin_html = html_dom.prettify()# String# 转义网页中的图片标签
str_html = self.transcoding_tags(origin_html)# 再包装成
temp_soup = BeautifulSoup(str_html,'html.parser')# 反转译文件中的插图
str_html = self.translate_tags(temp_soup.text)# 绑定更新内容
content = self.clean_content(str_html)# 工具 转义标签deftranscoding_tags(self, htmlstr):
re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)# IMG 转义return s
# 工具 转义标签deftranslate_tags(self, htmlstr):
re_img = re.compile(r'@@##(img.*?)##@@', re.M)
s = re_img.sub(r'<\1>', htmlstr)# IMG 转义return s
# 清洗文章defclean_content(self, content):if content isnotNone:
content = re.sub(r'\r',r'\n', content)
content = re.sub(r'\n{2,}','', content)
content = re.sub(r' {6,}','', content)
content = re.sub(r' {3,}\n','', content)
content = re.sub(r'<img src="../../../image/zxbl.gif"/>','', content)
content = content.replace('<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ','')
content = content.replace(''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''','') \
.replace(' <!--enpcontent','').replace('<TABLE>','')
content = content.replace('<P>','').replace('<\P>','').replace(' ',' ')return content
- 下载保存图片
defparse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):...
imgs =[]
img_array = soup.find('figure','cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img')for item in img_array:
img_url = item.get('src')
imgs.append(img_url)iflen(imgs)!=0:# 下载图片
illustrations = self.download_images(imgs, card_id)# 下载图片defdownload_images(self, img_urls, card_id):
result = re.search(r'[^/]+$', card_id)
last_word = result.group(0)# 根据card_id创建一个新的子目录
images_dir = os.path.join(self.img_output_dir,str(last_word))ifnot os.path.exists(images_dir):
os.makedirs(images_dir)
downloaded_images =[]for index, img_url inenumerate(img_urls):try:
response = requests.get(img_url, stream=True, headers=self.headers)if response.status_code ==200:# 从URL中提取图片文件名
img_name_with_extension = img_url.split('/')[-1]
pattern =r'^[^?]*'match= re.search(pattern, img_name_with_extension)
img_name =match.group(0)# 保存图片withopen(os.path.join(images_dir, img_name),'wb')as f:
f.write(response.content)
downloaded_images.append([img_url, os.path.join(images_dir, img_name)])except requests.exceptions.RequestException as e:print(f'请求图片时发生错误:{e}')except Exception as e:print(f'保存图片时发生错误:{e}')return downloaded_images
# 如果文件夹存在则跳过else:print(f'文章id为{card_id}的图片文件夹已经存在')return[]
- 保存文章数据
# 连接 MongoDB 数据库服务器
client = MongoClient('mongodb://localhost:27017/')# 创建或选择数据库
db = client['berkeley-news']# 创建或选择集合
cards_collection = db['cards']# 插入示例数据到 catalogues 集合
card_data ={'id': card_id,'catalogueId': catalogue_id,'type':'berkeley-news','date': date,'title': card_title,'author': author,'updatetime': updateTime,'url': url,'html_content':str(html_content),'content': content,'illustrations': illustrations,}
cards_collection.insert_one(card_data)
四、完整代码
import os
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
import re
import traceback
classMitnewsScraper:def__init__(self, root_url, model_url, img_output_dir):
self.root_url = root_url
self.model_url = model_url
self.img_output_dir = img_output_dir
self.headers ={'Referer':'https://news.berkeley.edu/','User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ''Chrome/122.0.0.0 Safari/537.36','Cookie':'替换成你自己的',}# 获取一个模块有多少版面defcatalogue_all_pages(self):
response = requests.get(self.model_url, headers=self.headers)
soup = BeautifulSoup(response.text,'html.parser')try:match= re.search(r'of (\d+)', soup.text)
num_pages =int(match.group(1))print('模块一共有'+str(num_pages)+'页版面')for page inrange(1, num_pages +1):print(f"========start catalogues page {page}"+"/"+str(num_pages)+"========")
self.parse_catalogues(page)print(f"========Finished catalogues page {page}"+"/"+str(num_pages)+"========")except Exception as e:print(f'Error: {e}')
traceback.print_exc()# 解析版面列表里的版面defparse_catalogues(self, page):
params ={'page': page}
response = requests.get(self.model_url, params=params, headers=self.headers)if response.status_code ==200:
soup = BeautifulSoup(response.text,'html.parser')
catalogue_list = soup.find('div','filtered-items')
catalogues_list = catalogue_list.find_all('article')for index, catalogue inenumerate(catalogues_list):print(f"========start catalogue {index+1}"+"/"+"10========")# 版面标题
catalogue_title = catalogue.find('div','news-item__description').find('a').get_text(strip=True)# 操作时间
date = datetime.now()# 更新时间
publish_time = catalogue.find('div','news-item__description').find('time').get('datetime')# 将日期字符串转换为datetime对象
updatetime = datetime.strptime(publish_time,'%Y-%m-%d')# 版面url
catalogue_href = catalogue.find('div','news-item__description').find('a').get('href')
catalogue_url = self.root_url + catalogue_href
# 版面id
catalogue_id = catalogue_href[1:]
self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title)# 连接 MongoDB 数据库服务器
client = MongoClient('mongodb://localhost:27017/')# 创建或选择数据库
db = client['berkeley-news']# 创建或选择集合
catalogues_collection = db['catalogues']# 插入示例数据到 catalogues 集合
catalogue_data ={'id': catalogue_id,'date': date,'title': catalogue_title,'url': catalogue_url,'cardSize':1,'updatetime': updatetime
}# 在插入前检查是否存在相同id的文档
existing_document = catalogues_collection.find_one({'id': catalogue_id})# 如果不存在相同id的文档,则插入新文档if existing_document isNone:
catalogues_collection.insert_one(catalogue_data)print("[爬取版面]版面 "+ catalogue_url +" 已成功插入!")else:print("[爬取版面]版面 "+ catalogue_url +" 已存在!")print(f"========finsh catalogue {index+1}"+"/"+"10========")returnTrueelse:raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")# 解析文章列表里的文章defparse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):
url ='https://news.berkeley.edu/2024/03/05/meet-our-new-faculty-antoine-levy-economics'
card_response = requests.get(url, headers=self.headers)
soup = BeautifulSoup(card_response.text,'html.parser')# 对应的版面id
card_id = catalogue_id
# 文章标题
card_title = cardtitle
# 文章更新时间
updateTime = cardupdatetime
# 操作时间
date = datetime.now()# 文章作者try:
author = soup.find('a', href='/author/news').get_text()except:
author = soup.find('div','single-post__heading').find('p').find('a').get_text()# 原始htmldom结构
html_dom = soup.find('div','single-post cb-section cb-stretch')# 标题上方的冗余
html_cut1 = html_dom.find('div','single-post__heading').find('strong')# 链接冗余
html_cut2 = html_dom.find_all('a','a2a_dd share-link')# 移除元素if html_cut1:
html_cut1.extract()if html_cut2:for item in html_cut2:
item.extract()
html_content = html_dom
# 增加保留html样式的源文本
origin_html = html_dom.prettify()# String# 转义网页中的图片标签
str_html = self.transcoding_tags(origin_html)# 再包装成
temp_soup = BeautifulSoup(str_html,'html.parser')# 反转译文件中的插图
str_html = self.translate_tags(temp_soup.text)# 绑定更新内容
content = self.clean_content(str_html)# 下载图片
imgs =[]try:
img_array = soup.find('figure','cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img')except:
img_array = soup.find('div','container container--lg cb-container').find_all('img')iflen(img_array)isnotNone:for item in img_array:
img_url = item.get('src')if img_url isNone:
img_url = item.get('data-src')
imgs.append(img_url)iflen(imgs)!=0:# 下载图片
illustrations = self.download_images(imgs, card_id)# 连接 MongoDB 数据库服务器
client = MongoClient('mongodb://localhost:27017/')# 创建或选择数据库
db = client['berkeley-news']# 创建或选择集合
cards_collection = db['cards']# 插入示例数据到 cards 集合
card_data ={'id': card_id,'catalogueId': catalogue_id,'type':'berkeley-news','date': date,'title': card_title,'author': author,'updatetime': updateTime,'url': url,'html_content':str(html_content),'content': content,'illustrations': illustrations,}# 在插入前检查是否存在相同id的文档
existing_document = cards_collection.find_one({'id': card_id})# 如果不存在相同id的文档,则插入新文档if existing_document isNone:
cards_collection.insert_one(card_data)print("[爬取文章]文章 "+ url +" 已成功插入!")else:print("[爬取文章]文章 "+ url +" 已存在!")# 下载图片defdownload_images(self, img_urls, card_id):
result = re.search(r'[^/]+$', card_id)
last_word = result.group(0)# 根据card_id创建一个新的子目录
images_dir = os.path.join(self.img_output_dir,str(last_word))ifnot os.path.exists(images_dir):
os.makedirs(images_dir)
downloaded_images =[]for index, img_url inenumerate(img_urls):try:
response = requests.get(img_url, stream=True, headers=self.headers)if response.status_code ==200:# 从URL中提取图片文件名
img_name_with_extension = img_url.split('/')[-1]
pattern =r'^[^?]*'match= re.search(pattern, img_name_with_extension)
img_name =match.group(0)# 保存图片withopen(os.path.join(images_dir, img_name),'wb')as f:
f.write(response.content)
downloaded_images.append([img_url, os.path.join(images_dir, img_name)])print(f'[爬取文章图片]文章id为{card_id}的图片已保存到本地')except requests.exceptions.RequestException as e:print(f'请求图片时发生错误:{e}')except Exception as e:print(f'保存图片时发生错误:{e}')return downloaded_images
# 如果文件夹存在则跳过else:print(f'[爬取文章图片]文章id为{card_id}的图片文件夹已经存在')return[]# 工具 转义标签deftranscoding_tags(self, htmlstr):
re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)# IMG 转义return s
# 工具 转义标签deftranslate_tags(self, htmlstr):
re_img = re.compile(r'@@##(img.*?)##@@', re.M)
s = re_img.sub(r'<\1>', htmlstr)# IMG 转义return s
# 清洗文章defclean_content(self, content):if content isnotNone:
content = re.sub(r'\r',r'\n', content)
content = re.sub(r'\n{2,}','', content)
content = re.sub(r' {6,}','', content)
content = re.sub(r' {3,}\n','', content)
content = re.sub(r'<img src="../../../image/zxbl.gif"/>','', content)
content = content.replace('<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ','')
content = content.replace(''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''','') \
.replace(' <!--enpcontent','').replace('<TABLE>','')
content = content.replace('<P>','').replace('<\P>','').replace(' ',' ')return content
defrun():# 根路径
root_url ='https://news.berkeley.edu/'# 模块地址数组
model_urls =['https://news.berkeley.edu/news']# 文章图片保存路径
output_dir ='D://imgs//berkeley-news'for model_url in model_urls:
scraper = MitnewsScraper(root_url, model_url, output_dir)
scraper.catalogue_all_pages()if __name__ =="__main__":
run()
五、效果展示
版权归原作者 东离与糖宝 所有, 如有侵权,请联系我们删除。