Skip to content

Commit

Permalink
Merge pull request #101 from Kevinjil/fix/restream
Browse files Browse the repository at this point in the history
Fix restream
  • Loading branch information
Kevinjil authored May 16, 2024
2 parents 877d523 + 3e80cc5 commit 0780178
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 42 deletions.
26 changes: 19 additions & 7 deletions Jellyfin.Xtream/LiveTvService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task<IEnumerable<ChannelInfo>> GetChannelsAsync(CancellationToken c
ParsedName parsed = StreamService.ParseName(channel.Name);
items.Add(new ChannelInfo()
{
Id = channel.StreamId.ToString(CultureInfo.InvariantCulture),
Id = StreamService.ToGuid(StreamService.LiveTvPrefix, channel.StreamId, 0, 0).ToString(),
Number = channel.Num.ToString(CultureInfo.InvariantCulture),
ImageUrl = channel.StreamIcon,
Name = parsed.Title,
Expand Down Expand Up @@ -168,6 +168,13 @@ public Task<SeriesTimerInfo> GetNewTimerDefaultsAsync(CancellationToken cancella
/// <inheritdoc />
public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken)
{
Guid guid = Guid.Parse(channelId);
StreamService.FromGuid(guid, out int prefix, out int streamId, out int _, out int _);
if (prefix != StreamService.LiveTvPrefix)
{
throw new ArgumentException("Unsupported channel");
}

string key = $"xtream-epg-{channelId}";
ICollection<ProgramInfo>? items = null;
if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o))
Expand All @@ -180,13 +187,12 @@ public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, D
Plugin plugin = Plugin.Instance;
using (XtreamClient client = new XtreamClient())
{
int streamId = int.Parse(channelId, CultureInfo.InvariantCulture);
EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false);
foreach (EpgInfo epg in epgs.Listings)
{
items.Add(new ProgramInfo()
{
Id = $"epg-{epg.Id}",
Id = StreamService.ToGuid(StreamService.EpgPrefix, streamId, epg.Id, 0).ToString(),
ChannelId = channelId,
StartDate = epg.Start,
EndDate = epg.End,
Expand All @@ -213,15 +219,21 @@ public Task ResetTuner(string id, CancellationToken cancellationToken)
/// <inheritdoc />
public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken)
{
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == channelId);
if (stream != null)
Guid guid = Guid.Parse(channelId);
StreamService.FromGuid(guid, out int prefix, out int channel, out int _, out int _);
if (prefix != StreamService.LiveTvPrefix)
{
return Task.FromResult(stream);
throw new ArgumentException("Unsupported channel");
}

Plugin plugin = Plugin.Instance;
int channel = int.Parse(channelId, CultureInfo.InvariantCulture);
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == mediaSourceInfo.Id);
if (stream != null)
{
return Task.FromResult(stream);
}

stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo);
return Task.FromResult(stream);
}
Expand Down
10 changes: 10 additions & 0 deletions Jellyfin.Xtream/Service/StreamService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public class StreamService
/// </summary>
public const int MediaSourcePrefix = 0x5d774c3d;

/// <summary>
/// The id prefix for Live TV items.
/// </summary>
public const int LiveTvPrefix = 0x5d774c3e;

/// <summary>
/// The id prefix for TV EPG items.
/// </summary>
public const int EpgPrefix = 0x5d774c3f;

private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|");

private readonly ILogger logger;
Expand Down
42 changes: 21 additions & 21 deletions Jellyfin.Xtream/Service/WrappedBufferReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

using System;
using System.IO;
using System.Threading;

namespace Jellyfin.Xtream.Service;

Expand All @@ -25,9 +26,8 @@ public class WrappedBufferReadStream : Stream
{
private readonly WrappedBufferStream sourceBuffer;

private long position;
private readonly long initialReadHead;
private long readHead;
private long totalBytesRead;

/// <summary>
/// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class.
Expand All @@ -37,8 +37,7 @@ public WrappedBufferReadStream(WrappedBufferStream sourceBuffer)
{
this.sourceBuffer = sourceBuffer;
this.readHead = sourceBuffer.TotalBytesWritten;
this.totalBytesRead = 0;
this.position = sourceBuffer.Position;
this.initialReadHead = readHead;
}

/// <summary>
Expand All @@ -49,10 +48,13 @@ public WrappedBufferReadStream(WrappedBufferStream sourceBuffer)
/// <summary>
/// Gets the number of bytes that have been written to this stream.
/// </summary>
public long TotalBytesRead { get => totalBytesRead; }
public long TotalBytesRead { get => readHead - initialReadHead; }

/// <inheritdoc />
public override long Position { get => position; set => position = value; }
public override long Position
{
get => readHead % sourceBuffer.BufferSize; set { }
}

/// <inheritdoc />
public override bool CanRead => true;
Expand All @@ -72,6 +74,14 @@ public WrappedBufferReadStream(WrappedBufferStream sourceBuffer)
public override int Read(byte[] buffer, int offset, int count)
{
long gap = sourceBuffer.TotalBytesWritten - readHead;

// We cannot return with 0 bytes read, as that indicates the end of the stream has been reached
while (gap == 0)
{
Thread.Sleep(1);
gap = sourceBuffer.TotalBytesWritten - readHead;
}

if (gap > sourceBuffer.BufferSize)
{
// TODO: design good handling method.
Expand All @@ -81,30 +91,20 @@ public override int Read(byte[] buffer, int offset, int count)
throw new IOException("Reader cannot keep up");
}

// The bytes that still need to be copied.
long remaining = Math.Min(count, sourceBuffer.TotalBytesWritten - readHead);
long remainingOffset = offset;

// The number of bytes that can be copied.
long canCopy = Math.Min(count, gap);
long read = 0;

// Copy inside a loop to simplify wrapping logic.
while (remaining > 0)
while (read < canCopy)
{
// The amount of bytes that we can directly write from the current position without wrapping.
long readable = Math.Min(remaining, sourceBuffer.BufferSize - Position);
long readable = Math.Min(canCopy - read, sourceBuffer.BufferSize - Position);

// Copy the data.
Array.Copy(sourceBuffer.Buffer, Position, buffer, remainingOffset, readable);
remaining -= readable;
remainingOffset += readable;

Array.Copy(sourceBuffer.Buffer, Position, buffer, offset + read, readable);
read += readable;
Position += readable;
readHead += readable;
totalBytesRead += readable;

// We might have to loop the position.
Position %= sourceBuffer.BufferSize;
}

return (int)read;
Expand Down
23 changes: 9 additions & 14 deletions Jellyfin.Xtream/Service/WrappedBufferStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class WrappedBufferStream : Stream
{
private readonly byte[] sourceBuffer;

private long position;
private long totalBytesWritten;

/// <summary>
Expand Down Expand Up @@ -56,7 +55,10 @@ public WrappedBufferStream(int bufferSize)
public long TotalBytesWritten { get => totalBytesWritten; }

/// <inheritdoc />
public override long Position { get => position; set => position = value; }
public override long Position
{
get => totalBytesWritten % BufferSize; set { }
}

/// <inheritdoc />
public override bool CanRead => false;
Expand All @@ -81,25 +83,18 @@ public override int Read(byte[] buffer, int offset, int count)
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
// The bytes that still need to be copied.
long remaining = count;
long remainingOffset = offset;
long written = 0;

// Copy inside a loop to simplify wrapping logic.
while (remaining > 0)
while (written < count)
{
// The amount of bytes that we can directly write from the current position without wrapping.
long writable = Math.Min(remaining, BufferSize - Position);
long writable = Math.Min(count - written, BufferSize - Position);

// Copy the data.
Array.Copy(buffer, remainingOffset, sourceBuffer, Position, writable);
remaining -= writable;
remainingOffset += writable;
Position += writable;
Array.Copy(buffer, offset + written, sourceBuffer, Position, writable);
written += writable;
totalBytesWritten += writable;

// We might have to wrap the position.
Position %= BufferSize;
}
}

Expand Down

0 comments on commit 0780178

Please sign in to comment.