diff --git a/cmd/root.go b/cmd/root.go index eb9629918..56768c166 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -58,6 +58,7 @@ var azcopyScanningLogger common.ILoggerResetable var azcopyCurrentJobID common.JobID var azcopySkipVersionCheck bool var retryStatusCodes string +var grpcServerPort string type jobLoggerInfo struct { jobID common.JobID @@ -189,6 +190,12 @@ var rootCmd = &cobra.Command{ beginDetectNewVersion() } + if common.GrpcShim.Available() { + if any(common.GrpcShim).(common.GrpcCtl).SetupGrpc(grpcServerPort, common.AzcopyCurrentJobLogger) != nil { + return err + } + } + if debugSkipFiles != "" { for _, v := range strings.Split(debugSkipFiles, ";") { if strings.HasPrefix(v, "/") { @@ -256,9 +263,17 @@ func init() { rootCmd.PersistentFlags().BoolVar(&azcopyAwaitAllowOpenFiles, "await-open", false, "Used when debugging, to tell AzCopy to await `open` on stdin, after scanning but before opening the first file. Assists with testing cases around file modifications between scanning and usage") rootCmd.PersistentFlags().StringVar(&debugSkipFiles, "debug-skip-files", "", "Used when debugging, to tell AzCopy to cancel the job midway. List of relative paths to skip in the STE.") + // special remote control flag, only available if the build enabled it. + if common.GrpcShim.Available() { + rootCmd.PersistentFlags().StringVar(&grpcServerPort, "grpc-server-addr", "", "Used in specific scenarios; defaults to disabled. If set, listens on the requested port (e.g. 127.0.0.1:9879). Protocol spec is in grpcctl/internal.") + } + // reserved for partner teams _ = rootCmd.PersistentFlags().MarkHidden("cancel-from-stdin") + // currently for use in the ev2 extension + _ = rootCmd.PersistentFlags().MarkHidden("enable-grpc-server") + // special flags to be used in case of unexpected service errors. rootCmd.PersistentFlags().StringVar(&retryStatusCodes, "retry-status-codes", "", "Comma-separated list of HTTP status codes to retry on. (default '408;429;500;502;503;504')") _ = rootCmd.PersistentFlags().MarkHidden("retry-status-codes") diff --git a/common/environment.go b/common/environment.go index 5a4b7f3d0..26d2ca10c 100644 --- a/common/environment.go +++ b/common/environment.go @@ -98,6 +98,7 @@ func (AutoLoginType) MSI() AutoLoginType { return AutoLoginType(2) } func (AutoLoginType) AzCLI() AutoLoginType { return AutoLoginType(3) } func (AutoLoginType) PsCred() AutoLoginType { return AutoLoginType(4) } func (AutoLoginType) Workload() AutoLoginType { return AutoLoginType(5) } +func (AutoLoginType) GRPC() AutoLoginType { return AutoLoginType(254) } // Ev2 Extension/FRP integration only. Receives fresh OAuth tokens via GRPC. func (AutoLoginType) TokenStore() AutoLoginType { return AutoLoginType(255) } // Storage Explorer internal integration only. Do not add this to ValidAutoLoginTypes. func (d AutoLoginType) String() string { diff --git a/common/grpc_setup.go b/common/grpc_setup.go new file mode 100644 index 000000000..bd0fbe8ee --- /dev/null +++ b/common/grpc_setup.go @@ -0,0 +1,55 @@ +//go:build grpc +// +build grpc + +package common + +import ( + "github.com/Azure/azure-storage-azcopy/v10/grpcctl" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "golang.stackrox.io/grpc-http1/server" + "net/http" +) + +func (grpcCtlImpl) SetupGrpc(addr string, logger ILoggerResetable) error { + if addr != "" { + // JobLog is a function, rather than just a reference, to avoid a dependency loop. It's gross, I know. 
+ grpcctl.JobLog = func(s string) { + logger.Log(LogInfo, s) + } + + // Spin off the HTTP server + go func() { + // HTTP/1 needs support + srv := &http.Server{ + Addr: addr, + } + + // But we must also support HTTP/2 for "modern" clients. + var h2srv http2.Server + + // The downgrade handler will allow clients to request grpc-web support, removing trailers, etc. for platforms like .NET Framework 4.7.2. + srv.Handler = h2c.NewHandler( + server.CreateDowngradingHandler( + grpcctl.GlobalGRPCServer, + http.NotFoundHandler(), // No fallback handler is needed. + server.PreferGRPCWeb(true)), // If grpc-web is requested, grpc-web we'll give. + &h2srv) + + // Start listening. + err := srv.ListenAndServe() + if err != nil { + panic("grpcfailed: " + err.Error()) + } + }() + } + + // Historically, this could return an error. it does not anymore. + return nil +} + +func (grpcCtlImpl) SetupOAuthSubscription(updateFunc func(token *OAuthTokenUpdate)) { + grpcctl.Subscribe(grpcctl.GlobalServer, func(i *grpcctl.OAuthTokenUpdate) { + updateFunc((*OAuthTokenUpdate)(i)) + }) +} diff --git a/common/grpc_shim.go b/common/grpc_shim.go new file mode 100644 index 000000000..ecceedffb --- /dev/null +++ b/common/grpc_shim.go @@ -0,0 +1,28 @@ +package common + +import "time" + +/* +grpc_shim.go implements a shim that allows for GRPC functionality to reasonably disappear, removing all package references to grpcctl, and grpc in general. +*/ + +type GrpcCtl interface { + SetupGrpc(string, ILoggerResetable) error + SetupOAuthSubscription(update func(*OAuthTokenUpdate)) +} + +type grpcCtlImpl struct{} + +var GrpcShim grpcCtlImpl + +func (g grpcCtlImpl) Available() bool { + _, ok := (any(g)).(GrpcCtl) + return ok +} + +type OAuthTokenUpdate struct { + Token string + Live time.Time + Expiry time.Time + Wiggle time.Duration +} diff --git a/common/oauthTokenManager.go b/common/oauthTokenManager.go index ead150881..31f0e28d0 100644 --- a/common/oauthTokenManager.go +++ b/common/oauthTokenManager.go @@ -84,7 +84,7 @@ func newAzcopyHTTPClient() *http.Client { Timeout: 10 * time.Second, KeepAlive: 10 * time.Second, DualStack: true, - }).Dial, /*Context*/ + }).Dial, /*Context*/ MaxIdleConns: 0, // No limit MaxIdleConnsPerHost: 1000, IdleConnTimeout: 180 * time.Second, @@ -398,6 +398,17 @@ func (uotm *UserOAuthTokenManager) getTokenInfoFromEnvVar(ctx context.Context) ( return nil, fmt.Errorf("get token from environment variable failed to unmarshal token, %v", err) } + // Seed the grpc state with what we retrieved from the env var + if tokenInfo.LoginType == EAutoLoginType.GRPC() { + g := globalRPCOAuthTokenState + g.Mutex.L.Lock() + g.Token = tokenInfo.AccessToken + g.Live = time.Now() + g.Expiry = tokenInfo.Expires() + g.Mutex.L.Unlock() + g.Mutex.Broadcast() + } + if tokenInfo.LoginType != EAutoLoginType.TokenStore() { refreshedToken, err := tokenInfo.Refresh(ctx) if err != nil { @@ -524,7 +535,7 @@ type TokenStoreCredential struct { // we do not make repeated GetToken calls. // This is a temporary fix for issue where we would request a // new token from Stg Exp even while they've not yet populated the -// tokenstore. +// tokenstore. // // This is okay because we use same credential on both source and // destination. If we move to a case where the credentials are @@ -532,7 +543,6 @@ type TokenStoreCredential struct { // // We should move to a method where the token is always read from // tokenstore, and azcopy is invoked after tokenstore is populated. 
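
A note on the GrpcShim/GrpcCtl indirection introduced above: grpc_shim.go always compiles, but the methods that satisfy GrpcCtl are only supplied by grpc_setup.go under the `grpc` build tag, so Available() turns a compile-time decision into a runtime type assertion. A minimal standalone sketch of that pattern (names are illustrative, not from this PR):

```go
package main

import "fmt"

// ctl plays the role of common.GrpcCtl: the capability interface.
type ctl interface {
	Setup(addr string) error
}

// impl plays the role of common.grpcCtlImpl: an empty struct whose methods
// would be supplied by a separate, build-tag-gated file (//go:build grpc).
type impl struct{}

// available mirrors GrpcShim.Available(): the assertion only succeeds when
// the tagged file is part of the build and provides the methods.
func available(v any) bool {
	_, ok := v.(ctl)
	return ok
}

func main() {
	// Without a tagged file defining impl.Setup, this prints "false".
	fmt.Println("grpc support compiled in:", available(impl{}))
}
```

The call sites then repeat the assertion (any(common.GrpcShim).(common.GrpcCtl)), which panics if the guard is skipped, so the Available() checks in root.go and in the oauthTokenManager init below are load-bearing.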
-// var globalTokenStoreCredential *TokenStoreCredential var globalTsc sync.Once @@ -739,6 +749,12 @@ func (credInfo *OAuthTokenInfo) GetDeviceCodeCredential() (azcore.TokenCredentia return tc, nil } +func (credInfo *OAuthTokenInfo) GetGRPCOAuthCredential() (azcore.TokenCredential, error) { + tc := globalRPCOAuthTokenState + credInfo.TokenCredential = tc + return tc, nil +} + func (credInfo *OAuthTokenInfo) GetTokenCredential() (azcore.TokenCredential, error) { // Token Credential is cached. if credInfo.TokenCredential != nil { @@ -750,6 +766,8 @@ func (credInfo *OAuthTokenInfo) GetTokenCredential() (azcore.TokenCredential, er } switch credInfo.LoginType { + case EAutoLoginType.GRPC(): + return credInfo.GetGRPCOAuthCredential() case EAutoLoginType.MSI(): return credInfo.GetManagedIdentityCredential() case EAutoLoginType.SPN(): @@ -785,6 +803,124 @@ func jsonToTokenInfo(b []byte) (*OAuthTokenInfo, error) { // ==================================================================================== +var globalGRPCOAuthTokenLock = &sync.RWMutex{} + +// Pass in the RLocker to the sync.Cond so we're only requesting read locks, not write locks. +var globalRPCOAuthTokenState = &GRPCOAuthToken{Mutex: sync.NewCond(globalGRPCOAuthTokenLock.RLocker())} + +func init() { + if GrpcShim.Available() { + any(GrpcShim).(GrpcCtl).SetupOAuthSubscription(func(token *OAuthTokenUpdate) { + g := globalRPCOAuthTokenState + globalGRPCOAuthTokenLock.Lock() // Grab the write lock + + if AzcopyCurrentJobLogger != nil { + AzcopyCurrentJobLogger.Log(LogInfo, fmt.Sprintf("Received fresh OAuth token. (invalid: %v, exp: %v, now: %v, expiry: %v)", g.Token == "", time.Now().After(g.Expiry), time.Now(), g.Expiry)) + } + + // Write the fresh token we've been handed + g.Token = token.Token + g.Live = token.Live + g.Expiry = token.Expiry + g.Wiggle = token.Wiggle + g.GiveUp = false // We've stopped giving up, if we've received a fresh token. + + if AzcopyCurrentJobLogger != nil { + AzcopyCurrentJobLogger.Log(LogInfo, "Broadcasting new OAuth token.") + } + + // Drop the lock, let "clients" know there's a new token. + globalGRPCOAuthTokenLock.Unlock() + g.Mutex.Broadcast() + }) + } +} + +type GRPCOAuthToken struct { + Token string + Live time.Time + Expiry time.Time + // Time in seconds before expiry we'll act like it is expired + Wiggle time.Duration + + // If we don't receive an oauth token for an extended period of time, just give up and fail fast. It's too long to wait for say, 50k files to fail at their own pace. + GiveUp bool + + // Using a sync.Cond, we allow "clients" to drop their lock and await a fresh token signal. + Mutex *sync.Cond +} + +func (g *GRPCOAuthToken) GetToken(ctx context.Context, options policy.TokenRequestOptions) (azcore.AccessToken, error) { + g.Mutex.L.Lock() // Grab the read lock + defer g.Mutex.L.Unlock() // Defer the drop lock, so we don't need to mentally consider this lock. + +retry: + if g.GiveUp { // Add an escape catch, this way new requests after we've given up also give up immediately. This reduces failure time. + return azcore.AccessToken{}, fmt.Errorf("timed out waiting for new token (GiveUp set)") + } + + exp := g.Expiry.Add(-g.Wiggle) + totalDuration := g.Expiry.Sub(g.Live) + + if time.Now().After(exp) || // The token is naturally expired, this should happen. + g.Token == "" || // The token is empty... + totalDuration < 0 { // The token couldn't have been valid... + // Log any potential issues with the token. 
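
The wait loop that GetToken enters next combines a sync.Cond built over the RWMutex's RLocker with a manual timeout: Cond.Wait has no deadline, so the wait runs in a helper goroutine that is raced against a timer, and the timeout path must Broadcast and then join the helper to get its lock back before returning. A self-contained sketch of that idiom (not AzCopy code; a single writer and many readers are assumed):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type tokenBox struct {
	mu    sync.RWMutex
	cond  *sync.Cond // built over mu.RLocker(), as GRPCOAuthToken does
	token string
}

// awaitToken blocks until a token is published or the timeout elapses.
// sync.Cond.Wait cannot time out, so it runs in a helper goroutine and is
// raced against a timer, mirroring the select in GetToken above.
func (b *tokenBox) awaitToken(timeout time.Duration) (string, error) {
	b.cond.L.Lock() // read lock
	defer b.cond.L.Unlock()

	for b.token == "" {
		woke := make(chan struct{})
		go func() {
			b.cond.Wait() // drops the read lock while blocked, reacquires it before returning
			close(woke)
		}()

		select {
		case <-woke:
			// a broadcast arrived; loop and re-check the condition
		case <-time.After(timeout):
			b.cond.Broadcast() // wake our helper so it hands the read lock back
			<-woke
			return "", errors.New("timed out waiting for a token")
		}
	}
	return b.token, nil
}

// publish installs a fresh token under the write lock and wakes all waiters.
func (b *tokenBox) publish(tok string) {
	b.mu.Lock()
	b.token = tok
	b.mu.Unlock()
	b.cond.Broadcast()
}

func main() {
	b := &tokenBox{}
	b.cond = sync.NewCond(b.mu.RLocker())

	go func() {
		time.Sleep(100 * time.Millisecond)
		b.publish("fresh-token")
	}()

	fmt.Println(b.awaitToken(time.Second))
}
```

The GiveUp flag in the real implementation adds one more wrinkle: once a timeout has fired, later callers fail immediately instead of each waiting out their own window.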
+ if AzcopyCurrentJobLogger != nil { + AzcopyCurrentJobLogger.Log(LogInfo, fmt.Sprintf("Token is expired or invalid (invalid: %v, exp: %v, now: %v, expiry: %v)", g.Token == "" || totalDuration < 0, time.Now().After(exp), time.Now(), exp)) + } + + // Begin waiting for a fresh token. + waitBegin := time.Now() + waitch := make(chan bool) + + go func() { + // g.Mutex.Wait silently releases, then re-captures the (read) mutex. + // For duration, + g.Mutex.Wait() + if AzcopyCurrentJobLogger != nil { + AzcopyCurrentJobLogger.Log(LogInfo, "Released.") + } + + // send wait unblock + waitch <- true + }() + + // Time out eventually, so that AzCopy can exit. + select { + case <-waitch: + if AzcopyCurrentJobLogger != nil { + AzcopyCurrentJobLogger.Log(LogInfo, "Received signal from waitch") + } + close(waitch) + case <-time.After(totalDuration * 3): + g.Mutex.L.Lock() // Grab the write lock. + g.GiveUp = true // Tell everybody we're giving up. + g.Mutex.Broadcast() // Unblock our waiter, *and* everybody else, now that we know we're giving up. + g.Mutex.L.Unlock() // Drop the write lock. + + <-waitch // We must wait for our waiter, because that signals that we have our original lock back. If we drop without it, we may drop somebody else's (or nobody else's) + close(waitch) + + return azcore.AccessToken{}, fmt.Errorf("timed out waiting for new token (3x last live duration) (Began waiting %v, finished %v, duration %v)", waitBegin, time.Now(), totalDuration*3) + } + + goto retry + } + + t := g.Token + e := g.Expiry + return azcore.AccessToken{ + Token: t, + // e could be zero and the result would be the same + // azcore will """refresh""" every 30s + // but we'll hand it back the same token (or whatever has been updated to) + ExpiresOn: e, + }, nil +} + +// ==================================================================================== + // TestOAuthInjection controls variables for OAuth testing injections type TestOAuthInjection struct { DoTokenRefreshInjection bool diff --git a/go.mod b/go.mod index d2b3db4d9..c71c793ee 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,9 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 github.com/Azure/go-autorest/autorest/date v0.3.0 golang.org/x/net v0.27.0 + golang.stackrox.io/grpc-http1 v0.3.12 + google.golang.org/grpc v1.65.0 + google.golang.org/protobuf v1.34.2 ) require ( @@ -55,6 +58,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang/glog v1.2.1 // indirect github.com/google/s2a-go v0.1.8 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect @@ -78,9 +82,8 @@ require ( google.golang.org/genproto v0.0.0-20240723171418-e6d459c13d2a // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240723171418-e6d459c13d2a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a // indirect - google.golang.org/grpc v1.65.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + nhooyr.io/websocket v1.8.11 // indirect ) go 1.22.5 diff --git a/go.sum b/go.sum index e70b7db96..27cdac934 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod 
h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= +github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -256,6 +258,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.stackrox.io/grpc-http1 v0.3.12 h1:5Hmpu6OJN4GsoqF+orSv5rPd/R/W02rfPkX0xq3GY1I= +golang.stackrox.io/grpc-http1 v0.3.12/go.mod h1:Xl6wDiPJ7OjLWOzwxFwhThktLTNs1iDS95alVpZct/A= google.golang.org/api v0.189.0 h1:equMo30LypAkdkLMBqfeIqtyAnlyig1JSZArl4XPwdI= google.golang.org/api v0.189.0/go.mod h1:FLWGJKb0hb+pU2j+rJqwbnsF+ym+fQs73rbJ+KAUgy8= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -295,3 +299,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/grpcctl/events.go b/grpcctl/events.go new file mode 100644 index 000000000..e9c8c2e0f --- /dev/null +++ b/grpcctl/events.go @@ -0,0 +1,26 @@ +package grpcctl + +import "time" + +type EventSubscriber[I any] func(*I) + +type EventSubscriberRaw func(any) + +// 0 = none, -1 = infinite +var validSubscriptionCounts = map[string]int64{ + fetchSubscriptionName(OAuthTokenUpdate{}): 1, +} + +// Inputs + +// EventSubscriber = func(OAuthTokenUpdate) EmptyResponse +type OAuthTokenUpdate struct { + Token string + Live time.Time + Expiry time.Time + Wiggle time.Duration +} + +// Outputs + +type EmptyResponse struct{} diff --git a/grpcctl/internal/grpcctl.go b/grpcctl/internal/grpcctl.go new file mode 100644 index 000000000..0c677709c --- /dev/null +++ b/grpcctl/internal/grpcctl.go @@ -0,0 +1,242 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v5.27.1 +// source: grpcctl.proto + +package internal + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type EmptyReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Usually "OK" + Status *string `protobuf:"bytes,1,req,name=Status" json:"Status,omitempty"` +} + +func (x *EmptyReply) Reset() { + *x = EmptyReply{} + if protoimpl.UnsafeEnabled { + mi := &file_azcopy_grpc_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EmptyReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmptyReply) ProtoMessage() {} + +func (x *EmptyReply) ProtoReflect() protoreflect.Message { + mi := &file_azcopy_grpc_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmptyReply.ProtoReflect.Descriptor instead. +func (*EmptyReply) Descriptor() ([]byte, []int) { + return file_azcopy_grpc_proto_rawDescGZIP(), []int{0} +} + +func (x *EmptyReply) GetStatus() string { + if x != nil && x.Status != nil { + return *x.Status + } + return "" +} + +type Token struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + BearerToken *string `protobuf:"bytes,1,req,name=BearerToken" json:"BearerToken,omitempty"` + UnixExpiry *int64 `protobuf:"varint,2,req,name=UnixExpiry" json:"UnixExpiry,omitempty"` + UnixLive *int64 `protobuf:"varint,3,req,name=UnixLive" json:"UnixLive,omitempty"` + Wiggle *int64 `protobuf:"varint,4,req,name=Wiggle" json:"Wiggle,omitempty"` +} + +func (x *Token) Reset() { + *x = Token{} + if protoimpl.UnsafeEnabled { + mi := &file_azcopy_grpc_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Token) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Token) ProtoMessage() {} + +func (x *Token) ProtoReflect() protoreflect.Message { + mi := &file_azcopy_grpc_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Token.ProtoReflect.Descriptor instead. 
+func (*Token) Descriptor() ([]byte, []int) { + return file_azcopy_grpc_proto_rawDescGZIP(), []int{1} +} + +func (x *Token) GetBearerToken() string { + if x != nil && x.BearerToken != nil { + return *x.BearerToken + } + return "" +} + +func (x *Token) GetUnixExpiry() int64 { + if x != nil && x.UnixExpiry != nil { + return *x.UnixExpiry + } + return 0 +} + +func (x *Token) GetUnixLive() int64 { + if x != nil && x.UnixLive != nil { + return *x.UnixLive + } + return 0 +} + +func (x *Token) GetWiggle() int64 { + if x != nil && x.Wiggle != nil { + return *x.Wiggle + } + return 0 +} + +var File_azcopy_grpc_proto protoreflect.FileDescriptor + +var file_azcopy_grpc_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x61, 0x7a, 0x63, 0x6f, 0x70, 0x79, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x24, 0x0a, 0x0a, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x02, 0x28, + 0x09, 0x52, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x7d, 0x0a, 0x05, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x65, 0x61, 0x72, 0x65, 0x72, 0x54, 0x6f, 0x6b, 0x65, + 0x6e, 0x18, 0x01, 0x20, 0x02, 0x28, 0x09, 0x52, 0x0b, 0x42, 0x65, 0x61, 0x72, 0x65, 0x72, 0x54, + 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x55, 0x6e, 0x69, 0x78, 0x45, 0x78, 0x70, 0x69, + 0x72, 0x79, 0x18, 0x02, 0x20, 0x02, 0x28, 0x03, 0x52, 0x0a, 0x55, 0x6e, 0x69, 0x78, 0x45, 0x78, + 0x70, 0x69, 0x72, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x55, 0x6e, 0x69, 0x78, 0x4c, 0x69, 0x76, 0x65, + 0x18, 0x03, 0x20, 0x02, 0x28, 0x03, 0x52, 0x08, 0x55, 0x6e, 0x69, 0x78, 0x4c, 0x69, 0x76, 0x65, + 0x12, 0x16, 0x0a, 0x06, 0x57, 0x69, 0x67, 0x67, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x02, 0x28, 0x03, + 0x52, 0x06, 0x57, 0x69, 0x67, 0x67, 0x6c, 0x65, 0x32, 0x35, 0x0a, 0x0d, 0x41, 0x7a, 0x43, 0x6f, + 0x70, 0x79, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x24, 0x0a, 0x0b, 0x49, 0x6e, 0x6a, + 0x65, 0x63, 0x74, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x06, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, + 0x1a, 0x0b, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, + 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x41, 0x7a, + 0x75, 0x72, 0x65, 0x2f, 0x61, 0x7a, 0x75, 0x72, 0x65, 0x2d, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x2d, 0x61, 0x7a, 0x63, 0x6f, 0x70, 0x79, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x63, 0x74, 0x6c, + 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, +} + +var ( + file_azcopy_grpc_proto_rawDescOnce sync.Once + file_azcopy_grpc_proto_rawDescData = file_azcopy_grpc_proto_rawDesc +) + +func file_azcopy_grpc_proto_rawDescGZIP() []byte { + file_azcopy_grpc_proto_rawDescOnce.Do(func() { + file_azcopy_grpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_azcopy_grpc_proto_rawDescData) + }) + return file_azcopy_grpc_proto_rawDescData +} + +var file_azcopy_grpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_azcopy_grpc_proto_goTypes = []interface{}{ + (*EmptyReply)(nil), // 0: EmptyReply + (*Token)(nil), // 1: Token +} +var file_azcopy_grpc_proto_depIdxs = []int32{ + 1, // 0: AzCopyControl.InjectToken:input_type -> Token + 0, // 1: AzCopyControl.InjectToken:output_type -> EmptyReply + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { 
file_azcopy_grpc_proto_init() } +func file_azcopy_grpc_proto_init() { + if File_azcopy_grpc_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_azcopy_grpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EmptyReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_azcopy_grpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Token); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_azcopy_grpc_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_azcopy_grpc_proto_goTypes, + DependencyIndexes: file_azcopy_grpc_proto_depIdxs, + MessageInfos: file_azcopy_grpc_proto_msgTypes, + }.Build() + File_azcopy_grpc_proto = out.File + file_azcopy_grpc_proto_rawDesc = nil + file_azcopy_grpc_proto_goTypes = nil + file_azcopy_grpc_proto_depIdxs = nil +} diff --git a/grpcctl/internal/grpcctl.proto b/grpcctl/internal/grpcctl.proto new file mode 100644 index 000000000..e7c565f66 --- /dev/null +++ b/grpcctl/internal/grpcctl.proto @@ -0,0 +1,27 @@ +//protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative grpcctl.proto + +/* +This protocol definition is subject to change, and will definitely break at some point. + +Do not, under any circumstances, rely upon this protocol file. No reliability is guaranteed. Here be dragons. + */ + +option go_package = "github.com/Azure/azure-storage-azcopy/grpcctl/internal"; + +// todo: Support e2e testing magicks and otherwise. + +service AzCopyControl { + rpc InjectToken(Token) returns (EmptyReply) {} +} + +message EmptyReply { + // Usually "OK" + required string Status = 1; +} + +message Token { + required string BearerToken = 1; + required int64 UnixExpiry = 2; + required int64 UnixLive = 3; + required int64 Wiggle = 4; +} \ No newline at end of file diff --git a/grpcctl/internal/grpcctl_grpc.pb.go b/grpcctl/internal/grpcctl_grpc.pb.go new file mode 100644 index 000000000..00982b226 --- /dev/null +++ b/grpcctl/internal/grpcctl_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v5.27.1 +// source: grpcctl.proto + +package internal + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// AzCopyControlClient is the client API for AzCopyControl service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
+type AzCopyControlClient interface { + InjectToken(ctx context.Context, in *Token, opts ...grpc.CallOption) (*EmptyReply, error) +} + +type azCopyControlClient struct { + cc grpc.ClientConnInterface +} + +func NewAzCopyControlClient(cc grpc.ClientConnInterface) AzCopyControlClient { + return &azCopyControlClient{cc} +} + +func (c *azCopyControlClient) InjectToken(ctx context.Context, in *Token, opts ...grpc.CallOption) (*EmptyReply, error) { + out := new(EmptyReply) + err := c.cc.Invoke(ctx, "/AzCopyControl/InjectToken", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AzCopyControlServer is the server API for AzCopyControl service. +// All implementations must embed UnimplementedAzCopyControlServer +// for forward compatibility +type AzCopyControlServer interface { + InjectToken(context.Context, *Token) (*EmptyReply, error) + mustEmbedUnimplementedAzCopyControlServer() +} + +// UnimplementedAzCopyControlServer must be embedded to have forward compatible implementations. +type UnimplementedAzCopyControlServer struct { +} + +func (UnimplementedAzCopyControlServer) InjectToken(context.Context, *Token) (*EmptyReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method InjectToken not implemented") +} +func (UnimplementedAzCopyControlServer) mustEmbedUnimplementedAzCopyControlServer() {} + +// UnsafeAzCopyControlServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AzCopyControlServer will +// result in compilation errors. +type UnsafeAzCopyControlServer interface { + mustEmbedUnimplementedAzCopyControlServer() +} + +func RegisterAzCopyControlServer(s grpc.ServiceRegistrar, srv AzCopyControlServer) { + s.RegisterService(&AzCopyControl_ServiceDesc, srv) +} + +func _AzCopyControl_InjectToken_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Token) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AzCopyControlServer).InjectToken(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/AzCopyControl/InjectToken", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AzCopyControlServer).InjectToken(ctx, req.(*Token)) + } + return interceptor(ctx, in, info, handler) +} + +// AzCopyControl_ServiceDesc is the grpc.ServiceDesc for AzCopyControl service. 
+// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AzCopyControl_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "AzCopyControl", + HandlerType: (*AzCopyControlServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "InjectToken", + Handler: _AzCopyControl_InjectToken_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "grpcctl.proto", +} diff --git a/grpcctl/server.go b/grpcctl/server.go new file mode 100644 index 000000000..a159eed76 --- /dev/null +++ b/grpcctl/server.go @@ -0,0 +1,118 @@ +package grpcctl + +import ( + "context" + "errors" + "fmt" + "github.com/Azure/azure-storage-azcopy/v10/grpcctl/internal" + "github.com/google/uuid" + "google.golang.org/grpc" + "reflect" + "sync" + "time" +) + +var JobLog func(string) + +var GlobalServer = func() *Server { + out := &Server{ + subscriptions: make(map[string]map[uuid.UUID]EventSubscriberRaw), + subLock: &sync.RWMutex{}, + } + + return out +}() + +var GlobalGRPCServer = func() *grpc.Server { + s := grpc.NewServer() + + internal.RegisterAzCopyControlServer(s, GlobalServer) + + return s +}() + +type Server struct { + // map[In][Func] = set + // Uses random IDs instead of functions because we can't hash those + subscriptions map[string]map[uuid.UUID]EventSubscriberRaw + subLock *sync.RWMutex + internal.UnimplementedAzCopyControlServer +} + +func fetchSubscriptionName[I any](_ ...I) string { + var in I + return reflect.TypeOf(in).Name() +} + +func Subscribe[I any](s *Server, event EventSubscriber[I]) uuid.UUID { + maxSubs := validSubscriptionCounts[fetchSubscriptionName[I]()] + if int64(len(s.subscriptions[fetchSubscriptionName[I]()])) >= maxSubs { + return uuid.Nil + } + + if _, ok := s.subscriptions[fetchSubscriptionName[I]()]; !ok { + s.subscriptions[fetchSubscriptionName[I]()] = make(map[uuid.UUID]EventSubscriberRaw) + } + + // Add the event to the table + s.subLock.Lock() + eventId := uuid.New() + s.subscriptions[fetchSubscriptionName[I]()][eventId] = func(a any) { + event(a.(*I)) + } + s.subLock.Unlock() + + return eventId +} + +func Unsubscribe[I any](s *Server, event uuid.UUID) { + delete(s.subscriptions[fetchSubscriptionName[I]()], event) +} + +func fireEvent[I any](s *Server, input *I, name string) { + if JobLog != nil { + JobLog("Received event " + name) + } + + s.subLock.RLock() + for _, fun := range s.subscriptions[fetchSubscriptionName[I]()] { + if JobLog != nil { + JobLog("Firing event " + name) + } + + // Run it async, nothing important is output (yet) + go fun(input) + } + s.subLock.RUnlock() +} + +func (s *Server) InjectToken(ctx context.Context, tok *internal.Token) (*internal.EmptyReply, error) { + ok := "ok" + + ev := &OAuthTokenUpdate{ + Token: tok.GetBearerToken(), + Live: time.Unix(tok.GetUnixLive(), 0), + Expiry: time.Unix(tok.GetUnixExpiry(), 0), + Wiggle: time.Duration(tok.GetWiggle()) * time.Second, + } + + // Warn about obvious cases where the token was completely invalid + if ev.Token == "" { + if JobLog != nil { + JobLog("Received an empty OAuth token via GRPC") + } + return &internal.EmptyReply{}, errors.New("token was empty") + } + + if ev.Live.After(ev.Expiry.Add(-ev.Wiggle)) || time.Now().After(ev.Expiry.Add(-ev.Wiggle)) { + if JobLog != nil { + JobLog("Received a pre-expired OAuth token via GRPC") + } + return &internal.EmptyReply{}, fmt.Errorf("token has expired before it was received (now: %v live: %v exp: %v)", time.Now(), ev.Live, ev.Expiry) + } + + // Fire events + fireEvent(s, ev, "oauthtoken") + + return 
&internal.EmptyReply{Status: &ok}, nil +} diff --git a/release-pipeline.yml b/release-pipeline.yml index 5527936bd..8d80a5327 100644 --- a/release-pipeline.yml +++ b/release-pipeline.yml @@ -110,6 +110,10 @@ stages: GOARCH=amd64 GOOS=linux go build -tags "netgo,se_integration" -o "$(root_dir)/azcopy_linux_se_amd64" displayName: 'Generate Linux AMD64 SE Integration' + - script: | + GOARCH=amd64 GOOS=linux go build -tags "netgo,grpc" -o "$(root_dir)/azcopy_linux_grpc_amd64" + displayName: 'Generate Linux AMD64 GRPC integration' + - script: | sudo ls -lRt $(root_dir)/ cp $(root_dir)/azcopy_linux* $(Build.ArtifactStagingDirectory) @@ -135,9 +139,13 @@ stages: linux_se_amd64_dir="$(work_dir)/azcopy_linux_se_amd64_$(azcopy_version)" echo "##vso[task.setvariable variable=linux_se_amd64_dir]$linux_se_amd64_dir" + + linux_grpc_amd64_dir="$(work_dir)/azcopy_linux_grpc_amd64_$(azcopy_version)" + echo "##vso[task.setvariable variable=linux_grpc_amd64_dir]$linux_grpc_amd64_dir" mkdir -p $linux_amd64_dir mkdir -p $linux_se_amd64_dir + mkdir -p $linux_grpc_amd64_dir mkdir -p $(archives) ls -ltR $(work_dir) displayName: 'Create required directories' @@ -146,9 +154,11 @@ stages: mkdir -p pkgDir/usr/bin/ mv $(root_dir)/azcopy_linux_amd64 $(linux_amd64_dir)/azcopy mv $(root_dir)/azcopy_linux_se_amd64 $(linux_se_amd64_dir)/azcopy + mv $(root_dir)/azcopy_linux_grpc_amd64 $(linux_grpc_amd64_dir)/azcopy cp $(linux_amd64_dir)/azcopy pkgDir/usr/bin/ cp NOTICE.txt $(linux_amd64_dir)/ cp NOTICE.txt $(linux_se_amd64_dir)/ + cp NOTICE.txt $(linux_grpc_amd64_dir)/ cp NOTICE.txt pkgDir/usr/bin/ displayName: 'Copy required files for packaging' @@ -185,13 +195,21 @@ stages: continueOnError: true - task: ArchiveFiles@2 - displayName: 'Archive Amd Linux Build' + displayName: 'Archive AMD64 Linux Build' inputs: rootFolderOrFile: '$(linux_amd64_dir)' archiveType: tar archiveFile: '$(archives)/azcopy_linux_amd64_$(azcopy_version).tar.gz' continueOnError: true + - task: ArchiveFiles@2 + displayName: 'Archive AMD64 GRPC Linux Build' + inputs: + rootFolderOrFile: '$(linux_grpc_amd64_dir)' + archiveType: tar + archiveFile: '$(archives)/azcopy_linux_grpc_amd64_$(azcopy_version).tar.gz' + continueOnError: true + - script: | cp -r $(archives)/* $(Build.ArtifactStagingDirectory) workingDirectory: $(root_dir) @@ -271,6 +289,10 @@ stages: GOARCH=arm64 GOOS=linux go build -tags "netgo,se_integration" -o "$(root_dir)/azcopy_linux_se_arm64" displayName: 'Generate Linux ARM64 SE Integration' + - script: | + GOARCH=arm64 GOOS=linux go build -tags "netgo,grpc" -o "$(root_dir)/azcopy_linux_grpc_arm64" + displayName: 'Generate Linux ARM64 GRPC integration' + - script: | sudo ls -lRt $(root_dir)/ cp -r $(root_dir)/azcopy_linux* $(Build.ArtifactStagingDirectory) @@ -296,9 +318,13 @@ stages: linux_se_arm64_dir="$(work_dir)/azcopy_linux_se_arm64_$(azcopy_version)" echo "##vso[task.setvariable variable=linux_se_arm64_dir]$linux_se_arm64_dir" + + linux_grpc_arm64_dir="$(work_dir)/azcopy_linux_grpc_arm64_$(azcopy_version)" + echo "##vso[task.setvariable variable=linux_grpc_arm64_dir]$linux_grpc_arm64_dir" mkdir -p $linux_arm64_dir mkdir -p $linux_se_arm64_dir + mkdir -p $linux_grpc_arm64_dir mkdir -p $(archives) displayName: 'Create required directories' @@ -307,8 +333,10 @@ stages: cp NOTICE.txt pkgDir/usr/bin/ mv $(root_dir)/azcopy_linux_arm64 $(linux_arm64_dir)/azcopy mv $(root_dir)/azcopy_linux_se_arm64 $(linux_se_arm64_dir)/azcopy + mv $(root_dir)/azcopy_linux_grpc_arm64 $(linux_grpc_arm64_dir)/azcopy cp $(linux_arm64_dir)/azcopy pkgDir/usr/bin/ 
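
Stepping back from the packaging steps for a moment: the AzCopyControl service defined earlier exposes a single InjectToken RPC, and the h2c/grpc-web downgrade handler makes it reachable over plaintext HTTP/2 on the address passed to --grpc-server-addr. A hedged sketch of a controlling process driving it follows; the address, token values, and insecure transport are illustrative assumptions, and an out-of-tree caller would need to generate its own stubs from grpcctl.proto, since grpcctl/internal is not importable from other modules.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/azure-storage-azcopy/v10/grpcctl/internal"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumes azcopy (a grpc-tagged build) was started with --grpc-server-addr=127.0.0.1:9879.
	conn, err := grpc.Dial("127.0.0.1:9879", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := internal.NewAzCopyControlClient(conn)

	// Hypothetical freshly acquired bearer token; Wiggle is seconds of early expiry.
	token := "<bearer-token>"
	live := time.Now().Unix()
	expiry := time.Now().Add(time.Hour).Unix()
	wiggle := int64(300)

	if _, err := client.InjectToken(context.Background(), &internal.Token{
		BearerToken: &token,
		UnixExpiry:  &expiry,
		UnixLive:    &live,
		Wiggle:      &wiggle,
	}); err != nil {
		log.Fatal(err)
	}
}
```

On the AzCopy side, the decoded token is fanned out through the typed Subscribe/fireEvent registry (keyed by reflect type name and capped at a single OAuthTokenUpdate subscriber), which is what SetupOAuthSubscription hooks into to refresh globalRPCOAuthTokenState.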
cp NOTICE.txt $(linux_arm64_dir)/ + cp NOTICE.txt $(linux_grpc_arm64_dir)/ cp NOTICE.txt $(linux_se_arm64_dir)/ displayName: 'Copy required files for packaging' @@ -351,7 +379,15 @@ stages: archiveType: tar archiveFile: '$(archives)/azcopy_linux_se_arm64_$(azcopy_version).tar.gz' continueOnError: true - + + - task: ArchiveFiles@2 + displayName: 'Archive GRPC Linux Build' + inputs: + rootFolderOrFile: '$(linux_grpc_arm64_dir)' + archiveType: tar + archiveFile: '$(archives)/azcopy_linux_grpc_arm64_$(azcopy_version).tar.gz' + continueOnError: true + - script: | cp -r $(archives)/* $(Build.ArtifactStagingDirectory) workingDirectory: $(root_dir) @@ -404,6 +440,14 @@ stages: GOOS: windows CGO_ENABLED: 0 + - script: | + go build -tags "netgo,grpc" -o "$(Build.ArtifactStagingDirectory)\azcopy_windows_grpc_amd64.exe" + displayName: 'Generate Windows amd64 GRPC integration' + env: + GOARCH: amd64 + GOOS: windows + CGO_ENABLED: 0 + - script: | go build -o "$(Build.ArtifactStagingDirectory)\azcopy_windows_386.exe" displayName: 'Generate Windows i386' @@ -412,6 +456,14 @@ stages: GOOS: windows CGO_ENABLED: 0 + - script: | + go build -tags "netgo,grpc" -o "$(Build.ArtifactStagingDirectory)\azcopy_windows_grpc_386.exe" + displayName: 'Generate Windows i386 GRPC integration' + env: + GOARCH: 386 + GOOS: windows + CGO_ENABLED: 0 + - script: | go build -o "$(Build.ArtifactStagingDirectory)\azcopy_windows_v7_arm.exe" displayName: 'Generate Windows ARM' @@ -474,11 +526,19 @@ stages: - script: | CGO_ENABLED=1 go build -o "$(root_dir)/azcopy_darwin_amd64" - displayName: 'Generate MacOS Build with AMD64' + displayName: 'Build MacOS AMD64' + + - script: | + CGO_ENABLED=1 go build -tags "grpc" -o "$(root_dir)/azcopy_darwin_grpc_amd64" + displayName: 'Build MacOS ARM64 GRPC integration' - script: | GOARCH=arm64 CGO_ENABLED=1 go build -o "$(root_dir)/azcopy_darwin_arm64" - displayName: 'Test Cross-compiled MacOS Build with ARM64' + displayName: 'Build Cross-compiled MacOS ARM64' + + - script: | + GOARCH=arm64 CGO_ENABLED=1 go build -tags "grpc" -o "$(root_dir)/azcopy_darwin_grpc_arm64" + displayName: 'Build MacOS ARM64 GRPC integration' - task: Bash@3 displayName: 'Extract AZCopy version' @@ -491,20 +551,32 @@ stages: - script: | darwin_amd_dir="$(work_dir)/azcopy_darwin_amd64_$(azcopy_version)" echo "##vso[task.setvariable variable=darwin_amd_dir]$darwin_amd_dir" + + darwin_grpc_amd_dir="$(work_dir)/azcopy_darwin_grpc_amd64_$(azcopy_version)" + echo "##vso[task.setvariable variable=darwin_grpc_amd_dir]$darwin_grpc_amd_dir" darwin_arm_dir="$(work_dir)/azcopy_darwin_arm64_$(azcopy_version)" echo "##vso[task.setvariable variable=darwin_arm_dir]$darwin_arm_dir" + darwin_grpc_arm_dir="$(work_dir)/azcopy_darwin_grpc_arm64_$(azcopy_version)" + echo "##vso[task.setvariable variable=darwin_grpc_arm_dir]$darwin_grpc_arm_dir" + mkdir -p $darwin_amd_dir + mkdir -p $darwin_grpc_amd_dir mkdir -p $darwin_arm_dir + mkdir -p $darwin_grpc_arm_dir mkdir -p $(archives) displayName: 'Create required directories' - script: | cp NOTICE.txt $(darwin_amd_dir) + cp NOTICE.txt $(darwin_grpc_amd_dir) cp NOTICE.txt $(darwin_arm_dir) + cp NOTICE.txt $(darwin_grpc_arm_dir) mv $(root_dir)/azcopy_darwin_amd64 $(darwin_amd_dir)/azcopy + mv $(root_dir)/azcopy_darwin_grpc_amd64 $(darwin_grpc_amd_dir)/azcopy mv $(root_dir)/azcopy_darwin_arm64 $(darwin_arm_dir)/azcopy + mv $(root_dir)/azcopy_darwin_grpc_arm64 $(darwin_grpc_arm_dir)/azcopy displayName: 'Copy required files for packaging' - task: ArchiveFiles@2 @@ -514,6 +586,13 @@ stages: archiveFile: 
'$(archives)/azcopy_darwin_amd64_$(azcopy_version).zip' continueOnError: true + - task: ArchiveFiles@2 + displayName: 'Archive MacOS GRPC AMD64 Build (must happen before signing/notarization)' + inputs: + rootFolderOrFile: '$(darwin_grpc_amd_dir)' + archiveFile: '$(archives)/azcopy_darwin_grpc_amd64_$(azcopy_version).zip' + continueOnError: true + - task: ArchiveFiles@2 displayName: 'Archive MacOS ARM64 Build (must happen before signing/notarization)' inputs: @@ -521,6 +600,13 @@ stages: archiveFile: '$(archives)/azcopy_darwin_arm64_$(azcopy_version).zip' continueOnError: true + - task: ArchiveFiles@2 + displayName: 'Archive MacOS GRPC ARM64 Build (must happen before signing/notarization)' + inputs: + rootFolderOrFile: '$(darwin_grpc_arm_dir)' + archiveFile: '$(archives)/azcopy_darwin_grpc_arm64_$(azcopy_version).zip' + continueOnError: true + - script: | cp $(archives)/azcopy_darwin* $(Build.ArtifactStagingDirectory) displayName: 'Copy zip to staging directory' @@ -815,28 +901,37 @@ stages: windows_amd64="$(work_dir)/azcopy_windows_amd64_$(azcopy_version)" echo "##vso[task.setvariable variable=windows_amd64]$windows_amd64" + windows_grpc_amd64="$(work_dir)/azcopy_windows_grpc_amd64_$(azcopy_version)" + echo "##vso[task.setvariable variable=windows_grpc_amd64]$windows_grpc_amd64" + windows_arm64="$(work_dir)/azcopy_windows_arm64_$(azcopy_version)" echo "##vso[task.setvariable variable=windows_arm64]$windows_arm64" windows_386="$(work_dir)/azcopy_windows_386_$(azcopy_version)" echo "##vso[task.setvariable variable=windows_386]$windows_386" + + windows_grpc_386="$(work_dir)/azcopy_windows_grpc_386_$(azcopy_version)" + echo "##vso[task.setvariable variable=windows_grpc_386]$windows_grpc_386" mkdir -p $windows_amd64 + mkdir -p $windows_grpc_amd64 mkdir -p $windows_arm64 mkdir -p $windows_386 + mkdir -p $windows_grpc_386 mkdir -p $(archives) displayName: 'Create required directories' - script: | mv $(Build.ArtifactStagingDirectory)/azCopy-windows-temp/azcopy_windows_amd64.exe $(windows_amd64)/azcopy.exe + mv $(Build.ArtifactStagingDirectory)/azCopy-windows-temp/azcopy_windows_grpc_amd64.exe $(windows_grpc_amd64)/azcopy.exe mv $(Build.ArtifactStagingDirectory)/azCopy-windows-temp/azcopy_windows_arm64.exe $(windows_arm64)/azcopy.exe mv $(Build.ArtifactStagingDirectory)/azCopy-windows-temp/azcopy_windows_386.exe $(windows_386)/azcopy.exe + mv $(Build.ArtifactStagingDirectory)/azCopy-windows-temp/azcopy_windows_grpc_386.exe $(windows_grpc_386)/azcopy.exe cp NOTICE.txt $(windows_amd64) + cp NOTICE.txt $(windows_grpc_amd64) cp NOTICE.txt $(windows_arm64) cp NOTICE.txt $(windows_386) - zip -r $(archives)/azcopy_windows_amd64_$(azcopy_version).zip . - zip -r $(archives)/azcopy_windows_arm64_$(azcopy_version).zip . - zip -r $(archives)/azcopy_windows_386_$(azcopy_version).zip . 
+ cp NOTICE.txt $(windows_grpc_386) displayName: 'Copy required files' - task: ArchiveFiles@2 @@ -846,6 +941,13 @@ stages: archiveFile: '$(archives)/azcopy_windows_amd64_$(azcopy_version).zip' continueOnError: true + - task: ArchiveFiles@2 + displayName: 'Archive Windows 64 bit GRPC AMD Build' + inputs: + rootFolderOrFile: '$(windows_grpc_amd64)' + archiveFile: '$(archives)/azcopy_windows_grpc_amd64_$(azcopy_version).zip' + continueOnError: true + - task: ArchiveFiles@2 displayName: 'Archive Windows 64 bit ARM Build' inputs: @@ -860,6 +962,13 @@ stages: archiveFile: '$(archives)/azcopy_windows_386_$(azcopy_version).zip' continueOnError: true + - task: ArchiveFiles@2 + displayName: 'Archive Windows 32 bit GRPC Build' + inputs: + rootFolderOrFile: '$(windows_grpc_386)' + archiveFile: '$(archives)/azcopy_windows_grpc_386_$(azcopy_version).zip' + continueOnError: true + - script: | cp $(archives)/azcopy_windows* $(Build.ArtifactStagingDirectory) displayName: 'Copy zip to staging directory' @@ -949,18 +1058,28 @@ stages: - script: | mkdir -p unzipped_amd64 + mkdir -p unzipped_grpc_amd64 mkdir -p unzipped_arm64 + mkdir -p unzipped_grpc_arm64 mkdir -p unzipped_m1_arm64 - unzip -o azcopy*amd64*.zip -d unzipped_amd64 + unzip -o azcopy_darwin_amd64*.zip -d unzipped_amd64 + unzip -o azcopy_darwin_grpc_amd64*.zip -d unzipped_grpc_amd64 unzip -o azcopy_darwin_arm64*.zip -d unzipped_arm64 + unzip -o azcopy_darwin_grpc_arm64*.zip -d unzipped_grpc_arm64 unzip -o azcopy_darwin_m1_arm64*.zip -d unzipped_m1_arm64 # Use find to retrieve the folder name amd64FolderName=$(find unzipped_amd64 -maxdepth 1 -type d -name "azcopy_darwin_amd64*") echo "AMD64 Folder name: $amd64FolderName" + + grpcAMD64FolderName=$(find unzipped_grpc_amd64 -maxdepth 1 -type d -name "azcopy_darwin_grpc_amd64*") + echo "AMD64 GRPC Folder name: $grpcAMD64FolderName" arm64FolderName=$(find unzipped_arm64 -maxdepth 1 -type d -name "azcopy_darwin_arm64*") echo "ARM64 Folder name: $arm64FolderName" + + grpcARM64FolderName=$(find unzipped_grpc_arm64 -maxdepth 1 -type d -name "azcopy_darwin_grpc_arm64*") + echo "ARM64 GRPC Folder name: $grpcARM64FolderName" m1Arm64FolderName=$(find unzipped_m1_arm64 -maxdepth 1 -type d -name "azcopy_darwin_m1_arm64*") echo "ARM64 M1 Folder name: $m1Arm64FolderName" @@ -974,6 +1093,18 @@ stages: echo "Error: Expected exactly one matching AMD64 directory, found $(echo "$amd64FolderName" | wc -l)." exit 1 fi + + cd $(Build.ArtifactStagingDirectory) + + # Check if there is exactly one matching directory for amd64 + if [ $(echo "$grpcAMD64FolderName" | wc -l) -eq 1 ]; then + echo "Found matching grpc AMD64 directory: $grpcAMD64FolderName" + cd "$grpcAMD64FolderName" + mv azcopy $(Build.ArtifactStagingDirectory)/azcopy_darwin_grpc_amd64 + else + echo "Error: Expected exactly one matching GRPC AMD64 directory, found $(echo "$grpcAMD64FolderName" | wc -l)." + exit 1 + fi cd $(Build.ArtifactStagingDirectory) @@ -986,6 +1117,18 @@ stages: echo "Error: Expected exactly one matching ARM64 directory, found $(echo "$arm64FolderName" | wc -l)." exit 1 fi + + cd $(Build.ArtifactStagingDirectory) + + # Check if there is exactly one matching directory for arm64 + if [ $(echo "$grpcARM64FolderName" | wc -l) -eq 1 ]; then + echo "Found matching ARM64 directory: $arm64FolderName" + cd "$grpcARM64FolderName" + mv azcopy $(Build.ArtifactStagingDirectory)/azcopy_darwin_grpc_arm64 + else + echo "Error: Expected exactly one matching ARM64 directory, found $(echo "$grpcARM64FolderName" | wc -l)." 
+ exit 1 + fi cd $(Build.ArtifactStagingDirectory) @@ -1047,6 +1190,7 @@ stages: ls -ltR $(System.DefaultWorkingDirectory)/azCopy-binaries/ chmod 755 $(System.DefaultWorkingDirectory)/azCopy-binaries/* $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_amd64 --version + $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_grpc_amd64 --version $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_se_amd64 --version rm -rf /usr/local/bin/azcopy rm -rf /usr/bin/azcopy @@ -1159,6 +1303,38 @@ stages: ./azcopy --help displayName: 'Check Version and Help' workingDirectory: $(System.DefaultWorkingDirectory)/azCopy-linux-signed + + - script: | + TAR_GZ_FILE=$(ls azcopy_linux_grpc_amd64*.tar.gz) + EXTRACT_DIR="$(Build.ArtifactStagingDirectory)/extracted_grpc" + + # Create extraction directory + mkdir -p "$EXTRACT_DIR" + + # Extract the .tar.gz file + tar -xzvf "$TAR_GZ_FILE" -C "$EXTRACT_DIR" + if [ $? -ne 0 ]; then + echo "Error: Failed to extract $TAR_GZ_FILE" + exit 1 + fi + + # Find the directory matching the pattern + matching_dirs=("$EXTRACT_DIR"/azcopy_linux_grpc_amd64*) + + # Check if there is exactly one matching directory + if [ ${#matching_dirs[@]} -eq 1 ]; then + echo "Found matching directory: ${matching_dirs[0]}" + cd "${matching_dirs[0]}" + else + echo "Error: Expected exactly one matching directory, found ${#matching_dirs[@]}." + exit 1 + fi + + # Run azcopy commands + ./azcopy --version + ./azcopy --help + displayName: 'Check Version and Help' + workingDirectory: $(System.DefaultWorkingDirectory)/azCopy-linux-signed - job: Set_3 timeoutInMinutes: 120 @@ -1255,6 +1431,39 @@ stages: displayName: 'Check Version and Help for SE arm64 tar.gz' workingDirectory: $(System.DefaultWorkingDirectory)/azCopy-linux-signed + - script: | + TAR_GZ_FILE=$(ls azcopy_linux_grpc_arm64*.tar.gz) + EXTRACT_DIR="$(Build.ArtifactStagingDirectory)/extracted_grpc" + + # Create extraction directory + mkdir -p "$EXTRACT_DIR" + + # Extract the .tar.gz file + tar -xzvf "$TAR_GZ_FILE" -C "$EXTRACT_DIR" + if [ $? -ne 0 ]; then + echo "Error: Failed to extract $TAR_GZ_FILE" + exit 1 + fi + + # Find the directory matching the pattern + matching_dirs=("$EXTRACT_DIR"/azcopy_linux_grpc_arm64*) + + # Check if there is exactly one matching directory + if [ ${#matching_dirs[@]} -eq 1 ]; then + echo "Found matching directory: ${matching_dirs[0]}" + cd "${matching_dirs[0]}" + else + echo "Error: Expected exactly one matching directory, found ${#matching_dirs[@]}." 
+ exit 1 + fi + + # Run azcopy commands + ./azcopy --version + ./azcopy --help + + displayName: 'Check Version and Help for SE arm64 tar.gz' + workingDirectory: $(System.DefaultWorkingDirectory)/azCopy-linux-signed + - job: Set_4 timeoutInMinutes: 120 strategy: @@ -1289,6 +1498,7 @@ stages: ls -ltR $(System.DefaultWorkingDirectory)/azCopy-binaries chmod 755 $(System.DefaultWorkingDirectory)/azCopy-binaries/* $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_arm64 --version + $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_grpc_arm64 --version $(System.DefaultWorkingDirectory)/azCopy-binaries/azcopy_linux_se_arm64 --version displayName: 'Test signed linux binaries' @@ -1496,7 +1706,7 @@ stages: fi # Define variables - ZIP_FILE_PATH=$(ls $(Build.ArtifactStagingDirectory)/azCopy-mac-signed/azcopy*amd64*.zip) + ZIP_FILE_PATH=$(ls $(Build.ArtifactStagingDirectory)/azCopy-mac-signed/azcopy_darwin_amd64*.zip) EXTRACT_DIR="$(Build.ArtifactStagingDirectory)/extracted" # Create extraction directory @@ -1520,6 +1730,41 @@ stages: ./azcopy --version ./azcopy --help displayName: 'Install Zip, Extract Files, and Run Version and Help Command' + + - script: | + # Install Homebrew if not already installed + if ! command -v brew &> /dev/null + then + /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" + echo 'eval "$(/opt/homebrew/bin/brew shellenv)"' >> /Users/$(whoami)/.zprofile + eval "$(/opt/homebrew/bin/brew shellenv)" + fi + + # Define variables + ZIP_FILE_PATH=$(ls $(Build.ArtifactStagingDirectory)/azCopy-mac-signed/azcopy_darwin_grpc_amd64*.zip) + EXTRACT_DIR="$(Build.ArtifactStagingDirectory)/extracted" + + # Create extraction directory + mkdir -p $EXTRACT_DIR + + # Extract the zip file + unzip $ZIP_FILE_PATH -d $EXTRACT_DIR + + # Find the directory matching the pattern + matching_dirs=($EXTRACT_DIR/azcopy_darwin_grpc_amd64*) + + # Check if there is exactly one matching directory + if [ ${#matching_dirs[@]} -eq 1 ]; then + cd "${matching_dirs[0]}" + else + echo "Error: Expected exactly one matching directory, found ${#matching_dirs[@]}." 
+ exit 1 + fi + + # Run the azcopy version command + ./azcopy --version + ./azcopy --help + displayName: 'Install Zip, Extract Files, and Run Version and Help Command' - job: Set_9 timeoutInMinutes: 120 @@ -1563,10 +1808,10 @@ stages: $extractDir = "$(Build.ArtifactStagingDirectory)\extracted" # Find the zip file matching the pattern - $zipFile = Get-ChildItem -Path $artifactDir -Filter "azcopy*amd64*.zip" | Select-Object -First 1 + $zipFile = Get-ChildItem -Path $artifactDir -Filter "azcopy_windows_amd64*.zip" | Select-Object -First 1 if ($null -eq $zipFile) { - Write-Error "No zip file found matching pattern 'azcopy*amd64*.zip' in directory: $artifactDir" + Write-Error "No zip file found matching pattern 'azcopy_windows_amd64*.zip' in directory: $artifactDir" exit 1 } @@ -1599,6 +1844,78 @@ stages: exit 1 } + # Check if the azcopy executable exists in the matching directory + $azcopyPath = Join-Path -Path $matchingDirs.FullName -ChildPath "azcopy.exe" + if (-Not (Test-Path -Path $azcopyPath)) { + Write-Error "azcopy not found in extraction directory: $($matchingDirs.FullName)" + exit 1 + } else { + Write-Output "azcopy found in extraction directory: $($matchingDirs.FullName)" + # Ensure the azcopy executable has the correct permissions + icacls $azcopyPath + + # Run azcopy --version and capture output + try { + $versionOutput = & $azcopyPath --version 2>&1 + Write-Output "azcopy version output: $versionOutput" + } catch { + Write-Error "Failed to run azcopy --version: $_" + } + + # Run azcopy --help and capture output + try { + $helpOutput = & $azcopyPath --help 2>&1 + Write-Output "azcopy help output: $helpOutput" + } catch { + Write-Error "Failed to run azcopy --help: $_" + } + } + displayName: 'Extract Files and Run Version and Help Command on Windows' + + - task: PowerShell@2 + inputs: + targetType: 'inline' + script: | + $artifactDir = "$(Build.ArtifactStagingDirectory)\azCopy-windows-signed" + $extractDir = "$(Build.ArtifactStagingDirectory)\extracted" + + # Find the zip file matching the pattern + $zipFile = Get-ChildItem -Path $artifactDir -Filter "azcopy_windows_grpc_amd64*.zip" | Select-Object -First 1 + + if ($null -eq $zipFile) { + Write-Error "No zip file found matching pattern 'azcopy_windows_grpc_amd64*.zip' in directory: $artifactDir" + exit 1 + } + + $zipFilePath = $zipFile.FullName + + # Create extraction directory + New-Item -ItemType Directory -Path $extractDir -Force | Out-Null + + # Extract the zip file + try { + Add-Type -AssemblyName System.IO.Compression.FileSystem + [System.IO.Compression.ZipFile]::ExtractToDirectory($zipFilePath, $extractDir) + } catch { + Write-Error "Failed to extract zip file: $_" + exit 1 + } + + # Change to the directory containing azcopy + Set-Location -Path $extractDir + + # Find directories matching the pattern + $matchingDirs = Get-ChildItem -Directory -Path .\azcopy_windows_grpc_amd64* | Select-Object -First 1 + + # Check if there is exactly one matching directory + if ($matchingDirs.Count -eq 0) { + Write-Error "No directories found matching the pattern: .\azcopy_windows_grpc_amd64*" + exit 1 + } elseif ($matchingDirs.Count -gt 1) { + Write-Error "Multiple directories found matching the pattern: .\azcopy_windows_grpc_amd64*" + exit 1 + } + # Check if the azcopy executable exists in the matching directory $azcopyPath = Join-Path -Path $matchingDirs.FullName -ChildPath "azcopy.exe" if (-Not (Test-Path -Path $azcopyPath)) { @@ -1687,7 +2004,7 @@ stages: sudo dpkg --info azcopy-*x86_64.deb sudo dpkg -i azcopy-*x86_64.deb sudo apt-get 
install build-essential -y - displayName: 'Create drop.zip file and install azcopy deb package' + displayName: 'Install azcopy & zip deb packages' - script: | cp $(work_dir)/NOTICE.txt $(Build.ArtifactStagingDirectory)/azCopy-binaries diff --git a/ste/metadata-hack.go b/ste/metadata-hack.go new file mode 100644 index 000000000..2cd290707 --- /dev/null +++ b/ste/metadata-hack.go @@ -0,0 +1,24 @@ +package ste + +import ( + "github.com/Azure/azure-storage-azcopy/v10/common" + "regexp" +) + +var ormetadataregex = regexp.MustCompile("((([A-F]|[a-f]|[0-9])+-?)+_?){2}") + +// hacky hack, this does hacky things. +func FixBustedMetadata(m common.Metadata) common.Metadata { + // copy all metadata + out := common.Metadata{} + + for k, v := range m { + if ormetadataregex.MatchString(k) { + continue // ignore or metadata + } + + out[k] = v + } + + return out +} diff --git a/ste/sender-appendBlob.go b/ste/sender-appendBlob.go index e0ccfc933..77df6c70d 100644 --- a/ste/sender-appendBlob.go +++ b/ste/sender-appendBlob.go @@ -91,7 +91,7 @@ func newAppendBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer numChunks: numChunks, pacer: pacer, headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), - metadataToApply: props.SrcMetadata, + metadataToApply: FixBustedMetadata(props.SrcMetadata), blobTagsToApply: props.SrcBlobTags, sip: srcInfoProvider, soleChunkFuncSemaphore: semaphore.NewWeighted(1)}, nil diff --git a/ste/sender-azureFile.go b/ste/sender-azureFile.go index 2c2322d3c..c172a3adc 100644 --- a/ste/sender-azureFile.go +++ b/ste/sender-azureFile.go @@ -144,7 +144,7 @@ func newAzureFileSenderBase(jptm IJobPartTransferMgr, destination string, pacer smbPropertiesToApply: file.SMBProperties{}, permissionsToApply: file.Permissions{}, sip: sip, - metadataToApply: props.SrcMetadata, + metadataToApply: FixBustedMetadata(props.SrcMetadata), }, nil } @@ -484,7 +484,7 @@ func (d AzureFileParentDirCreator) CreateDirToRoot(ctx context.Context, shareCli if len(segments) == 0 { // If we are trying to create root, perform GetProperties instead. // Azure Files has delayed creation of root, and if we do not perform GetProperties, - // some operations like SetMetadata or SetProperties will fail. + // some operations like SetMetadata or SetProperties will fail. // TODO: Remove this block once the bug is fixed. _, err := directoryClient.GetProperties(ctx, nil) return err diff --git a/ste/sender-blobFS.go b/ste/sender-blobFS.go index 0fa9412e1..db630df4a 100644 --- a/ste/sender-blobFS.go +++ b/ste/sender-blobFS.go @@ -108,7 +108,7 @@ func newBlobFSSenderBase(jptm IJobPartTransferMgr, destination string, pacer pac pacer: pacer, creationTimeHeaders: &headers, flushThreshold: chunkSize * int64(ADLSFlushThreshold), - metadataToSet: props.SrcMetadata, + metadataToSet: FixBustedMetadata(props.SrcMetadata), }, nil } diff --git a/ste/sender-blobFolders.go b/ste/sender-blobFolders.go index 73ddc711b..29ffc2168 100644 --- a/ste/sender-blobFolders.go +++ b/ste/sender-blobFolders.go @@ -41,7 +41,7 @@ func newBlobFolderSender(jptm IJobPartTransferMgr, destination string, sip ISour jptm: jptm, sip: sip, destinationClient: destinationClient, - metadataToApply: props.SrcMetadata.Clone(), // We're going to modify it, so we should clone it. 
+ metadataToApply: FixBustedMetadata(props.SrcMetadata), // FixBustedMetadata inherently clones headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), blobTagsToApply: props.SrcBlobTags, } diff --git a/ste/sender-blobSymlinks.go b/ste/sender-blobSymlinks.go index 403e887ef..e388eace1 100644 --- a/ste/sender-blobSymlinks.go +++ b/ste/sender-blobSymlinks.go @@ -46,7 +46,7 @@ func newBlobSymlinkSender(jptm IJobPartTransferMgr, destination string, sip ISou jptm: jptm, sip: sip, destinationClient: destinationClient, - metadataToApply: props.SrcMetadata.Clone(), // We're going to modify it, so we should clone it. + metadataToApply: FixBustedMetadata(props.SrcMetadata), // We're going to modify it, so we should clone it. headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), blobTagsToApply: props.SrcBlobTags, destBlobTier: destBlobTier, diff --git a/ste/sender-blockBlob.go b/ste/sender-blockBlob.go index 720fcbfd3..a5ec3a032 100644 --- a/ste/sender-blockBlob.go +++ b/ste/sender-blockBlob.go @@ -192,7 +192,7 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, pacer pacer, srcInfoProvid pacer: pacer, blockIDs: make([]string, numChunks), headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), - metadataToApply: props.SrcMetadata, + metadataToApply: FixBustedMetadata(props.SrcMetadata), blobTagsToApply: props.SrcBlobTags, destBlobTier: destBlobTier, muBlockIDs: &sync.Mutex{}, diff --git a/ste/sender-pageBlob.go b/ste/sender-pageBlob.go index 91929fa38..12e3dbf7d 100644 --- a/ste/sender-pageBlob.go +++ b/ste/sender-pageBlob.go @@ -130,7 +130,7 @@ func newPageBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer p numChunks: numChunks, pacer: pacer, headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), - metadataToApply: props.SrcMetadata, + metadataToApply: FixBustedMetadata(props.SrcMetadata), blobTagsToApply: props.SrcBlobTags, destBlobTier: destBlobTier, filePacer: NewNullAutoPacer(), // defer creation of real one to Prologue
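
Finally, on FixBustedMetadata, which every sender above now applies to SrcMetadata: the filter drops any metadata key matching the ormetadataregex pattern, which targets object-replication policy/rule identifiers (GUID-like runs of hex digits and dashes). A small standalone sketch of the filter's behavior, using made-up sample keys:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as ste/metadata-hack.go.
var ormetadataregex = regexp.MustCompile("((([A-F]|[a-f]|[0-9])+-?)+_?){2}")

func main() {
	keys := []string{
		"author", // plain user metadata: kept
		"or-2a893a4c-c2e2-4e19-9e4e-ef4d7d9cbd2f_3fdb4cd5-91a7-4de3-8f34-9a3f1f7de0a1", // hypothetical OR policy_rule key: dropped
		"created_date", // also dropped: it contains adjacent hex characters and MatchString matches substrings
	}
	for _, k := range keys {
		fmt.Printf("dropped=%-5v %s\n", ormetadataregex.MatchString(k), k)
	}
}
```

Because the match is a substring test and two adjacent hex characters already satisfy the pattern, the filter is broader than just OR identifiers; the file's own comment flags it as a hack, so the sketch above describes current behavior rather than intent.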