The data contained in this repository can be downloaded to your computer using one of several clients.
Please see the documentation of your version control software client for more information.

Please select the desired protocol below to get the URL.

This URL has Read-Only access.

Statistics
| Branch: | Revision:

main_repo / lib / cluster.js @ 5e7e51c2

History | View | Annotate | Download (16.1 KB)

1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21

    
22
var assert = require('assert');
23
var fork = require('child_process').fork;
24
var net = require('net');
25
var EventEmitter = require('events').EventEmitter;
26
var util = require('util');
27

    
28
function isObject(o) {
29
  return (typeof o === 'object' && o !== null);
30
}
31

    
32
var debug;
33
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
34
  debug = function(x) {
35
    var prefix = process.pid + ',' +
36
        (process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master');
37
    console.error(prefix, x);
38
  };
39
} else {
40
  debug = function() { };
41
}
42

    
43
// cluster object:
44
function Cluster() {
45
  EventEmitter.call(this);
46
}
47

    
48
util.inherits(Cluster, EventEmitter);
49

    
50
var cluster = module.exports = new Cluster();
51

    
52
// Used in the master:
53
var masterStarted = false;
54
var ids = 0;
55
var serverHandlers = {};
56

    
57
// Used in the worker:
58
var serverListeners = {};
59
var queryIds = 0;
60
var queryCallbacks = {};
61

    
62
// Define isWorker and isMaster
63
cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
64
cluster.isMaster = ! cluster.isWorker;
65

    
66
// The worker object is only used in a worker
67
cluster.worker = cluster.isWorker ? {} : null;
68
// The workers array is only used in the master
69
cluster.workers = cluster.isMaster ? {} : null;
70

    
71
// Settings object
72
var settings = cluster.settings = {};
73

    
74
// Simple function to call a function on each worker
75
function eachWorker(cb) {
76
  // Go through all workers
77
  for (var id in cluster.workers) {
78
    if (cluster.workers.hasOwnProperty(id)) {
79
      cb(cluster.workers[id]);
80
    }
81
  }
82
}
83

    
84
// Extremely simple progress tracker
85
function ProgressTracker(missing, callback) {
86
  this.missing = missing;
87
  this.callback = callback;
88
}
89
ProgressTracker.prototype.done = function() {
90
  this.missing -= 1;
91
  this.check();
92
};
93
ProgressTracker.prototype.check = function() {
94
  if (this.missing === 0) this.callback();
95
};
96

    
97
cluster.setupMaster = function(options) {
98
  // This can only be called from the master.
99
  assert(cluster.isMaster);
100

    
101
  // Don't allow this function to run more than once
102
  if (masterStarted) return;
103
  masterStarted = true;
104

    
105
  // Get filename and arguments
106
  options = options || {};
107

    
108
  // By default, V8 writes the profile data of all processes to a single
109
  // v8.log.
110
  //
111
  // Running that log file through a tick processor produces bogus numbers
112
  // because many events won't match up with the recorded memory mappings
113
  // and you end up with graphs where 80+% of ticks is unaccounted for.
114
  //
115
  // Fixing the tick processor to deal with multi-process output is not very
116
  // useful because the processes may be running wildly disparate workloads.
117
  //
118
  // That's why we fix up the command line arguments to include
119
  // a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
120
  // unless it already contains a --logfile argument.
121
  var execArgv = options.execArgv || process.execArgv;
122
  if (execArgv.some(function(s) { return /^--prof/.test(s); }) &&
123
      !execArgv.some(function(s) { return /^--logfile=/.test(s); }))
124
  {
125
    execArgv = execArgv.slice();
126
    execArgv.push('--logfile=v8-%p.log');
127
  }
128

    
129
  // Set settings object
130
  settings = cluster.settings = {
131
    exec: options.exec || process.argv[1],
132
    execArgv: execArgv,
133
    args: options.args || process.argv.slice(2),
134
    silent: options.silent || false
135
  };
136

    
137
  // emit setup event
138
  cluster.emit('setup');
139
};
140

    
141
// Check if a message is internal only
142
var INTERNAL_PREFIX = 'NODE_CLUSTER_';
143
function isInternalMessage(message) {
144
  return (isObject(message) &&
145
          typeof message.cmd === 'string' &&
146
          message.cmd.indexOf(INTERNAL_PREFIX) === 0);
147
}
148

    
149
// Modify message object to be internal
150
function internalMessage(inMessage) {
151
  var outMessage = util._extend({}, inMessage);
152

    
153
  // Add internal prefix to cmd
154
  outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || '');
