目录
背景
做rumor detection 用到了twitter15和twitter16数据集,里边只给了推文id和评论者的uid,想要爬取其他数据就只能自己动手。
我需要爬取推文评论用户在评论时间点前两个月的历史推文,然而这两个数据集都太老了,里边的数据都是13-14年的,所以用twitter API无法获取到(因为官方API只能爬用户最近3000条历史推文),因此只能用推特的搜索API来爬数据。
这篇文章给出了用推特搜索api的爬取过程,但是万恶的马斯克限制了搜索API的使用权限,现在只能登陆后才能调用搜索API了。
之前有过一段时间selenium使用经验,所以打算用selenium模拟登陆后来爬取数据。因为数据集中推文很多,大概两千条,然后底下的评论平均有五个用户吧,所以一共要爬取差不多一万个用户的历史推文,规模还是蛮大的。因此在这个过程中还用到了多进程来加快爬取的速度。
获取cookies
用了selenium来模拟登陆过程,自动输入twitter账号和密码。
import time
from selenium import webdriver
twi_username ="username"
twi_keyword ="password"
browser = webdriver.Chrome()
browser.get(r'https://twitter.com/i/flow/login')#这里睡几秒,等待页面加载。
time.sleep(5)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div/div/div/div[5]/label/div/div[2]/div/input").send_keys(twi_username)
time.sleep(3)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div/div/div/div[6]/div").click()
time.sleep(2)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div[1]/div/div/div[3]/div/label/div/div[2]/div[1]/input").send_keys(twi_keyword)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div[2]/div/div[1]/div/div/div/div/span/span").click()
time.sleep(2)#使用这个函数可以导出browser的cookies
savedCookies = browser.get_cookies()print(savedCookies)
savedCookies 是一个词典,将cookies打印出来后复制一下,之后会用到。
应用cookies
推特现在对搜索作出了一些限制,首先就是得登陆后才能用搜索,其次就是当搜索次数过多后,会禁用一段时间搜索功能。
为了加快爬取进度,可以用多个推特账号来登陆。
这里参考了这篇文章
这个函数返回了一个浏览器
definit_browser(i):#因为用了多线程,i表示线程编号
savedCookies1=[{}]
savedCookies2=[{}]#两个推特账号,对应俩cookies
savedCookies_list=[savedCookies1,savedCookies3]#通过线程编号来分配cookies
savedCookies=savedCookies_list[i%2]#d和co俩变量是为了之后获取浏览器日志
co = webdriver.chrome.options.Options()
co.add_experimental_option('w3c',False)
d = webdriver.common.desired_capabilities.DesiredCapabilities.CHROME
d['loggingPrefs']={'performance':'ALL'}
d["goog:loggingPrefs"]={"performance":"ALL"}
browser = webdriver.Chrome(desired_capabilities=d,options=co)
browser.get(r'https://twitter.com/i/flow/login')
time.sleep(1)
browser.delete_all_cookies()for cookie in savedCookies:for k in{'name','value','domain','path','expiry'}:# cookie.keys()属于'dict_keys'类,通过list将它转化为列表if k notinlist(cookie.keys()):# saveCookies中的第一个元素,由于记录的是登录前的状态,所以它没有'expiry'的键名,我们给它增加if k =='expiry':
t = time.time()
cookie[k]=int(t)# 时间戳s
browser.add_cookie({k: cookie[k]for k in{'name','value','domain','path','expiry'}})return browser
使用搜索API爬取推文
推特高级搜索里边有很多字段,选定字段后可以搜到相关推文。
这里参考了这篇文章的思路,通过adative.json可以得到推文信息,但是那位大佬用request库来搞的,我不是很了解orz。用selenium做的话,可以通过获取浏览器日志来实现,参考了这篇文章
在长时间爬数据时,可能会遇到一些问题:
- 网不好,所以可能还在加载,所以用了is_search_loading(driver)函数来检测网页是否还在加载
- 被推特限制搜索了,用到is_search_loading(driver)函数
这俩函数原理很简单,就是判断有没有出现相关的元素,如果出现了,就返回True。
如果
defis_search_limitted(driver):#调用该函数时页面可能没有加载完毕,睡个2秒
time.sleep(2)try:
driver.find_element("xpath","/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div/div[2]/div/span/span")returnTrueexcept:returnFalsedefis_search_loading(driver):try:
time.sleep(2)
driver.find_element("xpath","/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/div[1]/div[1]/div[1]/div/div/div/div/div[2]/div[2]/div/div/div/svg")returnTrueexcept:returnFalsereturndefscrollUntilLoaded(driver):
last_height = driver.execute_script("return document.body.scrollHeight")while is_search_loading(driver):
time.sleep(2)whileTrue:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
new_height = driver.execute_script("return document.body.scrollHeight")if new_height == last_height:break
last_height = new_height
defscrapUserTweets(browser,username,since_time,until_time):# username = ""# since_time = "2019-08-22"# until_time = "2019-09-22"
browser.get(fr'https://twitter.com/search?q=from%3A{username}%20until%3A{until_time}%20since%3A{since_time}%20-filter%3Alinks%20-filter%3Areplies&src=typed_query')
time.sleep(2)
sleep_time =60#如果搜索限制了,就睡一分钟while is_search_limitted(browser):print("*****search limitted, sleep {} seconds".format(sleep_time))
time.sleep(sleep_time)
browser.get(fr'https://twitter.com/search?q=from%3A{username}%20until%3A{until_time}%20since%3A{since_time}%20-filter%3Alinks%20-filter%3Areplies&src=typed_query')
scrollUntilLoaded(browser)#获取浏览器日志
logs = browser.get_log("performance")
tweets =[]for log in logs:
logjson = json.loads(log["message"])["message"]if logjson['method']=='Network.responseReceived':
params = logjson['params']try:# 找到那啥adaptive.json玩意
requestUrl = params['response']['url']if"adaptive.json"in requestUrl:
requestId = params['requestId']
response_body = browser.execute_cdp_cmd('Network.getResponseBody',{'requestId': requestId})
_content = json.loads(response_body["body"])
tweets_set = _content['globalObjects']['tweets']for w in tweets_set.keys():
tweets.append(tweets_set[w])else:continueexcept:
requestUrl ="None"continueelse:continueiflen(tweets)!=0:
tweets_json=json.loads(json.dumps(tweets, ensure_ascii=False))
user_tweets=[]for i in tweets_json:
tweet ={}
tweet["id"]= i["id"]
tweet["full_text"]= i["full_text"]
tweet["created_at"]= i["created_at"]
user_tweets.append(tweet)return user_tweets
else:returnNone
多进程
这里用了队列来载入需要爬取的推特推文id,通过这个队列来给不同的进程分配任务,每一个进程都从队列获取推文id,然后根据这个推文id来爬取相应的信息,在这个推文id的信息爬取完后,会把这个推文id写入finished.txt中,这样在整个爬取过程中如果程序中断,从中断点能恢复任务,继续爬虫。
import multiprocessing
defmulti_scrap_tree_func( queue, dataset_path,i):
browser = scrap2.init_browser(i)whilenot queue.empty():#从队列中获取推特idfile= queue.get()
file_path = os.path.join(dataset_path,file)
get_tree_user(browser,file_path)
logging.debug(file+" is scrapped")withopen("./data/twitter15/finished.txt",mode="a", encoding="utf-8")as f:
f.write(file+"\n")print("process{}:{} / {} ,{} is finished".format(i,len(twi16_dir_ls)-queue.qsize(),len(twi16_dir_ls),file))if __name__ =='__main__':
lock = multiprocessing.Lock()
queue = multiprocessing.Queue()
continue_file =""#读取已完成爬取的推文idwithopen("./data/twitter15/finished.txt",mode="r",encoding="utf-8")as f:
finished_file_list = f.readlines()for i inrange(len(twi16_dir_ls)):
is_finished =False#如果一个推文id已经完成爬取,就不加入队列中for j in finished_file_list:if j[:-1]==twi16_dir_ls[i]:
is_finished=Trueifnot is_finished:
queue.put(twi16_dir_ls[i])print(queue.qsize())
process_list =[]for i inrange(12):#创建12个进程
process = multiprocessing.Process(target=multi_scrap_tree_func,args=(queue,twi16_dir,i))
process.start()
process_list.append(process)for p in process_list:
p.join()
结语
在有现成API的情况下,用selenium来爬数据确实不够优雅,开12个chrome的性能需求太大了。但是因为我爬的数据量大,以及推特的登陆和请求次数限制,用request和httpx来爬数据在时间上也不会特别快。不过以后还是可以学学用requests和httpx爬数据
版权归原作者 Fishtail 所有, 如有侵权,请联系我们删除。