Netty学习——实战篇8 Handler链调用、TCP粘包和拆包

1 Handler链调用-需求

        使用自定义的编码器和解码器来说明Netty的Handler调用机制。客户端发送long类型数据到服务端;服务端发送long到客户端。

2 Handler链调用-实战

2.1  NettyServer.java

public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());//自定义一个初始化类
            ChannelFuture sync = serverBootstrap.bind(8000).sync();
            sync.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2.2 MyServerInitializer.java

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //对入站的Hadler进行解码
        pipeline.addLast(new MyByteToLongDecoder2());
        //对出站的Handler进行解码
        pipeline.addLast(new MyLongToByteEncoder());
        //自定义Handler
        pipeline.addLast(new MyServerHandler());
    }
}

2.3 MyByteToLongDecoder2.java

public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyByteToLongDecoder2 被调用");
        out.add(in.readLong());
    }
}

 2.4 MyLongToByteEncoder.java

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("MyLongToByteEncoder 被调用");
        System.out.println("msg = " + msg);
        out.writeLong(msg);
    }
}

2.5 MyServerHandler.java

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("从客户端:"+ctx.channel().remoteAddress() + "读取到long类型数据:"+msg);
        //向客户端发送一个long类型数据
        ctx.writeAndFlush(123456L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2.6 NettyClient.java

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup g = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(g)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 8000).sync();
            sync.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            g.shutdownGracefully();
        }
    }
}

2.7 MyClientInitializer.java

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入一个出站的handler 对数据进行一个编码
        pipeline.addLast(new MyLongToByteEncoder());

        //这时一个入站的解码器(入站handler )
        pipeline.addLast(new MyByteToLongDecoder2());
        //加入一个自定义的handler , 处理业务
        pipeline.addLast(new MyClientHandler());
    }
}

2.8 MyClientHandler.java

public class MyClientHandler extends SimpleChannelInboundHandler<SocketChannel> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SocketChannel msg) throws Exception {
        System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
        System.out.println("收到服务器消息=" + msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 发送数据");
        //ctx.writeAndFlush(Unpooled.copiedBuffer(""))
        ctx.writeAndFlush(654321L); //发送的是一个long

    }
}

3 运行结果

服务端

客户端

 4 复现TCP粘包和拆包

4.1 NettyServer.java

public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer ());
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.2 MyServerInitializer.java

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new NettyServerHandler());
    }
}

4.3 NettyServerHandler.java

@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        //将bytes转换成字符串
        String result = new String(bytes, Charset.forName("utf-8"));
        log.info("服务器接收到的数据:{}",result);
        log.info("服务器接收的消息量是:{}",(++this.count));
        //服务器返回数据给客户端
        ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(),Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

4.4 NettyClient.java

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer ());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}

4.5 MyClientInitializer.java

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler ());
    }
}

4.6 MyClientHandler.java

@Slf4j
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, Charset.forName("UTF-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String result = new String(buffer, Charset.forName("utf-8"));
        log.info("客户端接收的消息是:{}",result);
        log.info("客户端接收的消息数量是:{}",(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

多次运行客户端。服务端运行结果如下:

5 TCP 粘包和拆包解决方案

5.1 解决思路

        使用自定义协议+编解码器来解决

        重点:要解决服务器端每次读取数据长度的问题,这个问题解决后,就不会出现服务器多读或者少读数据的问题,从而避免TCP粘包和拆包。

5.2 需求

         1、要求客户端发送5个Message对象,客户端每次发送一个Message对象。

         2、服务端每次接收一个Message对象,分5次进行解码。每次读取一个Message,就回复一个Message对象给客户端。

5.3 自定义 消息协议类:MessageProtocol.java

public class MessageProtocol {
    private int length;//长度
    private byte[] content;//内容



    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

5.4 自定义编码器:    MyMessageEncoder.java

@Slf4j
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        log.info("MyMessageEncoder 的 encode 方法被调用");
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

5.5 自定义解码器: MyMessageDecoder.java

/*
    自定义解码器,把二进制字节码转换成 MessageProtocol对象
 */
@Slf4j
public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyMessageDecoder 的 decode方法被调用");
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        //封装成MessageProtocol对象
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(length);
        messageProtocol.setContent(bytes);
        //把 MessageProtocol对象放入list中
        out.add(messageProtocol);

    }
}

5.6 MyServerHandler.java

@Slf4j
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    //计数器
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收数据,并处理
        int length = msg.getLength();
        byte[] content = msg.getContent();
        log.info("服务器接收到信息如下,消息长度是:{},消息内容是{}",length,new String(content, Charset.forName("utf-8")));
        log.info("服务器接收到数据包的数量是:{}",(++this.count));
        //服务器回复消息
        String responseMessage = UUID.randomUUID().toString();
        int responseLength = responseMessage.getBytes("utf-8").length;
        byte[] responseMessageBytes = responseMessage.getBytes("utf-8");
        //构建协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(responseLength);
        messageProtocol.setContent(responseMessageBytes);
        ctx.writeAndFlush(messageProtocol);
    }
}

