Netty-排坑日记

阅读更多

1 issue-1:unsupported message type: TextWebSocketFrame

1.1 复现问题

对Client进行如下改造

  1. handshake挪到connect之后执行(原本在WebSocketClientHandler.channelActive方法中执行)
  2. 循环connect,直到出现异常(问题出现的概率较小,因此用死循环反复尝试)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    package org.liuyehcf.netty.ws;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;

    import java.net.URI;
    import java.net.URISyntaxException;
    import java.nio.charset.Charset;

    /**
    * Reproduction client for the "unsupported message type: TextWebSocketFrame" failure.
    *
    * <p>In an endless loop it connects to {@code ws://localhost:8866}, starts the WebSocket
    * handshake from the calling thread right after {@code connect(...)} returns, waits on the
    * handler's handshake future and then writes a single text frame. A failed write prints its
    * cause and terminates the JVM with exit status 1; a successful write prints "normal case".
    * A CLOSE listener is attached to every write.
    *
    * <p>{@code WebSocketClientHandler} is referenced here but defined outside this listing.
    *
    * @author hechenfeng
    * @date 2018/11/3
    */
    public class Client {
    /**
    * Runs the reconnect loop described on the class.
    *
    * @param args unused
    * @throws Exception if connecting or waiting on the handshake future fails
    */
    public static void main(String[] args) throws Exception {
    final URI webSocketURI = getUri();
    // One event loop group is shared by every iteration; it is never shut down in this listing.
    final EventLoopGroup group = new NioEventLoopGroup();
    // Deliberately endless: the failure is rare, so keep reconnecting until a write fails
    // (the write listener below calls System.exit(1) in that case).
    while (true) {

    // A fresh handshaker and handler are created for every connection attempt.
    final WebSocketClientHandshaker handShaker = WebSocketClientHandshakerFactory.newHandshaker(
    webSocketURI, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    final WebSocketClientHandler webSocketClientHandler = new WebSocketClientHandler(handShaker);

    final Bootstrap boot = new Bootstrap();
    boot.group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    // Initial pipeline, in order: HTTP codec, aggregator, chunked writer,
    // WebSocket compression, the handshake handler, and the frame-printing handler.
    protected void initChannel(SocketChannel socketChannel) {
    ChannelPipeline pipeline = socketChannel.pipeline();
    pipeline.addLast(new HttpClientCodec());
    pipeline.addLast(new HttpObjectAggregator(65535));
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
    pipeline.addLast(webSocketClientHandler);
    pipeline.addLast(new ClientHandler());
    }
    })
    .option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.TCP_NODELAY, true)
    // NOTE(review): SO_BACKLOG is normally a server-socket option; presumably it has no
    // effect on a client channel - confirm.
    .option(ChannelOption.SO_BACKLOG, 1024);

    // Block until the TCP connection is established, then start the handshake from this
    // (non-handler) thread instead of from WebSocketClientHandler.channelActive.
    final Channel channel = boot.connect(webSocketURI.getHost(), webSocketURI.getPort()).sync().channel();
    // The future returned by handshake(...) is ignored; only the handler's future is awaited.
    handShaker.handshake(channel);
    webSocketClientHandler.handshakeFuture().sync();

    // Send one text frame; the first listener reports the outcome, the second is the CLOSE listener.
    channel.writeAndFlush(new TextWebSocketFrame("Hello, I'm client"))
    .addListener((ChannelFuture future) -> {
    if (!future.isSuccess() && future.cause() != null) {
    // Failure case: dump the cause and stop the whole process so the loop ends.
    future.cause().printStackTrace();
    System.exit(1);
    } else {
    System.out.println("normal case");
    }
    })
    .addListener(ChannelFutureListener.CLOSE);

    }
    }

    /**
    * Builds the fixed target URI {@code ws://localhost:8866}.
    *
    * @return the WebSocket server URI
    * @throws RuntimeException wrapping the URISyntaxException if the literal cannot be parsed
    */
    private static URI getUri() {
    try {
    return new URI("ws://localhost:8866");
    } catch (URISyntaxException e) {
    throw new RuntimeException(e);
    }
    }

    /**
    * Inbound handler that turns every received WebSocketFrame into a string and prints it.
    */
    private static final class ClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    /**
    * Prints a textual description of the received frame.
    *
    * @param ctx handler context; closed when a close frame arrives
    * @param msg the received frame
    * @throws RuntimeException if the frame is none of the handled frame types
    */
    @Override
    @SuppressWarnings("all")
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
    final String content;
    if (msg instanceof BinaryWebSocketFrame) {
    // Binary payload: copy the bytes out and decode them with the platform default charset.
    BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) msg;
    ByteBuf byteBuf = binaryWebSocketFrame.content();
    byte[] bytes = new byte[byteBuf.readableBytes()];
    // NOTE(review): copies from absolute index 0 while the array is sized by readableBytes();
    // assumes the reader index is 0 - confirm.
    byteBuf.getBytes(0, bytes);
    content = new String(bytes, Charset.defaultCharset());
    } else if (msg instanceof TextWebSocketFrame) {
    content = ((TextWebSocketFrame) msg).text();
    } else if (msg instanceof PongWebSocketFrame) {
    content = "Pong";
    } else if (msg instanceof ContinuationWebSocketFrame) {
    content = "Continue";
    } else if (msg instanceof PingWebSocketFrame) {
    content = "Ping";
    } else if (msg instanceof CloseWebSocketFrame) {
    // A close frame also closes the channel from this side.
    content = "Close";
    ctx.close();
    } else {
    // Any other frame type is treated as an error.
    throw new RuntimeException();
    }

    System.out.println("client receive message: " + content);
    }
    }
    }

