Websocket 学习总结

Websocket 学习总结

简介:

概念:

维基百科:WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket协议在2011年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的WebSocket API由W3C标准化。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

和HTTP的关系:

​ 和HTTP一样,都处于OSI模型的第七层(应用层)。都依赖TCP作为传输层协议。虽然它们不同,但RFC 6455规定:“WebSocket设计为通过80和443端口工作,以及支持HTTP代理和中介”,从而使其与HTTP协议兼容。 为了实现兼容性,WebSocket握手使用HTTP Upgrade头[1]从HTTP协议更改为WebSocket协议

总结为,Websocket通过HTTP的方式进行握手(升级/upgrade),握手之后就和HTTP没啥关系了。是TCP通讯

历史背景:

在以往的Web技术中,为了实现服务端数据的推送。采用的是HTTP轮询。其中分为定期轮询和长轮询。

  1. 定期轮询:客户端定期向服务端询问数据,一直到请求到服务端的数据为止。
  2. 长轮询:客户端向服务端发送请求,服务端等待,数据准备好后,返回给客户端。

以上方案,简单易实现,在很多负载不高的初期系统上有大量应用,但有几个明显的缺陷

  1. 定期轮询会产生很多空轮询(无效查询/无效负载),HTTP本身头部长,占用网络带宽。
  2. 长轮询,服务端要维护很多请求,等待数据的返回,负载很高。(我个人理解是客户端变成了服务端轮询)
  3. 时效性不高,数据无法实施推送到客户端。

为了解决以上问题,Websocket应运而生。

优点

  • 较少的控制开销。在连接创建后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对较小。在不包含扩展的情况下,对于服务器到客户端的内容,此头部大小只有2至10字节(和数据包长度有关);对于客户端到服务器的内容,此头部还需要加上额外的4字节的掩码。相对于HTTP请求每次都要携带完整的头部,此项开销显著减少了。
  • 更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少;即使是和Comet等类似的长轮询比较,其也能在短时间内更多次地传递数据。
  • 保持连接状态。与HTTP不同的是,Websocket需要先创建连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。
  • 更好的二进制支持。Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。
  • 可以支持扩展。Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。如部分浏览器支持压缩等。
  • 更好的压缩效果。相对于HTTP压缩,Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著地提高压缩率。

URI-Scheme:

ws/wss: 分别对应不启用/启用TLS的websocket

典型应用:

  • 聊天室/IM
  • 多玩家游戏
  • 弹幕
  • 协同编辑
  • 股票/基金实时报价
  • 视频会议/聊天
  • 基于位置的应用
  • ...

案例介绍:

基于go实现的千万级websocket消息弹幕系统--来自慕课@小鱼儿老师分享的,做一个总结,代码我重写了

弹幕系统的业务复杂度:

​ 假设1个直播间,100万人在线,每秒的弹幕数1000条

​ 推送频率=100w*1000=10亿级推送

​ 那么平台N个直播间,对应(*N)级

技术选型:

  • NodeJS:由于前端同学的熟悉和偏爱,本身websocket可以说是浏览器编程技术。所以很自然关联用nodejs.但是它是单线程模型。推送能力有限,不适合此场景
  • C/C++: 编程门槛太高,编程接口不够高度抽象。不适合业务开发
  • Go:语言先天支持高并发,性能高,websocket也是标准库的一部分。最终确定用它!

golang实现:

HTTP服务:

一个空接口:

func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe(":9999", nil)
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("HELLO"))
}
Websocket握手
var upgrader = websocket.Upgrader{
// 允许跨域,这个回调函数需要特别指定下。
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
//握手
wsConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
//错误处理
defer func(err error) {
if err != nil {
//关闭连接
wsConn.Close()
}
}(err)
//循环读客户端,简单版本
for {
// 协议规定了一些消息类型,text,binary,close等
msgType, msg, err := wsConn.ReadMessage()
if err != nil {
return
}
//读取的消息回写给客户端
if err = wsConn.WriteMessage(msgType, msg); err != nil {
return
}
}
}

打开浏览器调试工具,可以使用在线测试工具,调用ws://localhost:9999/ws,在连接阶段可以看到ws请求,在

request,response中均有Upgrade: websocket的headler,且返回码是101 Swiching Protocols

封装Websocoket

