My logo
Published on

Netty框架-原理解析和实战

一,快速第一个Netty 应用

1.1 Netty 核心概念

1.1.1 核心组件

  • 事件模型
  • 字节缓冲区
  • 通信API

1.1.2 传输服务

  • NIO
  • epoll
  • OIO
  • 本地
  • 内嵌

1.2 实战应用

1.2.1 开发Echo 协议的服务器

编写管道处理器:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);

        // 写消息到管道
        ctx.write(msg); //写消息
        ctx.flush(); // 冲刷消息

        // 上面两个方法等同于 ctx.writeAndFlush(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

编写服务器主程序:

public class EchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws Exception {
        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        // 多线程时间循环器
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 启动NIO 服务的引导程序类
            ServerBootstrap b = new ServerBootstrap();

            // 设置 EventLoopGroup
            b.group(boosGroup, workerGroup)                                 // 设置EventLoopGroup
                    .channel(NioServerSocketChannel.class)                  // 指明新的Channel的类型
                    .childHandler(new EchoServerHandler())                  // 指定ChannelHandler
                    .option(ChannelOption.SO_BACKLOG, 128)           // 设置的ServerChannel的一些选项
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  // 设置的ServerChannel的子Channel的选项

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();
            System.out.println("EchoServer已启动,端口:" + port);

            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {

            // 优雅的关闭
            workerGroup.shutdownGracefully();
            boosGroup.shutdownGracefully();
        }
    }
}

1.2.2 开发Echo 协议的客户端

编写管道处理器:

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 从管道读消息
        ByteBuf buf = (ByteBuf) msg;    // 转为ByteBuf类型
        String m = buf.toString(CharsetUtil.UTF_8);//转换为字符串

        System.out.println("echo: " + m);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

编写客户端主程序:

public final class EchoClient {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("用法: java EchoClient <host name> <port number>");
            System.exit(1);
        }

        String hostName = args[0];
        int portNumber = Integer.parseInt(args[1]);

        // 配置客户端
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new EchoClientHandler());

            // 连接到服务器
            ChannelFuture future = bootstrap.connect(hostName, portNumber).sync();
            Channel channel = future.channel();
            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            try (BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
                String userInput;
                while ((userInput = stdIn.readLine()) != null) {
                    writeBuffer.put(userInput.getBytes());
                    writeBuffer.flip();
                    writeBuffer.rewind();

                    // 转为ByteBuf
                    ByteBuf buf = Unpooled.copiedBuffer(writeBuffer);
                    // 写消息到管道
                    channel.writeAndFlush(buf);
                    // 清理缓存
                    writeBuffer.clear();
                }
            }
        } catch (UnknownHostException e) {
            System.err.println("不明主机,主机名为: " + hostName);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("不能从主机中获取I/O,主机名为:" + hostName);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅的关闭
            group.shutdownGracefully();
        }
    }
}

二,Netty 架构设计

2.1 Selector 模型

2.1.2 SelectableChannel

Channel 注册到 Selector

2.1.3 SelectionKey

遍历 SelectionKey

2.1.2 事件驱动

Channel
回调
Future
事件及处理器

2.1.3 责任链模式

ChannlePipeline

三,Netty Channel 功能和接口

Channel(管道):代表网络socket 或能够进行IO 操作的连接关系。

  • Channel 接口
  • ChannelOutBoundInvoker 接口
  • AttributeMap 接口
  • ChannelHandler 接口
    • ChannelHandler 接口子类:ChannelInBoundHandler,ChannelOutsBoundHandler
  • ChannelHandlerAdapter 抽象类
    • ChannelHandlerAdapter方法及其子类:ChannelInBoundHandlerAdapter,ChannelOutsBoundHandlerAdapter
  • ChannelPipeline 接口
  • ChannleHandlerContext 接口
    • ChannleHandlerContext 接口是联系ChannelHandler 与其 ChannelPipeline 之间的纽带。
    • ChannelHandlerContext与其他组件关系图:

netty

  • NioEventLoopGroup 类
  • NioServerSocketChannel 类
  • OioEventLoopGroup 类
  • OioServerSocketChannel 类
  • EpollEventLoopGroup 类
    • 只能支持epoll 的Linux 平台上使用
  • EpollServerSocketChannel 类
  • LocalEventLoopGroup 类
  • LocalServerChannel 类

四,Netty 字节缓冲区

网络数据传输的基本单位是字节,缓冲区就是存储字节的容器。

4.1 ByteBuffer 实现原理

可以参考 SS 里面讲解的IO 关于ByteBuffer 知识
使用案例:

public class ByteBufferDemo {

    public static void main(String[] args) {
        // 创建一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("------------初始时缓冲区------------");
        printBuffer(buffer);

        // 添加一些数据到缓冲区中
        System.out.println("------------添加数据到缓冲区------------");
        String s = "love";
        buffer.put(s.getBytes());
        printBuffer(buffer);

        // 切换成读模式
        System.out.println("------------执行flip切换到读取模式------------");
        buffer.flip();
        printBuffer(buffer);

        // 读取数据
        System.out.println("------------读取数据------------");
        // 创建一个limit()大小的字节数组(因为就只有limit这么多个数据可读)
        byte[] bytes = new byte[buffer.limit()];
        // 将读取的数据装进我们的字节数组中
        buffer.get(bytes);
        printBuffer(buffer);

        // 执行compact
        System.out.println("------------执行compact------------");
        buffer.compact();
        printBuffer(buffer);

        // 执行clear
        System.out.println("------------执行clear清空缓冲区------------");
        buffer.clear();
        printBuffer(buffer);
    }

    /**
     * 打印出ByteBuffer的信息
     *
     * @param buffer
     */
    private static void printBuffer(ByteBuffer buffer) {
        System.out.println("mark:" + buffer.mark());
        System.out.println("position:" + buffer.position());
        System.out.println("limit:" + buffer.limit());
        System.out.println("capacity:" + buffer.capacity());
    }
}

4.2 ByteBuf 实现原理

ByteBuf 通过两个位置指针来协助缓冲区的读写操作,分别是readIndex和writerIndex。和capacity有一下关系:
0 ≤ readIndex ≤ writerIndex ≤ capacity
使用案例:

public class ByteBufDemo {

    public static void main(String[] args) {
        // 创建一个缓冲区
        ByteBuf buffer = Unpooled.buffer(10);
        System.out.println("------------初始时缓冲区------------");
        printBuffer(buffer);

        // 添加一些数据到缓冲区中
        System.out.println("------------添加数据到缓冲区------------");
        String s = "love";
        buffer.writeBytes(s.getBytes());
        printBuffer(buffer);

        // 读取数据
        System.out.println("------------读取数据------------");
        while (buffer.isReadable()) {
            System.out.println(buffer.readByte());
        }
        printBuffer(buffer);

        // 执行 compact
        System.out.println("------------执行discardReadBytes------------");
        buffer.discardReadBytes();
        printBuffer(buffer);

        // 执行clear
        System.out.println("------------执行clear清空缓冲区------------");
        buffer.clear();
        printBuffer(buffer);
    }

    /**
     * 打印出ByteBuf的信息
     *
     * @param buffer
     */
    private static void printBuffer(ByteBuf buffer) {
        System.out.println("readerIndex:" + buffer.readerIndex());
        System.out.println("writerIndex:" + buffer.writerIndex());
        System.out.println("capacity:" + buffer.capacity());
    }
}

4.3 ByteBuf 的三种使用模式

4.3.1 堆缓冲区模式(Heap Buffer)

堆缓冲区模式又称为“支撑数组”(Backing Array),其数据是存放在JVM 的堆空间,通过将数据存储在数组中实现。

  • 优点:可以快速创建和释放,并且提供了数组直接快速访问的方法。
  • 缺点:每次数据与IO进行传输时,都需要将数据复制到直接缓冲区。

代码示例:

public class ByteBufHeapBufferDemo {

    public static void main(String[] args) {

        // 创建一个堆缓冲区
        ByteBuf buffer = Unpooled.buffer(10);
        String s = "waylau";
        buffer.writeBytes(s.getBytes());

        // 检查是否是支撑数组
        if (buffer.hasArray()) {
            // 获取支撑数据的引用
            byte[] array = buffer.array();
            // 计算第一个字节的偏移量
            int offset = buffer.readerIndex() + buffer.arrayOffset();
            // 可读字节数
            int length = buffer.readableBytes();
            printBuffer(array, offset, length);
        }
    }

    /**
     * 打印出Buffer的信息
     *
     * @param buffer
     */
    private static void printBuffer(byte[] array, int offset, int len) {
        System.out.println("array:" + array);
        System.out.println("array->String:" + new String(array));
        System.out.println("offset:" + offset);
        System.out.println("len:" + len);
    }
}

4.3.2 直接缓冲区模式

直接缓冲区属于堆外分配的直接内存,不会占用堆的容量。

  • 优点:使用socket 传递数据时性能很好,避免了数据从JVM堆内存复制到直接缓冲区的过程,提高了性能。
  • 缺点:直接缓冲区分配内存和释放更为昂贵。
  • 建议:对于涉及大量IO的数据读写,建议使用直接缓冲区;而对于后端的业务消息编解码模块,建议使用堆缓冲区。
public class ByteBufDirectBufferDemo {

    public static void main(String[] args) {
        ByteBuf buffer = Unpooled.directBuffer(10);
        String s = "waylau";
        buffer.writeBytes(s.getBytes());

        // 检查是否是支撑数组.
        // 不是支撑数组,则为直接缓冲区
        if (!buffer.hasArray()) {
            // 计算第一个字节的偏移量
            int offset = buffer.readerIndex();
            // 可读字节数
            int length = buffer.readableBytes();
            // 获取字节内容
            byte[] array = new byte[length];
            buffer.getBytes(offset, array);

            printBuffer(array, offset, length);
        }
    }

    /**
     * 打印出Buffer的信息
     *
     * @param buffer
     */
    private static void printBuffer(byte[] array, int offset, int len) {
        System.out.println("array:" + array);
        System.out.println("array->String:" + new String(array));
        System.out.println("offset:" + offset);
        System.out.println("len:" + len);
    }
}

4.3.3 复合缓冲区模式

复合缓冲区是Netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合试图, 可以根据需要添加和删除不同类型的ByteBuf。

  • 优点:提供一种访问方式让使用者自由地组合多个ByteBuf,避免了复制和分配新的缓冲区。
  • 缺点:不支持访问其支撑数组。因此如果要访问,需要先将内容复制到堆内存,在进行访问。

代码示例:

public class ByteBufCompositeBufferDemo {

    public static void main(String[] args) {
        // 创建一个堆缓冲区
        ByteBuf heapBuf = Unpooled.buffer(3);
        String way = "way";
        heapBuf.writeBytes(way.getBytes());

        // 创建一个直接缓冲区
        ByteBuf directBuf = Unpooled.directBuffer(3);
        String lau = "lau";
        directBuf.writeBytes(lau.getBytes());

        // 创建一个复合缓冲区
        CompositeByteBuf compositeBuffer = Unpooled.compositeBuffer(10);
        // 将缓冲区添加到符合缓冲区
        compositeBuffer.addComponents(heapBuf,directBuf);

        // 检查是否是支撑数组.
        // 不是支撑数组,则为复合缓冲区
        if (!compositeBuffer.hasArray()) {
            for (ByteBuf buffer : compositeBuffer) {
                // 计算第一个字节的偏移量
                int offset = buffer.readerIndex();
                // 可读字节数
                int length = buffer.readableBytes();
                // 获取字节内容
                byte[] array = new byte[length];
                buffer.getBytes(offset, array);
                printBuffer(array, offset, length);
            }
        }
    }

    /**
     * 打印出Buffer的信息
     *
     * @param buffer
     */
    private static void printBuffer(byte[] array, int offset, int len) {
        System.out.println("array:" + array);
        System.out.println("array->String:" + new String(array));
        System.out.println("offset:" + offset);
        System.out.println("len:" + len);
    }
}

4.4  ByteBuf 接口和实现类

ByteBufAllocator 接口
ByteBufUtil 类
hexDump:将参数的内容以十六进制字符串的方式打印
字符串的编码和解码
equals 方法:判断两个ByteBuf 实例的相等性
ByteBufHolder 接口:ByteBuf 的容器
CompositeByteBuf 类
ReferenceCounted 接口
Unpooled 类
分配新缓冲区
创建包装的缓冲区
复制现有的字节数组,字节缓冲区和字符串

4.5 零拷贝

零拷贝主要是用来解决操作系统在处理IO操作时,频繁复制数据的问题。关于零拷贝技术主要有
mmap+write,sendfiile,splice 等集中方式

4.5.1 虚拟内存

  • 多个虚拟内存可以指向同一个物理地址;
  • 虚拟内存空间可以远远大于物理内存空间。

