Stream consumer utilities
As we’ve repeated countless times throughout this chapter, streams are designed to transfer and process large amounts of data in small chunks. However, there are situations where you need to consume the entire content of a stream and accumulate it in memory. This is more common than it might seem, largely because many abstractions in the Node.js ecosystem use streams as the fundamental building block for data transfer. This design provides a great deal of flexibility, but it also means that sometimes you need to handle chunk-by-chunk data manually. In such cases, it’s important to understand how to convert a stream of discrete chunks into a single, buffered piece of data that can be processed as a whole.
A good example of this is the low-level node:http module, which allows you to make HTTP requests. When handling an HTTP response, Node.js represents the response body as a Readable stream. This means you’re expected to process the response data incrementally, as chunks arrive.
But what if you know in advance that the response body contains a JSON-serialized object? In that case, you can’t process the chunks independently; you need to wait until the entire response has been received so you can parse it as a complete string using JSON.parse().
A simple implementation of this pattern might look like the following code:
import { request } from 'node:http'
const req = request('http://example.com/somefile.json', res => { // 1
  let buffer = '' // 2
  res.on('data', chunk => {
    buffer += chunk
  })
  res.on('end', () => { // 3
    console.log(JSON.parse(buffer))
  })
})
req.end() // 4
To better understand this example, let’s discuss its main points:
1. Here, a request is being made to http://example.com/somefile.json. The second argument is a callback that receives the response object (res), which is a Readable stream. This stream emits chunks of data as they arrive over the network.
2. Inside the response callback, we initialize an empty string called buffer. As each chunk of data arrives (via the 'data' event), we concatenate it to the buffer string. This effectively buffers the entire response body in memory. This approach is necessary when you need to handle the whole response as a complete unit, for example when parsing JSON, since JSON.parse() only works on complete strings.
3. Once the entire response has been received and no more data will arrive (the 'end' event), we use JSON.parse() to deserialize the accumulated string into a JavaScript object. The resulting object is then logged to the console.
4. Finally, req.end() is called to signal that no request body will be sent and that the request is complete and can be dispatched. Since this is a GET request with no body, calling req.end() is all that’s needed to finalize and send the request.
A final point worth noting is that this code doesn’t require async/await because it relies entirely on event-based callbacks, which is the traditional way of handling asynchronous operations in Node.js streams.
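For comparison, here is a minimal sketch of how the same buffering logic could be written with async/await, taking advantage of the fact that Node.js Readable streams are also async iterable (the URL is the same placeholder used above):
import { request } from 'node:http'
// Collect the response body using async iteration instead of
// 'data'/'end' event listeners; each chunk is appended to a string.
const req = request('http://example.com/somefile.json', async res => {
  let buffer = ''
  for await (const chunk of res) {
    buffer += chunk
  }
  console.log(JSON.parse(buffer))
})
req.end()
The behavior is the same as the callback-based version; it simply trades event listeners for a loop.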
This solution works, but it’s a bit boilerplate-heavy. Thankfully, the node:stream/consumers module offers a better alternative.
This built-in library was introduced in Node.js version 16 to expose various utilities that make it easy to consume the entire content from a Node.js Readable instance or a Web Streams ReadableStream instance.
This module exposes the consumers object, which implements the following static methods:
consumers.arrayBuffer(stream)
consumers.blob(stream)
consumers.buffer(stream)
consumers.text(stream)
consumers.json(stream)
Each one of these methods consumes the given stream and returns a Promise that resolves only when the stream has been fully consumed.
It’s easy to guess that each method accumulates the data into a different kind of object. arrayBuffer(), blob(), and buffer() accumulate the chunks as binary data in an ArrayBuffer, a Blob, or a Buffer instance, respectively. text() accumulates the data in a string, while json() also accumulates the data in a string and then tries to deserialize it using JSON.parse() before resolving the corresponding Promise.
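To illustrate the other consumers, here is a minimal sketch that consumes a local file both as text and as a Buffer. The file path somefile.txt is just a placeholder, and note that a fresh read stream is created for each consumer, since a stream can only be consumed once:
import { createReadStream } from 'node:fs'
import consumers from 'node:stream/consumers'
// Accumulate the whole (hypothetical) file as a single string
const text = await consumers.text(createReadStream('somefile.txt'))
console.log(typeof text, text.length)
// Accumulate the whole file as a single Buffer instance
const data = await consumers.buffer(createReadStream('somefile.txt'))
console.log(Buffer.isBuffer(data), data.byteLength)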
This means that we can rewrite the previous example as follows:
import { request } from 'node:http'
import consumers from 'node:stream/consumers'
const req = request(
  'http://example.com/somefile.json',
  async res => {
    const buffer = await consumers.json(res)
    console.log(buffer)
  }
)
req.end()
Much more concise and elegant, isn’t it?
If you use fetch to make HTTP(S) requests, the response object provided by the Fetch API has various consumers built in. You could rewrite the previous example as follows:
const res = await fetch('http://example.com/somefile.json')
const buffer = await res.json()
console.log(buffer)
The response object (res) also exposes .blob(), .arrayBuffer(), and .text() if you want to accumulate the response data as a binary buffer or as text. Note that the .buffer() method is missing, though. This is because the Buffer class is not part of the Web standard; it exists only in Node.js.
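If you do need a Node.js Buffer from a fetch response, one simple option is to consume the body as an ArrayBuffer and wrap it, as in this minimal sketch (same placeholder URL as before):
// Obtain a Node.js Buffer from a fetch response by consuming the body
// as an ArrayBuffer first; Buffer.from() accepts an ArrayBuffer.
const res = await fetch('http://example.com/somefile.json')
const arrayBuffer = await res.arrayBuffer()
const buf = Buffer.from(arrayBuffer)
console.log(buf.length)
Alternatively, since the utilities in node:stream/consumers also accept Web Streams ReadableStream instances, calling consumers.buffer(res.body) on the response body achieves the same result.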