Skip to content

Commit

Permalink
migrate from azure-storage-blob-go to azure-sdk-for-go (#740)
Browse files Browse the repository at this point in the history
migrate from azure-storage-blob-go to azure-sdk-for-go
  • Loading branch information
sfc-gh-ext-simba-lb authored Mar 14, 2023
1 parent 95699ce commit 7d267da
Show file tree
Hide file tree
Showing 244 changed files with 29,675 additions and 23,985 deletions.
133 changes: 87 additions & 46 deletions azure_storage_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 Snowflake Computing Inc. All rights reserved.
// Copyright (c) 2021-2023 Snowflake Computing Inc. All rights reserved.

package gosnowflake

Expand All @@ -14,7 +14,12 @@ import (
"strings"
"time"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

type snowflakeAzureClient struct {
Expand All @@ -27,43 +32,54 @@ type azureLocation struct {

func (util *snowflakeAzureClient) createClient(info *execResponseStageInfo, _ bool) (cloudClient, error) {
sasToken := info.Creds.AzureSasToken
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 60,
RetryDelay: 2 * time.Second,
u, err := url.Parse(fmt.Sprintf("https://%s.%s/%s%s", info.StorageAccount, info.EndPoint, info.Path, sasToken))
if err != nil {
return nil, err
}
client, err := azblob.NewClientWithNoCredential(u.String(), &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
MaxRetries: 60,
RetryDelay: 2 * time.Second,
},
},
})

u, err := url.Parse(fmt.Sprintf("https://%s.%s/%s%s", info.StorageAccount, info.EndPoint, info.Path, sasToken))
if err != nil {
return nil, err
}
containerURL := azblob.NewContainerURL(*u, p)
return &containerURL, nil
return client, nil
}

// cloudUtil implementation
func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename string) (*fileHeader, error) {
container, ok := meta.client.(*azblob.ContainerURL)
client, ok := meta.client.(*azblob.Client)
if !ok {
return nil, fmt.Errorf("failed to parse client to azblob.ContainerURL")
return nil, fmt.Errorf("failed to parse client to azblob.Client")
}

azureLoc, err := util.extractContainerNameAndPath(meta.stageInfo.Location)
if err != nil {
return nil, err
}
path := azureLoc.path + strings.TrimLeft(filename, "/")
b := container.NewBlockBlobURL(path)
resp, err := b.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
containerClient, err := container.NewClientWithNoCredential(client.URL(), &container.ClientOptions{})
if err != nil {
var se azblob.StorageError
return nil, &SnowflakeError{
Message: "failed to create container client",
}
}
blobClient := containerClient.NewBlockBlobClient(path)
resp, err := blobClient.GetProperties(context.Background(), &blob.GetPropertiesOptions{
AccessConditions: &blob.AccessConditions{},
CPKInfo: &blob.CPKInfo{},
})
if err != nil {
var se *azcore.ResponseError
if errors.As(err, &se) {
if se.ServiceCode() == azblob.ServiceCodeBlobNotFound {
if se.ErrorCode == string(bloberror.BlobNotFound) {
meta.resStatus = notFoundFile
return nil, fmt.Errorf("could not find file")
} else if se.Response().StatusCode == 403 {
} else if se.StatusCode == 403 {
meta.resStatus = renewToken
return nil, fmt.Errorf("received 403, attempting to renew")
}
Expand All @@ -73,19 +89,30 @@ func (util *snowflakeAzureClient) getFileHeader(meta *fileMetadata, filename str
}

meta.resStatus = uploaded
metadata := resp.NewMetadata()
metadata := resp.Metadata
var encData encryptionData
if err = json.Unmarshal([]byte(metadata["encryptiondata"]), &encData); err != nil {
if err = json.Unmarshal([]byte(*metadata["Encryptiondata"]), &encData); err != nil {
return nil, err
}

matdesc, ok := metadata["Matdesc"]
if !ok {
// matdesc is not in response, use empty string
matdesc = new(string)
}
encryptionMetadata := encryptMetadata{
encData.WrappedContentKey.EncryptionKey,
encData.ContentEncryptionIV,
metadata["matdesc"],
*matdesc,
}

digest, ok := metadata["Sfcdigest"]
if !ok {
// sfcdigest is not in response, use empty string
digest = new(string)
}
return &fileHeader{
metadata["sfcdigest"],
*digest,
int64(len(metadata)),
&encryptionMetadata,
}, nil
Expand All @@ -98,8 +125,8 @@ func (util *snowflakeAzureClient) uploadFile(
encryptMeta *encryptMetadata,
maxConcurrency int,
multiPartThreshold int64) error {
azureMeta := map[string]string{
"sfcdigest": meta.sha256Digest,
azureMeta := map[string]*string{
"sfcdigest": &meta.sha256Digest,
}
if encryptMeta != nil {
ed := &encryptionData{
Expand All @@ -122,31 +149,37 @@ func (util *snowflakeAzureClient) uploadFile(
if err != nil {
return err
}
azureMeta["encryptiondata"] = string(metadata)
azureMeta["matdesc"] = encryptMeta.matdesc
encryptionMetadata := string(metadata)
azureMeta["encryptiondata"] = &encryptionMetadata
azureMeta["matdesc"] = &encryptMeta.matdesc
}

azureLoc, err := util.extractContainerNameAndPath(meta.stageInfo.Location)
if err != nil {
return err
}
path := azureLoc.path + strings.TrimLeft(meta.dstFileName, "/")
azContainerURL, ok := meta.client.(*azblob.ContainerURL)
client, ok := meta.client.(*azblob.Client)
if !ok {
return &SnowflakeError{
Message: "failed to cast to azure client",
}
}

blobURL := azContainerURL.NewBlockBlobURL(path)
containerClient, err := container.NewClientWithNoCredential(client.URL(), &container.ClientOptions{})
if err != nil {
return &SnowflakeError{
Message: "failed to create container client",
}
}
blobClient := containerClient.NewBlockBlobClient(path)
if meta.srcStream != nil {
uploadSrc := meta.srcStream
if meta.realSrcStream != nil {
uploadSrc = meta.realSrcStream
}
_, err = azblob.UploadStreamToBlockBlob(context.Background(), uploadSrc, blobURL, azblob.UploadStreamToBlockBlobOptions{
BufferSize: uploadSrc.Len(),
Metadata: azureMeta,
_, err = blobClient.UploadStream(context.Background(), uploadSrc, &azblob.UploadStreamOptions{
BlockSize: int64(uploadSrc.Len()),
Metadata: azureMeta,
})
} else {
var f *os.File
Expand All @@ -156,23 +189,25 @@ func (util *snowflakeAzureClient) uploadFile(
}
defer f.Close()

blobOptions := azblob.UploadToBlockBlobOptions{
BlobHTTPHeaders: azblob.BlobHTTPHeaders{
ContentType: httpHeaderValueOctetStream,
ContentEncoding: "utf-8",
contentType := "application/octet-stream"
contentEncoding := "utf-8"
blobOptions := &azblob.UploadFileOptions{
HTTPHeaders: &blob.HTTPHeaders{
BlobContentType: &contentType,
BlobContentEncoding: &contentEncoding,
},
Metadata: azureMeta,
Parallelism: uint16(maxConcurrency),
Concurrency: uint16(maxConcurrency),
}
if meta.options.putAzureCallback != nil {
blobOptions.Progress = meta.options.putAzureCallback.call
}
_, err = azblob.UploadFileToBlockBlob(context.Background(), f, blobURL, blobOptions)
_, err = blobClient.UploadFile(context.Background(), f, blobOptions)
}
if err != nil {
var se azblob.StorageError
var se *azcore.ResponseError
if errors.As(err, &se) {
if se.Response().StatusCode == 403 && util.detectAzureTokenExpireError(se.Response()) {
if se.StatusCode == 403 && util.detectAzureTokenExpireError(se.RawResponse.Request.Response) {
meta.resStatus = renewToken
} else {
meta.resStatus = needRetry
Expand All @@ -199,22 +234,28 @@ func (util *snowflakeAzureClient) nativeDownloadFile(
return err
}
path := azureLoc.path + strings.TrimLeft(meta.srcFileName, "/")
azContainerURL, ok := meta.client.(*azblob.ContainerURL)
client, ok := meta.client.(*azblob.Client)
if !ok {
return &SnowflakeError{
Message: "failed to cast to azure client",
}
}

containerClient, err := container.NewClientWithNoCredential(client.URL(), &container.ClientOptions{})
if err != nil {
return &SnowflakeError{
Message: "failed to create container client",
}
}
blobClient := containerClient.NewBlockBlobClient(path)
f, err := os.OpenFile(fullDstFileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
blobURL := azContainerURL.NewBlockBlobURL(path)
if err = azblob.DownloadBlobToFile(
context.Background(), blobURL.BlobURL, 0, azblob.CountToEnd, f,
azblob.DownloadFromBlobOptions{Parallelism: uint16(maxConcurrency)}); err != nil {
_, err = blobClient.DownloadFile(
context.Background(), f, &azblob.DownloadFileOptions{
Concurrency: uint16(maxConcurrency)})
if err != nil {
return err
}
meta.resStatus = downloaded
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ go 1.19

require (
github.com/99designs/keyring v1.2.1
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/apache/arrow/go/v10 v10.0.1
github.com/aws/aws-sdk-go-v2 v1.16.16
github.com/aws/aws-sdk-go-v2/credentials v1.12.20
Expand All @@ -20,7 +21,7 @@ require (

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect
Expand All @@ -37,13 +38,11 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mtibben/percent v0.2.1 // indirect
Expand All @@ -56,7 +55,6 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/mattn/go-sqlite3 => github.com/mattn/go-sqlite3 v1.14.15
Loading

0 comments on commit 7d267da

Please sign in to comment.