Awesome!!! 🙄
ZY.Zhang
This document is based on a Bilibili video tutorial.
一、Web Crawler Basics
1. What Is a Crawler
**What is a crawler:** the process of writing a program that simulates a browser surfing the web and then crawls data from the internet.
2. Is Crawling Legal?
Is crawling ultimately legal or illegal?
- It is not prohibited by law
- But it does carry legal risk
- Benign crawlers vs. malicious crawlers
The risks a crawler brings show up in two ways:
- The crawler interferes with the normal operation of the site being visited
- The crawler scrapes specific types of data or information protected by law
How do you stay out of trouble when writing and using crawlers?
- Keep optimizing your program to avoid interfering with the normal operation of the sites you visit
- When using or distributing scraped data, review what you collected; if it involves sensitive content such as user privacy or trade secrets, stop crawling or distributing it immediately.
3. A Closer Look at Crawlers
Crawlers classified by usage scenario:
- **General-purpose crawler:** an important component of search systems; fetches an entire page of data.
- **Focused crawler:** built on top of the general-purpose crawler; fetches specific, local content within a page.
- **Incremental crawler:** monitors a site for data updates; only fetches the data the site has newly published.
The crawler's spear and shield:
**robots.txt protocol:** a gentlemen's agreement that specifies which data on a site may be crawled and which may not.
Example: www.taobao.com/robots.txt
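As a quick check, you can fetch a site's robots.txt directly — a minimal sketch, using the Taobao URL from the example above:

```python
import requests

# Fetch and print a site's robots.txt to see which paths crawlers may access.
# (Sketch only: a well-behaved crawler should honor the Disallow rules it lists.)
response = requests.get('https://www.taobao.com/robots.txt')
print(response.text)
```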
4. The HTTP & HTTPS Protocols
(1) HTTP
**Concept:** a form of data interaction between server and client.
Common request headers:
- **User-Agent:** the identity of the request carrier
- **Connection:** whether to close or keep the connection after the request completes
Common response headers:
- **Content-Type:** the type of data the server sends back to the client
(2) HTTPS
**Concept:** the secure hypertext transfer protocol.
(3) Encryption methods: symmetric-key encryption, asymmetric-key encryption, and certificate-based encryption (the scheme HTTPS ultimately relies on).
二、requests Module Basics
1. First Blood with requests
**requests module:** a Python module for network requests — powerful, simple, convenient, and highly efficient.
**Purpose:** simulate a browser sending requests.
How to use it (the requests coding workflow): specify the URL → send the request → get the response data → persist the data.
Environment setup: `pip install requests`
Hands-on code:
```python
import requests

if __name__ == '__main__':
    url = 'https://www.sogou.com/'
    response = requests.get(url=url)
    page_text = response.text
    print(page_text)
    with open('./sogou.html', 'w', encoding='utf-8') as fp:
        fp.write(page_text)
    print('爬取数据结束!')
```
2. Consolidating requests: Case Studies
(1) A simple web page collector
```python
'''UA detection: a portal site's server checks the carrier identity (User-Agent)
of each request. If the carrier identifies as a browser, the request is treated
as normal; if it is not browser-based, the request is treated as abnormal
(a crawler) and the server may well reject it.'''
import requests

if __name__ == '__main__':
    # UA spoofing: disguise the crawler's User-Agent as a browser's
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    url = 'https://www.sogou.com/web'
    kw = input('Enter a word:')
    param = {
        'query': kw
    }
    response = requests.get(url=url, params=param, headers=headers)
    page_text = response.text
    fileName = kw + '.html'
    with open(fileName, 'w', encoding='utf-8') as fp:
        fp.write(page_text)
    print(fileName, '保存成功!!')
```
(2) Cracking Baidu Translate
- It is a POST request (and it carries parameters)
- The response data is JSON
```python
import requests
import json

if __name__ == '__main__':
    post_url = 'https://fanyi.baidu.com/sug'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    word = input('Enter a word:\n')
    data = {
        'kw': word
    }
    response = requests.post(url=post_url, data=data, headers=headers)
    dict_obj = response.json()
    print(dict_obj)
    fileName = word + '.json'
    fp = open(fileName, 'w', encoding='utf-8')
    json.dump(dict_obj, fp=fp, ensure_ascii=False)
    print('Over!')
```
(3) Douban Movies
```python
import requests
import json

if __name__ == '__main__':
    url = 'https://movie.douban.com/j/chart/top_list'
    param = {
        'type': '24',
        'interval_id': '100:90',
        'action': '',
        'start': '0',
        'limit': '20'
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    response = requests.get(url=url, params=param, headers=headers)
    list_data = response.json()
    fp = open('./douban.json', 'w', encoding='utf-8')
    json.dump(list_data, fp=fp, ensure_ascii=False)
    print('Over!')
```
3. Homework: KFC Restaurant Finder
```python
import requests
import json

if __name__ == '__main__':
    post_url = 'https://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
    keyword = input('请输入要查询的城市:')
    data = {
        'cname': '',
        'pid': '',
        'keyword': keyword,
        'pageindex': '1',
        'pageSize': '10'
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    response = requests.post(url=post_url, data=data, headers=headers)
    page = response.json()
    # renamed the loop variable from `dict` so it no longer shadows the builtin
    for store in page['Table1']:
        StoreName = store['storeName']
        address = store['addressDetail']
        print('StoreName:' + StoreName, 'address:' + address + '\n')
```
4. Comprehensive Exercise: the NMPA (药监总局) Site
```python
import requests
import json

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    id_list = []        # IDs of all companies
    all_data_list = []  # detail data of all companies
    url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsList'
    for page in range(1, 11):
        page = str(page)
        data = {
            'on': 'true',
            'page': page,
            'pageSize': '15',
            'productName': '',
            'conditionType': '1',
            'applyname': '',
            'applysn': '',
        }
        json_ids = requests.post(url=url, headers=headers, data=data).json()
        for dic in json_ids['list']:
            id_list.append(dic['ID'])
    # fetch each company's detail data by ID
    post_url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsById'
    for id in id_list:
        data = {'id': id}
        json_detail = requests.post(url=post_url, data=data, headers=headers).json()
        all_data_list.append(json_detail)
        all_data_list.append('---------------------------------------------------------')
    fp = open('./allData.json', 'w', encoding='utf-8')
    json.dump(all_data_list, fp=fp, ensure_ascii=False, indent=True)
    print('Over!')
```
三、Data Parsing
1. Data Parsing Overview
**Focused crawler:** crawls specified content within a page.
Coding workflow: 1. specify the URL → 2. send the request → 3. get the response data → 4. parse the data → 5. persist it
Ways to parse data:
- regular expressions
- bs4 parsing
- xpath parsing (the focus)
**Data parsing principle:** the local text content to be parsed is stored between tags or in their attributes, so:
1. locate the target tags;
2. extract (parse) the data stored in the tags or their attributes.
2. Scraping Image Data — Regular Expressions
| Operator | Description | Example |
| --- | --- | --- |
| `.` | any single character | |
| `[ ]` | character set: a value range for one character | `[abc]` matches a, b, or c; `[a-z]` matches a through z |
| `[^ ]` | negated set: an exclusion range for one character | `[^abc]` matches any single character other than a, b, or c |
| `*` | zero or more repetitions of the preceding character | `abc*` matches ab, abc, abcc, abccc, ... |
| `+` | one or more repetitions of the preceding character | `abc+` matches abc, abcc, abccc, ... |
| `?` | zero or one occurrence of the preceding character | `abc?` matches ab and abc |
| `\|` | either the left or the right expression | `abc\|def` matches abc or def |
| `{m}` | exactly m repetitions of the preceding character | `ab{2}c` matches abbc |
| `{m,n}` | m to n (inclusive) repetitions of the preceding character | `ab{1,2}c` matches abc and abbc |
| `^` | matches the start of a string | `^abc` means abc at the start of a string |
| `$` | matches the end of a string | `abc$` means abc at the end of a string |
| `( )` | grouping marker; only `\|` may be used inside | `(abc)` matches abc; `(abc\|def)` matches abc or def |
| `\d` | a digit; equivalent to `[0-9]` | |
| `\w` | a word character; equivalent to `[A-Za-z0-9_]` | |
| Function | Description |
| --- | --- |
| `re.search()` | scans a string for the first location matching the regex; returns a match object |
| `re.match()` | matches the regex from the very start of the string; returns a match object |
| `re.findall()` | searches the string; returns all matching substrings as a list |
| `re.split()` | splits the string wherever the regex matches; returns a list |
| `re.finditer()` | searches the string; returns an iterator whose elements are match objects |
| `re.sub()` | replaces every regex match in the string; returns the resulting string |
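A minimal sketch contrasting these functions (the example string is my own, not from the course):

```python
import re

text = 'tel: 138-0000, backup tel: 139-1111'

print(re.search(r'\d{3}', text).group())  # '138' — first match anywhere
print(re.match(r'\d{3}', text))           # None — match() only matches at the start
print(re.findall(r'\d{3}', text))         # ['138', '000', '139', '111'] — all matches
print(re.split(r',\s*', text))            # split on commas
print(re.sub(r'\d', '*', text))           # mask every digit

for m in re.finditer(r'\d{3}', text):     # iterate over match objects
    print(m.start(), m.group())
```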
| Flag | Description |
| --- | --- |
| `re.I` | makes matching case-insensitive |
| `re.L` | locale-aware matching |
| `re.M` | multi-line matching; affects `^` and `$` |
| `re.S` | makes `.` match every character, newlines included |
| `re.U` | interprets characters per the Unicode character set; affects `\w`, `\W`, `\b`, `\B` |
| `re.X` | verbose mode: a more flexible format so the regex is easier to read |
```text
Common regular expressions

Single characters:
    .   : any character except newline
    [ ] : any one character from the set, e.g. [aoe], [a-w]
    \d  : digit, [0-9]
    \D  : non-digit
    \w  : digit, letter, underscore, or Chinese character
    \W  : anything \w does not match
    \s  : any whitespace character (space, tab, form feed, ...), equivalent to [ \f\n\r\t\v]
    \S  : non-whitespace

Quantifiers:
    *     : any number of times, >= 0
    +     : at least once, >= 1
    ?     : optional, 0 or 1 time
    {m}   : exactly m times
    {m,}  : at least m times, e.g. hello{3,}
    {m,n} : m to n times

Anchors:
    ^ : starts with ...
    $ : ends with ...

Grouping: (ab)

Greedy mode: .*
Non-greedy (lazy) mode: .*?

re.I   : ignore case
re.M   : multi-line matching
re.S   : single-line mode (. matches newlines too)
re.sub : (pattern, replacement, string)
```
```python
'''Regex practice'''
import re

key = "javapythonc++php"
re.findall('python', key)[0]

key = "<html><h1><hello world><h1></html>"
re.findall('<h1>(.*)<h1>', key)[0]

string = '我喜欢身高为170的女孩'
re.findall(r'\d+', string)          # extracts ['170']

# extract http:// and https://
key = 'http://www.baidu.com and https://boob.com'
re.findall('https?://', key)

key = 'lalala<hTml><hello></HtMl>hahah'
re.findall('<[Hh][Tt][mM][lL]>(.*)</[Hh][Tt][mM][lL]>', key)

key = 'bobo@hit.edu.com'
re.findall(r'h.*?\.', key)          # non-greedy: stops at the first dot

key = 'sasa and sas and saaas'
re.findall('sa{1,2}s', key)
```
```python
import requests

if __name__ == '__main__':
    url = 'https://pic.qiushibaike.com/system/pictures/12409/124098453/medium/YNPHJQC101MS31E1.jpg'
    # .content returns the response body as bytes (images are binary data)
    img_data = requests.get(url=url).content
    with open('./qiutu.jpg', 'wb') as fp:
        fp.write(img_data)
```
3. Regex Parsing Case Study
```python
'''Target structure:
<div class="thumb">
    <a href="/article/124098472" target="_blank">
        <img src="//pic.qiushibaike.com/system/pictures/12409/124098472/medium/HSN2WWN0TP1VUPNG.jpg"
             alt="糗事#124098472" class="illustration" width="100%" height="auto">
    </a>
</div>'''
import re
import os
import requests

if __name__ == '__main__':
    # create a folder to hold all the images
    if not os.path.exists('./qiutuLibs'):
        os.mkdir('./qiutuLibs')
    url = 'https://www.qiushibaike.com/imgrank/'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    page_text = requests.get(url=url, headers=headers).text
    # parse all image src values with a regex; re.S lets .*? span newlines
    ex = '<div class="thumb">.*?<img src="(.*?)" alt=.*?</div>'
    img_src_list = re.findall(ex, page_text, re.S)
    print(img_src_list)
    for src in img_src_list:
        src = 'https:' + src
        img_data = requests.get(url=src, headers=headers).content
        img_name = src.split('/')[-1]
        imgPath = './qiutuLibs/' + img_name
        with open(imgPath, 'wb') as fp:
            fp.write(img_data)
            print(img_name, '下载成功!')
```
```python
import re
import os
import requests

if __name__ == '__main__':
    if not os.path.exists('./qiutuLibs'):
        os.mkdir('./qiutuLibs')
    # generic URL template: %d is filled in with the page number
    url = 'https://www.qiushibaike.com/imgrank/page/%d/'
    for pageNum in range(1, 11):
        new_url = format(url % pageNum)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
        }
        page_text = requests.get(url=new_url, headers=headers).text
        ex = '<div class="thumb">.*?<img src="(.*?)" alt=.*?</div>'
        img_src_list = re.findall(ex, page_text, re.S)
        print(img_src_list)
        for src in img_src_list:
            src = 'https:' + src
            img_data = requests.get(url=src, headers=headers).content
            img_name = src.split('/')[-1]
            imgPath = './qiutuLibs/' + img_name
            with open(imgPath, 'wb') as fp:
                fp.write(img_data)
                print(img_name, '下载成功!')
```
4. bs4 Parsing Overview
bs4 (BeautifulSoup) is a Python-specific HTML parsing library; install it with `pip install bs4` plus `pip install lxml` for the parser.
5. bs4 Parsing in Detail
How to instantiate a BeautifulSoup object:
- Import it: `from bs4 import BeautifulSoup`
- Instantiation:
  (1) load a local HTML document's data into the object;
  (2) load page source fetched from the internet into the object.
Methods and attributes for data parsing:
- `soup.tagName`: returns the first occurrence of the tagName tag in the document;
- `soup.find(tagName)`: equivalent to `soup.tagName`; can also locate by attribute, e.g. `soup.find('div', class_='song')`;
- `soup.find_all()`: returns all matching tags;
- `soup.select('selector')`: takes any CSS selector (id, class, tag, ...) and returns a list; supports hierarchical selectors (`>` for one level down, a space for any number of levels).
Getting the text between tags: `soup.a.text` / `soup.a.string` / `soup.a.get_text()`
- `text` / `get_text()`: returns all the text content inside a tag;
- `string`: returns only the text that is a direct child of the tag.
Getting an attribute value: `soup.a['href']`
```html
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>测试bs4</title>
</head>
<body>
    <div>
        <p>百里守约</p>
    </div>
    <div class="song">
        <p>李清照</p>
        <p>王安石</p>
        <p>苏轼</p>
        <p>柳宗元</p>
        <a href="http://www.song.com/" title="赵匡胤" target="_self">
            <span>this is span</span>
            宋朝是最强大的王朝,不是军队的强大,而是经济很强大,国民都很有钱</a>
        <a href="" class="du">总为浮云能蔽日,长安不见使人愁</a>
        <img src="http://www.baidu.com/meinv.jpg" alt=""/>
    </div>
    <div class="tang">
        <ul>
            <li><a href="http://www.baidu.com" title="qing">清明时节雨纷纷,路上行人欲断魂,借问酒家何处有,牧童遥指杏花村</a></li>
            <li><a href="http://www.163.com" title="qin">秦时明月汉时关,万里长征人未还,但使龙城飞将在,不教胡马度阴山</a></li>
            <li><a href="http://www.126.com" alt="qi">岐王宅里寻常见,崔九堂前几度闻,正是江南好风景,落花时节又逢君</a></li>
            <li><a href="http://www.sina.com" class="du">杜甫</a></li>
            <li><a href="http://www.dudu.com" class="du">杜牧</a></li>
            <li><b>杜小月</b></li>
            <li><i>度蜜月</i></li>
            <li><a href="http://www.haha.com" id="feng">凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘</a></li>
        </ul>
    </div>
</body>
</html>
```
```python
from bs4 import BeautifulSoup

if __name__ == '__main__':
    # load the local HTML document into a BeautifulSoup object
    fp = open('./test.html', 'r', encoding='utf-8')
    soup = BeautifulSoup(fp, 'lxml')
    print(soup.a)                                   # first <a> tag in the document
    print(soup.div)                                 # first <div>
    print(soup.find('div'))                         # same as soup.div
    print(soup.find('div', class_='song'))          # locate by attribute
    print(soup.find_all('a'))                       # every <a> tag
    print(soup.select('.tang'))                     # CSS selector, returns a list
    print(soup.select('.tang > ul > li > a')[0])    # > means one level down
    print(soup.select('.tang > ul a')[0])           # a space means any number of levels
    print(soup.select('.tang > ul a')[0].text)
    print(soup.select('.tang > ul a')[0].get_text())
    print(soup.select('.tang > ul a')[0].string)
    print(soup.select('.tang > ul a')[0]['href'])   # attribute value
```
6. bs4 Case Study
```python
import requests
from bs4 import BeautifulSoup

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    # fetch the table-of-contents page
    url = 'https://www.shicimingju.com/book/sanguoyanyi.html'
    response = requests.get(url=url, headers=headers)
    response.encoding = 'utf-8'
    page_text = response.text
    # parse out every chapter's title and detail-page URL
    soup = BeautifulSoup(page_text, 'lxml')
    li_list = soup.select('.book-mulu > ul > li')
    fp = open('./sanguo.txt', 'w', encoding='utf-8')
    for li in li_list:
        title = li.a.string
        detail_url = 'http://www.shicimingju.com' + li.a['href']
        detail_response = requests.get(url=detail_url, headers=headers)
        detail_response.encoding = 'utf-8'
        detail_page_text = detail_response.text
        # parse the chapter content out of the detail page
        detail_soup = BeautifulSoup(detail_page_text, 'lxml')
        div_tag = detail_soup.find('div', class_='chapter_content')
        content = div_tag.text
        fp.write(title + ':' + content + '\n')
        print(title, '爬取成功!')
```
7. xpath Parsing Basics
**xpath parsing:** the most commonly used, most convenient, and most efficient parsing method — and the most portable one.
xpath parsing principle:
(1) Instantiate an etree object and load the page source to be parsed into it;
(2) Call the etree object's xpath method with an xpath expression to locate tags and capture content.
Environment setup: `pip install lxml` (the lxml parser)
How to instantiate an etree object: `from lxml import etree`
(1) Load a local HTML document's source into the etree object: `etree.parse(filePath)`
(2) Load source fetched from the internet into the object: `etree.HTML(page_text)`
`xpath('xpath expression')`:
- `/` starts from the root node, or separates one level;
- `//` spans multiple levels, or starts the search from any position;
- attribute location: `tag[@attrName="attrValue"]`;
- index location: `tag[@attrName="attrValue"]/p[3]` — note that indexing starts at 1;
- text extraction: `/text()` gets only the text directly inside a tag; `//text()` gets all text, direct or nested;
- attribute extraction: `/@attrName`, e.g. `img/@src`
```python
from lxml import etree

if __name__ == "__main__":
    # load the local HTML document into an etree object
    tree = etree.parse('test.html')
    r = tree.xpath('//div[@class="song"]/img/@src')
    print(r)
```
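A few more expressions against the same test.html — a sketch based on the tag structure shown earlier, so the paths assume that exact HTML:

```python
from lxml import etree

tree = etree.parse('test.html')

# hierarchical location: / is one level, // is any number of levels
print(tree.xpath('/html/body/div'))             # all top-level divs
print(tree.xpath('//div[@class="tang"]//li'))   # every li under the tang div

# index location (1-based)
print(tree.xpath('//div[@class="song"]/p[3]'))  # the third p: 苏轼

# text extraction: /text() is direct text, //text() is all nested text
print(tree.xpath('//div[@class="tang"]//li[5]/a/text()')[0])  # 杜牧
print(tree.xpath('//div[@class="song"]//text()'))             # all text in the div
```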
8. xpath in Practice: 58.com Second-hand Housing
```python
import requests
from lxml import etree

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    url = 'https://bj.58.com/ershoufang/'
    page_text = requests.get(url=url, headers=headers).text
    tree = etree.HTML(page_text)
    div_list = tree.xpath('//section[@class="list"]/div')
    fp = open('58.txt', 'w', encoding='utf-8')
    for div in div_list:
        # ./ continues the xpath from the current div element
        title = div.xpath('./a/div[2]//h3/text()')[0]
        fp.write(title + '\n\n')
    print('---------------Over!------------------')
```
9. xpath Case Studies
(1) Downloading 4K images
```python
import requests
from lxml import etree
import os

if __name__ == "__main__":
    url = 'http://pic.netbian.com/4kmeinv/'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
    }
    response = requests.get(url=url, headers=headers)
    page_text = response.text
    tree = etree.HTML(page_text)
    li_list = tree.xpath('//div[@class="slist"]/ul/li')
    if not os.path.exists('./picLibs'):
        os.mkdir('./picLibs')
    for li in li_list:
        img_src = 'http://pic.netbian.com' + li.xpath('./a/img/@src')[0]
        img_name = li.xpath('./a/img/@alt')[0] + '.jpg'
        # the site serves gbk content; re-decode to fix garbled Chinese file names
        img_name = img_name.encode('iso-8859-1').decode('gbk')
        img_data = requests.get(url=img_src, headers=headers).content
        img_path = 'picLibs/' + img_name
        with open(img_path, 'wb') as fp:
            fp.write(img_data)
            print(img_name, '下载成功!!!')
    print('------------------------OVER!---------------------------------')
```
(2) Scraping City Names Nationwide
```python
import requests
from lxml import etree

if __name__ == '__main__':
    '''first version: hot cities and all remaining cities parsed separately
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    url = 'https://www.aqistudy.cn/historydata/'
    page_text = requests.get(url=url, headers=headers).text
    tree = etree.HTML(page_text)
    # parse the hot-city names
    hot_li_list = tree.xpath('//div[@class="bottom"]/ul/li')
    all_city_names = []
    for li in hot_li_list:
        hot_city_names = li.xpath('./a/text()')[0]
        all_city_names.append(hot_city_names)
    # parse all remaining city names
    city_names_list = tree.xpath('.//div[@class="bottom"]/ul/div[2]/li')
    for li in city_names_list:
        city_name = li.xpath('./a/text()')[0]
        all_city_names.append(city_name)
    print(all_city_names, len(all_city_names))'''

    # second version: a single xpath with | covering both cases
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    url = 'https://www.aqistudy.cn/historydata/'
    page_text = requests.get(url=url, headers=headers).text
    tree = etree.HTML(page_text)
    a_list = tree.xpath('//div[@class="bottom"]/ul/li/a | //div[@class="bottom"]/ul/div[2]/li/a')
    all_city_names = []
    for a in a_list:
        a_name = a.xpath('./text()')[0]
        all_city_names.append(a_name)
    print(all_city_names, len(all_city_names))
```
10. xpath Homework: Free Résumé Templates from 站长素材
```python
import os
import requests
from lxml import etree

if __name__ == '__main__':
    if not os.path.exists('./jianli'):
        os.mkdir('./jianli')
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    url = 'https://sc.chinaz.com/jianli/free_%d.html'
    page = int(input('您一共想要爬取多少页:'))
    for pageNum in range(1, page + 1):   # page + 1 so the last requested page is included
        if pageNum == 1:
            new_url = 'https://sc.chinaz.com/jianli/free.html'   # page 1 has no _1 suffix
        else:
            new_url = format(url % pageNum)
        page_text = requests.get(url=new_url, headers=headers).text
        tree = etree.HTML(page_text)
        url_div_list = tree.xpath('//*[@id="container"]/div')
        for detail_div in url_div_list:
            detail_url = 'https:' + detail_div.xpath('./a/@href')[0]
            detail_page_text = requests.get(url=detail_url, headers=headers).text
            tree = etree.HTML(detail_page_text)
            # re-decode to fix garbled Chinese template names
            name = tree.xpath('//h1/text()')[0].encode('iso-8859-1').decode('utf-8')
            download_url = tree.xpath('//*[@id="down"]/div[2]/ul/li[1]/a/@href')[0]
            file_path = 'jianli/' + name + '.rar'
            download_content = requests.get(url=download_url, headers=headers).content
            with open(file_path, 'wb') as fp:
                fp.write(download_content)
                print(name, '下载完成')
    print('-------------------------------OVER!---------------------------------------')
```
四、CAPTCHAs
1. CAPTCHA Recognition Basics
The love-hate relationship between CAPTCHAs and crawlers:
- Anti-crawling mechanism: the CAPTCHA. We recognize the data in the CAPTCHA image in order to simulate a login.
Ways to recognize a CAPTCHA: recognize it manually by eye (not recommended), or use a third-party automated recognition service (recommended).
2. Yundama (云打码) Usage Flow
- Register a user-center account
- Log in with that account
- Check your balance and make sure you have enough credits (first-time users get 1000 free credits for binding WeChat; otherwise a small top-up, even ¥1, will do)
- Create a software ID (bottom-left of the user center)
- Download the sample code (under the development docs)
```python
from lxml import etree
import requests
from hashlib import md5

class Chaojiying_Client(object):
    def __init__(self, username, password, soft_id):
        self.username = username
        password = password.encode('utf8')
        self.password = md5(password).hexdigest()
        self.soft_id = soft_id
        self.base_params = {
            'user': self.username,
            'pass2': self.password,
            'softid': self.soft_id,
        }
        self.headers = {
            'Connection': 'Keep-Alive',
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
        }

    def PostPic(self, im, codetype):
        """
        im: image bytes
        codetype: problem type, see http://www.chaojiying.com/price.html
        """
        params = {
            'codetype': codetype,
        }
        params.update(self.base_params)
        files = {'userfile': ('ccc.jpg', im)}
        r = requests.post('http://upload.chaojiying.net/Upload/Processing.php',
                          data=params, files=files, headers=self.headers)
        return r.json()

    def ReportError(self, im_id):
        """
        im_id: image ID of a misrecognized CAPTCHA
        """
        params = {
            'id': im_id,
        }
        params.update(self.base_params)
        r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php',
                          data=params, headers=self.headers)
        return r.json()

def tranformImgCode(imgPath, imgType):
    # fill in your own account, password, and software ID
    chaojiying = Chaojiying_Client('此处是账户', '此处是密码', '此处是软件ID')
    im = open(imgPath, 'rb').read()
    return chaojiying.PostPic(im, imgType)['pic_str']

print(tranformImgCode('./a.jpg', 1902))
```
3. CAPTCHA Recognition for gushiwen.cn
```python
# use a session so the CAPTCHA request and the login request share cookies
session = requests.Session()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'https://so.gushiwen.cn/user/login.aspx?from=http://so.gushiwen.cn/user/collect.aspx'
page_text = session.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
img_src = 'https://so.gushiwen.org' + tree.xpath('//*[@id="imgCode"]/@src')[0]
img_data = session.get(img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(img_data)

# recognize the CAPTCHA with the Chaojiying client defined above
code_text = tranformImgCode('./code.jpg', 1902)
print(code_text)

login_url = 'https://so.gushiwen.cn/user/login.aspx?from=http%3a%2f%2fso.gushiwen.cn%2fuser%2fcollect.aspx'
data = {
    '__VIEWSTATE': 'f1ECt6+6MPtdTZMJtYOYS/7ww2d/DPy9t8JQcIt1QuOneLTbNQuYqPcCjZNbDAbfb9vj3k6f0M7EKTf0YqElM1k1A5ELwyTvUzBii+9LDRBbIMmc/jb0DJPsYfI=',
    '__VIEWSTATEGENERATOR': 'C93BE1AE',
    'from': 'http://so.gushiwen.cn/user/collect.aspx',
    'email': '账号',
    'pwd': '密码',
    'code': code_text,
    'denglu': '登录',
}
page_text_login = session.post(url=login_url, headers=headers, data=data).text
with open('./gushiwen.html', 'w', encoding='utf-8') as fp:
    fp.write(page_text_login)
```
五、Advanced requests
1. Simulated Login Workflow
**Simulated login:** crawl user information tied to a particular account.
**Goal:** simulate logging in to Renren (人人网)
- Clicking the login button sends a POST request
- The POST request carries the login information entered beforehand (username, password, CAPTCHA, ...)
- The CAPTCHA changes dynamically on every request
2. Simulated Login to Renren
```python
import requests
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
# 1. fetch the login page and parse out the CAPTCHA image URL
url = 'http://www.renren.com/SysHome.do'
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(url=code_img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(code_img_data)

# 2. post the login form (login_url and data must be captured from the browser)
login_url = ' '
data = {}
response = requests.post(url=login_url, headers=headers, data=data)
print(response.status_code)
login_page_text = response.text
with open('./login.html', 'w', encoding='utf-8') as fp:
    fp.write(login_page_text)
```
```python
'''The video author's original code'''
from CodeClass import YDMHttp   # the Yundama sample client
import requests
from lxml import etree

def getCodeText(imgPath, codeType):
    username = 'bobo328410948'
    password = 'bobo328410948'
    appid = 6003
    appkey = '1f4b564483ae5c907a1d34f8e2f2776c'
    filename = imgPath
    codetype = codeType
    timeout = 20
    result = None
    if (username == 'username'):
        print('请设置好相关参数再测试')
    else:
        yundama = YDMHttp(username, password, appid, appkey)
        uid = yundama.login()
        print('uid: %s' % uid)
        balance = yundama.balance()
        print('balance: %s' % balance)
        cid, result = yundama.decode(filename, codetype, timeout)
        print('cid: %s, result: %s' % (cid, result))
    return result

headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
url = 'http://www.renren.com/SysHome.do'
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
code_img_src = tree.xpath('//*[@id="verifyPic_login"]/@src')[0]
code_img_data = requests.get(url=code_img_src, headers=headers).content
with open('./code.jpg', 'wb') as fp:
    fp.write(code_img_data)

result = getCodeText('code.jpg', 1000)
print(result)

login_url = 'http://www.renren.com/ajaxLogin/login?1=1&uniqueTimestamp=2019431046983'
data = {
    'email': 'www.zhangbowudi@qq.com',
    'icode': result,
    'origURL': 'http://www.renren.com/home',
    'domain': 'renren.com',
    'key_id': '1',
    'captcha_type': 'web_login',
    'password': '06768edabba49f5f6b762240b311ae5bfa4bcce70627231dd1f08b9c7c6f4375',
    'rkey': '1028219f2897941c98abdc0839a729df',
    'f': 'https%3A%2F%2Fwww.baidu.com%2Flink%3Furl%3Dgds6TUs9Q1ojOatGda5mVsLKC34AYwc5XiN8OuImHRK%26wd%3D%26eqid%3D8e38ba9300429d7d000000035cedf53a',
}
response = requests.post(url=login_url, headers=headers, data=data)
print(response.text)
print(response.status_code)
```
3. Simulated Login and Cookies
**Goal:** crawl the current user's information (what is shown on their personal homepage)
**HTTP/HTTPS:** stateless.
Why the page data wasn't fetched: when the second request — the one for the personal homepage — was sent, the server had no idea it was made in a logged-in state.
**Cookies:** let the server record the client's state.
Manual handling: capture the Cookie value with a packet-capture tool and put it into `headers` (not recommended).
Automatic handling:
- Where does the Cookie value come from? It is created by the server after the simulated-login POST request.
- The `session` object: 1. it can send requests; 2. if cookies are produced during a request, they are automatically stored in, and carried by, that session object.
- Create a session object: `session = requests.Session()`
- Use the session object to send the simulated-login POST request (the cookies get stored in the session)
- Use the same session object to send the GET request for the personal homepage (the cookies are carried along)
```python
# create a session object
session = requests.Session()

'''manual Cookie handling (not recommended):
headers = {
    'Cookie': 'xxxx'
}'''

detail_url = 'http://www.renren.com/976279344/profile'
# the session carries the cookies stored during login
detail_page_text = session.get(url=detail_url, headers=headers).text
with open('bobo.html', 'w', encoding='utf-8') as fp:
    fp.write(detail_page_text)
```
4. Proxy Theory
**Proxies:** defeat the IP-ban anti-crawling mechanism.
**What is a proxy?** A proxy server.
What proxies do:
- break through the access limits placed on your own IP
- hide your real IP and shield it from attacks
Related sites: proxy providers such as 快代理, 西祠代理, or www.goubanjia.com.
Proxy IP types:
- http: only usable for URLs under the http protocol
- https: only usable for URLs under the https protocol
Proxy anonymity levels:
- Transparent: the server knows a proxy was used, and knows the real IP behind the request
- Anonymous: the server knows a proxy was used, but not the real IP
- Elite (high anonymity): the server knows neither that a proxy was used nor the real IP
5. Proxies in Crawlers
```python
import requests

url = 'http://www.baidu.com/s?wd=ip'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
# route the request through a proxy server instead of your own IP
page_text = requests.get(url=url, headers=headers,
                         proxies={"http": "http://124.205.155.153:9090"}).text
with open('ip.html', 'w', encoding='utf-8') as fp:
    fp.write(page_text)
```
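If you have several proxies available, a common extension is to rotate them per request. A minimal sketch — the pool addresses below are made-up placeholders, not working proxies:

```python
import random
import requests

# hypothetical proxy pool; replace with live proxies from a provider
proxy_pool = [
    {'http': 'http://111.111.111.111:8888'},
    {'http': 'http://222.222.222.222:8888'},
]

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
# pick a random proxy for each request so no single IP gets banned
page_text = requests.get('http://www.baidu.com/s?wd=ip',
                         headers=headers,
                         proxies=random.choice(proxy_pool)).text
```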
六、High-Performance Asynchronous Crawlers
1. Asynchronous Crawler Overview
**Synchronous:** different program units completing a task must coordinate through some communication mechanism during execution; such units execute synchronously. For example, updating stock levels in a shopping system uses row locks as the communication signal, forcing concurrent update requests to queue up and run in order — so the stock update is synchronous. In short, synchrony means order.
**Asynchronous:** different program units can complete a task without coordinating with each other during the process; unrelated program units can be asynchronous. For example, a crawler downloading pages: once the scheduler invokes the downloader, it can move on to other tasks without staying in communication with that download. Downloading and saving different pages are unrelated operations that need no mutual notification or coordination, and the completion times of these asynchronous operations are not deterministic. In short, asynchrony means disorder.
**Goal:** use asynchrony in the crawler to achieve high-performance data scraping.
```python
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
urls = [
    'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14667.rar',
    'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14665.rar',
    'https://downsc.chinaz.net/Files/DownLoad/jianli/202102/jianli14648.rar'
]

def get_content(url):
    print('正在爬取:', url)
    response = requests.get(url=url, headers=headers)
    if response.status_code == 200:
        return response.content

def parse_content(content):
    print('响应数据的长度为:', len(content))

# synchronous: each download must finish before the next one starts
for url in urls:
    content = get_content(url)
    parse_content(content)
```
2. Multithreading and Multiprocessing
Ways to build an asynchronous crawler:
Multithreading / multiprocessing (not recommended):
- Pro: a blocking operation can be given its own thread or process, so blocking work runs asynchronously
- Con: you cannot spawn threads or processes without limit
3. Thread Pools and Process Pools
Thread pools / process pools (use in moderation):
- Pro: lowers how often the system creates and destroys threads or processes, nicely reducing system overhead
- Con: the number of threads or processes in the pool is capped
4. Basic Thread Pool Usage
```python
import time

# simulate a blocking download
def get_page(str):
    print('正在下载:', str)
    time.sleep(2)
    print('下载成功:', str)

name_list = ['xiaozi', 'aa', 'bb', 'cc']

start_time = time.time()
# serial version: 4 tasks x 2 s each, roughly 8 s total
for i in range(len(name_list)):
    get_page(name_list[i])
end_time = time.time()
print('%d second' % (end_time - start_time))
```
```python
import time
# import the thread pool class
from multiprocessing.dummy import Pool

start_time = time.time()

def get_page(str):
    print('正在下载:', str)
    time.sleep(2)
    print('下载成功:', str)

name_list = ['xiaozi', 'aa', 'bb', 'cc']

# instantiate a pool of 4 threads
pool = Pool(4)
# map hands each list element to get_page on some thread; total is roughly 2 s
pool.map(get_page, name_list)

end_time = time.time()
print(end_time - start_time)
```
5. Thread Pool Case Study
```python
import requests
import os
from multiprocessing.dummy import Pool
from lxml import etree
import random

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}

if __name__ == '__main__':
    if not os.path.exists('./video'):
        os.mkdir('./video')
    url = 'https://www.pearvideo.com/category_5'
    page_text = requests.get(url=url, headers=headers).text
    tree = etree.HTML(page_text)
    li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
    urls = []  # stores the name and true URL of every video
    for li in li_list:
        detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
        name = li.xpath('./div/a/div[2]/text()')[0] + '.mp4'
        detail_page_text = requests.get(url=detail_url, headers=headers).text
        detail_tree = etree.HTML(detail_page_text)
        name = detail_tree.xpath('//*[@id="detailsbd"]/div[1]/div[2]/div/div[1]/h1/text()')[0]
        # the video address comes from an ajax endpoint, not the page itself
        str_ = str(li.xpath('./div/a/@href')[0]).split('_')[1]
        ajax_url = 'https://www.pearvideo.com/videoStatus.jsp?'
        params = {
            'contId': str_,
            'mrd': str(random.random())
        }
        ajax_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
            'Referer': 'https://www.pearvideo.com/video_' + str_
        }
        dic_obj = requests.get(url=ajax_url, params=params, headers=ajax_headers).json()
        video_url = dic_obj["videoInfo"]['videos']["srcUrl"]
        # the returned URL is fake: rewrite its last segment to cont-<id>-...
        video_true_url = ''
        s_list = str(video_url).split('/')
        for i in range(0, len(s_list)):
            if i < len(s_list) - 1:
                video_true_url += s_list[i] + '/'
            else:
                ss_list = s_list[i].split('-')
                for j in range(0, len(ss_list)):
                    if j == 0:
                        video_true_url += 'cont-' + str_ + '-'
                    elif j == len(ss_list) - 1:
                        video_true_url += ss_list[j]
                    else:
                        video_true_url += ss_list[j] + '-'
        dic = {
            'name': name,
            'url': video_true_url
        }
        urls.append(dic)

    def get_video_data(dic):
        urll = dic['url']
        data = requests.get(url=urll, headers=headers).content
        path = './video/' + dic['name'] + '.mp4'
        print(dic['name'], '正在下载.......')
        with open(path, 'wb') as fp:
            fp.write(data)
            print(dic['name'] + '.mp4', '下载成功!')

    # download the videos concurrently on a 4-thread pool
    pool = Pool(4)
    pool.map(get_video_data, urls)
    pool.close()
    pool.join()
```
6. Coroutine Concepts Recap
7. Coroutine Operations Recap
```python
import asyncio

async def request(url):
    print('正在请求的url是', url)
    print('请求成功,', url)
    return url

# calling an async def function returns a coroutine object
c = request('www.baidu.com')

def callback_func(task):
    # task.result() is the coroutine's return value
    print(task.result())

# create an event loop
loop = asyncio.get_event_loop()
# wrap the coroutine in a task and bind a completion callback
task = asyncio.ensure_future(c)
task.add_done_callback(callback_func)
# register the task with the loop and run it
loop.run_until_complete(task)
```
8. Multi-task Asynchronous Coroutines
```python
import time
import asyncio

async def request(url):
    print('正在下载', url)
    # time.sleep() would block; use asyncio.sleep() and await it instead
    await asyncio.sleep(2)
    print('下载完毕', url)

start = time.time()
urls = [
    'www.baidu.com',
    'www.sougou.com',
    'www.goubanjia.com'
]

stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)

loop = asyncio.get_event_loop()
# a task list must be wrapped in asyncio.wait() before registering it
loop.run_until_complete(asyncio.wait(stasks))
print(time.time() - start)
```
9. Why the aiohttp Module Is Needed
```python
import requests
import asyncio
import time

start = time.time()
urls = [
    'http://127.0.0.1:1080/bobo',
    'http://127.0.0.1:1080/jay',
    'http://127.0.0.1:1080/tom'
]

async def get_page(url):
    print('正在下载', url)
    # requests is synchronous: this call blocks the whole event loop,
    # so the coroutines run one after another and nothing is gained
    response = requests.get(url=url)
    print('下载完毕', response.text)

tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('总耗时:', end - start)
```
10. aiohttp + Multi-task Coroutines: an Asynchronous Crawler
```python
import asyncio
import time
import aiohttp

start = time.time()
urls = [
    'http://www.baidu.com',
    'http://www.sougou.com',
    'http://www.taobao.com'
]

async def get_page(url):
    # aiohttp is the asynchronous counterpart of requests
    async with aiohttp.ClientSession() as session:
        async with await session.get(url) as response:
            # text() returns str; read() returns bytes; json() returns a dict
            page_text = await response.text()

tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('总耗时:', end - start)
```
七、Handling Dynamically Loaded Data
1. Selenium Overview
Selenium is a browser-automation module; for crawlers it makes it easy to fetch dynamically loaded data and to simulate logins.
2. First Steps with Selenium
Selenium usage flow:
- Install it: `pip install selenium`
- Download the driver program for your browser (Chrome in this example)
```python
from selenium import webdriver
from lxml import etree
from time import sleep

# instantiate a browser object, passing in the driver executable
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
# have the browser request the page (JS-rendered data included)
bro.get('http://scxk.nmpa.gov.cn:81/xk/')
# page_source is the fully rendered page source
page_text = bro.page_source
tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="gzlist"]/li')
for li in li_list:
    name = li.xpath('./dl/@title')[0]
    print(name)
sleep(5)
bro.quit()
```
3. Other Selenium Automation
```python
from selenium import webdriver
from time import sleep

bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://www.taobao.com/')

# locate the search box and type into it
search_input = bro.find_element_by_id('q')
search_input.send_keys('iphone')

# execute a JS snippet: scroll down one screen height
bro.execute_script('window.scrollTo(0,document.body.scrollHeight)')
sleep(2)

# click the search button
btn = bro.find_element_by_css_selector('.btn-search')
btn.click()

bro.get('https://baidu.com/')
sleep(2)
bro.back()      # go back one page
sleep(2)
bro.forward()   # go forward again
sleep(5)
bro.quit()
```
4. iframe Handling + Action Chains
**Selenium and iframes:**
- If the tag you need to locate lives inside an `<iframe>`, you must first call `switch_to.frame(id)`.
Action chains (for dragging): `from selenium.webdriver import ActionChains`
- Instantiate an action chain: `action = ActionChains(bro)`
- `click_and_hold(div)`: click and hold
- `move_by_offset(x, y)`: move by the given offset
- `perform()`: execute the queued actions immediately
- `action.release()`: release the action chain
```python
from selenium import webdriver
from time import sleep
from selenium.webdriver import ActionChains

bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-example-droppable')

# the draggable div lives inside an iframe: switch scope first
bro.switch_to.frame('iframeResult')
div = bro.find_element_by_id('draggable')

# action chain: click and hold, then drag in small steps
action = ActionChains(bro)
action.click_and_hold(div)
for i in range(5):
    # perform() executes the queued movement immediately
    action.move_by_offset(11, 0).perform()
    sleep(0.3)

# release the action chain
action.release()
bro.quit()
```
5. Simulated QQ-Zone Login with Selenium
```python
from selenium import webdriver
from time import sleep

bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.get('https://qzone.qq.com/')

# the login form lives inside an iframe
bro.switch_to.frame('login_frame')
# switch from QR-code login to account/password login
a_tag = bro.find_element_by_id('switcher_plogin')
a_tag.click()

userName_tag = bro.find_element_by_id('u')
password_tag = bro.find_element_by_id('p')
sleep(1)
userName_tag.send_keys('QQ号码')
password_tag.send_keys('QQ密码')
sleep(1)
btn = bro.find_element_by_id('login_button')
btn.click()
sleep(3)
bro.quit()
```
6. Headless Browsers + Evading Detection
```python
from selenium import webdriver
from time import sleep
from selenium.webdriver.chrome.options import Options
from selenium.webdriver import ChromeOptions

# headless mode: run Chrome without opening a window
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')

# evade selenium automation detection
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])

bro = webdriver.Chrome(executable_path='./chromedriver.exe',
                       chrome_options=chrome_options, options=option)
bro.get('https://www.baidu.com')
print(bro.page_source)
sleep(2)
bro.quit()
```
7. Chaojiying Basics
**超级鹰 (Chaojiying):** https://www.chaojiying.com/about.html
- Register a regular user account
- Log in as that user
- Check your credits; top up if needed
- Create a software ID
- Download the sample code
8. Simulated 12306 Login
Coding flow:
- open the login page with selenium
- take a screenshot of the page selenium has opened
- crop the local region containing the CAPTCHA image out of the screenshot
- recognize the CAPTCHA image with Chaojiying (it returns click coordinates)
```python
import requests
from hashlib import md5

class Chaojiying_Client(object):
    def __init__(self, username, password, soft_id):
        self.username = username
        password = password.encode('utf8')
        self.password = md5(password).hexdigest()
        self.soft_id = soft_id
        self.base_params = {
            'user': self.username,
            'pass2': self.password,
            'softid': self.soft_id,
        }
        self.headers = {
            'Connection': 'Keep-Alive',
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
        }

    def PostPic(self, im, codetype):
        """
        im: image bytes
        codetype: problem type, see http://www.chaojiying.com/price.html
        """
        params = {
            'codetype': codetype,
        }
        params.update(self.base_params)
        files = {'userfile': ('ccc.jpg', im)}
        r = requests.post('http://upload.chaojiying.net/Upload/Processing.php',
                          data=params, files=files, headers=self.headers)
        return r.json()

    def ReportError(self, im_id):
        """
        im_id: image ID of a misrecognized CAPTCHA
        """
        params = {
            'id': im_id,
        }
        params.update(self.base_params)
        r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php',
                          data=params, headers=self.headers)
        return r.json()

from selenium import webdriver
import time
from PIL import Image
from selenium.webdriver import ActionChains

bro = webdriver.Chrome(executable_path='./chromedriver.exe')
# evade webdriver detection via CDP before any page loads
bro.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": """
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        })
    """
})
bro.get('https://kyfw.12306.cn/otn/resources/login.html')
bro.maximize_window()
time.sleep(1)

# switch to account/password login
zhanghao_tag = bro.find_element_by_class_name('login-hd-account')
zhanghao_tag.click()
time.sleep(1)

# screenshot the whole page
bro.save_screenshot('aa.png')

# locate the CAPTCHA element, then crop it out of the screenshot
code_img_ele = bro.find_element_by_class_name('touclick-wrapper')
location = code_img_ele.location   # top-left coordinates of the element
print('location:', location)
size = code_img_ele.size           # width and height of the element
print('size:', size)
# the 1.25 factor compensates for a 125% display scaling
rangle = (location['x'] * 1.25, location['y'] * 1.25,
          (location['x'] + size['width']) * 1.25,
          (location['y'] + size['height']) * 1.25)
i = Image.open('./aa.png')
code_img_name = './code.png'
frame = i.crop(rangle)
frame.save(code_img_name)
time.sleep(3)

# recognize the click positions with Chaojiying (type 9004: coordinates)
chaojiying = Chaojiying_Client('超级🦅账号', '超级🦅密码', '软件ID')
im = open('code.png', 'rb').read()
print(chaojiying.PostPic(im, 9004)['pic_str'])
result = chaojiying.PostPic(im, 9004)['pic_str']

# result looks like 'x1,y1|x2,y2'; turn it into [[x1, y1], [x2, y2]]
all_list = []
if '|' in result:
    list_1 = result.split('|')
    count_1 = len(list_1)
    for i in range(count_1):
        xy_list = []
        x = int(list_1[i].split(',')[0])
        y = int(list_1[i].split(',')[1])
        xy_list.append(x)
        xy_list.append(y)
        all_list.append(xy_list)
else:
    x = int(result.split(',')[0])
    y = int(result.split(',')[1])
    xy_list = []
    xy_list.append(x)
    xy_list.append(y)
    all_list.append(xy_list)
print(all_list)

# click each returned coordinate, relative to the CAPTCHA image
for l in all_list:
    x = l[0]
    y = l[1]
    ActionChains(bro).move_to_element_with_offset(code_img_ele, x / 1.25, y / 1.25).click().perform()
    time.sleep(1)

bro.find_element_by_id('J-userName').send_keys('12306账号')
time.sleep(1)
bro.find_element_by_id('J-password').send_keys('12306密码')
time.sleep(1)
bro.find_element_by_id('J-login').click()
time.sleep(5)
bro.quit()
```
八、The Scrapy Framework
1. Getting to Know Scrapy
Scrapy is a high-performance crawling framework: persistent storage, asynchronous downloading, and fast data parsing out of the box.
2. Basic Scrapy Usage
Basic usage of the scrapy framework:
- Environment setup:
  - mac or linux: `pip install scrapy`
  - windows: `pip install scrapy` (older setups also required installing wheel, a matching Twisted wheel, and pywin32 first)
- Create a project: `scrapy startproject xxxPro`
- `cd xxxPro`
- Create a spider file in the spiders subdirectory:
  - `scrapy genspider spiderName www.xxx.com`
- Run the project: `scrapy crawl spiderName`
```python
import scrapy

class FirstSpider(scrapy.Spider):
    # name: the unique identifier of this spider
    name = 'first'
    # start_urls: scrapy sends requests to these URLs automatically
    start_urls = ['https://www.baidu.com/', 'https://www.sogou.com/']

    # parse is called once per response to handle the data
    def parse(self, response):
        print(response)
```
3. Data Parsing in Scrapy
```python
import scrapy

class QiubaiSpider(scrapy.Spider):
    name = 'qiubai'
    start_urls = ['https://www.qiushibaike.com/text/']

    def parse(self, response):
        # response.xpath returns Selector objects, not plain strings
        div_list = response.xpath('//div[@id="col1 old-style-col1"]/div')
        for div in div_list:
            # [0].extract() (or extract_first()) pulls the data out of one Selector
            author = div.xpath('./div[1]/a[2]/h2/text()')[0].extract()
            # extract() on a Selector list returns a list of strings
            content = div.xpath('./a[1]/div/span//text()').extract()
            content = ''.join(content)
            print(author, content)
            break
```
4. Persistence via Terminal Command
Scrapy persistence:
Terminal-command based:
- Requirement: only the return value of the parse method can be stored to a local text file
- Note: only these file types are supported: json, jsonlines, jl, csv, xml, marshal, pickle
- Command: `scrapy crawl xxx -o filePath`
- Pro: concise, efficient, convenient
- Con: quite limited (data can only go into files with the allowed extensions); see the sketch below.
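A minimal sketch of the end-to-end shape — the field name and spider name here are illustrative, not from the notes:

```python
# in the spider: parse() must RETURN the scraped data for -o to pick it up
def parse(self, response):
    all_data = []
    div_list = response.xpath('//div[@class="content"]')
    for div in div_list:
        # each dict becomes one row/record in the output file
        all_data.append({'content': div.xpath('.//text()').extract_first()})
    return all_data

# then on the command line:
#   scrapy crawl qiubai -o ./qiubai.csv
```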
5. Persistence via Pipelines
Pipeline-based:
Coding flow:
1. parse the data
2. define the corresponding attributes in the item class
3. pack the parsed data into an item object
4. submit the item object to the pipeline for persistence
5. in the pipeline class's `process_item`, persist the data carried by each received item
6. enable the pipeline in the settings file
Pro: very general — it can persist to any kind of target.
Interview question: store one copy of the scraped data in a local file and one in a database — how?
- One pipeline class corresponds to one storage target
- The items a spider yields are delivered only to the pipeline class that executes first (highest priority)
- `return item` in `process_item` passes the item on to the next pipeline class in line; see the sketch below.
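A sketch of a pipelines.py answering the interview question: two pipeline classes, one per storage target, chained by priority in settings. The project, class, table, and field names (qiubaiPro, author, content) are illustrative assumptions:

```python
import pymysql

class QiubaiproPipeline(object):
    """Higher priority (smaller number in ITEM_PIPELINES): writes to a local file."""
    fp = None

    def open_spider(self, spider):   # called once when the spider starts
        self.fp = open('./qiubai.txt', 'w', encoding='utf-8')

    def process_item(self, item, spider):
        self.fp.write(item['author'] + ':' + item['content'] + '\n')
        return item                  # pass the item on to the next pipeline class

    def close_spider(self, spider):  # called once when the spider closes
        self.fp.close()

class MysqlPipeline(object):
    """Lower priority: receives the same item and writes it to MySQL."""
    def open_spider(self, spider):
        self.conn = pymysql.Connect(host='127.0.0.1', port=3306, user='root',
                                    password='', db='qiubai', charset='utf8')

    def process_item(self, item, spider):
        cursor = self.conn.cursor()
        try:
            cursor.execute('insert into qiubai values ("%s", "%s")'
                           % (item['author'], item['content']))
            self.conn.commit()
        except Exception as e:
            print(e)
            self.conn.rollback()
        return item

    def close_spider(self, spider):
        self.conn.close()

# settings.py — the smaller number runs first:
# ITEM_PIPELINES = {
#     'qiubaiPro.pipelines.QiubaiproPipeline': 300,
#     'qiubaiPro.pipelines.MysqlPipeline': 301,
# }
```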
6. Full-Site Data Crawling
**Spider-based full-site crawling:** scraping the page data of every page number under some section of a site.
Task: crawl the photo titles in the star-photo section of 校花网.
Approaches:
- put every page's URL into the `start_urls` list (not recommended)
- send the follow-up requests manually yourself (recommended)
```python
'''------------ 校花网 xiaohua.py ----------------'''
import scrapy

class XiaohuaSpider(scrapy.Spider):
    name = 'xiaohua'
    start_urls = ['http://www.521609.com/tuku/mxxz/']
    # generic URL template for every page after the first
    url = 'http://www.521609.com/tuku/mxxz/index_%d.html'
    page_num = 2

    def parse(self, response):
        li_list = response.xpath('/html/body/div[4]/div[3]/ul/li')
        for li in li_list:
            img_name = li.xpath('./a/p/text()').extract_first()
            print(img_name)
        if self.page_num <= 28:
            new_url = format(self.url % self.page_num)
            self.page_num += 1
            # manually send the request for the next page; callback parses it
            yield scrapy.Request(url=new_url, callback=self.parse)

'''--------------- 校花网 pipelines.py --------------------'''
class XiaohuaproPipeline(object):
    def process_item(self, item, spider):
        return item

'''---------------- 校花网 settings.py (excerpt) ---------------------------'''
ROBOTSTXT_OBEY = False
LOG_LEVEL = 'ERROR'
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
```
7. The Five Core Components
The five core components:
- Spiders: define the crawling and parsing logic; they process responses and produce items and new requests
- Engine (Scrapy Engine): the hub of the framework; handles the data flow between all the other components and triggers events
- Scheduler: accepts requests pushed by the engine, deduplicates and queues them, and returns the next request when the engine asks for it
- Downloader: fetches page data and hands it to the engine, which passes it to the Spiders
- Item Pipeline: processes the entities the spider extracts from pages; once parsed data is stored in an item, it is sent to the pipeline, processed in a specific order, and finally persisted to a local file or a database.
8. Passing Parameters Between Requests
**Use case:** the data to crawl and parse is not all on the same page (deep crawling).
**Goal:** crawl job titles and job descriptions from Boss直聘.
```python
import scrapy
from bossPro.items import BossproItem

class BossSpider(scrapy.Spider):
    name = 'boss'
    start_urls = ['https://www.zhipin.com/c100010000/?page=1&ka=page-1']
    url = 'https://www.zhipin.com/c100010000/?page=%d'
    page_num = 2

    # callback for the detail page: parses the job description
    def parse_detail(self, response):
        # pick up the item passed along through meta
        item = response.meta['item']
        job_desc = response.xpath('//*[@id="main"]/div[3]/div/div[2]/div[2]/div[1]/div//text()').extract()
        job_desc = ''.join(job_desc)
        print(job_desc)
        item['job_desc'] = job_desc
        yield item

    # parses the listing page: job names + detail URLs
    def parse(self, response):
        li_list = response.xpath('//*[@id="main"]/div/div[2]/ul/li')
        for li in li_list:
            item = BossproItem()
            job_name = li.xpath('.//div/div[1]/div[1]/div/div[1]/span[1]/a/text()').extract_first()
            item['job_name'] = job_name
            print(job_name)
            detail_url = 'https://www.zhipin.com' + li.xpath('.//div/div[1]/div[1]/div/div[1]/span[1]/a/@href').extract_first()
            # request passing: hand the item to the detail callback via meta
            yield scrapy.Request(detail_url, callback=self.parse_detail, meta={'item': item})
        # pagination
        if self.page_num <= 5:
            new_url = format(self.url % self.page_num)
            self.page_num += 1
            yield scrapy.Request(new_url, callback=self.parse)
```
9. Scraping Images with Scrapy
Image scraping with ImagesPipeline:
```python
'''---------------- 站长素材 high-res images: img.py -----------------------'''
import scrapy
from imgsPro.items import ImgsproItem

class ImgSpider(scrapy.Spider):
    name = 'img'
    start_urls = ['http://sc.chinaz.com/tupian/']

    def parse(self, response):
        div_list = response.xpath('//div[@id="container"]/div')
        for div in div_list:
            # src2 is the pseudo attribute the page uses for lazy loading
            src = 'https:' + div.xpath('./div/a/img/@src2').extract_first()
            item = ImgsproItem()
            item['src'] = src
            yield item

'''---------------------- pipelines.py ---------------------------'''
from scrapy.pipelines.images import ImagesPipeline
import scrapy

class imgsPileLine(ImagesPipeline):
    # issue a request for the item's image URL
    def get_media_requests(self, item, info):
        yield scrapy.Request(item['src'])

    # decide the file name of the stored image
    def file_path(self, request, response=None, info=None):
        imgName = request.url.split('/')[-1]
        return imgName

    # hand the item on to the next pipeline class, if any
    def item_completed(self, results, item, info):
        return item

'''--------------------------------- items.py -----------------------------'''
import scrapy

class ImgsproItem(scrapy.Item):
    src = scrapy.Field()

'''------------------------------ settings.py (excerpt) -------------------'''
IMAGES_STORE = './imgs_ZYZhang'   # directory where the images are saved
ITEM_PIPELINES = {
    'imgsPro.pipelines.imgsPileLine': 300,
}
LOG_LEVEL = 'ERROR'
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'
ROBOTSTXT_OBEY = False
```
10. Middleware
Downloader middleware:
- Position: between the engine and the downloader
- Role: intercept, in batch, all the requests and responses in the whole project
Intercepting requests:
- UA spoofing: `process_request`
- proxy IPs: `process_exception`, ending with `return request`
Intercepting responses: tamper with the response data or swap out the response object (used in the NetEase News case below)
11. NetEase News
**Goal:** crawl NetEase News data (titles and article bodies)
1. Parse the URLs of the major section pages from the NetEase News homepage (verified: not dynamically loaded)
2. Inside each section, the news titles are dynamically loaded (dynamic loading)
3. Parse each article's detail URL, fetch the detail page source, and parse out the article content
```python
'''------------------------------- 网易新闻 wangyi.py ------------------------'''
import scrapy
from selenium import webdriver
from wangyiPro.items import WangyiproItem

class WangyiSpider(scrapy.Spider):
    name = 'wangyi'
    start_urls = ['https://news.163.com/']
    models_urls = []   # the URLs of the chosen section pages

    def __init__(self):
        # one selenium browser shared by the whole spider (used by the middleware)
        self.bro = webdriver.Chrome(executable_path='F:\PythonProjects\爬虫\动态加载数据处理\chromedriver.exe')

    # parse the section URLs from the homepage
    def parse(self, response):
        li_list = response.xpath('//*[@id="index2016_wrap"]/div[1]/div[2]/div[2]/div[2]/div[2]/div/ul/li')
        alist = [3, 4, 6, 7, 8]
        for index in alist:
            model_url = li_list[index].xpath('./a/@href').extract_first()
            self.models_urls.append(model_url)
        for url in self.models_urls:
            yield scrapy.Request(url, callback=self.parse_model)

    # parse each section page (dynamically loaded -> handled by the middleware)
    def parse_model(self, response):
        div_list = response.xpath('/html/body/div/div[3]/div[4]/div[1]/div/div/ul/li/div/div')
        for div in div_list:
            title = div.xpath('./div/div[1]/h3/a/text()').extract_first()
            new_detail_url = div.xpath('./div/div[1]/h3/a/@href').extract_first()
            item = WangyiproItem()
            item['title'] = title
            yield scrapy.Request(url=new_detail_url, callback=self.parse_detail, meta={'item': item})

    def parse_detail(self, response):
        content = response.xpath('//*[@id="content"]/div[2]//text()').extract()
        content = ''.join(content)
        item = response.meta['item']
        item['content'] = content
        yield item

    def closed(self, spider):
        self.bro.quit()

'''------------------------------- pipelines.py -----------------------------------'''
class WangyiproPipeline(object):
    def process_item(self, item, spider):
        print(item)
        return item

'''------------------------------- middlewares.py -------------------------'''
from scrapy import signals
from scrapy.http import HtmlResponse
from time import sleep

class WangyiproDownloaderMiddleware(object):
    def process_request(self, request, spider):
        return None

    # intercept responses: replace the five sections' responses with
    # selenium-rendered page source so the dynamic data is present
    def process_response(self, request, response, spider):
        bro = spider.bro
        if request.url in spider.models_urls:
            bro.get(request.url)
            sleep(3)
            page_text = bro.page_source
            new_response = HtmlResponse(url=request.url, body=page_text,
                                        encoding='utf-8', request=request)
            return new_response
        else:
            return response

    def process_exception(self, request, exception, spider):
        pass

'''----------------------------- settings.py (excerpt) ---------------------------------'''
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
ROBOTSTXT_OBEY = False
DOWNLOADER_MIDDLEWARES = {
    'wangyiPro.middlewares.WangyiproDownloaderMiddleware': 543,
}
ITEM_PIPELINES = {
    'wangyiPro.pipelines.WangyiproPipeline': 300,
}
LOG_LEVEL = 'ERROR'
```
12. Full-Site Crawling with CrawlSpider
**CrawlSpider:** a subclass of Spider.
Full-site crawling approaches:
- Spider-based: send the requests manually
- CrawlSpider-based
Using CrawlSpider:
- create a project
- cd XXX
- create a CrawlSpider-based spider file:
  - `scrapy genspider -t crawl xxx www.xxxx.com`
**Link extractor (LinkExtractor):** extracts links matching a given rule (allow="regex")
**Rule parser (Rule):** applies a given parsing rule (callback) to the links the extractor found
**Goal:** crawl the post number, title, and content from the 阳光热线问政平台 site
Analysis: the data to crawl is not all on the same page
- one link extractor can extract all the page-number links
- a second link extractor can extract all the detail-page links
```python
'''--------------------- 阳光问政 sun.py ---------------------------'''
'''The site's page structure has changed since the video; run with follow=False
first so you don't hammer the site and get your IP banned. If interested,
adapt it and add a proxy before crawling harder.'''
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from sunPro.items import SunproItem, DetailItem

class SunSpider(CrawlSpider):
    name = 'sun'
    start_urls = ['http://wz.sun0769.com/political/index/politicsNewest?id=1&page=']

    # link extractors: one for page-number links, one for detail-page links
    link = LinkExtractor(allow=r'id=1&page=\d+')
    link_detail = LinkExtractor(allow=r'index\?id=\d+')
    rules = (
        # rule parsers: follow=True would keep extracting links from new pages
        Rule(link, callback='parse_item', follow=False),
        Rule(link_detail, callback='parse_detail')
    )

    # parses post number + title; meta cannot be passed between rules,
    # hence two item classes matched up later by an id field
    def parse_item(self, response):
        li_list = response.xpath('/html//div[2]/div[3]/ul[2]/li')
        for li in li_list:
            new_num = li.xpath('./span[1]/text()').extract_first()
            new_title = li.xpath('./span[3]/a/text()').extract_first()
            item = SunproItem()
            item['title'] = new_title
            item['new_num'] = new_num
            yield item

    # parses post id + content from the detail page
    def parse_detail(self, response):
        new_id = response.xpath('/html//div[3]/div[2]/div[2]/div[1]/span[4]/text()').extract_first().strip().replace("\r\n", "").replace(" ", "")
        new_content = response.xpath('/html//div[3]/div[2]/div[2]/div[2]/pre/text()').extract()
        new_content = ''.join(new_content)
        item = DetailItem()
        item['content'] = new_content
        item['new_id'] = new_id
        yield item

'''------------------------------- pipelines.py ------------------------------'''
class SunproPipeline(object):
    def process_item(self, item, spider):
        # decide which item class arrived by its class name
        if item.__class__.__name__ == 'DetailItem':
            print(item['new_id'], item['content'])
        else:
            print(item['new_num'], item['title'])
        return item

'''--------------------------- items.py ----------------------'''
import scrapy

class SunproItem(scrapy.Item):
    title = scrapy.Field()
    new_num = scrapy.Field()

class DetailItem(scrapy.Item):
    new_id = scrapy.Field()
    content = scrapy.Field()
```
13. Distributed Crawling: Overview and Setup
Distributed crawler:
- Concept: build a cluster of machines and have them jointly crawl the same set of resources in a distributed way.
- Purpose: improve crawling efficiency.
How is it implemented?
- install the scrapy-redis component
- native scrapy cannot do distributed crawling by itself; scrapy-redis must work together with scrapy to achieve it
Why can't native scrapy be distributed?
- its scheduler cannot be shared across a cluster
- its pipeline cannot be shared across a cluster
What the scrapy-redis component provides:
- shareable **pipeline** and **scheduler** implementations for the native scrapy framework
The scrapy-redis implementation flow (the notes break off here; a sketch follows below):
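A sketch of the usual scrapy-redis setup, assuming a CrawlSpider project; the names fbsPro / fbs / the `sun` key are placeholders:

```python
# fbs.py — change the spider to inherit from RedisCrawlSpider
from scrapy_redis.spiders import RedisCrawlSpider

class FbsSpider(RedisCrawlSpider):
    name = 'fbs'
    # remove start_urls / allowed_domains; use a shared redis queue instead
    redis_key = 'sun'   # name of the redis list the start URL is pushed into

# settings.py — point every node at the shared scheduler, dupefilter, pipeline:
# DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
# SCHEDULER_PERSIST = True
# ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 400}
# REDIS_HOST = '127.0.0.1'
# REDIS_PORT = 6379

# start each node with `scrapy runspider fbs.py`, then seed the queue in redis-cli:
#   lpush sun http://wz.sun0769.com/political/index/politicsNewest?id=1&page=
```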
14. Incremental Crawlers
**Concept:** monitor a site for data updates and crawl only the newly published data.
Analysis:
- specify a start URL
- get the other page links with a CrawlSpider
- request those page links via a Rule
- parse every movie detail-page URL out of each page's source
- the core: check whether a detail-page URL has been requested before
  - store the URLs of the movie detail pages already crawled
  - store them in a Redis **set**
- request the unseen detail pages and parse out each movie's name and synopsis
- persist the results (see the dedupe sketch below)
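A minimal sketch of the dedupe core: Redis's `sadd` returns 1 only when the URL is new. The key name and URL are illustrative; the movie-site parsing itself is omitted:

```python
from redis import Redis

conn = Redis(host='127.0.0.1', port=6379)

def should_crawl(detail_url):
    # sadd returns 1 if the url was NOT in the set (new data), 0 if already seen
    return conn.sadd('movie_urls', detail_url) == 1

if should_crawl('https://example.com/movie/123'):
    print('new detail page: crawl and persist it')
else:
    print('already crawled: skip it')
```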
九、Supplement: Asynchronous Programming
Why cover this?
- This material is not easy to pick up (asynchronous non-blocking IO, asyncio)
- Async topics and frameworks keep multiplying — tornado, fastapi, django 3.x (ASGI), aiohttp — all chasing performance through asynchrony
How is it organized?
- Part 1: coroutines
- Part 2: asynchronous programming with the asyncio module
- Part 3: practical cases
1. Coroutines
Coroutines are not provided by the computer; they were invented by programmers.
A coroutine (Coroutine), also called a micro-thread, is a user-space context-switching technique. In short, it switches execution between code blocks within a single thread:
```python
def func1():
    print(1)
    ...
    print(2)

def func2():
    print(3)
    ...
    print(4)

func1()
func2()
```
Several ways to implement coroutines:
- greenlet, an early module
- the yield keyword
- the asyncio decorator (Python 3.4+)
- the async / await keywords (Python 3.5+)
(1) Coroutines with greenlet
`pip install greenlet`
```python
from greenlet import greenlet

def func1():
    print(1)        # step 1: print 1
    gr2.switch()    # step 2: jump into func2
    print(2)        # step 5: print 2
    gr2.switch()    # step 6: jump back into func2

def func2():
    print(3)        # step 3: print 3
    gr1.switch()    # step 4: jump back into func1
    print(4)        # step 7: print 4

gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()        # output order: 1 3 2 4
```
(2) The yield Keyword
```python
def func1():
    yield 1
    yield from func2()
    yield 2

def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)
```
(3) The asyncio Decorator
==Switches automatically when blocking IO is encountered==
```python
import asyncio

@asyncio.coroutine
def func1():
    print(1)
    # on IO (simulated here by sleep) the loop switches to another task
    yield from asyncio.sleep(2)
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
(4) The async / await Keywords (Recommended)
```python
import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2)
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
2. Why Coroutines Matter
Within one thread, when IO waiting occurs, the thread doesn't sit idle; it uses the wait time to do other work.
Task: download three images (network IO)
```python
'''plain requests version (synchronous)'''
import requests

def download_image(url):
    print('开始下载:', url)
    response = requests.get(url)
    print('下载完成')
    # save the image locally, named after the last part of the URL
    file_name = url.rsplit('-')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == '__main__':
    url_list = [
        'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
        'https://pic.netbian.com/uploads/allimg/210228/010301-1614445381005c.jpg',
        'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
    ]
    for item in url_list:
        download_image(item)
```
```python
'''coroutine version, downloading with the aiohttp module'''
import aiohttp
import asyncio
import time

start = time.time()

async def fetch(session, url):
    print('发送请求:', url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('-')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)
        print('下载完成', url)

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
            'https://pic.netbian.com/uploads/allimg/210228/010301-1614445381005c.jpg',
            'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(time.time() - start)
```
3. Asynchronous Programming
(1) The Event Loop
**Concept:** think of it as an endless loop that keeps checking for, and running, certain code:
```text
# pseudocode
task_list = [task1, task2, task3, ...]
while True:
    runnable, finished = check every task in task_list and
                         return the "runnable" and "finished" ones
    for task in runnable:
        run the ready task
    for task in finished:
        remove the task from task_list
    if every task in task_list is finished, break out of the loop
```
```python
import asyncio

# get an event loop (think: the endless checker loop above)
loop = asyncio.get_event_loop()
# put a task on the loop and run it to completion
loop.run_until_complete(task)
```
(2) Quick Start
**Coroutine function:** a function defined with `async def`.
**Coroutine object:** the object you get by *calling* a coroutine function:
```python
async def func():
    pass

result = func()   # creates a coroutine object; the body does NOT run yet
```
```python
import asyncio

async def func():
    print('快来打我吧!')

result = func()
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
```
(3) The await Keyword
`await` + an awaitable object (a coroutine object, a Future object, or a Task object):
```python
'''Example 1'''
import asyncio

async def func():
    print('来玩呀')
    response = await asyncio.sleep(2)
    print('结束', response)

asyncio.run(func())
```
```python
'''Example 2'''
import asyncio

async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print('执行协程函数内部代码')
    # await suspends func until others() completes, then yields its return value
    response = await others()
    print('IO请求结束,结果为:', response)

asyncio.run(func())
```
```python
'''Example 3'''
import asyncio

async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print('执行协程函数内部代码')
    response1 = await others()
    print('IO请求结束,结果为:', response1)
    # the two awaits run one after the other within this coroutine
    response2 = await others()
    print('IO请求结束,结果为:', response2)

asyncio.run(func())
```
(4) Task Objects
Official documentation for Task objects
Essentially: a way to put multiple tasks onto the event loop.
Tasks are used to schedule coroutines concurrently. Creating one with `asyncio.create_task(coroutine object)` adds the coroutine to the event loop, where it waits to be scheduled. Besides `asyncio.create_task()`, the lower-level `loop.create_task()` or `ensure_future()` functions also work; instantiating Task objects by hand is discouraged.
```python
'''Example 1'''
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'

async def main():
    print('main开始')
    # create_task schedules the coroutine on the current loop immediately
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print('main结束')
    # awaiting a task waits for it to finish and yields its return value
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())
```
```python
'''Example 2 (the common form)'''
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'

async def main():
    print('main开始')
    task_list = [
        asyncio.create_task(func(), name='n1'),
        asyncio.create_task(func(), name='n2')
    ]
    print('main结束')
    # wait for all tasks; done is the set of finished Task objects
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)

asyncio.run(main())
```
```python
'''Example 3'''
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return '返回值'

# no create_task here: the event loop doesn't exist yet, so bare coroutines
# are handed to asyncio.wait, which wraps them into tasks itself
task_list = [
    func(),
    func()
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
```
(5) asyncio.Future objects
Official documentation: asyncio.Future
Task inherits from Future; the handling of awaited results inside a Task is built on Future objects.
```python
'''Example 1'''
import asyncio

async def main():
    # Get the currently running event loop
    loop = asyncio.get_running_loop()
    # Create a Future object: a "task" with nothing to do
    fut = loop.create_future()
    # Wait for the Future's final result; nothing ever sets one,
    # so this waits forever
    await fut

asyncio.run(main())
```
```python
'''Example 2'''
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    # Assign a final result to the Future object
    fut.set_result('666')

async def main():
    loop = asyncio.get_running_loop()
    # Create a Future with no behaviour bound to it
    fut = loop.create_future()
    # Schedule a Task that sets the Future's result after 2 seconds
    await loop.create_task(set_after(fut))
    # The Future now has a result, so this returns immediately
    data = await fut
    print(data)

asyncio.run(main())
```
(6) concurrent.futures.Future objects
Official documentation: concurrent.futures
The Future object used when implementing asynchronous operations with a process pool or a thread pool.
```python
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

# Create a thread pool (ProcessPoolExecutor works the same way)
pool = ThreadPoolExecutor(max_workers=5)
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    # submit() returns a concurrent.futures.Future object right away
    fut = pool.submit(func, i)
    print(fut)
```
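The `Future` returned by `submit()` also carries the function's return value once it finishes; you can block on `fut.result()` or attach a callback. A minimal sketch (here `func` returns a value instead of printing):

```python
'''Reading results from concurrent.futures.Future (a sketch)'''
import time
from concurrent.futures.thread import ThreadPoolExecutor

def func(value):
    time.sleep(1)
    return value * 2

pool = ThreadPoolExecutor(max_workers=5)

def done_callback(fut):
    # Runs once the task finishes (in the worker thread,
    # or immediately if the Future is already done)
    print('callback got:', fut.result())

fut = pool.submit(func, 10)
fut.add_done_callback(done_callback)
# result() blocks the caller until the task completes
print('blocking result:', fut.result())
pool.shutdown()
```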
In real projects the two styles may end up mixed. For example, a CRM project might be 80% coroutine-based async programming, but if its MySQL driver does not support coroutines, the database part has to fall back to thread- or process-based async.
```python
import time
import asyncio
import concurrent.futures

def func1():
    # An ordinary blocking function
    time.sleep(2)
    return 'SB'

async def main():
    loop = asyncio.get_running_loop()
    # Run func1 in the default thread pool executor. Internally this submits
    # func1 to a ThreadPoolExecutor, obtaining a concurrent.futures.Future,
    # then wraps it into an asyncio-compatible Future so it can be awaited
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

asyncio.run(main())
```
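Passing `None` uses the loop's default thread pool. You can also hand `run_in_executor()` an explicit executor, e.g. a process pool for CPU-bound work; a minimal sketch:

```python
'''Passing a custom executor to run_in_executor (a sketch)'''
import time
import asyncio
import concurrent.futures

def func1():
    time.sleep(2)
    return 'SB'

async def main():
    loop = asyncio.get_running_loop()
    # Explicit thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func1)
        print('custom thread pool', result)
    # Process pool, for CPU-bound functions (func1 must be picklable)
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func1)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())
```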
(7) Case study: asyncio + a module that does not support async
```python
import requests
import asyncio

async def download_image(url):
    print('Start download:', url)
    loop = asyncio.get_event_loop()
    # requests is blocking, so hand it to the default thread pool executor
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('Download finished')
    file_name = url.rsplit('-')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == '__main__':
    url_list = [
        'https://pic.netbian.com/uploads/allimg/210302/000706-1614614826df15.jpg',
        'https://pic.netbian.com/uploads/allimg/200910/200207-1599739327e5a8.jpg',
        'https://pic.netbian.com/uploads/allimg/190902/152344-1567409024af8c.jpg'
    ]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
```
(8) Async iterators
What is an async iterator?
An object that implements the `__aiter__()` and `__anext__()` methods. `__anext__()` must return an awaitable object. `async for` processes the awaitables returned by the async iterator's `__anext__()` method until it raises a `StopAsyncIteration` exception. Introduced by PEP 492.
What is an async iterable?
An object that can be used in an `async for` statement. Its `__aiter__()` method must return an asynchronous iterator. Introduced by PEP 492.
```python
import asyncio

class Reader(object):
    '''A custom async iterator (which is also an async iterable)'''

    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val

async def func():
    obj = Reader()
    # async for may only appear inside a coroutine function
    async for item in obj:
        print(item)

asyncio.run(func())
```
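Since Python 3.6 (PEP 525), an async generator achieves the same effect with far less code, because `__aiter__()`/`__anext__()` are generated for you; a minimal sketch:

```python
'''An async generator as an async iterator (a sketch)'''
import asyncio

async def reader():
    count = 0
    while count < 99:
        count += 1
        # Pretend each line requires some IO
        await asyncio.sleep(0)
        yield count

async def func():
    async for item in reader():
        print(item)

asyncio.run(func())
```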
(9) Async context managers
Such an object controls the environment of an `async with` statement by defining the `__aenter__()` and `__aexit__()` methods.
```python
import asyncio

class AsyncContextManager:
    def __init__(self, conn=None):
        self.conn = conn

    async def do_something(self):
        # The asynchronous operation, e.g. a database query
        return 666

    async def __aenter__(self):
        # Asynchronously acquire the resource; the sleep stands in
        # for something like connecting to a database
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Asynchronously release the resource, e.g. close the connection
        await asyncio.sleep(1)

async def func():
    # async with may only appear inside a coroutine function
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())
```
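The standard library's `contextlib.asynccontextmanager` (Python 3.7+) builds an async context manager from an async generator, which is often shorter; a minimal sketch with a hypothetical placeholder resource:

```python
'''Async context manager via contextlib (a sketch)'''
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def connection():
    # __aenter__ part: acquire the resource
    await asyncio.sleep(1)
    conn = 'fake-connection'  # hypothetical placeholder resource
    try:
        yield conn
    finally:
        # __aexit__ part: release the resource
        await asyncio.sleep(1)

async def func():
    async with connection() as conn:
        print('using', conn)

asyncio.run(func())
```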
4. uvloop
uvloop is a drop-in replacement for asyncio's event loop, and its event loop is faster than asyncio's default one.
pip install uvloop
```python
import asyncio
import uvloop

# Replace asyncio's default event loop policy with uvloop's
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Write ordinary asyncio code; the loop created internally
# by asyncio.run() is now a uvloop event loop
asyncio.run(...)
```
5. Practical cases
(1) Async Redis
When operating Redis from Python code, connecting, issuing commands, and disconnecting are all network IO.
pip install aioredis
```python
'''Example 1'''
import asyncio
import aioredis

async def execute(address, password):
    print('Start', address)
    # Network IO: create the connection to Redis
    redis = await aioredis.create_redis(address, password=password)
    # Network IO: set a hash in Redis: car = {key1: 1, key2: 2, key3: 3}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO: read the whole hash back from Redis
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # Network IO: wait for the connection to close
    await redis.wait_closed()
    print('End', address)

asyncio.run(execute('redis://47.93.4.198:6379', 'root!2345'))
```
```python
'''Example 2'''
import asyncio
import aioredis

async def execute(address, password):
    print('Start', address)
    # Network IO: create a pool-backed Redis client
    # (create_redis_pool, since a bare create_pool does not expose
    # high-level commands such as hmset_dict)
    redis = await aioredis.create_redis_pool(address, password=password)
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    await redis.wait_closed()
    print('End', address)

task_list = [
    execute('redis://47.93.4.197:6379', 'root!2345'),
    execute('redis://47.93.4.198:6379', 'root!2345')
]
# While one connection waits on network IO, the loop switches to the other
asyncio.run(asyncio.wait(task_list))
```
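Note: both examples use the old aioredis 1.x API (`create_redis` / `create_redis_pool` / `hmset_dict`). aioredis has since been merged into redis-py; on current versions the equivalent looks roughly like the following sketch (the address and password are the tutorial's; the calls are redis-py's asyncio API):

```python
'''Same operations with redis-py's asyncio support (a sketch)'''
import asyncio
import redis.asyncio as aioredis  # pip install redis

async def execute(address, password):
    # from_url creates a client backed by a connection pool
    redis = aioredis.from_url(address, password=password, decode_responses=True)
    await redis.hset('car', mapping={'key1': 1, 'key2': 2, 'key3': 3})
    result = await redis.hgetall('car')
    print(result)
    await redis.aclose()  # redis-py 5+; older versions use close()

asyncio.run(execute('redis://47.93.4.198:6379', 'root!2345'))
```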
(2) Async MySQL
pip3 install aiomysql
```python
'''Example 1'''
import asyncio
import aiomysql

async def execute():
    # Network IO: connect to MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='123', db='mysql')
    # Network IO: create a cursor
    cur = await conn.cursor()
    # Network IO: execute the SQL statement
    await cur.execute('SELECT Host,User FROM user')
    # Network IO: fetch the result rows
    result = await cur.fetchall()
    print(result)
    await cur.close()
    conn.close()

asyncio.run(execute())
```
```python
'''Example 2'''
import asyncio
import aiomysql

async def execute(host, password):
    print('Start', host)
    conn = await aiomysql.connect(host=host, port=3306,
                                  user='root', password=password, db='mysql')
    cur = await conn.cursor()
    await cur.execute('SELECT Host,User FROM user')
    result = await cur.fetchall()
    print(result)
    await cur.close()
    conn.close()
    print('End', host)

task_list = [
    execute('47.93.4.197', 'root!2345'),
    execute('47.93.4.198', 'root!2345')
]
asyncio.run(asyncio.wait(task_list))
```
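aiomysql also ships a connection pool via `aiomysql.create_pool()`, which saves reconnecting for every query; a minimal sketch assuming the same local MySQL credentials as Example 1:

```python
'''aiomysql with a connection pool (a sketch)'''
import asyncio
import aiomysql

async def execute():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='123',
                                      db='mysql', minsize=1, maxsize=10)
    # Borrow a connection from the pool; it is returned automatically
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT Host,User FROM user')
            result = await cur.fetchall()
            print(result)
    pool.close()
    await pool.wait_closed()

asyncio.run(execute())
```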
(3) The FastAPI framework
pip3 install fastapi
pip3 install uvicorn
```python
'''Example 1'''
import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get('/')
def index():
    '''An ordinary (synchronous) endpoint'''
    return {'message': 'Hello World'}

if __name__ == '__main__':
    # 'luffy' is this script's file name (luffy.py)
    uvicorn.run('luffy:app', host='127.0.0.1', port=5000, log_level='info')
```
```python
'''Example 2'''
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()

# Create a Redis connection pool of size 1-10 (aioredis 1.x API)
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379',
                                      password='root123', minsize=1, maxsize=10)

@app.get('/')
def index():
    '''An ordinary (synchronous) endpoint'''
    return {'message': 'Hello World'}

@app.get('/red')
async def red():
    '''An asynchronous endpoint'''
    print('Request received')
    await asyncio.sleep(3)
    # Take one connection from the pool
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
    # Set and read back a hash value
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    # Return the connection to the pool
    REDIS_POOL.release(conn)
    return result

if __name__ == '__main__':
    # Replace script_name with this file's module name
    uvicorn.run('script_name:app', host='127.0.0.1', port=5000, log_level='info')
```
(4) Async crawler
pip3 install aiohttp
```python
'''Downloading with the aiohttp module (coroutine style)'''
import asyncio
import aiohttp

async def fetch(session, url):
    print('Sending request:', url)
    # verify_ssl=False skips certificate verification
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print('Got result:', url, len(text))
        return text

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        done, pending = await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())
```
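When crawling many URLs you usually want to cap how many requests are in flight at once, so you don't hammer the target site; a minimal sketch (not part of the original tutorial) using `asyncio.Semaphore`:

```python
'''Limiting crawler concurrency with a semaphore (a sketch)'''
import asyncio
import aiohttp

CONCURRENCY = 2  # at most 2 requests in flight at any moment

async def fetch(session, semaphore, url):
    # Each task must acquire the semaphore before sending its request
    async with semaphore:
        async with session.get(url, verify_ssl=False) as response:
            text = await response.text()
            print('Got result:', url, len(text))
            return text

async def main():
    semaphore = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]
        tasks = [asyncio.create_task(fetch(session, semaphore, url))
                 for url in url_list]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())
```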