预置的 ChannelHandler 和编解码器
预置的 ChannelHandler 和编解码器
SSL/TLS
Java 提供了 javax.net.ssl 支持 SSL/TSL,用以实现数据安全。
添加 SSL/TLS 支持:
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslChannelInitializer(SslContext context, boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel channel) throws Exception {
SSLEngine engine = context.newEngine(channel.alloc());//alloc返回channel所配置的ByteBufAllocator
channel.pipeline().addFirst("ssl",
new SslHandler(engine, startTls));//大多数情况SslHandler是第一个ChannelHandler
//这确保了所有其他的ChannelHandler处理数据之后,才会进行加密。
}
}
HTTP/HTTPS 应用程序
完整的 HTTP 请求(FullHttpRequest)包括请求头信息、若干个 HTTPContent 和 LastHttpContent。
完整的 HTTP 响应(FullHttpResponse)包括响应头信息、若干个 HTTPContent 和 LastHttpContent。
所有类型的 HTTP 消息都实现了 HttpObject 接口。
HTTP 编解码器:HttpRequestEncoder、HttpResponseEncoder、HttpReqeustDecoder 和 HttpResponseDecoder。
HttpResponseDecoder:将字节解码为 HttpResponse、HttpContent 和 LastHttpContent。
添加 HTTP 支持
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
if (client) {
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
判断是否是客户端,如果是客户端,则添加 HttpResponseDecoder 对服务器响应进行解码。
聚合 HTTP 消息
由于 HTTP 请求和响应可能由多个部分组成,需要将它们聚合成完整的消息。Netty 提供了一个聚合器,可以将多个消息部分合并成 FullHttpRequest 或者 FullHttpResponse 消息。
自动聚合 HTTP 的消息片段:
public class HttpAggregarotInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregarotInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
pipeline.addLast("aggregator", //最大消息大小是512kb
new HttpObjectAggregator(512*1024));
}
}
HttpServerCodec 里面组合了HttpResponseEncoder和HttpRequestDecoder。
HttpClientCodec 里面组合了HttpRequestEncoder和HttpResponseDecoder。
HTTP 压缩
当使用HTTP 时,建议服务器端开启压缩功能以尽可能多地减小传输数据的大小。Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。
自动压缩 HTTP 消息:
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpCompressionInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("decompressor",
new HttpContentDecompressor());//处理来自服务器的压缩内容
} else {
pipeline.addLast("codec", new HttpServerCodec());
pipeline.addLast("compressor",
new HttpContentCompressor());//服务器端压缩数据
}
}
}
HTTPS
启动 HTTPS 只需要将 SslHandler 添加到 ChannelPipeline。
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean isClient;
public HttpsCodecInitializer(SslContext context, boolean isClient) {
this.context = context;
this.isClient = isClient;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
SSLEngine engine = context.newEngine(channel.alloc());
pipeline.addFirst("ssl", new SslHandler(engine));//添加SslHandler之后可以使用https
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
}
}
WebSocket
WebSocket 在客户端和服务器之间提供了真正的双向数据交换。
WebSocketFrame 类型:
名称 | 描述 |
---|---|
BinaryWebSocketFrame | 二进制数据帧 |
TextWebSocketFrame | 文本数据帧 |
ContinuationWebSocketFrame | 二进制和文本数据帧结合体 |
CloseWebSocketFrame | 控制帧:一个close请求、关闭的状态码以及关闭的原因 |
PingWebSocketFrame | 控制帧:请求一个PongWebSocketFrame |
PongWebSocketFrame | 控制帧:对PingWebSocketFrame请求的响应 |
WebSocketServerProtocolHandler 处理协议升级握手,以及三种控制帧--Close、Ping 和 Pong。Text和Binary数据帧将会被传递给下一个 ChannelHandler 进行处理。
public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536),
new WebSocketServerProtocolHandler("/websocket"),//升级握手
new TextFrameHandler(),
new BinaryFrameHandler(),
new ContinuationFrameHandler());
}
public static final class TextFrameHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
//handle text frame
}
}
public static final class BinaryFrameHandler extends
SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) throws Exception {
//handle binary frame
}
}
public static final class ContinuationFrameHandler extends
SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) throws Exception {
//handle continuation frame
}
}
}
要想为WebSocket 添加安全性,只需要将SslHandler 作为第一个ChannelHandler 添加到ChannelPipeline 中。
空闲的连接和超时
用于空闲连接以及超时的 ChannelHandler。
发送心跳:
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//60s没有接受或发送数据,IdelStateHandler会使用IdleStateEvent调用fireUserEventTriggered()
pipeline.addLast(new IdleStateHandler(
0, 0, 60, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends
ChannelInboundHandlerAdapter {
//发送到远程节点的心跳信息
private static final ByteBuf HEARTBEAT_SEQUENCE =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
"HEARTBEAT", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//连接空闲时间太长时,发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);//传递给下一个ChannelInboundHandler
}
}
}
}
使用 IdleStateHandler 测试远程节点是否还活着,失活时关闭连接释放资源。
基于分隔符的协议
基于分隔符的协议的解码器
名称 | 描述 |
---|---|
DelimiterBasedFrameDecoder | 使用自定义分隔符提取帧的通用解码器 |
LineBasedFrameDecoder | 提取由行尾符分隔的解码器,速度比DeimiterBasedFrameDecoder快 |
分隔符提取帧:
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//提取帧,并传给下一个ChannelHandler
pipeline.addLast(new LineBasedFrameDecoder(64*1024));
pipeline.addLast(new FrameHandler());//接收数据帧
}
public static final class FrameHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//处理LineBasedFrameDecoder传进的帧
}
}
示例:1.每个帧都由换行符(\n)分隔;2.每个帧由一系列的元素组成,每个元素都由的单个空格字符分隔;3.一个帧内容代表一个命令,定义为一个命令名称后面跟着数目可变的参数。
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
static final byte SPACE = (byte)' ';
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new CmdDecoder(64 * 1024));
pipeline.addLast(new CmdHandler());
}
public static final class Cmd {
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf getName() {
return name;
}
public ByteBuf getArgs() {
return args;
}
}
public static final class CmdDecoder
extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
if (frame == null) {
return null;
}
//查找第一个空格字符的索引,空格前是命令名称,后面是参数
int index = frame.indexOf(frame.readerIndex(),
frame.writerIndex(), SPACE);
return new Cmd(frame.slice(frame.readerIndex(), index),
frame.slice(index + 1, frame.writerIndex()));
}
}
public static final class CmdHandler extends
SimpleChannelInboundHandler<Cmd> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Cmd cmd) throws Exception {
//处理cmd
}
}
}
基于长度的协议
基于长度的协议的解码器:
名称 | 描述 |
---|---|
FixedLengthFrameDecoder | 提取固定长度的帧 |
LengthFieldBasedFrameDecoder | 根据帧头部中的长度值提取帧;该字段的偏移量以及长度在构造函数中指定 |
变长帧:
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeLine = channel.pipeline();
//帧起始的前8字节是帧长度
pipeLine.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
pipeLine.addLast(new FrameHandler());
}
public static class FrameHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//处理帧
}
}
}
写大型数据
当写大型数据到远程节点时,如果连接速度比较慢,数据依然不断的往内存写,可能导致内存耗尽。利用 NIO 的零拷贝特性,可以消除将文件内容从文件系统移动到网络栈的复制过程。应用程序需要做的就是实现一个 FileRegion 的接口。
利用零拷贝特性(FileRegion)来传输一个文件的内容。
FileInputStream in = new FileInputStream(File);
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
channel.writeAndFlush(region).addListener(
new ChannelFuture(region).addListener(
new ChannelFutureListener() {
}
)
);