This repository has been archived by the owner on Dec 19, 2019. It is now read-only.

Getting Next Results Programmatically #36

Open
trevorglick opened this issue Jan 23, 2018 · 5 comments

Comments

@trevorglick

trevorglick commented Jan 23, 2018

Hey Brian,

I tweeted you about trying to get next results and you asked me to gist it and open an issue here. I already solved my problem in code, but I wasn't able to use pg-cursor to do it.

Basically I had this:

async function test() {
  console.log("THIS IS THE TEST METHOD");
  const client = await pool.connect();
  const renditionsCursor = client.query(new Cursor("select * from object_stacks"));
  renditionsCursor.read(10, (err, rows) => {
      if (err) {
          throw err;
      }
      for (var row of rows) {
          console.log(`Checking: ${row.object_name}`);
      }
      renditionsCursor.read(10, (err, rows) => {
          for (var row of rows) {
              console.log(`Checking: ${row.object_name}`);
          }
      });
  });
}

And I was trying to figure out a way to programmatically do another renditionsCursor.read() and drain the cursor. The scenario is that I have about 1.5 million rows I wanted to process, and I thought pg-cursor might be able to help. It did seem like it could do the job, but through no fault of the library I was unable to accomplish the goal with pg-cursor. I tried several tactics, but I was never able to write one block of renditionsCursor.read() and then iterate it several times to get the next result sets.

On your NPM wiki you have a comment that says:
// Also - you probably want to use some sort of async or promise library to deal with paging
// through your cursor results. node-pg-cursor makes no assumptions for you on that front.

So I was trying to use async/await to handle this but I just couldn't get it to grab the next results. Not really a big deal as I got this taken care of but I was curious to see if you've implemented what I was trying to do or if you have seen any examples of other people implementing the next results scenario.
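For reference, the pattern being attempted here can be sketched by wrapping the callback-style `cursor.read()` in a promise so it can be awaited in a plain loop. This is an illustrative sketch, not pg-cursor's own API: `readBatch`, `drainCursor`, and `handleRows` are hypothetical names.

```javascript
// Wrap one callback-style read in a promise (sketch; names are illustrative).
function readBatch(cursor, batchSize) {
  return new Promise((resolve, reject) => {
    cursor.read(batchSize, (err, rows) => (err ? reject(err) : resolve(rows)));
  });
}

// Drain the cursor with an ordinary while loop instead of nested callbacks.
async function drainCursor(cursor, batchSize, handleRows) {
  let total = 0;
  while (true) {
    const rows = await readBatch(cursor, batchSize);
    if (rows.length === 0) break; // an empty batch means the cursor is exhausted
    handleRows(rows);
    total += rows.length;
  }
  return total;
}
```

Because each batch is awaited before the next read is issued, this processes any number of rows with constant stack depth.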

Thanks for your time.

@brianc
Owner

brianc commented Jan 23, 2018

hmm I think you can do something like this... bear with me, this is off the top of my head so it won't be perfect, but hopefully it conveys the gist

import { promisifyAll } from 'bluebird';
import { Client } from 'pg'
import Cursor from 'pg-cursor'
// bluebird adds promise-returning *Async variants of each method,
// so cursor.read becomes cursor.readAsync
const PromiseCursor = promisifyAll(Cursor)

const client = new Client()
await client.connect()
const cursor = client.query(new PromiseCursor('your query here'))
let rows;
do {
  rows = await cursor.readAsync(100)
  // do something with rows here
} while (rows.length)

I think that should work?

@trevorglick
Author

trevorglick commented Jan 23, 2018

I have been talking with some people in the SpeakJS Discord on the Node channel and @MrLeebo came up with this:

async function test() {
  console.log("THIS IS THE TEST METHOD");
  const client = await pool.connect();
  const renditionsCursor = client.query(new Cursor("select * from object_stacks"));
  let successCount = 0;

  function processData(err, rows) {
    if (err) {
      throw err;
    }

    for (let row of rows) {
      console.log(row.object_id);
      successCount++;
    }

    if (rows.length === 10) {
      renditionsCursor.read(10, processData);
    }
    console.log(`Success count is: ${successCount}`);
  }
  renditionsCursor.read(10, processData);
}

Worked well, and I added some additional logging just to make sure I was getting all my rows.

I haven't given yours a try, having just seen it. I'll give it a try and let you know what happens.

EDIT: Wasn't able to get your code to return the correct result set. Looks like with a little tweaking it could work. Going to just go with what I had for now since I've spent way too much time on this. Thanks for your quick replies and a great module!

@guomo

guomo commented Jun 29, 2018

+1, same issues.

The example docs for pg-cursor are not very good. Showing an example of just 6 rows isn't what cursors are for. Fundamentally, if the solution requires recursion to effectively drain the cursor, that is sub-optimal. For example, I was able to pull 365,219 rows, but the recursion depth got up to 367 calls. I've got a use case where I need to pull millions of rows from the DB, so I have no idea if it will work. Perhaps promises or pipes with flow control are a much better way? I'd submit a pull request if I had the bandwidth in my life to contribute, but I don't.

Brian, I am deeply appreciative of your library and I'm not trying to crap on your efforts. Way better than I could do, kudos and many thanks! So please take my design suggestion and criticism as kindly as I intended it.

Thanks @Hudspeth for your example. It worked great for me with 300k rows. Crossing my fingers for millions.

@nmchaves

nmchaves commented Aug 15, 2018

The solution below is working well.

import { Client } from "pg";
const Cursor = require("pg-cursor");
import { promisify } from "util";

const run = async () => {
  const client = new Client();
  await client.connect();

  const cursor = client.query(
    new Cursor("select * from my_big_table")
  );
  // ! IMPORTANT: do NOT overlook the `bind`
  // The cursor is storing some data internally to track its state. 
  // It needs to be able to access that data using the `this` keyword.
  // We're effectively going to pass `read` around, so we need to bind `this` to the cursor object.
  const promisifiedCursorRead = promisify(cursor.read.bind(cursor));

  const ROW_COUNT = 10;
  while (true) {
    const result = await promisifiedCursorRead(ROW_COUNT);

    if (result.length === 0) {
      break;
    }

    // handle `result` (the current batch of rows) here...

  }

  cursor.close(() => {
    client.end();
  });
};

run().catch(e => {
  console.error(e);
});

If you're using babel or Node 10 (which I'm not), then you can abstract away the edge conditions using an async generator and a for/await loop:

const run = async () => {
  const client = new Client();
  await client.connect();

  const cursor = client.query(new Cursor("select * from my_big_table"));
  // ! Same thing here. Don't forget the `bind`
  const promisifiedCursorRead = promisify(cursor.read.bind(cursor));

  for await (const result of resultGenerator(promisifiedCursorRead)) {
    // handle `result` (the current batch of rows) here. messy stuff is abstracted away by the generator
  }

  cursor.close(() => {
    client.end();
  });
};

async function* resultGenerator(promisifiedCursorRead) {
  const ROW_COUNT = 10;
  while (true) {
    const result = await promisifiedCursorRead(ROW_COUNT);

    if (result.length > 0) {
      yield result;
    } else {
      // no more rows left to process
      return;
    }
  }
}

I'm not using Node 10 yet, so I haven't tested the async generator approach, but it should at least be close to the right solution. Anyway, the first solution I posted works.

@zachsa

zachsa commented Sep 9, 2019

I found the same using the cursor - it would be nice if calling client.query(new Cursor()) returned an iterator. The way I achieved this in my code (using the pg-cursor library) was by creating an iterator using promises, used like so:

let iterator = await createIterator ('select ... from ...')
while (!iterator.done) {
 let rows = iterator.rows
 // do stuff with rows
 iterator = await iterator.next()
}

The createIterator() function

const createIterator = async sql => {
  const client = await pool.connect()
  const cursor = client.query(new Cursor(sql))
  const batchSize = 100

  return (async function getRows(client, cursor, batchSize) {
    let done = false

    // Get next rows
    const rows = await new Promise((resolve, reject) =>
      cursor.read(
        batchSize,
        (err, rows) => err ? reject(err) : resolve(rows)
      )
    )

    // Check if iteration is finished
    if (rows.length < 1) {
      done = true
      cursor.close(() => client.release())
    }

    // Return the iterator
    return {
      done,
      rows,
      next: async () => await getRows(client, cursor, batchSize)
    } 
  })(client, cursor, batchSize)
}

(This is actually slightly modified and untested relative to the original code I had. The original, definitely working code is here: https://gist.github.com/zachsa/0c91bfd4ab435ef6e8ac4f85d541bd8b)
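A note for readers on newer versions: pg-cursor 2.x added native promise support, so `cursor.read(n)` returns a promise when called without a callback and can be awaited directly, with no promisify wrapper needed (worth verifying against the version you have installed). A minimal drain loop under that assumption, with `drainModernCursor` and `handleRows` as illustrative names:

```javascript
// Sketch assuming pg-cursor 2.x, where read() and close() return
// promises when no callback is passed.
async function drainModernCursor(cursor, batchSize, handleRows) {
  let rows;
  do {
    rows = await cursor.read(batchSize);
    if (rows.length) handleRows(rows);
  } while (rows.length);
  await cursor.close();
}
```

This is essentially what brianc sketched with bluebird earlier in the thread, minus the promisification step.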
