1、netty如何解析多协议

前提:

项目地址:https://gitee.com/q529075990qqcom/NB-IOT.git

我们需要一个创建mavne项目,这个项目是我已经写好的项目,项目结构图如下:

 

 

 

创建公共模块

创建子模块,准备好依赖Netty4.1版本

<dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.72.Final</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.28</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.28</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo</artifactId>
      <version>5.3.0</version>
    </dependency>
  </dependencies>
maven依赖

 

序列化的定义是:将一个对象编码成一个字节流(I/O);而与之相反的操作被称为反序列化。

package serializer;

/**
 * @description:
 * @author: quliang
 * @create: 2022-10-20 15:16
 **/
public interface Serializer {
    /**
     * 序列化
     *
     * @param obj
     * @return
     * @throws Exception
     */
    byte[] serialize(Object obj) throws Exception;

    /**
     * 反序列化
     *
     * @param bytes
     * @param clazz
     * @param <T>
     * @return
     * @throws Exception
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;

}
自定义序列化接口
package serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * @description:
 * @author: quliang
 * @create: 2022-10-20 15:18
 **/

public class KryoSerializer implements Serializer {
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setReferences(true);
        kryo.setRegistrationRequired(false);
        ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            Output output = new Output(baos);
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (IOException e) {
            throw new Exception("序列化失败", e);
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
            Input input = new Input(bais);
            Kryo kryo = kryoThreadLocal.get();
            Object obj = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return clazz.cast(obj);
        } catch (IOException e) {
            throw new Exception("反序化失败");
        }
    }
}
Kryo实现序列化接口

 

我们需要解析两种协议,那我们就要提前定义好两种协议,分别是消息协议、登录协议

 

消息协议相关

package protocol.msg;

import lombok.Data;
import lombok.Getter;

/**
 * @description: 消息协议: |magic|version|data|
 * @author: quliang
 * @create: 2022-12-10 20:46
 **/
@Data
public class MsgProtocol {
    @Getter
    private byte magic=0;
    @Getter
    private byte version=1;
}
消息协议基类
package protocol.msg.request;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-10 20:58
 **/
@Data
public class MsgRequest extends MsgProtocol {
    private String msg;
}
消息请求子类
package protocol.msg.response;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-10 20:41
 **/
@Data
public class MsgResponse extends MsgProtocol {
    private int statCode;
}
消息响应子类
package encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-10 20:53
 **/

public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf in) throws Exception {
        in.writeByte(msgProtocol.getMagic());
//        in.writeByte(msgProtocol.code());
        in.writeByte(msgProtocol.getVersion());
        byte[] data = new KryoSerializer().serialize(msgProtocol);
        in.writeShort(data.length);
        in.writeBytes(data);
    }
}
消息协议编码
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-10 20:52
 **/
@Slf4j
public class MsgDecoder extends ByteToMessageDecoder {

    private Class<MsgProtocol> msgClass;

    public MsgDecoder(Class clazz) {
        this.msgClass = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            byte magic = in.readByte();
            byte version = in.readByte();

            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);

            MsgProtocol baseProtocol = new KryoSerializer().deserialize(data, msgClass);
            out.add(baseProtocol);
        } catch (Exception e) {
            //如果解码错误,将数据传递到下一个解码器中
            log.error("msg decoder {}",e.getMessage());
            // 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空
            in.resetReaderIndex();
            // 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;
            ByteBuf buff = in.retainedDuplicate();
            //原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。
            in.skipBytes(in.readableBytes());
            //继续传递到下一个解码器中
            out.add(buff);
        }
    }
}
消息协议解码

 

登录协议相关

package protocol.system;



import lombok.Getter;

/**
 * @description: 登录协议: |magic|version|code|data|
 * @author: quliang
 * @create: 2022-12-09 18:10
 **/

public class LoginProtocol {
    @Getter
    private byte magic=0;
    @Getter
    private byte version=1;
    @Getter
    public byte code;
}
登录协议基类
package protocol.system.request;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-06 18:17
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginRequest extends LoginProtocol {

    private String userId;

    private String userName;
}
登录请求子类
package protocol.system.response;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-06 18:22
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginResponse extends LoginProtocol {

    private String msg;

    private String data;

}
登录响应子类
package encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-06 22:11
 **/

