diff --git a/examples/streams.js b/examples/streams.js index fdae10b..ac8e630 100644 --- a/examples/streams.js +++ b/examples/streams.js @@ -1,54 +1,77 @@ -var Emitter = require('events').EventEmitter; +var http = require('http'); var co = require('..'); +var url = process.argv[2] || 'http://nodejs.org'; co(function *(){ - var res = yield get('http://google.com'); - console.log('-> %s', res.status); + var res = yield get(url); + console.log('-> %s', res.statusCode); var buf; - while (buf = yield res.read()) { - console.log(buf.toString()); + var total = 0; + while (buf = yield read(res)) { + total += buf.length; + console.log('\nread %d bytes (%d total):\n%j', buf.length, total, buf.toString()); } console.log('done'); -}) - -// I couldn't get streams2 to work... so, here's a fake request :) +}); function get(url) { console.log('GET %s', url); return function(done){ - done(null, new Response); - } + var req = http.get(url); + req.once('response', function(res) { + done(null, res); + }); + req.once('error', function(err) { + done(err); + }); + }; } -function Response() { - var self = this; - this.status = 200; +function read(res) { + return function(done){ - var id = setInterval(function(){ - self.emit('data', new Buffer('hello')); - }, 10); + function onreadable() { + // got a "readable" event, try to read a Buffer + cleanup(); + check(); + } - setTimeout(function(){ - clearInterval(id); - self.emit('end'); - }, 200); -} + function onend() { + // got an "end" event, send `null` as the value to signify "EOS" + cleanup(); + done(null, null); + } -Response.prototype.__proto__ = Emitter.prototype; + function onerror(err) { + // got an "error" event while reading, pass it upstream... + cleanup(); + done(err); + } -Response.prototype.read = function(){ - var self = this; - return function(done){ - // push kinda sucks for this... we need to - // handle whichever comes first with this hack - self.once('data', function(buf){ - self.removeListener('end', done); - done(null, buf); - }); + function cleanup() { + res.removeListener('readable', onreadable); + res.removeListener('end', onend); + res.removeListener('error', onerror); + } - self.on('end', done); - } -}; + function check() { + var buf = res.read(); + if (buf) { + // got a Buffer, send it! + done(null, buf); + } else { + // otherwise, wait for any of a "readable", "end", or "error" event... + // wow, streams2 kinda sucks, doesn't it? + res.on('readable', onreadable); + res.on('end', onend); + res.on('error', onerror); + } + } + + // kick things off... + check(); + }; +}