155

    
156
  return outMessage;
157
}
158

    
159
// Handle callback messages
160
function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
161

    
162
  // The message there will be sent
163
  var message = internalMessage(outMessage);
164

    
165
  // callback id - will be undefined if not set
166
  message._queryEcho = inMessage._requestEcho;
167

    
168
  // Call callback if a query echo is received
169
  if (inMessage._queryEcho) {
170
    queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle);
171
    delete queryCallbacks[inMessage._queryEcho];
172
  }
173

    
174
  // Send if outWrap contains something useful
175
  if (!(outMessage === undefined && message._queryEcho === undefined)) {
176
    sendInternalMessage(worker, message, outHandle);
177
  }
178
}
179

    
180
// Handle messages from both master and workers
181
var messageHandler = {};
182
function handleMessage(worker, inMessage, inHandle) {
183

    
184
  // Remove internal prefix
185
  var message = util._extend({}, inMessage);
186
  message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length);
187

    
188
  var respondUsed = false;
189
  function respond(outMessage, outHandler) {
190
    respondUsed = true;
191
    handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
192
  }
193

    
194
  // Run handler if it exists
195
  if (messageHandler[message.cmd]) {
196
    messageHandler[message.cmd](message, worker, respond);
197
  }
198

    
199
  // Send respond if it hasn't been called yet
200
  if (respondUsed === false) {
201
    respond();
202
  }