public class LoginEncoder extends MessageToByteEncoder<LoginProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf in) throws Exception {
        in.writeByte(baseProtocol.getMagic());
        in.writeByte(baseProtocol.getCode());
        in.writeByte(baseProtocol.getVersion());
        byte[] data = new KryoSerializer().serialize(baseProtocol);
        in.writeShort(data.length);
        in.writeBytes(data);
    }
}
登录协议编码
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;
import java.util.List;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-06 17:59
 **/
@Slf4j
public class LoginDecoder extends ByteToMessageDecoder {

    private Class<LoginProtocol> clazz;

    public LoginDecoder(Class clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            byte magic = in.readByte();
            byte code = in.readByte();
            byte version = in.readByte();

            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);

            LoginProtocol baseProtocol = new KryoSerializer().deserialize(data, clazz);
            out.add(baseProtocol);
        } catch (Exception e) {
            //如果解码错误,将数据传递到下一个解码器中
            log.error("login decoder {}", e.getMessage());
            // 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空
            in.resetReaderIndex();
            // 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;
            ByteBuf buff = in.retainedDuplicate();
            //原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。
            in.skipBytes(in.readableBytes());
            //继续传递到下一个解码器中
            out.add(buff);
        }
    }
}
登录协议解码

 这样公共模块就创建完成了

创建服务端

package com.ql;

import com.ql.handler.MsgHandler;
import decoder.LoginDecoder;
import decoder.MsgDecoder;
import com.ql.handler.LoginHandler;
import encoder.LoginEncoder;
import encoder.MsgEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.msg.request.MsgRequest;

/**
 * @author quliang
 * @description 服务端
 * @date 2022-12-06 17:39:14
 */
@Slf4j
public class IotServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(
                    bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            /**
                             * 心跳机制
                             */
                            //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS));

                            /**
                             * 消息、登录解码器
                             */
                            pipeline.addLast(new LoginDecoder(LoginRequest.class));
                            pipeline.addLast(new MsgDecoder(MsgRequest.class));

                            /**
                             * 消息、登录处理器
                             */
                            pipeline.addLast(new MsgHandler());
                            pipeline.addLast(new LoginHandler());

                            /**
                             * 消息、登录编码器
                             */
                            pipeline.addLast(new MsgEncoder());
                            pipeline.addLast(new LoginEncoder());

                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024);

            ChannelFuture cf = bootstrap.bind(8849).sync();
            log.info("socket服务端启动成功 {}", cf.channel().localAddress().toString());

            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

    }
}
服务端代码
package com.ql.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.request.MsgRequest;
import protocol.msg.response.MsgResponse;

/**
 * @description: 消息处理器
 * @author: quliang
 * @create: 2022-12-10 20:57
 **/
@Slf4j
@ChannelHandler.Sharable
public class MsgHandler extends SimpleChannelInboundHandler<MsgRequest> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("上线{}", ctx.channel().remoteAddress().toString());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception {
        log.info("服务端读取消息体数据为{}", request.toString());
        MsgResponse response = new MsgResponse();
        response.setStatCode(200);
        ctx.channel().writeAndFlush(response);
    }
}
服务端消息处理器
package com.ql.handler;

import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: 登录处理器
 * @author: quliang
 * @create: 2022-12-06 18:14
 **/
@Slf4j
@ChannelHandler.Sharable
public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest>{
    private static AtomicInteger READER_COUNT = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("服务端:{} 通道开启!", ctx.channel().localAddress().toString());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("服务端: {} 通道关闭!", ctx.channel().localAddress().toString());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception {
        log.info("读取数据 {} ", loginRequest.toString());
        LoginResponse response= new LoginResponse("success", null);
        ctx.channel().writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............数据接收-完毕...............");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.error("...............业务处理异常...............{}", cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            Channel channel = ctx.channel();
            switch (event.state()) {
                case READER_IDLE:
                    log.info("读空闲");
                    READER_COUNT.addAndGet(1);
                    break;
                case WRITER_IDLE:
                    log.info("写空闲");
                    break;
                default:
                    break;
            }
            ctx.disconnect();
            if (READER_COUNT.get() > 3) {
                log.info("close this channel {}", channel.remoteAddress().toString());
            }
        }
    }

}
服务端登录处理器

 

