| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import { Runnable } from '../../../../src/index.js';
|
|
|
| |
| |
| |
| |
| |
|
|
| class DelayedMultiplierRunnable extends Runnable {
|
| constructor(factor, delayMs = 100) {
|
| super();
|
| this.factor = factor;
|
| this.delayMs = delayMs;
|
| }
|
|
|
| async _call(input, config) {
|
| if (typeof input !== 'number') {
|
| throw new Error('Input must be a number');
|
| }
|
|
|
|
|
| await new Promise(resolve => setTimeout(resolve, this.delayMs));
|
|
|
| return input * this.factor;
|
| }
|
|
|
| toString() {
|
| return `DelayedMultiplier(Γ${this.factor}, ${this.delayMs}ms)`;
|
| }
|
| }
|
|
|
| |
| |
| |
| |
|
|
| async function processSequentially(runnable, inputs) {
|
|
|
|
|
| }
|
|
|
| |
| |
| |
| |
|
|
| async function processInParallel(runnable, inputs) {
|
|
|
|
|
| }
|
|
|
|
|
|
|
|
|
|
|
| async function runTests() {
|
| console.log('π§ͺ Testing Batch Processing...\n');
|
|
|
| try {
|
|
|
| console.log('Test 1: Basic batch processing');
|
| const multiplier = new DelayedMultiplierRunnable(2, 100);
|
| const inputs = [1, 2, 3, 4, 5];
|
|
|
| const startTime = Date.now();
|
| const results = await multiplier.batch(inputs);
|
| const duration = Date.now() - startTime;
|
|
|
| console.log(` Inputs: [${inputs.join(', ')}]`);
|
| console.log(` Results: [${results.join(', ')}]`);
|
| console.log(` Time: ${duration}ms`);
|
| console.assert(results.length === 5, 'Should have 5 results');
|
| console.assert(results[0] === 2, 'First result should be 2');
|
| console.assert(results[4] === 10, 'Last result should be 10');
|
| console.assert(duration < 200, 'Should complete in ~100ms (parallel), not 500ms (sequential)');
|
| console.log('β
Batch processing works!\n');
|
|
|
|
|
| console.log('Test 2: Sequential vs Parallel comparison');
|
| const runnable = new DelayedMultiplierRunnable(3, 100);
|
| const testInputs = [1, 2, 3, 4, 5];
|
|
|
| console.log(' Processing sequentially...');
|
| const seqStart = Date.now();
|
| const seqResults = await processSequentially(runnable, testInputs);
|
| const seqDuration = Date.now() - seqStart;
|
| console.log(` Sequential: ${seqDuration}ms`);
|
|
|
| console.log(' Processing in parallel...');
|
| const parStart = Date.now();
|
| const parResults = await processInParallel(runnable, testInputs);
|
| const parDuration = Date.now() - parStart;
|
| console.log(` Parallel: ${parDuration}ms`);
|
|
|
| console.log(` Speedup: ${(seqDuration / parDuration).toFixed(1)}x faster`);
|
| console.assert(parDuration < seqDuration / 2, 'Parallel should be much faster');
|
| console.log('β
Parallel is significantly faster!\n');
|
|
|
|
|
| console.log('Test 3: Large batch (10 items)');
|
| const largeBatch = Array.from({ length: 10 }, (_, i) => i + 1);
|
| const startLarge = Date.now();
|
| const largeResults = await multiplier.batch(largeBatch);
|
| const durationLarge = Date.now() - startLarge;
|
|
|
| console.log(` Processed ${largeBatch.length} items in ${durationLarge}ms`);
|
| console.assert(largeResults.length === 10, 'Should process all items');
|
| console.assert(durationLarge < 200, 'Should complete quickly due to parallelism');
|
| console.log('β
Large batch works!\n');
|
|
|
|
|
| console.log('Test 4: Error handling in batch');
|
| const mixedInputs = [1, 2, 'invalid', 4, 5];
|
|
|
| try {
|
| await multiplier.batch(mixedInputs);
|
| console.log('β Should have thrown an error');
|
| } catch (error) {
|
| console.log(` Caught error: ${error.message}`);
|
| console.log('β
Errors are caught in batch processing!\n');
|
| }
|
|
|
|
|
| console.log('Test 5: Empty batch');
|
| const emptyResults = await multiplier.batch([]);
|
| console.assert(emptyResults.length === 0, 'Empty batch should return empty array');
|
| console.log('β
Empty batch handled correctly!\n');
|
|
|
|
|
| console.log('Test 6: Batch through a pipeline');
|
| class AddConstant extends Runnable {
|
| constructor(constant) {
|
| super();
|
| this.constant = constant;
|
| }
|
| async _call(input) {
|
| await new Promise(resolve => setTimeout(resolve, 50));
|
| return input + this.constant;
|
| }
|
| }
|
|
|
| const pipeline = new DelayedMultiplierRunnable(2, 50)
|
| .pipe(new AddConstant(10));
|
|
|
| const pipelineInputs = [1, 2, 3];
|
| const startPipeline = Date.now();
|
| const pipelineResults = await pipeline.batch(pipelineInputs);
|
| const durationPipeline = Date.now() - startPipeline;
|
|
|
| console.log(` Inputs: [${pipelineInputs.join(', ')}]`);
|
| console.log(` Results: [${pipelineResults.join(', ')}]`);
|
| console.log(` Expected: [12, 14, 16] (multiply by 2, then add 10)`);
|
| console.log(` Time: ${durationPipeline}ms`);
|
| console.assert(pipelineResults[0] === 12, 'First should be 12');
|
| console.assert(pipelineResults[2] === 16, 'Last should be 16');
|
| console.log('β
Batch works through pipelines!\n');
|
|
|
| console.log('π All tests passed!');
|
| console.log('\nπ‘ Key Learnings:');
|
| console.log(' β’ batch() processes inputs in parallel');
|
| console.log(' β’ Much faster than sequential processing');
|
| console.log(' β’ Uses Promise.all() under the hood');
|
| console.log(' β’ All inputs must succeed (or all fail)');
|
| console.log(' β’ Works with pipelines too!');
|
| } catch (error) {
|
| console.error('β Test failed:', error.message);
|
| console.error(error.stack);
|
| }
|
| }
|
|
|
|
|
| if (import.meta.url === `file://${process.argv[1]}`) {
|
| runTests();
|
| }
|
|
|
| export {
|
| DelayedMultiplierRunnable,
|
| processSequentially,
|
| processInParallel
|
| }; |