
What are nodejs streams?

Posted on:August 27, 2020 at 12:12 PM

Intro

In this article we will spend some time exploring the Node.js streams module. The purpose of the article is to get a better understanding of how the API works and to gather some ideas on where to use it. So first let's check what the Node.js documentation says about streams:

The stream module is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface. There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances.

So in layman's terms, the stream module is a data-handling approach which is used for reading input, making some transformation to it, and then writing the changes to an output in a sequential way. Streams are quite handy for specific tasks like processing large files or handling network traffic, where loading everything into memory at once would be wasteful.

Streams are designed in a way that enables us to reuse them for multiple purposes with the help of the pipe method, which means less code to write for us.

Below you can see an example of how we pipe our readable stream into the gzip transform stream, which compresses our data and sends it to our writable stream, the http response.

const { createGzip } = require("zlib");
const { createReadStream } = require("fs");
const { pipeline } = require("stream");
const { createServer } = require("http");

const PORT = 7000;

// Old approach
createServer((req, res) => {
  // creates a stream which reads the content of the file
  createReadStream("./alphabet.txt").pipe(createGzip()).pipe(res);
}).listen(PORT, () => console.log(`Server listening on port ${PORT}`));

// New approach
// The documentation recommends this approach because errors are forwarded from stream to stream,
// so we can register just one error handler for all the streams rather than registering an
// error handler for every stream.

createServer((req, res) => {
  pipeline(createReadStream("./alphabet.txt"), createGzip(), res, err => {
    if (err) {
      console.log("There was an error");
      return;
    }
    console.log("👌");
  });
}).listen(PORT, () => console.log(`Server listening on port ${PORT}`));

Types of streams

Now that we have gained some basic understanding of streams, let's look at their basic anatomy. The Node.js stream module supports 4 types of streams:

Readable
Writable
Duplex
Transform

Each of those streams is an instance of the EventEmitter module.

Readable stream

A readable stream is used for emitting data from a specific source. The most notable readable streams in the Node.js ecosystem are process.stdin, fs.createReadStream, http.IncomingMessage, etc.

When implementing a readable stream we have to implement only one method, the read method. The purpose of this method is emitting data. The readable stream will push data until we push a null value, which signals to the stream that there is no more data.

In the example below we create an ASCII stream which generates the whole uppercase ASCII alphabet.

const fs = require("fs");
const { Readable } = require("stream");

const alphabetStream = new Readable({
  read() {
    for (let index = 65; index < 65 + 26; index++) {
      this.push(String.fromCharCode(index));
    }
    // don't forget to send null otherwise the stream will never end 😬
    this.push(null);
  },
});

alphabetStream.pipe(fs.createWriteStream("alphabet.txt"));

The basic API is quite simple: the only things we have to know are that we have to implement the read method and not forget to end the stream with the null value. But this is not the only way to implement a readable stream. With newer versions of Node we can use the method Readable.from, which takes an iterable object as an argument.

In the example below we rewrite our previous implementation with the help of the generator API, which implements the iterable protocol.

const fs = require("fs");
const { Readable } = require("stream");

async function* generate() {
  for (let index = 65; index < 65 + 26; index++) {
    yield String.fromCharCode(index);
  }
}

const readable = Readable.from(generate());
readable.pipe(fs.createWriteStream("alphabet.txt"));

// Compared with the previous example, we also listen to the stream's events: the `data` event,
// which outputs the stream's content, and the `end` event, which tells us that the stream has
// finished. Important: all streams have those events because, as mentioned at the beginning,
// streams inherit from `EventEmitter`.

readable.on("data", chunk => {
  console.log(chunk);
});

readable.on("end", chunk => {
  console.log("End of alphabet stream");
});

As you can see, this API is easier to use, and there are fewer ways to make an error, like forgetting to provide the null value after finishing sending data from the readable stream.

Writable stream

Now that we know what readable streams are, let's look into writable streams. A writable stream is meant for receiving data: our stream gets data from a readable stream, and the writable stream then stores or displays it.

To be honest, in the previous example we have already used a writable stream, fs.createWriteStream. With the help of this stream we took the data from the readable stream and stored it in the file alphabet.txt. For this to work we called the pipe method on the readable stream and provided the writable stream, which stores the data to a file.

In the below example you can check how the piping is working.

// write content to the file system
// (generate is the async generator defined in the previous section)

Readable.from(generate()).pipe(fs.createWriteStream("alphabet.txt"));

With the previous example we stored our generated characters on the file system, but a writable stream lets us send our readable stream to different destinations. In the example below we will display our readable stream's data as an http response. We will use the http module and pipe our readable stream into the writable response stream. With this we tell our server to create a readable stream on every request and send its data to the response, which will then be visible on localhost:4444.

const http = require("http");

const PORT = 4444;
// write content to the http response
http
  .createServer((request, response) => {
    Readable.from(generate()).pipe(response);
  })
  .listen(PORT, () => console.log(`Server listening: ${PORT}`)); // start the server