上述小节的样例仍有问题,缺乏工程化的设计:

  1. 其他代码模块,无法直接操作websocket连接。
  2. websocket连接非线程安全,并发读写需要同步手段。

所以定义一个Connection结构,隐藏底层连接

封装Connection的API,对外提供Send/Read/Close的线程安全接口

设计原理:

  1. 采用线程安全的channel来做消息的收发缓存。分别定义inChan(收) outChan(发)
  2. 分别启动读协程和写协程来负责消息的收发。

conn.go

package conn
import (
"sync"
"errors"
"github.com/gorilla/websocket"
)
type Connection struct {
WsConn    *websocket.Conn
inChan    chan []byte //缓存读取的消息
outChan   chan []byte //缓存发送的消息
closeChan chan byte   //独写线程同步连接状态
mux       sync.Mutex
isClosed  bool //标记是否被关闭,保证closeChan只被关闭一次
}
//InitConnection 初始化一个连接
func InitConnection(wc *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
WsConn:    wc,
inChan:    make(chan []byte, 1024),
outChan:   make(chan []byte, 1024),
closeChan: make(chan byte, 1024),
}
// 启动读协程
go conn.readLoop()
// 启动写协程
go conn.writeLoop()
return
}
// --API--
//ReadMsg 收消息
func (conn *Connection) ReadMsg() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
//SendMsg 发消息
func (conn *Connection) SendMsg(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return
}
//SendMsg 发消息
func (conn *Connection) Close() (err error) {
// 关闭连接,线程安全,可重入的Close
conn.WsConn.Close()
// 关闭此通道,但注意这个只能被调用一次
// 加锁操作
conn.mux.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mux.Unlock()
return
}
// --内部实现--
func (conn *Connection) readLoop() {
var (
data []byte
err  error
)
defer func() {
if err != nil {
conn.Close()
}
}()
for {
if _, data, err = conn.WsConn.ReadMessage(); err != nil {
//读失败
return
}
select {
case conn.inChan <- data:
case <-conn.closeChan:
return
}
}
}
func (conn *Connection) writeLoop() {
var (
data []byte
err  error
)
defer func() {
if err != nil {
conn.Close()
}
}()
for {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
return
}
if err = conn.WsConn.WriteMessage(websocket.TextMessage, data); err != nil {
//写失败
return
}
}
}

main.go

package main
import (
"net/http"
"time"
"./conn"
"github.com/gorilla/websocket"
)
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe(":9999", nil)
}
var upgrader = websocket.Upgrader{
// 允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
//错误处理
defer func() {
if err != nil {
//关闭连接
wsConn.Close()
}
}()
c, err := conn.InitConnection(wsConn)
if err != nil {
return
}
// 简单业务,将客户端消息回写,同时给客户端发心跳
go func() {
for {
if err := c.SendMsg([]byte("heartbeat")); err != nil {
return
}
time.Sleep(time.Second)
}
}()
for {
data, err := c.ReadMsg()
if err != nil {
return
}
if err = c.SendMsg(data); err != nil {
return
}
}
}

架构解密:

技术难点:

高吞吐量的弹幕系统的系统瓶颈:

  1. 内核瓶颈,Linux内核处理TCP包的极限包频≈100w/s,而消息量大于这个数。
  2. 锁瓶颈

    1. 通常用户集是个大的map
    2. 推送消息需要遍历整个集合,耗时长
    3. 推送期间,用户在上/下线,需要上锁
  3. CPU瓶颈

    1. 浏览器与服务端的通信通常是json通信
    2. json序列化/反序列化非常耗费CPU
技术难点的解决方案:
  1. 减少网络小包(参考mtu/mss大小,几百字节内都算)的发送。将将消息进行拼接。每秒对单个用户发送一个消息
  2. 锁瓶颈优化:大拆小,将长连接hash打散到多个集合。每个集合有自己的锁。读写锁替代互斥锁
  3. json优化是:json处理一次,发送N次。
分布式扩展
  • 单机场景:
  • 分布式:

    • 逻辑集群负责广播,采用http/2用作rpc性能更好
    • 网关层其实就对应单机版,我们上边实现的消息服务
    • 逻辑集群还暴露更加通用的推送接口http/1.1。

延申话题:

感谢@小鱼儿老师。关于技术/思想/精神的探讨。不断学习、实践、总结的过程。最终才能融汇贯通。

基于长连接代理服务。下期见。