Netty


Netty

Netty是对Java NIO的封装实现,通过Netty我们可以用简介的代码实现JavaNIO的数据通信,此处要结合之前的ScoketIO文章进行结合阅读,在本文中仅进行了Netty实现的代码进行了实现,同时需要了解BIO和NIO原生的代码实现需要在前文中阅读。

客户端

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池
try {
    Bootstrap client = new Bootstrap(); // 创建客户端处理程序
    client.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                    socketChannel.pipeline().addLast(new JSONDecoder());
                    socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
                    socketChannel.pipeline().addLast(new JSONEncoder());
                    socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器
                }
            });
    ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
    channelFuture.channel().closeFuture().sync(); // 关闭连接
} finally {
    group.shutdownGracefully();
}

服务器端

EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
try {
    // 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
    ServerBootstrap serverBootstrap = new ServerBootstrap();   // 服务器端
    // 设置要使用的线程池以及当前的Channel类型
    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
    // 接收到信息之后需要进行处理,于是定义子处理器
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
            socketChannel.pipeline().addLast(new JSONDecoder()) ;
            socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ;
            socketChannel.pipeline().addLast(new JSONEncoder()) ;
            socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器
        }
    });
    // 可以直接利用常亮进行TCP协议的相关配置
    serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
    serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    // ChannelFuture描述的时异步回调的处理操作
    ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
    future.channel().closeFuture().sync();// 等待Socket被关闭
} finally {
    workerGroup.shutdownGracefully() ;
    bossGroup.shutdownGracefully() ;
}

Pipeline 详解

  • Pipeline是Netty对数据处理流程的核心操作类,通过前文的一系列配置,最终操作的实现都需要在其中实现。socketChannel.pipeline().addLast()等一系列的方法就是添加个人的数据操作。通过Netty自定义的操作顺序,对输入输出的数据进行拆包/封装/编码/解码/自定义数据操作方法。
  • pipeline 常用流程:数据输入->拆包->解码->相关的处理操作(继承了ChannelInboundHandlerAdapter的Handler,按照定义的顺序执行)->相关的处理操作(继承了ChannelOutboundHandlerAdapter的Handler,按照定义的顺序执行)->编码->封装->数据输出。
  • pipeline是一个典型的双向链表结构,根据定义时的顺序和结构会将处理操作进行排序,有数据输入时,只执行数据输入相关的操作,数据输出时只执行数据输出的相关操作。其中数据通过ctx上下文进行传输,通过以object进行封装。
  • 其中 ctx.writeAndFlush 和ctx.channel.writeAndFlush 是数据输出的信号发送源头,区别在于前者将此时的handler作为最后的handler并把数据进行编码和封装传输出去,而后者会从tail将所有的handler执行一遍后发出去。

HTTP实现代码


public class HttpServer {
    static {
        DiskFileUpload.baseDirectory = System.getProperty("user.dir") + "/upload/" ;
    }
    public void run() throws Exception {
        // 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
        // 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
        EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
        System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
        try {
            // 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
            ServerBootstrap serverBootstrap = new ServerBootstrap();   // 服务器端
            // 设置要使用的线程池以及当前的Channel类型
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
            // 接收到信息之后需要进行处理,于是定义子处理器
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new HttpResponseEncoder()) ;   // 响应编码
                    socketChannel.pipeline().addLast(new HttpRequestDecoder()) ;    // 请求解码
                    socketChannel.pipeline().addLast(new ChunkedWriteHandler()) ; // 图片传输处理器
                    socketChannel.pipeline().addLast(new HttpServerHandler()) ;
                }
            });
            // 可以直接利用常亮进行TCP协议的相关配置
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // ChannelFuture描述的时异步回调的处理操作
            ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
            future.channel().closeFuture().sync();// 等待Socket被关闭
        } finally {
            workerGroup.shutdownGracefully() ;
            bossGroup.shutdownGracefully() ;
        }
    }
}

public class HttpServerHandler extends ChannelInboundHandlerAdapter {
    private HttpRequest request;
    private DefaultFullHttpResponse response ;
    private HttpSession session ;
    private ChannelHandlerContext ctx ;

