Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds
Arrow up icon
GO TO TOP
Node.js Design Patterns

You're reading from   Node.js Design Patterns Level up your Node.js skills and design production-grade applications using proven techniques

Arrow left icon
Product type Paperback
Published in Sep 2025
Publisher Packt
ISBN-13 9781803238944
Length 732 pages
Edition 4th Edition
Languages
Tools
Arrow right icon
Authors (2):
Arrow left icon
Luciano Mammino Luciano Mammino
Author Profile Icon Luciano Mammino
Luciano Mammino
Mario Casciaro Mario Casciaro
Author Profile Icon Mario Casciaro
Mario Casciaro
Arrow right icon
View More author details
Toc

Table of Contents (16) Chapters Close

Preface 1. The Node.js Platform 2. The Module System FREE CHAPTER 3. Callbacks and Events 4. Asynchronous Control Flow Patterns with Callbacks 5. Asynchronous Control Flow Patterns with Promises and Async/Await 6. Coding with Streams 7. Creational Design Patterns 8. Structural Design Patterns 9. Behavioral Design Patterns 10. Testing: Patterns and Best Practices 11. Advanced Recipes 12. Scalability and Architectural Patterns 13. Messaging and Integration Patterns 14. Other Books You May Enjoy
15. Index

Asynchronous control flow patterns with streams

Going through the examples that we have presented so far, it should be clear that streams can be useful not only to handle I/O, but also as an elegant programming pattern that can be used to process any kind of data. But the advantages do not end at its simple appearance; streams can also be leveraged to turn “asynchronous control flow” into “flow control,” as we will see in this section.

Sequential execution

By default, streams will handle data in sequence. For example, the _transform() function of a Transform stream will never be invoked with the next chunk of data until the previous invocation completes by calling callback(). This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.

Let’s look at some code to clarify what we mean. We will be working on an example to demonstrate how we can use streams to execute asynchronous tasks in sequence. Let’s create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. Let’s create a new module called concat-files.js and define its contents as follows:

import { createReadStream, createWriteStream } from 'node:fs'
import { Readable, Transform } from 'node:stream'
export function concatFiles(dest, files) {
  return new Promise((resolve, reject) => {
    const destStream = createWriteStream(dest)
    Readable.from(files) // 1
      .pipe(
        new Transform({ // 2
          objectMode: true,
          transform(filename, _enc, done) {
            const src = createReadStream(filename)
            src.pipe(destStream, { end: false })
            // same as ((err) => done(err))
            // propagates the error
            src.on('error', done)
            // same as (() => done())
            // propagates correct completion
            src.on('end', done) // 3
          },
        })
      )
      .on('error', err => {
        destStream.end()
        reject(err)
      })
      .on('finish', () => { // 4
        destStream.end()
        resolve()
      })
  })
}

The preceding function implements a sequential iteration over the files array by transforming it into a stream. The algorithm can be explained as follows:

  1. First, we use Readable.from() to create a Readable stream from the files array. This stream operates in object mode (the default setting for streams created with Readable.from()) and it will emit filenames: every chunk is a string indicating the path to a file. The order of the chunks respects the order of the files in the files array.
  2. Next, we create a custom Transform stream to handle each file in the sequence. Since we are receiving strings, we set the option objectMode to true. In our transformation logic, for each file we create a Readable stream to read the file content and pipe it into destStream (a Writable stream for the destination file). We make sure not to close destStream after the source file finishes reading by specifying { end: false } in the pipe() options.
  3. When all the contents of the source file have been piped into destStream, we invoke the done function to communicate the completion of the current processing, which is necessary to trigger the processing of the next file.
  4. When all the files have been processed, the finish event is fired; we can finally end destStream and invoke the cb() function of concatFiles(), which signals the completion of the whole operation.

We can now try to use the little module we just created:

// concat.js
import { concatFiles } from './concat-files.js'
try {
  await concatFiles(process.argv[2], process.argv.slice(3))
} catch (err) {
  console.error(err)
  process.exit(1)
}
console.log('All files concatenated successfully')

We can now run the preceding program by passing the destination file as the first command-line argument, followed by a list of files to concatenate; for example:

node concat.js all-together.txt file1.txt file2.txt

This should create a new file called all-together.txt containing, in order, the contents of file1.txt and file2.txt.

