NodeJS: Using Pipe To Write A File From A Readable Stream Gives Heap Memory Error

I am trying to create 150 million lines of data and write the data into a csv file so that I can insert the data into different databases with little modification.

I am using a few functions to generate seemingly random data and pushing the data into the writable stream.

The code that I have right now is unsuccessful at handling the memory issue.

After a few hours of research, I am starting to think that I should not be pushing each piece of data at the end of each loop iteration, because it seems that the pipe method simply cannot handle garbage collection this way.

Also, I found a few Stack Overflow answers and NodeJS docs that recommend against using push at all.

However, I am very new to NodeJS and I feel like I am blocked and do not know how to proceed from here.

If someone can provide me any guidance on how to proceed and give me an example, I would really appreciate it.

Below is a part of my code to give you a better understanding of what I am trying to achieve.

P.S. -

I have found a way to successfully handle the memory issue without using the pipe method at all (I used the drain event), but I had to start from scratch, and now I am curious to know if there is a simple way to handle this memory issue without completely changing this bit of code.

Also, I have been trying to avoid using any library because I feel like there should be a relatively easy tweak to make this work without using a library but please tell me if I am wrong. Thank you in advance.

// This is my target number of data
const targetDataNum = 150000000; 

// Create readable stream
const readableStream = new Stream.Readable({
  read() {}
});

// Create writable stream
const writableStream = fs.createWriteStream('./database/RDBMS/test.csv');

// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');

// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
  const id = i;
  const body = lorem.paragraph(1);
  const date = randomDate(new Date(2014, 0, 1), new Date());
  const dp = randomNumber(1, 1000);
  const data = `${id},${body},${date},${dp}\n`;
  readableStream.push(data, 'utf8');
};

// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);

// End the stream
readableStream.push(null);

Asked Jul 15, 2020 at 7:39 by Will Park
Comment from jfriend00 (Jul 15, 2020 at 8:04): Why are you surprised? You're trying to push 150000000 objects into an array before you write anything to disk. You're running out of memory trying to do that. This has nothing to do with .pipe() - it has to do with pushing all that data into memory before you start writing anything. Instead, you should write the data incrementally as you create it so you don't have to keep any of it in memory other than the current object you're creating. While this could be done with .pipe(), that's not really the right tool for this job.

4 Answers


You were running out of memory because you were pre-generating all the data in memory before you wrote any of it to disk. Instead, you need a strategy to write it as you generate it so you don't have to hold large amounts of data in memory.

It does not seem like you need .pipe() here because you control the generation of the data (it's not coming from some random readStream).

So, you can just generate the data and immediately write it and handle the drain event when needed. Here's a runnable example (this creates a very large file):

const {once} = require('events');
const fs = require('fs');

// This is my target number of data
const targetDataNum = 150000000;

async function run() {

    // Create writable stream
    const writableStream = fs.createWriteStream('./test.csv');

    // Write columns first
    writableStream.write('id, body, date, dp\n', 'utf8');

    // Then, write each row directly to the writable stream (150M in this case)
    for (let i = 1; i <= targetDataNum; i += 1) {
      const id = i;
      const body = lorem.paragraph(1);
      const date = randomDate(new Date(2014, 0, 1), new Date());
      const dp = randomNumber(1, 1000);
      const data = `${id},${body},${date},${dp}\n`;
      const canWriteMore = writableStream.write(data);
      if (!canWriteMore) {
          // wait for stream to be ready for more writing
          await once(writableStream, "drain");       
      }
    }
    writableStream.end();
}

run().then(() => {
    console.log("done");
}).catch(err => {
    console.log("got rejection: ", err);
});

// placeholders for the functions that were being used
function randomDate(low, high) {
    let rand = randomNumber(low.getTime(), high.getTime());
    return new Date(rand);
}

function randomNumber(low, high) {
    return Math.floor(Math.random() * (high - low)) + low;
}

const lorem = {
    paragraph: function() {
        return "random paragraph";
    }
}

Since you're new to streams, maybe start with an easier abstraction: generators. Generators generate data only when it is consumed (just like streams should), but they don't have buffering or complicated constructors and methods.

This is just your for loop, moved into a generator function:

function * generateData(targetDataNum) {
  for (var i = 1; i <= targetDataNum; i += 1) {
    const id = i;
    const body = lorem.paragraph(1);
    const date = randomDate(new Date(2014, 0, 1), new Date());
    const dp = randomNumber(1, 1000);
    yield `${id},${body},${date},${dp}\n`;
  }
}

In Node 12, you can create a Readable stream directly from any iterable, including generators and async generators:

const { Readable } = require('stream');
const stream = Readable.from(generateData(targetDataNum), { encoding: 'utf8' });
stream.pipe(writableStream);
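
For reference, here is a minimal end-to-end sketch of how this could be wired up; it assumes the generateData generator above, the lorem/randomDate/randomNumber helpers from the question (or the placeholders from the first answer), and an arbitrary output path, and it uses the built-in stream.pipeline only to report completion and errors:

const fs = require('fs');
const { Readable, pipeline } = require('stream');

const writableStream = fs.createWriteStream('./test.csv');

// Write the header row first, as in the question
writableStream.write('id, body, date, dp\n', 'utf8');

pipeline(
  // Readable.from pulls rows from the generator only as fast as the
  // file stream can absorb them, so backpressure is handled automatically
  Readable.from(generateData(150000000), { encoding: 'utf8' }),
  writableStream,
  (err) => {
    if (err) {
      console.error('pipeline failed:', err);
    } else {
      console.log('done');
    }
  }
);

Plain .pipe() works as well; pipeline just makes it easier to tell when all 150 million rows have actually been written.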

I suggest trying a solution like the following:

const { Readable } = require('readable-stream');

class CustomReadable extends Readable {
  constructor(max, options = {}) {
    super(options);
    this.targetDataNum = max;
    this.i = 1;
  }

  _read(size) {
    if (this.i <= this.targetDataNum) {
      // your code to build the csv content for row `this.i` goes here
      this.push(data, 'utf8');
      this.i += 1;
      return;
    }
    this.push(null);
  }
}

const rs = new CustomReadable(150000000);

rs.pipe(ws);

Just complete it with your portion of code to fill the csv and create the writable stream.

With this solution you leave calling the rs.push method to the internal _read stream method, which is invoked repeatedly until this.push(null) is called. Previously, you were probably filling the internal stream buffer too fast by calling push manually in a loop, which caused the out-of-memory error.
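
For illustration, here is one possible way to complete that skeleton, reusing the row-building code and helper functions from the question (lorem, randomDate and randomNumber are assumed to be in scope, the built-in stream module is used instead of the readable-stream package, and the output path is arbitrary):

const fs = require('fs');
const { Readable } = require('stream');

class CustomReadable extends Readable {
  constructor(max, options = {}) {
    super(options);
    this.targetDataNum = max;
    this.i = 1;
  }

  _read(size) {
    if (this.i <= this.targetDataNum) {
      // build exactly one csv row per _read call
      const id = this.i;
      const body = lorem.paragraph(1);
      const date = randomDate(new Date(2014, 0, 1), new Date());
      const dp = randomNumber(1, 1000);
      this.i += 1;
      this.push(`${id},${body},${date},${dp}\n`, 'utf8');
      return;
    }
    this.push(null); // signal that there are no more rows
  }
}

const ws = fs.createWriteStream('./test.csv');
ws.write('id, body, date, dp\n', 'utf8');

const rs = new CustomReadable(150000000);
rs.pipe(ws).on('finish', () => console.log('done'));

Because _read is only invoked when the internal buffer has room for more data, the generation of rows can never outrun the disk.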

Try piping to the WritableStream before you start pumping data into the ReadableStream, and yield before you write the next chunk.

...

// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');

// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);

// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
  const id = i;
  const body = lorem.paragraph(1);
  const date = randomDate(new Date(2014, 0, 1), new Date());
  const dp = randomNumber(1, 1000);
  const data = `${id},${body},${date},${dp}\n`;
  readableStream.push(data, 'utf8');

  // somehow YIELD for the STREAM to drain out.

};
...

The entire Stream implementation of Node.js relies on the fact that the wire is slow and that the CPU can actually have downtime before the next chunk of data comes in from the stream source or before the next chunk of data has been written to the stream destination.

In the current implementation, since the for loop has the CPU tied up, there is no downtime for the actual piping of the data to the write stream. You will be able to catch this if you watch cat test.csv, which will not change while the loop is running.

As (I am sure) you know, pipe helps in guaranteeing that the data you are working with is buffered in memory only in chunks and not as a whole. But that guarantee only holds true if the CPU gets enough downtime to actually drain the data.

Having said all that, I wrapped your entire code into an async IIFE and ran it with an await on a setImmediate, which ensures that I yield for the stream to drain the data.

let fs = require('fs');
let Stream = require('stream');

(async function () {

  // This is my target number of data
  const targetDataNum = 150000000;

  // Create readable stream
  const readableStream = new Stream.Readable({
    read() { }
  });

  // Create writable stream
  const writableStream = fs.createWriteStream('./test.csv');

  // Write columns first
  writableStream.write('id, body, date, dp\n', 'utf8');

  // Pipe readable stream to writeable stream
  readableStream.pipe(writableStream);

  // Then, push a number of data to the readable stream (150M in this case)
  for (var i = 1; i <= targetDataNum; i += 1) {
    console.log(`Pushing ${i}`);
    const id = i;
    const body = `body${i}`;
    const date = `date${i}`;
    const dp = `dp${i}`;
    const data = `${id},${body},${date},${dp}\n`;
    readableStream.push(data, 'utf8');

    await new Promise(resolve => setImmediate(resolve));

  };

  // End the stream
  readableStream.push(null);

})();

This is what top looks like pretty much the whole time I am running this.

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
15213 binaek    **  **  ******  *****  ***** * ***.*  0.5   *:**.** node   

Notice the %MEM which stays more-or-less static.
