网站流程/杭州产品推广服务公司
通常我们习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
反之,解码(Decode)称为反序列化(deserialization),它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
Java序列化
相信大多数Java程序员接触到的第一种序列化或者编解码技术就是Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
其他序列化框架
Java默认的序列化机制效率很低、序列化后的码流也较大,所以涌现出了非常多的优秀的Java序列化框架,例如:hessian、protobuf、thrift、protostuff、kryo、msgpack、avro、fst 等等。
扩展Netty 解码器
Netty提供了 io.netty.handler.codec.MessageToByteEncoder
和io.netty.handler.codec.ByteToMessageDecoder
接口,方便我们扩展编解码。
为了扩展序列化框架更方便,我们首先定义Serializer
接口:
import java.io.IOException;/*** @author Ricky Fung*/
public interface Serializer {byte[] encode(Object msg) throws IOException;<T> T decode(byte[] buf, Class<T> type) throws IOException;
}
定义Serializer工厂:
import com.mindflow.netty4.serialization.hessian.HessianSerializer;/*** @author Ricky Fung*/
public class SerializerFactory {public static Serializer getSerializer(){return new HessianSerializer();}
}
接下来,我们在Netty Decoder中使用上面定义的Serializer接口,如下:
import com.mindflow.netty4.serialization.Serializer;
import com.mindflow.netty4.serialization.SerializerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;/*** ${DESCRIPTION}** @author Ricky Fung*/
public class NettyMessageDecoder<T> extends LengthFieldBasedFrameDecoder {private Logger logger = LoggerFactory.getLogger(getClass());//判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6private static final int HEADER_SIZE = 6;private Serializer serializer = SerializerFactory.getSerializer();private Class<T> clazz;public NettyMessageDecoder(Class<T> clazz, int maxFrameLength, int lengthFieldOffset,int lengthFieldLength) throws IOException {super(maxFrameLength, lengthFieldOffset, lengthFieldLength);this.clazz = clazz;}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in)throws Exception {if (in.readableBytes() < HEADER_SIZE) {return null;}in.markReaderIndex();//注意在读的过程中,readIndex的指针也在移动byte type = in.readByte();byte flag = in.readByte();int dataLength = in.readInt();//logger.info("read type:{}, flag:{}, length:{}", type, flag, dataLength);if (in.readableBytes() < dataLength) {logger.error("body length < {}", dataLength);in.resetReaderIndex();return null;}byte[] data = new byte[dataLength];in.readBytes(data);try{return serializer.decode(data, clazz);} catch (Exception e){throw new RuntimeException("serializer decode error");}}
}
NettyMessageEncoder.java
import com.mindflow.netty4.serialization.Serializer;
import com.mindflow.netty4.serialization.SerializerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** ${DESCRIPTION}** @author Ricky Fung*/
public final class NettyMessageEncoder<T> extendsMessageToByteEncoder {private Logger logger = LoggerFactory.getLogger(getClass());private final byte type = 0X00;private final byte flag = 0X0F;private Serializer serializer = SerializerFactory.getSerializer();private Class<T> clazz;public NettyMessageEncoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg,ByteBuf out) throws Exception {try {out.writeByte(type);out.writeByte(flag);byte[] data = serializer.encode(msg);out.writeInt(data.length);out.writeBytes(data);//logger.info("write type:{}, flag:{}, length:{}", type, flag, data.length);} catch (Exception e){e.printStackTrace();}}
}
服务端:
import com.mindflow.netty4.serialization.model.Request;
import com.mindflow.netty4.serialization.model.Response;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;/*** @author Ricky Fung*/
public class NettyServer {private Logger logger = LoggerFactory.getLogger(getClass());public void bind() throws Exception {// 配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws IOException {ch.pipeline().addLast(new NettyMessageDecoder<>(Request.class,1<<20, 2, 4));ch.pipeline().addLast(new NettyMessageEncoder(Response.class));ch.pipeline().addLast(new NettyServerHandler());}});// 绑定端口,同步等待成功ChannelFuture future = b.bind(Constants.HOST, Constants.PORT).sync();logger.info("Netty server start ok host:{}, port:{}", Constants.HOST , Constants.PORT);future.channel().closeFuture().sync();}class NettyServerHandler extends SimpleChannelInboundHandler<Request> {@Overrideprotected void channelRead0(ChannelHandlerContext context, Request request) throws Exception {logger.info("Rpc server receive request id:{}", request.getId());//处理请求processRpcRequest(context, request);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.error("捕获异常", cause);}}private void processRpcRequest(final ChannelHandlerContext context, final Request request) {Response response = new Response();response.setId(request.getId());response.setResult("echo "+request.getMessage());context.writeAndFlush(response);}public static void main(String[] args) throws Exception {new NettyServer().bind();}}
客户端:
import com.mindflow.netty4.serialization.model.Request;
import com.mindflow.netty4.serialization.model.Response;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;/*** ${DESCRIPTION}** @author Ricky Fung*/
public class NettyClient {private Logger logger = LoggerFactory.getLogger(getClass());private EventLoopGroup group = new NioEventLoopGroup();public void connect(int port, String host) throws Exception {// 配置客户端NIO线程组try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ch.pipeline().addLast(new NettyMessageDecoder<Response>(Response.class, 1024 * 1024, 2, 4));ch.pipeline().addLast(new NettyMessageEncoder<Request>(Request.class));ch.pipeline().addLast(new NettyClientHandler());;}});// 发起异步连接操作ChannelFuture future = b.connect(host, port).sync();if (future.awaitUninterruptibly(5000)) {logger.info("client connect host:{}, port:{}", host, port);if (future.channel().isActive()) {logger.info("开始发送消息");for(int i=0; i<100; i++){Request req = new Request();req.setId((long) i);req.setMessage("hello world");future.channel().writeAndFlush(req);}logger.info("发送消息完毕");}}} finally {}}class NettyClientHandler extends SimpleChannelInboundHandler<Response> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Response msg) throws Exception {final Response response = msg;logger.info("Rpc client receive response id:{}", response.getId());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.error("捕获异常", cause);}}/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {new NettyClient().connect(Constants.PORT, Constants.HOST);}
}
参考资料
Netty系列之Netty编解码框架分析
Java深度历险(十)——Java对象序列化与RMI
源码下载
https://github.com/TiFG/netty4-in-action/tree/master/netty4-serialization-demo