Skip to content

Commit

Permalink
refactor: collection of selectors in case they don't all match
Browse files Browse the repository at this point in the history
Also implements the generic endpoint type in the discovery service.
  • Loading branch information
iPromKnight committed Nov 16, 2024
1 parent dceb6cb commit 3bce11e
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 45 deletions.
1 change: 1 addition & 0 deletions Zilean.sln.DotSettings.user
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ABoundedChannelFullMode_002Ecs_002Fl_003AC_0021_003FUsers_003FProm3theu5_003FAppData_003FRoaming_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F655a81eeb02548579e1e5404da9d688c22938_003Fbf_003F4afb42f7_003FBoundedChannelFullMode_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ABulkConfig_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003FUsers_003FProm3theu5_003FAppData_003FRoaming_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003Fd6b09cbbbc4e7bf228e82618e4e0859c91be3b93407de754227ae1a771ea2a6_003FBulkConfig_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ACallSiteFactory_002Ecs_002Fl_003AC_0021_003FUsers_003FProm3theu5_003FAppData_003FRoaming_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FSourcesCache_003Ffc2027f7e776fc105cddb56b1a25eeb3895b3ae6f3aac854d786e63bd01f75e2_003FCallSiteFactory_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ACronExpression_002Ecs_002Fl_003AC_0021_003FUsers_003FProm3theu5_003FAppData_003FRoaming_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FSourcesCache_003F2c78f2a38e28ff68373887f2c71ed6595fcac3a938221853fc708d5ef51f739_003FCronExpression_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
Expand Down
1 change: 1 addition & 0 deletions src/Zilean.Database/Services/ITorrentInfoService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public interface ITorrentInfoService
Task StoreTorrentInfo(List<TorrentInfo> torrents, int batchSize = 10000);
Task<TorrentInfo[]> SearchForTorrentInfoByOnlyTitle(string query);
Task<TorrentInfo[]> SearchForTorrentInfoFiltered(TorrentInfoFilter filter, int? limit = null);
Task<HashSet<string>> GetExistingInfoHashesAsync(List<string> infoHashes);
}
13 changes: 13 additions & 0 deletions src/Zilean.Database/Services/TorrentInfoService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,18 @@ FROM search_imdb_meta(
return imdbRecord?.ImdbId;
}

/// <summary>
/// Determines which of the supplied info hashes already have a row in <c>dbContext.Torrents</c>.
/// </summary>
/// <param name="infoHashes">Info hashes to look up.</param>
/// <returns>
/// The info hashes found in the database; an empty set when nothing matched or when
/// <paramref name="infoHashes"/> is null or empty.
/// </returns>
public async Task<HashSet<string>> GetExistingInfoHashesAsync(List<string> infoHashes)
{
    // Nothing to look up: skip creating a DI scope and issuing a query with an empty IN list.
    if (infoHashes is not { Count: > 0 })
    {
        return [];
    }

    await using var serviceScope = serviceProvider.CreateAsyncScope();
    await using var dbContext = serviceScope.ServiceProvider.GetRequiredService<ZileanDbContext>();

    var existingHashes = await dbContext.Torrents
        .Where(t => infoHashes.Contains(t.InfoHash))
        .Select(t => t.InfoHash)
        .ToListAsync();

    return [..existingHashes];
}

