Realtime Query Command

Sergiu Ciumac edited this page Jan 3, 2022 · 5 revisions

Real-time query command

The source of a query is often real-time data: a broadcast, a stream, a microphone, a continuous stream of files, etc. This poses additional challenges: the stride between fingerprints has to stay correct, and matches that continue across multiple discrete query chunks have to be tracked.

To handle these use cases, a special query command has been developed: RealtimeQueryCommand. It accepts an IAsyncEnumerable that continuously feeds the algorithm with data to fingerprint and query.

RealtimeQueryCommand should be viewed as a generic consumer of realtime files, samples, or hashes, which queries the underlying IModelService. The producer is abstracted by the IAsyncEnumerable interface.

Query from a continuous stream of files

Below is a short example of how to create a real-time query command from a continuous stream of files generated by a given producer:

/// <summary>
///  Instance of model service with stored fingerprints to query.
/// </summary>
private readonly IModelService modelService = new InMemoryModelService();

/// <summary>
///  Media service to use for audio/video decoding.
/// </summary>
/// <remarks>
///  Install SoundFingerprinting.Emy package to get access to FFmpegAudioService.
/// </remarks>
private readonly IMediaService mediaService = new FFmpegAudioService();

public void StartQueryingFromFiles(IAsyncEnumerable<string> files, CancellationToken token)
{
    _ = QueryCommandBuilder.Instance
        .BuildRealtimeQueryCommand()
        .From(files, MediaType.Audio)
        .WithRealtimeQueryConfig(config =>
        {
            // provide a success callback that will be invoked for matches that pass the result entry filter
            config.SuccessCallback = result =>
            {
                foreach (var entry in result.ResultEntries)
                {
                    Console.WriteLine($"Successfully matched {entry.TrackId}");
                }
            };

            // configure result entry filter
            config.ResultEntryFilter = new TrackMatchLengthEntryFilter(5d);
            return config;
        })
        .UsingServices(modelService, mediaService)
        .Query(token);
} 

This starts a query that continues until the cancellation token is canceled. Similarly, you can provide a stream of already downsampled AudioSamples by using the appropriate overload. Make sure the audio samples adhere to the format required by the algorithm: mono audio, sampled at 5512 Hz.

Notice how we pass a SuccessCallback to capture successful matches and ResultEntryFilter to filter false positives.
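
The files parameter of StartQueryingFromFiles can come from any producer that implements IAsyncEnumerable<string>. Below is a minimal sketch of one possible producer, assuming recorded chunks are dropped into a directory as complete files. FileProducer, WatchDirectory, and their parameters are hypothetical names introduced for illustration; they are not part of SoundFingerprinting.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class FileProducer
{
    /// <summary>
    ///  Hypothetical producer: polls a directory and yields every new file exactly once.
    /// </summary>
    public static async IAsyncEnumerable<string> WatchDirectory(
        string directory,
        string searchPattern,
        TimeSpan pollInterval,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        var seen = new HashSet<string>();
        while (!token.IsCancellationRequested)
        {
            // order by name so that files found in the same poll are yielded deterministically
            var files = Directory.EnumerateFiles(directory, searchPattern)
                .OrderBy(path => path, StringComparer.Ordinal);

            foreach (string file in files)
            {
                if (seen.Add(file))
                {
                    yield return file;
                }
            }

            try
            {
                await Task.Delay(pollInterval, token);
            }
            catch (OperationCanceledException)
            {
                // canceled while waiting; the loop condition ends the iteration
            }
        }
    }
}
```

The sketch assumes a file is fully written before it shows up under the search pattern. If the recorder writes in place, a file may be picked up half-written; writing under a temporary name and renaming once complete avoids this.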

Query from a broadcast stream

Typical realtime streams are radio broadcasts available over the web. Here is an example of CNN radio that you can listen to online: https://tunein.streamguys1.com/cnn-new

Using a real-time stream as your query source lets you monitor the stream 24/7. Below is an example of how you can build a real-time query command that references the CNN stream.

/// <summary>
///  Instance of model service with stored fingerprints.
/// </summary>
private readonly IModelService modelService = new InMemoryModelService();

/// <summary>
///  Media service to use for audio/video decoding.
/// </summary>
/// <remarks>
///  Install SoundFingerprinting.Emy package to get access to FFmpegAudioService.
/// </remarks>
private readonly IMediaService mediaService = new FFmpegAudioService();

public void StartQueryingRealtimeSource(CancellationToken cancellationToken)
{
    string url = "https://tunein.streamguys1.com/cnn-new";

    _ = QueryCommandBuilder.Instance
        .BuildRealtimeQueryCommand()
        .From(url, chunkLength: 60, MediaType.Audio)
        .WithRealtimeQueryConfig(config =>
        {
            config.ResultEntryFilter = new TrackMatchLengthEntryFilter(5d);
            config.SuccessCallback = result => Console.WriteLine($"Found {result.ResultEntries.Count()} matches");
            config.ErrorCallback = (exception, _) => Console.WriteLine($"An exception occurred while querying the data source: {exception.Message}");
            config.RestoredAfterErrorCallback = () => Console.WriteLine("Query connection re-established");
            config.DidNotPassFilterCallback = result => Console.WriteLine($"Found {result.ResultEntries.Count()} matches, but they did not pass result entry filter.");
            return config;
        })
        .InterceptHashes(hashes =>
        {
            Console.WriteLine($"Querying the source with hashes {hashes} captured at {hashes.RelativeTo:O}");
            return hashes;
        })
        .UsingServices(modelService, mediaService)
        .Query(cancellationToken);
}

Things to understand:

  • The data source with tracks is provided by IModelService. Available sources are listed here.
  • A query is generated every 60 seconds, once a chunk of that length has been captured from the source stream.
  • The query will continue until the cancellation token is canceled.
  • Matches are reported relative to DateTime.UtcNow, so the time of a match can be correctly identified in universal time.
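
To illustrate the last point: the interceptor in the example prints hashes.RelativeTo as the capture time of a chunk. Assuming a match position is known as an offset in seconds from the start of that chunk, the wall-clock time of the match is a simple addition. MatchClock and its parameter names are hypothetical, introduced only for this sketch; the sketch does not describe how the library computes match times internally.

```csharp
using System;

public static class MatchClock
{
    /// <summary>
    ///  Hypothetical helper: converts an offset within a captured chunk into universal time.
    /// </summary>
    /// <param name="chunkCapturedAtUtc">UTC time at which the chunk started.</param>
    /// <param name="offsetInChunkSeconds">Assumed offset of the match from the start of the chunk, in seconds.</param>
    public static DateTime ToUniversalTime(DateTime chunkCapturedAtUtc, double offsetInChunkSeconds)
    {
        return chunkCapturedAtUtc.AddSeconds(offsetInChunkSeconds);
    }
}
```

For a chunk captured at 2022-01-03T12:00:00Z and a match that starts 42.5 seconds into it, the helper returns 2022-01-03T12:00:42.5Z.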

Query from a microphone

Below is a short example of a program that queries the underlying IModelService using samples captured over the microphone. To run this example, you will need the NAudio package, which provides the means for recording from available recording devices.

class Program
{
    static BlockingCollection<AudioSamples> realtimeSource;
    static WaveInEvent waveSource;
    static int sampleRate = 5512;


    static async Task Main(string[] args)
    {
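        var tokenSource = new CancellationTokenSource();
        // create the collection before recording starts, so the DataAvailable handler never sees a null source
        realtimeSource = new BlockingCollection<AudioSamples>();

        _ = Task.Factory.StartNew(RecordMicNAudio);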

        IModelService modelService = GetModelService(); // initialize your data source

        Console.WriteLine("Start Matching [...]");
        _ = GetBestMatchForStream(realtimeSource, modelService, tokenSource.Token);

        Console.WriteLine("Press any key to cancel.");
        Console.ReadKey();
        tokenSource.Cancel();
    }

    /**
      * Since NAudio microphone recording is event-driven, we push recorded samples
      * into a blocking collection that RealtimeQueryCommand will consume:
      * a typical producer-consumer pattern.
      */
    static void RecordMicNAudio()
    {
        Console.WriteLine($"Available devices {WaveIn.DeviceCount}. Will use device 0 for recording.");
        for (int device = 0; device < WaveIn.DeviceCount; ++device)
        {
            var capabilities = WaveIn.GetCapabilities(device);
            Console.WriteLine($"Device {device} Name {capabilities.ProductName}, Channels {capabilities.Channels}");
        }

        waveSource = new WaveInEvent();
        waveSource.DeviceNumber = 0;
        waveSource.WaveFormat = new NAudio.Wave.WaveFormat(rate: sampleRate, bits: 16, channels: 1);
        waveSource.DataAvailable += (_, e) =>
        {
            // using short because 16 bits per sample is used as input wave format
            short[] samples = new short[e.BytesRecorded / 2];
            Buffer.BlockCopy(e.Buffer, 0, samples, 0, e.BytesRecorded);
            // converting to [-1, +1] range
            float[] floats = Array.ConvertAll(samples, (sample => (float)sample / short.MaxValue));
            realtimeSource.Add(new AudioSamples(floats, string.Empty, sampleRate));
        };
        waveSource.RecordingStopped += (_, _) => Console.WriteLine("Recording stopped.");
        waveSource.BufferMilliseconds = 1000;
        waveSource.StartRecording();
    }

    /**
     * Wraps the blocking collection in an adapter that implements the IAsyncEnumerable interface.
     * The BlockingRealtimeCollection adapter is provided within the SoundFingerprinting package.
     */
    public static async Task<double> GetBestMatchForStream(BlockingCollection<AudioSamples> audioSamples, IModelService modelService, CancellationToken token)
    {
        double seconds = await QueryCommandBuilder.Instance
            .BuildRealtimeQueryCommand()
            .From(new BlockingRealtimeCollection<AudioSamples>(audioSamples))
            .WithRealtimeQueryConfig(config =>
            {
                // emit only those entries that have at least 5 seconds of query match
                config.ResultEntryFilter = new TrackMatchLengthEntryFilter(5d);

                // provide a success callback that will be invoked for matches that pass the result entry filter
                config.SuccessCallback = result =>
                {
                    foreach (var entry in result.ResultEntries)
                    {
                        Console.WriteLine($"Successfully matched {entry.TrackId}");
                    }
                };

                config.DidNotPassFilterCallback = (queryResult) =>
                {
                    foreach (var result in queryResult.ResultEntries)
                    {
                        Console.WriteLine($"Did not pass filter {result.TrackId}");
                    }
                };

                return config;
            })
            .UsingServices(modelService)
            .Query(token);

        Console.WriteLine($"Realtime query stopped. Issued {seconds} seconds of query.");
        return seconds;
    }
    
    private static IModelService GetModelService()
    {
        return new InMemoryModelService();
    }
}

Since samples are captured in an event-driven way, an adapter class BlockingRealtimeCollection is provided to facilitate their consumption via IAsyncEnumerable.
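
To make the adapter idea concrete, below is a minimal sketch of how a BlockingCollection could be exposed as an IAsyncEnumerable. It is not the library's BlockingRealtimeCollection implementation: PollingAsyncAdapter and its pollInterval parameter are hypothetical, and polling is just one simple way to avoid blocking the consuming thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
///  Hypothetical adapter, shown for illustration only.
/// </summary>
public sealed class PollingAsyncAdapter<T> : IAsyncEnumerable<T>
{
    private readonly BlockingCollection<T> collection;
    private readonly TimeSpan pollInterval;

    public PollingAsyncAdapter(BlockingCollection<T> collection, TimeSpan pollInterval)
    {
        this.collection = collection;
        this.pollInterval = pollInterval;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        // IsCompleted is true once CompleteAdding() has been called and the collection is drained
        while (!collection.IsCompleted && !cancellationToken.IsCancellationRequested)
        {
            if (collection.TryTake(out var item))
            {
                yield return item;
                continue;
            }

            try
            {
                // nothing available yet: wait without blocking the calling thread
                await Task.Delay(pollInterval, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // canceled while waiting; the loop condition ends the iteration
            }
        }
    }
}
```

The producer side stays unchanged: the event handler keeps calling Add on the blocking collection, and calls CompleteAdding once recording stops so that the consumer can finish.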

Realtime configuration options

RealtimeQueryCommand can be configured similarly to QueryCommand; its configuration options are a superset of those available for QueryCommand. Listed below are the additional options that you may want to fine-tune for your application.

  • SuccessCallback - success callback invoked when a match is detected.
  • ResultEntryFilter - filters query results before they are emitted in the SuccessCallback.
    • CompletedRealtimeMatchResultEntryFilter - keeps the match from getting emitted until it can't continue in the next query. Since realtime queries come in chunks that can partition a match into multiple parts (e.g., a 3-minute song will match 3 times if the length of the query is 1 minute), this filter prevents partitioning, emitting only 1 success entry at the end of the last match.
    • TrackMatchLengthEntryFilter - filters out all entries whose TrackCoverageWithPermittedGapsLength is shorter than the threshold.
    • TrackRelativeCoverageLengthEntryFilter - filters out all entries whose TrackRelativeCoverage is lower than the threshold. For example, a threshold of 0.4 means that all tracks that matched less than 40% of their length will be disregarded. Also allows specifying the waitTillCompletion flag, which indicates whether to wait till completion before emitting the result (default true).
    • PassThroughRealtimeResultEntryFilter - emits matches immediately, as soon as they occur.
    • NoPassRealtimeResultEntryFilter - blocks all matches from getting emitted.
  • DidNotPassFilterCallback - callback invoked for those entries that did not pass the ResultEntryFilter.
  • OngoingResultEntryFilter - similar to ResultEntryFilter, but allows identifying matches before they are complete. As an example, consider a 3-minute long song that will continue to match for the entire 3 minutes before it is propagated to SuccessCallback. To be able to detect what's playing right now, OngoingSuccessCallback and OngoingResultEntryFilter have been added.
    • OngoingRealtimeResultEntryFilter - will emit the result without waiting for it to complete. As an example, for initialization values minCoverage = 0.2 and minTrackLength = 10, a 1-minute long track will be emitted 6 times in the OngoingSuccessCallback.
  • OngoingSuccessCallback - callback invoked on entries which pass OngoingRealtimeResultEntryFilter.
  • ErrorCallback - callback invoked on an error. The command will continue to execute even when an error occurs; this callback is invoked to alert the developer. If you need to stop querying immediately after an error occurred, cancel the token in the callback.
  • RestoredAfterErrorCallback - callback invoked when the command recovers from an error.
  • AutomaticSkipDetection - experimental automatic skip detection, used to identify fast-forward or backward skips in the realtime query source (e.g., skipping through an audio/video player).
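
The partitioning that CompletedRealtimeMatchResultEntryFilter guards against can be estimated with simple arithmetic. The sketch below counts how many fixed-length query chunks a track overlaps. It is a simplification that assumes back-to-back chunks of equal length and ignores striding; ChunkMath and ChunksSpanned are hypothetical names, not library API.

```csharp
using System;

public static class ChunkMath
{
    /// <summary>
    ///  Counts the fixed-length query chunks that a track overlaps.
    /// </summary>
    /// <param name="trackStartSeconds">Position in the stream where the track starts, in seconds.</param>
    /// <param name="trackLengthSeconds">Length of the track, in seconds.</param>
    /// <param name="chunkLengthSeconds">Length of one query chunk, in seconds.</param>
    public static int ChunksSpanned(double trackStartSeconds, double trackLengthSeconds, double chunkLengthSeconds)
    {
        int firstChunk = (int)Math.Floor(trackStartSeconds / chunkLengthSeconds);
        double trackEnd = trackStartSeconds + trackLengthSeconds;

        // a track that ends exactly on a chunk boundary does not touch the next chunk
        int lastChunk = (int)Math.Ceiling(trackEnd / chunkLengthSeconds) - 1;
        return lastChunk - firstChunk + 1;
    }
}
```

ChunksSpanned(0, 180, 60) returns 3, which is the 3-minute song with 1-minute queries described above. If the same song starts 30 seconds into a chunk, ChunksSpanned(30, 180, 60) returns 4, so without a completion-aware filter the number of partial matches also depends on where the track falls relative to the chunk boundaries.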

There are more properties marked with the Experimental flag. Those are not listed here and are subject to change in the future.