Skip to content

Commit

Permalink
Refactored table_azure_storage_blob to stream results per container a…
Browse files Browse the repository at this point in the history
…nd removed Go routine from the List function Closes #854 (#855)
  • Loading branch information
ParthaI authored Nov 28, 2024
1 parent 405604f commit 7123e9d
Showing 1 changed file with 7 additions and 33 deletions.
40 changes: 7 additions & 33 deletions azure/table_azure_storage_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/url"
"strings"
"sync"

"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin"
Expand Down Expand Up @@ -389,31 +388,17 @@ func listStorageBlobs(ctx context.Context, d *plugin.QueryData, h *plugin.Hydrat
containers = append(containers, result.Values()...)
}

var wg sync.WaitGroup
blobCh := make(chan []blobInfo, len(containers))
errorCh := make(chan error, len(containers))

// Iterating all the available containers
for _, item := range containers {
wg.Add(1)
go getRowDataForBlobAsync(ctx, item, accountName, session.StorageEndpointSuffix, credential, &wg, blobCh, errorCh)
}

// wait for all containers to be processed
wg.Wait()

// NOTE: close channel before ranging over results
close(blobCh)
close(errorCh)

for err := range errorCh {
// return the first error
return nil, err
}
blobs, err := getRowDataForBlob(ctx, item, accountName, session.StorageEndpointSuffix, credential)
if err != nil {
plugin.Logger(ctx).Error("azure_storage_blob.listStorageBlobs.getRowDataForBlob", "api_error", err)
return nil, err
}

for item := range blobCh {
for _, data := range item {
for _, data := range blobs {
d.StreamListItem(ctx, &blobInfo{data.Blob, data.Name, accountName, data.Container, resourceGroup, &subscriptionID, region, data.IsSnapshot})

// Check if context has been cancelled or if the limit has been hit (if specified)
// if there is a limit, it will return the number of rows required to reach this limit
if d.RowsRemaining(ctx) == 0 {
Expand All @@ -425,17 +410,6 @@ func listStorageBlobs(ctx context.Context, d *plugin.QueryData, h *plugin.Hydrat
return nil, err
}

func getRowDataForBlobAsync(ctx context.Context, item storage.ListContainerItem, accountName string, storageEndpointSuffix string, credential *azblob.SharedKeyCredential, wg *sync.WaitGroup, subnetCh chan []blobInfo, errorCh chan error) {
defer wg.Done()

rowData, err := getRowDataForBlob(ctx, item, accountName, storageEndpointSuffix, credential)
if err != nil {
errorCh <- err
} else if rowData != nil {
subnetCh <- rowData
}
}

// List all the available blobs
func getRowDataForBlob(ctx context.Context, container storage.ListContainerItem, accountName string, storageEndpointSuffix string, credential *azblob.SharedKeyCredential) ([]blobInfo, error) {
primaryURL, _ := url.Parse(fmt.Sprintf("https://%s.blob.%s", accountName, storageEndpointSuffix))
Expand Down

0 comments on commit 7123e9d

Please sign in to comment.