Using async/await with pg-cursor? #1839

Open
zachsa opened this issue Feb 19, 2019 · 21 comments

Comments

@zachsa

zachsa commented Feb 19, 2019

Hi - thanks for a great library! I have a question regarding the pg-cursor API.

In the example on using a cursor from start to finish, this is shown in the docs HERE:

const { Pool } = require('pg')
const Cursor = require('pg-cursor')

const pool = new Pool()
const client = await pool.connect()
const cursor = client.query(new Cursor('select * from generate_series(0, 5)'))
cursor.read(100, (err, rows) => {
  if (err) {
    throw err;
  }
  assert(rows.length == 6)
  cursor.read(100, (err, res) => {
    assert(res.length == 0)
  })
})

It seems like I would need to wrap this in a promise if I wanted to be able to cycle through rows returned via the cursor?

Is it possible to use async/await here? I.e. something along the lines of the following:

const { Pool } = require('pg')
const Cursor = require('pg-cursor')

const pool = new Pool()
const client = await pool.connect()
const cursor = client.query(new Cursor('select * from generate_series(0, 5)'))
let rows = await cursor.read(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.read(100)
}

Apologies if I've missed something in the documentation!

@charmander
Collaborator

You should be able to promisify pg-cursor manually:

const { promisify } = require('util')
const Cursor = require('pg-cursor')

Cursor.prototype.readAsync = promisify(Cursor.prototype.read)



let rows = await cursor.readAsync(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.readAsync(100)
}

But that seems like it would make a good built-in pg-cursor feature.

@zachsa
Author

zachsa commented Jul 6, 2019

Hi @charmander. I agree that this would be nice to include in the pg library. I'd quite like to contribute on this - would that be okay?

I've used the Mongo Node.js client before and if I remember correctly they support either callback or promise style dynamically. I was pointed to code HERE that achieves this (the executeOperation function in case the line numbers change). Would this be a suitable approach for this library?

Do you have any pointers/preferences for a good way to incorporate promise support into the cursor.read function? If not, and if this is something that you wouldn't mind me working on, I could look through the code and come up with suggestions.

@charmander
Collaborator

@zachsa See https://github.com/brianc/node-pg-cursor/blob/1cdad4d8d20000dc6fb9a783394249b727106b84/index.js#L170-L185 and

node-postgres/lib/client.js

Lines 265 to 280 in 2c7be86

Client.prototype.connect = function (callback) {
  if (callback) {
    this._connect(callback)
    return
  }
  return new this._Promise((resolve, reject) => {
    this._connect((error) => {
      if (error) {
        reject(error)
      } else {
        resolve()
      }
    })
  })
}
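
The same callback-or-promise pattern could be applied to a read method. The following is only a minimal sketch under stated assumptions: `ArrayCursor` and `_read` are hypothetical stand-ins that serve rows from an in-memory array, not the pg-cursor implementation, and `drain` is a hypothetical helper showing the promise style in use.

```javascript
// Hypothetical stand-in for a cursor: serves rows from an in-memory array.
class ArrayCursor {
  constructor(rows) {
    this._rows = rows.slice()
  }

  // Callback-only core, playing the role that _connect plays in Client.prototype.connect.
  _read(count, callback) {
    setImmediate(() => callback(null, this._rows.splice(0, count)))
  }

  // Callback style if a callback is given, promise style otherwise.
  read(count, callback) {
    if (callback) {
      this._read(count, callback)
      return
    }
    return new Promise((resolve, reject) => {
      this._read(count, (error, rows) => {
        if (error) {
          reject(error)
        } else {
          resolve(rows)
        }
      })
    })
  }
}

// Reads chunk by chunk until an empty chunk signals the end.
async function drain(cursor, chunkSize) {
  const all = []
  let rows = await cursor.read(chunkSize)
  while (rows.length) {
    all.push(...rows)
    rows = await cursor.read(chunkSize)
  }
  return all
}
```

Existing callers that pass a callback are unaffected, since the promise is only created when the callback is omitted.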

@zachsa
Author

zachsa commented Jul 8, 2019

Thank you. That actually looks more straightforward than I was expecting.

@zachsa
Author

zachsa commented Aug 26, 2019

Hi @charmander - I had a quick look at this over the weekend and will hopefully have a bit of time this month to look more into this (I would really like to contribute to this if I am able).

Previously I have used this library without paying too much attention to how it works. Looking at pg-cursor's package.json file I see that it is dependent on "pg": "6.x". However pg is at 7.x.

In a Node app, my package.json looks like this:

"pg": "^7.4.3",
"pg-cursor": "^2.0.0",

Does this mean that at app start I'm loading 2 separate instantiations of the pg module (of different versions)? I.e. I'm requiring pg 7.4.3, and also requiring pg-cursor. Does pg-cursor require the pg library separately as the module is loaded?

If so, why?

I am using it like this:

...
const client = await pool.connect()
const cursor = client.query(new Cursor(sql))

So to me it seems that the cursor wouldn't need to include a dependency on the pg library itself?

@charmander
Collaborator

@zachsa pg-cursor has "pg": "6.x" in its devDependencies, which it uses for testing. It should be tested on the latest pg, but it doesn’t cause pg 6.x to be installed for you.

@zachsa
Author

zachsa commented Aug 27, 2019

Ah thanks. I should have picked that up

@zachsa
Author

zachsa commented Sep 9, 2019

Hi @charmander, would it be welcome if I were to update the package.json dependency versions? For example, using the npm-check-updates package, these packages are out of date:

 eslint                   ^4.4.0  →   ^6.3.0
 eslint-config-standard  ^10.2.1  →  ^14.1.0
 eslint-plugin-import     ^2.7.0  →  ^2.18.2
 eslint-plugin-node       ^5.1.1  →  ^10.0.0
 eslint-plugin-promise    ^3.5.0  →   ^4.2.1
 eslint-plugin-standard   ^3.0.1  →   ^4.0.1
 mocha                    ^3.5.0  →   ^6.2.0
 pg                          6.x  →      7.x

Updating mocha didn't break anything. Updating eslint requires some simple renaming of functions (for example, assert.equal => assert.strictEqual). But one test is now failing (https://github.com/brianc/node-pg-cursor/blob/master/test/query-config.js#L20-L34):

  it('passes types to result', (done) => {
    const client = new pg.Client()
    client.connect()
    const text = 'SELECT generate_series as num FROM generate_series(0, 2)'
    const types = {
      getTypeParser: () => () => 'foo'
    }
    const cursor = client.query(new Cursor(text, null, { types }))
    cursor.read(10, (err, rows) => {
      assert(!err)
      assert.deepStrictEqual(rows, [{ num: 'foo' }, { num: 'foo' }, { num: 'foo' }]) // changed from assert.deepEqual
      client.end()
      done()
    })
  })

The test output is this:

  1) query config passed to result
       passes types to result:

      Uncaught AssertionError [ERR_ASSERTION]: Expected values to be strictly deep-equal:
+ actual - expected

  [
+   anonymous {
-   {
      num: 'foo'
    },
+   anonymous {
-   {
      num: 'foo'
    },
+   anonymous {
-   {
      num: 'foo'
    }
  ]
      + expected - actual

I'm not sure what the expected behaviour should be? I can see that the test suite isn't expecting the constructor name for each object in the row.

@zachsa
Author

zachsa commented Sep 9, 2019

What is actually happening here?

const types = {
  getTypeParser: () => () => 'foo'
}
const cursor = client.query(new Cursor(text, null, { types }))

I see from the documentation that a cursor is an instance of Submittable. But I'm not sure what that means in the context of passing the types object to the cursor constructor.

@zachsa
Author

zachsa commented Sep 9, 2019

Also - is this the same issue (as in, promisifying the cursor) as discussed here? brianc/node-pg-cursor#36

@zachsa
Author

zachsa commented Sep 9, 2019

Another question: The Cursor.prototype.read function - https://github.com/brianc/node-pg-cursor/blob/master/index.js#L170.

I assume this function is what is called in this scenario?

const client = await pool.connect()
const clientQueryResult = client.query(new Cursor(sql))
clientQueryResult.read(...)

@felixfbecker

It would be amazing if Cursor could implement Symbol.asyncIterator so that it can be iterated with for await (const row of cursor) {}. This should be backwards compatible because old versions would never call it
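
A rough sketch of what that could look like, assuming a cursor whose read() resolves with an array of rows and whose close() returns a promise. `ChunkedCursor` is a hypothetical in-memory stand-in, not the pg-cursor class, and `collect` is a hypothetical helper.

```javascript
// Hypothetical in-memory stand-in, not the pg-cursor class.
class ChunkedCursor {
  constructor(rows, chunkSize = 100) {
    this._rows = rows.slice()
    this._chunkSize = chunkSize
    this.closed = false
  }

  // Resolves with up to `count` rows; an empty array means the cursor is exhausted.
  async read(count) {
    return this._rows.splice(0, count)
  }

  async close() {
    this.closed = true
  }

  // Fetches in chunks but yields single rows, so `for await` sees one row at a time.
  async *[Symbol.asyncIterator]() {
    try {
      while (true) {
        const rows = await this.read(this._chunkSize)
        if (rows.length === 0) return
        yield* rows
      }
    } finally {
      // Runs on exhaustion, on `break`, and when the loop body throws.
      await this.close()
    }
  }
}

// Collects rows with `for await`, optionally stopping early.
async function collect(cursor, limit = Infinity) {
  const seen = []
  for await (const row of cursor) {
    seen.push(row)
    if (seen.length >= limit) break
  }
  return seen
}
```

Code that never iterates the object never calls the new method, which is why adding it should not affect existing callers.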

@tienzochi

tienzochi commented Oct 7, 2020

@charmander Great solution! However, when I tried it, it doesn't seem to work if I call the client again inside the loop while the chunk size is smaller than the total number of rows. For example, I have 10 users in the users_table and tried reading two at a time to check that it fetches the next chunk, but the second client call inside the loop never returns. Do you know why? Here's sample code for what I am trying to achieve:

const { Pool } = require("pg");
const Cursor = require("pg-cursor");
const { promisify } = require("util");

Cursor.prototype.readAsync = promisify(Cursor.prototype.read);

(async function () {
  const pool = new Pool({
    user: POSTGRES_USER,
    host: POSTGRES_HOST,
    port: POSTGRES_PORT,
    database: POSTGRES_DATABASE,
    password: POSTGRES_PASSWORD,
  });
  const client = await pool.connect();
  const cursor = client.query(
    new Cursor("select user_id, username from user_table")
  );

  let rows = await cursor.readAsync(2);

  while (rows.length) {
    for (let row of rows) {
      const rowrow = await client.query(
        `SELECT * FROM sales 
            WHERE user_id = '${row.user_id}'
            LIMIT 1`
      );

      // code below is unreachable
      console.log(rowrow);
    }

    rows = await cursor.readAsync(2);
  }

  cursor.close(() => {
    client.end();
  });
})();

@charmander
Collaborator

@tienzochi You can’t make new queries on a client while one is already in progress (the one with the cursor). Using pool.query instead should work. That example code should be written without queries in a loop at all, though; something like this, for example:

SELECT DISTINCT ON (user_id) user_id, username, sales.[…]
  FROM user_table LEFT JOIN sales USING (user_id)
  -- ORDER BY probably; if not, maybe EXISTS or equivalent join would be more appropriate
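
If per-row follow-up queries are still needed, the loop could be restructured roughly as follows. This is a sketch under assumptions, not a tested recipe: `processInChunks` and `handleRow` are hypothetical names, `pool` is assumed to be a pg Pool and `Cursor` the pg-cursor constructor (both passed in so the sketch carries no connection details), and read() is assumed to return a promise when called without a callback, as reported later in this thread for pg-cursor 2.7.3.

```javascript
// Hypothetical helper: reads a cursor in chunks and hands each row to handleRow.
async function processInChunks(pool, Cursor, text, chunkSize, handleRow) {
  const client = await pool.connect()
  const cursor = client.query(new Cursor(text))
  try {
    // Assumes read() returns a promise when no callback is passed.
    let rows = await cursor.read(chunkSize)
    while (rows.length) {
      for (const row of rows) {
        // The cursor keeps `client` busy, so handleRow receives the pool and
        // can run follow-up queries with pool.query() on another connection.
        await handleRow(row, pool)
      }
      rows = await cursor.read(chunkSize)
    }
  } finally {
    try {
      // Callback form of close(), wrapped in a promise.
      await new Promise((resolve, reject) => {
        cursor.close((err) => (err ? reject(err) : resolve()))
      })
    } finally {
      client.release()
    }
  }
}
```

Inside handleRow, a follow-up query could then be written as pool.query('SELECT * FROM sales WHERE user_id = $1 LIMIT 1', [row.user_id]), which also avoids interpolating values into the SQL string.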

@brianc
Owner

brianc commented Oct 9, 2020

@felixfbecker

It would be amazing if Cursor could implement Symbol.asyncIterator so that it can be iterated with for await (const row of cursor) {}.

For this you should probably use pg-query-stream which already supports this. pg-cursor is used by pg-query-stream. I'm definitely cool with promisifying the cursor, but really the cursor is a bit lower level than pg-query-stream and can be useful primarily when you want to read exactly a certain number of rows at a time in chunks to carefully control how you're consuming. pg-query-stream is probably more generally useful for "pipe this query to this other thing" but due to high water marks, back pressure, and internal buffering in node streams you don't have as fine grained control over the exact number of rows read. In practice this is usually fine, and kinda preferred as you usually don't need to think about things at that level...so would recommend pg-query-stream for most use cases, including async iteration.
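
Since the query stream is a Node.js Readable, async iteration over it looks like iteration over any object-mode stream. The sketch below uses Readable.from over an in-memory array as a stand-in for the stream returned by the query, so it runs without a database; `fakeQueryStream` and `sumColumn` are hypothetical names.

```javascript
const { Readable } = require('stream')

// Stand-in for the stream a query would return: an object-mode Readable.
function fakeQueryStream(rows) {
  return Readable.from(rows)
}

// Sums one column across all rows delivered by the stream.
async function sumColumn(stream, column) {
  let total = 0
  // Each iteration receives one row; the stream does its own internal buffering.
  for await (const row of stream) {
    total += row[column]
  }
  return total
}
```

With a real query stream, the stream passed to sumColumn would be the object returned by the query instead of the stand-in.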

@disfated

I'm using this guy to sugarize cursors

const pg = require('pg');
const Cursor = require('pg-cursor');

const client = new pg.Client(/* ... */);

client.cursor = async function* cursor(query, values, chunkSize = 100) {
	const cursor = this.query(new Cursor(query, values));
	try {
		while (true) {
			const rows = await new Promise((resolve, reject) => {
				cursor.read(chunkSize, (error, rows) => error ? reject(error) : resolve(rows));
			});
			if (rows.length === 0) break;
			for (const row of rows) {
				yield row;
			}
		}
	} finally {
		cursor.close();
	}
};

// usage

try {
	const rows = client.cursor(`SELECT ...`, [], 20);
	for await (const row of rows) {
		use(row);
	}
} catch (error) {
	handle(error); // should perfectly catch all errors here
}

@sw360cab

sw360cab commented Mar 8, 2021

@brianc It is not clear to me whether using an async iterator with pg-query-stream on a huge table will give me basically the same result (memory savings) as promisifying pg-cursor. Is that the case?

@mariusa

mariusa commented Mar 4, 2022

I'm definitely cool with promisifying the cursor

@brianc could this be included, please? It would enable the example given by @charmander , out of the box, and close this issue.

Cursor.prototype.readAsync = promisify(Cursor.prototype.read) // should be built-in

let rows = await cursor.readAsync(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.readAsync(100)
}

@eMuonTau

cursor.read() already returns a Promise if no callback is supplied. Am I missing something?
https://github.com/brianc/node-postgres/blob/master/packages/pg-cursor/index.js#L231

pg: 8.7.3
pg-cursor: 2.7.3

@mariusa

mariusa commented Apr 17, 2022

Looks so. I'll be able to test in 2 weeks. Thanks

@mariusa

mariusa commented May 9, 2022

cursor.read() already returns a Promise if no callback is supplied.

This is correct, works fine. This issue can be closed.


9 participants