自定义异步IO模块

高性能爬虫

假设有3个url需要发请求。

串行

import requests

urls_list = [
'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
'https://dig.chouti.com/',
'https://dig.chouti.com/r/pic/hot/1'
]

for url in urls_list:
requests.get(url)

串行肯定是最慢的,怎么改进系统运维面试题及答案?第一反应:开多个线程。好吧,lowB 第一反应都是这个方法

多线程

import time
from concurrent.futures import ThreadPoolExecutor
import requests

urls_list = [
'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
'https://dig.chouti.com/',
'https://dig.chouti.com/r/pic/hot/1'
]

start_time = time.time()
pool = ThreadPoolExecutor(3)
def task(url):
requests.get(url)
print('ending')

for i in urls_list:
pool.submit(task, i)
pool.shutdown()
end_time = time.time()
print(end_time - start_time)

开多个线程虽然可以实现并发,但是开线程总归是要耗费资源的,那能不能利用一个线程帮忙提高效率呢,这就需要异步数据漫游非阻塞登场了

异步非阻塞

什么是异步非阻塞?异步非阻塞就是 异步+非阻塞。异步就是回调,非阻塞指的是单个任务不等待。

在网络IO中,有两个阶段会出现"浪费时间"的情况,一个阶段是connect的时候,请求发出去等待信息回来通知这个客户端socket就绪可以发http报文信息。

另一个阶段是recv接受数据的时系统运维工程师面试问题及答案候(发送数据不需要等待,直接发就行)需要等待,因为服务端需通过网络把数据发送过来。非阻多路复用塞就是不再等待,connectlinux必学的60个命令原先需要等待是吧,ok现在我不等了,

recv需要等待是吧,ok我也不等了。linux操作系统基础知识不等报错咋办?报错linux常用命令就报错呗,大不了异常捕获就行。如果是非阻塞socket,那么3个urllinux是什么操作系统的请求在connect的时候都发出去了,注意,即使报错,请求也是如弦上linux之箭射出去了。

那么等connect成功后就应该发多路复用是指送数据了呀,这时候就体现出回调了,成功就回来调用一段代码。所有的抽象的话语都可以结合大致的代码来理解。以后如果遇linux常用命令到一些抽象话语不好理解,那么就应该用代码去理解这个抽象话语。

有了异步非阻塞的概念是,那么我们就可以利用一个线程把所有的连接都发送出去,等连接都发送出去而且任意一个connect的信息都没返回的数据恢复时候,线程就只多路复用名词解释好等待了,

等有connect的信息返回表示这个客户端socket准备就绪可以发http报文了,这时候再去执行发送数据的代码。这个过程说起来简单,但有一多路复用不包括个问题还不清晰:程序怎么知道哪个socket是就绪的?这个大致有2种解决方式:要么是通知(叫醒服务,这个更偏向底层,我们写的应用程序代码可以利用底层已经实现好的技术);要么是用whlinux删除文件命令ile死循环不断去检测 。python中有一些模块已经帮我们linux系统实现了异步非阻塞的功能。

twisted多路复用器

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
reactor.stop()


def callback(contents):
print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
deferred = getPage(bytes(url, encoding='utf8'))
# 次数使用回调函数,猜测用到了异步功能
deferred.addCallback(callback)
deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
# 死循环,不断地处理deferred_list里的socket对象,按理说处理完成就应该自动停止了,但可能是这个框架架构设计问题,不能做到自动停止,需要手动停
reactor.run()

gevent

gevent 是基于greenlet做的,greenlet实现了协程。协程和线程不一样,协程不是真实存在的,多路复用技术分为三种是程序员伪造出来的具有类似于线程的切换效果。单纯的协程不能完成提高效率的方式,如greenlet需要手动切换,所以这出现了gevent。

gevent 可以在遇到IO阻塞的时候自动切换协程linux是什么操作系统的运行,这样就可以提升效率。而协程切来切去的功能正好可以利用过来实现异步非阻塞

import gevent
from gevent import monkey

monkey.patch_all()
import requests

def fetch_async(method, url, req_kwargs):
print(method, url, req_kwargs)
response = requests.request(method=method, url=url, **req_kwargs)
print(response.url)

