在向channelpipeline中添加ChannelHandler时,添加顺序和执行顺序有什么关系呢?在一开始学习Netty的时候,对这个没有概念,随意加了几个ChannelHandler之后,Server端却无法接收到数据,所以这部分还是要好好理清楚才好。
基本概念
Netty是一个非阻塞的网络通信框架,非阻塞是因为使用了java的NIO来实现网络IO,通信方面使用Socket来维护一个长连接。Netty是异步的,体现在server端通过ChannelFuture获得响应,每个ChannelFuture注册一个ChannelFutureListener,当处理完成后自动回调。 Netty基于事件驱动,事件到来后经过ChannelPipeline中的ChannelHandler处理,
在Server端,通过Selector来负责网络连接的建立,和网络请求的接收;当接收到数据后,分配给一个新线程去处理请求。
异步
事件驱动
ChannelHandler相关
ChannelPipeline
每次新建Channel的时候,需要新建一个ChannelPipeline,并将这个ChannelPipeline和这个新的Channel做绑定,ChannelPipeline是一系列ChannelHandler的组合,进入Channel和出Channel的过程中,对数据的处理就是通过ChannelHandler实例来完成的。
netty提供了方法来添加、修改、删除pipeline中的ChannelHandler,包括addFirst(),addBefore(),addAfter(),addLast(),remove(),replace().从这些方法中也可以看出来,在添加ChannelHandler时,是存在先后顺序的,对于一个inbound data,首先会从ChannelPipeline的首个ChannelInboundHandler开始处理,直到最后一个ChannelInboundHandler;而outbound data则从最后一个ChannelOutBoundHandler开始处理,直到最后一个ChannelOutBoundHandler。对应的,对于outbound data,在处理时,会自动跳过ChannelInboundHandler。
每次在ChannelPipeline中添加一个ChannelHandler时,一个ChannelHandlerContext也会和这个ChannelHandler绑定,用来在两个ChannelHandler之间传递数据。但如果使用UDP协议的话,将数据绑定到ChannelHandlerContext会不安全。也可以把数据绑定到channel上,这样信息会从尾部传到头部;而绑定到context上,信息只从下一个Handler获得。
ChannelHandler
ChannelHandler用来处理channel发送和接收的数据
Client端和Server端之间发送和接收的数据分为两类:outbound data和inbound data,其中outbound是数据从本地到远程,而inbound则是从远程到本地;对应的,ChannelOutBoundHandler处理outbound数据,而ChannelInBoundHandler处理inbound数据。
对outbound I/Oevents,从channelPipeline尾端到头部开始处理 对inbound I/Oevents,从channelpipeline头部到尾部处理
ChannelHandlerContext
当一个ChannelHandler添加到ChannelPipeline时,一个新的ChannelHandlerContext也会被创建和应用,它用来处理ChannelHandler和下一个ChannelHandler之间的交互。
当一个event从Channel中发出后,先通过第一个ChannelHandler处理,然后通过ChannelHandlerContext传递给下一个ChannelHandler。如果希望一个event从特定的ChannelHandler开始处理,也可以通过ChannelHandlerContext来达到目的。
ChannelHandlerContext ctx=..;
ctx.write(Unpooled.copiedBuffer("msg",CharsetUtil.UTF-8));
ChannelHandler包括InboundHandler和OutboundHandler,其中InboundHandler处理接收到的数据,以及各种状态变化,包括(Register,Activate,Unactivate,Unregister);而OutboundHandler处理要发送的数据,并且支持各种拦截操作
EventLoop
当一个channel注册后,会从EventLoopGroup中获取一个EventLoop,用来处理所有的I/O事件,可以把这个EventLoop视为一个线程,对于一个channel中的所有event,都会由这一个EventLoop来处理
Bootstrap
Bootstrap用来配置Netty应用,有两个Bootstrap类,一个用来配置clients(Bootstrap),一个用来配置servers(ServerBootStrap),其中ServerBootstrap会和本地端口绑定,而Bootstrap用来和远程host、ip建立连接。ServerBootstrap使用两个EventLoopGroup,一个标示server自己的和本地端口绑定的socket,用来接收连接;另一个包含所有server接收到的channel连接,即处理请求。而BootStrap只有一个EventLoopGroup
问题分析
在我的 job-schedule的项目代码中,在swa.job.register.Client类中我添加了两个ChannelInBoundHandler,两个ChannelOutBoundHandler,一个ChannelDuplexHandler子类,代码如下:
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DataEncoder());//out
ch.pipeline().addLast(new JobInfoSenderTest(jobInfo));//out//
ch.pipeline().addLast(new DataDecoder());//in
ch.pipeline().addLast(new JobInfoSender(jobInfo));//in
ch.pipeline().addLast(new LoggingHandler());
}
});
Channel channel = b.connect(host, port).sync().channel();
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public class DataEncoder extends MessageToByteEncoder<String> {
private static final Logger logger = LoggerFactory.getLogger(DataEncoder.class);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
s+="DataEncoder.encode&";
byteBuf.writeBytes(s.getBytes());
logger.info("encode-end:{},{}", s, byteBuf);
}
}
public class JobInfoSender extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(JobInfoSender.class);
private final String jobInfo;
public JobInfoSender(String jobInfo) {
this.jobInfo = jobInfo;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
String temp = jobInfo;
temp += "JobInfoSender.channelActive&";
logger.debug("channelActive:{}", temp);
ctx.writeAndFlush(temp);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.debug("read:{}", msg);
msg += "JobInfoSender.channelRead&";
logger.debug("channelRead:{}", msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
logger.debug("channelReadComplete");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
public class JobInfoSenderTest extends ChannelOutboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(JobInfoSenderTest.class);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
msg += "JobInfoSenderTest.write&";
logger.debug("write:{}", msg);
ctx.write(msg, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
logger.debug("connect:{},{}", remoteAddress, localAddress);
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
public class DataDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(DataDecoder.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
logger.info("decode:{},{}", byteBuf.toString(), list);
byteBuf.markReaderIndex();
byte[] decoded = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(decoded);
list.add(new String(decoded));
}
}
那么我的疑问是:Client端在向Server发送消息的时候,应该是属于outbound data,那么为什么需要添加JobInfoSender这么一个ChannelInBoundHandlerAdaptor子类呢?于是我添加了JobInfoSenderTest类做测试,当建立连接时向Context写入数据,但
接下来就是我的疑问: channelOutBoundHandler:当IO-outbound-operations时,通知channelOutBoundHandler channelInBoundHandler:为状态变更添加回调方法,也就是在状态发生变更的时候调用
在以上ChannelHandler中添加日志后,查看日志结果,以下是删减后的日志:
16:21:17.735 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb0c469a8] REGISTERED
16:21:17.736 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb0c469a8] CONNECT: /127.0.0.1:8087
16:21:17.736 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSenderTest - connect:/127.0.0.1:8087,null
16:21:17.739 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSender - channelActive:{"jobId":12}JobInfoSender.channelActive&
16:21:17.740 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSenderTest - write:{"jobId":12}JobInfoSender.channelActive&JobInfoSenderTest.write&
16:21:17.779 [nioEventLoopGroup-2-1] INFO swa.job.DataEncoder - encode-end:{"jobId":12}JobInfoSender.channelActive&JobInfoSenderTest.write&DataEncoder.encode&,PooledUnsafeDirectByteBuf(ridx: 0, widx: 83, cap: 256)
16:21:17.793 [nioEventLoopGroup-2-1] INFO swa.job.DataDecoder - decode:PooledUnsafeDirectByteBuf(ridx: 0, widx: 7, cap: 1024),[]
16:21:17.793 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSender - read:success
16:21:17.793 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSender - channelRead:successJobInfoSender.channelRead&
16:21:17.793 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSenderTest - write:successJobInfoSender.channelRead&JobInfoSenderTest.write&
16:21:17.793 [nioEventLoopGroup-2-1] INFO swa.job.DataEncoder - encode-end:successJobInfoSender.channelRead&JobInfoSenderTest.write&DataEncoder.encode&,PooledUnsafeDirectByteBuf(ridx: 0, widx: 76, cap: 256)
16:21:17.793 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSender - channelReadComplete
16:21:17.794 [nioEventLoopGroup-2-1] DEBUG swa.job.register.JobInfoSender - channelReadComplete
16:21:17.809 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb0c469a8] INACTIVE
16:21:17.809 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb0c469a8] UNREGISTERED
从日志分析,当Client和Server建立连接后,先调用JobInfoSenderTest中的connect方法,当Channel连通后,JobInfoSender.channelActive得到执行,此时向ChannelHandlerContext写入数据,后续依次经过:JobInfoSenderTest.write——>DataEncoder.encode 然后Server端接收到数据:{“jobId”:12}JobInfoSender.channelActive&JobInfoSenderTest.write&DataEncoder.encode& Server端向Client发送响应:[2017-12-07 16:21:17.782 [nioEventLoopGroup-3-3] INFO swa.rpc.DataEncoder - encode:success,PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)]
Client端接收到数据,经过:DataDecoder.decode——>JobInfoSender.channelRead Client端向Server端再次响应:JobInfoSenderTest.write——>DataEncoder.encode 当Channel中数据读取完毕,JobInfoSender.channelReadComplete回调,关闭连接
以上代码中JobInfoSenderTest用来做测试,从日志中可以看到,当Channel状态为Active时,触发JobInfoSender的channelActive方法,该方法执行完成后,下一个ChannelHandler做处理。
在Netty的源码中,对ChannelInboundHandler的说明如下:
可以看到,使用ChannelInboundHandler,可以对Channel状态做监听,当状态发生改变时,触发对应的回调函数。在以上的测试代码中,当Channel状态为Active时,JobInfoSender向Context写入数据,然后下一个ChannelHandler,也就是JobInfoSenderTest做相应的处理。
以上测试代码验证了:
- 添加ChannelInboundHandler的顺序和执行顺序相同,而ChannelOutboundHandler的添加顺序和执行顺序相反
- 对于本地到远程的数据,由ChannelOutBoundHandler和添加了自定义状态处理的ChannelInboundHandler处理,而从远程到本地的数据,只有ChannelOutboundHandler处理。
还有一个问题是:
在向ChannelInitializer添加ChannelHandler时,不能把JobInfoSender放在任何一个ChannelOutBoundHandler前边,否则无法将数据传给下一个ChannelOutboundHandler。