服务端其实很多都是直接引用公共模块的,代码也并不复杂

创建消息客户端

package com.ql;

import com.ql.handler.ClientMsgHandler;
import decoder.MsgDecoder;
import encoder.MsgEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.response.MsgResponse;

import java.net.InetSocketAddress;

/**
 * @author quliang
 * @description 客户端
 * @date 2022-12-06 17:37:56
 */
@Slf4j
public class IotClientMsg {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));

                            //消息解码器
                            pipeline.addLast(new MsgDecoder(MsgResponse.class));
                            //客户端消息处理器
                            pipeline.addLast(new ClientMsgHandler());
                            //消息编码器
                            pipeline.addLast(new MsgEncoder());
                        }
                    });

            ChannelFuture cf = bs.connect().sync();
            log.info("启动成功{}", cf.channel().localAddress().toString());

//            Scanner scanner = new Scanner(System.in);

            cf.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }
}
客户端代码
package com.ql.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.request.MsgRequest;
import protocol.msg.response.MsgResponse;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-10 20:40
 **/
@Slf4j
@ChannelHandler.Sharable
public class ClientMsgHandler extends SimpleChannelInboundHandler<MsgResponse> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        MsgRequest request = new MsgRequest();
        request.setMsg("hello");
        ctx.channel().writeAndFlush(request);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgResponse response) throws Exception {
        int code = response.getStatCode();
        log.info("消息处理器读取响应对象数据为{}", code);
    }
}
客户端消息处理器

 

 消息客户端代码也并不复杂

创建登录客户端

package com.ql;

import com.ql.handler.ClientLoginHandler;
import decoder.LoginDecoder;
import encoder.LoginEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.system.response.LoginResponse;

import java.net.InetSocketAddress;

/**
 * @author quliang
 * @description 客户端
 * @date 2022-12-06 17:37:56
 */
@Slf4j
public class IotClientLogin {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));

                            pipeline.addLast(new LoginDecoder(LoginResponse.class));
                            //pipeline.addLast(new MsgDecoder(MsgResponse.class));

                            //pipeline.addLast(new ClientMsgHandler());
                            pipeline.addLast(new ClientLoginHandler());

                            //pipeline.addLast(new MsgEncoder());
                            pipeline.addLast(new LoginEncoder());
                        }
                    });

            ChannelFuture cf = bs.connect().sync();
            log.info("启动成功{}", cf.channel().localAddress().toString());

//            Scanner scanner = new Scanner(System.in);

            cf.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }
}
客户端代码
package com.ql.handler;

import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.Scanner;

/**
 * @description:
 * @author: quliang
 * @create: 2022-12-06 22:16
 **/
@Slf4j
@ChannelHandler.Sharable
public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponse>  {

    private Scanner scanner = new Scanner(System.in);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端:{} 通道开启!", ctx.channel().localAddress().toString());
        login(ctx);
    }

    /**
     * 登录方法
     * @param ctx
     */
    private void login(ChannelHandlerContext ctx) {
        LoginRequest request = new LoginRequest("123", "123");
        ctx.channel().writeAndFlush(request);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端: {} 读取数据 {}", ctx.channel().localAddress().toString(), msg.toString());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponse response) throws Exception {
        log.info("客户端: {} 读取数据 {}", ctx.channel().localAddress().toString(), response.toString());
        String msg = response.getMsg();
        log.info("========{}", msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............数据接收-完毕...............");
    }
}
客户端登录处理器

 

 

 

 

我们是怎么通过这个项目来实现不同协议编解码?

其实也不难,我们仔细看MsgDecoder、LoginDecoder两个类其中一个类的代码,其中有个巧妙的操作就是使用try-catch,

只要解码器无法解码发生异常,就重置读取字节索引传递到下一个解码器中,直到传递到正确解码器中。不过为了兼容多种协议,

解码异常也会让服务端性能有所下降的,取舍之间必有得失。

 

 

 

内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/quliang/p/17087419.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!