Node 集群源碼初探

引言

當我們談起 nodejs 時,由于 JavaScript 只能在單線程上運行, 導致 一個 Node 進程只能運行在一個CPU上, 無法發揮現代 CPU 多核的特性。 這對于一個 服務端語言來說, 是比較掣肘其發展的。 好在 Node 在 v0.10 后, 可以使用 Cluster 模塊搭建 多進程服務, 并在 v0.12 重寫了該模塊, 大幅提高其性能, 下面我們將走進 Node-Cluster ,看看 Node 是如何實現集群的。

多進程單線程 與 單進程多線程 模型

首先,這涉及一個 程序設計 的兩種模型 本文提及的 Nodejs 便是使用 多進程單線程 實現的, 而 Java 等語言 則使用 單進程多線程 模型 故本文著重 多進程單線程 模型, 至于 單進程多線程 模型,則會在比較中稍微涉及。

多進程單線程

正如上文說到, JavaScript 只能在單線程中運行, 故 Node 在 JS層面只能 在某進程的一個線程中運行。 若要實現集群, 則必須創建多個進程, 以實現多個應用實例同時運行。

Cluster 模塊, 提供了 master-worker 模式 啟動多個應用模式。

接下來我們就走入這個模塊, 看看其內部具體做了哪些事情。

Cluster

Cluster 是什么?

  1. 在服務器上啟動多個進程
  2. 每個進程里都跑的同一份代碼
  3. 每個進程竟然還能監聽同一個端口 (下文分析實現原理)

其中:

  1. 負責啟動其他進程的叫做 Master 進程,他好比是個『包工頭』,不做具體的工作,只負責啟動其他進程。
  2. 其他被啟動的叫 Worker 進程,顧名思義就是干活的『工人』。它們接收請求,對外提供服務。
  3. Worker 進程的數量一般根據服務器的 CPU 核數來定,這樣就可以完美利用多核資源。