利用上面第一条特性可以优化,把内核空间和用户空间的虚拟地址映射到同一个物理地址,这样在IO操作时就不需要来回复制了。

netty

4.5.2 mmap+write 方式

整体流程的核心区别就是,吧数据读取到内核缓冲区后,应用程序进行写入操作时,直接把内核的Read Buffer 的数据复制到Socket Buffer以便写入,内核之间的复制也是需要CPU参与的。
不过上述流程就少了一个CPU Copy,提升了IO速度,但是上下文的切换还是4次并没有减少。

netty

4.5.3 sendfiile 方式

sendfiile 方式可以替换上面的mmap+write 方式来进一步优化。

netty

sendfiile 方式只有3次数据复制(其中一次CPU Copy) 以及2次上下文切换。但是带有 scatter/gather 的sendfile 方式,能把CPU Copy 减少到没有。

带有 scatter/gather 的sendfile 方式
其原理就是在内核空间Read Buffer和Socket Buffer 不做数据复制,而是将Read Buffer的内存地址,偏移量记录到相应的Socket Buffer,这样就不需要复制。本质和虚拟内存的姐姐方法思路一样,就是内存地址记录。不过这一种收集复制功能是需要硬件及驱动程序支持的。

netty

4.5.4 splice 方式

splice调用和sendfile非常类似,用户应用程序必须拥有两个已经打开的文件描述符,一个表示输入设备,一个表示输出设备,与sendfile 不同的是,splice 允许任意两个文件之间的互相连接,而并不只是文件到socket 进行数据传输。sendfile 只是splice的一个子集,在Linux 2.6.17版本中引入splice,在Linux 2.6.23版本中sendfile 机制的实现已经没有了,但是其API及相应的功能还存在,只不过API 及相应的功能利用了splice 机制来实现。
和sendfile 不同的是,splice 不需要硬件支持。

4.5.5 各种零拷贝的对比图

CPU拷贝DMA拷贝系统调用上下文切换
传统方法22read/write4
内存映射12mmap/write4
sendfile12sendfile2
sendfile with dma
scatter/gather copy02sendfile2
splice02splice2

4.5.6 Java 实现零拷贝

Java实现零拷贝是基于底层操作系统的。目前支持2种:mmap/write 方式和sendfile 方式。

1. Java 提供的mmap/write 方式

  • Java Nio 提供的MappedByteBuffer,用于提供 mmap/write方式;
  • Java Nio Channel 就相当于操作系统中的内核缓存区,有可能是读缓冲区,也有可能是网络缓存区,而buffer 就相当操作系统中的用户缓冲区。

2. Netty 实现零拷贝

Netty 中的零拷贝的实现是基于 Java的,换言之,底层也是基于操作系统实现的。相比于 Java 中的零拷贝而言,Netty 的零拷贝更多的是偏向于优化数据操作的概念。
Netty 的零拷贝体现在如下几个方面:

  • Netty 提供了 CompositeByteBuf 类,它可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的复制。
  • 通过 wrap 操作,可以将 byte[]数组、ByteBuf、ByteBuffer 等包装成一个 NetyByteBuf 对象,进而避免了复制操作。
  • ByteBuf 支持 slice 操作,因此可以将 ByteBuf分解为多个共享同一个存储区域的ByteBuf,避免了内存的复制。
  • 通过 FileRegion 包装的 FileChannel.tranferTo 实现文件传输,可以直接将文件缓冲区的数据发送到目标Channel,避免了通过循环write 方式导致的内存复制问题。

从上面几个方法可以看出,前 3 个方法都是广义零拷贝,其实现方式都是为了减少不必要数据复制,偏向于应用层数据优化的操作。而第 4 个方法,FileRegion 包装的 FileChamnel.tranferTo,才是真正的零拷贝(狭义零拷贝)。

4.5.7 动态扩容

如果容量无法动态扩展将会给用户带来很大的麻烦。例如,当要解析某条报文内容时,需要事先分配缓冲区。如果缓冲区分配的太小,则会导致报文解析失败;如果缓冲区设置的太大,有可能会导致缓冲区的浪费。特别是在海量推送服务系统中,资源是非常紧缺的,看似微小的资源浪费最终造成的是整个服务端内存负担。假设单条推送消息最大上限为10KB,消息平均大小为 5KB,为了满足 10KB 消息的处理,缓冲区的容量被设置为 10KB,这样每条链路实际上多消耗了 5KB 内存。如果长链接链路数为 100 万,每个链路都独立持有接收缓冲区,则额外损耗的总内存如下。
1000000 * 5KB = 4882.8125MB
内存消耗过大,不仅增加了硬件成本,而且大内存容易导致长时间的 Full GC,对系统稳
定性会造成比较大的冲击。因此,如果能够提供灵活的动态调整内存的机制,即接收缓冲区可以根据以往接收的消息进行计算,动态调整内存,利用 CPU 资源来换内存资源,就可以减少资源的浪费。
动态扩容主要有以下几种策略:

  • 缓冲区支持容量的扩展和收缩,可以按需灵活调整,以节约内存。
  • 接收消息时,可以按照指定的算法对之前接收的消息大小进行分析,并预测未来的消息大小,按照预测值灵活调整缓冲区容量,以做到用最小的资源损耗满足程序正常功能。

Netty 提供了 RevBytaBuBAlloceanor接口以支持容量动态调整。使用 RecvByteBufAllocam分配的接收缓冲区,其容量可能足够大以读取所有入站数据,并且不会浪费其空间。对于接收缓冲区的内存分配器,Nety提供了两个子接口:MaxBytesRecvByteBufAlloceatr彩ManMesagsReviByeBufAlcanor。 MaxByesReevByteBufAllbcator 接口的默认实现类有DeauthMastbytseevByeBufAlcanor MaxMessagesReevByteBufAllocator 接口的默认实现有 DetinhMasMesagesRecevBydBuAlloeaor
而 DefaultMaxMessagesRecvByteBufAllocator 又有两种子类:
• FixedReevByeBufAllocanor:固定长度的接收缓冲区分配器,由它分配的 ByteBuf 长度都是固定大小的,并不会根据实际数据报的大小动态收缩。但是,如果容量不足,支持动态扩展。
• AdaptivekeevByeBufAllocator:容量动态调整的接收缓冲区分配器,它会根据之前Channel接收到的数据报大小进行计算,如果连续填充满接收缓冲区的可写空间,则动态扩展容量。如果连续2次接收到的数据报都小于指定值,则收缩当前的容量,以节约内存。

五,Netty 引导程序(Bootstrap)

可以理解为是一个程序的入口程序,在Java 中就是包含main方法的程序。在Netty 中,引导孩包含一系列的配置项。

5.1 引导程序类

AbstractBootstrap 抽象类
Bootstrap 类
ServerBootstrap 类:引导服务器程序

5.2 引导无连接协议(UDP)

在前面两个 Echo服务器和客户端示例中,都是使用了面向连接协议(TCP)。
本节演示引导使用无连接协议(UDP)的 Netty 程序。
UDP 是用户数据报协议(User Datagrame Protocol)的简称,主要作用是将网络数据压缩成数据报的形式,提供面向事务的简单信息传送服务。UDP 具有以下特点。

  • UDP 传送数据前并不与对方建立连接,即 UDP 是无连接的。
  • UDP 接收到的数据报不发送确认信号,发送端不知道数据是否被正确接收。
  • UDP 传送数据比 TCP 快,系统开销也少。

在Netty中,用 DatagramChame 来代表无连接协议的 Channel。DatagramChannel 具体实现着EplDataganChannel、KQueueDatagamChamel、NioDatagramChannel 和 OioDatagramChannel
1.编写管道处理器:

public class DatagramChannelEchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 消息转为DatagramPacket类型
        DatagramPacket packet = (DatagramPacket) msg;
        System.out.println(packet.sender() + " -> Server :" + msg);

        /**
         * 构建新DatagramPacket
         * 将新的 DatagramPacket的接收者设置为packet 的发送者,实现将接收到的客户端消息发回给客户端
         */
        DatagramPacket data = new DatagramPacket(packet.content(), packet.sender());

        // 写消息到管道
        ctx.write(data);// 写消息
        ctx.flush(); // 刷新消息

        // 上面两个方法等同于 ctx.writeAndFlush(data);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

2.编写服务器主程序:

public class DatagramChannelEchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws Exception {
        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        // 配置事件循环器
        // UDP 只需要一个
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 启动NIO服务的引导程序类
            Bootstrap b = new Bootstrap();

            b.group(group) // 设置EventLoopGroup
                    .channel(NioDatagramChannel.class) // 指明新的Channel的类型
                    .option(ChannelOption.SO_BROADCAST, true) // 设置的Channel的一些选项 SO_BROADCAST 发送广播消息
                    .handler(new DatagramChannelEchoServerHandler()); // 指定ChannelHandler

            // 绑定端口
            ChannelFuture f = b.bind(port).sync();

            System.out.println("DatagramChannelEchoServer已启动,端口:" + port);

            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {

            // 优雅的关闭
            group.shutdownGracefully();
        }
    }
}

3.编写客户端管道处理器:

public class DatagramChannelEchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        // 从管道读消息
        DatagramPacket packet = (DatagramPacket) msg; // 转为DatagramPacket类型
        String m = packet.content().toString(CharsetUtil.UTF_8);  // 转为字符串
        System.out.println("echo :" + m);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

3.编写客户端管主程序:

public class DatagramChannelEchoClient {

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2) {
            System.err.println("用法: java DatagramChannelEchoClient <host name> <port number>");
            System.exit(1);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // 配置客户端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)	// 广播消息
                    .handler(new DatagramChannelEchoClientHandler());

            // 绑定端口
            ChannelFuture f = b.bind(port).sync();
            System.out.println("DatagramChannelEchoClient已启动,端口:" + port);

            Channel channel = f.channel();
            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            try (BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
                String userInput;
                while ((userInput = stdIn.readLine()) != null) {
                    writeBuffer.put(userInput.getBytes());
                    writeBuffer.flip();
                    writeBuffer.rewind();

                    // 转为ByteBuf
                    ByteBuf buf = Unpooled.copiedBuffer(writeBuffer);
                    // 写消息到管道
                    // 消息封装为DatagramPacket类型
                    channel.writeAndFlush(new DatagramPacket(buf,
                            new InetSocketAddress(host, DatagramChannelEchoServer.DEFAULT_PORT)));

                    // 清理缓冲区
                    writeBuffer.clear();
                }
            } catch (UnknownHostException e) {
                System.err.println("不明主机,主机名为: " + host);
                System.exit(1);
            } catch (IOException e) {
                System.err.println("不能从主机中获取I/O,主机名为:" + host);
                System.exit(1);
            }
        } finally {
            // 优雅的关闭
            group.shutdownGracefully();
        }
    }
}

5.3 从Channel 引导客户端

5.4 在引导过程中添加多个ChannelHandler

5.5 ChannelOption 属性

5.6 优雅的关闭应用

六,Netty 线程模型

线程模型是指程序内部管理线程的方式。
线程模型的类型:

  • 传统服务设计模式
  • NIO 分发模型
  • 事件驱动模式
  • Reactor 模型

6.1 Reactor 模型

(1) Reactor 也可以称作反应器模型,它有以下几个特点:

  • Reactor 模型中会通过分配适当的处理器来响应 I/O 事件。
  • 每个处理器执行非阻塞的操作。
  • 通过将处理器绑定到事件进行管理。

Reactor 模型整合了分发模型和事件驱动这两大优势,特别适合处理海量的 I/O 事件及高并发的场景。
**(2) **Reactor 处理请求的流程
Reactor 处理请求的流程主要分为读取和写入两种操作。
对于读取操作而言,流程如下:

  • 应用程序注册读就绪事件和相关联的事件处理器。
  • 事件分离器等待事件的发生。
  • 当发生读就绪事件时,事件分离器调用第一步注册的事件处理器。

写入操作类似于读取操作,只不过第一步注册的是写就绪事件。
**(3) **Reactor 三种角色

  • Reactor:负责监听和分配事件,将 I/O 事件分派给对应的 Handler。新的事件包含连接建立就绪、读就绪、写就绪等。
  • Acceptor:处理客户端新连接,并分派请求到处理器链中。
  • Handler:将自身与事件绑定,执行非阻塞读1写任务,完成 channel 的读入,完成处理业务逻辑后,负责将结果写出 Channel。可用资源池来管理。

6.1.1 单Reactor 单线程模型

Reactor 线程负责多路分离套接字,Accept 负责接收新连接,并派请求到Handler,如图:

netty

