From 03cf93fc2b9d62daacd0f467b1d8debe95f69606 Mon Sep 17 00:00:00 2001 From: chundonglinlin Date: Wed, 16 Feb 2022 10:49:16 +0800 Subject: [PATCH] Forward: support config full rtmp url forward to other server (#2799) * Forward: add backend config and demo server for dynamic create forwarder to other server.(#1342) * Forward: if call forward backend failed, then return directly. * Forward: add API description and change return value format. * Forward: add backend conf file and wrapper function for backend service. * Forward: add backend comment in full.conf and update forward.backend.conf. * Forward: rename backend param and add comment tips. --- trunk/conf/forward.backend.conf | 15 +++ trunk/conf/full.conf | 31 +++++++ trunk/research/api-server/server.py | 81 ++++++++++++++++ trunk/src/app/srs_app_config.cpp | 17 +++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_forward.cpp | 4 +- trunk/src/app/srs_app_http_hooks.cpp | 70 ++++++++++++++ trunk/src/app/srs_app_http_hooks.hpp | 5 + trunk/src/app/srs_app_source.cpp | 133 +++++++++++++++++++++------ trunk/src/app/srs_app_source.hpp | 3 +- trunk/src/utest/srs_utest_config.cpp | 7 ++ 11 files changed, 335 insertions(+), 33 deletions(-) create mode 100644 trunk/conf/forward.backend.conf diff --git a/trunk/conf/forward.backend.conf b/trunk/conf/forward.backend.conf new file mode 100644 index 0000000000..b032d2cd68 --- /dev/null +++ b/trunk/conf/forward.backend.conf @@ -0,0 +1,15 @@ +# the config for srs to forward to slave service +# @see https://github.com/ossrs/srs/wiki/v5_CN_SampleForward +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +pid ./objs/srs.backend.pid; +daemon off; +srs_log_tank console; +vhost __defaultVhost__ { + forward { + enabled on; + backend http://127.0.0.1:8085/api/v1/forward; + } +} diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index c9652896e1..e015769238 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -669,6 +669,37 @@ vhost same.vhost.forward.srs.com { # active-active for cdn to build high available fault tolerance system. # format: {ip}:{port} {ip_N}:{port_N} destination 127.0.0.1:1936 127.0.0.1:1937; + + # when client(encoder) publish to vhost/app/stream, call the hook in creating backend forwarder. + # the request in the POST data string is a object encode by json: + # { + # "action": "on_forward", + # "server_id": "vid-k21d7y2", + # "client_id": "9o7g1330", + # "ip": "127.0.0.1", + # "vhost": "__defaultVhost__", + # "app": "live", + # "tcUrl": "rtmp://127.0.0.1:1935/live", + # "stream": "livestream", + # "param": "" + # } + # if valid, the hook must return HTTP code 200(Status OK) and response + # an int value specifies the error code(0 corresponding to success): + # { + # "code": 0, + # "data": { + # "urls":[ + # "rtmp://127.0.0.1:19350/test/teststream" + # ] + # } + # } + # PS: you can transform params to backend service, such as: + # { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" } + # then backend return forward's url in response. + # if backend return empty urls, destanition is still disabled. + # only support one api hook, format: + # backend http://xxx/api0 + backend http://127.0.0.1:8085/api/v1/forward; } } diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index e50af7c46e..1cfb73a021 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -805,6 +805,86 @@ def POST(self): def OPTIONS(self, *args, **kwargs): enable_crossdomain() +''' +handle the forward requests: dynamic forward url. +''' +class RESTForward(object): + exposed = True + + def __init__(self): + self.__forwards = [] + + def GET(self): + enable_crossdomain() + + forwards = {} + return json.dumps(forwards) + + ''' + for SRS hook: on_forward + on_forward: + when srs reap a dvr file, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_forward", + "server_id": "server_test", + "client_id": 1985, + "ip": "192.168.1.10", + "vhost": "video.test.com", + "app": "live", + "tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a", + "stream": "livestream", + "param":"?token=xxx&salt=yyy" + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to forwards, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return json.dumps({"code": int(code), "data": None}) + + action = json_req["action"] + if action == "on_forward": + return self.__on_forward(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return json.dumps({"code": int(code), "data": None}) + + def OPTIONS(self, *args, **kwargs): + enable_crossdomain() + + def __on_forward(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"] + )) + + ''' + backend service config description: + support multiple rtmp urls(custom addresses or third-party cdn service), + url's host is slave service. + For example: + ["rtmp://127.0.0.1:19350/test/teststream", "rtmp://127.0.0.1:19350/test/teststream?token=xxxx"] + ''' + forwards = ["rtmp://127.0.0.1:19350/test/teststream"] + + return json.dumps({"code": int(code), "data": {"urls": forwards}}) + # HTTP RESTful path. class Root(object): exposed = True @@ -846,6 +926,7 @@ def __init__(self): self.chats = RESTChats() self.servers = RESTServers() self.snapshots = RESTSnapshots() + self.forward = RESTForward() def GET(self): enable_crossdomain(); return json.dumps({"code":Error.success, "urls":{ diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index a9aa056356..e782ceaa2f 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2825,7 +2825,7 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "forward") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "destination") { + if (m != "enabled" && m != "destination" && m != "backend") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -4635,6 +4635,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost) return conf->get("destination"); } +SrsConfDirective* SrsConfig::get_forward_backend(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return NULL; + } + + conf = conf->get("forward"); + if (!conf) { + return NULL; + } + + return conf->get("backend"); +} + SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 7623dc04db..5d4fdddbab 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -624,6 +624,8 @@ class SrsConfig virtual bool get_forward_enabled(SrsConfDirective* vhost); // Get the forward directive of vhost. virtual SrsConfDirective* get_forwards(std::string vhost); + // Get the forward directive of backend. + virtual SrsConfDirective* get_forward_backend(std::string vhost); public: // Whether the srt sevice enabled diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 93e663f00f..5464ff6bdb 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder() srs_freep(sh_video); srs_freep(sh_audio); + + srs_freep(req); } srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) // it's ok to use the request object, // SrsLiveSource already copy it and never delete it. - req = r; + req = r->copy(); // the ep(endpoint) to forward to ep_forward = ep; diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 2541161687..f49b4a5ed1 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -482,6 +482,76 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por return err; } +srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector& rtmp_urls) +{ + srs_error_t err = srs_success; + + SrsContextId cid = _srs_context->get_id(); + + SrsStatistic* stat = SrsStatistic::instance(); + + SrsJsonObject* obj = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, obj); + + obj->set("action", SrsJsonAny::str("on_forward")); + obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str())); + obj->set("client_id", SrsJsonAny::str(cid.c_str())); + obj->set("ip", SrsJsonAny::str(req->ip.c_str())); + obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); + obj->set("app", SrsJsonAny::str(req->app.c_str())); + obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str())); + obj->set("stream", SrsJsonAny::str(req->stream.c_str())); + obj->set("param", SrsJsonAny::str(req->param.c_str())); + + std::string data = obj->dumps(); + std::string res; + int status_code; + + SrsHttpClient http; + if ((err = do_post(&http, url, data, status_code, res)) != srs_success) { + return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d", + cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code); + } + + // parse string res to json. + SrsJsonAny* info = SrsJsonAny::loads(res); + if (!info) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str()); + } + SrsAutoFree(SrsJsonAny, info); + + // response error code in string. + if (!info->is_object()) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str()); + } + + SrsJsonAny* prop = NULL; + // response standard object, format in json: {} + SrsJsonObject* res_info = info->to_object(); + if ((prop = res_info->ensure_property_object("data")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str()); + } + + SrsJsonObject* p = prop->to_object(); + if ((prop = p->ensure_property_array("urls")) == NULL) { + return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse urls %s", res.c_str()); + } + + SrsJsonArray* urls = prop->to_array(); + for (int i = 0; i < urls->count(); i++) { + prop = urls->at(i); + string rtmp_url = prop->to_str(); + if (!rtmp_url.empty()) { + rtmp_urls.push_back(rtmp_url); + } + } + + srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s", + cid.c_str(), url.c_str(), data.c_str(), res.c_str()); + + return err; +} + srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 8e3e01663c..f0ad24d017 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -10,6 +10,7 @@ #include #include +#include class SrsHttpUri; class SrsStSocket; @@ -79,6 +80,10 @@ class SrsHttpHooks static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); // Discover co-workers for origin cluster. static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); + // The on_forward_backend hook, when publish stream start to forward + // @param url the api server url, to valid the client. + // ignore if empty. + static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector& rtmp_urls); private: static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); }; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 40d2083167..f715610330 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -34,6 +34,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 250 #define CONST_MAX_JITTER_MS_NEG -250 @@ -807,7 +808,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop() SrsOriginHub::SrsOriginHub() { source = NULL; - req = NULL; + req_ = NULL; is_active = false; hls = new SrsHls(); @@ -851,22 +852,22 @@ srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r) { srs_error_t err = srs_success; - req = r; + req_ = r; source = s; if ((err = format->initialize()) != srs_success) { return srs_error_wrap(err, "format initialize"); } - if ((err = hls->initialize(this, req)) != srs_success) { + if ((err = hls->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "hls initialize"); } - if ((err = dash->initialize(this, req)) != srs_success) { + if ((err = dash->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "dash initialize"); } - if ((err = dvr->initialize(this, req)) != srs_success) { + if ((err = dvr->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "dvr initialize"); } @@ -952,7 +953,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) // when got audio stream info. SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) { + if ((err = stat->on_audio_info(req_, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) { return srs_error_wrap(err, "stat audio"); } @@ -966,7 +967,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) if ((err = hls->on_audio(msg, format)) != srs_success) { // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost); if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { srs_warn("hls: ignore audio error %s", srs_error_desc(err).c_str()); hls->on_unpublish(); @@ -1025,7 +1026,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // user can disable the sps parse to workaround when parse sps failed. // @see https://github.com/ossrs/srs/issues/474 if (is_sequence_header) { - format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost); + format->avc_parse_sps = _srs_config->get_parse_sps(req_->vhost); } if ((err = format->on_video(msg)) != srs_success) { @@ -1046,7 +1047,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // when got video stream info. SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) { + if ((err = stat->on_video_info(req_, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) { return srs_error_wrap(err, "stat video"); } @@ -1066,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se // TODO: We should support more strategies. // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost); if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str()); hls->on_unpublish(); @@ -1126,7 +1127,7 @@ srs_error_t SrsOriginHub::on_publish() } // TODO: FIXME: use initialize to set req. - if ((err = encoder->on_publish(req)) != srs_success) { + if ((err = encoder->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "encoder publish"); } @@ -1139,7 +1140,7 @@ srs_error_t SrsOriginHub::on_publish() } // @see https://github.com/ossrs/srs/issues/1613#issuecomment-961657927 - if ((err = dvr->on_publish(req)) != srs_success) { + if ((err = dvr->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "dvr publish"); } @@ -1151,7 +1152,7 @@ srs_error_t SrsOriginHub::on_publish() #endif // TODO: FIXME: use initialize to set req. - if ((err = ng_exec->on_publish(req)) != srs_success) { + if ((err = ng_exec->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "exec publish"); } @@ -1236,7 +1237,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1263,7 +1264,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1305,7 +1306,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1355,7 +1356,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1382,7 +1383,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1397,12 +1398,12 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost) } // reinitialize the dvr, update plan. - if ((err = dvr->initialize(this, req)) != srs_success) { + if ((err = dvr->initialize(this, req_)) != srs_success) { return srs_error_wrap(err, "reload dvr"); } // start to publish by new plan. - if ((err = dvr->on_publish(req)) != srs_success) { + if ((err = dvr->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "dvr publish failed"); } @@ -1419,7 +1420,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1432,7 +1433,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost) return err; } - if ((err = encoder->on_publish(req)) != srs_success) { + if ((err = encoder->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "start encoder failed"); } srs_trace("vhost %s transcode reload success", vhost.c_str()); @@ -1444,7 +1445,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) { srs_error_t err = srs_success; - if (req->vhost != vhost) { + if (req_->vhost != vhost) { return err; } @@ -1457,7 +1458,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) return err; } - if ((err = ng_exec->on_publish(req)) != srs_success) { + if ((err = ng_exec->on_publish(req_)) != srs_success) { return srs_error_wrap(err, "start exec failed"); } srs_trace("vhost %s exec reload success", vhost.c_str()); @@ -1469,11 +1470,24 @@ srs_error_t SrsOriginHub::create_forwarders() { srs_error_t err = srs_success; - if (!_srs_config->get_forward_enabled(req->vhost)) { + if (!_srs_config->get_forward_enabled(req_->vhost)) { return err; } - - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + + // For backend config + // If backend is enabled and applied, ignore destination. + bool applied_backend_server = false; + if ((err = create_backend_forwarders(applied_backend_server)) != srs_success) { + return srs_error_wrap(err, "create backend applied=%d", applied_backend_server); + } + + // Already applied backend server, ignore destination. + if (applied_backend_server) { + return err; + } + + // For destanition config + SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); @@ -1481,22 +1495,81 @@ srs_error_t SrsOriginHub::create_forwarders() forwarders.push_back(forwarder); // initialize the forwarder with request. - if ((err = forwarder->initialize(req, forward_server)) != srs_success) { + if ((err = forwarder->initialize(req_, forward_server)) != srs_success) { return srs_error_wrap(err, "init forwarder"); } - srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost); forwarder->set_queue_size(queue_size); if ((err = forwarder->on_publish()) != srs_success) { return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str()); + req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.c_str()); } } return err; } +srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied) +{ + srs_error_t err = srs_success; + + // default not configure backend service + applied = false; + + SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost); + if (!conf || conf->arg0().empty()) { + return err; + } + + // configure backend service + applied = true; + + // only get first backend url + std::string backend_url = conf->arg0(); + + // get urls on forward backend + std::vector urls; + if ((err = SrsHttpHooks::on_forward_backend(backend_url, req_, urls)) != srs_success) { + return srs_error_wrap(err, "get forward backend failed, backend=%s", backend_url.c_str()); + } + + // create forwarders by urls + std::vector::iterator it; + for (it = urls.begin(); it != urls.end(); ++it) { + std::string url = *it; + + // create temp Request by url + SrsRequest* req = new SrsRequest(); + SrsAutoFree(SrsRequest, req); + srs_parse_rtmp_url(url, req->tcUrl, req->stream); + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param); + + // create forwarder + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + std::stringstream forward_server; + forward_server << req->host << ":" << req->port; + + // initialize the forwarder with request. + if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) { + return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str()); + } + + srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost); + forwarder->set_queue_size(queue_size); + + if ((err = forwarder->on_publish()) != srs_success) { + return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", + req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.str().c_str()); + } + } + + return err; +} + void SrsOriginHub::destroy_forwarders() { std::vector::iterator it; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 3043034125..065deba014 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -310,7 +310,7 @@ class SrsOriginHub : public ISrsReloadHandler { private: SrsLiveSource* source; - SrsRequest* req; + SrsRequest* req_; bool is_active; private: // The format, codec information. @@ -375,6 +375,7 @@ class SrsOriginHub : public ISrsReloadHandler virtual srs_error_t on_reload_vhost_exec(std::string vhost); private: virtual srs_error_t create_forwarders(); + virtual srs_error_t create_backend_forwarders(bool& applied); virtual void destroy_forwarders(); }; diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 3a3c04a5b2..b36700b69b 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -2994,6 +2994,13 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2) EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net")); EXPECT_FALSE(conf.get_forward_enabled("ossrs.net")); EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") == NULL); + } + + if (true) { + MockSrsConfig conf; + HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward {backend xxx;}}")); + EXPECT_TRUE(conf.get_forward_backend("ossrs.net") != NULL); } if (true) {