0%

SourceAnalysis-Netty-Client-Start

阅读更多

1 源码Maven坐标

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>

2 客户端启动代码清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package org.liuyehcf.netty.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

public class EchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(); // (1)
try {

Bootstrap boot = new Bootstrap(); // (2)
boot.group(group) // (3)
.channel(NioSocketChannel.class) // (4)
.handler(new ChannelInitializer<SocketChannel>() { // (5)
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true) // (6)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024);

// (7)
Channel channel = boot.connect("localhost", 8080).sync().channel();

// (8)
channel.writeAndFlush(Unpooled.wrappedBuffer("hello!".getBytes()));

TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}

private static final class ClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

int num = byteBuf.readableBytes();
byte[] bytes = new byte[num];
byteBuf.readBytes(bytes);

System.out.println("client receive message: " + new String(bytes, Charset.defaultCharset()));
}
}
}

3 启动过程概述

启动过程可以概括为以下步骤

  1. 配置启动参数
  2. 创建Channel
  3. 初始化Channel
  4. 注册Channel
  5. 连接Channel

4 配置启动参数

  1. 根据代码清单中的(1)。创建一个EventLoopGroup用于异步处理,包括读写数据等等
  2. 根据代码清单中的(2)。创建一个Bootstrap,会调用无参构造方法,参数的配置采用建造者模式
  3. 根据代码清单中的(3)。绑定EventLoopGroup
  4. 根据代码清单中的(4)。配置生产的Channel类型,这里指定为NioSocketChannel.class
  5. 根据代码清单中的(5)。绑定work的Handler
  6. 根据代码清单中的(6)。设置键值对

5 创建Channel

  1. 根据代码清单中的(7)。进行后续创建Channel以及连接操作

    • connect方法位于Bootstrap,将host以及port封装成SocketAddress,并转调同名方法connect
    1
    2
    3
    public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }
    • connect方法位于Bootstrap。该方法首先做一些校验工作,然后调用doResolveAndConnect方法
    1
    2
    3
    4
    5
    6
    7
    8
    public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
    throw new NullPointerException("remoteAddress");
    }

    validate();
    return doResolveAndConnect(remoteAddress, config.localAddress());
    }
    • doResolveAndConnect方法位于Bootstrap。该方法创建Channel并注册,然后调用doResolveAndConnect0进行连接操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
    if (!regFuture.isSuccess()) {
    return regFuture;
    }
    return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
    // failure.
    Throwable cause = future.cause();
    if (cause != null) {
    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
    // IllegalStateException once we try to access the EventLoop of the Channel.
    promise.setFailure(cause);
    } else {
    // Registration was successful, so set the correct executor to use.
    // See https://github.com/netty/netty/issues/2586
    promise.registered();
    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
    }
    }
    });
    return promise;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    channel = channelFactory.newChannel();
    init(channel);
    } catch (Throwable t) {
    if (channel != null) {
    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
    channel.unsafe().closeForcibly();
    }
    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
    channel.close();
    } else {
    channel.unsafe().closeForcibly();
    }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    // added to the event loop's task queue for later execution.
    // i.e. It's safe to attempt bind() or connect() now:
    // because bind() or connect() will be executed *after* the scheduled registration task is executed
    // because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
    }

6 初始化Channel

我们回到位于AbstractBootstrapinitAndRegister方法中来,该方法在创建Channel完毕后,调用了init方法对其进行初始化操作

  1. init方法位于Bootstrap,该方法主要就是将之前启动时通过建造者模式配置的参数注入到该Channel中去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
    setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    }
    }

7 注册Channel

channel注册的详细过程,其详细过程参见SourceAnalysis-Netty-Server-Start,这里不再赘述