著名的缓存系统Redis 就是使用单Reactor 单线程的模型。
单线程模型的消息处理流程:

  • Reactor 对象通过 select 监控连接事件,收到事件后通过 dispatch 进行转发;
  • 如果是连接建立的事件,则由 Acceptor 接收连接,并创建 Handler 处理后续事件。如果不是建立连接事件,则 Reactor会分发调用 Handler 来响应;
  • Handler会完成 read、decode、compute、encode、send 等一整套流程。

单线程模型的缺点:

  • 单Reactor单线程模型只是在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。Handler 业务处理部分没有异步。
  • 对于一些小容量应用场景,可以使用单 Reactor 单线程模型。但是对于高负载、高并发的应用场景却不合适,主要原因如下。
  • 即便 Reactor 线程的 CPU 负荷达到 100%,也无法满足海量消息的 read、decode,compute、encode 和 send。
  • 当 Reactor 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 Reactor 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。
  • 一旦Reactor 线程意外中断或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。为了解决上述问题,单 Reactor 多线程模型便出现了。

为了解决上述问题,单 Reactor 多线程模型便出现了。

6.1.2 单Reactor 多线程模型

该模型在事件处理器(Handler)部分采用了多线程(线程池),如图:

netty

单Reactor 多线程模型的消息处理流程:

  • Reactor对象通过 Seleet 监控客户端请求事件,收到事件后通过 dispatch 进行分发;
  • 如果是建立连接请求事件,则由 Acceptor 接收连接,并创建一个Handler 对象处理连接完成后续的各种事件。如果不是建立连接事件,则 Reactor会分发调用 Handler 来响应;
  • Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
  • Worker 线程池会分配独立的线程完成真正的业务处理,将响应结果发给 Handler 进行处理;
  • Handler 收到响应结果后通过 send 将响应结果返回给 Client。

相对于第一种模型来说,该业务逻辑是交由线程池来处理的,Handler 收到响应后通过send 将响应结果返回给客户端。这样可以降低 Reactor 的性能开销,从而更专注地做事件分发工作,提升了整个应用的吞吐性能。
单Reactor 多线程模型缺点:

  • 多线程数据共享和访问比较复杂。如果子线程完成业务处理后,把結果传递给主线程Reactor进行发送,就会涉及共享数据的互斥和保护机制;
  • Reacfor 承担所有事件的监听和响应,只在主线程中运行,可能会存在性能问题。例的,并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常p耗性能。为了解决上述性能问题,产生了第3种主从 Reactor 多线程模型。

为了解决上述问题,主从 Reactor 多线程模型便出现了。

6.1.3 主从Reactor 多线程模型

相较于单 Reactor 多线程模型,主从 Reactor 多线程模型是将 Reactor 分成两部分,如图:

netty

  • mainReactor(主 Reactor)负责监听 Server Socket,用来处理网络 I/O 连接建立操作,将建立的 SocketChannel 指定注册给 subReactor;
  • subReactor( 从 Reactor)主要和建立起来的 socket做数据交互和事件业务处理操作。通常,subReactor 个数上可与 CPU 个数等同;

Nginx、Swoole、Memcached 和 Netty 都是采用这种实现。
主从 Reactor 多线程模型的消息处理流程如下:

  • 从主线程池中随机选择一个 Reactor 线程作为 Acceptor 线程,用于绑定监听端口,接收客户端连接;
  • Acceptor 线程接收客户端连接请求之后创建新的 SocketChannel,将其注册到主线程池的其他 Reactor 线程上,由其负责接入认证、IP 黑白名单过滤、握手等操作;
  • 上述步骤完成之后,业务层的链路正式建立,将 SocketChannel 从主线程池的Reactor 线程的多路复用器上摘除,重新注册到子线程池的线程上,并创建一个 Handler 用于处理各种连接事件;
  • 当有新的事件发生时,subReactor 会调用连接对应的 Handler 进行响应;
  • Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理。Worker 线程池会分配独立的线程完成真正的业务处理,如将响应结果发给 Handle进行处理;
  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client。

相对于前面几种模型而言,主从 Reactor多线程模型业务逻辑是交由线程池来处理的Handler收到响应后通过 send 将响应结果返回给客户端。这样可以降低 Reactor 的性能开销,从而更专注地做事件分发工作了,提升了整个应用的吞吐性能。

6.1.4 实战主从Reactor 多线程模型示例

  1. Reactor 类:
public class Reactor implements Runnable {

	private final Selector selector;
	private final ServerSocketChannel serverSocketChannel;

	public Reactor(int port) throws IOException {
		selector = Selector.open(); // 打开一个Selector
		serverSocketChannel = ServerSocketChannel.open(); // 建立一个Server端通道
		serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 绑定服务端口
		serverSocketChannel.configureBlocking(false); // 置为非阻塞模式

		// Reactor是入口,最初给一个channel注册上去的事件都是accept
		SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		// 绑定Acceptor处理类
		sk.attach(new Acceptor(serverSocketChannel));
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				int count = selector.select(); // 就绪事件到达之前,阻塞
				if (count == 0) {
					continue;
				}
				Set<SelectionKey> selected = selector.selectedKeys(); // 拿到本次select获取的就绪事件
				Iterator<SelectionKey> it = selected.iterator();
				while (it.hasNext()) {
					// 这里进行任务分发
					dispatch((SelectionKey) (it.next()));
				}
				selected.clear();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	void dispatch(SelectionKey k) {
		// 附带对象为Acceptor
		Runnable r = (Runnable) (k.attachment());

		// 调用之前注册的回调对象
		if (r != null) {
			r.run();
		}
	}
}
  1. Acceptor 类:
public class Acceptor implements Runnable {

	private final ServerSocketChannel serverSocketChannel;

	private final int coreNum = Runtime.getRuntime().availableProcessors(); // CPU核心数

	private final Selector[] selectors = new Selector[coreNum]; // 创建selector给SubReactor使用

	private int next = 0; // 轮询使用subReactor的下标索引

	private SubReactor[] reactors = new SubReactor[coreNum]; // subReactor

	private Thread[] threads = new Thread[coreNum]; // subReactor的处理线程

	Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
		this.serverSocketChannel = serverSocketChannel;
		// 初始化
		for (int i = 0; i < coreNum; i++) {
			selectors[i] = Selector.open();
			reactors[i] = new SubReactor(selectors[i], i); // 初始化sub reactor
			threads[i] = new Thread(reactors[i]); // 初始化运行sub reactor的线程
			threads[i].start(); // 启动(启动后的执行参考SubReactor里的run方法)
		}
	}

