Merge pull request #49 from mcollina/limit
Limit & Offset for get and search
mcollina committed Dec 23, 2013
2 parents 15f2fd5 + e82081f commit 1613d2f
Showing 9 changed files with 226 additions and 16 deletions.
39 changes: 38 additions & 1 deletion README.md
@@ -92,6 +92,18 @@ db.put(triple, function() {
});
```

#### Limit and Offset

It is possible to paginate the results of a get by using the
`offset` and `limit` options, like so:

```
db.get({ subject: "a", limit: 4, offset: 2 }, function(err, list) {
expect(list).to.eql([triple]);
done();
});
```
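
The streaming variant accepts the same options inside the pattern (the tests
added in this commit exercise `getStream` with both `limit` and `offset`); a
minimal sketch:

```
db.getStream({ subject: "a", limit: 4, offset: 2 })
  .on('data', function(triple) {
    // paginated triples arrive here
    console.log(triple);
  });
```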

### Multiple Puts

__LevelGraph__ also supports putting multiple triples:
@@ -103,7 +115,7 @@ db.put([triple1, triple2], function(err) {
});
```

### searches
### Searches

__LevelGraph__ also supports searches:
```
@@ -154,6 +166,8 @@ db.put([{
});
```

#### Search Streams

It also supports a similar API without streams:
```
db.put([{
@@ -179,6 +193,8 @@ db.put([{
});
```

#### Triple Generation

It can also generate a stream of triples, instead of solutions:
```
db.search([{
@@ -209,6 +225,27 @@ It also allows to generate a stream of triples, instead of a solution:
});
```

#### Limit and Offset

It is possible to paginate the results of a search by using the
`offset` and `limit` options, like so:

```
db.search([{
subject: db.v("a"),
predicate: "friend",
object: db.v("x")
}, {
subject: db.v("x"),
predicate: "friend",
object: db.v("y")
}], { limit: 4, offset: 2 }, function(err, list) {
expect(list).to.eql([triple]);
done();
});
```
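
The streaming search honours the same options (they are forwarded to the
underlying join streams, as the `lib/` changes below show); a minimal sketch,
assuming the `db.searchStream(query, options)` form:

```
db.searchStream([{
  subject: db.v("x"),
  predicate: "friend",
  object: db.v("y")
}], { limit: 4, offset: 2 })
  .on('data', function(solution) {
    // paginated solutions arrive here
    console.log(solution.x, solution.y);
  });
```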

### Deleting

Deleting is easy too:
26 changes: 21 additions & 5 deletions lib/joinstream.js
@@ -20,8 +20,13 @@ function JoinStream(options) {
this.matcher = matcher(options.triple);
this.mask = queryMask(options.triple);
this.maskUpdater = maskUpdater(options.triple);
this.limit = options.limit;
this._limitCounter = 0;
this.offset = options.offset;
this._offsetCounter = 0;
this.db = options.db;
this._index = options.index;
this._ended = false;

var that = this;
this.once('pipe', function(source) {
@@ -37,18 +42,29 @@ JoinStream.prototype = Object.create(
);

JoinStream.prototype._transform = function(solution, encoding, done) {
if (this._ended) {
return done();
}

var that = this;

var newMask = this.maskUpdater(solution, this.mask);
var readStream = this.db.getStream(newMask, { index: this._index });
var that = this
, newMask = this.maskUpdater(solution, this.mask)
, readStream = this.db.getStream(newMask, { index: this._index });

readStream.on('data', function(triple) {
var newsolution = that.matcher(solution, triple);

if (newsolution) {
if (that._ended || !newsolution) {
return;
}

if (!that.offset || ++that._offsetCounter > that.offset) {
that.push(newsolution);
}

if (that.limit && ++that._limitCounter === that.limit) {
readStream.destroy();
that._ended = true;
}
});

readStream.on('error', function(err) {
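
The hunk above drives both options from two counters: matched solutions are
skipped until the offset is consumed, and the inner read stream is destroyed
(and the join marked ended) once the limit-th match has been seen. A standalone
sketch of that counting on a plain array — illustration only, not part of the
commit — which, like the diff, counts every match toward the limit, including
the ones skipped by the offset:

```
function paginate(matches, opts) {
  var offsetCounter = 0
    , limitCounter = 0
    , out = [];

  for (var i = 0; i < matches.length; i++) {
    // skip the first `offset` matches
    if (!opts.offset || ++offsetCounter > opts.offset) {
      out.push(matches[i]);
    }
    // stop once `limit` matches have been seen
    if (opts.limit && ++limitCounter === opts.limit) {
      break;
    }
  }

  return out;
}

// paginate([1, 2, 3, 4, 5, 6], { offset: 2, limit: 4 }) => [3, 4]
```

With `{ offset: 2, limit: 4 }` only two elements come through, because the
skipped matches still count toward the limit.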
15 changes: 11 additions & 4 deletions lib/keyfilterstream.js
@@ -19,6 +19,9 @@ function KeyFilterStream(options) {
});

this.filter = options.filter;
this.offset = options.offset;

this._offsetCounter = 0;

this._destroyed = false;
}
@@ -29,19 +32,23 @@ KeyFilterStream.prototype = Object.create(
);

KeyFilterStream.prototype._transform = function(data, encoding, done) {

data.value = JSON.parse(data.value);

if (this._destroyed || data.key.indexOf(this.start) < 0) {
this.source.destroy();
} else if (!this.filter || this.filter(data.value)) {
if (data.key.indexOf(this.start) < 0) {
this.destroy();
} else if ((!this.offset || ++this._offsetCounter > this.offset) &&
(!this.filter || this.filter(data.value))) {
this.push(data.value);
}

done();
};

KeyFilterStream.prototype.destroy = function() {
if (!this._destroyed) {
this.source.destroy();
this.push(null);
}
this._destroyed = true;
};

10 changes: 8 additions & 2 deletions lib/levelgraph.js
@@ -21,8 +21,6 @@ var joinDefaults = {
};

module.exports = function levelgraph(leveldb, options, readyCallback) {
function initDB(options, leveldb){
}

var name = leveldb
, db
@@ -134,6 +132,14 @@ searchStream = function(db, options) {
}));
}

if (options.limit) {
streams[streams.length - 1].limit = options.limit;
}

if (options.offset) {
streams[streams.length - 1].offset = options.offset;
}

if (options.materialized) {
streams.push(materializer({
pattern: options.materialized
15 changes: 13 additions & 2 deletions lib/sortjoinstream.js
@@ -40,6 +40,10 @@ function SortJoinStream(options) {
this._previousTriple = null;
this._lastDone = null;
this._restart();
this.limit = options.limit;
this._limitCounter = 0;
this.offset = options.offset;
this._offsetCounter = 0;
}

SortJoinStream.prototype = Object.create(
@@ -138,8 +142,15 @@ SortJoinStream.prototype._transform = function(solution, encoding, done) {
key = genKey(that.index, materializer(that.triple, solution)) + '::\xff';

if (newsolution) {
that.push(newsolution);
that._nextTriple(doRead, true);
if (!that.offset || ++that._offsetCounter > that.offset) {
that.push(newsolution);
}

if (that.limit && ++that._limitCounter === that.limit) {
that._readStream.destroy();
} else {
that._nextTriple(doRead, true);
}
return;
}

4 changes: 3 additions & 1 deletion lib/utilities.js
@@ -118,14 +118,16 @@ function typesFromPattern(pattern) {

function createQuery(pattern, options) {
var types = typesFromPattern(pattern)
, preferiteIndex = (options || {}).index
, preferiteIndex = options && options.index
, index = findIndex(types, preferiteIndex)
, key = genKey(index, pattern)
, query = {
start: key
, end: key + '\xff'
, fillCache: true
, filter: pattern.filter
, limit: pattern.limit > 0 && pattern.limit
, offset: pattern.offset
};

return query;
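
The `pattern.limit > 0 && pattern.limit` guard makes a missing or zero limit
evaluate to `false`, so a zero or missing limit does not restrict the results —
the new `limit: 0` test in `test/triple_store_spec.js` checks exactly that. How
the expression evaluates (illustration only):

```
4 > 0 && 4                  // => 4      a positive limit is passed through
0 > 0 && 0                  // => false  limit 0 behaves as "no limit"
undefined > 0 && undefined  // => false  no limit given
```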
2 changes: 1 addition & 1 deletion package.json
@@ -59,7 +59,7 @@
"xtend": "~2.1.1",
"callback-stream": "~1.0.0",
"async": "~0.2.9",
"readable-stream": ">= 1.0.2 < 2.0.0",
"readable-stream": "~ 1.0.2",
"level-writestream": "~0.1.1",
"levelup": ">= 0.10.0 < 0.19.0",
"leveldown": "> 0.7.0 < 0.11.0",
48 changes: 48 additions & 0 deletions test/abstract_join_algorithm.js
@@ -299,4 +299,52 @@ module.exports = function(joinAlgorithm) {
done();
});
});

it('should return only one solution with limit 1', function(done) {
db.search([{
subject: db.v('x'),
predicate: 'friend',
object: 'marco'
}, {
subject: db.v('x'),
predicate: 'friend',
object: 'matteo'
}], { limit: 1 }, function(err, results) {
expect(results).to.have.property('length', 1);
expect(results[0]).to.have.property('x', 'daniele');
done();
});
});

it('should return only one solution with limit 1 (bis)', function(done) {
db.search([{
subject: 'lucio',
predicate: 'friend',
object: db.v('x')
}, {
subject: 'daniele',
predicate: 'friend',
object: db.v('x')
}], { limit: 1 }, function(err, results) {
expect(results).to.have.property('length', 1);
expect(results[0]).to.have.property('x', 'marco');
done();
});
});

it('should skip the first solution with offset 1', function(done) {
db.search([{
subject: db.v('x'),
predicate: 'friend',
object: 'marco'
}, {
subject: db.v('x'),
predicate: 'friend',
object: 'matteo'
}], { offset: 1 }, function(err, results) {
expect(results).to.have.property('length', 1);
expect(results[0]).to.have.property('x', 'lucio');
done();
});
});
};
83 changes: 83 additions & 0 deletions test/triple_store_spec.js
@@ -91,6 +91,34 @@ describe('a basic triple store', function() {
});
stream.on('end', done);
});

it('should get the triple if limit 1 is used', function(done) {
db.get({ limit: 1 }, function(err, list) {
expect(list).to.eql([triple]);
done();
});
});

it('should get the triple if limit 0 is used', function(done) {
db.get({ limit: 0 }, function(err, list) {
expect(list).to.eql([triple]);
done();
});
});

it('should get the triple if offset 0 is used', function(done) {
db.get({ offset: 0 }, function(err, list) {
expect(list).to.eql([triple]);
done();
});
});

it('should not get the triple if offset 1 is used', function(done) {
db.get({ offset: 1 }, function(err, list) {
expect(list).to.eql([]);
done();
});
});
});

it('should put an array of triples', function(done) {
@@ -154,6 +182,61 @@ describe('a basic triple store', function() {

stream.on('end', done);
});

it('should return only one triple with limit 1', function(done) {
db.get({ predicate: 'b', limit: 1 }, function(err, list) {
expect(list).to.eql([triple1]);
done();
});
});

it('should return two triples with limit 2', function(done) {
db.get({ predicate: 'b', limit: 2 }, function(err, list) {
expect(list).to.eql([triple1, triple2]);
done();
});
});

it('should return three triples with limit 3', function(done) {
db.get({ predicate: 'b', limit: 3 }, function(err, list) {
expect(list).to.eql([triple1, triple2]);
done();
});
});

it('should support limit over streams', function(done) {
var triples = [triple1]
, stream = db.getStream({ predicate: 'b', limit: 1 });
stream.on('data', function(data) {
expect(data).to.eql(triples.shift());
});

stream.on('end', done);
});

it('should return only one triple with offset 1', function(done) {
db.get({ predicate: 'b', offset: 1 }, function(err, list) {
expect(list).to.eql([triple2]);
done();
});
});

it('should return no triples with offset 2', function(done) {
db.get({ predicate: 'b', offset: 2 }, function(err, list) {
expect(list).to.eql([]);
done();
});
});

it('should support offset over streams', function(done) {
var triples = [triple2]
, stream = db.getStream({ predicate: 'b', offset: 1 });
stream.on('data', function(data) {
expect(data).to.eql(triples.shift());
});

stream.on('end', done);
});
});

it('should put triples using a stream', function(done) {

