Spaces:
Runtime error
Runtime error
; | |
import stream from 'stream'; | |
import utils from '../utils.js'; | |
import throttle from './throttle.js'; | |
import speedometer from './speedometer.js'; | |
const kInternals = Symbol('internals'); | |
/**
 * Pass-through Transform stream that can limit throughput and report
 * transfer progress.
 *
 * Data is forwarded unchanged. When `maxRate` is non-zero, at most
 * `maxRate * timeWindow / 1000` bytes are forwarded per `timeWindow`
 * milliseconds (small overshoots are carried over and repaid in the
 * following windows). Once a `'progress'` listener has been attached, the
 * stream emits `'progress'` events with the fields
 * `{loaded, total, progress, bytes, rate, estimated}`.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.length] Expected total number of bytes; reported
   *   as `total` in progress events.
   * @param {number} [options.maxRate=0] Rate limit in bytes per 1000 ms;
   *   a falsy value disables rate limiting.
   * @param {number} [options.chunkSize=65536] Used as the readable
   *   high-water mark, which also caps the size of each pushed chunk.
   * @param {number|false} [options.minChunkSize=100] A chunk is only split
   *   when the part left over would exceed this many bytes; `false` removes
   *   the lower bound.
   * @param {number} [options.timeWindow=500] Rate-limit window in ms.
   * @param {number} [options.ticksRate=2] Passed to `throttle` for progress
   *   notifications (presumably notifications per second - confirm against
   *   ./throttle.js).
   * @param {number} [options.samplesCount=15] Multiplied by `ticksRate` to
   *   size the speedometer (see ./speedometer.js).
   */
  constructor(options) {
    // Fill in defaults; explicitly-undefined option values do not override them.
    options = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: options.chunkSize
    });

    const self = this;

    const internals = this[kInternals] = {
      length: options.length,
      timeWindow: options.timeWindow,
      ticksRate: options.ticksRate,
      chunkSize: options.chunkSize,
      maxRate: options.maxRate,
      minChunkSize: options.minChunkSize,
      bytesSeen: 0,
      isCaptured: false,
      notifiedBytesLoaded: 0,
      ts: Date.now(),
      bytes: 0,
      onReadCallback: null
    };

    const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

    // Progress bookkeeping is only triggered per chunk once somebody listens.
    this.on('newListener', event => {
      if (event === 'progress') {
        internals.isCaptured = true;
      }
    });

    // Number of bytes already reported through 'progress' events.
    let bytesNotified = 0;

    internals.updateProgress = throttle(function throttledHandler() {
      const totalBytes = internals.length;
      const bytesTransferred = internals.bytesSeen;
      const progressBytes = bytesTransferred - bytesNotified;

      // Nothing new to report, or the stream is already torn down.
      if (!progressBytes || self.destroyed) return;

      const rate = _speedometer(progressBytes);

      bytesNotified = bytesTransferred;

      // Emit asynchronously so listeners never run inside the push path.
      process.nextTick(() => {
        self.emit('progress', {
          'loaded': bytesTransferred,
          'total': totalBytes,
          'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
          'bytes': progressBytes,
          'rate': rate ? rate : undefined,
          'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
            (totalBytes - bytesTransferred) / rate : undefined
        });
      });
    }, internals.ticksRate);

    // Report whatever is still pending when the stream ends or fails.
    // NOTE(review): the `true` argument presumably bypasses the throttle
    // delay - confirm against ./throttle.js.
    const onFinish = () => {
      internals.updateProgress(true);
    };

    this.once('end', onFinish);
    this.once('error', onFinish);
  }

  /**
   * Resumes a transform that was paused because `push()` reported
   * back-pressure, then defers to the default Transform implementation.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream, splitting it into pieces no larger than the
   * readable high-water mark and, when rate limiting is enabled, no larger
   * than the byte budget left in the current time window.
   *
   * @param {Buffer|string} chunk
   * @param {string} encoding
   * @param {Function} callback Invoked once the whole chunk has been pushed.
   */
  _transform(chunk, encoding, callback) {
    const self = this;
    const internals = this[kInternals];
    const maxRate = internals.maxRate;

    const readableHighWaterMark = this.readableHighWaterMark;

    const timeWindow = internals.timeWindow;

    // Number of time windows per second, and the byte budget of one window.
    const divider = 1000 / timeWindow;
    const bytesThreshold = (maxRate / divider);
    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

    // Pushes one piece downstream and schedules `_callback` either right away
    // or, under back-pressure, on the next `_read()` call.
    function pushChunk(_chunk, _callback) {
      const bytes = Buffer.byteLength(_chunk);
      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      if (internals.isCaptured) {
        internals.updateProgress();
      }

      if (self.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    }

    // Pushes as much of `_chunk` as currently allowed; any remainder is handed
    // back through `_callback(null, remainder)`.
    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;
        let bytesLeft;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // A new window starts: carry over only the bytes by which the
          // previous window's budget was exceeded.
          internals.ts = now;
          bytesLeft = bytesThreshold - internals.bytes;
          internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
          passed = 0;
        }

        bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // Budget exhausted: retry the same chunk in the next time window.
          setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
          return;
        }

        // The budget may be fractional (maxRate * timeWindow / 1000). Used
        // directly as a subarray index it truncates, so a budget below one
        // byte would split off an empty head and re-queue the whole chunk in
        // a tight nextTick loop. Round up to a whole byte instead; the excess
        // (< 1 byte) is repaid via the overdraft carry-over above.
        const wholeBytesLeft = Math.ceil(bytesLeft);

        if (wholeBytesLeft < maxChunkSize) {
          maxChunkSize = wholeBytesLeft;
        }
      }

      // Split only when the part left over is worth a separate push.
      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }

  /**
   * Sets the expected total length reported as `total` in progress events.
   *
   * @param {number|string} length Coerced to a number with unary plus.
   * @returns {AxiosTransformStream} this, for chaining.
   */
  setLength(length) {
    this[kInternals].length = +length;
    return this;
  }
}
export default AxiosTransformStream; | |