const cluster = require('cluster');  
const http = require('http');  
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {  
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

worker 進程的創建

使用 Cluster 模塊的 fork 方法來創建出子進程

cluster.fork();

先從 worker 進程的初始化開始看, master 進程在 fork 其 worker進程時 會在其環境變量(workerEnv)中 附加上一個 唯一ID(NODE UNIQUE ID) 該ID 是一個從 0 開始的遞增數。

var ids = 0;

//.....

cluster.fork = function(env) {  
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

// ....

function createWorkerProcess(id, env) {  
  const workerEnv = util._extend({}, process.env);
  const execArgv = cluster.settings.execArgv.slice();
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;

  util._extend(workerEnv, env);
  workerEnv.NODE_UNIQUE_ID = '' + id;

  // .....

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

然后 Node 在實例初始化時,使用該 ID 判斷使用 clild.js Or master.js

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';  
module.exports = require(`internal/cluster/${childOrMaster}`);

把目光再集中到 這個 fork() 函數中來,

exports.fork = function(modulePath /*, args, options*/) {

  // Get options and args arguments.
  var execArgv;
  var options = {};
  var args = [];
  var pos = 1;
  if (pos < arguments.length && Array.isArray(arguments[pos])) {
    args = arguments[pos++];
  }

  if (pos < arguments.length && arguments[pos] != null) {
    if (typeof arguments[pos] !== 'object') {
      throw new TypeError('Incorrect value of args option');
    }

    options = util._extend({}, arguments[pos++]);
  }

  // Prepare arguments for fork:
  execArgv = options.execArgv || process.execArgv;

  if (execArgv === process.execArgv && process._eval != null) {
    const index = execArgv.lastIndexOf(process._eval);
    if (index > 0) {
      // Remove the -e switch to avoid fork bombing ourselves.
      execArgv = execArgv.slice();
      execArgv.splice(index - 1, 2);
    }
  }

  args = execArgv.concat([modulePath], args);

  if (typeof options.stdio === 'string') {
    options.stdio = stdioStringToArray(options.stdio);
  } else if (!Array.isArray(options.stdio)) {
    // Use a separate fd=3 for the IPC channel. Inherit stdin, stdout,
    // and stderr from the parent if silent isn't set.
    options.stdio = options.silent ? stdioStringToArray('pipe') :
      stdioStringToArray('inherit');
  } else if (options.stdio.indexOf('ipc') === -1) {
    throw new TypeError('Forked processes must have an IPC channel');
  }

  options.execPath = options.execPath || process.execPath;
  options.shell = false;

  return spawn(options.execPath, args, options);
};

在函數中 做了一些 參數準備,而重點在于 對 這個 options.stdio 的處理。

options.stdio 用于配置子進程與父進程之間建立的管道, 其值應該為一個數組, 但是為了方便, 值可以是以下的字符串之一:

'pipe' - 等同于 ['pipe', 'pipe', 'pipe'] (默認)

'ignore' - 等同于 ['ignore', 'ignore', 'ignore']

'inherit' - 等同于 [process.stdin, process.stdout, process.stderr] 或 [0,1,2]

其每個值 分別對應 [process.stdin, process.stdout, process.stderr] 標準輸入、標準輸出、標準錯誤 輸出到父進程的方式。

而使用 Fork 方式 衍生出的子進程,又必須加上 一個 IPC通道 用于父子間 傳遞消息或文件描述符, 故 stdioStringToArray 函數代碼如下

function stdioStringToArray(option) {  
  switch (option) {
    case 'ignore':
    case 'pipe':
    case 'inherit':
      return [option, option, option, 'ipc'];
    default:
      throw new TypeError('Incorrect value of stdio option: ' + option);
  }
}

多進程之間 的 進程間通信

下面我們就重點來看,這個 IPC通道 是如何實現的,以及其工作原理

進程間通信(Inter-process communication, IPC)其實是個很簡單的概念,只要你將這個進程的數據傳到 另外一個進程就是 IPC 了, 要實現 這個數據傳遞的方式有非常的多, 如以下

在 Node 中 IPC 實現分兩種, 在 Windows 上通過 命名管道, 在 UNIX 上則使用 UNIX domain sockets (UDS), 詳情參見 官方文檔

目前 Linux 還是主流的服務端操作系統, 主要分析下在 Linux 下 UDS 的使用方式

UDS 是在 Socket 的基礎上發展而來的, Socket 一般我們指的都是 IP Socket , 通過網絡協議進行通信, 但是在同一臺設備上的通信,是否可以繞開網絡層的限制呢。 這里我們就可以通過 UDS 來實現, 所以把 它稱之為 LocalSocket ,看起來更貼切一點。

在 Linux 中, 一切都可以當做是 文件。 UDS 也不例外。

在 Node 的 child_process.js 模塊中, 有如下代碼

stdio = stdio.reduce(function(acc, stdio, i) {  
  // ......
else if (stdio === 'ipc') {  
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new errors.Error('ERR_IPC_ONE_PIPE');
        else
          throw new errors.Error('ERR_IPC_SYNC_FORK');
      }

      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    }

當 stdio 的 類型為 ipc 時,會創建一個 ipc 管道, 其fd 為 'ipc' 在stdio數組 中的索引

ipc = new Pipe(PipeConstants.IPC);  
ipcFd = i;

此時目光應該被這個 Pipe 所吸引了吧, 那么它又是什么呢, 話不多少直接上 libuv 中對應 Pipe 的實現。

void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {  
  // This constructor should not be exposed to public javascript.
  // Therefore we assert that we are not trying to call this as a
  // normal function.
  CHECK(args.IsConstructCall());
  CHECK(args[0]->IsInt32());
  Environment* env = Environment::GetCurrent(args);

  int type_value = args[0].As<Int32>()->Value();
  PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);

  bool ipc;
  ProviderType provider;
  switch (type) {
    case SOCKET:
      provider = PROVIDER_PIPEWRAP;
      ipc = false;
      break;
    case SERVER:
      provider = PROVIDER_PIPESERVERWRAP;
      ipc = false;
      break;
    case IPC:
      provider = PROVIDER_PIPEWRAP;
      ipc = true;
      break;
    default:
      UNREACHABLE();
  }

  new PipeWrap(env, args.This(), provider, ipc);
}


PipeWrap::PipeWrap(Environment* env,  
                   Local<Object> object,
                   ProviderType provider,
                   bool ipc)
    : ConnectionWrap(env, object, provider) {
  int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_pipe_init() returns void.
  UpdateWriteQueueSize();
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {  
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}
void uv__stream_init(uv_loop_t* loop,  
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

可以看到對應 ipc 管道, 底層是通過文件流的方式實現的。 那么這個管道 就一樣擁有 和 stream 一樣的模式,即 open—write/read—close

父進程在實際創建子進程前,會創建IPC通道并監聽它,然后才真正創建出子進程,并通過環境變量(NODE CHANNEL FD)告訴子進程這個IPC通信的文件描述符(fd)。子進程在啟動的過程中,根據文件描述符去連接這個已存在的IPC通道,從而完成父子進程之間的連接。

當父進程 send 數據到子進程時, 便通過這個 fd 向 這個特殊的文件開始寫入數據,此時調用底層stream 的 write 方法,而子進程由于在啟動的過程中便已經連接上 該通道, 在應用層通過 message 事件(底層應是 stream 的 read方法),來接收數據。

由于這個IPC通道是 雙工通信的, 故子進程也可以實現向父進程通信。

問題解析

多進程之間是如何實現 端口共享的?

那么 node 又是如何實現多進程 監聽同一個端口呢,這里的關鍵在于 句柄傳遞

如下代碼:

//主進程代碼

var child = require('child_process').fork('child.js');  
// Open up the server object and send the handle
var server = require('net').createServer();  
server.on('connection', function (socket) {  
    socket.end('handled by parent\n');
});
server.listen(1337, function () {  
    child.send('server', server);
});

//子進程代碼
process.on('message', function (m, server) {  
    if (m === 'server') {
        server.on('connection', function (socket) {
            socket.end('handled by child\n');
        });
    }
});

在示例中,父進程直接將創建出來的 Tcp對象 傳遞到子進程中,但是我們知道兩個進程之間又是無法直接共享內存的, 那么這又是怎么實現的呢?

來看這個 send方法

child.send(message, [sendHandle])

目前,子進程對象send()方法可以發送的句柄類型包括如下幾種:

1.net.socket,tcp套接字

2.net.Server,tcp服務器,任意建立在tcp服務上的應用層服務都可以享受到它帶來的好處。

3.net.Native,c++層面的tcp套接字或IPC管道。

4.dgram.socket,UDP套接字

5.dgram.Native,C++層面的UDP套接字

send()方法在將消息發送到IPC管道前,將消息組裝成兩個對象,一個參數是handle,另一個是message, 而發送到子進程的 實際上是這個句柄的 文件描述符, message對象也會序列化為字符串。

而子進程由于之前連接了IPC通道, 可以讀取到父進程發送的消息。獲取到這個消息后, 通過JSON.parse 還原為對象, 并分析 其中的cmd值, 若 message.cmd = NODE_HANDLE, 則表示父進程傳遞的是一個句柄,此時便會通過 獲取到的 fd 和傳遞的句柄類型,還原出這個Tcp對象。

還原出這個server 對象后, 便是去監聽這個端口了。

setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))

而在 libuv 中在 setsockopt 時設置了 SO_REUSEADDR 選項(關于這個選項的含義可以自行百度),簡而言之,設置了這個選項后, 在Linux層面允許我們使用不同的進程 就相同的網卡和端口進行監聽, 對于獨立啟動的進程,并不知道彼此的fd, 所以當一個進程監聽成功后, 后面的進程便會失敗,但是 通過 send 方式還原出來的 server對象,他們的fd是相同的,此時該子進程就可以通過這個fd 去監聽這個端口。

但是 fd 在同一時刻只能被一個進程所占用, 換言之就是網絡請求向服務器端發送時,只有一個幸運的進程能夠搶到連接,也就是說只有他能為這個請求進行服務。這些進程也都是搶占式的。

總結

隨著這些模塊逐漸完善, Nodejs 在服務端的使用場景也越來越豐富,如果你僅僅是因為JS 這個后綴而注意到它的話, 那么我希望你能暫停腳步,好好了解一下這門年輕的語言,相信它會給你帶來驚喜。

參考

https://github.com/nodejs/node

https://nodejs.org/dist/latest-v10.x/docs/api/net.html#net ipc support

https://www.jianshu.com/p/335a9e101c3f

http://docs.libuv.org/en/v1.x/stream.html

我來評幾句
登錄后評論

已發表評論數()

相關站點

熱門文章
贵州11选5走势图软件