Python高档编程-协程和异步IO

第十一章:Python高档编程-协程和异步IO

Python3高档中心技术97讲 笔记

目录
第十一章:Python高档编程-协程和异步IO
11.1 并发、并行、同步、异步、堵塞、非堵塞
11.2 C10K问题和IO多路复用(select、poll、epoll)
11.2.1 C10K问题
11.2.2 Unix下五种I/O模型
11.3 select+回调+事情循环
11.4 回调之痛
11.5 什么是协程
11.5.1 C10M问题
11.5.2 协程
11.6 生成器进阶-send、close和throw办法
11.7生成器进阶-yield from
11.8 yield from how
11.9 async和await
11.10 生成器完结协程
11.1 并发、并行、同步、异步、堵塞、非堵塞
并发

并发是指一个时间段内,有几个程序在同一个CPU上运转,可是恣意时间只要一个程序在CPU上运转。

并行

并行是指恣意时间点上,有多个程序一起运转在多个CPU上。

同步

同步是指代码调用IO操作是,有必要等候IO操作完结才回来的调用办法。

异步

异步是指代码调用IO操作是,不用等IO操作完结果回来的调用办法。

堵塞

堵塞是指调用函数时分当时线程被挂起。

非堵塞

堵塞是指调用函数时分当时线程不会被挂起,而是当即回来。

11.2 C10K问题和IO多路复用(select、poll、epoll)
11.2.1 C10K问题
如安在一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器一起为一万个客户端供给FTP服务。

11.2.2 Unix下五种I/O模型
堵塞式IO

非堵塞IO

IO复用

信息驱动式IO

异步IO(POSIX的aio_系列函数

select、poll、epoll

select、poll、epoll都是IO多路复用的机制。IO多路复用便是经过一种机制,一个进程能够监督多个描述符,一旦某个描述符安排妥当(一般是读安排妥当或许写安排妥当),能够告诉程序进行相应的读写操作。可是select、poll、epoll本质上都是同步IO,由于他们都需求在读写时间安排妥当后自己担任进行读写,也便是说这个读写进程是堵塞的,而异步IO则无需自己担任进行读写,异步IO的完结会担任把数据从内核拷贝到用户空间。

select

select函数监督的文件描述符分为3类,分别是writefds、readfds、和exceptfds。调用后select函数会堵塞,直到有描述符安排妥当(有数据可读、可写、或许有except),或许超时(timeout指定等候时间,假如当即回来设为null即可),函数回来。当select函数回来后,能够经过遍历fdset,来找到安排妥当的描述符。

select现在简直在一切的渠道上支撑,其杰出跨渠道支撑也是他的一个长处。select的一个缺陷在于单个进程能够监督的文件描述符的数量存在最大约束,在Linux上一般为1024,能够经过修正宏界说乃至从头编译内核的办法提高这一约束,可是这样也会形成功率的下降。

poll

不同于select运用三个位图来表明三个fdset的办法,pollshiyongyigepollfd的指针完结。

pollfd结构包含了要监督的event和发作的event,不再运用select“参数-值”传递的办法。一起,pollfd并没有最大数量约束(可是数量过大后功能也是会下降)。和select函数相同,poll回来后,需求伦轮询pollfd来获取安排妥当的描述符

从上面看,select和poll都需求在回来后,经过遍历文件描述符来获取现已安排妥当的socket。事实上,一起衔接的很多客户端在一时间或许只要很少的处于安排妥当状况,因而跟着监督的描述符数量的增加,其功率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版别。相对于select和poll来说,epoll愈加灵敏,没有描述符约束。epoll运用一个文件描述符办理多个描述符,将用户联系的文件描述符的事情存放到内核的一个事情表中,这样在用户空间和内核空间的copy只需一次。

11.3 select+回调+事情循环
Copy

1. epoll并不代表必定比select好

在并发高的状况下,衔接活泼度不是很高, epoll比select

并发性不高,一起衔接很活泼, select比epoll好

经过非堵塞io完结http恳求

import socket
from urllib.parse import urlparse

运用非堵塞io完结http恳求

def get_url(url):

#经过socket恳求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
#树立socket衔接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
client.connect((host, 80)) #堵塞不会耗费cpu
except BlockingIOError as e:
pass
#不断的问询衔接是否树立好, 需求while循环不断的去查看状况
#做核算使命或许再次建议其他的衔接恳求
while True:
try:
client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode("utf8"))
break
except OSError as e:
pass
data = b""
while True:
try:
d = client.recv(1024)
except BlockingIOError as e:
continue
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("rnrn")[1]
print(html_data)
client.close()

if name == "__main__":

get_url("http://www.baidu.com")

Copy

1. epoll并不代表必定比select好

在并发高的状况下,衔接活泼度不是很高, epoll比select

并发性不高,一起衔接很活泼, select比epoll好

经过非堵塞io完结http恳求

select + 回调 + 事情循环

并发性高

运用单线程

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()

运用select完结http恳求

urls = []
stop = False

class Fetcher:

