Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Update Client to comply with 1.5 changes (#21)
Browse files Browse the repository at this point in the history
use transaction using session().transaction().write()/.read()
Grakn entry point renamed to GraknClient
client.close() is now needed to close channels
  • Loading branch information
Marco Scoppetta authored Mar 6, 2019
1 parent 406198f commit ecf2c85
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 151 deletions.
36 changes: 22 additions & 14 deletions src/Grakn.js → src/GraknClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
* under the License.
*/

const grpc = require("grpc");
const Session = require('./Session');
const KeyspaceService = require('./service/Keyspace/KeyspaceService');
const messages = require("../client-nodejs-proto/protocol/session/Session_pb");
const sessionServices = require("../client-nodejs-proto/protocol/session/Session_grpc_pb");
const keyspaceServices = require("../client-nodejs-proto/protocol/keyspace/Keyspace_grpc_pb");

/**
* Entry-point for Grakn client, it communicates with a running Grakn server using gRPC.
Expand All @@ -30,18 +33,32 @@ const messages = require("../client-nodejs-proto/protocol/session/Session_pb");
* @param {String} uri String containing host address and gRPC port of a running Grakn instance, e.g. "localhost:48555"
* @param {Object} credentials Optional object containing user credentials - only used when connecting to a KGMS instance
*/
function Grakn(uri, credentials) {
const keyspaceService = new KeyspaceService(uri, credentials);
function GraknClient(uri, credentials) {
// Open grpc node clients. A grpc node client is composed of stub + channel.
// When creating clients to the same uri, the channel will be automatically shared.
const sessionClient = new sessionServices.SessionServiceClient(uri, grpc.credentials.createInsecure());
const keyspaceClient = new keyspaceServices.KeyspaceServiceClient(uri, grpc.credentials.createInsecure());

this.session = (keyspace) => new Session(uri, keyspace, credentials);
const keyspaceService = new KeyspaceService(keyspaceClient, credentials);

this.keyspaces = ()=> ({
this.session = async (keyspace) => {
const session = new Session(sessionClient, credentials);
await session.open(keyspace);
return session;
};

this.keyspaces = () => ({
delete: (keyspace) => keyspaceService.delete(keyspace),
retrieve: () => keyspaceService.retrieve()
});

this.close = () => {
grpc.closeClient(sessionClient);
grpc.closeClient(keyspaceClient);
}
}

module.exports = Grakn
module.exports = GraknClient

/**
* List of available dataTypes for Grakn Attributes
Expand All @@ -55,12 +72,3 @@ module.exports.dataType = {
DOUBLE: messages.AttributeType.DATA_TYPE.DOUBLE,
DATE: messages.AttributeType.DATA_TYPE.DATE
};

/**
* List of available transaction types supported by Grakn
*/
module.exports.txType = {
READ: messages.Transaction.Type.READ,
WRITE: messages.Transaction.Type.WRITE,
BATCH: messages.Transaction.Type.BATCH
};
40 changes: 32 additions & 8 deletions src/Session.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,59 @@

const Transaction = require("./Transaction");
const SessionService = require("./service/Session/SessionService");
const messages = require("../client-nodejs-proto/protocol/session/Session_pb");

/**
* List of available transaction types supported by Grakn
*/
const txType = {
READ: messages.Transaction.Type.READ,
WRITE: messages.Transaction.Type.WRITE
};

/**
* Session object that can be used to:
* - create a new Transaction
*
* @param {String} uri String containing host and port of a valid Grakn server
* @param {String} keyspace Grakn keyspace to which this sessions should be bound to
* @param {Object} grpcClient grpc node client (session stub + channel)
* @param {Object} credentials Optional object containing user credentials - only used when connecting to a KGMS instance
*/
function Session(uri, keyspace, credentials) {
this.sessionService = new SessionService(uri, keyspace, credentials);
function Session(grpcClient, credentials) {
this.sessionService = new SessionService(grpcClient, credentials);
}

/**
* Open a new Session on the server side
* @param {String} keyspace Grakn keyspace to which this sessions should be bound to
*/
Session.prototype.open = function(keyspace){
return this.sessionService.open(keyspace);
}

/**
* Create new Transaction, which is already open and ready to be used.
* @param {Grakn.txType} txType Type of transaction to open READ, WRITE or BATCH
* @returns {Transaction}
*/
Session.prototype.transaction = async function (txType) {
const transactionService = await this.sessionService.transaction(txType).catch(e => { throw e; });
return new Transaction(transactionService);
Session.prototype.transaction = function () {
return {
read: async () => {
const transactionService = await this.sessionService.transaction(txType.READ).catch(e => { throw e; });
return new Transaction(transactionService);
},
write: async () => {
const transactionService = await this.sessionService.transaction(txType.WRITE).catch(e => { throw e; });
return new Transaction(transactionService);
}
}

}

/**
* Close stream connected to gRPC server
*/
Session.prototype.close = function close() {
this.sessionService.close();
return this.sessionService.close();
}

module.exports = Session;
16 changes: 4 additions & 12 deletions src/service/Keyspace/KeyspaceService.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@
* under the License.
*/

const grpc = require("grpc");
const messages = require("../../../client-nodejs-proto/protocol/keyspace/Keyspace_pb");
const service = require("../../../client-nodejs-proto/protocol/keyspace/Keyspace_grpc_pb");

function KeyspaceService(uri, credentials) {
this.uri = uri;
function KeyspaceService(grpcClient, credentials) {
this.credentials = credentials;
this.stub = new service.KeyspaceServiceClient(uri, grpc.credentials.createInsecure());
this.client = grpcClient;
}


KeyspaceService.prototype.retrieve = function () {
const retrieveRequest = new messages.Keyspace.Retrieve.Req();
return new Promise((resolve, reject) => {
this.stub.retrieve(retrieveRequest, (err, resp) => {
this.client.retrieve(retrieveRequest, (err, resp) => {
if (err) reject(err);
else resolve(resp.getNamesList());
});
Expand All @@ -42,15 +38,11 @@ KeyspaceService.prototype.delete = function (keyspace) {
const deleteRequest = new messages.Keyspace.Delete.Req();
deleteRequest.setName(keyspace);
return new Promise((resolve, reject) => {
this.stub.delete(deleteRequest, (err) => {
this.client.delete(deleteRequest, (err) => {
if (err) reject(err);
else resolve();
});
});
}

KeyspaceService.prototype.close = function close() {
grpc.closeClient(this.stub);
}

module.exports = KeyspaceService;
45 changes: 23 additions & 22 deletions src/service/Session/SessionService.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,54 @@
* under the License.
*/

const grpc = require("grpc");
const services = require("../../../client-nodejs-proto/protocol/session/Session_grpc_pb");
const TxService = require("./TransactionService");
const RequestBuilder = require("./util/RequestBuilder");


/**
* This creates a new connection to the server over HTTP2,
* the connection will contain all the Transaction streams
*/
function SessionService(uri, keyspace, credentials) {
this.keyspace = keyspace;
function SessionService(grpcClient, credentials) {
this.credentials = credentials;
this.stub = new services.SessionServiceClient(uri, grpc.credentials.createInsecure());
this.client = grpcClient;
}

function wrapInPromise(self, fn, requestMessage){
return new Promise((resolve, reject) => {
fn.call(self, requestMessage, (error, response) => {
if (error) { reject(error); }
resolve(response);
});
});
/**
* This sends an open Session request and retrieves the sessionId that will be needed
* to open a Transaction.
*/
SessionService.prototype.open = async function open(keyspace){
const openResponse = await wrapInPromise(this.client, this.client.open, RequestBuilder.openSession(keyspace));
this.sessionId = openResponse.getSessionid();
}

/**
* This method creates a new Duplex Stream (this.stub.transaction()) over which gRPC will communicate when
* This method creates a new Duplex Stream (this.client.transaction()) over which gRPC will communicate when
* exchanging messages related to the Transaction service.
* It also sends an Open request before returning the TransactionService
* @param {Grakn.txType} txType type of transaction to be open
*/
SessionService.prototype.transaction = async function create(txType) {
if (this.sessionId === undefined) {
this.sessionId = (await wrapInPromise(this.stub, this.stub.open, RequestBuilder.openSession(this.keyspace))).getSessionid();
}

const txService = new TxService(this.stub.transaction());
const txService = new TxService(this.client.transaction());
await txService.openTx(this.sessionId, txType, this.credentials);
return txService;
}

/**
* Closes connection to the server
*/
SessionService.prototype.close = async function close() {
await wrapInPromise(this.stub, this.stub.close, RequestBuilder.closeSession(this.sessionId));
grpc.closeClient(this.stub);
SessionService.prototype.close = function close() {
return wrapInPromise(this.client, this.client.close, RequestBuilder.closeSession(this.sessionId));
}


function wrapInPromise(self, fn, requestMessage){
return new Promise((resolve, reject) => {
fn.call(self, requestMessage, (error, response) => {
if (error) { reject(error); }
resolve(response);
});
});
}

module.exports = SessionService;
4 changes: 2 additions & 2 deletions tests/service/keyspace/Keyspace.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ afterAll(async () => {
describe("Keyspace methods", () => {

test("retrieve and delete", async () => {
const session = graknClient.session("retrievetest");
const tx = await session.transaction(env.txType().WRITE);
const session = await graknClient.session("retrievetest");
const tx = await session.transaction().write();
tx.close();
const keyspaces = await graknClient.keyspaces().retrieve();
expect(keyspaces.length).toBeGreaterThan(0);
Expand Down
6 changes: 3 additions & 3 deletions tests/service/session/transaction/Attribute.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ const env = require('../../../support/GraknTestEnvironment');
let session;
let tx;

beforeAll(() => {
session = env.session();
beforeAll(async () => {
session = await env.session();
});

afterAll(async () => {
await env.tearDown();
});

beforeEach(async () => {
tx = await session.transaction(env.txType().WRITE);
tx = await session.transaction().write();
})

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions tests/service/session/transaction/AttributeType.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ const env = require('../../../support/GraknTestEnvironment');
let session;
let tx;

beforeAll(() => {
session = env.session();
beforeAll(async () => {
session = await env.session();
});

afterAll(async () => {
await env.tearDown();
});

beforeEach(async () => {
tx = await session.transaction(env.txType().WRITE);
tx = await session.transaction().write();
})

afterEach(() => {
Expand Down
30 changes: 13 additions & 17 deletions tests/service/session/transaction/CommitTx.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const env = require('../../../support/GraknTestEnvironment');
let graknClient;
let session;

beforeAll(() => {
beforeAll(async () => {
graknClient = env.graknClient;
session = graknClient.session("testcommit");
session = await graknClient.session("testcommit");
});

afterAll(async () => {
Expand All @@ -35,38 +35,34 @@ afterAll(async () => {

describe('Integration test', () => {

test('Open tx with invalid parameter throws error', async () => {
await expect(session.transaction('invalidTxType')).rejects.toThrowError();
});

test("Tx open in READ mode should throw when trying to define", async () => {
const tx = await session.transaction(env.txType().READ);
const tx = await session.transaction().read();
await expect(tx.query("define person sub entity;")).rejects.toThrowError();
tx.close();
});

test("If tx does not commit, different Tx won't see changes", async () => {
const tx = await session.transaction(env.txType().WRITE);
const tx = await session.transaction().write();
await tx.query("define catwoman sub entity;");
tx.close()
const newTx = await session.transaction(env.txType().WRITE);
const newTx = await session.transaction().write();
await expect(newTx.query("match $x sub catwoman; get;")).rejects.toThrowError(); // catwoman label does not exist in the graph
newTx.close();
});

test("When tx commit, different tx will see changes", async () => {
const tx = await session.transaction(env.txType().WRITE);
const tx = await session.transaction().write();
await tx.query("define superman sub entity;");
await tx.commit();
const newTx = await session.transaction(env.txType().WRITE);
const newTx = await session.transaction().write();
const superman = await newTx.getSchemaConcept('superman');
expect(superman.isSchemaConcept()).toBeTruthy();
newTx.close();
});

test("explanation and default of infer is true", async () => {
const localSession = graknClient.session("gene");
const tx = await localSession.transaction(env.txType().WRITE);
const localSession = await graknClient.session("gene");
const tx = await localSession.transaction().write();
const iterator = await tx.query("match $x isa cousins; get;"); // TODO: put back offset 0; limit 1;
const answer = await iterator.next();
expect(answer.map().size).toBe(1);
Expand All @@ -77,8 +73,8 @@ describe('Integration test', () => {
});

test("explanation with join explanation", async () => {
const localSession = graknClient.session("gene");
const tx = await localSession.transaction(env.txType().WRITE);
const localSession = await graknClient.session("gene");
const tx = await localSession.transaction().write();
const iterator = await tx.query(`match ($x, $y) isa marriage; ($y, $z) isa marriage;
$x != $z; get;`);
const answers = await iterator.collect();
Expand All @@ -91,8 +87,8 @@ describe('Integration test', () => {
});

test("no results with infer false", async () => {
const localSession = graknClient.session("gene");
const tx = await localSession.transaction(env.txType().WRITE);
const localSession = await graknClient.session("gene");
const tx = await localSession.transaction().write();
const iterator = await tx.query("match $x isa cousins; get;", { infer: false }); // TODO: put back offset 0; limit 1;
const answer = await iterator.next();
expect(answer).toBeNull();
Expand Down
Loading

0 comments on commit ecf2c85

Please sign in to comment.