From 11b671366bab30fac13999b407fda59cf072eded Mon Sep 17 00:00:00 2001 From: VolcanoCookies Date: Sun, 11 Jun 2023 10:17:15 +0200 Subject: [PATCH] feat: relative sequence support for HLS using optional stateful mode --- README.md | 19 ++- src/manifests/handlers/hls/master.ts | 11 +- src/manifests/handlers/hls/media.test.ts | 130 +++++++++++++++++++ src/manifests/handlers/hls/media.ts | 27 +++- src/manifests/utils/configs.test.ts | 54 ++++++++ src/manifests/utils/configs.ts | 32 ++++- src/manifests/utils/hlsManifestUtils.test.ts | 3 +- src/manifests/utils/hlsManifestUtils.ts | 19 ++- src/shared/utils.ts | 44 +++++++ 9 files changed, 325 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 622584c..3600643 100644 --- a/README.md +++ b/README.md @@ -67,14 +67,23 @@ To try it out, go to your favourite HLS/MPEG-DASH video player such as `https:// | `timeout` | Force a timeout for the response of a specific segment request | | `throttle` | Send back the segment at a specified speed of bytes per second | +### Stateful Mode + +By settings the `STATEFUL` env variable to `true`, stateful mode can be enabled, in this mode certain additional features are enabled at the cost of keeping some state in-memory. The state cache TTL ín seconds can be configured with the `TTL` env variable, default of 300 seconds. + +Currently the only feature not available when running in stateless mode is relative sequence numbers on HLS livestreams. + ### Load Manifest url params from AWS SSM parameter store instead + - Create a .env file at the root the of project -- fill it like this : +- fill it like this : + ``` AWS_REGION="eu-central-1" AWS_SSM_PARAM_KEY="/ChaosStreamProxy/Development/UrlParams" LOAD_PARAMS_FROM_AWS_SSM=true ``` + - on AWS SSM, create a parameter with name : /ChaosStreamProxy/Development/UrlParams - add a value for corruptions, for example : &statusCode=[{i:3,code:500},{i:4,code:500}] @@ -92,7 +101,7 @@ Across all coruptions, there are 3 ways to target a segment in a playlist for co 1. `i`: The segment's list index in any Media Playlist, with HLS segments starting at 0 and MPEG-DASH segments starting at 1. For a Media Playlist with 12 segments, `i`=11, would target the last segment for HLS and `i`=12, would target the last segment for MPEG-DASH. 2. `sq`: The segment's Media Sequence Number (**HLS only**). For a Media Playlist with 12 segments, and where `#EXT-X-MEDIA-SEQUENCE` is 100, `sq`=111 would target the last segment. When corrupting a live HLS stream it is recommended to target with `sq`. -3. `rsq`: A relative sequence number, counted from where the live stream is currently at when requesting manifest. (**MPEG-DASH Live only**) +3. `rsq`: A relative sequence number, counted from where the live stream is currently at when requesting manifest. (**HLS SUPPORTED ONLY IN STATEFUL MODE**) Below are configuration JSON object templates for the currently supported corruptions. A query should have its value be an array consisting of any one of these 3 types of items: @@ -133,6 +142,7 @@ Timeout Corruption: ``` Throttle Corruption: + ```typescript { i?: number | "*", // index of target segment in playlist. If "*", then target all segments. (Starts on 0 for HLS / 1 for MPEG-DASH) @@ -143,7 +153,6 @@ Throttle Corruption: } ``` - One can either target a segment through the index parameter, `i`, or the sequence number parameter, `sq`, relative sequence numbers, `rsq`, are translated to sequence numbers, . In the case where one has entered both, the **index parameter** will take precedence. Relative sequence numbers, `rsq`, are translated to sequence numbers, `sq`, and will thus override any provided `sq`. @@ -234,6 +243,7 @@ https://chaos-proxy.prod.eyevinn.technology/api/v2/manifests/dash/proxy-master.m ``` 6. LIVE: Example of MPEG-DASH with a segment download speed limited to 10kB/s on all segments + ``` https://chaos-proxy.prod.eyevinn.technology/api/v2/manifests/dash/proxy-master.mpd?url=https://f53accc45b7aded64ed8085068f31881.egress.mediapackage-vod.eu-north-1.amazonaws.com/out/v1/1c63bf88e2664639a6c293b4d055e6bb/64651f16da554640930b7ce2cd9f758b/66d211307b7d43d3bd515a3bfb654e1c/manifest.mpd&throttle=[{i:*,rate:10000}] ``` @@ -255,6 +265,7 @@ To deploy and update production environment publish a release on GitHub. This wi See [CONTRIBUTING](CONTRIBUTING.md) if you want to contribute to this project. ### Git way-of-working + In the interest of keeping a clean and easy to debug git history, use the following guidelines: - Read [How to Write a Commit Message](https://chris.beams.io/posts/git-commit/). @@ -322,4 +333,4 @@ Eyevinn Technology is an independent consultant firm specialized in video and st At Eyevinn, every software developer consultant has a dedicated budget reserved for open source development and contribution to the open source community. This give us room for innovation, team building and personal competence development. And also gives us as a company a way to contribute back to the open source community. -Want to know more about Eyevinn and how it is to work here? Contact us at work@eyevinn.se! \ No newline at end of file +Want to know more about Eyevinn and how it is to work here? Contact us at work@eyevinn.se! diff --git a/src/manifests/handlers/hls/master.ts b/src/manifests/handlers/hls/master.ts index 834127a..7d315c5 100644 --- a/src/manifests/handlers/hls/master.ts +++ b/src/manifests/handlers/hls/master.ts @@ -5,7 +5,9 @@ import { isValidUrl, parseM3U8Text, refineALBEventQuery, - generateErrorResponse + generateErrorResponse, + STATEFUL, + newState } from '../../../shared/utils'; // To be able to reuse the handlers for AWS lambda function - input should be ALBEvent @@ -40,11 +42,16 @@ export default async function hlsMasterHandler(event: ALBEvent) { }); } + const stateKey = STATEFUL + ? newState({ initialSequenceNumber: undefined }) + : undefined; + const reqQueryParams = new URLSearchParams(query); const manifestUtils = hlsManifestUtils(); const proxyManifest = manifestUtils.createProxyMasterManifest( masterM3U, - reqQueryParams + reqQueryParams, + stateKey ); return { diff --git a/src/manifests/handlers/hls/media.test.ts b/src/manifests/handlers/hls/media.test.ts index 60e4852..e78a4af 100644 --- a/src/manifests/handlers/hls/media.test.ts +++ b/src/manifests/handlers/hls/media.test.ts @@ -245,6 +245,136 @@ https://mock.mock.com/stream/hls/manifest_1_00001.ts expect(response.body).toEqual(expected.body); }); + it('should return 400 when providing relative offset in stateless mode', async () => { + // Arrange + const getMedia = () => { + return new Promise((resolve) => { + const readStream: ReadStream = createReadStream( + path.join( + __dirname, + `../../../testvectors/hls/hls2_multitrack/manifest_1.m3u8` + ) + ); + resolve(readStream); + }); + }; + nock(mockBaseURL).persist().get('/manifest_1.m3u8').reply(200, getMedia, { + 'Content-Type': 'application/vnd.apple.mpegurl;charset=UTF-8', + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Headers': 'Content-Type, Origin' + }); + + const queryParams = { + url: mockMediaURL, + statusCode: '[{rsq:15,code:400}]' + }; + const event: ALBEvent = { + requestContext: { + elb: { + targetGroupArn: '' + } + }, + path: '/stream/hls/manifest.m3u8', + httpMethod: 'GET', + headers: { + accept: 'application/vnd.apple.mpegurl;charset=UTF-8', + 'accept-language': 'en-US,en;q=0.8', + 'content-type': 'text/plain', + host: 'lambda-846800462-us-east-2.elb.amazonaws.com', + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6)', + 'x-amzn-trace-id': 'Root=1-5bdb40ca-556d8b0c50dc66f0511bf520', + 'x-forwarded-for': '72.21.198.xx', + 'x-forwarded-port': '443', + 'x-forwarded-proto': 'https' + }, + isBase64Encoded: false, + queryStringParameters: queryParams, + body: '' + }; + + // Act + const response = await hlsMediaHandler(event); + + // Assert + const expected: ALBResult = { + statusCode: 400, + headers: { + 'Access-Control-Allow-Headers': 'Content-Type, Origin', + 'Access-Control-Allow-Origin': '*', + 'Content-Type': 'application/json' + }, + body: '{"reason":"Relative sequence numbers on HLS are only supported when proxy is running in stateful mode"}' + }; + expect(response.statusCode).toEqual(expected.statusCode); + expect(response.headers).toEqual(expected.headers); + expect(response.body).toEqual(expected.body); + }); + + it('should return 400 when providing relative offset in stateless mode', async () => { + // Arrange + const getMedia = () => { + return new Promise((resolve) => { + const readStream: ReadStream = createReadStream( + path.join( + __dirname, + `../../../testvectors/hls/hls2_multitrack/manifest_1.m3u8` + ) + ); + resolve(readStream); + }); + }; + nock(mockBaseURL).persist().get('/manifest_1.m3u8').reply(200, getMedia, { + 'Content-Type': 'application/vnd.apple.mpegurl;charset=UTF-8', + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Headers': 'Content-Type, Origin' + }); + + const queryParams = { + url: mockMediaURL, + statusCode: '[{rsq:15,code:400}]' + }; + const event: ALBEvent = { + requestContext: { + elb: { + targetGroupArn: '' + } + }, + path: '/stream/hls/manifest.m3u8', + httpMethod: 'GET', + headers: { + accept: 'application/vnd.apple.mpegurl;charset=UTF-8', + 'accept-language': 'en-US,en;q=0.8', + 'content-type': 'text/plain', + host: 'lambda-846800462-us-east-2.elb.amazonaws.com', + 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6)', + 'x-amzn-trace-id': 'Root=1-5bdb40ca-556d8b0c50dc66f0511bf520', + 'x-forwarded-for': '72.21.198.xx', + 'x-forwarded-port': '443', + 'x-forwarded-proto': 'https' + }, + isBase64Encoded: false, + queryStringParameters: queryParams, + body: '' + }; + + // Act + const response = await hlsMediaHandler(event); + + // Assert + const expected: ALBResult = { + statusCode: 400, + headers: { + 'Access-Control-Allow-Headers': 'Content-Type, Origin', + 'Access-Control-Allow-Origin': '*', + 'Content-Type': 'application/json' + }, + body: '{"reason":"Relative sequence numbers on HLS are only supported when proxy is running in stateful mode"}' + }; + expect(response.statusCode).toEqual(expected.statusCode); + expect(response.headers).toEqual(expected.headers); + expect(response.body).toEqual(expected.body); + }); + //it('should return code 500 on Other Errors, eg M3U8 parser error', async () => {}); }); }); diff --git a/src/manifests/handlers/hls/media.ts b/src/manifests/handlers/hls/media.ts index 0fe858d..a92bae4 100644 --- a/src/manifests/handlers/hls/media.ts +++ b/src/manifests/handlers/hls/media.ts @@ -2,9 +2,12 @@ import fetch, { Response } from 'node-fetch'; import { ALBEvent, ALBResult } from 'aws-lambda'; import { fixUrl, + STATEFUL, generateErrorResponse, + getState, isValidUrl, parseM3U8Text, + putState, refineALBEventQuery } from '../../../shared/utils'; import delaySCC from '../../utils/corruptions/delay'; @@ -58,8 +61,30 @@ export default async function hlsMediaHandler( .register(timeoutSCC) .register(throttleSCC); + const mediaSequence = mediaM3U.get('mediaSequence'); + let mediaSequenceOffset = 0; + if (STATEFUL) { + const stateKey = reqQueryParams.get('state'); + if (stateKey) { + const state = getState(stateKey); + if (state.initialSequenceNumber == undefined) { + putState(stateKey, { + ...state, + initialSequenceNumber: mediaSequence + }); + mediaSequenceOffset = mediaSequence; + } else { + mediaSequenceOffset = state.initialSequenceNumber; + } + } + } + const [error, allMutations, levelMutations] = - configUtils.getAllManifestConfigs(mediaM3U.get('mediaSequence')); + configUtils.getAllManifestConfigs( + mediaSequence, + false, + mediaSequenceOffset + ); if (error) { return generateErrorResponse(error); } diff --git a/src/manifests/utils/configs.test.ts b/src/manifests/utils/configs.test.ts index 85c4dc2..6f173d1 100644 --- a/src/manifests/utils/configs.test.ts +++ b/src/manifests/utils/configs.test.ts @@ -5,6 +5,8 @@ import { CorruptorIndexMap, CorruptorLevelMap } from './configs'; +import statusCodeConfig from './corruptions/statusCode'; +import throttleConfig from './corruptions/throttle'; describe('configs', () => { describe('utils', () => { @@ -87,6 +89,16 @@ describe('configs', () => { }); describe('getAllManifestConfigs', () => { + const env = process.env; + beforeEach(() => { + jest.resetModules(); + process.env = { ...env }; + }); + + afterEach(() => { + process.env = env; + }); + it('should handle matching config with url query params', () => { // Arrange const configs = corruptorConfigUtils( @@ -160,6 +172,48 @@ describe('configs', () => { expect(actualIndex).toEqual(expectedIndex); expect(actualLevel).toEqual(expectedLevel); }); + + it('should handle media sequence offsets', () => { + // Arrange + process.env.STATEFUL = 'true'; + + const configs = corruptorConfigUtils( + new URLSearchParams( + 'statusCode=[{rsq:15,code:400}]&throttle=[{sq:15,rate:1000}]' + ) + ); + + configs.register(statusCodeConfig).register(throttleConfig); + + // Act + + const [err, actual] = configs.getAllManifestConfigs(0, false, 100); + + // Assert + expect(err).toBeNull(); + expect(actual.get(115)).toEqual( + new Map([ + [ + 'statusCode', + { + fields: { code: 400 }, + sq: 115 + } + ] + ]) + ); + expect(actual.get(15)).toEqual( + new Map([ + [ + 'throttle', + { + fields: { rate: 1000 }, + sq: 15 + } + ] + ]) + ); + }); }); describe('getAllSegmentConfigs', () => { diff --git a/src/manifests/utils/configs.ts b/src/manifests/utils/configs.ts index 475e219..d3fd050 100644 --- a/src/manifests/utils/configs.ts +++ b/src/manifests/utils/configs.ts @@ -1,3 +1,4 @@ +import { STATEFUL } from '../../shared/utils'; import { ServiceError, TargetIndex, TargetLevel } from '../../shared/types'; // export type SegmentCorruptorConfigItem = { @@ -52,7 +53,8 @@ export interface CorruptorConfigUtils { */ getAllManifestConfigs: ( mseq?: number, - isDash?: boolean + isDash?: boolean, + mseqOffset?: number ) => [ ServiceError | null, IndexedCorruptorConfigMap | null, @@ -129,7 +131,7 @@ export const corruptorConfigUtils = function ( } return this; }, - getAllManifestConfigs(mseq = 0, isDash = false) { + getAllManifestConfigs(mseq = 0, isDash = false, mseqOffset = 0) { const outputMap = new CorruptorIndexMap(); const levelMap = new CorruptorLevelMap(); const configs = ( @@ -144,12 +146,36 @@ export const corruptorConfigUtils = function ( ); let params = JSON.parse(parsableSearchParam); - // If bitrate is set, filter out segments that doesn't match if (Array.isArray(params)) { + // Check if we are trying to use stateful feature without statefulness enabled + const hasRelativeSequences = params.some( + (param) => param.rsq != undefined + ); + if (hasRelativeSequences && !STATEFUL && !isDash) { + return [ + { + status: 400, + message: + 'Relative sequence numbers on HLS are only supported when proxy is running in stateful mode' + }, + null + ]; + } + + // If bitrate is set, filter out segments that doesn't match params = params.filter( (config) => !config?.br || config?.br === '*' || config?.br === segmentBitrate ); + + // Replace relative sequence numbers with absolute ones + params = params.map((param) => { + if (param.rsq) { + param.sq = Number(param.rsq) + mseqOffset; + delete param.rsq; + } + return param; + }); } const [error, configList] = config.getManifestConfigs(params); diff --git a/src/manifests/utils/hlsManifestUtils.test.ts b/src/manifests/utils/hlsManifestUtils.test.ts index 46913b0..a656042 100644 --- a/src/manifests/utils/hlsManifestUtils.test.ts +++ b/src/manifests/utils/hlsManifestUtils.test.ts @@ -30,7 +30,8 @@ describe('hlsManifestTools', () => { const manifestUtils = hlsManifestUtils(); const proxyManifest: string = manifestUtils.createProxyMasterManifest( masterM3U, - urlSearchParams + urlSearchParams, + undefined ); // Assert diff --git a/src/manifests/utils/hlsManifestUtils.ts b/src/manifests/utils/hlsManifestUtils.ts index 73dae35..badc639 100644 --- a/src/manifests/utils/hlsManifestUtils.ts +++ b/src/manifests/utils/hlsManifestUtils.ts @@ -1,5 +1,9 @@ import { M3U, Manifest } from '../../shared/types'; -import { proxyPathBuilder, segmentUrlParamString } from '../../shared/utils'; +import { + newState, + proxyPathBuilder, + segmentUrlParamString +} from '../../shared/utils'; import { CorruptorConfigMap, IndexedCorruptorConfigMap } from './configs'; import clone from 'clone'; @@ -18,7 +22,8 @@ export interface HLSManifestTools { ) => Manifest; // look def again createProxyMasterManifest: ( originalM3U: M3U, - originalUrlQuery: URLSearchParams + originalUrlQuery: URLSearchParams, + stateKey: string | undefined ) => Manifest; utils: HLSManifestUtils; } @@ -78,7 +83,8 @@ export default function (): HLSManifestTools { utils, createProxyMasterManifest( originalM3U: M3U, - originalUrlQuery: URLSearchParams + originalUrlQuery: URLSearchParams, + stateKey: string | undefined ) { const m3u: M3U = clone(originalM3U); @@ -95,6 +101,9 @@ export default function (): HLSManifestTools { urlQuery.set('level', abrLevel.toString()); abrLevel++; } + if (stateKey) { + urlQuery.set('state', stateKey); + } streamItem.set( 'uri', proxyPathBuilder(currentUri, urlQuery, 'proxy-media.m3u8') @@ -104,12 +113,16 @@ export default function (): HLSManifestTools { // [Audio/Subtitles/IFrame] m3u.items.MediaItem = m3u.items.MediaItem.map((mediaItem) => { + const urlQuery = new URLSearchParams(originalUrlQuery); const currentUri = mediaItem.get('uri'); // #EXT-X-MEDIA URI,is only required with type SUBTITLES, optional for AUDIO and VIDEO if (mediaItem.get('type') !== 'SUBTITLES' && currentUri == undefined) { return mediaItem; } + if (stateKey) { + urlQuery.set('state', stateKey); + } mediaItem.set( 'uri', proxyPathBuilder(currentUri, originalUrlQuery, 'proxy-media.m3u8') diff --git a/src/shared/utils.ts b/src/shared/utils.ts index 1a1caff..1a54dae 100644 --- a/src/shared/utils.ts +++ b/src/shared/utils.ts @@ -17,6 +17,8 @@ import { addSSMUrlParametersToUrl } from './aws.utils'; import dotenv from 'dotenv'; import { Readable } from 'stream'; +import NodeCache from 'node-cache'; +import { randomInt } from 'crypto'; dotenv.config(); const version = process.env.npm_package_version; @@ -296,3 +298,45 @@ export class AppSettings { static loadUrlParametersFromAwsSSM: boolean = process.env.LOAD_PARAMS_FROM_AWS_SSM === 'true'; } + +export const STATEFUL: boolean = process.env.STATEFUL + ? process.env.STATEFUL == 'true' + : false; +export const TTL: number = process.env.TTL ? parseInt(process.env.TTL) : 300; + +const stateCache: NodeCache = STATEFUL + ? new NodeCache({ stdTTL: TTL }) + : undefined; + +type RequestState = { + initialSequenceNumber?: number; +}; + +export function getState(stateKey: string): RequestState | undefined { + if (STATEFUL) return stateCache.get(stateKey); + else return undefined; +} + +export function putState( + key: string, + state: RequestState +): boolean | ServiceError { + if (STATEFUL) return stateCache.set(key, state); + else return { status: 400, message: 'Stateful feature not enabled' }; +} + +const alpha = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'; +function randomStateKey(length: number): string { + let key = ''; + for (let i = 0; i < length; i++) { + key += alpha.charAt(randomInt(0, alpha.length)); + } + return key; +} + +export function newState(state: RequestState): string { + let key = randomStateKey(16); + while (stateCache.get(key) != undefined) key = randomStateKey(16); + stateCache.set(key, state); + return key; +}