Readable stream utilities

In this chapter, we’ve explored how Node.js streams work, how to create custom streams, and how to compose them into efficient, elegant data processing pipelines. To complete the picture, let’s look at some utilities provided by the node:stream module that simplify working with Readable streams. These utilities are designed to streamline data processing in a streaming fashion and bring a functional programming flavor to stream operations.

All these utilities are methods available for any Readable stream, including Duplex, PassThrough, and Transform streams. Since most of these methods return a new Readable stream, they can be chained together to create expressive, pipeline-like code. Unsurprisingly, many of these methods mirror common operations available in the Array prototype, but they are optimized for handling streaming data.

Here’s a summary of the key methods:

Mapping and transformation

  • readable.map(fn): Applies a transformation function (fn) to each chunk in the stream, returning a new stream with the transformed data. If fn returns a Promise, the result is awaited before being passed to the output stream.
  • readable.flatMap(fn): Similar to map, but allows fn to return streams, iterables, or async iterables, which are then flattened and merged into the output stream.
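
To make these two methods more concrete, here is a minimal sketch that uses made-up numeric data (the toArray() helper, one of the less common utilities we don't cover in detail, is used here only to collect the results for printing):

import { Readable } from 'node:stream'
// Double every chunk with map(); the results are collected with toArray()
const doubled = await Readable.from([1, 2, 3])
  .map(n => n * 2)
  .toArray()
console.log(doubled) // [2, 4, 6]
// flatMap() lets the callback return an iterable, which is flattened
// into the output stream
const expanded = await Readable.from([1, 2])
  .flatMap(n => [n, n * 10])
  .toArray()
console.log(expanded) // [1, 10, 2, 20]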

Filtering and iteration

  • readable.filter(fn): Filters the stream by applying fn to each chunk. Only chunks for which fn returns a truthy value are included in the output stream. Supports async fn functions.
  • readable.forEach(fn): Invokes fn for each chunk in the stream. This is typically used for side effects rather than producing a new stream. If fn returns a Promise, it will be awaited before processing the next chunk.
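
As a quick illustration, the following sketch (again with made-up data) keeps only the even numbers and then logs each surviving chunk as a side effect:

import { Readable } from 'node:stream'
// filter() produces a new stream; forEach() only consumes it for side effects
await Readable.from([1, 2, 3, 4, 5])
  .filter(n => n % 2 === 0)
  .forEach(n => {
    console.log(`even: ${n}`) // prints "even: 2" and "even: 4"
  })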

Searching and evaluation

  • readable.some(fn): Checks if at least one chunk satisfies the condition in fn. Once a truthy value is found, the stream is destroyed, and the returned Promise resolves to true. If no chunk satisfies the condition, it resolves to false.
  • readable.every(fn): Verifies if all chunks satisfy the condition in fn. If any chunk fails the condition, the stream is destroyed, and the returned Promise resolves to false. Otherwise, it resolves to true when the stream ends.
  • readable.find(fn): Returns a Promise that resolves to the value of the first chunk that satisfies the condition in fn. If no chunk meets the condition, the Promise resolves to undefined once the stream ends.
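
The following sketch (with made-up data) shows all three helpers at work. Note that each of them consumes the stream it is called on, so we use a small factory function to create a fresh stream for every call:

import { Readable } from 'node:stream'
// Each helper consumes the stream, so we create a fresh one for every call
const nums = () => Readable.from([3, 7, 12, 5])
console.log(await nums().some(n => n > 10)) // true (stops as soon as 12 is found)
console.log(await nums().every(n => n > 0)) // true (all chunks satisfy the condition)
console.log(await nums().find(n => n % 2 === 0)) // 12 (the first even chunk)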

Limiting and reducing

  • readable.drop(n): Skips the first n chunks in the stream, returning a new stream that starts from the (n+1)th chunk.
  • readable.take(n): Returns a new stream that includes, at most, the first n chunks. Once n chunks are reached, the stream is terminated.
  • readable.reduce(fn, initialValue): Reduces the stream by applying fn to each chunk, accumulating a result that is returned as a Promise. If no initialValue is provided, the first chunk is used as the initial value.
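
Here is a minimal sketch with made-up data that chains the three of them:

import { Readable } from 'node:stream'
// Skip the first two chunks, keep at most the next three, then sum them up
const sum = await Readable.from([10, 20, 30, 40, 50, 60])
  .drop(2) // 30, 40, 50, 60
  .take(3) // 30, 40, 50
  .reduce((acc, n) => acc + n, 0)
console.log(sum) // 120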

The official documentation contains plenty of examples for all these methods, as well as other, less common methods that we haven’t covered here for brevity. We recommend checking out the docs (nodejsdp.link/stream-iterators) if any of these still feel confusing or you are unsure when to use them.

To give you a more practical overview, let’s re-implement the processing pipeline we illustrated earlier (when we explained filtering and reducing with a custom Transform stream), but this time using only Readable stream utilities. As a reminder, in this example we are parsing a CSV file that contains sales data, and we want to calculate the total profit made from sales in Italy. Every line of the CSV file has three fields: type, country, and profit. The first line contains the CSV headers.

import { createReadStream } from 'node:fs'
import { createInterface } from 'node:readline'
import { Readable, compose } from 'node:stream'
import { createGunzip } from 'node:zlib'
const uncompressedData = compose( // 1
  createReadStream('data.csv.gz'),
  createGunzip()
)
const byLine = Readable.from( // 2
  createInterface({ input: uncompressedData })
)
const totalProfit = await byLine // 3
  .drop(1) // 4
  .map(chunk => { // 5
    const [type, country, profit] = chunk.toString().split(',')
    return { type, country, profit: Number.parseFloat(profit) }
  })
  .filter(record => record.country === 'Italy') // 6
  .reduce((acc, record) => acc + record.profit, 0) // 7
console.log(totalProfit)

Here’s a step-by-step breakdown of what the preceding code does:

  1. The data comes from a gzipped CSV file, so we initially compose a file read stream and a decompression stream to create a source stream that gives uncompressed CSV data.
  2. We want to read the data line by line, so we use the createInterface() utility from the node:readline module to wrap our source stream and give us a new Readable stream (byLine) that produces lines from the original stream.
  3. Here’s where we start to use some of the helpers we discussed in this section. Since the last helper is .reduce(), which returns a Promise, we use await here to wait for the returned Promise to resolve and to capture the final result in the totalProfit variable.
  4. The first helper we use is .drop(1), which allows us to skip the first line of the uncompressed source data. This line will contain the CSV header (“type,country,profit”) and no useful data, so it makes sense to skip it. This operation returns a new Readable stream, so we can chain other helper methods.
  5. The next helper we use in the chain is .map(). In the mapping function, we provide all the necessary logic to parse a line from the original CSV file and convert it into a record object containing the fields type, country, and profit. This operation returns another Readable stream, so we can keep chaining more helper functions to continue building our processing logic.
  6. The next step is .filter(), which we use to retain only records that represent profit associated with the country Italy. Once again, this operation gives us a new Readable stream.
  7. The last step of the processing pipeline is .reduce(). We use this helper to aggregate all the filtered records by summing their profit. As we mentioned before, this operation will give us a Promise that will resolve to the total profit once the stream completes.

This example shows how to create stream processing pipelines using a more direct approach. Here we chain helper methods, and all the transformation logic is clearly visible in the same context (assuming we define all the transformation functions inline). This approach is particularly convenient when the transformation logic is simple and you don’t need to build highly specialized, reusable custom Transform streams.

Note that, in this example, we created our own basic way of parsing records out of CSV lines rather than using a dedicated library for it. We did this just to have an excuse to showcase how to use the .drop() and .map() methods. Our implementation is very rudimentary, and it doesn’t handle all the possible edge cases. This is fine because we know there aren’t edge cases (e.g., quoted fields) in our input data, but in real-world projects, we would recommend using a reliable CSV parsing library instead.
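
Just to sketch what that might look like, here is a variation of the previous pipeline that delegates the parsing to the third-party csv-parse package (an assumption on our side: it is not part of Node.js core and needs to be installed separately). With its columns option, the parser consumes the header line itself and emits one object per record, so the .drop(1) and the manual .split(',') are no longer needed:

import { createReadStream } from 'node:fs'
import { compose } from 'node:stream'
import { createGunzip } from 'node:zlib'
import { parse } from 'csv-parse' // third-party dependency, assumed installed
// The parser is just another Transform stream, so it can be composed as well
const records = compose(
  createReadStream('data.csv.gz'),
  createGunzip(),
  parse({ columns: true })
)
const totalProfit = await records
  .filter(record => record.country === 'Italy')
  .reduce((acc, record) => acc + Number.parseFloat(record.profit), 0)
console.log(totalProfit)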
