From 7123e9d7db42c93350e13b2dcb097fc43ac2e1d9 Mon Sep 17 00:00:00 2001 From: Keep Focused Date: Thu, 28 Nov 2024 11:41:19 +0700 Subject: [PATCH] Refactored table_azure_storage_blob to stream results per container and removed Go routine from the List function Closes #854 (#855) --- azure/table_azure_storage_blob.go | 40 ++++++------------------------- 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/azure/table_azure_storage_blob.go b/azure/table_azure_storage_blob.go index 49fcbc16..339b49dd 100644 --- a/azure/table_azure_storage_blob.go +++ b/azure/table_azure_storage_blob.go @@ -5,7 +5,6 @@ import ( "fmt" "net/url" "strings" - "sync" "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" "github.com/turbot/steampipe-plugin-sdk/v5/plugin" @@ -389,31 +388,17 @@ func listStorageBlobs(ctx context.Context, d *plugin.QueryData, h *plugin.Hydrat containers = append(containers, result.Values()...) } - var wg sync.WaitGroup - blobCh := make(chan []blobInfo, len(containers)) - errorCh := make(chan error, len(containers)) - // Iterating all the available containers for _, item := range containers { - wg.Add(1) - go getRowDataForBlobAsync(ctx, item, accountName, session.StorageEndpointSuffix, credential, &wg, blobCh, errorCh) - } - - // wait for all containers to be processed - wg.Wait() - - // NOTE: close channel before ranging over results - close(blobCh) - close(errorCh) - - for err := range errorCh { - // return the first error - return nil, err - } + blobs, err := getRowDataForBlob(ctx, item, accountName, session.StorageEndpointSuffix, credential) + if err != nil { + plugin.Logger(ctx).Error("azure_storage_blob.listStorageBlobs.getRowDataForBlob", "api_error", err) + return nil, err + } - for item := range blobCh { - for _, data := range item { + for _, data := range blobs { d.StreamListItem(ctx, &blobInfo{data.Blob, data.Name, accountName, data.Container, resourceGroup, &subscriptionID, region, data.IsSnapshot}) + // Check if context has been cancelled or if the limit has been hit (if specified) // if there is a limit, it will return the number of rows required to reach this limit if d.RowsRemaining(ctx) == 0 { @@ -425,17 +410,6 @@ func listStorageBlobs(ctx context.Context, d *plugin.QueryData, h *plugin.Hydrat return nil, err } -func getRowDataForBlobAsync(ctx context.Context, item storage.ListContainerItem, accountName string, storageEndpointSuffix string, credential *azblob.SharedKeyCredential, wg *sync.WaitGroup, subnetCh chan []blobInfo, errorCh chan error) { - defer wg.Done() - - rowData, err := getRowDataForBlob(ctx, item, accountName, storageEndpointSuffix, credential) - if err != nil { - errorCh <- err - } else if rowData != nil { - subnetCh <- rowData - } -} - // List all the available blobs func getRowDataForBlob(ctx context.Context, container storage.ListContainerItem, accountName string, storageEndpointSuffix string, credential *azblob.SharedKeyCredential) ([]blobInfo, error) { primaryURL, _ := url.Parse(fmt.Sprintf("https://%s.blob.%s", accountName, storageEndpointSuffix))