	@Override
	public void run() {
		SocketChannel socketChannel;
		try {
			socketChannel = serverSocketChannel.accept(); // 连接
			if (socketChannel != null) {
				System.out.println(String.format("accpet %s", socketChannel.getRemoteAddress()));
				socketChannel.configureBlocking(false);

				// 注意一个selector在select时是无法注册新事件的,因此这里要先暂停下select方法触发的程序段,
				// 下面的weakup和这里的setRestart都是做这个事情的,具体参考SubReactor里的run方法
				reactors[next].registering(true);
				selectors[next].wakeup(); // 使一个阻塞住的selector操作立即返回
				SelectionKey selectionKey = 
						socketChannel.register(selectors[next], SelectionKey.OP_READ); // 注册一个读事件
				selectors[next].wakeup(); // 使一个阻塞住的selector操作立即返回

				// 本次事件注册完成后,需要再次触发select的执行,
				// 因此这里Restart要在设置回false(具体参考SubReactor里的run方法)
				reactors[next].registering(false);

				// 绑定Handler
				selectionKey.attach(new AsyncHandler(socketChannel, selectors[next], next));
				if (++next == selectors.length) {
					next = 0; // 越界后重新分配
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
  1. SubReactor 类:
public class SubReactor implements Runnable {
	private final Selector selector;
	private boolean register = false; // 注册开关表示
	private int num; // 序号,也就是Acceptor初始化SubReactor时的下标

	SubReactor(Selector selector, int num) {
		this.selector = selector;
		this.num = num;
	}

	@Override
	public void run() {
		while (!Thread.interrupted()) {
			System.out.println(String.format("NO %d SubReactor waitting for register...", num));
			while (!Thread.interrupted() && !register) {
				try {
					if (selector.select() == 0) {
						continue;
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
				Set<SelectionKey> selectedKeys = selector.selectedKeys();
				Iterator<SelectionKey> it = selectedKeys.iterator();
				while (it.hasNext()) {
					dispatch(it.next());
					it.remove();
				}
			}
		}
	}

	private void dispatch(SelectionKey key) {
		Runnable r = (Runnable) (key.attachment());
		if (r != null) {
			r.run();
		}
	}

	void registering(boolean register) {
		this.register = register;
	}

}
  1. AsyncHandler 类:
public class AsyncHandler implements Runnable {

	private final Selector selector;

	private final SelectionKey selectionKey;
	private final SocketChannel socketChannel;

	private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);

	private final static int READ = 0; // 读取就绪
	private final static int SEND = 1; // 响应就绪
	private final static int PROCESSING = 2; // 处理中

	private int status = READ; // 所有连接完成后都是从一个读取动作开始的

	private int num; // 从反应堆序号

	// 开启线程数为4的异步处理线程池
	private static final ExecutorService workers = Executors.newFixedThreadPool(5);

	AsyncHandler(SocketChannel socketChannel, Selector selector, int num) throws IOException {
		this.num = num; // 为了区分Handler被哪个从反应堆触发执行做的标记
		this.socketChannel = socketChannel; // 接收客户端连接
		this.socketChannel.configureBlocking(false); // 置为非阻塞模式
		selectionKey = socketChannel.register(selector, 0); // 将该客户端注册到selector
		selectionKey.attach(this); // 附加处理对象,当前是Handler对象
		selectionKey.interestOps(SelectionKey.OP_READ); // 连接已完成,那么接下来就是读取动作
		this.selector = selector;
		this.selector.wakeup();
	}

	@Override
	public void run() {
		// 如果一个任务正在异步处理,那么这个run是直接不触发任何处理的,
		// read和send只负责简单的数据读取和响应,业务处理完全不阻塞这里的处理
		switch (status) {
		case READ:
			read();
			break;
		case SEND:
			send();
			break;
		default:
		}
	}

	private void read() {
		if (selectionKey.isValid()) {
			try {
				readBuffer.clear();

				// read方法结束,意味着本次"读就绪"变为"读完毕",标记着一次就绪事件的结束
				int count = socketChannel.read(readBuffer);
				if (count > 0) {
					status = PROCESSING; // 置为处理中
					workers.execute(this::readWorker); // 异步处理
				} else {
					selectionKey.cancel();
					socketChannel.close();
					System.out.println(String.format("NO %d SubReactor read closed", num));
				}
			} catch (IOException e) {
				System.err.println("处理read业务时发生异常!异常信息:" + e.getMessage());
				selectionKey.cancel();
				try {
					socketChannel.close();
				} catch (IOException e1) {
					System.err.println("处理read业务关闭通道时发生异常!异常信息:" + e.getMessage());
				}
			}
		}
	}

	void send() {
		if (selectionKey.isValid()) {
			status = PROCESSING; // 置为执行中
			workers.execute(this::sendWorker); // 异步处理
			selectionKey.interestOps(SelectionKey.OP_READ); // 重新设置为读
		}
	}

	// 读入信息后的业务处理
	private void readWorker() {
		try {

			// 模拟一段耗时操作
			Thread.sleep(5000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		try {
			System.out.println(String.format("NO %d %s -> Server: %s", 
					num, socketChannel.getRemoteAddress(),
					new String(readBuffer.array())));
		} catch (IOException e) {
			System.err.println("异步处理read业务时发生异常!异常信息:" + e.getMessage());
		}
		status = SEND;
		selectionKey.interestOps(SelectionKey.OP_WRITE); // 注册写事件
		this.selector.wakeup(); // 唤醒阻塞在select的线程
	}

	private void sendWorker() {
		try {
			sendBuffer.clear();
			sendBuffer.put(String.format("NO %d SubReactor recived %s from %s", num,
					new String(readBuffer.array()), 
					socketChannel.getRemoteAddress()).getBytes());
			sendBuffer.flip();

			// write方法结束,意味着本次写就绪变为写完毕,标记着一次事件的结束
			int count = socketChannel.write(sendBuffer);

			if (count < 0) {
				// 同上,write场景下,取到-1,也意味着客户端断开连接
				selectionKey.cancel();
				socketChannel.close();
				System.out.println(String.format("%d SubReactor send closed", num));
			}

			// 没断开连接,则再次切换到读
			status = READ;
		} catch (IOException e) {
			System.err.println("异步处理send业务时发生异常!异常信息:" + e.getMessage());
			selectionKey.cancel();
			try {
				socketChannel.close();
			} catch (IOException e1) {
				System.err.println("异步处理send业务关闭通道时发生异常!异常信息:" + e.getMessage());
			}
		}
	}
}
  1. MainSubReactorDemo 类
public class MainSubReactorDemo {

    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(2333)).start();
    }

}

6.2 EventLoop 接口

在Netty中,一个EventLoop 将由一个永远都不会改变的 Thnead 驱动,同时任务(Runnable或者Callable)可以直接接交给EventLoop实现。以立即执行或者调度执行。根据醒翻明核心的不同。可能会创建多个EventLoop实例用以优化资源的使用,并且单个EventLoop可能会被派用于服务多个Channel, Netty 中的EventLoop 系别类型。对应于经典Reactor模型的Reactor,完成Channel的注册,轮询和分发,如图:

netty

最后总结 EventLoop 如下:

  • 一个EventLoop 在它的生命周期内只和一个 Thread 绑定;
  • 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
  • 一个Channel 在它的生命周期内只注册于一个 EventLoop;
  • 每一个EventLoop 负责处理一个或多个 Channel。

6.2 EventLoopGroup 接口

EventLoopGroup 接口是一组EventLoop 的抽象。一个EventLoopGroup 当中会包含一个或多个EventLoop,Netty 服务器编程中,需要boss和worker 两个EventLoopGroup 来进行工作。

  • boss 类型的 EventLoopGroup 通常是一个单线程的 EventLoop。该 EventLoop 维护着一个注册了ServerSocketChannel的 Selector 实例,EventLoop 的实现涵盖 I/O 事件的分离和分发。
  • EventLoop 的实现充当了 Reactor 模式中的 Reactor 的角色。boss 类型的 EventLoopGroup 只负责处理连接,故开销非常小,连接到来,马上按照策略将 SocketChannel 转发给 worker 类型的 EventLoopGroup。而 worker 类型的 EventLoopGroup会由 next 选择其中一个 EventLoop 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的IO事件进行处理。所以通常可以将 boss 类型的EventLoopGroup 的线程数参数设置为1。通过设置 bosGroup、workerGroup 线程数的大小,可以实现不同的 Reactor模型。

6.2.1 Netty 的单 Reactor 单线程模式

下面代码中配置了单个NioEventLoopGroup,且NioEventLoopGroup 的线程指定为1。

EventLoopGroup group = new NioEventLoopGroup(1); // 线程为1
ServerBootstrap b = new ServerBootstrap();

6.2.2 Netty 的单 Reactor 多线程模式

下面代码中配置了单个NioEventLoopGroup,但是NioEventLoopGroup 的线程数会根据CPU核数计算。因为目前大多数计算机都是多核的,因此,不指定线程数,则必然是多线程。

EventLoopGroup group = new NioEventLoopGroup(); // 不指定线程数,则为cpu 核数
ServerBootstrap b = new ServerBootstrap();

6.2.3 Netty 的主从 Reactor 多线程模式

下面代码配置了两个NioEventLoopGroup,承担只从Reactor的职责。且每个Reactor 都不指定线程数,因此都是多线程。

 // 多事件循环器
 EventLoopGroup bossGroup = new NioEventLoopGroup(); // boss
 EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker

6.3 任务调度

6.3.1 EventLoop 调度任务

  1. 定时执行任务
  2. 间隔执行任务
  3. 取消任务

6.3.2 Netty 调度的实现原理

Netty 任务的实现只保证一个近似执行,也就优是说任务的执行可能不是 100%准确。在实践中,这种近似值已经被证明是一个可容忍的限制,不影响多数应用程序。所以,定时执行任务不可能 100%准确地技时执行。为了更好地理解它是如何工作的,分析如下:
(1)在指定的延迟时间后调度任务。
(2)任务被插入到 EventLoop 的调度任务队列中。
(3)如果任务需要马上执行,EventLoop 会检查每个运行。
(4)如果有一个任务要执行,EventLoop 将立刻执行它,并从队列中删除。
(5) EventLoop 等待下一次运行,从第 4 步开始一遍又一遍地重复。
因为这样的实现计划执行不可能100%准确,因此在Nety中,这样的工作几乎没有资源开销。但是如果需要更准确的执行呢?可以很容易地使用 Java ScheduledExecutorService 的另一个实现,但这不是 Netty所关注的内容。记住,如果要开发未遵循 Netty 线程模型的程序。开发者就需要同步并发访问。

6.4 Netty 的Future

Netty 中 Future 相对于 JDK 中的 Future 做了以下几个方面的扩展:

  • 操作完成的结果做了区分,分为 success、fail、canceled 三种;
  • 通过 addlisteners()方法可以添加回调操作,即触发或者完成时需要进行的操作;
  • await()和 sync(),可以以阻塞的方式等待异步完成;
  • getnow( 可以获得异步操作的结果,如果还未完成则返回 mull。

6.4.1 ChannelFuture 接口

Channel 接口基本上继承自Netty 的Future接口,一个ChammelFuture 要么是完成的,要么是未完成的。当一个 IO 操作开始时,会创建一个Fiuture对象,Future 初始化时为完成的状态,既不是成功的,或者失败的,也不是取消的。因为10 操作还没有完成;如果一个 I/O 不管是成功,还是失败,或者被取消,Funure 都会被标记一些特殊的信息,如失败的原因,请注意即使是失败和取消也属于完成状态。
ChannelFuture提供了很多方法来检查UO操作是否完成、等待完成、获取UO操作的结果,也允许添加 ChanneIFutureListener,因此可以在 I/O 操作完成时被通知。
推荐优先使用 addListener(GenericFutureListener),而不是await()。在可能的情况下,这样就能在 I/O 操作完成时收到通知,并且可以去做后续的任务处理。addListener(GenericFutureListener)本身非阻塞 ,它会添加一个指定的 ChannelFfutureLisener到 ChanteiFiuture,并且UO线程完成对应的操作将会通知监听器,ChannelFutureListener也会提供最好的性能和资源利用率,因为它永远不会阻塞,但是如果不是基于事件编程,它可能在顺序逻辑上存在棘
相反的,awaitO是一个阻塞的操作,一旦被调用,调用者线程在操作完成之前是阻塞的。实现顺序的逻辑比较容易,但是让调用者线程等待是没有必要的,会造成资源的消耗,更可能会造成死锁,接下来会介绍。
ChannelHandler 的时间处理器通常会被 1/O 线程调用,如果 await()被一个时间处理方法这是一个死锁。因此,不要在 ChamnelHandler中调用 await() 方法。

6.4.2 Netty 的Promise

Netty 的Promise 其实就是 类似 JDK8 引入的CompletableFuture 类(参考链接)。

Netty DefaultPromise 类

  • 一部分是为调用者提供 get()和addListener() 用于获取 Future 任务执行结果和添加监听事件
  • 另一部分是为业务处理任务提供setSuccess()等方法设置任务的成功或失败。

说明:
在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过 notifyListeners( 实现。主要逻辑如下:如果当前 adListener 的线程(准确来说应该是调用 notifyListeners 的线程,因为 addListener 和 setSuccess 都会调用 notifyLisiteners)与当前执行的线程是同一个线程,则放在线程池中执行,否则提
交到线程池去执行。例如,main 线程中调用 addListener 时任务完成,notifyListeners( 执行
回调,会提交到线程池中执行;而如果是执行 Future 任务的线程池中 setSuccess()时,调用notiftyListeners(),会放在当前线程中执行。内部维护了 notifyingListeners 用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用 operationComplete。cancel 用来取消任务,根据 result 判断,如果可以取消,则唤醒等待线程,通知监听事件。

Netty DefaultChannelPromise 类

DefauhChannelPromise 是 DefautPromise 的子类,内部维护了一个通道变量chamel。Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了 FlushCheckpoint 接口,供 ChamelFhushPromiseNotifier
使用,将 ChannelFuture 注册到 ChamelFhushPromiseNotifier 类,当有数据写入或到达checkpoint 时使用。

七,Netty 编解码

数据从一种特定协议格式到另一种格式的转换。处理编码和解码的程序通常被称为编码器和解码器。Netty提供了一些组件,利用它们可以很容易地为各种不同协议编写编解码器。

7.1 解码器

解码器的主要职责是负责将“入站”数据从一种格式转换到另一种格式。Netty 提供了丰富的解码器抽象基类,方便开发者自定义解码器。这些基类主要分为以下两类。

  • 解码从字节到消息(ByteToMesageDeoder和ReplayingDecouder)。
  • 解码从消息到消息(MessageToMessageDecoder)。

Netty 的解码器是 ChannelInBoundHandlerAdapter 的抽象实现。在实际应用中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPline中的下一个 ChannelInboundHandler 进行处理。将解码器放在ChannelPline中,会很整个程序变得灵活。同时也能方便重用逻辑。

7.1.1 ByteToMessageDeoder 抽象类

ByeToMessageDecoder 抽象类用于将字节转为消息(或其他字节序列)。ByeToMessageDecoder继承自 ChamelnboundiHanderAdapter。ChamnelinboundHandlerAdapter 以类似流的方式将字节从 ByteBuf解码为另一种消息类型。例如,下面例子是从输入 ByteBuf 读取所有可读字节并创建新的 ByteBuf 的实现。

public class SquareDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readBytes(in.readableBytes()));
    }
}

7.1.2 ReplayingDecoder 抽象类

尽管ByeToMessageDecoder 可以简化于发编码器的工作量,但这个类有个不好的地相于,必须在实际的载操作(如上面例子的readableBytes))之前,必须要验证输人的ByteBuf剩足够的数据。那么是否有更加智能的方式来的这类验证呢? ReplayingDecoder这个纯可陈现这类功能。
ReplayingDecoder 抽象类是 ByteloMessageDecoder的一个子类,会自动检查缓冲区是否有足够的字节。若ByeBbuf中有足够的字节。则会正常读取;若没有足够的字节则会停止解码。
那么 ReplayingDecoder 是怎么做到的呢?
当 ReplayingDecoder 接收的 buffer 的数据不足时,会抛出一个错误,ReplayingDecoder通过一个ByteBuf 的具体实现来完成。如果 ReplayingDecoder 捕捉到这个错误,然后会将读的索引重置到刚开始的位置(buffer 的开始位置),当数据继续进入 buffer 时再次调用 decode方法。请注意 ReplayingDecoder 总是返回一个缓冲的 Error 实例,来避免创建新的 Eror对象和每次填充堆栈的负担。
大多数时候,使用 ReplayingDecoder 会比 ByteToMessageDecoder 更加方便。但不是任何时候都应该使用 ReplayingDecoder。使用 ReplayingDecoder 需考虑以下限制。

  • 不是所有的标准ByteBuf操作都被支持,如果调用一个不支持的操作会抛UhrplayableOperationException;
  • 在性能上,使用 ReplayingDecoder 要略慢于 ByteToMessageDecoder。如果能接受这些限制,则使用 ReplayingDecoder。如果不想引入过多的复杂性,则使用ByeToMessageDecoder。以下是一个使用ReplayingDecoder 的示例。
public class ToIntegerDecoder2 extends ReplayingDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readInt());
    }
}

ReplayingDecoder 的性能优化
使用 ReplayingDecoder 的 checkpoint 方法可以显著提高复杂解码器实现的性能。checkpoint 方法分为不带参数的 checkpoint()方法和带参数的 checkpoint(S state)方法。其中,带参数的 checkpoint(S state)方法可以方便管理解码器的状态。
1.checkpoint()使用示例checkpoint()方法可以更新缓冲区的“初始”位置,以便 ReplayingDecoder 将缓冲区的readerIndex 倒退到调用 checkpoint()方法的最后位置:

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {

    private boolean readLength;
    private int length;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        if (!readLength) {
            length = buf.readInt();
            readLength = true;
            checkpoint();
        }
        if (readLength) {
            ByteBuf frame = buf.readBytes(length);
            readLength = false;
            checkpoint();
            out.add(frame);
        }
    }
}

2.checkpoint(S stauts) 使用示例:

public class IntegerHeaderFrameDecoder2 extends ReplayingDecoder<MyDecoderState> {

    private int length;