203
}
204

    
205
// Messages to the master will be handled using these methods
206
if (cluster.isMaster) {
207

    
208
  // Handle online messages from workers
209
  messageHandler.online = function(message, worker) {
210
    worker.state = 'online';
211
    debug('Worker ' + worker.process.pid + ' online');
212
    worker.emit('online');
213
    cluster.emit('online', worker);
214
  };
215

    
216
  // Handle queryServer messages from workers
217
  messageHandler.queryServer = function(message, worker, send) {
218

    
219
    // This sequence of information is unique to the connection
220
    // but not to the worker
221
    var args = [message.address,
222
                message.port,
223
                message.addressType,
224
                message.fd];
225
    var key = args.join(':');
226
    var handler;
227

    
228
    if (serverHandlers.hasOwnProperty(key)) {
229
      handler = serverHandlers[key];
230
    } else if (message.addressType === 'udp4' ||
231
               message.addressType === 'udp6') {
232
      var dgram = require('dgram');
233
      handler = dgram._createSocketHandle.apply(net, args);
234
      serverHandlers[key] = handler;
235
    } else {
236
      handler = net._createServerHandle.apply(net, args);
237
      serverHandlers[key] = handler;
238
    }
239

    
240
    // echo callback with the fd handler associated with it
241
    send({}, handler);
242
  };
243

    
244
  // Handle listening messages from workers
245
  messageHandler.listening = function(message, worker) {
246

    
247
    worker.state = 'listening';
248

    
249
    // Emit listening, now that we know the worker is listening
250
    worker.emit('listening', {
251
      address: message.address,
252
      port: message.port,
253
      addressType: message.addressType,
254
      fd: message.fd
255
    });
256
    cluster.emit('listening', worker, {
257
      address: message.address,
258
      port: message.port,
259
      addressType: message.addressType,
260
      fd: message.fd
261
    });
262
  };
263

    
264
  // Handle suicide messages from workers
265
  messageHandler.suicide = function(message, worker) {
266
    worker.suicide = true;
267
  };
268
}
269

    
270

    
271
// Messages to a worker will be handled using these methods
272
else if (cluster.isWorker) {
273

    
274
  // Handle worker.disconnect from master
275
  messageHandler.disconnect = function(message, worker) {
276
    worker.disconnect();
277
  };
278
}
279

    
280
function toDecInt(value) {
281
  value = parseInt(value, 10);
282
  return isNaN(value) ? null : value;
283
}
284

    
285
// Create a worker object, that works both for master and worker
286
function Worker(customEnv) {
287
  if (!(this instanceof Worker)) return new Worker();
288
  EventEmitter.call(this);
289

    
290
  var self = this;
291
  var env = process.env;
292

    
293
  // Assign a unique id, default null
294
  this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
295

    
296
  // XXX: Legacy.  Remove in 0.9
297
  this.workerID = this.uniqueID = this.id;
298

    
299
  // Assign state
300
  this.state = 'none';
301

    
302
  // Create or get process
303
  if (cluster.isMaster) {
304

    
305
    // Create env object
306
    // first: copy and add id property
307
    var envCopy = util._extend({}, env);
308
    envCopy['NODE_UNIQUE_ID'] = this.id;
309
    // second: extend envCopy with the env argument
310
    if (isObject(customEnv)) {
311
      envCopy = util._extend(envCopy, customEnv);
312
    }
313

    
314
    // fork worker
315
    this.process = fork(settings.exec, settings.args, {
316
      'env': envCopy,
317
      'silent': settings.silent,
318
      'execArgv': settings.execArgv
319
    });
320
  } else {
321
    this.process = process;
322
  }
323

    
324
  if (cluster.isMaster) {
325
    // Save worker in the cluster.workers array
326
    cluster.workers[this.id] = this;
327

    
328
    // Emit a fork event, on next tick
329
    // There is no worker.fork event since this has no real purpose
330
    process.nextTick(function() {
331
      cluster.emit('fork', self);
332
    });
333
  }
334

    
335
  // handle internalMessage, exit and disconnect event
336
  this.process.on('internalMessage', handleMessage.bind(null, this));
337
  this.process.once('exit', function(exitCode, signalCode) {
338
    prepareExit(self, 'dead');
339
    self.emit('exit', exitCode, signalCode);
340
    cluster.emit('exit', self, exitCode, signalCode);
341
  });
342
  this.process.once('disconnect', function() {
343
    prepareExit(self, 'disconnected');
344
    self.emit('disconnect');
345
    cluster.emit('disconnect', self);
346
  });
347

    
348
  // relay message and error
349
  this.process.on('message', this.emit.bind(this, 'message'));
350
  this.process.on('error', this.emit.bind(this, 'error'));
351

    
352
}
353
util.inherits(Worker, EventEmitter);
354
cluster.Worker = Worker;
355

    
356
function prepareExit(worker, state) {
357

    
358
  // set state to disconnect
359
  worker.state = state;
360

    
361
  // Make suicide a boolean
362
  worker.suicide = !!worker.suicide;
363

    
364
  // Remove from workers in the master
365
  if (cluster.isMaster) {
366
    delete cluster.workers[worker.id];
367
  }
368
}
369

    
370
// Send internal message
371
function sendInternalMessage(worker, message/*, handler, callback*/) {
372

    
373
  // Exist callback
374
  var callback = arguments[arguments.length - 1];
375
  if (typeof callback !== 'function') {
376
    callback = undefined;
377
  }
378

    
379
  // exist handler
380
  var handler = arguments[2] !== callback ? arguments[2] : undefined;
381

    
382
  if (!isInternalMessage(message)) {
383
    message = internalMessage(message);
384
  }
385

    
386
  // Store callback for later
387
  if (callback) {
388
    message._requestEcho = worker.id + ':' + (++queryIds);
389
    queryCallbacks[message._requestEcho] = callback;
390
  }
391

    
392

    
393
  worker.send(message, handler);
394
}
395

    
396
// Send message to worker or master
397
Worker.prototype.send = function() {
398

    
399
  // You could also just use process.send in a worker
400
  this.process.send.apply(this.process, arguments);
401
};
402

    
403
// Kill the worker without restarting
404
Worker.prototype.destroy = function() {
405
  var self = this;
406

    
407
  this.suicide = true;
408

    
409
  if (cluster.isMaster) {
410
    // Disconnect IPC channel
411
    // this way the worker won't need to propagate suicide state to master
412
    if (self.process.connected) {
413
      self.process.once('disconnect', function() {
414
        self.process.kill();
415
      });
416
      self.process.disconnect();
417
    } else {
418
      self.process.kill();
419
    }
420

    
421
  } else {
422
    // Channel is open
423
    if (this.process.connected) {
424

    
425
      // Inform master to suicide and then kill
426
      sendInternalMessage(this, {cmd: 'suicide'}, function() {
427
        process.exit(0);
428
      });
429

    
430
      // When channel is closed, terminate the process
431
      this.process.once('disconnect', function() {
432
        process.exit(0);
433
      });
434
    } else {
435
      process.exit(0);
436
    }
437
  }
438
};
439

    
440
// The .disconnect function will close all servers
441
// and then disconnect the IPC channel.
442
if (cluster.isMaster) {
443
  // Used in master
444
  Worker.prototype.disconnect = function() {
445
    this.suicide = true;
446

    
447
    sendInternalMessage(this, {cmd: 'disconnect'});
448
  };
449

    
450
} else {
451
  // Used in workers
452
  Worker.prototype.disconnect = function() {
453
    var self = this;
454

    
455
    this.suicide = true;
456

    
457
    // keep track of open servers
458
    var servers = Object.keys(serverListeners).length;
459
    var progress = new ProgressTracker(servers, function() {
460
      // There are no more servers open so we will close the IPC channel.
461
      // Closing the IPC channel will emit a disconnect event
462
      // in both master and worker on the process object.
463
      // This event will be handled by prepareExit.
464
      self.process.disconnect();
465
    });
466

    
467
    // depending on where this function was called from (master or worker)
468
    // The suicide state has already been set,
469
    // but it doesn't really matter if we set it again.
470
    sendInternalMessage(this, {cmd: 'suicide'}, function() {
471
      // in case there are no servers
472
      progress.check();
473

    
474
      // closing all servers gracefully
475
      var server;
476
      for (var key in serverListeners) {
477
        server = serverListeners[key];
478

    
479
        // in case the server is closed we won't close it again
480
        if (server._handle === null) {
481
          progress.done();
482
          continue;
483
        }
484

    
485
        server.on('close', progress.done.bind(progress));
486
        server.close();
487
      }
488
    });
489

    
490
  };
491
}
492

    
493
// Fork a new worker
494
cluster.fork = function(env) {
495
  // This can only be called from the master.
496
  assert(cluster.isMaster);
497

    
498
  // Make sure that the master has been initialized
499
  cluster.setupMaster();
500

    
501
  return (new cluster.Worker(env));
502
};
503

    
504
// execute .disconnect on all workers and close handlers when done
505
cluster.disconnect = function(callback) {
506
  // This can only be called from the master.
507
  assert(cluster.isMaster);
508

    
509
  // Close all TCP handlers when all workers are disconnected
510
  var workers = Object.keys(cluster.workers).length;
511
  var progress = new ProgressTracker(workers, function() {
512
    for (var key in serverHandlers) {
513
      serverHandlers[key].close();
514
      delete serverHandlers[key];
515
    }
516

    
517
    // call callback when done
518
    if (callback) callback();
519
  });
520

    
521
  // begin disconnecting all workers
522
  eachWorker(function(worker) {
523
    worker.once('disconnect', progress.done.bind(progress));
524
    worker.disconnect();
525
  });
526

    
527
  // in case there weren't any workers
528
  progress.check();
529
};
530

    
531
// Internal function. Called from src/node.js when worker process starts.
532
cluster._setupWorker = function() {
533

    
534
  // Get worker class
535
  var worker = cluster.worker = new Worker();
536

    
537
  // we will terminate the worker
538
  // when the worker is disconnected from the parent accidentally
539
  process.once('disconnect', function() {
540
    if (worker.suicide !== true) {
541
      process.exit(0);
542
    }
543
  });
544

    
545
  // Tell master that the worker is online
546
  worker.state = 'online';
547
  sendInternalMessage(worker, { cmd: 'online' });
548
};
549

    
550
// Internal function. Called by net.js and dgram.js when attempting to bind a
551
// TCP server or UDP socket.
552
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
553
  // This can only be called from a worker.
554
  assert(cluster.isWorker);
555

    
556
  // Store tcp instance for later use
557
  var key = [address, port, addressType, fd].join(':');
558
  serverListeners[key] = tcpSelf;
559

    
560
  // Send a listening message to the master
561
  tcpSelf.once('listening', function() {
562
    cluster.worker.state = 'listening';
563
    sendInternalMessage(cluster.worker, {
564
      cmd: 'listening',
565
      address: address,
566
      port: tcpSelf.address().port || port,
567
      addressType: addressType,
568
      fd: fd
569
    });
570
  });
571

    
572
  // Request the fd handler from the master process
573
  var message = {
574
    cmd: 'queryServer',
575
    address: address,
576
    port: port,
577
    addressType: addressType,
578
    fd: fd
579
  };
580

    
581
  // The callback will be stored until the master has responded
582
  sendInternalMessage(cluster.worker, message, function(msg, handle) {
583
    cb(handle);
584
  });
585

    
586
};