    /**
     * 依据传入的标记内容进行是否向客户端Cookie中保存有SessionId数据的操作
     * @param exists
     */
    private void setSessionId(boolean exists) {
        if(exists == false) {    // 用户发送来的头信息里面不包含有SessionId内容
            String encodeCookie = ServerCookieEncoder.STRICT.encode(HttpSession.SESSIONID, HttpSessionManager.createSession()) ;
            this.response.headers().set(HttpHeaderNames.SET_COOKIE,encodeCookie) ;// 客户端保存Cookie数据
        }
    }

    /**
     * 当前所发送的请求里面是否存在有指定的 SessionID数据信息
     * @return 如果存在返回true,否则返回false
     */
    public boolean isHasSessionId() {
        String cookieStr = this.request.headers().get(HttpHeaderNames.COOKIE) ; // 获取客户端头信息发送来的Cookie数据
        if (cookieStr == null || "".equals(cookieStr)) {
            return false ;
        }
        Set<Cookie> cookieSet = ServerCookieDecoder.STRICT.decode(cookieStr);
        Iterator<Cookie> iter = cookieSet.iterator() ;
        while(iter.hasNext()) {
            Cookie cookie = iter.next() ;
            if(HttpSession.SESSIONID.equals(cookie.name())) {
                if (HttpSessionManager.isExists(cookie.value())) {
                    this.session = HttpSessionManager.getSession(cookie.value()) ;
                    return true ;
                }
            }
        }
        return false ;
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.ctx = ctx ;
        if (msg instanceof HttpRequest) {    // 实现HTTP请求处理操作
            this.request = (HttpRequest) msg; // 获取Request对象
            System.out.println("【Netty-HTTP服务器端】uri = " + this.request.uri() + "、Method = " + this.request.method() + "、Headers = " + request.headers());
            this.handleUrl(this.request.uri());
        }
    }

    private void responseWrite(String content) {
        ByteBuf buf = Unpooled.copiedBuffer(content,CharsetUtil.UTF_8) ;
        this.response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,buf) ;
        this.response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=UTF-8") ; // 设置MIME类型
        this.response.headers().set(HttpHeaderNames.CONTENT_LENGTH,String.valueOf(buf.readableBytes())) ; // 设置回应数据长度
        this.setSessionId(this.isHasSessionId());
        ctx.writeAndFlush(this.response).addListener(ChannelFutureListener.CLOSE) ; // 数据回应完毕之后进行操作关闭
    }



    private void sendImage(String fileName) throws Exception {
        String filePath = DiskFileUpload.baseDirectory + fileName ;
        File sendFile = new File(filePath) ;
        HttpResponse imageResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK) ;
//        imageResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,String.valueOf(sendFile.length())) ;
        MimetypesFileTypeMap mimeMap = new MimetypesFileTypeMap() ;
        imageResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,mimeMap.getContentType(sendFile)) ;
        imageResponse.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE) ;
        this.ctx.writeAndFlush(imageResponse) ;
        this.ctx.writeAndFlush(new ChunkedFile(sendFile)) ;
        // 在多媒体信息发送完毕只后需要设置一个空的消息体,否则内容无法显示
        ChannelFuture channelFuture = this.ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) ;
        channelFuture.addListener(ChannelFutureListener.CLOSE) ;
    }

    public void handleUrl(String uri) {
        if ("/info".equals(uri)) {
            this.info();
        } else if ("/favicon.ico".equals(uri)) {
            this.favicon();
        } else if ("/show.png".equals(uri)) {
            this.show() ;
        }
     }
    public void info() {
        String content =
                "<html>" +
                        "  <head>" +
                        "       <title>Hello Netty</title>" +
                        "   </head>" +
                        "   <body>" +
                        "       <h1>好好学习,天天向上</h1>" +
                        "       <img src='/show.png'>" +
                        "   </body>" +
                        "</html>";   // HTTP服务器可以回应的数据就是HTML代码
        this.responseWrite(content);
    }

    public void favicon() {
        try {
            this.sendImage("favicon.ico");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
     public void show() {
         try {
             this.sendImage("show.png");
         } catch (Exception e) {
             e.printStackTrace();
         }
     }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

文章作者: Fanrencli
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Fanrencli !
  目录