Transferring Large Files in TypeScript using Streams
We recently tried to optimize a Lambda function that transfers files between two systems via REST APIs. It downloaded a file from one system and split it into chunks of a few MB before uploading those chunks to the destination system.
This implementation was far from perfect: the Lambda has to keep the complete binary in memory during the transfer, and the upload only starts after the download has completed. For large files we saw a lot of timeouts, and we decided that throwing more resources and longer timeouts at the problem was not a sustainable solution.
One idea proposed by a colleague was to use streams to process the file chunk by chunk, continuously uploading parts of the file while they were still being downloaded from the source system. The initial part of the implementation was rather trivial, as downloading a binary file as a stream is easy to do in TypeScript / JavaScript via Axios:
import axios from 'axios'

const response = await axios.get('<fileUrl>', {
  maxContentLength: Infinity,
  responseType: 'stream',
})
// with responseType 'stream' the actual readable stream lives on response.data
const downloadStream = response.data

downloadStream.on('data', (chunk: Buffer) => {
  // upload chunk logic
})
downloadStream.on('end', () => {
  // complete transfer logic
})
Unfortunately, we ran into a problem while uploading the chunks one by one: we needed to request a new upload URL for every chunk, and uploading was far slower than downloading. Since Axios handed us chunks of only a few KB, we soon ran into a classic backpressure problem, meaning we could not process the incoming data as fast as it arrived, which created a critical bottleneck that would crash the whole Lambda for larger files.
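To make the backpressure problem more concrete, here is a toy sketch (not our actual code; uploadChunk and the numbers are stand-ins for requesting an upload URL and POSTing the data): every tiny chunk immediately starts its own slow upload, nothing ever slows the download down, and the pending work piles up until the Lambda runs out of memory or time.

import { Readable } from 'stream'

async function uploadChunk(_chunk: Buffer): Promise<void> {
  // stand-in for: request an upload URL, then POST the chunk (slow compared to the download)
  await new Promise((resolve) => setTimeout(resolve, 100))
}

const source = Readable.from(
  (function* () {
    for (let i = 0; i < 10_000; i++) yield Buffer.alloc(16 * 1024) // tiny ~16 KB chunks
  })(),
)

const pendingUploads: Promise<void>[] = []
source.on('data', (chunk: Buffer) => {
  // every chunk immediately starts its own slow upload, so this array
  // (and the data the pending uploads hold on to) grows without bound
  pendingUploads.push(uploadChunk(chunk))
})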
To fix this issue we buffered the stream: incoming chunks are collected until around 500 KB of data has accumulated, which is then uploaded as a single chunk to the destination system. Since we like abstraction, and Node.js offers abstract stream classes that are very easy to implement, we created the following two classes:
import { Transform, TransformCallback } from 'stream'

export default class MemoryStream extends Transform {
  private memory: Buffer

  constructor(private readonly desiredChunkSize: number) {
    super()
    this.memory = Buffer.alloc(0)
  }

  _transform(chunk: Buffer, _encoding: string, cb: TransformCallback): void {
    // once the buffered data plus the new chunk would reach the desired size,
    // emit everything collected so far and start over with an empty buffer
    if (Buffer.byteLength(this.memory) + Buffer.byteLength(chunk) >= this.desiredChunkSize) {
      this.push(this.memory)
      this.memory = Buffer.alloc(0)
    }
    this.memory = Buffer.concat([this.memory, chunk])
    cb()
  }

  _flush(cb: TransformCallback): void {
    // the incoming stream has ended: emit whatever is left as one final chunk
    this.push(this.memory)
    cb()
  }
}
import { Transform, TransformCallback } from 'stream'

export default class UploadStream extends Transform {
  processedChunks: Promise<string>[]

  constructor(
    private readonly onData: (chunk: Buffer) => Promise<string>,
    private readonly onEnd: (chunkIds: string[]) => Promise<void>,
  ) {
    super()
    this.processedChunks = []
  }

  _transform(chunk: Buffer, _encoding: string, cb: TransformCallback): void {
    // start the upload for this chunk but do not wait for it,
    // so the next chunk can be processed right away
    this.processedChunks.push(this.onData(chunk))
    cb()
  }

  async _flush(cb: TransformCallback): Promise<void> {
    // the incoming stream has ended: wait for all chunk uploads to finish,
    // then run the completion callback with the ordered chunk identifiers
    try {
      await this.onEnd(await Promise.all(this.processedChunks))
      cb()
    } catch (err) {
      cb(err as Error)
    }
  }
}
MemoryStream acts as a buffer for the incoming download stream. It extends Stream.Transform, meaning that it transforms one stream into another. The two methods _transform and _flush are used to process incoming chunks and to flush the remaining data once the incoming stream has ended.
The class is initialized with a desired chunk size for the output stream, and it keeps a memory buffer that concatenates the incoming chunks. When a new chunk is handled, the MemoryStream checks whether the size of the existing memory plus the size of the new chunk would reach or exceed the desiredChunkSize. If so, memory is sent to the output stream using push() and then cleared. In any case, the new chunk is appended to memory at the end. When the incoming stream has ended, the current memory is pushed as one final chunk to the output stream.
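To get a feeling for what MemoryStream does, here is a small standalone sketch; the import path and the chunk sizes are just placeholders:

import { Readable } from 'stream'
import MemoryStream from './MemoryStream' // assumed path to the class above

// feed many small 16 KB chunks into a MemoryStream configured for ~500 KB
const source = Readable.from(
  (function* () {
    for (let i = 0; i < 100; i++) yield Buffer.alloc(16 * 1024)
  })(),
)

const memoryStream = new MemoryStream(500_000)
memoryStream.on('data', (chunk: Buffer) => {
  console.log(`aggregated chunk of ${chunk.length} bytes`)
})

source.pipe(memoryStream)

With 16 KB input chunks, each aggregated chunk comes out just below the configured 500 KB, plus one smaller final chunk from _flush.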
Our second stream class UploadStream is used to actually upload the file to the destination system. It is initialized by providing two callbacks:
- What should be done with each chunk. The assumption is that your asynchronous callback eventually returns a string, but you can change this to whatever you need. In our case a new upload URL is requested and a POST request is made to it containing the chunk data. The callback returns an identifier for the chunk that we will need later.
- What should be done when the stream is closed. In our case the transfer is completed by making another POST request containing an ordered list of all chunk identifiers.
We deliberately do not await each chunk upload inside _transform; instead, processedChunks collects the pending uploads as an array of Promise<string>. They are awaited in _flush, which is automatically executed when the incoming stream has ended. Once all chunk uploads have completed, the second callback is executed.
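For illustration, this is roughly how the two callbacks could look in practice. The endpoints and the uploadUrl / chunkId fields below are made up; the real destination API will differ:

import axios from 'axios'
import UploadStream from './UploadStream' // assumed path to the class above

const uploadStream = new UploadStream(
  async (chunk: Buffer): Promise<string> => {
    // ask the destination system for a fresh upload URL for this chunk
    const { data } = await axios.post('https://destination.example.com/upload-url')
    // upload the chunk data to that URL
    await axios.post(data.uploadUrl, chunk, {
      headers: { 'Content-Type': 'application/octet-stream' },
    })
    // hand back the identifier so it can be reported in the completion request
    return data.chunkId
  },
  async (chunkIds: string[]): Promise<void> => {
    // complete the transfer with the ordered list of chunk identifiers
    await axios.post('https://destination.example.com/complete-transfer', { chunkIds })
  },
)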
Here is the final logic for transferring a file using our two new stream classes:
const response = await axios.get('<fileUrl>', {
  maxContentLength: Infinity,
  responseType: 'stream',
})
const downloadStream = response.data

const memoryStream = new MemoryStream(500000)
const uploadStream = new UploadStream(
  (chunk: Buffer) =>
    new Promise<string>((resolve) => {
      // upload chunk, then resolve(chunkId)
    }),
  async (chunkIds: string[]) => {
    // complete transfer
  },
)

downloadStream.pipe(memoryStream).pipe(uploadStream)
All three streams are initialized and a pipeline is set up (a sketch of how to wait for the whole pipeline to finish follows after the list):
- A file is downloaded via Axios as a stream
- Each chunk of this downloadStream is piped into the memoryStream
- When memoryStream has collected enough data or the downloadStream has ended, it pipes a new chunk of aggregated data into the uploadStream
- uploadStream uploads every chunk that it receives and keeps an ordered list of chunk identifiers
- When uploadStream receives the final chunk, it waits for all chunk uploads to be completed before executing the logic that ends the file transfer, providing all chunk identifiers to the destination system
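One detail the final snippet glosses over is how the Lambda actually waits for the transfer to finish before it returns. A minimal sketch of one possible way, reusing the streams from the snippet above, is to wait for the uploadStream to end right after setting up the pipe chain:

await new Promise<void>((resolve, reject) => {
  // 'end' fires on the readable side of uploadStream only after _flush has run,
  // i.e. after every chunk upload and the completion request have finished
  uploadStream.on('end', resolve)
  uploadStream.on('error', reject)
  memoryStream.on('error', reject)
  downloadStream.on('error', reject)
  // UploadStream never pushes data downstream, so resume() simply lets its
  // readable side reach 'end'
  uploadStream.resume()
})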
It was very interesting for me to implement some stream classes in Node.js, since I had only touched the topic briefly in the past using premade classes. Our new transfer logic using streams easily outperforms the old one for all file sizes. Previously we could not handle files bigger than 30 MB; now we can easily transfer files of around 100 MB without increasing the timeout of the Lambda function.
The desiredChunkSize of MemoryStream is something that you can play around with. Setting it too low will cause backpressure in the stream pipeline and might crash your service, while setting it too high can negatively impact the execution time. The logic within both callbacks of the UploadStream constructor can be adapted to fulfill many different use cases.