    public IntegerHeaderFrameDecoder2() {
        // 设置初始状态
        super(MyDecoderState.READ_LENGTH);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        switch (state()) {
            case READ_LENGTH:
                length = buf.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
            case READ_CONTENT:
                ByteBuf frame = buf.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                out.add(frame);
                break;
            default:
                throw new Error("Should`t reach here.");
        }
    }
}

7.1.3 MessageToMessageDecoder 抽象类

MessageToMessageDecoder 抽象类用于从一种消息解码为另外一种消息,例如,从一种POJO转为另外一种POJO。

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

7.1.4 使用TooLongFrameException 处理太大的帧

Netty 是异步框架,需要缓冲区字节在内存中,直到能够好码它们。因此,要注意解码器中如果缓存太多的数据会耗尽可用内存。为了解法这个问题,Netty 提供了一个ToolangFrameException 异类,通常由解码器在帧太长时抛出。
为了避免帧太大这个问题,可以在解玛器中设置一个最大字节数阀值,如果超出阀值。就出ToolangFrameException。然后由解码器的用户决定的何处理它。虽然一些协议,如HTTP分许这种情况下有一个特殊的响应,但大多数协议   应对该事件唯一的选择可能就是关闭连接。

public class SafeByteMessageDecoder extends ByteToMessageDecoder {  // 继承主要用于将字节解码为消息
    private static final int MAX_FRAME_SIZE = 1024;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readbale = in.readableBytes();
        if (readbale > MAX_FRAME_SIZE) {    // 检测缓冲区数据是否大于常量 MAX_FRAME_SIZE
            // 忽略所有可读的字节,并抛出 TooLongFrameException 来通知ChannelPipeline 中的ChannelHander
            // 以说明这个帧数据超长
            in.skipBytes(readbale);
            throw new TooLongFrameException("Frame too big !");
        }
    }
}

7.1.5 自定义基于行的解码器

在实际开发中,希望服务端接收的数据是一个字符串或者是一个对象类型,而不是字节码,那么就需要解码器。
在 Netty 中,Bytebuf 数据转 String 是不需要开发者手动处理的,只需要在管道中添加-个SringDecoder 即可。如果需要考虑网络传输过程中的半包粘包问题,Netty 也能胜任。
Netty 常用下面 3 种方法来解决 String 的半包粘包问题。

  • LineBasedFrameDecoder:基于换行。
  • DelimiterBasedFrameDecoder:基于指定字符串。
  • FixedLengthFrameDecoder :基于固定长度。

1-LineBasedFrameDecoder 类

LineBasedFrameDecoder 类是基于换行的,意思是只要在接收数据时遇到以换行符"\n"或者回车换行符"\r\n"结尾时,就表示数据已经接收完成可以被处理了。
代码示例:
(1)定义解码器

public class MyLineBasedFrameDecoder extends LineBasedFrameDecoder {

    private final static int MAX_LENGTH = 1024; // 帧的最大长度

    public MyLineBasedFrameDecoder() {
        super(MAX_LENGTH);
    }

}

(2)定义ChannelHandler

public class MyLineBasedFrameDecoderServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 接收msg消息,此处已经无需解码了
        System.out.println("Client -> Server: " + msg);
    }
}

(3)定义ChannelInitializer

public class MyLineBasedFrameDecoderChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 基于换行符号
        channel.pipeline().addLast(new MyLineBasedFrameDecoder());
        // 解码转String
        channel.pipeline().addLast(new StringDecoder());
        // 自定义ChannelHandler
        channel.pipeline().addLast(new MyLineBasedFrameDecoderServerHandler());
    }
}

注意添加到 ChannelPipeline 的 ChannelHandler 的顺序,MyLineBasedFrameDecoder在前,
MyLineBasdFrameDecoderServerHandler在后,意味着数据先经过 MyLineBasedFrameDecoder
解码,然后再交给 MyLineBasedFrameDecoderServerHandler 处理。

(4)编写服务器

public class MyLineBasedFrameDecoderServer {

    public static int DEFAULT_PORT = 8023;

    public static void main(String[] args) throws Exception {
        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        // 多线程事件循环器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker

        try {
            // 启动NIO服务的引导程序类
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup) // 设置EventLoopGroup
                    .channel(NioServerSocketChannel.class) // 指明新的Channel的类型
                    .childHandler(new MyLineBasedFrameDecoderChannelInitializer()) // 指定ChannelHandler
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();

            System.out.println("MyLineBasedFrameDecoderServer已启动,端口:" + port);

            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {
            // 优雅的关闭
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

(5)编写客户端

public class TcpClient {

    public static void main(String[] args) throws IOException {
        Socket socket = null;
        OutputStream out = null;

        try {

            socket = new Socket("localhost", 8023);
            out = socket.getOutputStream();

            // 请求服务器
            String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
            byte[] outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();
        } finally {
            // 关闭连接
            out.close();
            socket.close();
        }
    }
}

(5)测试

netty

说明:MyLineBasedFrameDecoderServerHandler 接收到4 条数据,因为在MyLineBasedFrameDecoderChannelInitializer 中,MyLineBasedFrameDecoder先被添加到了ChannelPipeline,然后才添加 MyLineBasedFrameDecoderServerHandler,意味着数据先经过解码,在交给MyLineBasedFrameDecoderServerHandler处理。在数据解码的过程中数据包含了4哥回车换行符号,因此数据被解码为4条。

7.2 编码器

回顾之前的定义。编码器就是用来把出站数据从一种格式转换到另外一种格式,因此他实现了ChannelOutboundHandler,类似于解码器, Netty也提供了一组来帮助开发者快速上手编码器,当然这些类提供的是与解码器相反的方法,如下所示:

  • 编码从消息到字节(MessageToByteEneoder)。
  • 编码从消息到消息(MessageToMessageEncoder)。

7.2.1 MessageToByteDeoder 抽象类

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);
    }
}

上述示例中,ShortToByteEncoder收到了 Short消息。编码它们,并把它们写入ByteBuf。ByteBuf 接着前进到ChannelPipeline和下一个 ChannelOutboundHandler, 每个 Short将占用ByteBuf的两个字节。实现 ShortToByteEncoder主要分为以下两步:

  • 实现继承自MessageToByteEncoder;
  • 写Short 到Byte Buf。

7.2.2 MessageToMessageEncoder 抽象类

public class IntegerToStringEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

上述例子将Integer 消息编码为String消息,分为两部:

  • 实现继承 MessageToMessageEncoder;
  • 将Integer 转为 String,并添加到MessageBuf

7.2.3 自定义编码器

定义消息通信协议
消息通信协议是连接客户端和服务器的“密语”,只有熟知双方的通信协议,客户端和最务器才能正常识别消息的内容。常见的消息通信协议有 HTTP、MQTT、XMPP、STOMP、AMQP和RTMP 等。
定义消息的通信协议,消息头(msg Type),消息头(len),消息体(body),代码如下:

public enum MsgType {

    // 请求
    EMGW_LOGIN_REQ((byte) 0x00),
    // 响应
    EMGW_LOGIN_RES((byte) 0x01);

    private byte value;

    public byte getValue() {
        return value;
    }

    private MsgType(byte value) {
        this.value = value;
    }
}
public class MsgHeader {
    private byte msgType; // 消息类型
    private int len; // 长度

    public MsgHeader() {
    }

    public MsgHeader(byte msgType, int len) {
        this.msgType = msgType;
        this.len = len;
    }

    public byte getMsgType() {
        return msgType;
    }

    public void setMsgType(byte msgType) {
        this.msgType = msgType;
    }

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }
}
public class Msg {

    private MsgHeader msgHeader = new MsgHeader();
    private String body;

    public MsgHeader getMsgHeader() {
        return msgHeader;
    }

    public void setMsgHeader(MsgHeader msgHeader) {
        this.msgHeader = msgHeader;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

定义编码器:

/**
 * 编码器
 * @author: wangyj
 * @create: 2022-04-30
 * @version: 1.0.0
 **/
public class MyEncoder extends MessageToByteEncoder<Msg> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
        if (msg == null | msg.getMsgHeader() == null) {
            throw new Exception("The encode message is null");
        }

        // 获取消息头
        MsgHeader header = msg.getMsgHeader();
        // 获取消息体
        String body = msg.getBody();
        byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
        // 计算消息体的长度
        int bodySize = bodyBytes.length;
        System.out.printf("MyEncoder header: %s, body: %s", header.getMsgType(), body);

        out.writeByte(MsgType.EMGW_LOGIN_RES.getValue());
        out.writeInt(bodySize);
        out.writeBytes(bodyBytes);
    }
}

定义解码器:

public class MyDecoder extends LengthFieldBasedFrameDecoder {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;
    private static final int LENGTH_FIELD_LENGTH = 4;
    private static final int LENGTH_FIELD_OFFSET = 1;
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private static final int HEADER_SIZE = 5;
    private byte msgType; // 消息类型
    private int len; // 长度

    public MyDecoder() {
        super(MAX_FRAME_LENGTH,
                LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
                LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
        ByteBuf in = (ByteBuf) super.decode(ctx, in2);
        if (in == null) {
            return null;
        }
        // 校验头长度
        if (in.readableBytes() < HEADER_SIZE) {
            return null;
        }
        msgType = in.readByte();
        len = in.readInt();

        // 校验消息体长度
        if (in.readableBytes() < len) {
            return null;
        }

        ByteBuf buf = in.readBytes(len);
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");

        // ByteBuf转为Msg类型
        Msg msg = new Msg();
        MsgHeader msgHeader = new MsgHeader(msgType, len);
        msg.setBody(body);
        msg.setMsgHeader(msgHeader);
        return msg;
    }
}

MyDecoder 集成自 Netty 内嵌的解码器 LengthFieldBasedFameDecoder。LengthFieldBasedFianeDevoder 是一种基于灵活长度的解码器。在数据包中,加了一个长度字段,保存上层包的长度。解码时,会按照这个长度,进行上层 ByteBuf应用包的提取。其中,初始化LengthFieldBasedFrameDecoder 时,需要指定以下参数:

  • MAX_FRAME_LENGTH:发送的数据包最大长度。
  • LENGTH_FIELD_OFFSET:长度域偏移量,指的是长度城位于整个数据包字节数组中的下标。
  • LENGTH_FIELD_LENGTH:长度域的字节数长度。
  • LENGTH_ADJUSTMENT:长度域的偏移量矫正。如果长度域的值除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,就需要进行矫正。矫正的值为包长-长度域的值-长度域偏移-长度域长。
  • INITIAL_BYTES_TO_STRIP:丢来的起始字节数。丢喜处于有效获据前面的宇节数量。

定义服务器ChannelHandler:

public class MyServerHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
        Channel incoming = ctx.channel();

        if (obj instanceof Msg) {
            Msg msg = (Msg) obj;
            System.out.println("Client->Server:" + incoming.remoteAddress() + msg.getBody());
            incoming.write(obj);
        }
    }
}

定义服务器主程序:

public class MyServer {

    private int port;

    public MyServer(int port) {
        this.port = port;
    }

    private void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 这里需要注意添加的顺序
                            ch.pipeline().addLast("decoder", new MyDecoder());
                            ch.pipeline().addLast("encoder", new MyEncoder());
                            ch.pipeline().addLast(new MyServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            System.out.println("Server start listen at " + port);

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8082;
        }
        new MyServer(port).run();
    }
}

定义客户端主程序:

public class MyClient {

    private String host;
    private int port;

    public MyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    private void run() throws InterruptedException {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new MyDecoder());
                    ch.pipeline().addLast("encoder", new MyEncoder());
                    ch.pipeline().addLast(new MyClientHandler());

                }
            });

            // 启动客户端
            ChannelFuture f = b.connect(host, port).sync();

            while (true) {
                Msg msg = new Msg();
                MsgHeader msgHeader = new MsgHeader();
                msgHeader.setMsgType(MsgType.EMGW_LOGIN_REQ.getValue());
                String body = "床前明月光,疑是地上霜。举头望明月,低头思故乡。";

                byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
                int bodySize = bodyBytes.length;
                msgHeader.setLen(bodySize);
                msg.setMsgHeader(msgHeader);
                msg.setBody(body);

                f.channel().writeAndFlush(msg);
                Thread.sleep(2000);
            }
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new MyClient("localhost", 8082).run();
    }
}

7.3 编解码器

编解码器就是指的结合了编码和解码功能的程序,这些类都实现了 ChannelnboundHandler 接口和 ChannelOutboundHandler 接口,编解码的抽象类主要有两种:

  • 解码从字节到消息的编解码 (ByteToMesageCodec)。
  • 解码从消息到消息的编解码(MessageToMessageCodec)。

7.3.1 ByteToMesageCodec 抽象类

用于将字节实时编码/解码为消息

7.3.2 MessageToMessageCodec 抽象类

用于将字节实时编码/解码为消息

7.3.3 ChannelDuplexHandler 类

观察ByteToMessageCodec 和 MessageToMessageCodec 的源码,发现它们都是继承自ChannelDuplexHandler 类。ChannelDuplexHandler 类是 ChamnelHandler 的一个实现,表示ChannelInboundHander 和 ChannelOutboundHandler 的组合。如果 ChannelHandler 的实现需要拦截操作及状态更新,则这个 ChannelDuplexHandler 类会是一个很好的起点。

CombinedChannelDuplexHandler类
CombinedChannelDuplexHandler 类是ChannelDuplexHandler 子类,用于将 ChannelInboundHander 和 ChannelOutboundHandler  组合到一个CHannelHandler 中。