对WebSocketClientHandler进行如下改造

  1. 注释掉handShaker.handshake(ctx.channel());一句
1
2
3
4
5
6
7
    /**
    * Forwards the channel-active event without starting the WebSocket handshake here.
    */
    @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The handshake must only run after all handlers have been added to the pipeline;
// otherwise an exception may occur
// (ChannelPipeline does not contain a HttpRequestEncoder or HttpClientCodec).
// The call below is intentionally commented out for this reproduction: the client
// issues the handshake itself after connect() instead.
// handShaker.handshake(ctx.channel());
super.channelActive(ctx);
}

运行后得到如下异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
    at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:283)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:877)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1391)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:305)
    at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:135)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1152)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1075)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

1.2 问题分析

我们分别在写回调中的正常case以及异常case处打上断点,看一看正常情况下以及异常情况下ChannelPipeline的差异

1
2
3
4
5
6
7
8
9
10
11
12
// Write one text frame and inspect the outcome in the completion listener.
channel.writeAndFlush(new TextWebSocketFrame("Hello, I'm client"))
.addListener((ChannelFuture future) -> {
if (!future.isSuccess() && future.cause() != null) {
// Set a breakpoint here: failure case
future.cause().printStackTrace();
System.exit(1);
} else {
// Set a breakpoint here: normal case
System.out.println("normal case");
}
})
.addListener(ChannelFutureListener.CLOSE);

正常的时候,其handler如下

  1. WebSocket13FrameDecoder
  2. WebSocket13FrameEncoder
  3. ChunkedWriteHandler
  4. PerMessageDeflateEncoder
  5. PerMessageDeflateDecoder
  6. WebSocketClientHandler
  7. ClientHandler

异常的时候,其handler如下

  1. WebSocket13FrameDecoder
  2. ChunkedWriteHandler
  3. PerMessageDeflateEncoder
  4. PerMessageDeflateDecoder
  5. WebSocketClientHandler
  6. ClientHandler

对比正常/异常情况下的handler,我们可以发现,异常情况下,缺少了WebSocket13FrameEncoder

WebSocket13FrameEncoder在handshake过程中被添加到ChannelPipeline中去,handshake方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Sends the opening handshake request and, once it has been written, installs the
 * WebSocket frame encoder ("ws-encoder") into the channel's pipeline.
 *
 * @param channel the channel to send the handshake request on
 * @param promise completed successfully after the encoder has been added; failed if the
 *                pipeline lacks the required HTTP handlers or the write fails
 * @return the given {@code promise}
 */
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();

// Pre-check: the pipeline must be able to decode the HTTP handshake response.
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
}

// The encoder is installed from the write-completion listener, not synchronously.
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// Look the HTTP encoder (or combined codec) up again at completion time.
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}

// Observed in the failing run: the error is raised here, so the next statement
// (adding the "ws-encoder") is never executed.
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());

promise.setSuccess();
} else {
// The handshake request could not be written: propagate the cause.
promise.setFailure(future.cause());
}
}
});
return promise;
}