5.7 MyServerInitializer.java

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageDecoder());
        pipeline.addLast(new MyMessageEncoder());
        pipeline.addLast(new MyServerHandler());
    }
}

5.8 MyServer.java

public class MyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

5.9 MyClientHandler.java

@Slf4j
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据
        for (int i = 0; i < 5; i++) {
            String msg = "孔乙己" + i;
            int length = msg.getBytes().length;
            byte[] bytes = msg.getBytes();
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setContent(bytes);
            messageProtocol.setLength(length);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();
        log.info("客户端接收到消息如下,长度:{},内容:{}",length,new String(content, Charset.forName("utf-8")));
        log.info("客户端接收消息数量是:{}",(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

5.10 MyClientInitializer.java

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageEncoder());
        pipeline.addLast(new MyMessageDecoder());

        pipeline.addLast(new MyClientHandler());
    }
}

5.11 MyClient.java

public class MyClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}

5.12 运行结果

服务端:

客户端:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/569307.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言——小知识和小细节16

一、左旋字符串 例如字符串 abcd &#xff0c;左旋一个就是 bcda &#xff0c;左旋两个就是 cdab 。 方法一&#xff1a;循环 #include <stdio.h> #include <string.h>void func(char* str, int n) {int i 0;int j 0;int len (int)strlen(str);n % len;//超出…

算法打卡day48|动态规划篇16| Leetcode 583. 两个字符串的删除操作、72. 编辑距离

算法题 Leetcode 583. 两个字符串的删除操作 题目链接:583. 两个字符串的删除操作 大佬视频讲解&#xff1a;583. 两个字符串的删除操作视频讲解 个人思路 本题和115.不同的子序列相比&#xff0c;变为了两个字符串都可以删除&#xff0c;整体思路是不变的&#xff0c;依旧…

vue 表格获取当前行索引,加颜色

vue 表格获取当前行索引&#xff0c;加颜色 <span styledisplay:inline-block;width:10px;height:10px;border-radius:50% :style"{background:color[scope.$index]}" />//定义颜色color: [#5387F7, #A794E0, #F3543C, #999999, #77D3F8, #FFA1B4, #26CEBA, #…

ElementUI RUOYI 深色适配

1. 切换按钮&#xff1a;随便找个页面放上去 页面触发逻辑如下 a. html 按钮结构&#xff08;可自定义&#xff09; <el-switchstyle"margin-top: 4px; margin-left: 8px; margin-right: 8px"v-model"isDark"inline-promptactive-icon"Moon"…

使用 Gradio 的“热重载”模式快速开发 AI 应用

在这篇文章中&#xff0c;我将展示如何利用 Gradio 的热重载模式快速构建一个功能齐全的 AI 应用。但在进入正题之前&#xff0c;让我们先了解一下什么是重载模式以及 Gradio 为什么要采用自定义的自动重载逻辑。如果你已熟悉 Gradio 并急于开始构建&#xff0c;请直接跳转到第…

vite和webpacke的常规配置

文章目录 1、vite和webpacke的区分2、vite的常规配置介绍主要部分介绍vite基本配置示例 3、webpacke的常规配置介绍主要部分介绍Webpack 基本配置示例 1、vite和webpacke的区分 相同点&#xff1a; 都是构建工具&#xff0c;用于资源打包 &#xff1b; 都有应用到摇树原理 tre…

网络研讨会 | 数据中心中的人工智能

人工智能&#xff08;AI&#xff09;是嵌入式开发人员必须解决的最复杂的技术之一。将其集成到您的系统中会带来很多问题而不是很多答案。行业媒体Embedded Computing Design特地推出“工程师的人工智能集成指南”月度网络研讨会系列&#xff0c;目的是尽可能地简化嵌入式计算设…

「共沐书香 分享喜“阅”」世界读书日交流活动在国际数字影像产业园圆满举行

人间最美四月天&#xff0c;正是读书好时节。4月23日&#xff0c;在数媒大厦的春日里&#xff0c;我们共同迎来了第29个“世界读书日"。由数字影像联合工会委员会、树莓科技&#xff08;成都&#xff09;集团有限公司工会委员会主办&#xff0c;成都树莓信息技术有限公司、…

Linux(韦东山)

linux和windows的差别 推荐学习路线 先学习 应用程序 然后&#xff1a; 驱动程序基础 最后&#xff1a;项目 韦东山课程学习顺序 看完第六篇之后&#xff0c;还可以继续做更多的官网的项目 入门之后&#xff0c;根据自己的需要学习bootloader / 驱动大全 / LVGL