private void WriteProgress(decimal @decimal) => logger.LogInformation("Storing torrent info: {Percentage:P}", @decimal);
}
41 changes: 34 additions & 7 deletions src/Zilean.Scraper/Features/Ingestion/GenericIngestionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ public class GenericIngestionProcessor(
ITorrentInfoService torrentInfoService,
ZileanConfiguration configuration)
{
public async Task ProcessTorrentsAsync(string url, CancellationToken cancellationToken = default)
private int _processedCount;

public async Task ProcessTorrentsAsync(GenericEndpoint endpoint, CancellationToken cancellationToken = default)
{
logger.LogInformation("Processing URL: {Url}", url);
var sw = Stopwatch.StartNew();
logger.LogInformation("Processing URL: {@Url}", endpoint);

Interlocked.Exchange(ref _processedCount, 0);

var channel = Channel.CreateBounded<Task<StreamedEntry>>(new BoundedChannelOptions(configuration.Ingestion.MaxChannelSize)
{
Expand All @@ -18,17 +23,27 @@ public async Task ProcessTorrentsAsync(string url, CancellationToken cancellatio
FullMode = BoundedChannelFullMode.Wait
});

var producerTask = ProduceAsync(url, channel.Writer, cancellationToken);
var producerTask = ProduceAsync(endpoint, channel.Writer, cancellationToken);
var consumerTask = ConsumeAsync(channel.Reader, configuration.Ingestion.BatchSize, cancellationToken);
await Task.WhenAll(producerTask, consumerTask);

logger.LogInformation("Processed {Count} torrents for endpoint '{@Endpoint}' in {TimeTaken}s", _processedCount, endpoint, sw.Elapsed.TotalSeconds);
sw.Stop();
}

private async Task ProduceAsync(string url, ChannelWriter<Task<StreamedEntry>> writer, CancellationToken cancellationToken = default)
private async Task ProduceAsync(GenericEndpoint endpoint, ChannelWriter<Task<StreamedEntry>> writer, CancellationToken cancellationToken = default)
{
try
{
var httpClient = clientFactory.CreateClient();
var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
var fullUrl = endpoint.EndpointType switch
{
GenericEndpointType.Zurg => $"{endpoint.Url}/debug/torrents",
GenericEndpointType.Zilean => $"{endpoint.Url}/torrents/all",
_ => throw new InvalidOperationException("Unknown endpoint type")
};

var response = await httpClient.GetAsync(fullUrl, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
response.EnsureSuccessStatusCode();

var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
Expand Down Expand Up @@ -89,18 +104,30 @@ private async Task ProcessBatch(List<Task<StreamedEntry>> batch, CancellationTok
{
var current = await result;
torrents.Add(ExtractedDmmEntry.FromStreamedEntry(current));
Interlocked.Increment(ref _processedCount);
}

if (torrents.Count == 0 || cancellationToken.IsCancellationRequested)
{
return;
}

logger.LogInformation("Processing batch of {Count} torrents", torrents.Count);
var infoHashes = torrents.Select(t => t.InfoHash!).ToList();

var existingInfoHashes = await torrentInfoService.GetExistingInfoHashesAsync(infoHashes);

var newTorrents = torrents.Where(t => !existingInfoHashes.Contains(t.InfoHash)).ToList();
logger.LogInformation("Filtered out {Count} torrents already in the database", torrents.Count - newTorrents.Count);

if (newTorrents.Count == 0)
{
logger.LogInformation("No new torrents to process in this batch.");
return;
}

if (torrents.Count != 0)
{
var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(torrents);
var parsedTorrents = await parseTorrentNameService.ParseAndPopulateAsync(newTorrents);
var finalizedTorrents = parsedTorrents.Where(torrentInfo => torrentInfo.WipeSomeTissue()).ToList();
logger.LogInformation("Parsed {Count} torrents", finalizedTorrents.Count);
await torrentInfoService.StoreTorrentInfo(finalizedTorrents);
Expand Down
64 changes: 45 additions & 19 deletions src/Zilean.Scraper/Features/Ingestion/GenericIngestionScraping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,16 @@ public async Task<int> Execute(CancellationToken cancellationToken)
{
logger.LogInformation("Starting ingestion scraping");

List<string> urlsToProcess = [];
List<GenericEndpoint> urlsToProcess = [];

if (configuration.Ingestion.Kubernetes.EnableServiceDiscovery)
{
logger.LogInformation("Discovering URLs from Kubernetes services");
var urls = await kubernetesServiceDiscovery.DiscoverUrlsAsync(cancellationToken);
logger.LogInformation("Discovered {Count} URLs from Kubernetes services", urls.Count);
urlsToProcess.AddRange(urls);
}
configuration.Ingestion.ZileanInstances.Add("http://localhost:8181");
configuration.Ingestion.ZurgInstances.Add("http://experiments:19999");

if (configuration.Ingestion.ZurgInstances.Count > 0)
{
logger.LogInformation("Adding Zurg instances to the list of URLs to process");
urlsToProcess.AddRange(configuration.Ingestion.ZurgInstances);
}
await DiscoverUrlsFromKubernetesServices(cancellationToken, urlsToProcess);

if (configuration.Ingestion.ZileanInstances.Count > 0)
{
logger.LogInformation("Adding Zilean instances to the list of URLs to process");
urlsToProcess.AddRange(configuration.Ingestion.ZileanInstances);
}
AddZurgInstancesToUrls(urlsToProcess);

AddZileanInstancesToUrls(urlsToProcess);

if (urlsToProcess.Count == 0)
{
Expand All @@ -56,12 +45,49 @@ public async Task<int> Execute(CancellationToken cancellationToken)
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing URL: {Url}", url);
logger.LogError(ex, "Error processing URL: {@Url}", url);
}
}

logger.LogInformation("Ingestion scraping completed for {Count} URLs", completedCount);

return 0;
}

/// <summary>
/// Appends one Zilean-typed endpoint to <paramref name="urlsToProcess"/> for each configured Zilean instance URL.
/// Does nothing (and logs nothing) when no Zilean instances are configured.
/// </summary>
/// <param name="urlsToProcess">Accumulating list of endpoints to ingest from.</param>
private void AddZileanInstancesToUrls(List<GenericEndpoint> urlsToProcess)
{
    var zileanInstances = configuration.Ingestion.ZileanInstances;
    if (zileanInstances.Count <= 0)
    {
        return;
    }

    logger.LogInformation("Adding Zilean instances to the list of URLs to process");

    foreach (var instanceUrl in zileanInstances)
    {
        urlsToProcess.Add(new GenericEndpoint
        {
            EndpointType = GenericEndpointType.Zilean,
            Url = instanceUrl,
        });
    }
}

/// <summary>
/// Appends one Zurg-typed endpoint to <paramref name="urlsToProcess"/> for each configured Zurg instance URL.
/// Does nothing (and logs nothing) when no Zurg instances are configured.
/// </summary>
/// <param name="urlsToProcess">Accumulating list of endpoints to ingest from.</param>
private void AddZurgInstancesToUrls(List<GenericEndpoint> urlsToProcess)
{
    var zurgInstances = configuration.Ingestion.ZurgInstances;
    if (zurgInstances.Count <= 0)
    {
        return;
    }

    logger.LogInformation("Adding Zurg instances to the list of URLs to process");

    foreach (var instanceUrl in zurgInstances)
    {
        urlsToProcess.Add(new GenericEndpoint
        {
            EndpointType = GenericEndpointType.Zurg,
            Url = instanceUrl,
        });
    }
}

/// <summary>
/// When Kubernetes service discovery is enabled in configuration, asks the discovery service for
/// endpoints and appends them to <paramref name="urlsToProcess"/>; otherwise returns immediately.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the discovery call.</param>
/// <param name="urlsToProcess">Accumulating list of endpoints to ingest from.</param>
private async Task DiscoverUrlsFromKubernetesServices(CancellationToken cancellationToken, List<GenericEndpoint> urlsToProcess)
{
    if (!configuration.Ingestion.Kubernetes.EnableServiceDiscovery)
    {
        return;
    }

    logger.LogInformation("Discovering URLs from Kubernetes services");

    var discovered = await kubernetesServiceDiscovery.DiscoverUrlsAsync(cancellationToken);

    logger.LogInformation("Discovered {Count} URLs from Kubernetes services", discovered.Count);
    urlsToProcess.AddRange(discovered);
}
}
39 changes: 26 additions & 13 deletions src/Zilean.Scraper/Features/Ingestion/KubernetesServiceDiscovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,67 @@ public class KubernetesServiceDiscovery(
ILogger<KubernetesServiceDiscovery> logger,
ZileanConfiguration configuration)
{
public async Task<List<string>> DiscoverUrlsAsync(CancellationToken cancellationToken = default)
private record DiscoveredService(V1Service Service, KubernetesSelector Selector);

public async Task<List<GenericEndpoint>> DiscoverUrlsAsync(CancellationToken cancellationToken = default)
{
var urls = new List<string>();
var urls = new List<GenericEndpoint>();

try
{
var clientConfig =
KubernetesClientConfiguration.BuildConfigFromConfigFile(configuration.Ingestion.Kubernetes.KubeConfigFile);
var kubernetesClient = new Kubernetes(clientConfig);

var services = await kubernetesClient.CoreV1.ListServiceForAllNamespacesAsync(
labelSelector: configuration.Ingestion.Kubernetes.LabelSelector,
cancellationToken: cancellationToken);
List<DiscoveredService> discoveredServices = [];

foreach (var selector in configuration.Ingestion.Kubernetes.KubernetesSelectors)
{
var services = await kubernetesClient.CoreV1.ListServiceForAllNamespacesAsync(
labelSelector: selector.LabelSelector,
cancellationToken: cancellationToken);

discoveredServices.AddRange(services.Items.Select(service => new DiscoveredService(service, selector)));
}

foreach (var service in services.Items)
foreach (var service in discoveredServices)
{
try
{
var url = BuildUrlFromService(service);
if (!string.IsNullOrEmpty(url))
{
urls.Add(url);
urls.Add(new GenericEndpoint
{
EndpointType = service.Selector.EndpointType,
Url = url,
});
logger.LogInformation("Discovered service URL: {Url}", url);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to build URL for service {ServiceName} in namespace {Namespace}",
service.Metadata.Name, service.Metadata.NamespaceProperty);
service.Service.Metadata.Name, service.Service.Metadata.NamespaceProperty);
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to list services with label selector {LabelSelector}", configuration.Ingestion.Kubernetes.LabelSelector);
logger.LogError(ex, "Failed to list services with label selectors {@LabelSelector}", configuration.Ingestion.Kubernetes.KubernetesSelectors);
}

return urls;
}

private string BuildUrlFromService(V1Service service)
private static string BuildUrlFromService(DiscoveredService service)
{
if (service.Metadata?.NamespaceProperty == null)
if (service.Service.Metadata?.NamespaceProperty == null)
{
throw new InvalidOperationException("Service metadata or namespace is missing.");
}

var namespaceName = service.Metadata.NamespaceProperty;
return string.Format(configuration.Ingestion.Kubernetes.ZurgUrlTemplate, namespaceName);
var namespaceName = service.Service.Metadata.NamespaceProperty;
return string.Format(service.Selector.UrlTemplate, namespaceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ public class IngestionConfiguration
public List<string> ZileanInstances { get; set; } = [];
public bool EnableScraping { get; set; } = false;
public KubernetesConfiguration Kubernetes { get; set; } = new();
public int BatchSize { get; set; } = 1000;
public int BatchSize { get; set; } = 500;
public int MaxChannelSize { get; set; } = 5000;

public string ScrapeSchedule { get; set; } = "0 * * * *";
public string ZurgEndpointSuffix { get; set; } = "/debug/torrents";
public string ZileanEndpointSuffix { get; set; } = "/torrents/all";
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
public class KubernetesConfiguration
{
public bool EnableServiceDiscovery { get; set; } = false;
public string ZurgUrlTemplate { get; set; } = "http://zurg.{0}:9999/debug/torrents";
public string LabelSelector { get; set; } = "app.elfhosted.com/name=zurg";
public List<KubernetesSelector> KubernetesSelectors { get; set; } = [new()];
public string KubeConfigFile { get; set; } = "/$HOME/.kube/config";
public bool IsConfigFile { get; set; } = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Zilean.Shared.Features.Configuration;

/// <summary>
/// Describes one group of Kubernetes services to discover and how each match is turned into an ingestion endpoint.
/// </summary>
public class KubernetesSelector
{
    /// <summary>
    /// Composite format string for the service base URL; <c>{0}</c> is replaced with the namespace of the
    /// discovered service. This must be the base URL only: the ingestion processor appends the path for the
    /// endpoint type (e.g. <c>/debug/torrents</c> for Zurg), so including the path here would duplicate it.
    /// </summary>
    public string UrlTemplate { get; set; } = "http://zurg.{0}:9999";

    /// <summary>
    /// Label selector passed to the Kubernetes API when listing services across all namespaces.
    /// </summary>
    public string LabelSelector { get; set; } = "app.elfhosted.com/name=zurg";

    /// <summary>
    /// Endpoint type assigned to every endpoint discovered through this selector.
    /// </summary>
    public GenericEndpointType EndpointType { get; set; } = GenericEndpointType.Zurg;
}
2 changes: 0 additions & 2 deletions src/Zilean.Shared/Features/Dmm/DmmRecords.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using Zilean.Shared.Features.Scraping;

namespace Zilean.Shared.Features.Dmm;

public class ExtractedDmmEntry(string? infoHash, string? filename, long filesize, TorrentInfo? parseResponse)
Expand Down
7 changes: 7 additions & 0 deletions src/Zilean.Shared/Features/Scraping/GenericEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Zilean.Shared.Features.Scraping;

/// <summary>
/// An ingestion source: a base URL paired with the kind of service that URL points at.
/// </summary>
public class GenericEndpoint
{
/// <summary>
/// Base URL of the instance. The ingestion processor appends an endpoint-type-specific path to it before fetching.
/// </summary>
public required string Url { get; set; }
/// <summary>
/// Kind of service behind <see cref="Url"/>; the ingestion processor uses it to pick the path to request.
/// </summary>
public required GenericEndpointType EndpointType { get; set; }
}
7 changes: 7 additions & 0 deletions src/Zilean.Shared/Features/Scraping/GenericEndpointType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Zilean.Shared.Features.Scraping;

/// <summary>
/// Kinds of remote service the generic ingestion processor can pull torrents from.
/// The processor maps each value to the request path appended to the endpoint URL.
/// </summary>
public enum GenericEndpointType
{
// Fetched from "{Url}/torrents/all" by the ingestion processor.
Zilean,
// Fetched from "{Url}/debug/torrents" by the ingestion processor.
Zurg
}
1 change: 1 addition & 0 deletions src/Zilean.Shared/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
global using Zilean.Shared.Features.Configuration;
global using Zilean.Shared.Features.Dmm;
global using Zilean.Shared.Features.Imdb;
global using Zilean.Shared.Features.Scraping;
global using Zilean.Shared.Features.Torznab.Categories;
global using Zilean.Shared.Features.Torznab.Info;
global using Zilean.Shared.Features.Torznab.Parameters;
Expand Down

0 comments on commit 3bce11e

Please sign in to comment.