Skip to content

Commit

Permalink
fix: Correct the behavior to defer closeSegmentIndex() calls during u…
Browse files Browse the repository at this point in the history
…pdates (shaka-project#7217)

Resolves the issues reported by
shaka-project#7213, which correctly
fixes shaka-project#7156.

The latest comment
shaka-project#7156 (comment)
goes further into detail on the problems of the initial PR.
  • Loading branch information
JulianDomingo committed Aug 28, 2024
1 parent fd1fee7 commit 6b15c46
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 24 deletions.
33 changes: 33 additions & 0 deletions lib/media/streaming_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ shaka.media.StreamingEngine = class {
this.config_ = null;

/**
<<<<<<< HEAD
* Retains deferred SegmentIndex objects for streams which were switched
* away from during an ongoing fetchAndAppend_().
* They are released and cleared from this map when the next onUpdate_() for
Expand All @@ -85,6 +86,11 @@ shaka.media.StreamingEngine = class {
* Lastly, the operations are stored in arrays in the off chance multiple
* ABR switches happen during an ongoing fetchAndAppend_().
* @private {!Map.<string, !Array.<?shaka.media.SegmentIndex>>}
=======
* Retains a reference to the function used to close SegmentIndex objects
* for streams which were switched away from during an ongoing update_().
* @private {!Map.<string, !function()>}
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
*/
this.deferredCloseSegmentIndex_ = new Map();

Expand Down Expand Up @@ -530,6 +536,7 @@ shaka.media.StreamingEngine = class {
handleDeferredCloseSegmentIndexes_(mediaState) {
for (const [key, value] of this.deferredCloseSegmentIndex_.entries()) {
const streamId = /** @type {string} */ (key);
<<<<<<< HEAD
const segmentIndexArray =
/** @type {!Array.<?shaka.media.SegmentIndex>} */ (value);
if (streamId.includes(mediaState.type)) {
Expand All @@ -538,6 +545,11 @@ shaka.media.StreamingEngine = class {
segmentIndex.release();
}
}
=======
const closeSegmentIndex = /** @type {!function()} */ (value);
if (streamId.includes(mediaState.type)) {
closeSegmentIndex();
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
this.deferredCloseSegmentIndex_.delete(streamId);
}
}
Expand Down Expand Up @@ -619,6 +631,7 @@ shaka.media.StreamingEngine = class {
if (mediaState.performingUpdate) {
const oldStreamTag =
shaka.media.StreamingEngine.logPrefix_(mediaState);
<<<<<<< HEAD
// The ongoing update is still using the old stream's segment
// reference information.
// If we close the old stream now, the update will not complete
Expand All @@ -632,6 +645,18 @@ shaka.media.StreamingEngine = class {
// We switched from stream A -> B -> back to A.
this.deferredCloseSegmentIndex_.get(oldStreamTag).push(
mediaState.stream.segmentIndex);
=======
if (!this.deferredCloseSegmentIndex_.has(oldStreamTag)) {
// The ongoing update is still using the old stream's segment
// reference information.
// If we close the old stream now, the update will not complete
// correctly.
// The next onUpdate_() for this content type will resume the
// closeSegmentIndex() operation for the old stream once the ongoing
// update has finished, then immediately create a new segment index.
this.deferredCloseSegmentIndex_.set(
oldStreamTag, mediaState.stream.closeSegmentIndex);
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
}
} else {
mediaState.stream.closeSegmentIndex();
Expand Down Expand Up @@ -1228,6 +1253,14 @@ shaka.media.StreamingEngine = class {
return;
}

<<<<<<< HEAD
=======
// If stream switches happened during the previous update_() for this
// content type, close out the old streams that were switched away from.
// Even if we had switched away from the active stream 'A' during the
// update_(), e.g. (A -> B -> A), closing 'A' is permissible here since we
// will immediately re-create it in the logic below.
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
this.handleDeferredCloseSegmentIndexes_(mediaState);

// Make sure the segment index exists. If not, create the segment index.
Expand Down
116 changes: 93 additions & 23 deletions test/media/streaming_engine_unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,11 @@ describe('StreamingEngine', () => {
await Util.fakeEventLoop(1);

// Grab a reference to initialVariant's segmentIndex before the switch so
<<<<<<< HEAD
// we can test how it's internal fields change overtime.
=======
// we can test how its internal fields change overtime.
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
const initialVariantSegmentIndex = initialVariant.video.segmentIndex;

// Switch to 'differentVariant' (video-14-%d/audio-15-%d) in the middle of
Expand Down Expand Up @@ -1181,6 +1185,56 @@ describe('StreamingEngine', () => {
expect(initialVariantSegmentIndex.references.length).toBe(0);
});

<<<<<<< HEAD
=======
it('defers old stream cleanup on fast switch during update', async () => {
setupVod();

// Delay the appendBuffer call until later so we are waiting for this to
// finish when we switch.
const p = new shaka.util.PublicPromise();
const old = mediaSourceEngine.appendBuffer;
mediaSourceEngine.appendBuffer =
jasmine.createSpy('appendBuffer')
.and.callFake(async (type, data, reference) => {
await p;
return Util.invokeSpy(old, type, data, reference);
});

streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
await streamingEngine.start();
playing = true;

expect(variant.video.createSegmentIndex).not.toHaveBeenCalled();
await Util.fakeEventLoop(1);
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(1);

// Switch from variant A -> B -> A ("fast switch") multiple times.
for (let i = 0; i < 5; i++) {
streamingEngine.switchVariant(
alternateVariant, /* clearBuffer= */ true);
streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
}
// Can resolve now to ensure all the switches happened during the update.
p.resolve();

// Give enough time for the next scheduled update to execute with the
// currently active variant ('variant').
await runTest();

// During the next scheduled update for 'variant', we close all streams
// that were switched away from, regardless of whether it is the active
// stream.
expect(variant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
expect(alternateVariant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
// However, we close all the deferred streams right before the check to
// create a new segmentIndex for the currently active stream.
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(2);
expect(variant.video.segmentIndex).not.toBe(null);
expect(alternateVariant.video.segmentIndex).toBe(null);
});

>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
// See https://github.com/shaka-project/shaka-player/issues/2956
it('works with fast variant switches during update', async () => {
// Delay the appendBuffer call until later so we are waiting for this to
Expand Down Expand Up @@ -2268,9 +2322,11 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2293,6 +2349,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('does not temporarily disables stream if not configured to',
Expand Down Expand Up @@ -2344,9 +2401,12 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(
shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2369,6 +2429,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('throws recoverable error if try to disable stream succeeded',
Expand Down Expand Up @@ -4445,34 +4506,43 @@ describe('StreamingEngine', () => {
* @param {shaka.extern.Stream} alternateStream
*/
function createAlternateSegmentIndex(baseStream, alternateStream) {
const closeSegmentIndexSpy = Util.funcSpy(
/** @type {!function()} */ (alternateStream.closeSegmentIndex));
const createSegmentIndexSpy =
Util.funcSpy(alternateStream.createSegmentIndex);
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));
createSegmentIndexSpy.and.callFake(() => {
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());
altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));

altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);
altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());

if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';
altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);

ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}
if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';

return null;
});
ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}

createSegmentIndexSpy.and.callFake(() => {
return null;
});
alternateStream.segmentIndex = altSegmentIndex;
return Promise.resolve();
});
closeSegmentIndexSpy.and.callFake(() => {
if (alternateStream.segmentIndex) {
alternateStream.segmentIndex.release();
}
alternateStream.segmentIndex = null;
return Promise.resolve();
});
}
});
21 changes: 21 additions & 0 deletions test/test/util/manifest_generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -530,17 +530,35 @@ shaka.test.ManifestGenerator.Stream = class {
}

if (!isPartial) {
const shaka_ = manifest ? manifest.shaka_ : shaka;

/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);

const create =
jasmine.createSpy('createSegmentIndex').and.callFake(() => {
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
return Promise.resolve();
});
<<<<<<< HEAD
const close =
jasmine.createSpy('closeSegmentIndex').and.callFake(() => {
return Promise.resolve();
});
const shaka_ = manifest ? manifest.shaka_ : shaka;
const segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
=======
const close = jasmine.createSpy('closeSegmentIndex').and.callFake(() => {
if (this.segmentIndex) {
this.segmentIndex.release();
}
this.segmentIndex = null;
return Promise.resolve();
});
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))

/** @type {?string} */
this.originalId = null;
Expand All @@ -550,8 +568,11 @@ shaka.test.ManifestGenerator.Stream = class {
this.createSegmentIndex = shaka.test.Util.spyFunc(create);
/** @type {!function()|undefined} */
this.closeSegmentIndex = shaka.test.Util.spyFunc(close);
<<<<<<< HEAD
/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = segmentIndex;
=======
>>>>>>> 7ba7e618d (fix: Correct the behavior to defer closeSegmentIndex() calls during updates (#7217))
/** @type {string} */
this.mimeType = defaultMimeType;
/** @type {string} */
Expand Down
18 changes: 17 additions & 1 deletion test/test/util/streaming_engine_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ shaka.test.StreamingEngineUtil = class {
variant.audio = /** @type {shaka.extern.Stream} */(
shaka.test.StreamingEngineUtil.createMockStream('audio', 1));
if (secondaryAudioVariant) {
variant2.audio = /** @type {shaka.extern.Stream} */(
variant2.audio = /** @type {shaka.extern.Stream} */ (
shaka.test.StreamingEngineUtil.createMockStream('audio', 11));

const ContentType = shaka.util.ManifestParserUtils.ContentType;
Expand All @@ -406,6 +406,13 @@ shaka.test.StreamingEngineUtil = class {
variant2.audio.segmentIndex = segmentIndex;
return Promise.resolve();
});

const closeSegmentIndexSpy = Util.funcSpy(
/** @type {!function()} */ (variant2.audio.closeSegmentIndex));
closeSegmentIndexSpy.and.callFake(() => {
variant2.audio.segmentIndex = null;
return Promise.resolve();
});
}
}

Expand Down Expand Up @@ -452,6 +459,12 @@ shaka.test.StreamingEngineUtil = class {
stream.segmentIndex = segmentIndex;
return Promise.resolve();
});
const closeSegmentIndexSpy = Util.funcSpy(
/** @type {!function()} */ (stream.closeSegmentIndex));
closeSegmentIndexSpy.and.callFake(() => {
stream.segmentIndex = null;
return Promise.resolve();
});
}

if (trickModeVideo) {
Expand Down Expand Up @@ -502,6 +515,7 @@ shaka.test.StreamingEngineUtil = class {
originalId: id.toString(),
groupId: null,
createSegmentIndex: Util.spyFunc(jasmine.createSpy('createSegmentIndex')),
closeSegmentIndex: Util.spyFunc(jasmine.createSpy('closeSegmentIndex')),
segmentIndex: null,
mimeType,
codecs,
Expand Down Expand Up @@ -546,6 +560,7 @@ shaka.test.StreamingEngineUtil = class {
originalId: id.toString(),
groupId: null,
createSegmentIndex: Util.spyFunc(jasmine.createSpy('createSegmentIndex')),
closeSegmentIndex: Util.spyFunc(jasmine.createSpy('closeSegmentIndex')),
segmentIndex: null,
mimeType,
codecs,
Expand Down Expand Up @@ -592,6 +607,7 @@ shaka.test.StreamingEngineUtil = class {
originalId: id.toString(),
groupId: null,
createSegmentIndex: Util.spyFunc(jasmine.createSpy('createSegmentIndex')),
closeSegmentIndex: Util.spyFunc(jasmine.createSpy('closeSegmentIndex')),
segmentIndex: null,
mimeType,
codecs,
Expand Down

0 comments on commit 6b15c46

Please sign in to comment.