Socket粘包问题终极解决方案—Netty版(2W字)!(5)

Netty 使用

对 Netty 有了大概的认识之后,接下来我们用 Netty 来编写一个基础的通讯服务器,它包含两个端:服务器端和客户端,客户端负责发送消息,服务器端负责接收并打印消息,具体的实现步骤如下。

1.添加 Netmaven命令ty阿平赶海maven是干嘛的

首先我们需要先添加 Nejava培训tty 框架的支持,如果是 Maven 项目添加如下配置即可:

<!-- 添加 Netty 框架 -->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.56.Final</version>
</dependency>
Netty 版本说明

Netty 的 3.x 和 4.x 为主流的稳定maven配置版本,而最新的 5.x 已经是放弃的测试版了,因此推荐使用 Netty 4.x 的最新稳定版。

2. 服务器端实现代码

按照官方的推荐,这里将服务器端的代码分为以下 3 个部分:

  • MyNettyServer:服务器端的核心业务代码;
  • ServerInitializer:服务器端通道ÿ阿飘是什么意思08;Channel)初始化&#xapi接口ff1maven命令b;
  • ServerHandler:服务器端接收到信息之后的处理逻辑。

PS:Channel 字面意阿片类药物思为“通道”,它是网络通信的载体。Channel 提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实api接口现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。

服务器端的实现代码如下:

// 定义服务器的端口号
static final int PORT = 8007;
/**
 * 服务器端
 */
static class MyNettyServer {
    public static void main(String[] args) {
        // 创建一个线程组,用来负责接收客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 创建另一个线程组,用来负责 I/O 的读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建一个 Server 实例(可理解为 Netty 的入门类)
            ServerBootstrap b = new ServerBootstrap();
            // 将两个线程池设置到 Server 实例
            b.group(bossGroup, workerGroup)
                    // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
                    .channel(NioServerSocketChannel.class)
                    // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
                    .childHandler(new ServerInitializer());
            // 绑定端口并且进行同步
            ChannelFuture future = b.bind(PORT).sync();
            // 对关闭通道进行监听
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 资源关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * 服务端通道初始化
 */
static class ServerInitializer extends ChannelInitializer<SocketChannel> {
    // 字符串编码器和解码器
    private static final StringDecoder DECODER = new StringDecoder();
    private static final StringEncoder ENCODER = new StringEncoder();
    // 服务器端连接之后的执行器(自定义的类)
    private static final ServerHandler SERVER_HANDLER = new ServerHandler();
    /**
     * 初始化通道的具体执行方法
     */
    @Override
    public void initChannel(SocketChannel ch) {
        // 通道 Channel 设置
        ChannelPipeline pipeline = ch.pipeline();
        // 设置(字符串)编码器和解码器
        pipeline.addLast(DECODER);
        pipeline.addLast(ENCODER);
        // 服务器端连接之后的执行器,接收到消息之后的业务处理
        pipeline.addLast(SERVER_HANDLER);
    }
}
/**
 * 服务器端接收到消息之后的业务处理类
 */
static class ServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 读取到客户端的消息
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String request) {
        if (!request.isEmpty()) {
            System.out.println("接到客户端的消息:" + request);
        }
    }
    /**
     * 数据读取完毕
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
    /**
     * 异常处理,打印异常并关闭通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}