Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow如何实现同步API,去调用异步方式的第三方服务API #1625

Open
TongxianYang opened this issue Sep 23, 2024 · 67 comments · Fixed by #1627
Open

workflow如何实现同步API,去调用异步方式的第三方服务API #1625

TongxianYang opened this issue Sep 23, 2024 · 67 comments · Fixed by #1627

Comments

@TongxianYang
Copy link

workflow作为服务代理提供API,将接受到http请求去请求第三方服务的API,第三方的服务API是异步的,第三方服务器将资源结果也通过http请求方式post给workflow, 我的问题是如何把workflow对外提供的api实现为同步调用接口?

@Barenboim
Copy link
Contributor

我觉得你的需求可能不是实现同步调用吧?

你是想收到POST请求之后,你再回复之前的http请求呢

@TongxianYang
Copy link
Author

是的,收到第三方服务的post过来结果,再回复到之前用户调用过来的http请求,这种实现整个流程的同步,workflow有没有推荐的方案?

比如: app---->workflow---->第三方服务器---->远程终端设备, 设备把最终结果回复到最初的app请求中

@Barenboim
Copy link
Contributor

这个简单哈。只有用一下我们的counter,全程异步实现。
收到http请求时,请求第三方API,并使用counter堵住server task的series:

void process(WFHttpTask *server_task)
{
    WFCounterTask *counter= WFTaskFactory::create_counter_task(1, nullptr);
    series_of(server_task)->push_back(counter);
}

收到post请求,打开counter就可以了。如果不想传达指针,也可以使用命名counter,利用名字来打开。相关文档:
https://github.com/sogou/workflow/blob/master/docs/about-counter.md
另外,你也可以使用WFMailboxTask,这是一种带数据传递,目标只只能是1的counter,很符合你的场景。

@Barenboim
Copy link
Contributor

Barenboim commented Sep 23, 2024

Mailbox相关接口:


你可以考虑使用一个命名的mailbox,数据投递在mailbox的user_data,然后在mailbox task的callback里,填写http回复就可以了。
static WFMailboxTask *create_mailbox_task(const std::string& mailbox_name,

@TongxianYang
Copy link
Author

好的,谢谢

用workflow中消息队列来等待返回结果,是否可行吖?

@Barenboim
Copy link
Contributor

Barenboim commented Sep 23, 2024

好的,谢谢

用workflow中消息队列来等待返回结果,是否可行吖?

如果你说的是这个消息队列:https://github.com/sogou/workflow/blob/master/docs/about-resource-pool.md#%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97
可以的。本质上,都是需要产生一个task吧series阻塞住,收到POST请求的时候打开。全程异步。

如果你说的是kernel/msgqueue,那需要占用一个线程等待。这个不可以,容易把所有通讯线程阻塞住导致无法处理POST请求,死锁。同步等待的基础组件是WFFuture,通过动态增加减少通讯线程来解决通讯线程都在等的问题,不过也是一种同步的方式,不推荐。

@TongxianYang
Copy link
Author

好的,谢谢

@TongxianYang
Copy link
Author

还有个问题再请教一下,用WFMailboxTask等待异步消息时,怎么给它设置等待超时时间呢?

@Barenboim
Copy link
Contributor

还有个问题再请教一下,用WFMailboxTask等待异步消息时,怎么给它设[置等待超时时间呢?

自己搞个timer吧,超时的时候自己send一个消息吧。这种情况就不能使用mailbox指针了,需要用name。

不过……你这个,也有一种组件,支持多次接收,但接受到第一个消息就结束,就是selector。文档在:
https://github.com/sogou/workflow/blob/master/docs/about-selector.md

@Barenboim
Copy link
Contributor

Selector可能对你这个场景还不太好。Selector要求每个candidate必须执行submit。但明显你的POST请求可能是永远收不到的。

@TongxianYang
Copy link
Author

是的,终端设备离线了就永远收不到post消息

好的,那我就用workflow内置的定时器来做吧

@TongxianYang
Copy link
Author

WFMailboxTask命名和WFTimerTask命名是否可以用同一个命名?

如下:
WFTaskFactory::send_by_name("app-rrpc", ptr);
WFTaskFactory::cancel_by_name("app-rrpc");

@Barenboim
Copy link
Contributor

可以同名,不同类型的命名任务,名称空间是独立的。

不过我觉得timer没有什么必要命名,这玩意开销极小,而且程序退出的话也就直接中断了。放着就放着。当然你cancel也没什么问题。

@Barenboim
Copy link
Contributor

WFMailboxTask命名和WFTimerTask命名是否可以用同一个命名?

如下: WFTaskFactory::send_by_name("app-rrpc", ptr); WFTaskFactory::cancel_by_name("app-rrpc");

对了,send_by_name这个操作没有返回值,不知道消息投递成功了没有……这个不知道对你是否有影响。

@TongxianYang
Copy link
Author

1727171437982

timer不用串行到http服务器任务中吧?

@Barenboim
Copy link
Contributor

1727171437982

timer不用串行到http服务器任务中吧?

timer直接start就可以了。

@Barenboim
Copy link
Contributor

从你这个代码看起来,send_by_name不加返回值的话,你可能用不了……

@TongxianYang
Copy link
Author

收到post和timer超时,有一处send_by_name成功,就能退出阻塞就可以了

@Barenboim
Copy link
Contributor

收到post和timer超时,有一处send_by_name成功,就能退出阻塞就可以了

问题是,如果超时了,POST请求到达之后,你send_by_name的资源如何回收呢?这时候并没有人收到这个消息。

@TongxianYang
Copy link
Author

确实是哦,有没方案可判断指定的WFMailboxTask已经结束?

@Barenboim
Copy link
Contributor

确实是哦,有没方案可判断指定的WFMailboxTask已经结束?

我改一下代码吧,今晚就可以搞定。send_by_name操作返回有多少个mailbox收到了消息就可以了。

@TongxianYang
Copy link
Author

好的,谢谢

1727172280486
创建任务时就分配好空间,然后用这个函数返回空间地址赋值进去, 这种方式也可以的吧?

@Barenboim
Copy link
Contributor

好的,谢谢

1727172280486 创建任务时就分配好空间,然后用这个函数返回空间地址赋值进去, 这种方式也可以的吧?

但这个空间总是需要回收的。我觉得send_by_name如果不知道投递结果,应该是没有办法实现得比较自然的。

@Barenboim
Copy link
Contributor

另外,你这个代码好像有点旧。send操作后来改成virtual的了。

@TongxianYang
Copy link
Author

另外,你这个代码好像有点旧。send操作后来改成virtual的了。

是的,代码用的库刚更新到最后发布的0.11.5版本了

@TongxianYang
Copy link
Author

但这个空间总是需要回收的。我觉得send_by_name如果不知道投递结果,应该是没有办法实现得比较自然的。

好的,等你们更新后,我明天再拉取最新的

@Barenboim Barenboim linked a pull request Sep 24, 2024 that will close this issue
@Barenboim
Copy link
Contributor

可以看一下这个PR:https://github.com/sogou/workflow/pulls

所有的XXX_by_name函数,都返回一个int代表操作所触发的任务个数。在你这个业务里,send_by_name返回1则代表消息被mailbox成功接收。返回0表示这个名字的mailbox已经被别的任务投递了,应该释放资源。

同样,cancel_by_name也会返回成功取消的定时器个数。

@TongxianYang
Copy link
Author

另外,代码里有一个逻辑问题。mailbox任务应该先于timer运行时创建。否则,就存在可能性,timer callback里send时,mailbox还没有创建,导致超时失效。当然,在这个case里,timer超时10秒,这种情况理论上不会发生,但也是不严密。

其他不用mailbox任务的API是正常的,现在几十台设备访问着

我这个说的是另一个问题,按理是不会发生的。你设备上运行过旧版代码,检查一下头文件不一致的可能性。mailbox任务应该没有什么bug的。

其他两台也运行过旧版代码,会不会有可能资源受限导致的进程死掉

@TongxianYang
Copy link
Author

1728634435806

@Barenboim
Copy link
Contributor

另外,代码里有一个逻辑问题。mailbox任务应该先于timer运行时创建。否则,就存在可能性,timer callback里send时,mailbox还没有创建,导致超时失效。当然,在这个case里,timer超时10秒,这种情况理论上不会发生,但也是不严密。

其他不用mailbox任务的API是正常的,现在几十台设备访问着

我这个说的是另一个问题,按理是不会发生的。你设备上运行过旧版代码,检查一下头文件不一致的可能性。mailbox任务应该没有什么bug的。

其他两台也运行过旧版代码,会不会有可能资源受限导致的进程死掉

不是资源受限问题,我们的资源需求非常小。你的内存够够的。

你这个现象非常符合头文件与lib不一致。安装过旧的库只是一个必要条件,不是充分条件。

另外,我说的bug改一下吧…… 把timer->start()移到create_mailbox_task()下面就行。

@TongxianYang
Copy link
Author

出问题那台是已有80多个设备互相通信着,有没有可能是线程池的线程数量这些不够?正常那两台基本还没多少设备在使用

@TongxianYang
Copy link
Author

另外,代码里有一个逻辑问题。mailbox任务应该先于timer运行时创建。否则,就存在可能性,timer callback里send时,mailbox还没有创建,导致超时失效。当然,在这个case里,timer超时10秒,这种情况理论上不会发生,但也是不严密。

其他不用mailbox任务的API是正常的,现在几十台设备访问着

我这个说的是另一个问题,按理是不会发生的。你设备上运行过旧版代码,检查一下头文件不一致的可能性。mailbox任务应该没有什么bug的。

其他两台也运行过旧版代码,会不会有可能资源受限导致的进程死掉

不是资源受限问题,我们的资源需求非常小。你的内存够够的。

你这个现象非常符合头文件与lib不一致。安装过旧的库只是一个必要条件,不是充分条件。

另外,我说的bug改一下吧…… 把timer->start()移到create_mailbox_task()下面就行。

库文件和头文件我是同时更新的,编译好放到自己目录中链接的

逻辑BUG 我会改掉

@Barenboim
Copy link
Contributor

问题就是担心你系统里安装过一个…… 你要不搞个完全干净的环境再试一下?

@TongxianYang
Copy link
Author

1728635136153
库和头文件我直接放到工程中引用和链接的,不是安装到系统默认搜索路径下,不存在旧版头文件的问题

@TongxianYang
Copy link
Author

问题就是担心你系统里安装过一个…… 你要不搞个完全干净的环境再试一下?

我再看看看吧

@Barenboim
Copy link
Contributor

你看一下/usr/local/lib和/usr/local/include里有没有把。很多不一致的case,都是因为系统里安装了。

@TongxianYang
Copy link
Author

你看一下/usr/local/lib和/usr/local/include里有没有把。很多不一致的case,都是因为系统里安装了。

1728636236126
查了没有,这台已经在使用了,已经在跑其他的服务,环境不能直接系统重装

@TongxianYang
Copy link
Author

1728637836560
1728638008779
1728638181301
我用gdb打印出来的段错误栈信息,帮忙看看有没有发现,centos的编译器和ubuntu的编译器版本不一样的

@TongxianYang
Copy link
Author

1728638421104

@Barenboim
Copy link
Contributor

Barenboim commented Oct 11, 2024

你的uuid_str是在哪里传给root["payload"]["RequestId"]的?

@Barenboim
Copy link
Contributor

你send_by_name之后,message可能已经在mailbox的callback里被delete了,你不能再取message里的内容!

@Barenboim
Copy link
Contributor

最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。

@TongxianYang
Copy link
Author

你的uuid_str是在哪里传给root["payload"]["RequestId"]的?

这个RequestId在远程设备的回复消息中,在发送给设备的消息附带进去的

1728647257330

@Barenboim
Copy link
Contributor

你的uuid_str是在哪里传给root["payload"]["RequestId"]的?

这个RequestId在远程设备的回复消息中,在发送给设备的消息附带进去的

1728647257330

这个不重要了。

@TongxianYang
Copy link
Author

最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。

好的,我代码只有在mailbox的callback使用message, 用完delete了,其它外部没有使用message啊

@TongxianYang
Copy link
Author

我都已经开始把centos系统重装为ubuntu了

@Barenboim
Copy link
Contributor

最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。

好的,我代码只有在mailbox的callback使用message, 用完delete了,其它外部没有使用message啊

但你写的是异步程序。无法保证mailbox callback里的delete,和你后面cancel_by_name()哪个先执行啊。那么你取message里的内容肯定就错了!

和你什么系统没有关系,必然是错的。

@TongxianYang
Copy link
Author

那就很解释为什么在其他两台ubuntu上面不会导致段错误啊

@Barenboim
Copy link
Contributor

那就很解释为什么在其他两台ubuntu上面不会导致段错误啊

讨论这个没有什么意义啊。访问已经释放的内存行为无法定义。碰巧没挂也不能说明什么。你理解问题所在没有?

@TongxianYang
Copy link
Author

最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。

好的,我代码只有在mailbox的callback使用message, 用完delete了,其它外部没有使用message啊

但你写的是异步程序。无法保证mailbox callback里的delete,和你后面cancel_by_name()哪个先执行啊。那么你取message里的内容肯定就错了!

和你什么系统没有关系,必然是错的。

好的,这个理解了

@TongxianYang
Copy link
Author

最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。

理解你的意思,是不是取消定时器也会执行定时器回调函数? 我一直理解成定时器回调函数只在定时器过期时触发,提前取消不会触发

@Barenboim
Copy link
Contributor

Barenboim commented Oct 12, 2024 via email

@TongxianYang
Copy link
Author

噢噢,先cancel的话,定时器的callback里需要判断一下状态,再决定要不要send,否则定时器可能先于正常的send导致任务取消。 看起来还是先send再cancel好一点。把name保存一下。

---原始邮件--- 发件人: @.> 发送时间: 2024年10月12日(周六) 上午9:51 收件人: @.>; 抄送: @.>;"State @.>; 主题: Re: [sogou/workflow] workflow如何实现同步API,去调用异步方式的第三方服务API (Issue #1625) 最简单的你可以改成先cancel再send。或者做一个小优化,send返回1才cancel,但这时候不能再用message里的数据了。 理解你的意思,是不是取消定时器也会执行定时器回调函数? 我一直理解成定时器回调函数只在定时器过期时触发,提前取消不会触发 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you modified the open/close state.Message ID: @.***>

好的,那再请教下,定时器的callback里需要判断状态为什么时对应的定时器是cancel还是到期触发的呢?

@Barenboim
Copy link
Contributor

Barenboim commented Oct 12, 2024 via email

@TongxianYang
Copy link
Author

正常到时的话,state为WFT_STATE_SUCCESS。取消的话,state为WFT_STATE_SYS_ERROR,error为ECANCELED。

好的,非常感谢支持

@Barenboim
Copy link
Contributor

#1635

做了一个优化,确保cancel成功的个数(cancel_by_name()的返回值)与ECANCELED状态的timer个数严格一致。由于你不使用cancel_by_name()的返回值,这个改动对你无影响。但是,在这个新代码下,你的程序可以这么写:
Timer callback:

void timer_callback(WFTimerTask *timer)
{
    if (timer->get_state() == WFT_STATE_SUCCESS)  // Timer 正常到期
    {
        MESSAGE *msg = new MESSAGE;
        WFTaskFactory::send_by_name(name, msg);   // 无需判断返回值了。
    }
}

Publish:

{
    if (WFTaskFactory::cancel_by_name(name) != 0)  // 成功cancel timer
    {
        MESSAGE *msg = new MESSAGE;
        WFTaskFactory::send_by_name(name, msg);   // 无需判断返回值了。
    }
}

@TongxianYang
Copy link
Author

#1635

做了一个优化,确保cancel成功的个数(cancel_by_name()的返回值)与ECANCELED状态的timer个数严格一致。由于你不使用cancel_by_name()的返回值,这个改动对你无影响。但是,在这个新代码下,你的程序可以这么写: Timer callback:

void timer_callback(WFTimerTask *timer)
{
    if (timer->get_state() == WFT_STATE_SUCCESS)  // Timer 正常到期
    {
        MESSAGE *msg = new MESSAGE;
        WFTaskFactory::send_by_name(name, msg);   // 无需判断返回值了。
    }
}

Publish:

{
    if (WFTaskFactory::cancel_by_name(name) != 0)  // 成功cancel timer
    {
        MESSAGE *msg = new MESSAGE;
        WFTaskFactory::send_by_name(name, msg);   // 无需判断返回值了。
    }
}

好的,谢谢!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants