NodeJs - Pause/Resume stream request #471

Closed
hosseinGanjyar opened this issue May 10, 2017 · 8 comments

@hosseinGanjyar

hosseinGanjyar commented May 10, 2017

I fetch several million records from SQL Server (streaming) and process every record. I want to call nested functions inside request.on('row', row => { }) and pause streaming until the last nested function has finished.

#NodeJs
#SqlServer

I want this:

const sql = require('mssql')
 
sql.connect(config, err => {
    // ... error checks 
 
    const request = new sql.Request()
    request.stream = true // You can set streaming differently for each request 
    request.query('select * from verylargetable') // or request.execute(procedure) 
 
    request.on('recordset', columns => {
        // Emitted once for each recordset in a query 
    })
 
    request.on('row', row => {
        // Emitted for each row in a recordset 
        //call process_1(row)
    })
 
    request.on('error', err => {
        // May be emitted multiple times 
    })
 
    request.on('done', result => {
        // Always emitted as the last one 
    })
})
 
sql.on('error', err => {
    // ... error handler 
})

function process_1(data){
    // processing finishes after n minutes
    // when it finishes, call process_2(newData)
}
function process_2(newData){
    // processing finishes after n minutes
    // when it finishes, resume the streaming request
}

Note: I read tediousjs/tedious#181 and other issues about pausing/resuming a stream request, but I couldn't work out how to do it.
Thanks

@simonbuchan

Use-case: backpressure from async-iterable usage:

type Deferred<T> = {
  promise: Promise<T>,
  resolve: (result: T) => void,
  reject: (reason: any) => void,
};
function createDeferred<T>(): Deferred<T> {
  const result = {} as Deferred<T>;
  result.promise = new Promise<T>((resolve, reject) => {
    result.resolve = resolve;
    result.reject = reject;
  });
  return result;
}

function streamQuery<T>(query: TemplateStringsArray, ...interpolations: any[]): AsyncIterable<T> {
  return { [Symbol.asyncIterator]: asyncIterator };

  function asyncIterator(): AsyncIterator<T> {
    const request = pool.request();
    request.stream = true;

    // Reproduce pool.query but for request.
    // This is equivalent to request._template('query', query, interpolations);
    const command = [query[0]];
    for (let i = 0; i !== interpolations.length; i++) {
      request.input(`p${i}`, interpolations[i]);
      command.push(`@p${i}`, query[i + 1]);
    }
    request.query(command.join('')); // ignoring query result.
    request.on('row', row);
    request.on('error', error);
    request.on('done', done);

    // Note this will buffer without bound, mssql doesn't let us apply backpressure (suspend streaming).

    // Results received, but not requested, in the order received
    const buffer: Array<Promise<IteratorResult<T>>> = [];
    let bufferDone = false;
    // Results requested, but not received, in the order requested
    const pending: Array<Deferred<IteratorResult<T>>> = [];
    // At least one of the above should always be empty.

    return { next };

    function next(): Promise<IteratorResult<T>> {
      // if (buffer.length === LOW) request.resume();
      if (buffer.length) {
        return buffer.shift()!;
      }
      if (bufferDone) {
        return Promise.resolve({ value: undefined as any as T, done: true });
      }
      const deferred = createDeferred<IteratorResult<T>>();
      pending.push(deferred);
      return deferred.promise;
    }

    function row(row: T) {
      const result = { value: row, done: false };
      if (!pending.length) {
        buffer.push(Promise.resolve(result));
        // if (buffer.length === FULL) request.pause();
      } else {
        pending.shift()!.resolve(result);
      }
    }

    function error(reason: any) {
      if (!pending.length) {
        buffer.push(Promise.reject(reason));
        // if (buffer.length === FULL) request.pause();
      } else {
        pending.shift()!.reject(reason);
      }
    }

    function done() {
      bufferDone = true;
      while (pending.length) {
        pending.shift()!.resolve({ value: undefined as any as T, done: true });
      }
    }
  }
}

(note that this code is not terribly well tested, sorry!)
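For reference, an async iterable like the one streamQuery returns above is typically consumed with for await...of. The following is only a minimal sketch: `fakeRows` and `consume` are hypothetical names, and `fakeRows` merely stands in for a real streamQuery call so the snippet runs without a database.

```javascript
// Hypothetical stand-in for streamQuery`...`: any object that implements
// Symbol.asyncIterator can be consumed the same way.
async function* fakeRows() {
  yield { id: 1 };
  yield { id: 2 };
  yield { id: 3 };
}

// Pull rows one at a time and wait for the handler before asking for more.
async function consume(rows, handleRow) {
  let count = 0;
  for await (const row of rows) {
    // The iterator's next() is not called again until this promise resolves.
    await handleRow(row);
    count++;
  }
  return count; // number of rows handled
}
```

This only paces the consumer. With the iterator above, rows that arrive in the meantime are still buffered, because nothing suspends the underlying request.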

@mariotacke

@simonbuchan, do you have an example which does not use TypeScript?

@simonbuchan

If you can't be bothered to strip the types yourself, you can paste it into the TypeScript playground.

@ilusharulkov

Hello!
I had the same problem and solved it with the tedious pause/resume stream request.
I've created a small repo with an example: tediousStreaming
There are 2 examples:

  1. raw tedious
  2. node-mssql

When using node-mssql, make sure this commit is in node_modules.
I think the data flow (process_1, process_2) is easy to implement using a library such as https://highlandjs.org/

@dhensby
Collaborator

dhensby commented May 7, 2019

pause/resume of streams is now added
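As a rough sketch of how the original question might be handled with request.pause() and request.resume(): the helper below pauses the stream when a row arrives, runs the asynchronous work, and resumes once nothing is left in flight. `processRowsSequentially` and `processRow` are hypothetical names (`processRow` stands in for process_1 followed by process_2, written as an async function). The promise chain is there on the assumption that a few rows may still be delivered after pause() is called.

```javascript
// Process streamed rows strictly one after another.
// `request` is assumed to be a streaming request that emits 'row', 'error'
// and 'done' and exposes pause() / resume().
function processRowsSequentially(request, processRow) {
  return new Promise((resolve, reject) => {
    let chain = Promise.resolve(); // keeps rows in arrival order
    let inFlight = 0;              // rows received but not yet fully processed

    request.on('row', row => {
      inFlight++;
      request.pause(); // ask the driver to stop delivering rows
      chain = chain
        .then(() => processRow(row))
        .then(() => {
          inFlight--;
          if (inFlight === 0) request.resume(); // everything handled, continue
        });
      chain.catch(reject); // a failed row rejects the overall promise
    });

    request.on('error', reject);
    // Resolve only after the last queued row has been processed.
    request.on('done', () => { chain.then(resolve, reject); });
  });
}

// Usage (assumes `sql` is a connected mssql instance):
//   const request = new sql.Request();
//   request.stream = true;
//   request.query('select * from verylargetable');
//   await processRowsSequentially(request, async row => {
//     const newData = await process_1(row);
//     await process_2(newData);
//   });
```

If processing fails, the stream is left paused; a caller would probably want to cancel the request at that point.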

@dhensby dhensby closed this as completed May 7, 2019
@hosseinGanjyar
Author

hosseinGanjyar commented May 8, 2019

OH MY GOD... after about 2 years, this issue is solved.

That's one small step for a man, one giant leap for mankind
https://www.linkedin.com/feed/update/urn:li:activity:6531761519288418304/

@dhensby
Collaborator

dhensby commented May 8, 2019

Yep - This was added in #775 and released in v5.0.0 - however, due to a bug in the tedious driver, it didn't actually work properly and is now in v6.0.0 without any bugs (see #838 and #832) 🎉
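With pause/resume available, the commented-out FULL and LOW checks in the async-iterable example earlier in this thread can be filled in. The following is a minimal sketch rather than a tested implementation: `rowsOf`, `high` and `low` are hypothetical names, and the request is only assumed to emit 'row', 'error' and 'done' and to expose pause() and resume().

```javascript
// Expose a streaming request as an async iterable with a bounded buffer.
// The stream is paused once `high` rows are buffered and resumed once the
// consumer has drained the buffer down to `low` rows.
function rowsOf(request, { high = 100, low = 10 } = {}) {
  const buffer = [];    // rows received but not yet consumed
  let finished = false; // set when 'done' has been emitted
  let failure = null;   // last error reported by the request, if any
  let paused = false;
  let wake = null;      // resolver for a consumer waiting on an empty buffer

  const notify = () => {
    if (wake) { wake(); wake = null; }
  };

  request.on('row', row => {
    buffer.push(row);
    if (!paused && buffer.length >= high) {
      paused = true;
      request.pause(); // apply backpressure
    }
    notify();
  });
  request.on('error', err => { failure = err; notify(); });
  request.on('done', () => { finished = true; notify(); });

  return (async function* () {
    while (true) {
      if (buffer.length) {
        const row = buffer.shift();
        if (paused && buffer.length <= low) {
          paused = false;
          request.resume(); // room again, let rows flow
        }
        yield row;
      } else if (failure) {
        throw failure; // surfaced after buffered rows are drained
      } else if (finished) {
        return;
      } else {
        // Nothing buffered yet: wait for the next 'row', 'error' or 'done'.
        await new Promise(resolve => { wake = resolve; });
      }
    }
  })();
}

// Usage: for await (const row of rowsOf(request, { high: 500, low: 50 })) { ... }
```

Listeners are attached as soon as rowsOf is called, so no events are missed if the query has already been started.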

@mudouasenha

Thanks!!!!!!
