| | 'use strict'; |
| |
|
| | import stream from 'stream'; |
| | import utils from '../utils.js'; |
| |
|
// Private key under which each stream instance keeps its throttling state.
const kInternals = Symbol('internals');

/**
 * Pass-through Transform stream that re-chunks the data flowing through it,
 * optionally limits the throughput to `maxRate` bytes per second, and emits
 * a `progress` event (with the running byte total) once a `progress`
 * listener has been attached.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} options
   * @param {number} [options.maxRate=0] Byte budget per 1000 ms; a falsy value disables throttling.
   * @param {number} [options.chunkSize=65536] Readable high-water mark, also the upper bound of a pushed chunk.
   * @param {number|false} [options.minChunkSize=100] Smallest remainder worth splitting off; `false` removes the limit.
   * @param {number} [options.timeWindow=500] Length of one accounting window in milliseconds.
   */
  constructor(options) {
    // NOTE(review): presumably copies every defined property of `options`
    // over the defaults object and returns it - confirm against utils.toFlatObject.
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const internals = this[kInternals] = {
      timeWindow: options.timeWindow,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0,
      isCaptured: false,
      notifiedBytesLoaded: 0,
      ts: Date.now(),
      bytes: 0,
      onReadCallback: null
    };

    // Progress events are only emitted after somebody subscribed to them.
    this.on('newListener', event => {
      if (event === 'progress') {
        if (!internals.isCaptured) {
          internals.isCaptured = true;
        }
      }
    });
  }

  /**
   * Releases a transform callback that was parked because `push()` returned
   * false, then delegates to the default Transform implementation.
   *
   * @param {number} size Forwarded unchanged to `super._read`.
   * @returns {*} Whatever `super._read` returns.
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Pushes `chunk` downstream in pieces no larger than the readable
   * high-water mark and, when `maxRate` is set, no larger than the byte
   * budget left in the current time window. When the budget is used up the
   * rest of the chunk waits for the next window.
   *
   * @param {Buffer|string} chunk Data to forward.
   * @param {string} encoding Unused.
   * @param {Function} callback Called with an error, or with `null` once the whole chunk was pushed.
   */
  _transform(chunk, encoding, callback) {
    const internals = this[kInternals];
    const maxRate = internals.maxRate;

    const readableHighWaterMark = this.readableHighWaterMark;

    const timeWindow = internals.timeWindow;

    // Byte budget of a single time window.
    const divider = 1000 / timeWindow;
    const bytesThreshold = (maxRate / divider);
    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    const pushChunk = (_chunk, _callback) => {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      internals.isCaptured && this.emit('progress', internals.bytesSeen);

      if (this.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        // Back-pressure: continue only after the consumer asks for more data.
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    };

    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;
        let bytesLeft;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // A new window starts; bytes sent beyond the previous budget are
          // carried over as debt.
          internals.ts = now;
          bytesLeft = bytesThreshold - internals.bytes;
          internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
          passed = 0;
        }

        bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // Budget exhausted: retry the same chunk when the window is over.
          return setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
        }

        if (bytesLeft < maxChunkSize) {
          // The budget may be fractional. Round it up to a whole byte:
          // subarray() truncates its indexes, so a budget below one byte
          // would otherwise cut off an empty piece, account for nothing and
          // re-enter this function through nextTick until the window ends.
          // The sub-byte overshoot is carried into the next window as debt.
          maxChunkSize = Math.min(maxChunkSize, Math.ceil(bytesLeft));
        }
      }

      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }
}
| |
|
| | export default AxiosTransformStream; |
| |
|