# ##### 发送请求 #####
gevent.joinall([
gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

reques数据分析ts模块发请求,内部也是用socket,先去connect,然后发送请求,最后收到响应数据结构

asyncio

asyncio默认只支持发tcp层的报文,想要发http请求,就需要自己linux必学的60个命令封装http报文

import asyncio

@asyncio.coroutine
def fetch_async(host, url='/'):
print(host, url)
reader, writer = yield from asyncio.open_connection(host, 80)

request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
request_header_content = bytes(request_header_content, encoding='utf-8')

writer.write(request_header_content)
yield from writer.drain()
text = yield from reader.read()
print(host, url, text)
writer.close()

tasks = [
fetch_async('www.cnblogs.com', '/longyunfeigu/'),
fetch_async('baidu.com', '/')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

自定义IO模块

运用知识:非阻塞socket + IO多路复非阻塞

from socket import *
import select
import time

class HttpContext(object):
def __init__(self, sock, url_dict):
import time
self._start_time = time.time()
self.sock = sock
self.port = ''
self.host = ''
self.method = ''
self.data = ''
self.path = ''
self.content = b''
self.text = ''
self.timeout = 0
self.callback = None
self.initial(url_dict)

def initial(self, url_dict):
self.port = url_dict.get('port')
self.host = url_dict.get('host')
self.method = url_dict.get('method')
self.data = url_dict.get('data')
self.path = url_dict.get('path')
self.timeout = url_dict.get('timeout', 5)
self.callback = url_dict.get('callback')

def connect(self):
self.sock.connect((self.host, self.port))

def fileno(self):
return self.sock.fileno()

def send_get(self):
content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n"""%(
self.method.upper(), self.path, self.host)
self.sock.sendall(bytes(content, encoding='utf8'))

def sendall(self):
if self.method.upper() == 'GET':
self.send_get()
elif self.method.upper() == 'POST':
self.send_post()
else:
pass

def send_post(self):
content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s"""%(
self.method.upper(), self.path, self.host, self.data)
self.sock.sendall(bytes(content, encoding='utf8'))

def recv(self):
import time
while 1:
data = self.sock.recv(8096)
if not data:
break
self.content += data
self.text += str(data, encoding='utf8')
time.sleep(0.1)
self.finish()

def finish(self, msg=''):
if msg:
self.text = msg
self.content = bytes(msg, encoding='utf8')
if self.callback:
self.callback(self.text)


class AsynchRequest(object):
def __init__(self):
self.conn_socket_list = []
self.recv_socket_list = []

def add_request(self, **url_dict):
# 立即发起connect连接
soc = socket(AF_INET,SOCK_STREAM)
soc.setblocking(0)
ctx = HttpContext(soc, url_dict)
self.conn_socket_list.append(ctx)
self.recv_socket_list.append(ctx)
try:
ctx.connect()
except BlockingIOError as e:
pass
# print('request is sended')

def check_timeout(self):
# 检验是否超时
ctime = time.time()
for ctx in self.recv_socket_list:
if ctx._start_time + ctx.timeout <= ctime:
self.recv_socket_list.remove(ctx)
self.conn_socket_list.remove(ctx)
ctx.finish('connect超时')

def run(self):
while 1:
# r_list 代表socket对象是否有数据可以读, w_list代表socket是否可以写,也就是是否可以发送数据,写服务端程序一般只需要用到 r_list
# IO多路复用监听的对象不一定是socket对象,只要对象有fileno方法都能监听,内部也是拿对象的fileno的返回值来监听
r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05)
for w in w_list:
w.sendall()
self.recv_socket_list.remove(w)
for r in r_list:
r.recv()
self.conn_socket_list.remove(r)
if not self.conn_socket_list:
break
self.check_timeout()

def callback(response):
print(response)

url_list = [
{'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback},
{'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'},
]
if __name__ == '__main__':
obj = AsynchRequest()
for i in url_list:
obj.add_request(**i)
obj.run()

这里的自定义IO模块是站在客户端的角度来定义的,借助底层的IO多路复用来帮我们检测socket是否已经准备好(不用我们手动写死循环去检测socket对象是否已经准备好)