7.3.4 自定义编解码器

使用上面的 CombinedChannelDuplexHandler 类来组合自定义编解码器。
(1)实现编码器和解码器:
这里和上面的是相同的类,抽取出来一起共用,代码可以参考上面,结构如下:

netty

(2)自定义编解码器:

public class MyCodec extends CombinedChannelDuplexHandler<MyDecoder, MyEncoder> {
    public MyCodec() {
        super(new MyDecoder(), new MyEncoder());
    }
}

(3)分别修改MyServer 和 MyClient 类:

b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 添加自己的编解码器
                    ch.pipeline().addLast("codec", new MyCodec());
                    ch.pipeline().addLast(new MyClientHandler());

                }
            });

7.4 序列化数据

7.4.1 Java 原生序列化

使用Java 原生序列化的好处是不必引入第三方包,而是可以直接使用 ObjectOutputSteam和ObjectInputSteam 接口即可实现与远程节点的交互,并且在Java平台上有较好的兼容性。Netty支持 Java 原生序列化,为此,Netty 提供了以下序列化类。
CompatibleObjectDecoder:和使用 Java序列化的非基于Netty 的远程节点进行互接作的解码器;
CompatibleObjectEecoder:和使用Java序列化的非基于Netty 的远程节点进行互操作的编码器;
ObjectDecoder:构建于 Java 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取;
ObjectEecoder:构建于 Java序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取。

7.4.2 JBoss Marshalling 序列化

Netty 为 Boss Marshalling 提供了以下支持:

  • CompatibleMarshallingDecoder
  • CompatibleMarshallingEncoder
  • MarshallingDecoder
  • MarshallingEncoder

其中 CompatibleMarshallingDecoder 和 CompatibleMarshallingEncoder与只使用Java 原l生序列他的远程节点兼容,而以MarshallingDecoder和MarshallingEncoder 适用于使用JBoss Marshalling 的节点。

7.4.3 Protocol Buffers 序列化

Protocol Buffers  是一种由 Google 公司开发的、现在已经开源的数据交换格式。Protocol Buffers  以一种紧凑而高效的方式对结构化的数据进行编码及解码。它具有许多的编程语言绑定,使得它很适合跨语言的项目
以下是 Netty 支持Protocol Buffers  所提供的 ChannelHandler 实现。
ProtobufDecoder:使用 Protocol Buftfers 对消息进行解码。
ProtabufEncoder:使用 Protocol Buffers 对消息进行编码。
ProtobufVarint32FrameDecoder:根据消息中的 Protocol Buffers 的“Base 128 Varints”
整型长度字段值动态地分割所接收到的 ByteBuf。
ProtobufVarint32LengthFieldPrepender :在 ByteBuf 前追加一个 Protocol Buffers 的“Base 128 Varints”整型的长度字段值。

7.4.4 基于Netty 的对象序列化

例子使用ObjectDecoder 和 ObjectEecoder
(1)定义序列化对象

public class SerializationBean implements Serializable {

	private static final long serialVersionUID = 3235432002462705915L;
	private int age;
	private String name;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return "SerializationBean [age=" + age + ", name=" + name + "]";
	}
}

(2)定义服务器处理器

public class SerializationServerHandler extends SimpleChannelInboundHandler<Object> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
		if (obj instanceof SerializationBean) {
			SerializationBean user = (SerializationBean) obj;
			ctx.writeAndFlush(user);
			System.out.println("Client -> Server: " + user);
		}
	}
}

(3)定义服务器 ChannelInitializer

public class SerializationServerInitializer extends ChannelInitializer<Channel> {

	private final static int MAX_OBJECT_SIZE = 1024 * 1024;

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new ObjectDecoder(MAX_OBJECT_SIZE,
				ClassResolvers.weakCachingConcurrentResolver(this.getClass()
						.getClassLoader())));
		pipeline.addLast(new ObjectEncoder());
		pipeline.addLast(new SerializationServerHandler());
	}
}

(4)定义服务器启动类

public final class SerializationServer {

    static final int PORT = 8082;

    public static void main(String[] args) throws Exception {

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new SerializationServerInitializer());

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

(4)定义客户端处理器

public class SerializationClientHandler extends
		SimpleChannelInboundHandler<Object> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj)
			throws Exception {
		if (obj instanceof SerializationBean) {
			SerializationBean user = (SerializationBean) obj;
			System.out.println("Server -> Client: " + user);
		}
	}
}

(6)定义客户端 ChannelInitializer

public class SerializationClientInitializer extends
		ChannelInitializer<Channel> {
 
	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new ObjectDecoder(MAX_OBJECT_SIZE,
				ClassResolvers.weakCachingConcurrentResolver(this.getClass()
						.getClassLoader())));
		pipeline.addLast(new ObjectEncoder());
		pipeline.addLast(new SerializationClientHandler());
	}
}

(7)定义客户端 启动类

public class SerializationClient {

    public static void main(String[] args) throws Exception{
        new SerializationClient("localhost", 8082).run();
    }

    private final String host;
    private final int port;

    public SerializationClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap  = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
		            .handler(new SerializationClientInitializer());
            
            Channel channel = bootstrap.connect(host, port).sync().channel();

            SerializationBean user = new SerializationBean();

            for (int i = 0; i < 10; i++) {
                user = new SerializationBean();
                user.setAge(i);
                user.setName("waylau");
                channel.write(user);
            }
            channel.flush();

            // 等待连接关闭
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

7.4.5 基于Jackson的Json 序列化

(1)定义序列化对象

public class JacksonBean {

	private int age;
	private String name;
	private List<String> sons;
	private Map<String, String> addrs;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public List<String> getSons() {
		return sons;
	}

	public void setSons(List<String> sons) {
		this.sons = sons;
	}

	public Map<String, String> getAddrs() {
		return addrs;
	}

	public void setAddrs(Map<String, String> addrs) {
		this.addrs = addrs;
	}
}

(2)定义ObjectMapper 工具类

public class JacksonMapper {

	private static final ObjectMapper MAPPER = new ObjectMapper();

	public static ObjectMapper getInstance() {
		return MAPPER;
	}

}

(3)定义解码器:用于将字节转化为对象

public class JacksonDecoder<T> extends ByteToMessageDecoder {

    private final Class<T> clazz;

    public JacksonDecoder(Class<T> clazz) {
        this.clazz = clazz;
	}

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,
			List<Object> out) throws Exception {
        InputStream byteBufInputStream = new ByteBufInputStream(in);
        ObjectMapper mapper = JacksonMapper.getInstance();
        out.add(mapper.readValue(byteBufInputStream, clazz));
	}

}

(4)定义编码器:用于将对象转换为字符

public class JacksonEncoder extends MessageToByteEncoder<Object> {

	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) 
			throws Exception {

		ObjectMapper mapper = JacksonMapper.getInstance();
		byte[] body = mapper.writeValueAsBytes(msg); // 将对象转换为byte
		out.writeBytes(body); // 消息体中包含我们要发送的数据
	}

}

(5)定义服务器处理器

public class JacksonServerHandler extends SimpleChannelInboundHandler<Object> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
		String jsonString = "";
		if (obj instanceof JacksonBean) {
			JacksonBean user = (JacksonBean) obj;
			ctx.writeAndFlush(user);
			jsonString = JacksonMapper.getInstance().writeValueAsString(user); // 对象转为json字符串
			System.out.println("Client -> Server: " + jsonString);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		Channel incoming = ctx.channel();
		System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "异常");
		// 当出现异常就关闭连接
		cause.printStackTrace();
		ctx.close();
	}
}

(6)定义服务器 ChannelInitializer

public class JacksonServerInitializer extends ChannelInitializer<Channel> {

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new JacksonDecoder<JacksonBean>(JacksonBean.class));
		pipeline.addLast(new JacksonEncoder());
		pipeline.addLast(new JacksonServerHandler());
	}
}

(7)定义服务器 启动类

public final class JacksonServer {

    static final int PORT = 8082;

    public static void main(String[] args) throws Exception {

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new JacksonServerInitializer());

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

(8)定义客户端处理器

public class JacksonClientHandler extends
		SimpleChannelInboundHandler<Object> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj)
			throws Exception {
		String jsonString = "";
		if (obj instanceof JacksonBean) {
			JacksonBean user = (JacksonBean) obj;
			jsonString = JacksonMapper.getInstance().writeValueAsString(user); // 对象转为json字符串
			System.out.println("Server -> Client: " + jsonString);
		}
	}

}

(9)定义客户端 ChannelInitializer

public class JacksonClientInitializer extends
		ChannelInitializer<Channel> {
 
	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new JacksonDecoder<JacksonBean>(JacksonBean.class));
		pipeline.addLast(new JacksonEncoder());
		pipeline.addLast(new JacksonClientHandler());
	}
}

(10)定义客户端 启动类

public class JacksonClient {

    public static void main(String[] args) throws Exception{
        new JacksonClient("localhost", 8082).run();
    }

    private final String host;
    private final int port;

    public JacksonClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap  = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
		            .handler(new JacksonClientInitializer());
            
            Channel channel = bootstrap.connect(host, port).sync().channel();

            // 发送对象
			JacksonBean user = new JacksonBean();
			user.setAge(27);
			user.setName("waylau");
			List<String> sons = new ArrayList<String>();
			for (int i = 0;i <10; i++) {
				sons.add("Lucy"+i);
				sons.add("Lily"+i);
			}

			user.setSons(sons);
			Map<String, String> addrs = new HashMap<String, String>();
			for (int i = 0;i <10; i++) {
				addrs.put("001"+i, "18998366112");
				addrs.put("002"+i, "15014965012");
			}

			user.setAddrs(addrs);
			channel.write(user);
			channel.flush();
 
			// 等待连接关闭
			channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

八,Netty ChannelHandler

ChannelHandler 承担着 Netty的核心处理器功能,这里要从消息流程控制、flush行为控制、IP 地址过滤、VO事件记录超时处理、大数据流处理、数据加密、流量整形等几个方面来介绍 ChannelHandler 的具体使用场景。

8.1 ChannelHandler 介绍

ChanelHandler (管道处理器)其工作模式类似于 Java Servlet过滤器,负责对IO 事件或者IO操作进行拦截处理。采用事件的好处是,ChannelHandler 可以选择自己感兴趣的事件进行处理,也可以对不感兴趣的事件进行透传或者终止。

8.1.1 ChannelHandler 接口

基于 ChannelHandler 接口,用户可以方便实现自己的业务,比如日志记录,编解码合数据过滤等。

8.1.2 ChannelHandlerAdapter 抽象类

ChannelHandlerAdapter 对 exceptionCaught 方法做了实现,并提供了 isSharable 方法。需要注意的是,ChannelHandlerAdapter 是抽象类,用户可以自由地选择是否要覆盖 ChannelHandlerAdapter类实现。如果对某个方法感兴趣,直接覆盖掉这个方法即可,这样代码就变得简单清晰。
ChannelHandlerAdapter 抽象类提供了两个子类 ChannelinboundHanderAdapter、ChannelOutboundHanderAdapter 用于针对出站事件、入站事件进行处理。其中 ChannelHandlerAdapter 实现了 ChannelInboundHandler 接口,而.ChannelOutboundHanderAdapter实现
ChannelOutboundHandler 接口。在实际开发过程中,自定义的 ChannelHandler 多数是继承自ChannelinboundHanderAdapter 和 ChannelOutboundHanderAdapter 类或者是这两个类的子类。例如,在前面章节中所涉及編解码器 ByteToMessageDecoder、MessageToMessageDecoderMessageToByteEncoder、MssageToMessageEncoder 等,就是这两个类的子类。

8.2 消息流程控制

Netty 将ChannelHandler 类型分为两类:ChannelInboundHandler 和 ChannelOutboundHandler,分别用于处理入站事件和出站事件。

8.2.1 处理 OP_READ 事件

ChannelInboundHandler 用于处理 OP_READ 事件。

netty

8.2.2 处理 OP_WRITE 事件

ChannelOutboundHandler 用于处理 OP_WRITE 事件。

netty

8.2.3 flush 行为控制

flush 操作负责将ByteBuf 消息写入 SocketChanel 中并发送给对方。代码示例:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "->Server: " + msg.toString());
        ctx.write(msg);
        ctx.flush();
    }
}

在上述示例中,先将数据通过 write 方法写入 ChannelHandlerContext,然后调用 fiush头行发送。当然,Netty 也提供了 writeAndFlush 方法,用于将这两个方法合二为一。那么对么发送数据需要经过两个步骤呢?
write 和 flush 两者作用概括如下:

  • write:将需要写的 ByteBuff 存储到 ChannelOutboundBuffer 中;
  • flush:从ChannelOutboundBuffer中将需要发送的数据读出来,并通过Channel发送出去接下来重点看 ChannelOutboundBuffer 的实现方式。