为什么在外部执行handshake会导致这个问题,目前还不清楚

2 issue-2:webSocket连接占用内存过高

表面原因是由于增加了以下两个Handler,这两个handler会用到JdkZlibDecoder,而JdkZlibDecoder在处理过程中会分配大量内存

  • WebSocketClientCompressionHandler.INSTANCE
  • WebSocketServerCompressionHandler

3 issue-3:OutOfDirectMemoryError

在项目中,我需要将获取到的FullHttpRequest转成对应的字节数组,用到了Netty提供的EmbeddedChannel来进行转换,最开始代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Initial (leaking) version: encodes the request through a throw-away EmbeddedChannel and
 * copies the encoded bytes into a byte array.
 *
 * <p>The ByteBuf obtained from {@code readOutbound()} is never released in this method.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
EmbeddedChannel ch = new EmbeddedChannel(new HttpRequestEncoder());
ByteBuf byteBuf = null;
try {
// retain() presumably offsets a release performed while the message is encoded - confirm.
ch.writeOutbound(msg.retain());
// NOTE(review): only one outbound message is read; readOutbound() may return null or
// only part of the encoded output - confirm.
byteBuf = ch.readOutbound();

// Copy the encoded bytes out; "bytes" is not used any further in this snippet.
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
} finally {
// Only the channel is closed here; byteBuf itself is left unreleased.
ch.close();
}
}

在测试环境压测一段时间后发现了如下的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[2019-06-25 09:52:15]11.158.132.167
content: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1056964615, max: 1073741824)
content: at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:656)
content: at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:610)
content: at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
content: at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
content: at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
content: at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
content: at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
content: at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
content: at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
content: at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
content: at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
content: at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
content: at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
content: at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
content: at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
content: at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
content: at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
content: at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
content: at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
content: at java.lang.Thread.run(Thread.java:766)

原因:没有释放ch.readOutbound()返回的ByteBuf,调整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Adjusted version: same conversion as before, but the ByteBuf returned by
 * {@code readOutbound()} is released in the finally block before the channel is closed.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
EmbeddedChannel ch = new EmbeddedChannel(new HttpRequestEncoder());
ByteBuf byteBuf = null;
try {
// retain() presumably offsets a release performed while the message is encoded - confirm.
ch.writeOutbound(msg.retain());
// NOTE(review): only one outbound message is read; any further queued output would
// not be released by this method - confirm.
byteBuf = ch.readOutbound();

// Copy the encoded bytes out; "bytes" is not used any further in this snippet.
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
} finally {
ReferenceCountUtil.release(byteBuf); // release the encoded buffer
ch.close();
}
}

4 issue-4:LEAK: ByteBuf.release() was not called before it’s garbage-collected

在使用EmbeddedChannel的时候,并发高的时候,总是会出现如下异常堆栈。我已经在适当的时间进行资源清理操作了(调用了EmbeddedChannel#close())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2020-04-17 10:30:24.784 [ERROR] [nioEventLoopGroup-3-4] --- LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:113)
io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1914)
io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923)
io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:822)
io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:793)
io.netty.handler.ssl.SslHandler.flush(SslHandler.java:774)
io.netty.handler.ssl.SslHandler.flush(SslHandler.java:1650)
io.netty.handler.ssl.SslHandler.closeOutboundAndChannel(SslHandler.java:1618)
io.netty.handler.ssl.SslHandler.close(SslHandler.java:732)
io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624)
io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.close(CombinedChannelDuplexHandler.java:507)
io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:71)
io.netty.channel.CombinedChannelDuplexHandler.close(CombinedChannelDuplexHandler.java:318)
io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624)
io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608)
io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:1040)
io.netty.channel.AbstractChannel.close(AbstractChannel.java:274)
io.netty.channel.embedded.EmbeddedChannel.close(EmbeddedChannel.java:550)
io.netty.channel.embedded.EmbeddedChannel.close(EmbeddedChannel.java:537)

原因如下:EmbeddedChannel在异常情况下关闭时,可能还存在尚未读取的消息或者尚未写入的消息,这些消息如果得不到及时清理(调用release方法),那么就会产生LEAK异常

正确的做法是,调用EmbeddedChannel#finishAndReleaseAll()方法来清理资源,该方法会负责清理所有暂未处理的消息

4.1 参考