Skip to content

Commit

Permalink
DVR: Support record to local disk. #42
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jul 5, 2022
1 parent 55c409c commit 9d0122a
Show file tree
Hide file tree
Showing 20 changed files with 691 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ mgmt/containers/etc/letsencrypt/*
mgmt/containers/var/lib/letsencrypt/*
mgmt/containers/var/log/letsencrypt/*
mgmt/containers/objs/nginx/html/*
mgmt/containers/data/record/*
mgmt/containers/data/dvr/*
mgmt/containers/data/vod/*
mgmt/containers/data/redis/*
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ A lightweight open-source video cloud based on Nodejs, SRS, FFmpeg, WebRTC, etc.
- [x] [Support WordPress Plugin](https://mp.weixin.qq.com/s/YjTkcJLkErMcZYHIjzsW_w) or [here](https://wordpress.org/plugins/srs-player).
- [x] [Support Typecho Plugin](https://github.com/ossrs/Typecho-Plugin-SrsPlayer).
- [x] [Support aaPanel to install on any linux](https://github.com/ossrs/srs-cloud/issues/29).
- [x] [Support DVR to local disk](https://github.com/ossrs/srs-cloud/issues/42).

Other more use scenarios is on the way, please read [this post](https://github.com/ossrs/srs/issues/2856#lighthouse).

Expand Down Expand Up @@ -146,6 +147,7 @@ Market:
* `/terraform/v1/hooks/srs/secret/update` Hooks: Update the secret to generate stream URL.
* `/terraform/v1/hooks/srs/secret/disable` Hooks: Disable the secret for authentication.
* `/terraform/v1/hooks/srs/hls` Hooks: Handle the `on_hls` event.
* `/terraform/v1/hooks/record/query` Hooks: Query the Record pattern.
* `/terraform/v1/hooks/dvr/apply` Hooks: Apply the DVR pattern.
* `/terraform/v1/hooks/dvr/query` Hooks: Query the DVR pattern.
* `/terraform/v1/hooks/dvr/files` Hooks: List the DVR files.
Expand Down
13 changes: 11 additions & 2 deletions hooks/dvrWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ async function handleMessage(msg) {
const m3u8 = await redis.hget(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url);
const m3u8Obj = m3u8 ? JSON.parse(m3u8) : {update: null, uuid: uuidv4()};
m3u8Obj.update = moment().format();
const r0 = await redis.hset(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj));

// Append ts files to local m3u8 object.
const local = await redis.hget(keys.redis.SRS_DVR_M3U8_LOCAL, m3u8_url);
const localObj = local ? JSON.parse(local) : {nn: 0, update: null, done: null, uuid: m3u8Obj.uuid, uuids: [], files: []};
const localObj = local ? JSON.parse(local) : {
nn: 0,
update: null,
done: null,
uuid: m3u8Obj.uuid,
uuids: [],
files: [],
};
if (!local || localObj.uuid !== m3u8Obj.uuid) {
localObj.done = null;
localObj.uuid = m3u8Obj.uuid;
Expand All @@ -72,7 +78,10 @@ async function handleMessage(msg) {
localObj.files.push({m3u8_url, tsid, tsfile, ...msg});
localObj.nn = localObj.files.length;
localObj.update = moment().format();

// We update the local m3u8 object, then update the active m3u8 stream list, to make sure the localObj is ready.
const r1 = await redis.hset(keys.redis.SRS_DVR_M3U8_LOCAL, m3u8_url, JSON.stringify(localObj));
const r0 = await redis.hset(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj));
console.log(`Thread #dvrWorker: local dvr task m3u8=${m3u8_url}, files=${localObj.files.length}, uuid=${m3u8Obj.uuid}, file=${file}, tsfile=${tsfile}, duration=${duration}, seqno=${seqno}, r0=${r0}, r1=${r1}`);
}

Expand Down
14 changes: 13 additions & 1 deletion hooks/hls.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,23 @@ exports.handle = (router) => {
});
}

// Create a Record task if enabled.
const recordAll = await redis.hget(keys.redis.SRS_RECORD_PATTERNS, 'all');
let record = 'ignore';
if (recordAll === 'true') {
record = 'task_created';
console.log(`create record task file=${file}, duration=${duration}, seqno=${seqno}, m3u8_url=${m3u8_url}, url=${url}`);
manager.postMessage({
action: 'on_record_file', file, duration, seqno, m3u8_url, url, params: ctx.request.body,
});
}

const update = moment().format();
const r0 = await redis.hset(keys.redis.SRS_DVR_PATTERNS, m3u8_url, JSON.stringify({dvr, update}));
const r1 = await redis.hset(keys.redis.SRS_VOD_PATTERNS, m3u8_url, JSON.stringify({vod, update}));
const r2 = await redis.hset(keys.redis.SRS_RECORD_PATTERNS, m3u8_url, JSON.stringify({record, update}));

console.log(`srs hooks ok, dvr=${dvrAll}/${dvr}/${r0}, vod=${vodAll}/${vod}/${r1}, ${JSON.stringify(ctx.request.body)}`);
console.log(`srs hooks ok, dvr=${dvrAll}/${dvr}/${r0}, vod=${vodAll}/${vod}/${r1}, record=${recordAll}/${record}/${r2}, ${JSON.stringify(ctx.request.body)}`);
ctx.body = utils.asResponse(0);
});

Expand Down
3 changes: 2 additions & 1 deletion hooks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const BodyParser = require('koa-bodyparser');
const hooks = require('./hooks');
const pkg = require('./package.json');
const hls = require('./hls');
const record = require('./record');
const dvr = require('./dvr');
const vod = require('./vod');
const manager = require('./manager');
Expand All @@ -42,7 +43,7 @@ app.use(async (ctx, next) => {

const router = new Router();

dvr.handle(vod.handle(hls.handle(hooks.handle(router))));
record.handle(dvr.handle(vod.handle(hls.handle(hooks.handle(router)))));

router.all('/terraform/v1/hooks/versions', async (ctx) => {
ctx.body = utils.asResponse(0, {version: pkg.version});
Expand Down
11 changes: 7 additions & 4 deletions hooks/m3u8Generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ const path = require('path');

// See https://github.com/ossrs/srs/wiki/v4_EN_DeliveryHLS#vodm3u8
// See https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/video_on_demand_playlist_construction
exports.buildVodM3u8 = (metadataObj, absUrl, domain) => {
exports.buildVodM3u8 = (metadataObj, absUrl, domain, useKey, prefix) => {
if (!metadataObj) throw new Error('no object');
if (!metadataObj.files) throw new Error('no files');
if (!metadataObj.bucket) throw new Error('no bucket');
if (!metadataObj.region) throw new Error('no region');
if (absUrl && !metadataObj.bucket) throw new Error('no bucket');
if (absUrl && !metadataObj.region) throw new Error('no region');

const duration = metadataObj.files.reduce((p, c) => Math.max(p, c.duration || 0), 0);
const m3u8 = [
Expand Down Expand Up @@ -36,7 +36,10 @@ exports.buildVodM3u8 = (metadataObj, absUrl, domain) => {
desc.push(new URL(e.key, `https://${metadataObj.bucket}.cos.${metadataObj.region}.myqcloud.com`).href);
}
} else {
desc.push(`${e.tsid}.ts`);
desc.push([
...(prefix ? [prefix] : []),
...(useKey ? [e.key] : `${e.tsid}.ts`),
].join('/'));
}

return desc;
Expand Down
23 changes: 22 additions & 1 deletion hooks/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { Worker } = require("worker_threads");

let dvrWorker, vodWorker;
let dvrWorker, vodWorker, recordWorker;

exports.run = async () => {
// The DVR worker, for cloud storage.
Expand Down Expand Up @@ -42,13 +42,34 @@ exports.run = async () => {
resolve();
});
});

// The Record worker, for cloud storage.
new Promise((resolve, reject) => {
recordWorker = new Worker("./recordWorker.js");

recordWorker.on('message', (msg) => {
console.log('Thread #manager:', msg);
});

recordWorker.on('error', reject);

recordWorker.on('exit', (code) => {
console.log(`Thread #manager: exit with ${code}`);
if (code !== 0) {
return reject(new Error(`Thread #manager: stopped with exit code ${code}`));
}
resolve();
});
});
};

exports.postMessage = (msg) => {
if (msg.action === 'on_dvr_file') {
dvrWorker?.postMessage(msg);
} else if (msg.action === 'on_vod_file') {
vodWorker?.postMessage(msg);
} else if (msg.action === 'on_record_file') {
recordWorker?.postMessage(msg);
} else {
console.error(`Thread #manager: Ignore message ${JSON.stringify(msg)}`);
}
Expand Down
1 change: 1 addition & 0 deletions hooks/record
124 changes: 124 additions & 0 deletions hooks/record.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use strict';

// For components in docker, connect by host.
const config = {
redis:{
host: process.env.NODE_ENV === 'development' ? 'localhost' : (process.env.REDIS_HOST || 'mgmt.srs.local'),
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD || '',
},
};

const utils = require('js-core/utils');
const errs = require('js-core/errs');
const jwt = require('jsonwebtoken');
const ioredis = require('ioredis');
const redis = require('js-core/redis').create({config: config.redis, redis: ioredis});
const keys = require('js-core/keys');
const m3u8Generator = require('./m3u8Generator');
const fs = require("fs");

exports.handle = (router) => {
// Query the record patterns.
router.all('/terraform/v1/hooks/record/query', async (ctx) => {
const {token} = ctx.request.body;

const apiSecret = await utils.apiSecret(redis);
const decoded = await utils.verifyToken(jwt, token, apiSecret);

const all = await redis.hget(keys.redis.SRS_RECORD_PATTERNS, 'all');
const home = '/usr/local/srs-cloud/mgmt/containers/data/record';

console.log(`record query ok, home=${home}, all=${all}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`);
ctx.body = utils.asResponse(0, {
all: all === 'true',
home,
});
});

// Setup the record patterns.
router.all('/terraform/v1/hooks/record/apply', async (ctx) => {
const {token, all} = ctx.request.body;

const apiSecret = await utils.apiSecret(redis);
const decoded = await utils.verifyToken(jwt, token, apiSecret);

if (all !== true && all !== false) throw utils.asError(errs.sys.invalid, errs.status.args, `invalid all=${all}`);

const r0 = await redis.hset(keys.redis.SRS_RECORD_PATTERNS, 'all', all);

console.log(`record apply ok, all=${all}, r0=${r0}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`);
ctx.body = utils.asResponse(0);
});

// List the record files.
router.all('/terraform/v1/hooks/record/files', async (ctx) => {
const {token} = ctx.request.body;

const apiSecret = await utils.apiSecret(redis);
const decoded = await utils.verifyToken(jwt, token, apiSecret);

const files = [];
const [cursor, fileKVs] = await redis.hscan(keys.redis.SRS_RECORD_M3U8_METADATA, 0, '*', 100);
for (let i = 0; i < fileKVs.length; i += 2) {
const file = fileKVs[i + 1];
file && files.push(JSON.parse(file));
}

const r0 = files.map(e => {
return {
uuid: e.uuid,
vhost: e.vhost,
app: e.app,
stream: e.stream,
progress: e.progress,
update: e.update,
nn: e.files.length,
duration: e.files.reduce((p, c) => p + (c.duration || 0), 0),
size: e.files.reduce((p, c) => p + (c.size || 0), 0),
};
});

console.log(`record files ok, cursor=${cursor}, files=${files.length}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`);
ctx.body = utils.asResponse(0, r0);
});

// Generate m3u8 to play.
const handleM3u8 = async (ctx) => {
const {uuid} = ctx.params;
if (!uuid) throw utils.asError(errs.sys.empty, errs.status.args, `no param uuid`);

const metadata = await redis.hget(keys.redis.SRS_RECORD_M3U8_METADATA, uuid);
if (!metadata) throw utils.asError(errs.sys.invalid, errs.status.args, `no hls for uuid=${uuid}`);

const metadataObj = JSON.parse(metadata);
const [contentType, m3u8Body, duration] = m3u8Generator.buildVodM3u8(
metadataObj, false, null, true, '/terraform/v1/hooks/record/hls',
);
console.log(`record generate m3u8 ok, uuid=${uuid}, duration=${duration}`);

ctx.type = contentType;
ctx.body = m3u8Body;
};
router.all('/terraform/v1/hooks/record/hls/:uuid.m3u8', handleM3u8);
router.all('/terraform/v1/hooks/record/hls/:uuid/index.m3u8', handleM3u8);

// Serve ts to play.
router.all('/terraform/v1/hooks/record/hls/:dir/:m3u8/:uuid.ts', async (ctx) => {
const {dir, m3u8, uuid} = ctx.params;
if (!dir) throw utils.asError(errs.sys.empty, errs.status.args, `no param dir`);
if (!m3u8) throw utils.asError(errs.sys.empty, errs.status.args, `no param m3u8`);
if (!uuid) throw utils.asError(errs.sys.empty, errs.status.args, `no param uuid`);

const tsfile = `${dir}/${m3u8}/${uuid}.ts`;
if ((!fs.existsSync(tsfile))) {
throw utils.asError(errs.sys.invalid, errs.status.not, `no ts file ${tsfile}`)
}

ctx.type = 'application/vnd.apple.mpegurl';
ctx.body = fs.readFileSync(tsfile);
});

return router;
};

Loading

0 comments on commit 9d0122a

Please sign in to comment.