Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC实现原理 #3

Open
tsy77 opened this issue Mar 23, 2018 · 0 comments
Open

RPC实现原理 #3

tsy77 opened this issue Mar 23, 2018 · 0 comments

Comments

@tsy77
Copy link
Owner

tsy77 commented Mar 23, 2018

在前端业务越来越向后扩展的情况下,RPC的调用也变成了我们获取数据重要的一部分,所以本文主要介绍RPC及其基本原理,主要有以下三部分:

1.RPC client/server的搭建及使用

2.client/server是如何处理RPC请求和调用的,其中包括我们每次用thrift命令生成的service和types到底是干嘛的

3.RPC的原理总结

RPC client/server的搭建及使用

RPC client的搭建和使用大家可以看官网上的实现就好了

官网:https://thrift.apache.org/tutorial/nodejs

client和server的业务逻辑建议大家简单搭建一个client和server看一下,我这里已官网上的例子为例进行分析:

client:

1.createConnection

var connection = thrift.createConnection("localhost", 9090, {
  transport : transport,
  protocol : protocol
});

这里的协议主要包括JSON, XML, plain text, compact binary

2.createClient

var Calculator = require('./gen-nodejs/Calculator');
......
var client = thrift.createClient(Calculator, connection);

这里的createClient其实就是实例化了Calculator.Client对象,以下是代码:

new Calculator.Client(connection.transport, connection.protocol)

Calculator.Client构造函数如下:

var CalculatorClient = exports.Client = function(output, pClass) {
    this.output = output;
    this.pClass = pClass;
    this._seqid = 0;
    this._reqs = {};
};

大家首先关注下_seqid属性,该属性在rpc中很重要,试想一下,我们调用了某个方法,并传入callback,当结果返回时执callback(),那么程序异步获取result后,如何知道其对应callack呢,_seqid和_reqs出现了,RPC在this._reqs[seqid]中存储每个方法的callback,每个方法的seqid是在this._seqid基础上递增而来。

上面client对象中包含了output对象,对象中包含了thrift的接口,thrift协议规定了传输数据和内存中的变量之间的转换及其序列化、反序列化。这些接口可以在文档https://thrift.apache.org/docs/concepts中找到。下面只简单介绍下thrift的`writeMessageBegin`方便大家理解。

 public void writeMessageBegin(TMessage message) throws TException {  
    if (strictWrite_) {//判断是否强制写入版本号,是  
      int version = VERSION_1 | message.type;  
      writeI32(version);//写入版本号  
      writeString(message.name);//写入功能方法的名称  
      writeI32(message.seqid);//写入客户端的标识,这个标识是自动增加的  
    } else {//否  
      writeString(message.name);//写入功能方法的名称  
      writeByte(message.type);//写入类型  
      writeI32(message.seqid);//写入客户端的标识,这个标识是自动增加的  
    }  
}  

3.client[method](args, callback)

下面以caculate方法为例:

方法首先调用CalculatorClient.prototype.calculate(),其中主要代码如下:

this._reqs[this.seqid()] = callback;
this.send_calculate(logid, w);

this._reqs[this.seqid()]即咱们上面所说的存储callback的地方。

send_calculate()代码如下:

var output = new this.pClass(this.output);
output.writeMessageBegin('calculate', Thrift.MessageType.CALL, this.seqid());
var params = {
	logid: logid,
	w: w
};
var args = new Calculator_calculate_args(params);
args.write(output);
output.writeMessageEnd();
return this.output.flush();

首先调用output.writeMessageBegin()接口,表示消息的开始;
接着args.write(output),其实就是运用thrift接口,把方法所需参数传递过去,代码如下:

	output.writeStructBegin('Calculator_calculate_args');
	if (this.logid !== null && this.logid !== undefined) {
		output.writeFieldBegin('logid', Thrift.Type.I32, 1);
		output.writeI32(this.logid);
		output.writeFieldEnd();
	}
	if (this.w !== null && this.w !== undefined) {
		output.writeFieldBegin('w', Thrift.Type.STRUCT, 2);
		this.w.write(output);
		output.writeFieldEnd();
	}
	output.writeFieldStop();
	output.writeStructEnd();
	return;

最后调用flush清空缓冲区,数据发出

当结果返回后,RPC将调用CalculatorClient.prototype.recv_calculate(),代码如下:

var callback = this._reqs[rseqid] || function() {};
delete this._reqs[rseqid];
......
var result = new Calculator_calculate_result();
result.read(input);
input.readMessageEnd();

if (null !== result.ouch) {
	return callback(result.ouch);
}
if (null !== result.success) {
	return callback(null, result.success);
}
return callback('calculate failed: unknown result');