With the concatFiles() function, we were able to obtain an asynchronous sequential iteration using only streams. This is an elegant and compact solution that enriches our toolbelt, along with the techniques we already explored in Chapter 4, Asynchronous Control Flow Patterns with Callbacks, and Chapter 5, Asynchronous Control Flow Patterns with Promises and Async/Await.

Pattern

Use a stream, or combination of streams, to easily iterate over a set of asynchronous tasks in sequence.

In the next section, we will discover how to use Node.js streams to implement unordered concurrent task execution.

Unordered concurrent execution

We just saw that streams process data chunks in sequence, but sometimes, this can be a bottleneck as we would not make the most of the concurrency of Node.js. If we have to execute a slow asynchronous operation for every data chunk, it can be advantageous to make the execution concurrent and speed up the overall process. Of course, this pattern can only be applied if there is no relationship between each chunk of data, which might happen frequently for object streams, but very rarely for binary streams.

Caution

Unordered concurrent streams cannot be used when the order in which the data is processed is important.

To make the execution of a Transform stream concurrent, we can apply the same patterns that we learned about in Chapter 4, Asynchronous Control Flow Patterns with Callbacks, but with some adaptations to get them working with streams. Let’s see how this works.

Implementing an unordered concurrent stream

Let’s immediately demonstrate how to implement an unordered concurrent stream with an example. Let’s create a module called concurrent-stream.js and define a generic Transform stream that executes a given transform function concurrently:

import { Transform } from 'node:stream'
export class ConcurrentStream extends Transform {
  constructor(userTransform, opts) { // 1
    super({ objectMode: true, ...opts })
    this.userTransform = userTransform
    this.running = 0
    this.terminateCb = null
  }
  _transform(chunk, enc, done) { // 2
    this.running++
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this)
    )
    done()
  }
  _flush(done) { // 3
    if (this.running > 0) {
      this.terminateCb = done
    } else {
      done()
    }
  }
  _onComplete(err) { // 4
    this.running--
    if (err) {
      return this.emit('error', err)
    }
    if (this.running === 0) {
      this.terminateCb?.()
    }
  }
}

Let’s analyze this new class step by step:

  1. As you can see, the constructor accepts a userTransform() function, which is then saved as an instance variable. This function will implement the transformation logic that should be executed for every object flowing through the stream. In this constructor, we invoke the parent constructor to initialize the internal state of the stream, and we enable the object mode by default.
  2. Next, it is the _transform() method. In this method, we execute the userTransform() function and then increment the count of running tasks. Finally, we notify the Transform stream that the current transformation step is complete by invoking done(). The trick for triggering the processing of another item concurrently is exactly this. We are not waiting for the userTransform() function to complete before invoking done(); instead, we do it immediately. On the other hand, we provide a special callback to userTransform(), which is the this._onComplete() method. This allows us to get notified when the execution of userTransform() completes.
  3. The _flush() method is invoked just before the stream terminates, so if there are still tasks running, we can put the release of the finish event on hold by not invoking the done() callback immediately. Instead, we assign it to the this.terminateCallback variable.
  4. To understand how the stream is then properly terminated, we have to look into the _onComplete() method. This last method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCallback() function, which will cause the stream to end, releasing the finish event that was put on hold in the _flush() method. Note that _onComplete() is a method that we introduced for convenience as part of the implementation of our ConcurrentStream; it is not a method we are overriding from the base Transform stream class.

The ConcurrentStream class we just built allows us to easily create a Transform stream that executes its tasks concurrently, but there is a caveat: it does not preserve the order of the items as they are received. In fact, while it starts every task in order, asynchronous operations can complete and push data at any time, regardless of when they are started. This property does not play well with binary streams where the order of data usually matters, but it can surely be useful with some types of object streams.

Implementing a URL status monitoring application

Now, let’s apply our ConcurrentStream to a concrete example. Let’s imagine that we want to build a simple service to monitor the status of a big list of URLs. Let’s imagine all these URLs are contained in a single file and are newline-separated.

Streams can offer a very efficient and elegant solution to this problem, especially if we use our ConcurrentStream class to check the URLs in a concurrent fashion.

