[BUG] ADF Pipeline Runs query can't do pagination #39438

Closed
DavidKarlas opened this issue Oct 23, 2023 · 8 comments · Fixed by #42247
Labels: Data Factory Mgmt, needs-team-attention, question

Comments

@DavidKarlas
Contributor

DavidKarlas commented Oct 23, 2023

Library name and version

Azure.ResourceManager.DataFactory 1.0.0-beta.4

Describe the bug

Can't query more than 100 PipelineRuns via the API because the ContinuationToken is not exposed.
The only way to call Pipeline Runs - Query By Factory is through DataFactoryResource.GetPipelineRunsAsync.
The problem is that this call returns only an AsyncPageable<DataFactoryPipelineRunInfo>, so the ContinuationToken value from the response is lost.
Because this API paginates by sending the token back in the RunFilterContent request body (its ContinuationToken property) rather than through a nextLink URL, the default nextLink-based paging logic cannot fetch later pages.

Expected behavior

Ideally, DataFactoryResource.GetPipelineRunsAsync would handle the whole pagination itself and keep the ContinuationToken hidden, but exposing the ContinuationToken would also be acceptable.
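A minimal, self-contained sketch of the token-in-body pagination described above: the query is re-sent with the ContinuationToken from the previous response until the service returns none, and the caller only sees a flat sequence. `QueryPage` and `QueryAll` are hypothetical names, `QueryPage` is an in-memory stand-in for the queryPipelineRuns endpoint, and its offset-based token is an assumption (the real token is opaque). This is not the SDK's implementation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the queryPipelineRuns call: returns at most pageSize
// runs plus a continuation token, or null when nothing remains. Encoding the token as the
// next offset is purely an assumption for this sketch; the real token is opaque.
(List<string> Value, string? ContinuationToken) QueryPage(
    IReadOnlyList<string> allRuns, string? continuationToken, int pageSize = 100)
{
    int offset = continuationToken is null ? 0 : int.Parse(continuationToken);
    var page = allRuns.Skip(offset).Take(pageSize).ToList();
    int next = offset + page.Count;
    return (page, next < allRuns.Count ? next.ToString() : null);
}

// Token-in-body pagination: re-send the query with the previous page's token until the
// service stops returning one. The iterator hides the token from the caller entirely.
IEnumerable<string> QueryAll(IReadOnlyList<string> allRuns)
{
    string? token = null;
    do
    {
        var (value, next) = QueryPage(allRuns, token);
        foreach (var run in value)
        {
            yield return run;
        }
        token = next;
    } while (token != null);
}

var runs = Enumerable.Range(1, 250).Select(i => $"run-{i}").ToList();
var collected = QueryAll(runs).ToList();
Console.WriteLine(collected.Count); // 250
```

Because the token travels in the request body, a pageable that only follows a nextLink URL stops after the first page; wrapping the loop in an iterator is what lets the token stay hidden from callers.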

Actual behavior

There is no way to page beyond the first batch of results.

Reproduction Steps

Use an ADF instance with more than 100 pipeline runs and try to query all of them.
The only workaround today seems to be requesting runs ordered by update time and repeatedly advancing RunFilterContent.LastUpdatedAfter, which feels tricky: do I need to subtract one second at the boundary?
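The timestamp-cursor workaround can be sketched as follows, under stated assumptions: results are ordered by last-updated time ascending, the LastUpdatedAfter bound is treated as inclusive (not confirmed for the real service), and `QueryWindow`/`QueryAllByTime` are hypothetical helpers over an in-memory list. Instead of subtracting a second, it restarts at the last timestamp seen and de-duplicates by run ID.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a query ordered by last-updated time (ascending) that returns at
// most pageSize runs updated at or after `after`. Treating the bound as inclusive is an
// assumption; the real service's LastUpdatedAfter semantics are not confirmed here.
List<(string RunId, DateTimeOffset LastUpdated)> QueryWindow(
    IReadOnlyList<(string RunId, DateTimeOffset LastUpdated)> allRuns, DateTimeOffset after, int pageSize) =>
    allRuns.Where(r => r.LastUpdated >= after)
           .OrderBy(r => r.LastUpdated)
           .Take(pageSize)
           .ToList();

// Timestamp-cursor workaround: restart each query at the last timestamp seen. Because the
// bound is inclusive, runs sharing that timestamp come back again, so they are skipped by
// RunId instead of nudging the bound (which could miss runs updated at the same instant).
List<(string RunId, DateTimeOffset LastUpdated)> QueryAllByTime(
    IReadOnlyList<(string RunId, DateTimeOffset LastUpdated)> allRuns, DateTimeOffset start, int pageSize)
{
    var seen = new HashSet<string>();
    var result = new List<(string RunId, DateTimeOffset LastUpdated)>();
    var cursor = start;
    while (true)
    {
        var page = QueryWindow(allRuns, cursor, pageSize);
        // HashSet.Add returns false for runs already returned by a previous page.
        var fresh = page.Where(r => seen.Add(r.RunId)).ToList();
        result.AddRange(fresh);
        // Stop on a partial page (no more data) or when nothing new came back, which happens
        // if more runs share one timestamp than fit in a page; the cursor cannot move past them.
        if (page.Count < pageSize || fresh.Count == 0)
        {
            break;
        }
        cursor = page[page.Count - 1].LastUpdated;
    }
    return result;
}

// 250 sample runs; every three consecutive runs share the same timestamp.
var baseTime = new DateTimeOffset(2023, 10, 23, 0, 0, 0, TimeSpan.Zero);
var sample = Enumerable.Range(0, 250)
    .Select(i => (RunId: $"run-{i}", LastUpdated: baseTime.AddSeconds(i / 3)))
    .ToList();
var all = QueryAllByTime(sample, baseTime, 100);
Console.WriteLine(all.Count); // 250
```

The weak spot is a burst of runs sharing one timestamp that is larger than a page: the cursor cannot advance past it, which is why a real continuation token is the better fix.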

Environment

Visual Studio 17.8

@github-actions bot added the customer-reported, Data Factory Mgmt, needs-team-triage, and question labels on Oct 23, 2023
@jsquire added the needs-team-attention label and removed the needs-team-triage label on Oct 23, 2023
@jsquire
Member

jsquire commented Oct 23, 2023

Thank you for your feedback. Tagging and routing to the team member best able to assist.

@ruowan
Member

ruowan commented Dec 4, 2023

Thanks @DavidKarlas, I can reproduce this issue locally. The root cause is that the SDK hides the ContinuationToken, so the client cannot get the next page of results. @ArthurMa1978, we need your help on this issue.

@archerzz
Member

archerzz commented Dec 6, 2023

I think that in track 2 mgmt, this method was generated as a pageable method in a way customers didn't expect.

In this case, an operation with custom pagination is treated as a List operation, so it is generated as a pageable method, and customers took it for granted that this new method would handle the custom pagination.

@DavidKarlas
Contributor Author

While I have you all here, could someone please take a look at #39187 and Azure/azure-rest-api-specs#26334, which fixes it? Thank you!

@ArthurMa1978 removed the customer-reported label on Jan 11, 2024
@ArthurMa1978
Copy link
Member

> I think that in track 2 mgmt, this method was generated as a pageable method in a way customers didn't expect.
>
> In this case, an operation with custom pagination is treated as a List operation, so it is generated as a pageable method, and customers took it for granted that this new method would handle the custom pagination.

We'll consider supporting this kind of pagination operation, which uses a token to get the next page.

@archerzz self-assigned this on Feb 1, 2024
@sdzunenko

Have the same issue here: with Azure.ResourceManager.DataFactory 1.0.0-beta.6 it is not possible to get more than 100 pipeline runs at once. The method returns AsyncPageable<T> and the continuation token is null.
It worked perfectly fine with Microsoft.Azure.Management.DataFactory.

Is there any workaround available?

For now I can see two possible solutions:

  1. Roll back to Microsoft.Azure.Management.DataFactory
  2. Use the LastUpdatedBefore property of RunFilterContent

@DavidKarlas
Copy link
Contributor Author

Another option is to use this (admittedly ugly) class, which calls the REST API directly:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory.Models;

namespace ControlTower.Helpers
{
    public static class PipelineRunsRestOperations
    {
        private static readonly HttpPipeline _pipeline;
        private static readonly Uri _endpoint;
        private static readonly string _apiVersion;

        static PipelineRunsRestOperations()
        {
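            // Constants.AdfTenantId, AdfApplicationId and AdfAuthenticationKey are the author's own
            // service-principal settings; substitute your own TokenCredential here.
            _pipeline = HttpPipelineBuilder.Build(
                new ArmClientOptions(),
                new BearerTokenAuthenticationPolicy(
                    new ClientSecretCredential(Constants.AdfTenantId, Constants.AdfApplicationId, Constants.AdfAuthenticationKey),
                    ArmEnvironment.AzurePublicCloud.DefaultScope));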
            _endpoint = new Uri("https://management.azure.com");
            _apiVersion = "2018-06-01";
        }

        static internal HttpMessage CreateQueryByFactoryRequest(string subscriptionId, string resourceGroupName, string factoryName, RunFilterContent content)
        {
            var message = _pipeline.CreateMessage();
            var request = message.Request;
            request.Method = RequestMethod.Post;
            var uri = new RequestUriBuilder();
            uri.Reset(_endpoint);
            uri.AppendPath("/subscriptions/", false);
            uri.AppendPath(subscriptionId, true);
            uri.AppendPath("/resourceGroups/", false);
            uri.AppendPath(resourceGroupName, true);
            uri.AppendPath("/providers/Microsoft.DataFactory/factories/", false);
            uri.AppendPath(factoryName, true);
            uri.AppendPath("/queryPipelineRuns", false);
            uri.AppendQuery("api-version", _apiVersion, true);
            request.Uri = uri;
            request.Headers.Add("Accept", "application/json");
            request.Headers.Add("Content-Type", "application/json");
            var content0 = new Utf8JsonRequestContent();
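            // RunFilterContent's JSON writer is a non-public explicit IUtf8JsonSerializable implementation,
            // so it is invoked via reflection. This is brittle and may break in future SDK versions.
            content.GetType().GetMethod("Azure.Core.IUtf8JsonSerializable.Write", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.Invoke(content, new object[] { content0.JsonWriter });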
            request.Content = content0;
            return message;
        }
        internal class Utf8JsonRequestContent : RequestContent
        {
            private readonly MemoryStream _stream;
            private readonly RequestContent _content;

            public Utf8JsonWriter JsonWriter { get; }

            public Utf8JsonRequestContent()
            {
                _stream = new MemoryStream();
                _content = Create(_stream);
                JsonWriter = new Utf8JsonWriter(_stream);
            }

            public override async Task WriteToAsync(Stream stream, CancellationToken cancellation)
            {
                await JsonWriter.FlushAsync(cancellation).ConfigureAwait(false);
                await _content.WriteToAsync(stream, cancellation).ConfigureAwait(false);
            }

            public override void WriteTo(Stream stream, CancellationToken cancellation)
            {
                JsonWriter.Flush();
                _content.WriteTo(stream, cancellation);
            }

            public override bool TryComputeLength(out long length)
            {
                length = JsonWriter.BytesCommitted + JsonWriter.BytesPending;
                return true;
            }

            public override void Dispose()
            {
                JsonWriter.Dispose();
                _content.Dispose();
                _stream.Dispose();
            }
        }

        /// <summary> Query pipeline runs in the factory based on input filter conditions. </summary>
        /// <param name="subscriptionId"> The subscription identifier. </param>
        /// <param name="resourceGroupName"> The resource group name. </param>
        /// <param name="factoryName"> The factory name. </param>
        /// <param name="content"> Parameters to filter the pipeline run. </param>
        /// <param name="cancellationToken"> The cancellation token to use. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="subscriptionId"/>, <paramref name="resourceGroupName"/>, <paramref name="factoryName"/> or <paramref name="content"/> is null. </exception>
        /// <exception cref="ArgumentException"> <paramref name="subscriptionId"/>, <paramref name="resourceGroupName"/> or <paramref name="factoryName"/> is an empty string, and was expected to be non-empty. </exception>
        public static async Task<Response<DataFactoryPipelineRunsQueryResult>> QueryByFactoryAsync(string subscriptionId, string resourceGroupName, string factoryName, RunFilterContent content, CancellationToken cancellationToken = default)
        {
            using var message = CreateQueryByFactoryRequest(subscriptionId, resourceGroupName, factoryName, content);
            await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
            switch (message.Response.Status)
            {
                case 200:
                    {
                        DataFactoryPipelineRunsQueryResult value = default;
                        using var document = await JsonDocument.ParseAsync(message.Response.ContentStream, default, cancellationToken).ConfigureAwait(false);
                        value = DataFactoryPipelineRunsQueryResult.DeserializeDataFactoryPipelineRunsQueryResult(document.RootElement);
                        return Response.FromValue(value, message.Response);
                    }
                default:
                    throw new RequestFailedException(message.Response);
            }
        }

        /// <summary> A list pipeline runs. </summary>
        public class DataFactoryPipelineRunsQueryResult
        {
            /// <summary> Initializes a new instance of DataFactoryPipelineRunsQueryResult. </summary>
            /// <param name="value"> List of pipeline runs. </param>
            /// <exception cref="ArgumentNullException"> <paramref name="value"/> is null. </exception>
            internal DataFactoryPipelineRunsQueryResult(IEnumerable<DataFactoryPipelineRunInfo> value)
            {
                Value = value.ToList();
            }

            /// <summary> Initializes a new instance of DataFactoryPipelineRunsQueryResult. </summary>
            /// <param name="value"> List of pipeline runs. </param>
            /// <param name="continuationToken"> The continuation token for getting the next page of results, if any remaining results exist, null otherwise. </param>
            internal DataFactoryPipelineRunsQueryResult(IReadOnlyList<DataFactoryPipelineRunInfo> value, string continuationToken)
            {
                Value = value;
                ContinuationToken = continuationToken;
            }

            /// <summary> List of pipeline runs. </summary>
            public IReadOnlyList<DataFactoryPipelineRunInfo> Value { get; }
            /// <summary> The continuation token for getting the next page of results, if any remaining results exist, null otherwise. </summary>
            public string ContinuationToken { get; }
            internal static DataFactoryPipelineRunsQueryResult DeserializeDataFactoryPipelineRunsQueryResult(JsonElement element)
            {
                if (element.ValueKind == JsonValueKind.Null)
                {
                    return null;
                }
                IReadOnlyList<DataFactoryPipelineRunInfo> value = default;
                string? continuationToken = default;
                foreach (var property in element.EnumerateObject())
                {
                    if (property.NameEquals("value"u8))
                    {
                        List<DataFactoryPipelineRunInfo> array = new List<DataFactoryPipelineRunInfo>();
                        foreach (var item in property.Value.EnumerateArray())
                        {
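                            // The model's deserializer is internal, so it is invoked via reflection; its exact
                            // signature may differ between SDK versions, so check it against the version you use.
                            array.Add((DataFactoryPipelineRunInfo)typeof(DataFactoryPipelineRunInfo).GetMethod("DeserializeDataFactoryPipelineRunInfo", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static)!.Invoke(null, new object[] { item })!);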
                        }
                        value = array;
                        continue;
                    }
                    if (property.NameEquals("continuationToken"u8))
                    {
                        continuationToken = property.Value.GetString();
                        continue;
                    }
                }
                return new DataFactoryPipelineRunsQueryResult(value, continuationToken);
            }
        }
    }
}

and then call it like this:

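        // `factory` (a DataFactoryResource) and `lastUpdate` (a DateTimeOffset) come from the caller's code.
        // DateTimeOffset.AddMicroseconds requires .NET 7 or later.
        Response<PipelineRunsRestOperations.DataFactoryPipelineRunsQueryResult>? response = null;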
        do
        {
            var filter = new RunFilterContent(response?.Value.ContinuationToken != null ? DateTimeOffset.MinValue : lastUpdate.AddMicroseconds(1), DateTimeOffset.MaxValue) {
                ContinuationToken = response?.Value.ContinuationToken
            };
            response = await PipelineRunsRestOperations.QueryByFactoryAsync(
                factory.Id.SubscriptionId!,
                factory.Id.ResourceGroupName!,
                factory.Id.Name,
                filter);
            foreach (var adfPipelineRun in response.Value.Value)
            {
               // Process adfPipelineRun 
            }
        } while (response.Value.ContinuationToken != null);

archerzz pushed a commit to archerzz/azure-sdk-for-net that referenced this issue Feb 28, 2024
add customization codes to implement the non-standard pagination logic of
`DataFactoryResource.GetPipelineRuns`

fix Azure#39438
@archerzz
Member

@DavidKarlas @sdzunenko The issue should be fixed in #42247

archerzz added a commit that referenced this issue Feb 29, 2024
…42247)

- add customization codes to implement the non-standard pagination logic of
`DataFactoryResource.GetPipelineRuns`
- add test

fix #39438

---------

Co-authored-by: Mingzhe Huang (from Dev Box)
angiurgiu pushed a commit that referenced this issue Mar 20, 2024
…42247)

- add customization codes to implement the non-standard pagination logic of
`DataFactoryResource.GetPipelineRuns`
- add test

fix #39438

---------

Co-authored-by: Mingzhe Huang (from Dev Box)
@github-actions bot locked and limited the conversation to collaborators on May 29, 2024
6 participants