8.2.4 ChannelOutboundBuffer 类

ChannelOutboundBuffer 类主要用于存储其待处理的出站写请求的内部数据。当Netty 调用 write 时数据不会真正地去发送而是写入到 ChamelOutboundBuffer 缓存队列,直到用 fush 方法 Netty 才会从ChamnelOutboundBurffer 取数据发送。详细注解在源码文件里面。

8.3 I/O 事件记录

日志记录,能找到事情发生的时点以及故障原因。

8.3.1 LoggingHanler 类

LoggingHanler 继承自 ChannelDuplexHandler,因此LoggingHanler 可以像其他ChannelHandler 一样,添加到ChannelPipeline 中。

8.3.2 IP 地址过滤

在现实项目中经常有这样的需求,需要的对某些特定的IP做过滤,以实现IP白名单,IP黑名单等功能。Netty 本身就已经内嵌了相关的实现机制。相关的功能其中在 io.netty.handler.ipfilter 包下。

RuleBasedFilter 类

RuleBasedFilter 类允许跟进传递给其构造函数的IpFilterRules 来筛选Channel,如果没有给出规则,则默认接受所有连接。

UniqueIpFilter 类

此类允许确保每个IP地址在任何时候都最多有一个通道连接到服务器。

8.3.3 超时处理

超时检测:

  • Netty 的超时类型 IdleState 主要分为以下 3类。
  • ALL_IDLE:一段时间内没有数据接收或者发送。
  • READER_IDLE:一段时间内没有数据接收。
  • WRITER_IDLE:一段时间内没有数据发送。
  • 针对上面 3 类超时异常,Netty 提供了 3 类 ChannelHandler 来进行检测。
  • IdleStateHandler :当 Channel 一段时间未执行读取、写入或两者都未执行时,触发ldeStateEvent 事件。
  • ReadTimeoutHandler:在一定时间内未读取任何数据时,引发 ReadTimeoutException异常。
  • WriteTimeoutHandler:当写操作在一定时间内无法完成时,引发 WritceTimeoutException异常。

IdleStateHandler 类

public class MyChannelInitalizer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 40, 0));
        ch.pipeline().addLast("myHandler", new MyHandler());
    }

    // 处理由 IdleStateHandler 触发的 IdleStateEvent
    private class MyHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                if (e.state() == IdleState.READER_IDLE) {
                    ctx.close();
                } else if (e.state() == IdleState.WRITER_IDLE) {
                    ctx.writeAndFlush(new PingMessage());
                }
            }
        }
    }
}

在上述示例中,ldleStateHandler 设置了读超时时间是60秒,写超时时间是30秒。
MyHandler 是针对超时事件 idleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果60秒内没有入站流量(读超时)时,连接将关闭。

ReadTimeoutHandler 类
包含了读超时状态处理,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时,读写超时,而且在处理超时时,只会针对READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。

public class MyChannelInitalizer2 extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("ReadTimeoutHandler", new ReadTimeoutHandler(30));
        ch.pipeline().addLast("myHandler", new MyHandler());
    }

    // 处理由 IdleStateHandler 触发的 IdleStateEvent
    private class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof ReadTimeoutException) {
                // ..
            } else {
                super.exceptionCaught(ctx, cause);
            }
        }
    }
}

WriteTimeoutHandler 类
包含了写超时状态处理。

public class MyChannelInitalizer3 extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("WriteTimeoutHandler", new WriteTimeoutHandler(30));
        ch.pipeline().addLast("myHandler", new MyHandler());
    }

    // 处理由 IdleStateHandler 触发的 IdleStateEvent
    private class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof WriteTimeoutException) {
                // ..
            } else {
                super.exceptionCaught(ctx, cause);
            }
        }
    }
}

8.3.4 实战:实现心跳机制

上面内容讲述了 Netty 如何检测超时,接下来价绍针对超时的解决方案-心跳机制。在程序开发中,心跌机制是非常常见的,其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段短小的通信。

1. 定义心跳处理器

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {

    // (1)心跳内容
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
            Unpooled.copiedBuffer("Heartbeat ", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // (2)判断超时类型
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String type = "";
            if (event.state() == IdleState.READER_IDLE) {
                type = "read idle";
            } else if (event.state() == IdleState.WRITER_IDLE) {
                type = "write idle";
            } else if (event.state() == IdleState.ALL_IDLE) {
                type = "all idle";
            }
            // (3)发送心跳 发送给客户端
            ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
                    ChannelFutureListener.CLOSE_ON_FAILURE);
            System.out.println(ctx.channel().remoteAddress() + "超时类型:" + type);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

2. 定义ChannelInitializer

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {

    private static final int READ_IDEL_TIME_OUT = 4; // 读超时
    private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
    private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // (1)添加了一个IdleStateHandler 到 ChannelPipeline,并分别设置了读,写超时的时间。
        pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
                WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS));
        // (2)添加了HeartbeatServerHandler,用来处理超时时,发送心跳
        pipeline.addLast(new HeartbeatServerHandler());
    }
}

3. 编写服务器

public class HeartbeatServer {

    static final int PORT = 8082;

    public static void main(String[] args) throws InterruptedException {
        // 配置服务器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HeartbeatHandlerInitializer());
            // 启动
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4. 测试

  1. 先启动 HeartbeatServer
  2. 在通过操作系统的Telnet 来访问服务器

netty

netty

8.4 大数据流处理

Netty 提供了 ChunkedWriteHandler 以支持大数据流处理。

8.4.1 ChunkedWriteHandler类

ChunkedInput 接口的实现包含 ChunkedFile、ChunkedNiorile、ChunrkedNioSteam、ChunrkedSteam、HttpChunrkedInput,HttpPostRequestEncoder 和 WebSocketChunkedmput等。
在 ChunkedWriteHandler 中,主要用到以下 4 种实现:

  • ChunkedFile :用于从文件中逐块获取数据。
  • ChunkedNioFile:使用 NIO FileChannel 从文件中逐块获取数据。
  • ChunkedNioStream:用于从 ReadableByteChannel 中逐块获取数据。
  • ChunkedStream :用于从 InputStream 中逐块获取数据。

8.5 数据加密

8.5.1 Sslhandler 类

8.5.2 实战:基于SSL/TSL 的双向认证 echo服务端和客户端

  • 上下文工厂
public final class SslContextFactory {
	private static final String PROTOCOL = "TLS";

	private static SSLContext SERVER_CONTEXT;// 服务器上下文

	private static SSLContext CLIENT_CONTEXT;// 客户端上下文

	public static SSLContext getServerContext(String pkPath, String caPath, String password) {
		if (SERVER_CONTEXT != null)
			return SERVER_CONTEXT;

		SERVER_CONTEXT = getContext(pkPath, caPath, password);

		return SERVER_CONTEXT;
	}

	public static SSLContext getClientContextgetContext(String pkPath, String caPath, String password) {
		if (CLIENT_CONTEXT != null)
			return CLIENT_CONTEXT;

		CLIENT_CONTEXT = getContext(pkPath, caPath, password);

		return CLIENT_CONTEXT;
	}

	public static SSLContext getContext(String pkPath, String caPath, String password) {
		if (CLIENT_CONTEXT != null)
			return CLIENT_CONTEXT;

		InputStream in = null;
		InputStream tIN = null;
		try {
			KeyManagerFactory kmf = null;
			if (pkPath != null) {
				KeyStore ks = KeyStore.getInstance("JKS");
				in = new FileInputStream(pkPath);
				ks.load(in, password.toCharArray());
				kmf = KeyManagerFactory.getInstance("SunX509");
				kmf.init(ks, password.toCharArray());
			}

			TrustManagerFactory tf = null;
			if (caPath != null) {
				KeyStore tks = KeyStore.getInstance("JKS");
				tIN = new FileInputStream(caPath);
				tks.load(tIN, password.toCharArray());
				tf = TrustManagerFactory.getInstance("SunX509");
				tf.init(tks);
			}

			CLIENT_CONTEXT = SSLContext.getInstance(PROTOCOL);
			CLIENT_CONTEXT.init(kmf.getKeyManagers(), tf.getTrustManagers(), null);

		} catch (Exception e) {
			throw new Error("Failed to initialize the client-side SSLContext");
		} finally {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				in = null;
			}

			if (tIN != null) {
				try {
					tIN.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				tIN = null;
			}
		}

		return CLIENT_CONTEXT;
	}

}
  • 服务端初始化
public class EchoServerChannelInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		
		// 先添加SslHandler
		String pkPath = System.getProperties().getProperty("user.dir") + "/src/main/resources/ssl/nettyServer.jks";
		String password = "defaultPass";
		SSLEngine engine = SslContextFactory.getServerContext(pkPath, pkPath, password).createSSLEngine();
		engine.setUseClientMode(false); // 设置为服务器模式
		engine.setNeedClientAuth(true); // 需要客户端认证
		ch.pipeline().addLast(new SslHandler(engine));

		// 再添加其他ChannelHandler
		ch.pipeline().addLast(new EchoServerHandler());
	}

}
  • 客户端初始化
public class EchoClientChannelInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {

		// 先添加SslHandler
		String pkPath = System.getProperties().getProperty("user.dir") 
				+ "/src/main/resources/ssl/nettyClient.jks";
		String password = "defaultPass";
		SSLEngine engine = SslContextFactory.getServerContext(pkPath, pkPath, password).createSSLEngine();
		engine.setUseClientMode(true); // 设置为服务器模式
		engine.setNeedClientAuth(true); // 需要客户端认证
		ch.pipeline().addLast(new SslHandler(engine));

		// 再添加其他ChannelHandler
		ch.pipeline().addLast(new EchoClientHandler());
	}
}

8.6 流量整型

网络编程中,如果发送方把数据发送的过快,接收方可能会来不及接收,这就会造成数据的丢失。所谓流量控制就是让发送方的发送速率不要太快,要让接收方来得及接收。利用滑动窗口机制可以很方便地在 TCP 连接上实现对发送方的流量控制。
流量整形(Traiic Shaping)是流量控制的一种机制。流量整形是一种主动调整流量输出速在率的措施,其实现原理是对流量监控中需要丢弃的报文进行级存(通常是将它们放入缓冲区或队列内)。当报文的发送速度过快时,报文首先在缓冲区进行缓存。再在流量计量算的的制下“均匀”地发送这些被缓冲的报文。当缓冲区满时,会告知客户编服务器将不再接收消息(TCP 的流量窗口控制)。流量整形可能会增加延迟。
Netty 提供了 AbstractTraffcShapingHandler 抽象类来对流量整形以便对带宽进行限制。Netty 也提供了 TraffcCounter(用于统计限速流量的读写字节数量) 类来进行流量的统计。
根据流量整形的限制范围,AbstractTraffcShapingHandler 提供了以下3个子类。

  • 管道流量整形:ChannelTrafficShapingHandler。
  • 全局管道流量整形:GlobalChannelTrafficShapingHiandler。
  • 全局流量整形:GlobalTrafficShapingHandler。

九,Netty 常用的网络协议

Netty 支持丰富的网络协议,如 TCP、UDP、HTTP、(TTP/2、WebSocket、SSLTLS、FTP、SMTP、二进制和基于文本的协议等,这些协议实现开箱即用。因此,Netty 开发者能够在不失灵活性的前提下来实现开发的简易性、高性能和稳定性。

9.1 Netty 对于Http 的支持

HttpRequestDecoder 类

HtpRequestDecoder 类用于将 ByteBuf解码为 HtpRequest 和 HttpContent。
HttpRequestDecoder 继承自 HttpObjectDecoder。HttpObjectDecoder是抽象类,继承自ByteToMessageDecoder 抽象类。HtpRequestDecoder 的解码功能主要由 HttpObjectDecode 的decode 方法提供。

HttpReponseEecoder 类

HttpResponseEncoder 类用于将 HttpResponse 和 HttpContent 编码为 ByteBuf。
HtpResponseEncoder继承自 HttpObjectEecoder抽象类,而 HttpObjectEecoder继承目MesageToMessageEncoder抽象类。HttpResponseEncoder的编码功能主要由 HttpObjectEecoder的 encode 方法提供。

HttpServerCodec 类

HttpServerCodec 类实现了编解码功能。既包括HtpRequestDecoder 解码功能,又包含了HttpResponseEncoder编码功能。

HttpContent 接口

实现类DefaultHttpContent 类,主要是为了包装解码前的ByteBuf 数据。

HttpMessage 接口

HttpMessage 定义了HTTP 消息的接口,为HttpRequest 和 HttpResponse 提供公共属性。

HttpObjectAggregator 类

HttpObjectAggregator 聚合了多种类型的数据,包括 HttpObject,HttpMessage和FullHttpMessage。

实战:基于HTTP 的 Web 服务器

实现ChannelHandler

public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        this.readRequest(msg);

        String sendMsg;
        String uri = msg.uri();

        switch (uri) {
            case "/":
                sendMsg = "<h3>Netty HTTP Server</h3><p>Welcome to <a href=\"https://waylau.com\">waylau.com</a>!</p>";
                break;
            case "/hi":
                sendMsg = "<h3>Netty HTTP Server</h3><p>Hello Word!</p>";
                break;
            case "/love":
                sendMsg = "<h3>Netty HTTP Server</h3><p>I Love You!</p>";
                break;
            default:
                sendMsg = "<h3>Netty HTTP Server</h3><p>I was lost!</p>";
                break;
        }
        this.writeResponse(ctx, sendMsg);
    }

    private void writeResponse(ChannelHandlerContext ctx, String msg) {
        ByteBuf bf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, bf);
        res.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.length());
        ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    private void readRequest(FullHttpRequest msg) {
        System.out.println("======请求行======");
        System.out.println(msg.method() + " " + msg.uri() + " " + msg.protocolVersion());

        System.out.println("======请求头======");
        for (String name : msg.headers().names()) {
            System.out.println(name + ": " + msg.headers().get(name));

        }

        System.out.println("======消息体======");
        System.out.println(msg.content().toString(CharsetUtil.UTF_8));
    }
}