def connected(self, key):
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(self.path, self.host).encode("utf8"))
selector.register(self.client.fileno(), EVENT_READ, self.readable)
def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split("rnrn")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True
def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"
# 树立socket衔接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))  # 堵塞不会耗费cpu
except BlockingIOError as e:
pass
#注册
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop():

#事情循环,不断的恳求socket的状况并调用对应的回调函数
#1. select自身是不支撑register形式
#2. socket状况改变今后的回调是由程序员完结的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
#回调+事情循环+select(pollepoll)

if name == "__main__":

fetcher = Fetcher()
import time
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time()-start_time)

def get_url(url):

经过socket恳求html

url = urlparse(url)

host = url.netloc

path = url.path

if path == "":

path = "/"

树立socket衔接

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

client.setblocking(False)

try:

client.connect((host, 80)) #堵塞不会耗费cpu

except BlockingIOError as e:

pass

不断的问询衔接是否树立好, 需求while循环不断的去查看状况

做核算使命或许再次建议其他的衔接恳求

while True:

try:

client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode("utf8"))

break

except OSError as e:

pass

data = b""

while True:

try:

d = client.recv(1024)

except BlockingIOError as e:

continue

if d:

data += d

else:

break

data = data.decode("utf8")

html_data = data.split("rnrn")[1]

print(html_data)

client.close()

11.4 回调之痛
假如回调函数履行不正常该怎么?

假如回调里边还要嵌套回调该怎么办?要嵌套很多层怎么办?

假如嵌套了多层,其间某个环节出错了会形成什么结果?

假如有个数据需求被每个回调都处理怎么办?

....

可读性差
同享状况办理困难
反常处理困难
11.5 什么是协程
11.5.1 C10M问题
怎么运用8中心CPU,64G内存,在10gbps的网络上坚持1000万并发衔接

11.5.2 协程
Copy

def get_url(url):

do someting 1

html = get_html(url) #此处暂停,切换到另一个函数去履行

parse html

urls = parse_url(html)

def get_url(url):

do someting 1

html = get_html(url) #此处暂停,切换到另一个函数去履行

parse html

urls = parse_url(html)

传统函数调用 进程 A->B->C

咱们需求一个能够暂停的函数,而且能够在恰当的时分康复该函数的持续履行

呈现了协程 -> 有多个进口的函数, 能够暂停的函数, 能够暂停的函数(能够向暂停的当地传入值)

11.6 生成器进阶-send、close和throw办法
Copy
def gen_func():

#1. 能够产出值, 2. 能够接纳值(调用方传递进来的值)
html = yield "http://projectsedu.com"
print(html)
return "bobby"

1. throw, close

1. 生成器不只能够产出值,还能够接纳值

if name == "__main__":

gen = gen_func()
#在调用send发送非none值之前,咱们有必要发动一次生成器, 办法有两种1. gen.send(None), 2. next(gen)
url = gen.send(None)
#download url
html = "bobby"
print(gen.send(html)) #send办法能够传递值进入生成器内部,一起还能够重启生成器履行到下一个yield方位
print(gen.send(html))
#1.发动生成器办法有两种, next(), send
# print(next(gen))
# print(next(gen))
# print(next(gen))
# print(next(gen))

Copy
def gen_func():

#1. 能够产出值, 2. 能够接纳值(调用方传递进来的值)
try:
yield "http://projectsedu.com"
except BaseException:
pass
yield 2
yield 3
return "bobby"

if name == "__main__":

gen = gen_func()
print(next(gen))
gen.close()
print("bobby")
#GeneratorExit是承继自BaseException, Exception

Copy
def gen_func():

#1. 能够产出值, 2. 能够接纳值(调用方传递进来的值)
try:
yield "http://projectsedu.com"
except Exception as e:
pass
yield 2
yield 3
return "bobby"

if name == "__main__":

gen = gen_func()
print(next(gen))
gen.throw(Exception, "download error")
print(next(gen))
gen.throw(Exception, "download error")

11.7生成器进阶-yield from
Copy

python3.3新加了yield from语法

from itertools import chain

my_list = [1,2,3]
my_dict = {

"bobby1":"http://projectsedu.com",
"bobby2":"http://www.imooc.com",

}

yield from iterable

def g1(iterable):

yield iterable

def g2(iterable):

yield from iterable

for value in g1(range(10)):

print(value)

for value in g2(range(10)):

print(value)

def my_chain(args, *kwargs):

for my_iterable in args:
yield from my_iterable
# for value in my_iterable:
#     yield value

for value in my_chain(my_list, my_dict, range(5,10)):

print(value)

def g1(gen):

yield from gen

def main():

g = g1()
g.send(None)

1. main 调用方 g1(托付生成器) gen 子生成器

1. yield from会在调用方与子生成器之间树立一个双向通道

Copy
final_result = {}

def middle(key):

while True:

final_result[key] = yield from sales_sum(key)

print(key+"销量计算完结!!.")

def main():

data_sets = {

"bobby牌面膜": [1200, 1500, 3000],

"bobby牌手机": [28,55,98,108 ],

"bobby牌大衣": [280,560,778,70],

}

for key, data_set in data_sets.items():

print("start key:", key)

m = middle(key)