Important: you can use pipe in a web server, but it can leak memory, because the read stream is not closed or destroyed when the client aborts the request. So rather than pipe, use the pipeline function (https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback). The differences are discussed here: https://stackoverflow.com/questions/58875655/whats-the-difference-between-pipe-and-pipeline-on-streams


Transform stream

Now that we have looked at the readable and writable stream, we can finally check the transform stream. Reminder: if you didn't read the previous sections, please do that first, because the transform stream implements both the readable stream and the writable stream. The transform stream inherits all the functionality of those two streams and additionally provides a new method called transform. As the name suggests, the transform method is used for transforming the data coming from the readable side and propagating it to the writable side. An example use of a transform stream would be compressing a file while sending an http response. In this case we could use the Node.js zlib library, which compresses content with an algorithm like gzip or brotli.

In the example below we will reuse our previous readable stream and simply convert all characters to lowercase.

const { Readable, Transform } = require("stream");

const lowerCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const value = chunk.toString().toLowerCase();
    callback(null, value);
  },
});

Readable.from(generate()).pipe(lowerCaseTransform).pipe(process.stdout);

That is a basic example of how to create a simple transform stream.

Duplex stream

And the last stream: the duplex stream is similar to the transform stream, with the small difference that rather than having the readable and writable sides combined into one flow, we have one readable stream and one writable stream that we can work with independently. I didn't really have a good idea how to present this, so I will just point you to some Node.js modules that use duplex streams, like TCP sockets and zlib.

OK, now that we have gone over all the stream types, let's go over some good patterns to use when working with streams. These are patterns you will most likely encounter, and knowing them will improve your development experience.

Basic streaming patterns

This section is meant to show you some modules and approaches for working with streams more easily. There are three patterns that I think you will encounter a lot: combining, forking, and merging streams. In the subsections we will go over all three, show examples, and provide some npm modules for using them in real projects.

Combining multiple streams into one stream

Combining streams in its essence is just a function which combines multiple streams into one stream, hiding its inner implementation from the outside. Important: the streams are executed in sequence, which means that the streams inside the combine function run one after another.

In this example you can see how we combine the zlib and crypto streams into one stream whose purpose is to compress and encrypt our readable stream.

//
// input -> combineStream( stream1 -> stream2 -> stream3 ) -> output
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");

// CombineStreams is implemented further below
const compressAndEncrypt = password => {
  return CombineStreams(
    zlib.createGzip(),
    // note: crypto.createCipher is deprecated; in newer Node versions
    // prefer crypto.createCipheriv with an explicit key and IV
    crypto.createCipher("aes192", password)
  );
};

// Read file `urls.txt`
//  -> combineStream( gzip stream -> encrypt stream )
//  -> write to the file encrypt.txt
fs.createReadStream("./urls.txt")
  .pipe(compressAndEncrypt("test1234"))
  .pipe(fs.createWriteStream("./encrypt.txt"));

For production projects prefer a ready-made npm module. In the example below you can see a simple implementation of a combine function.

// needed for showing the pattern
const { PassThrough } = require("stream");

// Pipe for combining multiple streams
const CombineStreams = function (...streams) {
  const streamPass = new PassThrough();

  streamPass.on("pipe", source => {
    source.unpipe(streamPass);
    for (const stream of streams) {
      source = source.pipe(stream);
    }

    streamPass.pipe = (dest, options) => {
      return source.pipe(dest, options);
    };
  });

  return streamPass;
};

Forking streams

Forking streams is the easiest pattern, because we don't have to do anything special: the pipe function does it for us. For example, if we wanted to send some data both to disk and to the network, we could use the same source and pipe it to different destinations.

In the below example you can see how we are sending our readable data to the file system to two separate files at the same time.

const { createReadStream, createWriteStream } = require("fs");
const zlib = require("zlib");

const source = createReadStream("./example.txt");

// storing data to destination one
source.pipe(createWriteStream("destination-1.txt"));

// first compressing the file and then storing it to destination 2
source.pipe(zlib.createGzip()).pipe(createWriteStream("destination-2.txt"));

Merging streams

Merging streams takes multiple streams and merges them together into one stream. An example would be getting data from two readable streams, merging them into one stream, and then sending it to a writable stream.

For production projects prefer a ready-made npm module; a simple implementation of the pattern looks like this:

const { PassThrough } = require("stream");

// example of implementing merge streams
function merge(...streams) {
  const streamPass = new PassThrough();
  let waiting = streams.length;
  for (const stream of streams) {
    stream.pipe(streamPass, { end: false });
    // used for signaling that all streams have finished and
    // this one should finish too
    stream.once("end", () => --waiting === 0 && streamPass.end());
  }
  }
  return streamPass;
}

And with this we have gone over most of the stream module. Maybe in the future I will update the article if I find something interesting to include.

Terminology

Buffers

The Buffer class is used for working with raw binary data. Every buffer corresponds to some raw memory which is allocated outside of V8.


Streams

Streams allow us to process data as it is generated or retrieved, rather than having to wait for the whole result as with buffers.

