Readable stream utilities
In this chapter, we’ve explored how Node.js streams work, how to create custom streams, and how to compose them into efficient, elegant data processing pipelines. To complete the picture, let’s look at some utilities provided by the node:stream module that simplify working with Readable streams. These utilities are designed to streamline data processing in a streaming fashion and bring a functional programming flavor to stream operations.
All these utilities are methods available for any Readable stream, including Duplex, PassThrough, and Transform streams. Since most of these methods return a new Readable stream, they can be chained together to create expressive, pipeline-like code. Unsurprisingly, many of these methods mirror common operations available in the Array prototype, but they are optimized for handling streaming data.
Here’s a summary of the key methods:
Mapping and transformation
- readable.map(fn): Applies a transformation function (fn) to each chunk in the stream, returning a new stream with the transformed data. If fn returns a Promise, the result is awaited before being passed to the output stream.
- readable.flatMap(fn): Similar to map(), but allows fn to return streams, iterables, or async iterables, which are then flattened and merged into the output stream.
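For example, here is a minimal sketch (with made-up input values) of how these two helpers behave when chained together:
import { Readable } from 'node:stream'
const expanded = Readable.from([1, 2, 3])
  .map(async n => n * 2) // async mapping functions are awaited: 2, 4, 6
  .flatMap(n => [n, n + 1]) // each chunk expands into two chunks: 2, 3, 4, 5, 6, 7
for await (const chunk of expanded) {
  console.log(chunk)
}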
Filtering and iteration
- readable.filter(fn): Filters the stream by applying fn to each chunk. Only chunks for which fn returns a truthy value are included in the output stream. Supports async fn functions.
- readable.forEach(fn): Invokes fn for each chunk in the stream. This is typically used for side effects rather than for producing a new stream. If fn returns a Promise, it will be awaited before processing the next chunk.
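To illustrate, a quick sketch (again with made-up values) that keeps only even numbers and logs them as a side effect could look like this. Note that forEach() returns a Promise that settles when the stream has finished, which is why we can await it:
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4, 5])
  .filter(n => n % 2 === 0) // keep only even numbers
  .forEach(n => console.log(n)) // side effect only: prints 2, then 4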
Searching and evaluation
- readable.some(fn): Checks if at least one chunk satisfies the condition in fn. Once a truthy value is found, the stream is destroyed, and the returned Promise resolves to true. If no chunk satisfies the condition, it resolves to false.
- readable.every(fn): Verifies whether all chunks satisfy the condition in fn. If any chunk fails the condition, the stream is destroyed, and the returned Promise resolves to false. Otherwise, it resolves to true when the stream ends.
- readable.find(fn): Returns a Promise that will resolve to the value of the first chunk that satisfies the condition in fn. If no chunk meets the condition, the returned Promise will resolve to undefined once the stream ends.
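As a quick illustration, here is a small sketch (the input numbers are arbitrary) that exercises all three helpers. Since each helper consumes, and possibly destroys, its source stream, we create a fresh stream for every check:
import { Readable } from 'node:stream'
const hasNegative = await Readable.from([5, 3, -1, 8])
  .some(n => n < 0) // true: stops reading as soon as -1 is found
const allPositive = await Readable.from([5, 3, -1, 8])
  .every(n => n > 0) // false: -1 fails the condition
const firstBig = await Readable.from([5, 3, -1, 8])
  .find(n => n > 4) // 5: the first chunk satisfying the condition
console.log(hasNegative, allPositive, firstBig) // true false 5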
Limiting and reducing
- readable.drop(n): Skips the first n chunks in the stream, returning a new stream that starts from the (n+1)th chunk.
- readable.take(n): Returns a new stream that includes, at most, the first n chunks. Once n chunks are reached, the stream is terminated.
- readable.reduce(fn, initialValue): Reduces the stream by applying fn to each chunk, accumulating a result that is returned as a Promise. If no initialValue is provided, the first chunk is used as the initial value.
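Putting these three together, here is a tiny sketch (with arbitrary numbers) that skips the first chunk, keeps the next three, and sums them:
import { Readable } from 'node:stream'
const sum = await Readable.from([10, 20, 30, 40, 50])
  .drop(1) // skip 10
  .take(3) // keep 20, 30, 40
  .reduce((acc, n) => acc + n, 0) // accumulate the sum
console.log(sum) // 90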
The official documentation has plenty of examples for all these methods, and there are other, less common methods that we haven’t covered here for brevity. We recommend checking out the docs (nodejsdp.link/stream-iterators) if any of these still feel confusing or you are unsure about when to use them.
Just to give you a more practical overview, let’s re-implement the processing pipeline we illustrated before to explain filtering and reducing with a custom Transform stream, but this time we are going to use only Readable stream utilities. As a reminder, in this example, we are parsing a CSV file that contains sales data. We want to calculate the total amount of profit made from sales in Italy. Every line of the CSV file has 3 fields: type, country, and profit. The first line contains the CSV headers.
import { createReadStream } from 'node:fs'
import { createInterface } from 'node:readline'
import { Readable, compose } from 'node:stream'
import { createGunzip } from 'node:zlib'
const uncompressedData = compose( // 1
createReadStream('data.csv.gz'),
createGunzip()
)
const byLine = Readable.from( // 2
createInterface({ input: uncompressedData })
)
const totalProfit = await byLine // 3
.drop(1) // 4
.map(chunk => { // 5
const [type, country, profit] = chunk.toString().split(',')
return { type, country, profit: Number.parseFloat(profit) }
})
.filter(record => record.country === 'Italy') // 6
.reduce((acc, record) => acc + record.profit, 0) // 7
console.log(totalProfit)
Here’s a step-by-step breakdown of what the preceding code does:
1. The data comes from a gzipped CSV file, so we initially compose a file read stream and a decompression stream to create a source stream that gives us uncompressed CSV data.
2. We want to read the data line by line, so we use the createInterface() utility from the node:readline module to wrap our source stream and give us a new Readable stream (byLine) that produces lines from the original stream.
3. Here’s where we start to use some of the helpers we discussed in this section. Since the last helper is .reduce(), which returns a Promise, we use await here to wait for the returned Promise to resolve and to capture the final result in the totalProfit variable.
4. The first helper we use is .drop(1), which allows us to skip the first line of the uncompressed source data. This line contains the CSV header (“type,country,profit”) and no useful data, so it makes sense to skip it. This operation returns a new Readable stream, so we can chain other helper methods.
5. The next helper we use in the chain is .map(). In the mapping function, we provide all the necessary logic to parse a line from the original CSV file and convert it into a record object containing the fields type, country, and profit. This operation returns another Readable stream, so we can keep chaining more helper functions to continue building our processing logic.
6. The next step is .filter(), which we use to retain only the records that represent profit associated with the country Italy. Once again, this operation gives us a new Readable stream.
7. The last step of the processing pipeline is .reduce(). We use this helper to aggregate all the filtered records by summing their profit. As we mentioned before, this operation gives us a Promise that will resolve to the total profit once the stream completes.
This example shows how to create stream processing pipelines using a more direct approach: we chain helper methods, and all the transformation logic is clearly visible in the same context (assuming we define the transformation functions inline). This approach can be particularly convenient when the transformation logic is simple and you don’t need to build highly specialized, reusable custom Transform streams.
Note that, in this example, we created our own basic way of parsing records out of CSV lines rather than using a dedicated library for it. We did this just to have an excuse to showcase how to use the .drop() and .map() methods. Our implementation is very rudimentary, and it doesn’t handle all the possible edge cases. This is fine because we know there are no such edge cases (e.g., quoted fields) in our input data, but in real-world projects, we would recommend using a reliable CSV parsing library instead.