diff --git a/.gitignore b/.gitignore index 3b711891..1fbb8add 100644 --- a/.gitignore +++ b/.gitignore @@ -113,6 +113,7 @@ mgmt/containers/etc/letsencrypt/* mgmt/containers/var/lib/letsencrypt/* mgmt/containers/var/log/letsencrypt/* mgmt/containers/objs/nginx/html/* +mgmt/containers/data/record/* mgmt/containers/data/dvr/* mgmt/containers/data/vod/* mgmt/containers/data/redis/* diff --git a/README.md b/README.md index 3b234537..89750a8d 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ A lightweight open-source video cloud based on Nodejs, SRS, FFmpeg, WebRTC, etc. - [x] [Support WordPress Plugin](https://mp.weixin.qq.com/s/YjTkcJLkErMcZYHIjzsW_w) or [here](https://wordpress.org/plugins/srs-player). - [x] [Support Typecho Plugin](https://github.com/ossrs/Typecho-Plugin-SrsPlayer). - [x] [Support aaPanel to install on any linux](https://github.com/ossrs/srs-cloud/issues/29). +- [x] [Support DVR to local disk](https://github.com/ossrs/srs-cloud/issues/42). Other more use scenarios is on the way, please read [this post](https://github.com/ossrs/srs/issues/2856#lighthouse). @@ -146,6 +147,7 @@ Market: * `/terraform/v1/hooks/srs/secret/update` Hooks: Update the secret to generate stream URL. * `/terraform/v1/hooks/srs/secret/disable` Hooks: Disable the secret for authentication. * `/terraform/v1/hooks/srs/hls` Hooks: Handle the `on_hls` event. +* `/terraform/v1/hooks/record/query` Hooks: Query the Record pattern. * `/terraform/v1/hooks/dvr/apply` Hooks: Apply the DVR pattern. * `/terraform/v1/hooks/dvr/query` Hooks: Query the DVR pattern. * `/terraform/v1/hooks/dvr/files` Hooks: List the DVR files. diff --git a/hooks/dvrWorker.js b/hooks/dvrWorker.js index 8068e953..fd7d6141 100644 --- a/hooks/dvrWorker.js +++ b/hooks/dvrWorker.js @@ -58,11 +58,17 @@ async function handleMessage(msg) { const m3u8 = await redis.hget(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url); const m3u8Obj = m3u8 ? JSON.parse(m3u8) : {update: null, uuid: uuidv4()}; m3u8Obj.update = moment().format(); - const r0 = await redis.hset(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj)); // Append ts files to local m3u8 object. const local = await redis.hget(keys.redis.SRS_DVR_M3U8_LOCAL, m3u8_url); - const localObj = local ? JSON.parse(local) : {nn: 0, update: null, done: null, uuid: m3u8Obj.uuid, uuids: [], files: []}; + const localObj = local ? JSON.parse(local) : { + nn: 0, + update: null, + done: null, + uuid: m3u8Obj.uuid, + uuids: [], + files: [], + }; if (!local || localObj.uuid !== m3u8Obj.uuid) { localObj.done = null; localObj.uuid = m3u8Obj.uuid; @@ -72,7 +78,10 @@ async function handleMessage(msg) { localObj.files.push({m3u8_url, tsid, tsfile, ...msg}); localObj.nn = localObj.files.length; localObj.update = moment().format(); + + // We update the local m3u8 object, then update the active m3u8 stream list, to make sure the localObj is ready. const r1 = await redis.hset(keys.redis.SRS_DVR_M3U8_LOCAL, m3u8_url, JSON.stringify(localObj)); + const r0 = await redis.hset(keys.redis.SRS_DVR_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj)); console.log(`Thread #dvrWorker: local dvr task m3u8=${m3u8_url}, files=${localObj.files.length}, uuid=${m3u8Obj.uuid}, file=${file}, tsfile=${tsfile}, duration=${duration}, seqno=${seqno}, r0=${r0}, r1=${r1}`); } diff --git a/hooks/hls.js b/hooks/hls.js index 0c69a3fa..4f5e1eb3 100644 --- a/hooks/hls.js +++ b/hooks/hls.js @@ -49,11 +49,23 @@ exports.handle = (router) => { }); } + // Create a Record task if enabled. + const recordAll = await redis.hget(keys.redis.SRS_RECORD_PATTERNS, 'all'); + let record = 'ignore'; + if (recordAll === 'true') { + record = 'task_created'; + console.log(`create record task file=${file}, duration=${duration}, seqno=${seqno}, m3u8_url=${m3u8_url}, url=${url}`); + manager.postMessage({ + action: 'on_record_file', file, duration, seqno, m3u8_url, url, params: ctx.request.body, + }); + } + const update = moment().format(); const r0 = await redis.hset(keys.redis.SRS_DVR_PATTERNS, m3u8_url, JSON.stringify({dvr, update})); const r1 = await redis.hset(keys.redis.SRS_VOD_PATTERNS, m3u8_url, JSON.stringify({vod, update})); + const r2 = await redis.hset(keys.redis.SRS_RECORD_PATTERNS, m3u8_url, JSON.stringify({record, update})); - console.log(`srs hooks ok, dvr=${dvrAll}/${dvr}/${r0}, vod=${vodAll}/${vod}/${r1}, ${JSON.stringify(ctx.request.body)}`); + console.log(`srs hooks ok, dvr=${dvrAll}/${dvr}/${r0}, vod=${vodAll}/${vod}/${r1}, record=${recordAll}/${record}/${r2}, ${JSON.stringify(ctx.request.body)}`); ctx.body = utils.asResponse(0); }); diff --git a/hooks/index.js b/hooks/index.js index 7fd516b3..da926364 100644 --- a/hooks/index.js +++ b/hooks/index.js @@ -18,6 +18,7 @@ const BodyParser = require('koa-bodyparser'); const hooks = require('./hooks'); const pkg = require('./package.json'); const hls = require('./hls'); +const record = require('./record'); const dvr = require('./dvr'); const vod = require('./vod'); const manager = require('./manager'); @@ -42,7 +43,7 @@ app.use(async (ctx, next) => { const router = new Router(); -dvr.handle(vod.handle(hls.handle(hooks.handle(router)))); +record.handle(dvr.handle(vod.handle(hls.handle(hooks.handle(router))))); router.all('/terraform/v1/hooks/versions', async (ctx) => { ctx.body = utils.asResponse(0, {version: pkg.version}); diff --git a/hooks/m3u8Generator.js b/hooks/m3u8Generator.js index 08332d2d..cdde78ea 100644 --- a/hooks/m3u8Generator.js +++ b/hooks/m3u8Generator.js @@ -4,11 +4,11 @@ const path = require('path'); // See https://github.com/ossrs/srs/wiki/v4_EN_DeliveryHLS#vodm3u8 // See https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/video_on_demand_playlist_construction -exports.buildVodM3u8 = (metadataObj, absUrl, domain) => { +exports.buildVodM3u8 = (metadataObj, absUrl, domain, useKey, prefix) => { if (!metadataObj) throw new Error('no object'); if (!metadataObj.files) throw new Error('no files'); - if (!metadataObj.bucket) throw new Error('no bucket'); - if (!metadataObj.region) throw new Error('no region'); + if (absUrl && !metadataObj.bucket) throw new Error('no bucket'); + if (absUrl && !metadataObj.region) throw new Error('no region'); const duration = metadataObj.files.reduce((p, c) => Math.max(p, c.duration || 0), 0); const m3u8 = [ @@ -36,7 +36,10 @@ exports.buildVodM3u8 = (metadataObj, absUrl, domain) => { desc.push(new URL(e.key, `https://${metadataObj.bucket}.cos.${metadataObj.region}.myqcloud.com`).href); } } else { - desc.push(`${e.tsid}.ts`); + desc.push([ + ...(prefix ? [prefix] : []), + ...(useKey ? [e.key] : `${e.tsid}.ts`), + ].join('/')); } return desc; diff --git a/hooks/manager.js b/hooks/manager.js index e58497c5..cf3dd71b 100644 --- a/hooks/manager.js +++ b/hooks/manager.js @@ -2,7 +2,7 @@ const { Worker } = require("worker_threads"); -let dvrWorker, vodWorker; +let dvrWorker, vodWorker, recordWorker; exports.run = async () => { // The DVR worker, for cloud storage. @@ -42,6 +42,25 @@ exports.run = async () => { resolve(); }); }); + + // The Record worker, for cloud storage. + new Promise((resolve, reject) => { + recordWorker = new Worker("./recordWorker.js"); + + recordWorker.on('message', (msg) => { + console.log('Thread #manager:', msg); + }); + + recordWorker.on('error', reject); + + recordWorker.on('exit', (code) => { + console.log(`Thread #manager: exit with ${code}`); + if (code !== 0) { + return reject(new Error(`Thread #manager: stopped with exit code ${code}`)); + } + resolve(); + }); + }); }; exports.postMessage = (msg) => { @@ -49,6 +68,8 @@ exports.postMessage = (msg) => { dvrWorker?.postMessage(msg); } else if (msg.action === 'on_vod_file') { vodWorker?.postMessage(msg); + } else if (msg.action === 'on_record_file') { + recordWorker?.postMessage(msg); } else { console.error(`Thread #manager: Ignore message ${JSON.stringify(msg)}`); } diff --git a/hooks/record b/hooks/record new file mode 120000 index 00000000..5ffb7c66 --- /dev/null +++ b/hooks/record @@ -0,0 +1 @@ +../mgmt/containers/data/record \ No newline at end of file diff --git a/hooks/record.js b/hooks/record.js new file mode 100644 index 00000000..da0a3a23 --- /dev/null +++ b/hooks/record.js @@ -0,0 +1,124 @@ +'use strict'; + +// For components in docker, connect by host. +const config = { + redis:{ + host: process.env.NODE_ENV === 'development' ? 'localhost' : (process.env.REDIS_HOST || 'mgmt.srs.local'), + port: process.env.REDIS_PORT || 6379, + password: process.env.REDIS_PASSWORD || '', + }, +}; + +const utils = require('js-core/utils'); +const errs = require('js-core/errs'); +const jwt = require('jsonwebtoken'); +const ioredis = require('ioredis'); +const redis = require('js-core/redis').create({config: config.redis, redis: ioredis}); +const keys = require('js-core/keys'); +const m3u8Generator = require('./m3u8Generator'); +const fs = require("fs"); + +exports.handle = (router) => { + // Query the record patterns. + router.all('/terraform/v1/hooks/record/query', async (ctx) => { + const {token} = ctx.request.body; + + const apiSecret = await utils.apiSecret(redis); + const decoded = await utils.verifyToken(jwt, token, apiSecret); + + const all = await redis.hget(keys.redis.SRS_RECORD_PATTERNS, 'all'); + const home = '/usr/local/srs-cloud/mgmt/containers/data/record'; + + console.log(`record query ok, home=${home}, all=${all}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`); + ctx.body = utils.asResponse(0, { + all: all === 'true', + home, + }); + }); + + // Setup the record patterns. + router.all('/terraform/v1/hooks/record/apply', async (ctx) => { + const {token, all} = ctx.request.body; + + const apiSecret = await utils.apiSecret(redis); + const decoded = await utils.verifyToken(jwt, token, apiSecret); + + if (all !== true && all !== false) throw utils.asError(errs.sys.invalid, errs.status.args, `invalid all=${all}`); + + const r0 = await redis.hset(keys.redis.SRS_RECORD_PATTERNS, 'all', all); + + console.log(`record apply ok, all=${all}, r0=${r0}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`); + ctx.body = utils.asResponse(0); + }); + + // List the record files. + router.all('/terraform/v1/hooks/record/files', async (ctx) => { + const {token} = ctx.request.body; + + const apiSecret = await utils.apiSecret(redis); + const decoded = await utils.verifyToken(jwt, token, apiSecret); + + const files = []; + const [cursor, fileKVs] = await redis.hscan(keys.redis.SRS_RECORD_M3U8_METADATA, 0, '*', 100); + for (let i = 0; i < fileKVs.length; i += 2) { + const file = fileKVs[i + 1]; + file && files.push(JSON.parse(file)); + } + + const r0 = files.map(e => { + return { + uuid: e.uuid, + vhost: e.vhost, + app: e.app, + stream: e.stream, + progress: e.progress, + update: e.update, + nn: e.files.length, + duration: e.files.reduce((p, c) => p + (c.duration || 0), 0), + size: e.files.reduce((p, c) => p + (c.size || 0), 0), + }; + }); + + console.log(`record files ok, cursor=${cursor}, files=${files.length}, decoded=${JSON.stringify(decoded)}, token=${token.length}B`); + ctx.body = utils.asResponse(0, r0); + }); + + // Generate m3u8 to play. + const handleM3u8 = async (ctx) => { + const {uuid} = ctx.params; + if (!uuid) throw utils.asError(errs.sys.empty, errs.status.args, `no param uuid`); + + const metadata = await redis.hget(keys.redis.SRS_RECORD_M3U8_METADATA, uuid); + if (!metadata) throw utils.asError(errs.sys.invalid, errs.status.args, `no hls for uuid=${uuid}`); + + const metadataObj = JSON.parse(metadata); + const [contentType, m3u8Body, duration] = m3u8Generator.buildVodM3u8( + metadataObj, false, null, true, '/terraform/v1/hooks/record/hls', + ); + console.log(`record generate m3u8 ok, uuid=${uuid}, duration=${duration}`); + + ctx.type = contentType; + ctx.body = m3u8Body; + }; + router.all('/terraform/v1/hooks/record/hls/:uuid.m3u8', handleM3u8); + router.all('/terraform/v1/hooks/record/hls/:uuid/index.m3u8', handleM3u8); + + // Serve ts to play. + router.all('/terraform/v1/hooks/record/hls/:dir/:m3u8/:uuid.ts', async (ctx) => { + const {dir, m3u8, uuid} = ctx.params; + if (!dir) throw utils.asError(errs.sys.empty, errs.status.args, `no param dir`); + if (!m3u8) throw utils.asError(errs.sys.empty, errs.status.args, `no param m3u8`); + if (!uuid) throw utils.asError(errs.sys.empty, errs.status.args, `no param uuid`); + + const tsfile = `${dir}/${m3u8}/${uuid}.ts`; + if ((!fs.existsSync(tsfile))) { + throw utils.asError(errs.sys.invalid, errs.status.not, `no ts file ${tsfile}`) + } + + ctx.type = 'application/vnd.apple.mpegurl'; + ctx.body = fs.readFileSync(tsfile); + }); + + return router; +}; + diff --git a/hooks/recordWorker.js b/hooks/recordWorker.js new file mode 100644 index 00000000..3836d8a6 --- /dev/null +++ b/hooks/recordWorker.js @@ -0,0 +1,251 @@ +'use strict'; + +// For components in docker, connect by host. +const config = { + redis:{ + host: process.env.NODE_ENV === 'development' ? 'localhost' : (process.env.REDIS_HOST || 'mgmt.srs.local'), + port: process.env.REDIS_PORT || 6379, + password: process.env.REDIS_PASSWORD || '', + }, +}; + +const { isMainThread, parentPort } = require("worker_threads"); +const util = require('util'); +const execFile = util.promisify(require('child_process').execFile); +const { v4: uuidv4 } = require('uuid'); +const ioredis = require('ioredis'); +const redis = require('js-core/redis').create({config: config.redis, redis: ioredis}); +const keys = require('js-core/keys'); +const moment = require('moment'); +const fs = require('fs'); +const m3u8Generator = require('./m3u8Generator'); +const os = require("os"); + +if (!isMainThread) { + threadMain(); +} + +async function threadMain() { + // We must initialize the thread first. + console.log(`Thread #recordWorker: initialize`); + + parentPort.on('message', (msg) => { + handleMessage(msg); + }); + + while (true) { + try { + await doThreadMain(); + } catch (e) { + console.error(`Thread #recordWorker: err`, e); + } finally { + await new Promise(resolve => setTimeout(resolve, 30 * 1000)); + } + } +} + +async function handleMessage(msg) { + const {action, m3u8_url, file, duration, seqno} = msg; + if (action !== 'on_record_file') return console.log(`Thread #recordWorker: ignore ${JSON.stringify(msg)}`); + + // Copy the ts file to temporary cache dir. + const tsid = uuidv4(); + const tsfile = `record/${tsid}.ts`; + // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ + await execFile('cp', ['-f', file, tsfile]); + + // Create or update active m3u8 object, for worker to scan. + const m3u8 = await redis.hget(keys.redis.SRS_RECORD_M3U8_ACTIVE, m3u8_url); + const m3u8Obj = m3u8 ? JSON.parse(m3u8) : {update: null, uuid: uuidv4()}; + m3u8Obj.update = moment().format(); + + // Append ts files to local m3u8 object. + const local = await redis.hget(keys.redis.SRS_RECORD_M3U8_LOCAL, m3u8_url); + const localObj = local ? JSON.parse(local) : { + nn: 0, + update: null, + done: null, + uuid: m3u8Obj.uuid, + uuids: [], + files: [], + }; + if (!local || localObj.uuid !== m3u8Obj.uuid) { + localObj.done = null; + localObj.uuid = m3u8Obj.uuid; + localObj.uuids.push(m3u8Obj.uuid); + console.log(`Thread #recordWorker: local start new m3u8=${m3u8_url}, uuid=${m3u8Obj.uuid}, uuids=${localObj.uuids.length}`); + } + localObj.files.push({m3u8_url, tsid, tsfile, ...msg}); + localObj.nn = localObj.files.length; + localObj.update = moment().format(); + + // We update the local m3u8 object, then update the active m3u8 stream list, to make sure the localObj is ready. + const r1 = await redis.hset(keys.redis.SRS_RECORD_M3U8_LOCAL, m3u8_url, JSON.stringify(localObj)); + const r0 = await redis.hset(keys.redis.SRS_RECORD_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj)); + console.log(`Thread #recordWorker: local record task m3u8=${m3u8_url}, files=${localObj.files.length}, uuid=${m3u8Obj.uuid}, file=${file}, tsfile=${tsfile}, duration=${duration}, seqno=${seqno}, r0=${r0}, r1=${r1}`); +} + +async function doThreadMain() { + while (true) { + // We query the local active keys from m3u8 status. + const activeKeys = await redis.hkeys(keys.redis.SRS_RECORD_M3U8_ACTIVE); + if (!activeKeys || !activeKeys.length) { + await new Promise(resolve => setTimeout(resolve, 9 * 1000)); + continue; + } + console.log(`Thread #recordWorker: local active keys ${JSON.stringify(activeKeys)}`); + + for (const i in activeKeys) { + // Get the local object by the active key. + const localKey = activeKeys[i]; + const local = await redis.hget(keys.redis.SRS_RECORD_M3U8_LOCAL, localKey); + await handleLocalObject(localKey, local ? JSON.parse(local) : null); + } + + await new Promise(resolve => setTimeout(resolve, 3000)); + } +} + +async function handleLocalObject(localKey, localObj) { + if (!localObj || !localObj.files.length) { + await finishLocalObject(localKey, localObj); + return; + } + + const localFiles = [...localObj.files]; + for (const i in localFiles) { + const localFile = localFiles[i]; + await handleLocalFile(localKey, localObj, localFile); + } + + const metadata = await redis.hget(keys.redis.SRS_RECORD_M3U8_METADATA, localObj.uuid); + const metadataObj = metadata && JSON.parse(metadata); + + console.log(`Thread #recordWorker: Finished files=${localFiles.length}, left=${localObj.files.length}, metadata=${metadataObj?.files?.length}`); +} + +async function handleLocalFile(localKey, localObj, localFile) { + // Ignore file if not exists. + if (!fs.existsSync(localFile.tsfile)) { + await updateLocalObject(localKey, localObj, localFile, null); + console.warn(`Thread #recordWorker: Ignore m3u8=${localKey}, ts=${localFile.url} for TsNotFount`); + return; + } + + const stats = fs.statSync(localFile.tsfile); + const key = `record/${localObj.uuid}/${localFile.tsid}.ts`; + await execFile('mkdir', ['-p', `record/${localObj.uuid}`]); + await execFile('mv', ['-f', localFile.tsfile, key]); + + // Update the metadata for m3u8. + await updateMetadataObject(localKey, localObj, localFile, key, stats); + + // Update the left local local files. + await updateLocalObject(localKey, localObj, localFile, key); + + // Local record, already move the tsfile to key. + console.log(`Thread #recordWorker: Finish local for m3u8=${localKey}, ts=${localFile.url}, as=${key}`); +} + +async function updateMetadataObject(localKey, localObj, localFile, key, stats) { + const metadata = await redis.hget(keys.redis.SRS_RECORD_M3U8_METADATA, localObj.uuid); + const metadataObj = metadata ? JSON.parse(metadata) : { + nn: 0, + update: moment().format(), + uuid: localObj.uuid, + vhost: localFile.params.vhost, + app: localFile.params.app, + stream: localFile.params.stream, + // The Record is progressing, use local m3u8 address to preview or download. + progress: true, + done: null, + m3u8: null, + // The ts files. + files: [], + }; + + // Reduce the uploaded files by uuid. + metadataObj.files = metadataObj.files.filter(e => e.tsid !== localFile.tsid); + metadataObj.files.push({ + key, + tsid: localFile.tsid, + url: localFile.url, + seqno: localFile.seqno, + duration: localFile.duration, + size: stats.size, + }); + metadataObj.nn = metadataObj.files.length; + metadataObj.update = moment().format(); + + await redis.hset(keys.redis.SRS_RECORD_M3U8_METADATA, localObj.uuid, JSON.stringify(metadataObj)); + console.log(`Thread #recordWorker: Update metadata for m3u8=${localKey}, uuid=${localObj.uuid}, files=${metadataObj.nn}`); +} + +// Note that key is optional, logging only. +async function updateLocalObject(localKey, localObj, localFile, key) { + // @remark Note that the local.files might changed by other asyncs, so we must reload it before save it. + const localRef = await redis.hget(keys.redis.SRS_RECORD_M3U8_LOCAL, localKey); + const localRefObj = JSON.parse(localRef); + + // Warning if files changed. + if (localObj.files.length !== localRefObj.files.length) { + console.warn(`Thread #recordWorker: LocalTsFiles changed, m3u8=${localKey}, before=${localObj.files.length}, ref=${localRefObj.files.length}`); + } + + // Filter the left files. + const leftFiles = localRefObj.files.filter(e => e.tsid !== localFile.tsid); + console.log(`Thread #recordWorker: Update local for m3u8=${localKey}, ts=${localFile.url}, as=${key}, before=${localRefObj.files.length}, left=${leftFiles.length}`); + + // Update the local realtime reference object. + localObj.files = localRefObj.files = leftFiles; + localObj.nn = localRefObj.nn = leftFiles.length; + localObj.update = localRefObj.update = moment().format(); + + // Write to redis. + await redis.hset(keys.redis.SRS_RECORD_M3U8_LOCAL, localKey, JSON.stringify(localRefObj)); +} + +async function finishLocalObject(localKey, localObj) { + if (!localObj || !localObj.update) { + await redis.hdel(keys.redis.SRS_RECORD_M3U8_LOCAL, localKey); + return; + } + + // If stream expired, finish the Record. + const expired = moment(localObj.update).add(process.env.NODE_ENV === 'development' ? 30 : 300, 's'); + if (expired.isAfter(moment())) return; + + // Try to finish the m3u8 first. + const duration = await finishM3u8(localKey, localObj); + + // Keep the local status, to allow query all uuids of uploaded. + localObj.done = moment().format(); + await redis.hset(keys.redis.SRS_RECORD_M3U8_LOCAL, localKey, JSON.stringify(localObj)); + console.log(`Thread #recordWorker: Record expired, key=${localKey}, update=${moment(localObj.update).format()}, expired=${expired.format()}, now=${moment().format()}`); + + // Remove the status, to create new m3u8 next Record. + await redis.hdel(keys.redis.SRS_RECORD_M3U8_ACTIVE, localKey); + + console.log(`Thread #recordWorker: Record done, key=${localKey}, duration=${duration}`); +} + +async function finishM3u8(localKey, localObj) { + // Update the metadata, and keep the uploaded. + const metadata = await redis.hget(keys.redis.SRS_RECORD_M3U8_METADATA, localObj.uuid); + if (!metadata) return; + + const metadataObj = metadata && JSON.parse(metadata); + const [contentType, m3u8Body, duration] = m3u8Generator.buildVodM3u8( + metadataObj, false, null, true, '/terraform/v1/hooks/record/hls', + ); + + const key = `record/${localObj.uuid}/index.m3u8`; + fs.writeFileSync(key, m3u8Body); + console.log(`Thread #recordWorker: Record m3u8=${key}, contentType=${contentType}`); + + metadataObj.progress = false; + metadataObj.done = moment().format(); + await redis.hset(keys.redis.SRS_RECORD_M3U8_METADATA, localObj.uuid, JSON.stringify(metadataObj)); + return duration; +} + diff --git a/hooks/vodWorker.js b/hooks/vodWorker.js index 2edd6413..193f1c21 100644 --- a/hooks/vodWorker.js +++ b/hooks/vodWorker.js @@ -61,11 +61,17 @@ async function handleMessage(msg) { const m3u8 = await redis.hget(keys.redis.SRS_VOD_M3U8_ACTIVE, m3u8_url); const m3u8Obj = m3u8 ? JSON.parse(m3u8) : {update: null, uuid: uuidv4()}; m3u8Obj.update = moment().format(); - const r0 = await redis.hset(keys.redis.SRS_VOD_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj)); // Append ts files to local m3u8 object. const local = await redis.hget(keys.redis.SRS_VOD_M3U8_LOCAL, m3u8_url); - const localObj = local ? JSON.parse(local) : {nn: 0, update: null, done: null, uuid: m3u8Obj.uuid, uuids: [], files: []}; + const localObj = local ? JSON.parse(local) : { + nn: 0, + update: null, + done: null, + uuid: m3u8Obj.uuid, + uuids: [], + files: [], + }; if (!local || localObj.uuid !== m3u8Obj.uuid) { localObj.done = null; localObj.uuid = m3u8Obj.uuid; @@ -75,7 +81,10 @@ async function handleMessage(msg) { localObj.files.push({m3u8_url, tsid, tsfile, ...msg}); localObj.nn = localObj.files.length; localObj.update = moment().format(); + + // We update the local m3u8 object, then update the active m3u8 stream list, to make sure the localObj is ready. const r1 = await redis.hset(keys.redis.SRS_VOD_M3U8_LOCAL, m3u8_url, JSON.stringify(localObj)); + const r0 = await redis.hset(keys.redis.SRS_VOD_M3U8_ACTIVE, m3u8_url, JSON.stringify(m3u8Obj)); console.log(`Thread #vodWorker: local vod task m3u8=${m3u8_url}, files=${localObj.files.length}, uuid=${m3u8Obj.uuid}, file=${file}, tsfile=${tsfile}, duration=${duration}, seqno=${seqno}, r0=${r0}, r1=${r1}`); } diff --git a/js-core/keys.js b/js-core/keys.js index 50184ae6..19db76b9 100644 --- a/js-core/keys.js +++ b/js-core/keys.js @@ -8,6 +8,11 @@ exports.redis = { SRS_TENCENT_CAM: 'SRS_TENCENT_CAM', SRS_TENCENT_COS: 'SRS_TENCENT_COS', SRS_TENCENT_VOD: 'SRS_TENCENT_VOD', + // For local record. + SRS_RECORD_PATTERNS: 'SRS_RECORD_PATTERNS', + SRS_RECORD_M3U8_ACTIVE: 'SRS_RECORD_M3U8_ACTIVE', + SRS_RECORD_M3U8_LOCAL: 'SRS_RECORD_M3U8_LOCAL', + SRS_RECORD_M3U8_METADATA: 'SRS_RECORD_M3U8_METADATA', // For cloud storage. SRS_DVR_PATTERNS: 'SRS_DVR_PATTERNS', SRS_DVR_M3U8_ACTIVE: 'SRS_DVR_M3U8_ACTIVE', diff --git a/mgmt/containers/data/record/.gitkeep b/mgmt/containers/data/record/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/mgmt/system.js b/mgmt/system.js index 1e1c5f14..eee31251 100644 --- a/mgmt/system.js +++ b/mgmt/system.js @@ -391,6 +391,9 @@ const handlers = { ); } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Build the + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Build the config for NGINX. const confLines = [ diff --git a/platform/ui/src/components/DvrVodStatus.js b/platform/ui/src/components/DvrStatus.js similarity index 67% rename from platform/ui/src/components/DvrVodStatus.js rename to platform/ui/src/components/DvrStatus.js index ad61d3b9..de12a3fd 100644 --- a/platform/ui/src/components/DvrVodStatus.js +++ b/platform/ui/src/components/DvrStatus.js @@ -47,3 +47,28 @@ export default function useDvrVodStatus() { return [dvrStatus, vodStatus]; } +export function useRecordStatus() { + const navigate = useNavigate(); + const [recordStatus, setRecordStatus] = React.useState(); + + React.useEffect(() => { + const token = Token.load(); + axios.post('/terraform/v1/hooks/record/query', { + ...token, + }).then(res => { + console.log(`RecordPattern: Query ok, ${JSON.stringify(res.data.data)}`); + setRecordStatus(res.data.data); + }).catch(e => { + const err = e.response.data; + if (err.code === Errors.auth) { + alert(`Token过期,请重新登录,${err.code}: ${err.data.message}`); + navigate('/routers-logout'); + } else { + alert(`服务器错误,${err.code}: ${err.data.message}`); + } + }); + }, [navigate]); + + return recordStatus; +} + diff --git a/platform/ui/src/pages/Scenario.js b/platform/ui/src/pages/Scenario.js index 5477aac7..93d98d87 100644 --- a/platform/ui/src/pages/Scenario.js +++ b/platform/ui/src/pages/Scenario.js @@ -15,6 +15,7 @@ import {SrsErrorBoundary} from "../components/SrsErrorBoundary"; import ScenarioTutorials from './ScenarioTutorials'; import {useTranslation} from "react-i18next"; import {useSrsLanguage} from "../components/LanguageSwitch"; +import ScenarioRecord from "./ScenarioRecord"; export default function Scenario() { const [searchParams] = useSearchParams(); @@ -92,6 +93,9 @@ function ScenarioImpl({defaultActiveTab}) { { activeTab === 'forward' && } + + { activeTab === 'record' && } + { activeTab === 'dvr' && } diff --git a/platform/ui/src/pages/ScenarioDvr.js b/platform/ui/src/pages/ScenarioDvr.js index 258bdeb0..dfb46f21 100644 --- a/platform/ui/src/pages/ScenarioDvr.js +++ b/platform/ui/src/pages/ScenarioDvr.js @@ -5,7 +5,7 @@ import axios from "axios"; import SetupCamSecret from '../components/SetupCamSecret'; import moment from "moment"; import {TutorialsButton, useTutorials} from "../components/TutorialsButton"; -import useDvrVodStatus from "../components/DvrVodStatus"; +import useDvrVodStatus from "../components/DvrStatus"; import {useErrorHandler} from "react-error-boundary"; import {useTranslation} from "react-i18next"; import {useSrsLanguage} from "../components/LanguageSwitch"; diff --git a/platform/ui/src/pages/ScenarioRecord.js b/platform/ui/src/pages/ScenarioRecord.js new file mode 100644 index 00000000..931442c0 --- /dev/null +++ b/platform/ui/src/pages/ScenarioRecord.js @@ -0,0 +1,205 @@ +import React from "react"; +import {Accordion, Form, Button, Table} from "react-bootstrap"; +import {Token, StreamURL, Clipboard} from "../utils"; +import axios from "axios"; +import moment from "moment"; +import {useRecordStatus} from "../components/DvrStatus"; +import {useErrorHandler} from "react-error-boundary"; +import {useTranslation} from "react-i18next"; +import {useSrsLanguage} from "../components/LanguageSwitch"; +import * as Icon from "react-bootstrap-icons"; + +export default function ScenarioRecord() { + const language = useSrsLanguage(); + return language === 'zh' ? : ; +} + +function ScenarioRecordCn() { + const recordStatus = useRecordStatus(); + const [activeKey, setActiveKey] = React.useState(); + + // We must init the activeKey, because the defaultActiveKey only apply when init for Accordion. + // See https://stackoverflow.com/q/61324259/17679565 + React.useEffect(() => { + if (!recordStatus) return; + + if (recordStatus.all) { + setActiveKey('3'); + } else { + setActiveKey('2'); + } + }, [recordStatus]); + + return ( + <> + { activeKey && } + + ); +} + +function ScenarioRecordImpl({activeKey, defaultApplyAll, recordHome}) { + const [recordAll, setRecordAll] = React.useState(defaultApplyAll); + const [recordFiles, setRecordFiles] = React.useState(); + const handleError = useErrorHandler(); + const {t} = useTranslation(); + + React.useEffect(() => { + const refreshRecordFiles = () => { + const token = Token.load(); + axios.post('/terraform/v1/hooks/record/files', { + ...token, + }).then(res => { + console.log(`Record: Files ok, ${JSON.stringify(res.data.data)}`); + setRecordFiles(res.data.data.map(file => { + const l = window.location; + const schema = l.protocol.replace(':', ''); + const httpPort = l.port || (l.protocol === 'http:' ? 80 : 443); + if (file.progress) { + file.location = `${l.protocol}//${l.host}/terraform/v1/hooks/record/hls/${file.uuid}.m3u8`; + file.preview = `/players/srs_player.html?schema=${schema}&port=${httpPort}&autostart=true&app=terraform/v1/hooks/record/hls&stream=${file.uuid}.m3u8`; + } else { + file.location = `${l.protocol}//${l.host}/terraform/v1/hooks/record/hls/${file.uuid}/index.m3u8`; + file.preview = `/players/srs_player.html?schema=${schema}&port=${httpPort}&autostart=true&app=terraform/v1/hooks/record/hls/${file.uuid}&stream=index.m3u8`; + } + + return { + ...file, + url: StreamURL.build(file.vhost, file.app, file.stream), + update: moment(file.update), + duration: Number(file.duration), + size: Number(file.size / 1024.0 / 1024), + }; + }).sort((a, b) => { + return b.update - a.update; + }).map((file, i) => { + return {...file, i: i + 1}; + })); + }).catch(handleError); + }; + + refreshRecordFiles(); + const timer = setInterval(() => refreshRecordFiles(), 10 * 1000); + return () => clearInterval(timer); + }, [handleError]); + + const setupRecordPattern = (e) => { + e.preventDefault(); + + const token = Token.load(); + axios.post('/terraform/v1/hooks/record/apply', { + ...token, all: !!recordAll, + }).then(res => { + alert('设置录制规则成功'); + console.log(`Record: Apply patterns ok, all=${recordAll}`); + }).catch(handleError); + }; + + const copyToClipboard = React.useCallback((e, text) => { + e.preventDefault(); + + Clipboard.copy(text).then(() => { + alert(t('helper.copyOk')); + }).catch((err) => { + alert(`${t('helper.copyFail')} ${err}`); + }); + }, [t]); + + return ( + + + 场景介绍 + +
+ 本地录制,指录制视频流到云SRS的本地磁盘,只要推送到服务器的流都可以录制。 +

+
+

可应用的具体场景包括:

+
    +
  • 直播转点播,录制直播流成为一个HLS文件,存储在云SRS本地磁盘,可以下载
  • +
+

特别注意:

+
    +
  • 如果流的路数特别多,磁盘会很忙,特别是挂共享存储,需要监控磁盘IO和负载。
  • +
  • 虽然本地磁盘足够大,但云存储是真的无限大,而本地磁盘其实还是有限的,需要监控磁盘空间。
  • +
  • 暂时不支持本地文件的管理,比如删除和清理等。
  • +
+

使用说明:

+
    +
  • 具体使用步骤,请根据下面引导操作
  • +
+
+
+ + 录制文件夹 + + 保存路径: {recordHome}   +
+ copyToClipboard(e, recordHome)} /> +
+
+
+ + 设置录制规则 + +
+ + setRecordAll(!recordAll)} /> + + +
+
+
+ + 录制任务列表 + + { + recordFiles?.length ? ( + + + + + + + + + + + + + + + + { + recordFiles?.map(file => { + return + + + + + + + + + + ; + }) + } + +
#状态更新时间媒体流时长大小切片地址预览
{file.i}{file.progress ? '录制中' : '已完成'}{`${file.update.format('YYYY-MM-DD HH:mm:ss')}`}{file.url}{`${file.duration.toFixed(1)}`}秒{`${file.size.toFixed(1)}`}MB{file.nn} copyToClipboard(e, file.location)} target='_blank' rel='noreferrer'>复制预览
+ ) : '' + } + {!recordFiles?.length ? '没有流。请开启录制并推流后,等待大约60秒左右,录制列表会自动更新' : ''} +
+
+
+ ); +} + +function ScenarioRecordEn() { + return ( + On the way... + ); +} + diff --git a/platform/ui/src/pages/ScenarioVod.js b/platform/ui/src/pages/ScenarioVod.js index 9a01c039..b4db7d46 100644 --- a/platform/ui/src/pages/ScenarioVod.js +++ b/platform/ui/src/pages/ScenarioVod.js @@ -3,7 +3,7 @@ import {Accordion, Form, Button, Table} from "react-bootstrap"; import {Token, StreamURL, Clipboard} from "../utils"; import axios from "axios"; import SetupCamSecret from '../components/SetupCamSecret'; -import useDvrVodStatus from "../components/DvrVodStatus"; +import useDvrVodStatus from "../components/DvrStatus"; import moment from "moment"; import {TutorialsButton, useTutorials} from "../components/TutorialsButton"; import {useErrorHandler} from "react-error-boundary"; diff --git a/platform/ui/src/resources/locale.json b/platform/ui/src/resources/locale.json index 2b276fa3..e0355f7f 100644 --- a/platform/ui/src/resources/locale.json +++ b/platform/ui/src/resources/locale.json @@ -31,6 +31,7 @@ "live": "私人直播间", "srt": "超清实时直播", "restream": "多平台转播", + "record": "本地录制", "dvr": "云录制", "vod": "云点播", "code": "源代码" @@ -211,6 +212,7 @@ "live": "Streaming", "srt": "Low Latency", "restream": "Restreaming", + "record": "Record", "dvr": "DVR", "vod": "VoD", "code": "Source"