Asynchronous control flow patterns with streams
Going through the examples that we have presented so far, it should be clear that streams can be useful not only for handling I/O, but also as an elegant programming pattern that can be used to process any kind of data. But the advantages do not end there: streams can also be leveraged to turn “asynchronous control flow” into “flow control,” as we will see in this section.
Sequential execution
By default, streams will handle data in sequence. For example, the _transform() function of a Transform stream will never be invoked with the next chunk of data until the previous invocation completes by calling callback(). This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.
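To make this behavior concrete, here is a minimal, self-contained sketch (our own illustration, not part of the book’s example code; the file name sequential-demo.js is made up) of a Transform stream that performs an asynchronous operation on every chunk. Even though each operation is asynchronous, the chunks are processed strictly one after another, because the next _transform() invocation happens only after callback() has been called:
// sequential-demo.js (illustrative sketch)
import { Readable, Transform } from 'node:stream'
import { setTimeout as delay } from 'node:timers/promises'

const slowUppercase = new Transform({
  objectMode: true,
  transform(chunk, _enc, callback) {
    // simulate an asynchronous operation with a random duration
    delay(Math.random() * 100).then(() => {
      this.push(chunk.toUpperCase())
      callback() // only now will the next chunk be processed
    })
  },
})

Readable.from(['a', 'b', 'c'])
  .pipe(slowUppercase)
  .on('data', letter => console.log(letter)) // always prints A, B, C, in this order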
Let’s now put this property to work in a more realistic scenario and use streams to execute asynchronous tasks in sequence. We will create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. Let’s create a new module called concat-files.js and define its contents as follows:
import { createReadStream, createWriteStream } from 'node:fs'
import { Readable, Transform } from 'node:stream'
export function concatFiles(dest, files) {
return new Promise((resolve, reject) => {
const destStream = createWriteStream(dest)
Readable.from(files) // 1
.pipe(
new Transform({ // 2
objectMode: true,
transform(filename, _enc, done) {
const src = createReadStream(filename)
src.pipe(destStream, { end: false })
// same as ((err) => done(err))
// propagates the error
src.on('error', done)
// same as (() => done())
// propagates correct completion
src.on('end', done) // 3
},
})
)
.on('error', err => {
destStream.end()
reject(err)
})
.on('finish', () => { // 4
destStream.end()
resolve()
})
})
}
The preceding function implements a sequential iteration over the files
array by transforming it into a stream. The algorithm can be explained as follows:
- First, we use Readable.from() to create a Readable stream from the files array. This stream operates in object mode (the default setting for streams created with Readable.from()) and it will emit filenames: every chunk is a string indicating the path to a file. The order of the chunks respects the order of the files in the files array.
- Next, we create a custom Transform stream to handle each file in the sequence. Since we are receiving strings, we set the option objectMode to true. In our transformation logic, for each file we create a Readable stream to read the file content and pipe it into destStream (a Writable stream for the destination file). We make sure not to close destStream after the source file finishes reading by specifying { end: false } in the pipe() options.
- When all the contents of the source file have been piped into destStream, we invoke the done function to communicate the completion of the current processing, which is necessary to trigger the processing of the next file.
- When all the files have been processed, the finish event is fired; we can finally end destStream and resolve the promise returned by concatFiles(), which signals the completion of the whole operation.
We can now try to use the little module we just created:
// concat.js
import { concatFiles } from './concat-files.js'
try {
await concatFiles(process.argv[2], process.argv.slice(3))
} catch (err) {
console.error(err)
process.exit(1)
}
console.log('All files concatenated successfully')
We can now run the preceding program by passing the destination file as the first command-line argument, followed by a list of files to concatenate; for example:
node concat.js all-together.txt file1.txt file2.txt
This should create a new file called all-together.txt
containing, in order, the contents of file1.txt
and file2.txt
.
With the concatFiles()
function, we were able to obtain an asynchronous sequential iteration using only streams. This is an elegant and compact solution that enriches our toolbelt, along with the techniques we already explored in Chapter 4, Asynchronous Control Flow Patterns with Callbacks, and Chapter 5, Asynchronous Control Flow Patterns with Promises and Async/Await.
Pattern
Use a stream, or combination of streams, to easily iterate over a set of asynchronous tasks in sequence.
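To show the pattern in its most generic form, here is a hedged sketch of our own making that runs an arbitrary list of asynchronous tasks in sequence by streaming the task functions themselves through a Transform stream (the helper name runTasksInSequence and the task signature, a function returning a promise, are our assumptions):
// run-tasks.js (illustrative sketch)
import { Readable, Transform } from 'node:stream'

export function runTasksInSequence(tasks) {
  return new Promise((resolve, reject) => {
    Readable.from(tasks) // emits one task function per chunk
      .pipe(
        new Transform({
          objectMode: true,
          transform(task, _enc, done) {
            // the next task is only started once done() is invoked
            task().then(() => done(), done)
          },
        })
      )
      .on('error', reject)
      .on('finish', resolve)
  })
}
A hypothetical usage would be runTasksInSequence([() => taskA(), () => taskB()]), where taskA() is guaranteed to settle before taskB() is started.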
In the next section, we will discover how to use Node.js streams to implement unordered concurrent task execution.
Unordered concurrent execution
We just saw that streams process data chunks in sequence, but sometimes, this can be a bottleneck as we would not make the most of the concurrency of Node.js. If we have to execute a slow asynchronous operation for every data chunk, it can be advantageous to make the execution concurrent and speed up the overall process. Of course, this pattern can only be applied if there is no relationship between each chunk of data, which might happen frequently for object streams, but very rarely for binary streams.
Caution
Unordered concurrent streams cannot be used when the order in which the data is processed is important.
To make the execution of a Transform
stream concurrent, we can apply the same patterns that we learned about in Chapter 4, Asynchronous Control Flow Patterns with Callbacks, but with some adaptations to get them working with streams. Let’s see how this works.
Implementing an unordered concurrent stream
Let’s immediately demonstrate how to implement an unordered concurrent stream with an example. Let’s create a module called concurrent-stream.js
and define a generic Transform
stream that executes a given transform function concurrently:
import { Transform } from 'node:stream'
export class ConcurrentStream extends Transform {
constructor(userTransform, opts) { // 1
super({ objectMode: true, ...opts })
this.userTransform = userTransform
this.running = 0
this.terminateCb = null
}
_transform(chunk, enc, done) { // 2
this.running++
this.userTransform(
chunk,
enc,
this.push.bind(this),
this._onComplete.bind(this)
)
done()
}
_flush(done) { // 3
if (this.running > 0) {
this.terminateCb = done
} else {
done()
}
}
_onComplete(err) { // 4
this.running--
if (err) {
return this.emit('error', err)
}
if (this.running === 0) {
this.terminateCb?.()
}
}
}
Let’s analyze this new class step by step:
- As you can see, the constructor accepts a userTransform() function, which is then saved as an instance variable. This function implements the transformation logic that should be executed for every object flowing through the stream. In this constructor, we also invoke the parent constructor to initialize the internal state of the stream, and we enable object mode by default.
- Next comes the _transform() method. In this method, we increment the count of running tasks and then execute the userTransform() function. Finally, we notify the Transform stream that the current transformation step is complete by invoking done(). This is exactly the trick that triggers the processing of another item concurrently: we are not waiting for the userTransform() function to complete before invoking done(); instead, we do it immediately. On the other hand, we provide a special callback to userTransform(), the this._onComplete() method, which allows us to get notified when the execution of userTransform() completes.
- The _flush() method is invoked just before the stream terminates, so if there are still tasks running, we can put the release of the finish event on hold by not invoking the done() callback immediately. Instead, we assign it to the this.terminateCb property.
- To understand how the stream is then properly terminated, we have to look into the _onComplete() method. This last method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCb() callback, which causes the stream to end, releasing the finish event that was put on hold in the _flush() method. Note that _onComplete() is a method that we introduced for convenience as part of the implementation of our ConcurrentStream; it is not a method we are overriding from the base Transform stream class.
The ConcurrentStream
class we just built allows us to easily create a Transform
stream that executes its tasks concurrently, but there is a caveat: it does not preserve the order of the items as they are received. In fact, while it starts every task in order, asynchronous operations can complete and push data at any time, regardless of when they are started. This property does not play well with binary streams where the order of data usually matters, but it can surely be useful with some types of object streams.
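To make this caveat tangible, here is a small, hypothetical usage sketch (the file name concurrent-demo.js is made up) in which every chunk is delayed by a random amount of time before being pushed; the output order can differ from the input order on every run:
// concurrent-demo.js (illustrative sketch)
import { Readable } from 'node:stream'
import { setTimeout as delay } from 'node:timers/promises'
import { ConcurrentStream } from './concurrent-stream.js'

const randomDelay = new ConcurrentStream((chunk, _enc, push, done) => {
  // simulate an asynchronous task with an unpredictable duration
  delay(Math.random() * 100).then(() => {
    push(`${chunk}\n`)
    done()
  }, done)
})

Readable.from(['1', '2', '3', '4'])
  .pipe(randomDelay)
  .pipe(process.stdout) // for example 3, 1, 4, 2: completion order is not guaranteed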
Implementing a URL status monitoring application
Now, let’s apply our ConcurrentStream
to a concrete example. Let’s imagine that we want to build a simple service to monitor the status of a big list of URLs, all contained in a single file and separated by newlines.
Streams can offer a very efficient and elegant solution to this problem, especially if we use our ConcurrentStream
class to check the URLs in a concurrent fashion.
// check-urls.js
import { createInterface } from 'node:readline'
import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { ConcurrentStream } from './concurrent-stream.js'
const inputFile = createReadStream(process.argv[2]) // 1
const fileLines = createInterface({ // 2
input: inputFile,
})
const checkUrls = new ConcurrentStream( // 3
async (url, _enc, push, done) => {
if (!url) {
return done()
}
try {
await fetch(url, {
method: 'HEAD',
timeout: 5000,
signal: AbortSignal.timeout(5000),
})
push(`${url} is up\n`)
} catch (err) {
push(`${url} is down: ${err}\n`)
}
done()
}
)
const outputFile = createWriteStream('results.txt') // 4
await pipeline(fileLines, checkUrls, outputFile) // 5
console.log('All urls have been checked')
As we can see, with streams, our code looks very elegant and straightforward: we initialize the various components of our streaming pipeline and then we combine them. But let’s discuss some important details:
- First, we create a Readable stream from the file given as input.
- We leverage the createInterface() function from the node:readline module to create a stream that wraps the input stream and provides the content of the original file line by line. This is a convenient helper that is very flexible and allows us to read lines from various sources.
- At this point, we create our ConcurrentStream instance. In our custom transformation logic, we expect to receive one URL at a time. If the URL is empty (e.g., if there’s an empty line in the source file), we just ignore the current entry. Otherwise, we make a HEAD request to the given URL with a timeout of 5 seconds. If the request is successful, the stream emits a string that describes the positive outcome; otherwise, it emits a string that describes the error. Either way, we call the done() callback, which tells the ConcurrentStream that we have completed processing the current task. Note that, since we are handling failures gracefully, the stream can continue processing tasks even if one of them fails. Also, note that we pass both the timeout option and an AbortSignal: it is the AbortSignal that guarantees the request fails if it takes longer than 5 seconds, regardless of whether data is actively being transferred. Some bot prevention tools deliberately keep connections open by sending responses at very slow rates, effectively causing bots to hang indefinitely. With this mechanism in place, a request is treated as failed if it exceeds 5 seconds for any reason.
- The last stream that we need to create is our output stream: a file called results.txt.
- Finally, we have all the pieces together! We just need to combine the streams into a pipeline to let the data flow between them. Once the pipeline completes, we print a success message.
Now, we can run the check-urls.js
module with a command such as this:
node check-urls.js urls.txt
Here, the file urls.txt
contains a list of URLs (one per line); for example:
https://mario.fyi
https://loige.co
http://thiswillbedownforsure.com
When the command finishes running, we will see that a file, results.txt
, was created. This contains the results of the operation; for example:
http://thiswillbedownforsure.com is down
https://mario.fyi is up
https://loige.co is up
There is a good probability that the order in which the results are written is different from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks concurrently, and it does not enforce any order between the various data chunks in the stream.
For the sake of curiosity, we might want to try replacing ConcurrentStream
with a normal Transform
stream and compare the behavior and performance of the two (you might want to do this as an exercise). Using Transform
directly is way slower, because each URL is checked in sequence, but on the other hand, the order of the results in the file results.txt
is preserved.
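For reference, here is a hedged sketch of what that sequential variant could look like, assuming that only the checkUrls stream changes and the rest of check-urls.js stays the same; we keep the same fetch() options as in the concurrent version. The transform waits for the fetch() to settle before calling callback(), so the next URL is processed only after the current one:
// sequential variant of checkUrls (illustrative sketch for the exercise above)
import { Transform } from 'node:stream'

const checkUrls = new Transform({
  objectMode: true,
  transform(url, _enc, callback) {
    if (!url) {
      return callback()
    }
    fetch(url, {
      method: 'HEAD',
      timeout: 5000,
      signal: AbortSignal.timeout(5000),
    })
      .then(() => this.push(`${url} is up\n`))
      .catch(err => this.push(`${url} is down: ${err}\n`))
      .then(() => callback()) // the next URL is processed only now, preserving order
  },
})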
In the next section, we will see how to extend this pattern to limit the number of concurrent tasks running at a given time.
Unordered limited concurrent execution
If we try to run the check-urls.js
application against a file that contains thousands or millions of URLs, we will surely run into issues. Our application will create an uncontrolled number of connections all at once, sending a considerable amount of data concurrently, and potentially undermining the stability of the application and the availability of the entire system. As we already know, the solution to keep the load and resource usage under control is to limit the number of concurrent tasks running at any given time.
Let’s see how this works with streams by creating a limited-concurrent-stream.js module, an adaptation of the concurrent-stream.js module that we created in the previous section.
Let’s see what it looks like, starting from its constructor (we will highlight the changed parts):
export class LimitedConcurrentStream
extends Transform {
constructor (concurrency, userTransform, opts) {
super({ ...opts, objectMode: true })
this.concurrency = concurrency
this.userTransform = userTransform
this.running = 0
this.continueCb = null
this.terminateCb = null
}
// ...
We take a concurrency limit as input and, this time, we save two callbacks: one for any pending _transform() invocation (continueCb, more on this next) and another one for the callback of the _flush() method (terminateCb).
Next is the _transform()
method:
_transform (chunk, enc, done) {
this.running++
this.userTransform(
chunk,
enc,
this.push.bind(this),
this._onComplete.bind(this)
)
if (this.running < this.concurrency) {
done()
} else {
this.continueCb = done
}
}
This time, in the _transform() method, we must check whether we have any free execution slots before we can invoke done() and trigger the processing of the next item. If we have already reached the maximum number of concurrently running tasks, we save the done() callback in the continueCb variable so that it can be invoked as soon as a task finishes.
The _flush()
method remains exactly the same as in the ConcurrentStream
class, so let’s move directly to implementing the _onComplete()
method:
_onComplete (err) {
this.running--
if (err) {
return this.emit('error', err)
}
const tmpCb = this.continueCb
this.continueCb = null
tmpCb?.()
if (this.running === 0) {
this.terminateCb?.()
}
}
Every time a task completes, we invoke any saved continueCb(), which unblocks the stream and triggers the processing of the next item.
That’s it for the LimitedConcurrentStream
class. We can now use it in the check-urls.js
module in place of ConcurrentStream
and have the concurrency of our tasks limited to the value that we set (check the code in the book’s repository for a complete example).
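As a hedged sketch of that replacement (only the construction of the checkUrls stream changes; the concurrency value of 4 is an arbitrary choice of ours):
// in check-urls.js (illustrative sketch): swap ConcurrentStream for the limited version
import { LimitedConcurrentStream } from './limited-concurrent-stream.js'

const checkUrls = new LimitedConcurrentStream(
  4, // at most 4 URLs are checked at the same time
  async (url, _enc, push, done) => {
    if (!url) {
      return done()
    }
    try {
      await fetch(url, {
        method: 'HEAD',
        timeout: 5000,
        signal: AbortSignal.timeout(5000),
      })
      push(`${url} is up\n`)
    } catch (err) {
      push(`${url} is down: ${err}\n`)
    }
    done()
  }
)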
Ordered concurrent execution
The concurrent streams that we created previously may shuffle the order of the emitted data, but there are situations where this is not acceptable. Sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. However, not all hope is lost. We can still run the transform function concurrently; all we must do is sort the data emitted by each task so that it follows the same order in which the data was received. It’s important here to clearly distinguish between the internal processing logic applied to each received chunk, which can safely occur concurrently and therefore in any arbitrary order, and how the processed data is ultimately emitted by the transform stream, which might need to preserve the original order of chunks.
The technique we are going to use involves a buffer that reorders the chunks emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it’s quite verbose for the scope of this book. What we are going to do instead is reuse one of the packages available on npm that were built for this specific purpose, that is, parallel-transform
(nodejsdp.link/parallel-transform).
We can quickly check the behavior of an ordered concurrent execution by modifying our existing check-urls
module. Let’s say that we want our results to be written in the same order as the URLs in the input file, while executing our checks concurrently. We can do this using parallel-transform
:
//...
import parallelTransform from 'parallel-transform' // v1.2.0
const inputFile = createReadStream(process.argv[2])
const fileLines = createInterface({
input: inputFile,
})
const checkUrls = parallelTransform(8, async function (url, done) {
if (!url) {
return done()
}
try {
await fetch(url, { method: 'HEAD', timeout: 5 * 1000, signal: AbortSignal.timeout(5 * 1000) })
this.push(`${url} is up\n`)
} catch (err) {
this.push(`${url} is down: ${err}\n`)
}
done()
})
const outputFile = createWriteStream('results.txt')
await pipeline(fileLines, checkUrls, outputFile)
console.log('All urls have been checked')
In the example here, parallelTransform()
creates a Transform
stream in object mode that executes our transformation logic with a maximum concurrency of 8. If we try to run this new version of check-urls.js
, we will now see that the results.txt
file lists the results in the same order as the URLs appear in the input file. It is important to note that, even though the order of the output matches that of the input, the asynchronous tasks still run concurrently and can complete in any order.
When using the ordered concurrent execution pattern, we need to be aware that slow items can either block the pipeline or make memory grow indefinitely. In fact, if there is an item that requires a very long time to complete, then, depending on the implementation of the pattern, it will either cause the buffer containing the pending ordered results to grow indefinitely or block the entire processing until the slow item completes. With the first strategy, we are optimizing for performance, while with the second, we get predictable memory usage. The parallel-transform implementation opts for predictable memory utilization and maintains an internal buffer that will not grow beyond the specified maximum concurrency.
With this, we conclude our analysis of the asynchronous control flow patterns with streams. Next, we are going to focus on some piping patterns.