其实就是实例化一个Calculator_calculate_result对象,调用了thrift各种read接口,最后执行callback。

server:

1.createServer()

var Calculator = require("./gen-nodejs/Calculator");
.....
var server = thrift.createServer(Calculator, {
	calculate: function(logid, work, result) {
	    console.log("calculate(", logid, ",", work, ")");
	
	    var val = 0;
	    if (work.op == ttypes.Operation.ADD) {
	      val = work.num1 + work.num2;
	    } else if (work.op === ttypes.Operation.SUBTRACT) {
	      val = work.num1 - work.num2;
	    } else if (work.op === ttypes.Operation.MULTIPLY) {
	      val = work.num1 * work.num2;
	    } else if (work.op === ttypes.Operation.DIVIDE) {
	      if (work.num2 === 0) {
	        var x = new ttypes.InvalidOperation();
	        x.whatOp = work.op;
	        x.why = 'Cannot divide by 0';
	        result(x);
	        return;
	      }
	      val = work.num1 / work.num2;
	    } else {
	      var x = new ttypes.InvalidOperation();
	      x.whatOp = work.op;
	      x.why = 'Invalid operation';
	      result(x);
	      return;
	    }
	
	    var entry = new SharedStruct();
	    entry.key = logid;
	    entry.value = ""+val;
	    data[logid] = entry;
	
	    result(null, val);
	 },
 }

这里的createServer创建了一个tcp/tls的服务,监听的回调如下:

var self = this;
    stream.on('error', function(err) {
        self.emit('error', err);
    });
    stream.on('data', transport.receiver(function(transportWithData) {
      var input = new protocol(transportWithData);
      var output = new protocol(new transport(undefined, function(buf) {
        try {
            stream.write(buf);
        } catch (err) {
            self.emit('error', err);
            stream.end();
        }
      }));

      try {
        do {
          processor.process(input, output);
          transportWithData.commitPosition();
        } while (true);
      } catch (err) {
        ......
      }
    }));

    stream.on('end', function() {
      stream.end();
    });

这里的process是gen-nodejs/Calculator中export出来的processer的实例,input和output对象分别包含读和写的thrift接口。

2.processor.process()

CalculatorProcessor.prototype.process = function(input, output) {
  var r = input.readMessageBegin();
  if (this['process_' + r.fname]) {
    return this['process_' + r.fname].call(this, r.rseqid, input, output);
  } else {
    input.skip(Thrift.Type.STRUCT);
    input.readMessageEnd();
    var x = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN_METHOD, 'Unknown function ' + r.fname);
    output.writeMessageBegin(r.fname, Thrift.MessageType.EXCEPTION, r.rseqid);
    x.write(output);
    output.writeMessageEnd();
    output.flush();
  }
}

this['process_' + r.fname].call(this, r.rseqid, input, output),也就是说当client调用caculate方法时,会执行process_caculate方法。

process_caculate 方法中,首先实例化CalculatorProcessor.prototype.processcaculate,然后调用其read方法处理参数,接着调用我们注册在createServer()中的方法,最后调用output.writexxxoutput.flush()将结果编码并返回。

至此,我们缕顺了RPC调用和处理的流程,下面我们总结下我们上面一直Calculator.js中到底有什么?

1.服务中每个方法的参数对象(包含read、write方法)

2.服务中每个方法的调用返回结果对象(包含read、write方法)

3.client,其原型中包含所有同service中声明的方法名相同的方法(比如xxxmethod)、send_xxxmethod、recv_xxxmethod

4.processor,其原型中包含与service所有声明的方法一一对应的process方法(process_xxxmethod)

RPC调用原理

1.server端启动程序,侦听端口,实现提供给client调用的函数,保存在一个对象里。

2.client端启动程序,连接服务端,连接完成后发送describe命令,要求server返回它能提供调用的函数名。

3.server端接收到describe命令,把自己可供调用的函数名包装好发送出去

4.client端接收到server发送的函数名,注册到自己的对象里,给每个函数名包装一个方法,使本地调用这些函数时实际上是向server端发送请求:

5.client端调用server端的函数:

1) 给传入的callback函数生成一个唯一ID,称为callbackId,记录到client的一个对象里。

2) 包装好以下数据发送给server端:调用函数名,JSON序列化后的参数列表,callbackId

6.server端接收到上述信息,解析数据,对参数列表反序列化,根据函数名和参数调用函数。

7.函数运行完成后,把结果序列化,连同之前收到的callbackId发送回client端

8.client端接收到函数运行结果和callbackId,根据callbackId取出回调函数,把运行结果传入回调函数中执行。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant