0%

Netty-ByteBuf

阅读更多

1 Netty ByteBuf

1.1 CompositeByteBuf

有时,我们想将多个ByteBuf拼接成一个ByteBuf,但是又不想进行拷贝操作(数据量大时有性能开销),那么CompositeByteBuf就是最好的解决方案。CompositeByteBuf封装了一组ByteBuf,我们可以像操作普通ByteBuf一样操作这一组ByteBuf,同时又可避免拷贝,极大地提高了效率

注意,writeXXX不要和addComponent混用

1
2
3
4
5
6
7
8
public static void main(String[] args) {
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer("hello, ".getBytes()));
compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer("world".getBytes()));

System.out.println(new String(ByteBufUtil.getBytes(compositeByteBuf)));
}
1
2
3
4
5
6
7
8
public static void main(String[] args) {
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

compositeByteBuf.writeBytes("hello, ".getBytes());
compositeByteBuf.writeBytes("world".getBytes());

System.out.println(new String(ByteBufUtil.getBytes(compositeByteBuf)));
}

2 Java Direct Buffer

通俗来说,Direct Buffer就是一块在Java堆外分配的,但是可以在Java程序中访问的内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
ByteBuffer dorectButeBiffer = ByteBuffer.allocateDirect(1024);
ByteBuffer heapByteBuffer = ByteBuffer.allocate(1024);
}

public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}

public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

3 内存泄漏

最近在实现一个自定义协议的时候用到了ByteToMessageCodec这个抽象类,该类由个encode方法需要实现,该方法的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.github.liuyehcf.framework.io.athena.protocol.handler;

import com.github.liuyehcf.framework.io.athena.protocol.AthenaFrame;
import com.github.liuyehcf.framework.io.athena.protocol.ProtocolConstant;
import com.github.liuyehcf.framework.io.athena.util.ByteUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;

import java.util.List;

/**
* @author hechenfeng
* @date 2020/2/6
*/
public class AthenaFrameHandler extends ByteToMessageCodec<AthenaFrame> {

@Override
protected void encode(ChannelHandlerContext ctx, AthenaFrame msg, ByteBuf out) {
out.writeBytes(msg.serialize());
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.readableBytes() >= ProtocolConstant.MIN_HEADER_LENGTH) {
final int originReaderIndex = in.readerIndex();

in.setIndex(originReaderIndex + ProtocolConstant.TOTAL_LENGTH_OFFSET, in.writerIndex());
int totalLength = ByteUtils.toInt(in.readByte(), in.readByte());

// reset readable index to read all frame bytes
in.setIndex(originReaderIndex, in.writerIndex());

if (in.readableBytes() < totalLength) {
break;
}

// 重点在这里,这里通过ByteBuf.readBytes(int length)方法获取了一个新的ByteBuf
out.add(AthenaFrame.deserialize(in.readBytes(totalLength)));
}
}
}

问题在于in.readBytes(totalLength),该方法创建了一个新的ByteBuf,但是这个ByteBuf并没有被释放掉,造成了内存泄漏

如何筛查堆外内存泄露?可以通过反射查看PlatformDependent类中的静态字段DIRECT_MEMORY_COUNTER

4 参考