误差的一阶和二阶——MSE/MAE

variance和bias MSE之前&#xff0c;先看两个更为朴素的指标&#xff1a;variance和bias。 在打靶中&#xff0c;有的人所有的子弹都离靶心很远&#xff0c;偏差显然过高&#xff0c;但是很稳定地维持在某一点附近&#xff1b;有的人平均环数更高&#xff0c;但是分布太过分散…

网络安全之SQL注入漏洞复现(中篇)(技术进阶)

目录 一&#xff0c;报错注入 二&#xff0c;布尔盲注 三&#xff0c;sleep延时盲注 四&#xff0c;DNSlogs 盲注 五&#xff0c;二次注入 六&#xff0c;堆叠注入 总结 一&#xff0c;报错注入 报错注入就是在错误信息中执行 sql 语句&#xff0c;利用网站的报错信息来带…

2024-04-23 linux 查看内存占用情况的命令free -h和cat /proc/meminfo

一、要查看 Linux 系统中的内存占用大小&#xff0c;可以使用 free 命令或者 top 命令。下面是这两个命令的简要说明&#xff1a; 使用 free 命令&#xff1a; free -h这将显示系统当前的内存使用情况&#xff0c;包括总内存、已用内存、空闲内存以及缓冲区和缓存的使用情况。…

使用 Flutter 打造引人入胜的休闲游戏体验

作者 / Zoey Fan 去年&#xff0c;Flutter 休闲游戏工具包进行了一次重大更新。近期在旧金山举办的游戏开发者大会 (GDC) 上&#xff0c;Flutter 首次亮相。GDC 是游戏行业的顶级专业盛会&#xff0c;致力于帮助游戏开发者不断提升开发技能。欢迎您继续阅读&#xff0c;了解开发…

小程序AI智能名片商城系统如何依赖CPM、CPC、CPS技术应用进行营销

在数字化营销的新纪元中&#xff0c;小程序AI智能名片商城系统以其高效、智能的特性&#xff0c;成为了企业营销的重要工具。而CPM、CPC、CPS这三种技术应用&#xff0c;更是为该系统赋予了强大的营销能力。接下来&#xff0c;我们将通过详细的例子&#xff0c;探讨这些技术是如…

微信小程序webview和小程序通讯

1.背景介绍 1.1需要在小程序嵌入vr页面&#xff0c;同时在vr页面添加操作按钮与小程序进行通信交互 1.2 开发工具&#xff1a;uniapp开发小程序 1.3原型图 功能&#xff1a;.点击体验官带看跳转小程序的体验官带看页面 功能&#xff1a;点击立即咨询唤起小程序弹窗打电话 2.…

力扣数据库题库学习(4.23日)

610. 判断三角形 问题链接 解题思路 题目要求&#xff1a;对每三个线段报告它们是否可以形成一个三角形。以 任意顺序 返回结果表。 对于三个线段能否组成三角形的判定&#xff1a;任意两边之和大于第三边&#xff0c;对于这个表内的记录&#xff0c;要求就是&#xff08;x…

【C语言】指针篇-一篇搞定不同类型指针变量-必读指南(3/5)

男儿不展风云志&#xff0c;空负天生八尺躯。——《警世通言卷四十》&#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔记 &#x1f308;C笔记专栏&#xff1a; C笔记 &#x1f308;喜欢的诗句:无人扶我青云志 我自踏雪至山巅 上篇…

vue项目启动npm install和npm run serve时出现错误Failed to resolve loader:node-sass

1.常见问题 问题1&#xff1a;当执行npm run serve时&#xff0c;出现Failed to resolve loader: node-sass&#xff0c;You may need to install it 解决方法&#xff1a; npm install node-sass4.14.1问题2&#xff1a;当执行npm run serve时&#xff0c;出现以下错误 Fa…

ADC内部运行原理

1以一个简单的外置ADC为例讲解 1在外部由地址锁存和译码经行去控制通道选择开关//去控制外部那一条IO口输入&#xff0c;输入到比较器 2逐次逼近寄存器SAR每次从三态锁存缓冲器读取值在由DAC&#xff08;数模转换成模拟电压&#xff09;在输入到比较器当io信号和DAC信号几乎一样…

JWT原理解析

一、概述 虽然现在很多的开发框架会支持JWT的使用&#xff0c;但是对JWT还是没有一个详细的了解&#xff0c;有很多疑惑&#xff1a; JWT比之前的session或者token有什么好处&#xff1f;JWT的构成元素是什么&#xff1f;JWT从生成到使用的详细流程&#xff1f; 二、 JWT 2…
最新文章