实现ChannelInitializer

public class HttpServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("codec", new HttpServerCodec());
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
        ch.pipeline().addLast("serverHandler", new HttpServerHandler());
    }
}

实现服务器启动程序

public class HttpServer {

    public static int DEFAULT_PORT = 8080;

    public static void main(String[] args) throws Exception {
        int port;

        try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        // 多线程事件循环器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker

        try {
            // 启动NIO服务的引导程序类
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup) // 设置EventLoopGroup
                    .channel(NioServerSocketChannel.class) // 指明新的Channel的类型
                    .childHandler(new HttpServerChannelInitializer()) // 指定ChannelHandler
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();

            System.out.println("HttpServer已启动,端口:" + port);

            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {

            // 优雅的关闭
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

测试

netty

9.1 了解 HTTP/2

HTTP/2 通信分解为二进制编码帧的交换,这些帧对应着特定数据流中的消息。所有这些都在一个TCP 连接内复用。这是HTTP/2 所有其他功能性能优化的基础。
特点:

  • 请求与响应复用
  • 头部压缩算法 HPack
  • 协商机制

9.2 Netty 对于Http/2 的支持

Http/2 核心处理类主要是负责解码与编码,帧的解析与处理,Http/2 请求透处理,streamId的管理和Http/2 链接管理等。

Http2ConnectionHandler 类

Http2ConnectionEncoder 接口

Http2FrameReader 类

Http/1.1 与 Http/2 协商

ApplicationProtocolNegotiationHandler 抽象类 用于协议的协商。

HttpServerUpgradeHandler 类

Http/2 帧处理

Http2FrameCoderc 类

Netty 对 Http/2 的9中帧进行了一次封装,这样可以让下游的ChannelHandler 处理

实战:基于Http/2 的 Web服务器和客户端

具体代码请参考Idea Jmall jinggo.wang.netty.userGuide.chap09.http2

9.4 Netty 的 WebSocket

9.4.1 了解WebSocket

9.4.2 Netty 对于WebSocket 的支持

WebSocketFrame 类

对 WebSocket 帧的处理

WebSockeServerProtocolHanler 类

WebSockeServerProtocolHanler 类承担着 Websocket 服务器的处理器职责。

  • 处理 Websocket 握手及处理控制帧(close,Ping,Pong等)。
  • 将文本和二进制数据帧传递到管道中的下一个处理程序进行处理。

9.4.3 基于 WebSocket 的聊天室

应用描述:
WebSocket 是通过“Upgrade handshake”(升级握手)从标准的 HTTP 或 HTTPS 协议转为WebSocket。因此,使用 WebSocket 的应用程序将始终以 HTTP 开始,然后进行升级。在什么时候发生这种情况取决于具体的应用,它可以是在应用启动时,或当一个特定的 URL被请求时。在本例中,当 URL 请求以“/ws”结束时,才会将 HTTP 升级为 WebSocket 协议。否则,服务器将使用基本的 HTTP。一旦升级连接成功后,将使用 WebSocket 传输所有数据。整个服务器的逻辑如下。

  • 客户端(用户)连接到服务器并加入聊天。
  • HTTP 请求页面或 WebSocket 升级握手。
  • 服务器处理所有客户端/用户请求。
  • 响应 URI“1”的请求,转到默认 HTML 页面。
  • 如果访问的是 URI“/ws”,处理 WebSocket 升级握手。
  • 升级握手完成后,通过 WebSocket 发送聊天消息。

实现服务器

(1)处理Http 请求

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {  // (1)用于处理 FullHttpRequest 信息

    private final String wsUri;
    private static final File INDEX;
    private static final String staFile = "/Users/wangyunjing/Projects/Fun/jmall/netty/netty-demo/src/main/resources/WebsocketChatClient.html";

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "WebsocketChatClient.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(staFile);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // (2) 如果请求是WebSocket升级,递增引用计数器(保留)并且将它传递给在ChannelPipeline 中的下一个 ChannelInboundHandler
        if (wsUri.equalsIgnoreCase(request.uri())) {
            ctx.fireChannelRead(request.retain());
        } else {
            if (HttpUtil.is100ContinueExpected(request)) {
                send100Continue(ctx); // (3)处理复合Http/1.1 的 "100 continue" 请求
            }
            // (4)读取默认的WebsocketChatClient.html 页面
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            boolean keepAlive = HttpUtil.isKeepAlive(request);
            if (keepAlive) { // (5)判断 keepAlive 是否在请求头里面
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
            ctx.write(response); // (6)写HttpResponse 到客户端
            // (7)写index.html 到客户端,判断SsHandler是否在 ChannelPipeline中
            // 决定是使用DefaultFileRegion 还是 ChunkedNioFile
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            // (8)写并刷新 LastHttpContent 到客户端,标记响应完成
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            // (9)如果 keepAlive 没有要求,当写完成时,关闭Channel
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
            file.close();
        }
    }

    private void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

(2)处理WebSocket 帧

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // (4) 每当从服务端读到客户端写入信息时,将信息转发给其他客户Channel
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
            }
        }
    }

    // (2) 每当从服务端收到新的客户端连接时,客户端的Channel 存入 ChannelGroup 列表中
    // 并通知列表中的其他客户端 Channel
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // 广播
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
        channels.add(incoming);
        System.out.println("Client:" + incoming.remoteAddress() + "加入");
    }

    // (3) 每当从服务器端收到客户端断开时,客户端的Channel 自动从 ChannelGroup 列表中移除,并通知列表中的其他客户端
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // 广播
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
        System.out.println("Client:" + incoming.remoteAddress() + "离开");
    }

    // (5) 覆盖了channelActive() 事件处理方法。服务端监听到客户端活动
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "在线");
    }

    // (6) 覆盖了channelInactive() 事件处理方法。服务端监听到客户端不活动。
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "掉线");
    }

    // (7) 当出现了 Throwable 对象才会被调用,即当 netty 出现IO 错误或者处理器在处理事件过程中抛出异常时。
    // 在大部分情况下,捕获的异常应该被最下来并且把关联的 Channel 关闭。然而这个方法的处理方式会在遇到不同异常的情况下
    // 有不同实现,如可在关闭连接之前发送一个错误码的响应消息。
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

(3)实现ChannelInitializer

public class WebSocketChatServerInitializer extends ChannelInitializer<SocketChannel> {

    // 添加ChannelHandler 到 ChannelPipeline
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());
    }
}

(4)实现服务器主程序启动类

public class WebSocketChatServer {

    private int port;

    public WebSocketChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new WebSocketChatServerInitializer())  //(4)
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("WebsocketChatServer 启动了" + port);

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器 socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("WebsocketChatServer 关闭了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new WebSocketChatServer(port).run();

    }
}

实现客户端�

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
	<script type="text/javascript">
		var socket;
		if (!window.WebSocket) {
			window.WebSocket = window.MozWebSocket;
		}
		if (window.WebSocket) {
			socket = new WebSocket("ws://localhost:8080/ws");
			socket.onmessage = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + '\n' + event.data
			};
			socket.onopen = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = "连接开启!";
			};
			socket.onclose = function(event) {
				var ta = document.getElementById('responseText');
				ta.value = ta.value + "连接被关闭";
			};
		} else {
			alert("你的浏览器不支持 WebSocket!");
		}

		function send(message) {
			if (!window.WebSocket) {
				return;
			}
			if (socket.readyState == WebSocket.OPEN) {
				socket.send(message);
			} else {
				alert("连接没有开启.");
			}
		}
	</script>
	<form onsubmit="return false;">
		<h3>WebSocket 聊天室:</h3>
		<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
		<br> 
		<input type="text" name="message"  style="width: 300px" value="Welcome to waylau.com">
		<input type="button" value="发送消息" onclick="send(this.form.message.value)">
		<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
	</form>
	<br> 
	<br> 
	<a href="https://waylau.com/" >更多例子请访问  waylau.com</a>
</body>
</html>

十,Netty 测试

如何使用EmbeddedChannel 类来为ChannelHandler 的实现创建单元测试用例。
对Netty 的 ChamnelHandler 进行单元测试,Nety 提供了 EmbeddedChannel 嵌入式传输通道;来完成这一过程,主要使用该通道来测试数据的入站、出站过程是否合法。EmbeddedChannel 类提供以下常用的 API。

  • writeInbond(Object... msg):将入站消息写到 EmbeddedChannel 中。如果可以通过readInbound()方法从 EmbeddedChannel 中读取数据,则返回true。
  • readInbound():从EmbeddedChannel中读取一个人站消息。任何返回的教据都穿越了整个 ChannelPipeline。如果没有任何可供读取的,则返回 ull。
  • writeUutbound(Object…msg):将出站消息写到 EmbeddedChannel中。如果现在可(通过 readOutbound()方法从 EmbeddedChannel 中读取到数据,则返回true。
  • readOutbound():从 EmbeddedChannel 中读取一个出站消息。任何返回的数据都穿越了整ChannelPipeline。如果没有任何可供读取的,则返回nul。
  • finish():将EmbeddedChannel 标记为完成,并且如果有可被读取的入站数据或者出站数据,则返回true。这个方法还将会调用 EmbeddedChannel 中的 close()方法。

图 10.1 展示了 EmheddedChannel 的处理流程。

netty

10.1 实战:EmbeddedChannel 测试入站信息

使用 FixedLengthFrameDecoder 类

Netty提供了内置的固定长度解玛器FixedLengthFrameDecoder: FixedLengthFrameDecoder能够按照指完的长度对消息进行自动解码,开发者不需要考虑TCP 的粘包和拆包问题,非常实用。无论一次接收到多少数据报,它都会按照构造器中设置的固定长度进行解码。如果是半包消息, FixedLengthFrameDecoder 会缓存半包消息并等待下个包到达之后进行拼包合并,直到读取一个完整的消息包。
比如下面4 个分段的数据包:
A,BC,DEFG,HI
那么,FixedLengthFrameDecoder(3) 会将他们解码为以下3哥具有固定长度的数据包。
ABC,DEF,GHI

测试 FixedLengthFrameDecoder 类

public class FixedLengthFrameDecoderTest {

    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        // 拷贝一个新对象,在新对象上修改不会影响前对象
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));

        // 写字节
        assertTrue(channel.writeInbound(input.retain()));
        assertTrue(channel.finish());

        // 读消息
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }

    @Test
    public void testFramesDecoded2() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        assertFalse(channel.writeInbound(input.readBytes(2)));
        assertTrue(channel.writeInbound(input.readBytes(7)));
        assertTrue(channel.finish());
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }
}

10.2 实战:EmbeddedChannel 测试出站信息

使用 AbsIntegerEncoder 类

AbsIntegerEncoder 类是一自定义的一个编码器,用于将负值整数转换为绝对值。

public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 4) {
            int value = Math.abs(in.readInt());
            out.add(value);
        }
    }
}
public class AbsIntegerEncoderTest {
    @Test
    public void testEncoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1); //(1)
        }

        EmbeddedChannel channel =
                new EmbeddedChannel(new AbsIntegerEncoder()); //(2)
        assertTrue(channel.writeOutbound(buf)); //(3)
        assertTrue(channel.finish()); //(4)

        // 读字节
        for (int i = 1; i < 10; i++) {
            assertEquals(Integer.valueOf(i+""),
                    channel.readOutbound()); //(5)
        }
        assertNull(channel.readOutbound());
    }
}