Spaces:
Paused
Paused
| /** | |
| * A test Runner that uses a {@link module:buffered-worker-pool}. | |
| * @module parallel-buffered-runner | |
| * @private | |
| */ | |
| ; | |
| const allSettled = require('@ungap/promise-all-settled').bind(Promise); | |
| const Runner = require('../runner'); | |
| const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants; | |
| const debug = require('debug')('mocha:parallel:parallel-buffered-runner'); | |
| const {BufferedWorkerPool} = require('./buffered-worker-pool'); | |
| const {setInterval, clearInterval} = global; | |
| const {createMap, constants} = require('../utils'); | |
| const {MOCHA_ID_PROP_NAME} = constants; | |
| const {createFatalError} = require('../errors'); | |
| const DEFAULT_WORKER_REPORTER = require.resolve( | |
| './reporters/parallel-buffered' | |
| ); | |
| /** | |
| * List of options to _not_ serialize for transmission to workers | |
| */ | |
| const DENY_OPTIONS = [ | |
| 'globalSetup', | |
| 'globalTeardown', | |
| 'parallel', | |
| 'p', | |
| 'jobs', | |
| 'j' | |
| ]; | |
| /** | |
| * Outputs a debug statement with worker stats | |
| * @param {BufferedWorkerPool} pool - Worker pool | |
| */ | |
| /* istanbul ignore next */ | |
| const debugStats = pool => { | |
| const {totalWorkers, busyWorkers, idleWorkers, pendingTasks} = pool.stats(); | |
| debug( | |
| '%d/%d busy workers; %d idle; %d tasks queued', | |
| busyWorkers, | |
| totalWorkers, | |
| idleWorkers, | |
| pendingTasks | |
| ); | |
| }; | |
| /** | |
| * The interval at which we will display stats for worker processes in debug mode | |
| */ | |
| const DEBUG_STATS_INTERVAL = 5000; | |
| const ABORTED = 'ABORTED'; | |
| const IDLE = 'IDLE'; | |
| const ABORTING = 'ABORTING'; | |
| const RUNNING = 'RUNNING'; | |
| const BAILING = 'BAILING'; | |
| const BAILED = 'BAILED'; | |
| const COMPLETE = 'COMPLETE'; | |
| const states = createMap({ | |
| [IDLE]: new Set([RUNNING, ABORTING]), | |
| [RUNNING]: new Set([COMPLETE, BAILING, ABORTING]), | |
| [COMPLETE]: new Set(), | |
| [ABORTED]: new Set(), | |
| [ABORTING]: new Set([ABORTED]), | |
| [BAILING]: new Set([BAILED, ABORTING]), | |
| [BAILED]: new Set([COMPLETE, ABORTING]) | |
| }); | |
| /** | |
| * This `Runner` delegates tests runs to worker threads. Does not execute any | |
| * {@link Runnable}s by itself! | |
| * @public | |
| */ | |
| class ParallelBufferedRunner extends Runner { | |
| constructor(...args) { | |
| super(...args); | |
| let state = IDLE; | |
| Object.defineProperty(this, '_state', { | |
| get() { | |
| return state; | |
| }, | |
| set(newState) { | |
| if (states[state].has(newState)) { | |
| state = newState; | |
| } else { | |
| throw new Error(`invalid state transition: ${state} => ${newState}`); | |
| } | |
| } | |
| }); | |
| this._workerReporter = DEFAULT_WORKER_REPORTER; | |
| this._linkPartialObjects = false; | |
| this._linkedObjectMap = new Map(); | |
| this.once(Runner.constants.EVENT_RUN_END, () => { | |
| this._state = COMPLETE; | |
| }); | |
| } | |
| /** | |
| * Returns a mapping function to enqueue a file in the worker pool and return results of its execution. | |
| * @param {BufferedWorkerPool} pool - Worker pool | |
| * @param {Options} options - Mocha options | |
| * @returns {FileRunner} Mapping function | |
| * @private | |
| */ | |
| _createFileRunner(pool, options) { | |
| /** | |
| * Emits event and sets `BAILING` state, if necessary. | |
| * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error` | |
| * @param {number} failureCount - Failure count | |
| */ | |
| const emitEvent = (event, failureCount) => { | |
| this.emit(event.eventName, event.data, event.error); | |
| if ( | |
| this._state !== BAILING && | |
| event.data && | |
| event.data._bail && | |
| (failureCount || event.error) | |
| ) { | |
| debug('run(): nonzero failure count & found bail flag'); | |
| // we need to let the events complete for this file, as the worker | |
| // should run any cleanup hooks | |
| this._state = BAILING; | |
| } | |
| }; | |
| /** | |
| * Given an event, recursively find any objects in its data that have ID's, and create object references to already-seen objects. | |
| * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error` | |
| */ | |
| const linkEvent = event => { | |
| const stack = [{parent: event, prop: 'data'}]; | |
| while (stack.length) { | |
| const {parent, prop} = stack.pop(); | |
| const obj = parent[prop]; | |
| let newObj; | |
| if (obj && typeof obj === 'object') { | |
| if (obj[MOCHA_ID_PROP_NAME]) { | |
| const id = obj[MOCHA_ID_PROP_NAME]; | |
| newObj = this._linkedObjectMap.has(id) | |
| ? Object.assign(this._linkedObjectMap.get(id), obj) | |
| : obj; | |
| this._linkedObjectMap.set(id, newObj); | |
| parent[prop] = newObj; | |
| } else { | |
| throw createFatalError( | |
| 'Object missing ID received in event data', | |
| obj | |
| ); | |
| } | |
| } | |
| Object.keys(newObj).forEach(key => { | |
| const value = obj[key]; | |
| if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) { | |
| stack.push({obj: value, parent: newObj, prop: key}); | |
| } | |
| }); | |
| } | |
| }; | |
| return async file => { | |
| debug('run(): enqueueing test file %s', file); | |
| try { | |
| const {failureCount, events} = await pool.run(file, options); | |
| if (this._state === BAILED) { | |
| // short-circuit after a graceful bail. if this happens, | |
| // some other worker has bailed. | |
| // TODO: determine if this is the desired behavior, or if we | |
| // should report the events of this run anyway. | |
| return; | |
| } | |
| debug( | |
| 'run(): completed run of file %s; %d failures / %d events', | |
| file, | |
| failureCount, | |
| events.length | |
| ); | |
| this.failures += failureCount; // can this ever be non-numeric? | |
| let event = events.shift(); | |
| if (this._linkPartialObjects) { | |
| while (event) { | |
| linkEvent(event); | |
| emitEvent(event, failureCount); | |
| event = events.shift(); | |
| } | |
| } else { | |
| while (event) { | |
| emitEvent(event, failureCount); | |
| event = events.shift(); | |
| } | |
| } | |
| if (this._state === BAILING) { | |
| debug('run(): terminating pool due to "bail" flag'); | |
| this._state = BAILED; | |
| await pool.terminate(); | |
| } | |
| } catch (err) { | |
| if (this._state === BAILED || this._state === ABORTING) { | |
| debug( | |
| 'run(): worker pool terminated with intent; skipping file %s', | |
| file | |
| ); | |
| } else { | |
| // this is an uncaught exception | |
| debug('run(): encountered uncaught exception: %O', err); | |
| if (this.allowUncaught) { | |
| // still have to clean up | |
| this._state = ABORTING; | |
| await pool.terminate(true); | |
| } | |
| throw err; | |
| } | |
| } finally { | |
| debug('run(): done running file %s', file); | |
| } | |
| }; | |
| } | |
| /** | |
| * Listen on `Process.SIGINT`; terminate pool if caught. | |
| * Returns the listener for later call to `process.removeListener()`. | |
| * @param {BufferedWorkerPool} pool - Worker pool | |
| * @returns {SigIntListener} Listener | |
| * @private | |
| */ | |
| _bindSigIntListener(pool) { | |
| const sigIntListener = async () => { | |
| debug('run(): caught a SIGINT'); | |
| this._state = ABORTING; | |
| try { | |
| debug('run(): force-terminating worker pool'); | |
| await pool.terminate(true); | |
| } catch (err) { | |
| console.error( | |
| `Error while attempting to force-terminate worker pool: ${err}` | |
| ); | |
| process.exitCode = 1; | |
| } finally { | |
| process.nextTick(() => { | |
| debug('run(): imminent death'); | |
| this._state = ABORTED; | |
| process.kill(process.pid, 'SIGINT'); | |
| }); | |
| } | |
| }; | |
| process.once('SIGINT', sigIntListener); | |
| return sigIntListener; | |
| } | |
| /** | |
| * Runs Mocha tests by creating a thread pool, then delegating work to the | |
| * worker threads. | |
| * | |
| * Each worker receives one file, and as workers become available, they take a | |
| * file from the queue and run it. The worker thread execution is treated like | |
| * an RPC--it returns a `Promise` containing serialized information about the | |
| * run. The information is processed as it's received, and emitted to a | |
| * {@link Reporter}, which is likely listening for these events. | |
| * | |
| * @param {Function} callback - Called with an exit code corresponding to | |
| * number of test failures. | |
| * @param {{files: string[], options: Options}} opts - Files to run and | |
| * command-line options, respectively. | |
| */ | |
| run(callback, {files, options = {}} = {}) { | |
| /** | |
| * Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool. | |
| */ | |
| let sigIntListener; | |
| // assign the reporter the worker will use, which will be different than the | |
| // main process' reporter | |
| options = {...options, reporter: this._workerReporter}; | |
| // This function should _not_ return a `Promise`; its parent (`Runner#run`) | |
| // returns this instance, so this should do the same. However, we want to make | |
| // use of `async`/`await`, so we use this IIFE. | |
| (async () => { | |
| /** | |
| * This is an interval that outputs stats about the worker pool every so often | |
| */ | |
| let debugInterval; | |
| /** | |
| * @type {BufferedWorkerPool} | |
| */ | |
| let pool; | |
| try { | |
| pool = BufferedWorkerPool.create({maxWorkers: options.jobs}); | |
| sigIntListener = this._bindSigIntListener(pool); | |
| /* istanbul ignore next */ | |
| debugInterval = setInterval( | |
| () => debugStats(pool), | |
| DEBUG_STATS_INTERVAL | |
| ).unref(); | |
| // this is set for uncaught exception handling in `Runner#uncaught` | |
| // TODO: `Runner` should be using a state machine instead. | |
| this.started = true; | |
| this._state = RUNNING; | |
| this.emit(EVENT_RUN_BEGIN); | |
| options = {...options}; | |
| DENY_OPTIONS.forEach(opt => { | |
| delete options[opt]; | |
| }); | |
| const results = await allSettled( | |
| files.map(this._createFileRunner(pool, options)) | |
| ); | |
| // note that pool may already be terminated due to --bail | |
| await pool.terminate(); | |
| results | |
| .filter(({status}) => status === 'rejected') | |
| .forEach(({reason}) => { | |
| if (this.allowUncaught) { | |
| // yep, just the first one. | |
| throw reason; | |
| } | |
| // "rejected" will correspond to uncaught exceptions. | |
| // unlike the serial runner, the parallel runner can always recover. | |
| this.uncaught(reason); | |
| }); | |
| if (this._state === ABORTING) { | |
| return; | |
| } | |
| this.emit(EVENT_RUN_END); | |
| debug('run(): completing with failure count %d', this.failures); | |
| callback(this.failures); | |
| } catch (err) { | |
| // this `nextTick` takes us out of the `Promise` scope, so the | |
| // exception will not be caught and returned as a rejected `Promise`, | |
| // which would lead to an `unhandledRejection` event. | |
| process.nextTick(() => { | |
| debug('run(): re-throwing uncaught exception'); | |
| throw err; | |
| }); | |
| } finally { | |
| clearInterval(debugInterval); | |
| process.removeListener('SIGINT', sigIntListener); | |
| } | |
| })(); | |
| return this; | |
| } | |
| /** | |
| * Toggle partial object linking behavior; used for building object references from | |
| * unique ID's. | |
| * @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable | |
| * @returns {Runner} | |
| * @chainable | |
| * @public | |
| * @example | |
| * // this reporter needs proper object references when run in parallel mode | |
| * class MyReporter() { | |
| * constructor(runner) { | |
| * this.runner.linkPartialObjects(true) | |
| * .on(EVENT_SUITE_BEGIN, suite => { | |
| // this Suite may be the same object... | |
| * }) | |
| * .on(EVENT_TEST_BEGIN, test => { | |
| * // ...as the `test.parent` property | |
| * }); | |
| * } | |
| * } | |
| */ | |
| linkPartialObjects(value) { | |
| this._linkPartialObjects = Boolean(value); | |
| return super.linkPartialObjects(value); | |
| } | |
| /** | |
| * If this class is the `Runner` in use, then this is going to return `true`. | |
| * | |
| * For use by reporters. | |
| * @returns {true} | |
| * @public | |
| */ | |
| isParallelMode() { | |
| return true; | |
| } | |
| /** | |
| * Configures an alternate reporter for worker processes to use. Subclasses | |
| * using worker processes should implement this. | |
| * @public | |
| * @param {string} path - Absolute path to alternate reporter for worker processes to use | |
| * @returns {Runner} | |
| * @throws When in serial mode | |
| * @chainable | |
| */ | |
| workerReporter(reporter) { | |
| this._workerReporter = reporter; | |
| return this; | |
| } | |
| } | |
| module.exports = ParallelBufferedRunner; | |
| /** | |
| * Listener function intended to be bound to `Process.SIGINT` event | |
| * @private | |
| * @callback SigIntListener | |
| * @returns {Promise<void>} | |
| */ | |
| /** | |
| * A function accepting a test file path and returning the results of a test run | |
| * @private | |
| * @callback FileRunner | |
| * @param {string} filename - File to run | |
| * @returns {Promise<SerializedWorkerResult>} | |
| */ | |