Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【node源码】net源码阅读 #15

Open
EasonYou opened this issue May 19, 2020 · 0 comments
Open

【node源码】net源码阅读 #15

EasonYou opened this issue May 19, 2020 · 0 comments

Comments

@EasonYou
Copy link
Owner

net

net模块用于创建基于TCP或IPC的服务端或客户端。

这里有一个net创建TCP服务器的基本例子,以此执行服务端和客户端即可在终端看到输出。

// 创建socket服务器
const net = require('net');
let clients = 0;

const server = net.createServer(client => {
  clients++;
  let clientId = clients;
  console.log('Client connect:', clientId);

  client.on('end', () => {
    console.log('Client disconnected:', clientId);
  });
  console.log(client)
  client.write('Welcome client: ' + clientId + ' \r\n');
  client.pipe(client); // 把客户端的数据返回给客户端
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

// 创建socket客户端
const net = require('net');

const client = net.connect(8000);

client.on('data', data => {
  console.log('data>>>', data.toString());
});

client.on('end', () => {
  console.log('Client disconnected');
});

源码

net.createServer

通过上面的例子,我可以看到,服务端是通过net.createServer这个工厂函数来进行创建的。在源码中,是通过new了一个Sever实例来进行创建的。

Server方法做了几件很简单的事情

  • 判断入参,进行connection事件的绑定
  • 初始化各种参数,并劫持connections的get/set方法,对其进行改造,似的不直接访问_connections属性
function createServer(options, connectionListener) {
  return new Server(options, connectionListener);
}

function Server(options, connectionListener) {
  // ...继承ee
  EventEmitter.call(this);

  if (typeof options === 'function') {
    // 这里前后都省略一些代码,主要做的是监听connection事件以及初始化options
    this.on('connection', connectionListener);
    options = { ...options };
  } else { /** ... */}
  // 默认_connections为零
  this._connections = 0;
  // 劫持connections属性,对其get/set方法做定制化
  ObjectDefineProperty(this, 'connections', {
    get: deprecate(() => {
      return this._connections;
    }),
    set: deprecate((val) => (this._connections = val)),
    configurable: true, enumerable: false
  });
  // ...
}

在这里可以看到,实例的创建非常简单,但是这样还不能创建一个TCP服务,还需要对端口进行监听。

从源码里可以看到,listen方法做了各种各样的判断条件,以适配各种各样入参条件。这些我们都可以忽略,最重要的是看到了一个方法,listenInCluster。不管什么样的情况都会调用到这个方法,即使是lookupAndListen方法,内部也是listenInCluster

Server.prototype.listen = function(...args) {
  // ...
  // 监听listening事件
  if (cb !== null) {
    this.once('listening', cb);
  }
  // ...
  if (options instanceof TCP) {
    // ...
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }

  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  let backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    // ...
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else {
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  if (options.path && isPipeName(options.path)) {
    // ...
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }
    // ...
    return this;
  }
  // ...
};

那么接下来看看listenInCluster方法做了些什么事情

function listenInCluster(server, address, port, addressType,
  backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // 如果是主进程,直接进行监听
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }
  // 否则需要在额外处理集群的监听方法,这里先不做深入研究
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  function listenOnMasterHandle(err, handle) {
    // ...
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
  // ...
}

listenInCluster在主进程调用了_listen2方法,并且处理了在集群中的的监听事件,这里cluster做了什么先不去追究,只要知道最后在回调方法里,也是调用了_listen2方法即可。cluster的会单独在cluster上去做研究。

下面来看下_listen2方法

// 实际上调用的是setupListenHandle方法
Server.prototype._listen2 = setupListenHandle;

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  if (this._handle) { /** */} else {
    let rval = null;
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);
      // ...
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);
    // ...
    this._handle = rval; // 绑定返回的句柄实例
  }

  // 绑定onconnection方法
  this._handle.onconnection = onconnection;
  // ...
  // 生成一个唯一的key,作为连接的唯一标识
  this._connectionKey = addressType + ':' + address + ':' + port;
  // ...
}

setupListenHandle方法它通过createServerHandle创建了一个服务句柄,并绑定在了实例上,这里来看下createServerHandle的内部实现

function createServerHandle(address, port, addressType, fd, flags) {
	// ...
  if (typeof fd === 'number' && fd >= 0) {
    // 当有传入文件标识符的时候,监听其文件描述符
    try {
      handle = createHandle(fd, true);
    } catch (e) { /** ... */}
		// ...
  } else if (port === -1 && addressType === -1) {
    // 当port为-1且addressType为-1的时候,创建创建 UNIX domain socket 或 Windows pipe 服务器
    handle = new Pipe(PipeConstants.SERVER);
		// ...
  } else {
    // tcp服务
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
    debug('bind to', address || 'any');
    if (!address) {
      // 尝试绑定ipv6
      err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
      if (err) {
        handle.close();
        // 回调绑定ipv4
        return createServerHandle(DEFAULT_IPV4_ADDR, port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port, flags); // ipv6
    } else {
      err = handle.bind(address, port); // ipv4
    }
  }
	// ...
  return handle;
}

通过入参判断是否是监听文件标识符还是管道服务还是TCP,然后通过address判断是否要用默认地址,再判断用ipv4还是ipv6

默认的IP地址在上面有定义

const DEFAULT_IPV4_ADDR = '0.0.0.0';
const DEFAULT_IPV6_ADDR = '::';

setupListenHandle方法中,有这么一段代码,用于触发connection事件

function onconnection(err, clientHandle) {
  // ... 
  const socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect,
    readable: true,
    writable: true
  });
  // ...
  self.emit('connection', socket);
}

// socket内部不再赘述,这里只点出socket是一个双工流的实例
function Socket(options) {
  // ...
  // 继承双工流
	stream.Duplex.call(this, options);
  // ...
}

从这里可以看到,socket其实是一个双工流,用于监听和实现流的读写。这个方法的引用,在c++的内建模块,这里不再深入赘述

总结

最后总结下net的整体流程

  • createServer只是创建了一个EE的实例,并监听一些事件,没有做创建TCP服务的事情
  • 服务的创建在listen方法上,根据入参的变化优化参数后,最终都调用的listenInCluster方法
  • listenInCluster内,做了cluster集群的一些优化处理(这里在cluster模块再深入了解)。不管是cluster还是master,最终是调用的_listen2方法,寻找根源,其实是setupListenHandle方法
  • setupListenHandle内,调用了createServerHandle方法,并绑定了onconnection事件
  • onconnection实际上由c++的内建模块调用,绑定在handle.onconnection。在内部emit了onconnection事件,并创建了一个socket,从代码得知其实socket是一个双工流实例,并回传到回调方法里,所以我们才可以对socket进行读写和数据监听操作
  • 回到createServerHandle方法,改方法通过不同的入参,可创建fd文件描述符的句柄,还有windows下的pipe管道句柄,当然还有ipv4/ipv6TCP句柄,最后当顶在Server实例的_handle属性上。
  • 至此,server的监听结束,后续靠连接触发服务间的数据交互

结语

在核心模块中,JavaScript对服务的处理比较浅薄,只是在TCP层上封装了监听事件,最终调用的还是c++的内建模块来实现服务的监听,后面有机会,可以再往内建模块继续深入研究。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant