diff --git a/trunk/conf/forward.master.conf b/trunk/conf/forward.master.conf index 630a4c84eaa..324cd78bf0b 100644 --- a/trunk/conf/forward.master.conf +++ b/trunk/conf/forward.master.conf @@ -12,4 +12,12 @@ vhost __defaultVhost__ { enabled on; destination 127.0.0.1:19350; } + + forward live/livestream { + destination rtmp://ossrs.net/live01/test001?auth_key=262953 rtmp://ossrs.net/live02/test002; + } + + forward live1/livestream1 { + destination rtmp://ossrs.net/live03/test003?auth_key=abcde rtmp://ossrs.net/live04/test004; + } } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index fa813737ceb..b65a90a19cf 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4548,7 +4548,7 @@ bool SrsConfig::get_forward_enabled(SrsConfDirective* vhost) { static bool DEFAULT = false; - SrsConfDirective* conf = vhost->get("forward"); + SrsConfDirective* conf = vhost->get("forward", ""); if (!conf) { return DEFAULT; } @@ -4568,7 +4568,7 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost) return NULL; } - conf = conf->get("forward"); + conf = conf->get("forward", ""); if (!conf) { return NULL; } @@ -4576,6 +4576,22 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost) return conf->get("destination"); } +SrsConfDirective* SrsConfig::get_forwards(string vhost, string app, string stream) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return NULL; + } + + string pattern = app + "/" + stream; + conf = conf->get("forward", pattern); + if (!conf || conf->arg0().empty()) { + return NULL; + } + + return conf->get("destination"); +} + SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index c62351c8726..37fcf0cdf5a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -604,7 +604,8 @@ class SrsConfig virtual bool get_forward_enabled(SrsConfDirective* vhost); // Get the forward directive of vhost. virtual SrsConfDirective* get_forwards(std::string vhost); - + // Get the forward url directive of vhost. + virtual SrsConfDirective* get_forwards(std::string vhost, std::string app, std::string stream); public: // Whether the srt sevice enabled virtual bool get_srt_enabled(); diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 06cfd118c00..e1592cde0d6 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder() srs_freep(sh_video); srs_freep(sh_audio); + + srs_freep(req); } srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d297f0f4aa2..a9754d0813a 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1468,20 +1468,54 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) srs_error_t SrsOriginHub::create_forwarders() { srs_error_t err = srs_success; - + + // pattern: app/stream + SrsConfDirective* conf = _srs_config->get_forwards(req->vhost, req->app, req->stream); + for (int i = 0; conf && i < (int)conf->args.size(); i++) { + std::string url = conf->args.at(i); + + // create forwarder by url + SrsRequest* freq = new SrsRequest(); + srs_parse_rtmp_url(url, freq->tcUrl, freq->stream); + srs_discovery_tc_url(freq->tcUrl, freq->schema, freq->host, freq->vhost, freq->app, freq->stream, freq->port, freq->param); + + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + std::stringstream ss; + ss << freq->host << ":" << freq->port; + std::string forward_server = ss.str(); + + // initialize the forwarder with request. + if ((err = forwarder->initialize(freq, forward_server)) != srs_success) { + return srs_error_wrap(err, "init forwarder"); + } + + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + forwarder->set_queue_size(queue_size); + + if ((err = forwarder->on_publish()) != srs_success) { + return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), url.c_str()); + } + } + if (!_srs_config->get_forward_enabled(req->vhost)) { return err; } - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + conf = _srs_config->get_forwards(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); - + + // copy source req + SrsRequest* freq = req->copy(); + SrsForwarder* forwarder = new SrsForwarder(this); forwarders.push_back(forwarder); // initialize the forwarder with request. - if ((err = forwarder->initialize(req, forward_server)) != srs_success) { + if ((err = forwarder->initialize(freq, forward_server)) != srs_success) { return srs_error_wrap(err, "init forwarder"); } diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 978a2b30d6a..5f0cd992dc6 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -2914,6 +2914,7 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2) EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net")); EXPECT_FALSE(conf.get_forward_enabled("ossrs.net")); EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL); + EXPECT_TRUE(conf.get_forwards("ossrs.net", "live", "livestream") == NULL); } if (true) { @@ -2928,6 +2929,12 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2) EXPECT_TRUE(conf.get_forward_enabled("ossrs.net")); } + if (true) { + MockSrsConfig conf; + HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward live/livestream {destination xxx;}}")); + EXPECT_TRUE(conf.get_forwards("ossrs.net", "live", "livestream")); + } + if (true) { MockSrsConfig conf; HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{publish {normal_timeout 10;}}"));