// check-urls.js
import { createInterface } from 'node:readline'
import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { ConcurrentStream } from './concurrent-stream.js'
const inputFile = createReadStream(process.argv[2]) // 1
const fileLines = createInterface({ // 2
  input: inputFile,
})
const checkUrls = new ConcurrentStream( // 3
  async (url, _enc, push, done) => {
    if (!url) {
      return done()
    }
    try {
      await fetch(url, {
        method: 'HEAD',
        timeout: 5000,
        signal: AbortSignal.timeout(5000),
      })
      push(`${url} is up\n`)
    } catch (err) {
      push(`${url} is down: ${err}\n`)
    }
    done()
  }
)
const outputFile = createWriteStream('results.txt') // 4
await pipeline(fileLines, checkUrls, outputFile) // 5
console.log('All urls have been checked')

As we can see, with streams, our code looks very elegant and straightforward: we initialize the various components of our streaming pipeline and then we combine them together. But let’s discuss some important details:

  1. First, we create a Readable stream from the file given as input.
  2. We leverage the createInterface() function from the node:readline module to create a stream that wraps the input stream and provides the content of the original file line by line. This is a convenient helper that is very flexible and allows us to read lines from various sources.
  3. At this point, we create our ConcurrentStream instance. In our custom transformation logic, we expect to receive one URL at a time. If the URL is empty (e.g., if there’s an empty line in the source file), we just ignore the current entry. Otherwise, we make a HEAD request to the given URL with a timeout of 5 seconds. If the request is successful, the stream emits a string that describes the positive outcome; otherwise, it emits a string that describes an error. Either way, we call the done() callback, which tells the ConcurrentStream that we have completed processing the current task. Note that, since we are handling failure gracefully, the stream can continue processing tasks even if one of them fails. Also, note that we are using both timeout and an AbortSignal because AbortSignal ensures that the request will fail if it takes longer than 5 seconds, regardless of whether data is actively being transferred. Some bot prevention tools deliberately keep connections open by sending responses at very slow rates, effectively causing bots to hang indefinitely. By implementing this mechanism, we ensure that requests are treated as failed if they exceed 5 seconds for any reason.
  4. The last stream that we need to create is our output stream: a file called results.txt.
  5. Finally, we have all the pieces together! We just need to combine the streams into a pipeline to let the data flow between them. And, once the pipeline completes, we print a success message.

Now, we can run the check-urls.js module with a command such as this:

node check-urls.js urls.txt

Here, the file urls.txt contains a list of URLs (one per line); for example:

https://mario.fyi
https://loige.co
http://thiswillbedownforsure.com

When the command finishes running, we will see that a file, results.txt, was created. This contains the results of the operation; for example:

http://thiswillbedownforsure.com is down
https://mario.fyi is up
https://loige.co is up

There is a good probability that the order in which the results are written is different from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks concurrently, and it does not enforce any order between the various data chunks in the stream.

For the sake of curiosity, we might want to try replacing ConcurrentStream with a normal Transform stream and compare the behavior and performance of the two (you might want to do this as an exercise). Using Transform directly is way slower, because each URL is checked in sequence, but on the other hand, the order of the results in the file results.txt is preserved.

In the next section, we will see how to extend this pattern to limit the number of concurrent tasks running at a given time.

Unordered limited concurrent execution

If we try to run the check-urls.js application against a file that contains thousands or millions of URLs, we will surely run into issues. Our application will create an uncontrolled number of connections all at once, sending a considerable amount of data concurrently, and potentially undermining the stability of the application and the availability of the entire system. As we already know, the solution to keep the load and resource usage under control is to limit the number of concurrent tasks running at any given time.

Let’s see how this works with streams by creating a limited-concurrent-stream.js module, which is an adaptation of concurrent-stream.js, which we created in the previous section.

Let’s see what it looks like, starting from its constructor (we will highlight the changed parts):

export class LimitedConcurrentStream extends Transform {
  constructor (concurrency, userTransform, opts) {
    super({ ...opts, objectMode: true })
    this.concurrency = concurrency
    this.userTransform = userTransform
    this.running = 0
    this.continueCb = null
    this.terminateCb = null
  }
// ...

We need a concurrency limit to be taken as input, and this time, we are going to save two callbacks, one for any pending _transform method (continueCb—more on this next) and another one for the callback of the _flush method (terminateCb).

Next is the _transform() method:

  _transform (chunk, enc, done) {
    this.running++
    this.userTransform(
      chunk,
      enc,
      this.push.bind(this),
      this._onComplete.bind(this)
    )
    if (this.running < this.concurrency) {
      done()
    } else {
      this.continueCb = done
    }
  }

This time, in the _transform() method, we must check whether we have any free execution slots before we can invoke done() and trigger the processing of the next item. If we have already reached the maximum number of concurrently running streams, we save the done() callback in the continueCb variable so that it can be invoked as soon as a task finishes.

The _flush() method remains exactly the same as in the ConcurrentStream class, so let’s move directly to implementing the _onComplete() method:

  _onComplete (err) {
    this.running--
    if (err) {
      return this.emit('error', err)
    }
    const tmpCb = this.continueCb
    this.continueCb = null
    tmpCb?.()
    if (this.running === 0) {
      this.terminateCb && this.terminateCb()
    }
  }

Every time a task completes, we invoke any saved continueCb() that will cause the stream to unblock, triggering the processing of the next item.

That’s it for the LimitedConcurrentStream class. We can now use it in the check-urls.js module in place of ConcurrentStream and have the concurrency of our tasks limited to the value that we set (check the code in the book’s repository for a complete example).

Ordered concurrent execution

The concurrent streams that we created previously may shuffle the order of the emitted data, but there are situations where this is not acceptable. Sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. However, not all hope is lost. We can still run the transform function concurrently; all we must do is sort the data emitted by each task so that it follows the same order in which the data was received. It’s important here to clearly distinguish between the internal processing logic applied to each received chunk, which can safely occur concurrently and therefore in any arbitrary order, and how the processed data is ultimately emitted by the transform stream, which might need to preserve the original order of chunks.

The technique we are going to use involves the use of a buffer to reorder the chunks while they are emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it’s quite verbose for the scope of this book. What we are going to do instead is reuse one of the available packages on npm built for this specific purpose, that is, parallel-transform (nodejsdp.link/parallel-transform).

We can quickly check the behavior of an ordered concurrent execution by modifying our existing check-urls module. Let’s say that we want our results to be written in the same order as the URLs in the input file, while executing our checks concurrently. We can do this using parallel-transform:

//...
import parallelTransform from 'parallel-transform' // v1.2.0
const inputFile = createReadStream(process.argv[2])
const fileLines = createInterface({
  input: inputFile,
})
const checkUrls = parallelTransform(8, async function (url, done) {
  if (!url) {
    return done()
  }
  try {
    await fetch(url, { method: 'HEAD', timeout: 5 * 1000 })
    this.push(`${url} is up\n`)
  } catch (err) {
    this.push(`${url} is down: ${err}\n`)
  }
  done()
})
const outputFile = createWriteStream('results.txt')
await pipeline(fileLines, checkUrls, outputFile)
console.log('All urls have been checked')

In the example here, parallelTransform() creates a Transform stream in object mode that executes our transformation logic with a maximum concurrency of 8. If we try to run this new version of check-urls.js, we will now see that the results.txt file lists the results in the same order as the URLs appear in the input file. It is important to see that, even though the order of the output is the same as the input, the asynchronous tasks still run concurrently and can possibly complete in any order.

When using the ordered concurrent execution pattern, we need to be aware of slow items blocking the pipeline or growing the memory indefinitely. In fact, if there is an item that requires a very long time to complete, depending on the implementation of the pattern, it will either cause the buffer containing the pending ordered results to grow indefinitely or the entire processing to block until the slow item completes. With the first strategy, we are optimizing for performance, while with the second, we get predictable memory usage. parallel-transform implementation opts for predictable memory utilization and maintains an internal buffer that will not grow more than the specified maximum concurrency.

With this, we conclude our analysis of the asynchronous control flow patterns with streams. Next, we are going to focus on some piping patterns.

lock icon The rest of the chapter is locked
Visually different images
CONTINUE READING
83
Tech Concepts
36
Programming languages
73
Tech Tools
Icon Unlimited access to the largest independent learning library in tech of over 8,000 expert-authored tech books and videos.
Icon Innovative learning tools, including AI book assistants, code context explainers, and text-to-speech.
Icon 50+ new titles added per month and exclusive early access to books as they are being written.
Node.js Design Patterns
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $19.99/month. Cancel anytime
Modal Close icon
Modal Close icon