m.send(None) # 预激middle协程

for value in data_set:

m.send(value) # 给协程传递每一组的值 # 发送到字生成器里

m.send(None)

print("final_result:", final_result)

if name == '__main__':

main()

def sales_sum(pro_name):

total = 0
nums = []
while True:
x = yield
print(pro_name+"销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums

if name == "__main__":

my_gen = sales_sum("bobby牌手机")
my_gen.send(None)
my_gen.send(1200)
my_gen.send(1500)
my_gen.send(3000)
try:
my_gen.send(None)
except StopIteration as e:
result = e.value
print(result)

11.8 yield from how
Copy

pep380

1. RESULT = yield from EXPR能够简化成下面这样

一些阐明

"""
_i:子生成器,一起也是一个迭代器
_y:子生成器出产的值
_r:yield from 表达式终究的值
_s:调用方经过send()发送的值
_e:反常目标

"""

_i = iter(EXPR) # EXPR是一个可迭代目标,_i其实是子生成器;
try:

_y = next(_i)   # 预激子生成器,把产出的榜首个值存在_y中;

except StopIteration as _e:

_r = _e.value   # 假如抛出了`StopIteration`反常,那么就将反常目标的`value`特点保存到_r,这是最简略的状况的回来值;

else:

while 1:    # 测验履行这个循环,托付生成器会堵塞;
_s = yield _y   # 出产子生成器的值,等候调用方`send()`值,发送过来的值将保存在_s中;
try:
_y = _i.send(_s)    # 转发_s,而且测验向下履行;
except StopIteration as _e:
_r = _e.value       # 假如子生成器抛出反常,那么就获取反常目标的`value`特点存到_r,退出循环,康复托付生成器的运转;
break

RESULT = _r # _r便是整个yield from表达式回来的值。

"""

  1. 子生成器或许仅仅一个迭代器,并不是一个作为协程的生成器,所以它不支撑.throw()和.close()办法;
  2. 假如子生成器支撑.throw()和.close()办法,可是在子生成器内部,这两个办法都会抛出反常;
  3. 调用方让子生成器自己抛出反常
  4. 当调用方运用next()或许.send(None)时,都要在子生成器上调用next()函数,当调用方运用.send()发送非 None 值时,才调用子生成器的.send()办法;
    """

_i = iter(EXPR)
try:

_y = next(_i)

except StopIteration as _e:

_r = _e.value

else:

while 1:
try:
_s = yield _y
except GeneratorExit as _e:
try:
_m = _i.close
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e:
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else:
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else:
try:
if _s is None:
_y = next(_i)
else:
_y = _i.send(_s)
except StopIteration as _e:
_r = _e.value
break

RESULT = _r

"""
看完代码,咱们总结一下要害点:

  1. 子生成器出产的值,都是直接传给调用方的;调用方经过.send()发送的值都是直接传递给子生成器的;假如发送的是 None,会调用子生成器的__next__()办法,假如不是 None,会调用子生成器的.send()办法;
  2. 子生成器退出的时分,最终的return EXPR,会触发一个StopIteration(EXPR)反常;
  3. yield from表达式的值,是子生成器停止时,传递给StopIteration反常的榜首个参数;
  4. 假如调用的时分呈现StopIteration反常,托付生成器会康复运转,一起其他的反常会向上 "冒泡";
  5. 传入托付生成器的反常里,除了GeneratorExit之外,其他的一切反常悉数传递给子生成器的.throw()办法;假如调用.throw()的时分呈现了StopIteration反常,那么就康复托付生成器的运转,其他的反常悉数向上 "冒泡";
  6. 假如在托付生成器上调用.close()或传入GeneratorExit反常,会调用子生成器的.close()办法,没有的话就不调用。假如在调用.close()的时分抛出了反常,那么就向上 "冒泡",不然的话托付生成器会抛出GeneratorExit反常。

"""

11.9 async和await
Copy

python为了将语义变得愈加清晰,就引入了async和await要害词用于界说原生的协程

async def downloader(url):

return "bobby"

import types

@types.coroutine
def downloader(url):

yield "bobby"

async def download_url(url):

#dosomethings
html = await downloader(url)
return html

if name == "__main__":

coro = download_url("http://www.imooc.com")
# next(None)
coro.send(None)

11.10 生成器完结协程
Copy

生成器是能够暂停的函数

import inspect

def gen_func():

value=yield from

榜首回来值给调用方, 第二调用方经过send办法回来值给gen

return "bobby"

1. 用同步的办法编写异步的代码, 在恰当的时分暂停函数并在恰当的时分发动函数

import socket
def get_socket_data():

yield "bobby"

def downloader(url):

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
client.connect((host, 80))  # 堵塞不会耗费cpu
except BlockingIOError as e:
pass
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
source = yield from get_socket_data()
data = source.decode("utf8")
html_data = data.split("rnrn")[1]
print(html_data)

def download_html(html):

html = yield from downloader()

if name == "__main__":

#协程的调度依然是 事情循环+协程形式 ,协程是单线程形式
pass

作者: coderchen01

出处:https://www.cnblogs.com/xunjishu/p/12864396.html