特别地,我们再来回顾一下注册过程中的重要一环,即位于AbstractChannelregister0方法

  1. 首先,执行doRegister方法,进行真正的底层的register操作

  2. 然后,执行pipeline.invokeHandlerAddedIfNeeded();

  3. 将initAndRegister对应的ChannelFuture设置为成功

  4. 最后,触发其他生命周期,例如fireChannelRegistered以及fireChannelActive

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    private void register0(ChannelPromise promise) {
    try {
    // check if the channel is still open as it could be closed in the mean time when the register
    // call was outside of the eventLoop
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    // user may already fire events through the pipeline in the ChannelFutureListener.
    pipeline.invokeHandlerAddedIfNeeded();

    // initAndRegister对应的ChannelFuture设置为成功
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (isActive()) {
    if (firstRegistration) {
    pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
    // This channel was registered before and autoRead() is set. This means we need to begin read
    // again so that we process inbound data.
    //
    // See https://github.com/netty/netty/issues/4805
    beginRead();
    }
    }
    } catch (Throwable t) {
    // Close the channel directly to avoid FD leak.
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }

8 连接Channel

现在我们回到位于BootstrapdoResolveAndConnect方法中,继续跟踪doResolveAndConnect0方法

  • doResolveAndConnect0方法位于Bootstrap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
    final SocketAddress localAddress, final ChannelPromise promise) {
    try {
    final EventLoop eventLoop = channel.eventLoop();
    final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

    if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
    // Resolver has no idea about what to do with the specified remote address or it's resolved already.
    doConnect(remoteAddress, localAddress, promise);
    return promise;
    }

    final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

    if (resolveFuture.isDone()) {
    final Throwable resolveFailureCause = resolveFuture.cause();

    if (resolveFailureCause != null) {
    // Failed to resolve immediately
    channel.close();
    promise.setFailure(resolveFailureCause);
    } else {
    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
    doConnect(resolveFuture.getNow(), localAddress, promise);
    }
    return promise;
    }

    // Wait until the name resolution is finished.
    resolveFuture.addListener(new FutureListener<SocketAddress>() {
    @Override
    public void operationComplete(Future<SocketAddress> future) throws Exception {
    if (future.cause() != null) {
    channel.close();
    promise.setFailure(future.cause());
    } else {
    doConnect(future.getNow(), localAddress, promise);
    }
    }
    });
    } catch (Throwable cause) {
    promise.tryFailure(cause);
    }
    return promise;
    }
  • doConnect位于Bootstrap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static void doConnect(
    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    if (localAddress == null) {
    channel.connect(remoteAddress, connectPromise);
    } else {
    channel.connect(remoteAddress, localAddress, connectPromise);
    }
    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }
    });
    }
  • connect方法位于AbstractChannel

    1
    2
    3
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
    }
  • connect方法位于DefaultChannelPipeline

    1
    2
    3
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
    }
  • connect方法位于AbstractChannelHandlerContext,该方法继续转调同名connect方法,以及invokeConnect方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
    }

    public ChannelFuture connect(
    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
    throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
    // cancelled
    return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
    safeExecute(executor, new Runnable() {
    @Override
    public void run() {
    next.invokeConnect(remoteAddress, localAddress, promise);
    }
    }, promise, null);
    }
    return promise;
    }

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
    try {
    ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
    notifyOutboundHandlerException(t, promise);
    }
    } else {
    connect(remoteAddress, localAddress, promise);
    }
    }
  • connect方法位于DefaultChannelPipeline#HeadContext

    1
    2
    3
    4
    5
    6
    public void connect(
    ChannelHandlerContext ctx,
    SocketAddress remoteAddress, SocketAddress localAddress,
    ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
    }
  • connect方法位于AbstractNioChannel。在完成doConnect方法之后,fulfillConnectPromise方法将connect对应的ChannelPromise设置为成功

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public final void connect(
    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }

    try {
    if (connectPromise != null) {
    // Already a connect in process.
    throw new ConnectionPendingException();
    }

    boolean wasActive = isActive();
    if (doConnect(remoteAddress, localAddress)) {
    // 连接完成后,将promise设置为已完成(boot.connect对应的ChannelFuture)
    fulfillConnectPromise(promise, wasActive);
    } else {
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;

    // Schedule connect timeout.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    ConnectTimeoutException cause =
    new ConnectTimeoutException("connection timed out: " + remoteAddress);
    if (connectPromise != null && connectPromise.tryFailure(cause)) {
    close(voidPromise());
    }
    }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isCancelled()) {
    if (connectTimeoutFuture != null) {
    connectTimeoutFuture.cancel(false);
    }
    connectPromise = null;
    close(voidPromise());
    }
    }
    });
    }
    } catch (Throwable t) {
    promise.tryFailure(annotateConnectException(t, remoteAddress));
    closeIfClosed();
    }
    }
  • doConnect方法位于NioSocketChannel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
    doBind0(localAddress);
    }

    boolean success = false;
    try {
    boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
    if (!connected) {
    selectionKey().interestOps(SelectionKey.OP_CONNECT);
    }
    success = true;
    return connected;
    } finally {
    if (!success) {
    doClose();
    }
    }
    }
  • connect方法位于SocketUtils,执行底层java.nio的连接操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
    throws IOException {
    try {
    return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
    @Override
    public Boolean run() throws IOException {
    return socketChannel.connect(remoteAddress);
    }
    });
    } catch (PrivilegedActionException e) {
    throw (IOException) e.getCause();
    }
    }

至此,连接操作完毕