Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] 订阅模型 #1468

Closed
popomore opened this issue Sep 27, 2017 · 22 comments
Closed

[RFC] 订阅模型 #1468

popomore opened this issue Sep 27, 2017 · 22 comments
Assignees

Comments

@popomore
Copy link
Member

一个服务接受请求一般分为两种

  1. 常见的是请求响应模型,不管是 http 还是 tcp,接受到一个请求经过处理后马上响应,客户端会等待响应。
  2. 另一种是异步响应模型,一个请求发送过来立即响应后再处理,处理完成后根据情况是否返回回执。

这个 RFC 主要讨论第二种模型:订阅模型,讨论如何统一这种编码风格。

订阅模型常见的是消息中间件,如社区已经实现的 kafka,虽然还没有 consumer 的例子,我可以简单写一下在 egg 使用的方式

// app.js
module.exports = app => {
  const consumer = new Consumer({});
  consumer.on('message', function (message) {
    co(function() {
      const ctx = app.createAnonymousContext();
      yield ctx.service.kafka.consume(message);
    });
  });
};

实现起来会显得非常繁琐,如果变为真正的企业应用,事件包 co 还会有潜在风险,比如这里 co 没有 catch,而且是否需要返回回执?而且代码没处放都写在 app.js 非常不友好。

设计

订阅模型是一种模式,所以 Egg 会提供一个基类和特定写法。

  1. 提供 Subscription 基类,这个类继承 BaseContextClass。

    const Subscription = require('egg').Subscription;
  2. 所有的子类必须实现 subscribe 方法

    const Subscription = require('egg').Subscription;
    class MessageConsumer extends Subscription {
       async subscribe(message) {
          await this.ctx.service.kafka.consume(message);
       }
    }
    module.exports = MessageConsumer;
  3. 这种模式不会约定目录,但是建议实现方实现 loader,比如 kafka 插件自动加载 app/kafka/message_consumer.js 文件中的类。

  4. 实现放可以继承 Subscription 覆盖基类

Schedule

现在的 egg-schedule 也非常类似订阅模型,如果改造可以改为

// {app_root}/app/schedule/cleandb.js
const Subscription = require('egg').Subscription;

class CleanDB extends Subscription {
  static get schedule() {
    type: 'worker',
    cron: '0 0 3 * * *',
  }

  * subscribe() {
    yield this.ctx.service.db.cleandb();
  }
}

module.exports = CleanDB;
@atian25 atian25 changed the title 订阅模型 [RFC] 订阅模型 Sep 27, 2017
@popomore popomore self-assigned this Sep 27, 2017
popomore added a commit that referenced this issue Sep 27, 2017
@popomore popomore mentioned this issue Sep 27, 2017
4 tasks
@atian25
Copy link
Member

atian25 commented Sep 27, 2017

哈,前几天刚想重构下 egg-schedule,写了一半

@popomore
Copy link
Member Author

这个还等着重构 eggjs/egg-logrotator#11

@atian25
Copy link
Member

atian25 commented Sep 27, 2017

egg-schedule

  • app.schedule.use(CustomStrategy) 替换掉现在的 agent[SCHEDULE_HANDLER]
  • app.schedule.start(job / jobId),启动一个任务
  • app.schedule.stop(job / jobId),停止一个任务
  • app.messenger.on('schedule', function(job, stats) {}) 任务开始执行的消息
class CleanDB extends Subscription {
  static get schedule() {
    type: 'worker',
    cron: '0 0 3 * * *',
  }

  * subscribe() {
    yield this.ctx.service.db.cleandb();
  }
  
  * afterEach() {}
  * after() {}
}
``

@linxuanwei
Copy link

定时任务用agenda不是更好么

@atian25
Copy link
Member

atian25 commented Sep 27, 2017 via email

@popomore
Copy link
Member Author

是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。

@popomore
Copy link
Member Author

@ntfs32

@luckydrq
Copy link

还没有领会到这样设计的好处,特别对egg-schedule来说,因为没有服务端,不需要发送回执吧?

@ghost
Copy link

ghost commented Oct 11, 2017

请问 egg-schedule 现在支持 stop 某个任务否?

popomore added a commit to eggjs/schedule that referenced this issue Oct 12, 2017
schedule can extend from Subscription which is context class

Ref eggjs/egg#1468
atian25 pushed a commit to eggjs/schedule that referenced this issue Oct 13, 2017
* feat: support class

schedule can extend from Subscription which is context class

Ref eggjs/egg#1468
@fengmk2
Copy link
Member

fengmk2 commented Oct 16, 2017

cc @gxcsoccer ,强制 done 的设计问题也一并看看。

@gxcsoccer
Copy link
Contributor

cc @gxcsoccer ,强制 done 的设计问题也一并看看。

@JacksonTian 的意思要业务主动确认下下消费成功,他举的例子如下业务主动 try catch 了,但是忘记 rethrow,消息就会被丢掉

consumer.subscribe(config.topic, '*', function* (msg) {
  console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)
  try {
    // 业务处理抛异常
  } catch(err) {
    // 这里没有 retry,消息就丢了
  }
});

@popomore
Copy link
Member Author

如果没抛异常就认为是成功了,如果抛异常了就失败了。

@dead-horse
Copy link
Member

如果没抛异常就认为是成功了,如果抛异常了就失败了。

+1,我觉得可以 try catch 忘记 re-throw 也可以忘记调用 done,从逻辑上增加一个 done 没道理。

@JacksonTian
Copy link
Contributor

如果用户忘记调用 done,那是用户的问题,会造成消息堆积,使得他能够重新审视代码的问题。

如果默认 done,失败会很容易造成静默,消息就被标记为消费了(但实际上没有)。

前一种会把问题暴露出来,后一种,就。

@popomore
Copy link
Member Author

这个文档写清楚就好了,一般都要给回执的,不然调度方有些还有可能重发。

@fsx950223
Copy link

可以参考rxjs

popomore added a commit that referenced this issue Oct 19, 2017
popomore added a commit that referenced this issue Oct 19, 2017
popomore added a commit that referenced this issue Oct 20, 2017
popomore pushed a commit that referenced this issue Oct 20, 2017
feat: add Subscription (#1469)

Ref #1468
@tsui66
Copy link

tsui66 commented Mar 24, 2018

能否帮忙构思下egg rabbitmq 基于订阅模型如何编写?

@beiyu98
Copy link

beiyu98 commented Jun 1, 2018

@popomore 是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。
是否可以给个集群拓展的example呢?或者思路呢?

@beiyu98
Copy link

beiyu98 commented Jun 1, 2018

@atian25 这个要依赖消息服务的调度吧?目前遇到的情况是,同一份代码跑了部署在多台服务上,并订阅redis 一个db的key失效事件,收到失效事件后,只要一台服务器的一个worker执行,然后根据最新的数据再向redis插入一个key并设置失效事件,如此循环。目前的方式是另写一个项目单机起了一个服务来处理。

@atian25
Copy link
Member

atian25 commented Jun 1, 2018

@brucecodezone 集群情况,肯定是必须依赖外部服务的调度的

@beiyu98
Copy link

beiyu98 commented Jun 1, 2018

@atian25 thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests