Skip to content

Commit

Permalink
Merge pull request #121 from stanley-cheung/master
Browse files Browse the repository at this point in the history
Implement the grpc-web protocol
  • Loading branch information
stanley-cheung authored Dec 14, 2017
2 parents 46eec25 + 98a74df commit b1b345a
Show file tree
Hide file tree
Showing 8 changed files with 770 additions and 102 deletions.
101 changes: 5 additions & 96 deletions javascript/net/grpc/web/clientreadablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,94 +12,15 @@
goog.provide('grpc.web.ClientReadableStream');


goog.require('goog.net.XhrIo');
goog.require('goog.net.streams.NodeReadableStream');
goog.require('goog.net.streams.createXhrNodeReadableStream');
goog.require('grpc.web.Status');



/**
* A stream that the client can read from. Used for calls that are streaming
* from the server side.
*
* @template RESPONSE
* @constructor
* @final
* @param {!goog.net.XhrIo} xhr The XhrIo object
* @param {function(?):!RESPONSE} responseDeserializeFn
* The deserialize function for the proto
* @param {function(?):!grpc.web.Status}
* rpcStatusParseFn A function to parse the Rpc status response
* @interface
*/
grpc.web.ClientReadableStream = function(
xhr, responseDeserializeFn, rpcStatusParseFn) {
/**
* @private
* @type {?goog.net.streams.NodeReadableStream} The XHR Node Readable
* Stream
*/
this.xhrNodeReadableStream_ =
goog.net.streams.createXhrNodeReadableStream(xhr);

/**
* @private
* @type {function(?):!RESPONSE} The deserialize function for the proto
*/
this.responseDeserializeFn_ = responseDeserializeFn;

/**
* @private
* @type {!goog.net.XhrIo} The XhrIo object
*/
this.xhr_ = xhr;

/**
* @private
* @type {function(!RESPONSE)|null} The data callback
*/
this.onDataCallback_ = null;

/**
* @private
* @type {function(!grpc.web.Status)|null}
* The status callback
*/
this.onStatusCallback_ = null;

/**
* @private
* @type {function(...):?|null}
* The stream end callback
*/
this.onEndCallback_ = null;

/**
* @private
* @type {function(?):!grpc.web.Status}
* A function to parse the Rpc Status response
*/
this.rpcStatusParseFn_ = rpcStatusParseFn;


// Add the callback to the underlying stream
var self = this;
this.xhrNodeReadableStream_.on('data', function(data) {
if ('1' in data && self.onDataCallback_) {
var response = self.responseDeserializeFn_(data['1']);
self.onDataCallback_(response);
}
if ('2' in data && self.onStatusCallback_) {
var status = self.rpcStatusParseFn_(data['2']);
self.onStatusCallback_(status);
}
});
this.xhrNodeReadableStream_.on('end', function() {
if (self.onEndCallback_) {
self.onEndCallback_();
}
});
};
grpc.web.ClientReadableStream = function() {};


/**
Expand All @@ -110,23 +31,11 @@ grpc.web.ClientReadableStream = function(
* an optional input object
* @return {!grpc.web.ClientReadableStream} this object
*/
grpc.web.ClientReadableStream.prototype.on = function(
eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallback_ = callback;
} else if (eventType == 'status') {
this.onStatusCallback_ = callback;
} else if (eventType == 'end') {
this.onEndCallback_ = callback;
}
return this;
};
grpc.web.ClientReadableStream.prototype.on = goog.abstractMethod;



/**
* Close the stream.
*/
grpc.web.ClientReadableStream.prototype.cancel = function() {
this.xhr_.abort();
};
grpc.web.ClientReadableStream.prototype.cancel = goog.abstractMethod;
7 changes: 5 additions & 2 deletions javascript/net/grpc/web/gatewayclientbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ goog.require('grpc.web.AbstractClientBase');
goog.require('grpc.web.ClientReadableStream');
goog.require('grpc.web.Status');
goog.require('grpc.web.StatusCode');
goog.require('grpc.web.StreamBodyClientReadableStream');
goog.require('proto.google.rpc.Status');
goog.require('proto.grpc.gateway.Pair');

Expand Down Expand Up @@ -109,8 +110,10 @@ grpc.web.GatewayClientBase.prototype.newXhr_ = function() {
*/
grpc.web.GatewayClientBase.prototype.createClientReadableStream_ = function(
xhr, responseDeserializeFn) {
return new grpc.web.ClientReadableStream(xhr, responseDeserializeFn,
grpc.web.GatewayClientBase.parseRpcStatus_);
var stream = new grpc.web.StreamBodyClientReadableStream(xhr);
stream.setResponseDeserializeFn(responseDeserializeFn);
stream.setRpcStatusParseFn(grpc.web.GatewayClientBase.parseRpcStatus_);
return stream;
};


Expand Down
117 changes: 117 additions & 0 deletions javascript/net/grpc/web/grpcwebclientbase.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* @fileoverview gRPC browser client library.
*
* Base class for gRPC Web JS clients using the application/grpc-web wire
* format
*
* @author [email protected] (Stanley Cheung)
*/
goog.provide('grpc.web.GrpcWebClientBase');


goog.require('goog.crypt.base64');
goog.require('goog.net.XhrIo');
goog.require('grpc.web.AbstractClientBase');
goog.require('grpc.web.GrpcWebClientReadableStream');
goog.require('grpc.web.StatusCode');


/**
* Base class for gRPC web client using the application/grpc-web wire format
* @param {?Object=} opt_options
* @constructor
* @implements {grpc.web.AbstractClientBase}
*/
grpc.web.GrpcWebClientBase = function(opt_options) {
};


/**
* @override
*/
grpc.web.GrpcWebClientBase.prototype.rpcCall = function(
method, request, metadata, methodInfo, callback) {
var xhr = this.newXhr_();
var serialized = methodInfo.requestSerializeFn(request);
xhr.headers.addAll(metadata);

var stream = new grpc.web.GrpcWebClientReadableStream(xhr);
stream.setResponseDeserializeFn(methodInfo.responseDeserializeFn);

stream.on('data', function(response) {
callback(null, response);
});

stream.on('status', function(status) {
if (status.code != grpc.web.StatusCode.OK) {
callback({
'code': status.code,
'message': status.details
}, null);
}
});

xhr.headers.set('Content-Type', 'application/grpc-web-text');
xhr.headers.set('Accept', 'application/grpc-web-text');

var payload = this.encodeRequest_(serialized);
payload = goog.crypt.base64.encodeByteArray(payload);
xhr.send(method, 'POST', payload);
return;
};


/**
* @override
*/
grpc.web.GrpcWebClientBase.prototype.serverStreaming = function(
method, request, metadata, methodInfo) {
var xhr = this.newXhr_();
var serialized = methodInfo.requestSerializeFn(request);
xhr.headers.addAll(metadata);

var stream = new grpc.web.GrpcWebClientReadableStream(xhr);
stream.setResponseDeserializeFn(methodInfo.responseDeserializeFn);

xhr.headers.set('Content-Type', 'application/grpc-web-text');
xhr.headers.set('Accept', 'application/grpc-web-text');

var payload = this.encodeRequest_(serialized);
payload = goog.crypt.base64.encodeByteArray(payload);
xhr.send(method, 'POST', payload);

return stream;
};


/**
* Create a new XhrIo object
*
* @private
* @return {!goog.net.XhrIo} The created XhrIo object
*/
grpc.web.GrpcWebClientBase.prototype.newXhr_ = function() {
return new goog.net.XhrIo();
};



/**
* Encode the grpc-web request
*
* @private
* @param {!Uint8Array} serialized The serialized proto payload
* @return {!Uint8Array} The application/grpc-web padded request
*/
grpc.web.GrpcWebClientBase.prototype.encodeRequest_ = function(serialized) {
var len = serialized.length;
var bytesArray = [0, 0, 0, 0];
var payload = new Uint8Array(5 + len);
for (var i = 3; i >= 0; i--) {
bytesArray[i] = (len % 256);
len = len >>> 8;
}
payload.set(new Uint8Array(bytesArray), 1);
payload.set(serialized, 5);
return payload;
};
Loading

0 comments on commit b1b345a

Please sign in to comment.