diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml index f6c94a6d6bc..804a73f0e35 100644 --- a/.github/workflows/goreleaser.yml +++ b/.github/workflows/goreleaser.yml @@ -14,7 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.17.3 + go-version: 1.17.6 - name: Run GoReleaser uses: goreleaser/goreleaser-action@v2 with: diff --git a/.goreleaser.yml b/.goreleaser.yml index 193e48347a0..97e4f4eafac 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -2,8 +2,36 @@ before: hooks: - go mod download - ./develop/scripts/create_build_info_data.sh + +archives: + - id: default + builds: + - temporal-server + - tctl + - temporal-cassandra-tool + - temporal-sql-tool + name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}" + format_overrides: + - goos: windows + format: zip + files: + - ./config/* + + - id: no-cgo + builds: + - temporal-server-no-cgo + - tctl-no-cgo + - temporal-cassandra-tool-no-cgo + - temporal-sql-tool-no-cgo + name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}_no_cgo" + format_overrides: + - goos: windows + format: zip + files: + - ./config/* + builds: - - id: "temporal-server" + - id: temporal-server dir: cmd/server binary: temporal-server goos: @@ -13,7 +41,7 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-server-no-cgo" + - id: temporal-server-no-cgo dir: cmd/server binary: temporal-server env: @@ -23,7 +51,7 @@ builds: goarch: - amd64 - arm64 - - id: "tctl" + - id: tctl dir: cmd/tools/cli binary: tctl goos: @@ -33,7 +61,17 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-cassandra-tool" + - id: tctl-no-cgo + dir: cmd/tools/cli + binary: tctl + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + - id: temporal-cassandra-tool dir: cmd/tools/cassandra binary: temporal-cassandra-tool goos: @@ -43,7 +81,17 @@ builds: goarch: - amd64 - arm64 - - id: "temporal-sql-tool" + - id: temporal-cassandra-tool-no-cgo + dir: cmd/tools/cassandra + binary: temporal-cassandra-tool + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + - id: temporal-sql-tool dir: cmd/tools/sql binary: temporal-sql-tool goos: @@ -53,14 +101,23 @@ builds: goarch: - amd64 - arm64 + - id: temporal-sql-tool-no-cgo + dir: cmd/tools/sql + binary: temporal-sql-tool + env: + - CGO_ENABLED=0 + goos: + - linux + goarch: + - amd64 + - arm64 + checksum: name_template: 'checksums.txt' algorithm: sha256 -snapshot: - name_template: "{{ .Tag }}-next" + changelog: - sort: asc - filters: - exclude: - - '^docs:' - - '^test:' + skip: true + +announce: + skip: "true" diff --git a/client/history/metricClient.go b/client/history/metricClient.go index 1d3faa931ca..548484e21ff 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -27,6 +27,7 @@ package history import ( "context" + "go.temporal.io/api/serviceerror" "google.golang.org/grpc" "go.temporal.io/server/api/historyservice/v1" @@ -617,7 +618,15 @@ func (c *metricClient) finishMetricsRecording( err error, ) { if err != nil { - c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err)) + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interest and too many logs + default: + c.throttledLogger.Error("history client encountered error", tag.Error(err), tag.ErrorType(err)) + } 
scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) } stopwatch.Stop() diff --git a/client/matching/metricClient.go b/client/matching/metricClient.go index 8c114b1bb81..16c3d97fb1b 100644 --- a/client/matching/metricClient.go +++ b/client/matching/metricClient.go @@ -28,6 +28,7 @@ import ( "context" "strings" + "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "google.golang.org/grpc" @@ -256,7 +257,16 @@ func (c *metricClient) finishMetricsRecording( err error, ) { if err != nil { - c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err)) + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interesting and would generate too many logs + default: + + c.throttledLogger.Error("matching client encountered error", tag.Error(err), tag.ErrorType(err)) + } scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) } stopwatch.Stop() diff --git a/common/config/config.go b/common/config/config.go index 96634394d5c..7780e803b4e 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -116,6 +116,8 @@ type ( Frontend GroupTLS `yaml:"frontend"` // SystemWorker controls TLS setting for System Workers connecting to Frontend. SystemWorker WorkerTLS `yaml:"systemWorker"` + // RemoteClusters controls TLS settings for connections to remote clusters. + RemoteClusters map[string]GroupTLS `yaml:"remoteClusters"` // ExpirationChecks defines settings for periodic checks for expiration of certificates ExpirationChecks CertExpirationValidation `yaml:"expirationChecks"` // Interval between refreshes of certificates loaded from files @@ -484,9 +486,12 @@ func (c *Config) String() string { return maskedYaml } -func (r *GroupTLS) IsEnabled() bool { - return r.Server.KeyFile != "" || r.Server.KeyData != "" || - len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 || +func (r *GroupTLS) IsServerEnabled() bool { + return r.Server.KeyFile != "" || r.Server.KeyData != "" +} + +func (r *GroupTLS) IsClientEnabled() bool { + return len(r.Client.RootCAFiles) > 0 || len(r.Client.RootCAData) > 0 || r.Client.ForceTLS } diff --git a/common/constants.go b/common/constants.go index a4af323538e..48c446542ab 100644 --- a/common/constants.go +++ b/common/constants.go @@ -111,3 +111,11 @@ const ( // DefaultTransactionSizeLimit is the largest allowed transaction size to persistence DefaultTransactionSizeLimit = 4 * 1024 * 1024 ) + +const ( + // TimeoutFailureTypePrefix is the prefix for timeout failure types + // used in retry policy + // the actual failure type will be prefix + enums.TimeoutType.String() + // e.g. "TemporalTimeout:StartToClose" or "TemporalTimeout:Heartbeat" + TimeoutFailureTypePrefix = "TemporalTimeout:" +) diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index eb4a116ad99..b679c2389a5 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -41,8 +41,8 @@ const ( ClientNameTypeScriptSDK = "temporal-typescript" ClientNameCLI = "temporal-cli" - ServerVersion = "1.15.0" - CLIVersion = "1.15.0" + ServerVersion = "1.15.1" + CLIVersion = "1.15.1" // SupportedServerVersions is used by CLI and inter role communication.
SupportedServerVersions = ">=1.0.0 <2.0.0" diff --git a/common/metrics/tally/statsd/reporter.go b/common/metrics/tally/statsd/reporter.go index 546d683e97c..096a29c6b3d 100644 --- a/common/metrics/tally/statsd/reporter.go +++ b/common/metrics/tally/statsd/reporter.go @@ -27,6 +27,7 @@ package statsd import ( "bytes" "sort" + "strings" "time" "github.com/cactus/go-statsd-client/statsd" @@ -37,6 +38,7 @@ import ( type temporalTallyStatsdReporter struct { //Wrapper on top of "github.com/uber-go/tally/statsd" tallystatsd tally.StatsReporter + separator string } func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string { @@ -63,22 +65,23 @@ func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, ta func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter { return &temporalTallyStatsdReporter{ tallystatsd: tallystatsdreporter.NewReporter(statsd, opts), + separator: ".__", } } func (r *temporalTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportCounter(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportCounter(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportGauge(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportGauge(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportTimer(newName, map[string]string{}, interval) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportTimer(r.taggedName(name, tags), map[string]string{}, interval) } func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples( @@ -112,3 +115,31 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities { func (r *temporalTallyStatsdReporter) Flush() { r.tallystatsd.Flush() } + +// https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd +func (r *temporalTallyStatsdReporter) taggedName(name string, tags map[string]string) string { + var b strings.Builder + b.WriteString(name) + for k, v := range tags { + b.WriteString(r.separator) + b.WriteString(replaceChars(k)) + b.WriteByte('=') + b.WriteString(replaceChars(v)) + } + return b.String() +} + +// Replace problematic characters in tags. 
+func replaceChars(s string) string { + var b strings.Builder + b.Grow(len(s)) + for i := 0; i < len(s); i++ { + switch s[i] { + case '.', ':', '|', '-', '=': + b.WriteByte('_') + default: + b.WriteByte(s[i]) + } + } + return b.String() +} diff --git a/common/persistence/cassandra/util.go b/common/persistence/cassandra/util.go index 414b9d6cc88..681dbe1485c 100644 --- a/common/persistence/cassandra/util.go +++ b/common/persistence/cassandra/util.go @@ -54,7 +54,7 @@ func applyWorkflowMutationBatch( namespaceID, workflowID, runID, - workflowMutation.ExecutionInfo, + workflowMutation.ExecutionInfoBlob, workflowMutation.ExecutionState, workflowMutation.ExecutionStateBlob, workflowMutation.NextEventID, diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index ee3267efff5..5dd4bd467cc 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -454,6 +454,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( UpsertRequestCancelInfos: make(map[int64]*commonpb.DataBlob), UpsertSignalInfos: make(map[int64]*commonpb.DataBlob), + ExecutionInfo: input.ExecutionInfo, ExecutionState: input.ExecutionState, DeleteActivityInfos: input.DeleteActivityInfos, @@ -476,7 +477,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( NextEventID: input.NextEventID, } - result.ExecutionInfo, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3) + result.ExecutionInfoBlob, err = m.serializer.WorkflowExecutionInfoToBlob(input.ExecutionInfo, enumspb.ENCODING_TYPE_PROTO3) if err != nil { return nil, err } diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 1da8a1947ab..16557c9a6fd 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -380,7 +380,8 @@ type ( WorkflowID string RunID string - ExecutionInfo *commonpb.DataBlob + ExecutionInfo *persistencespb.WorkflowExecutionInfo + ExecutionInfoBlob *commonpb.DataBlob ExecutionState *persistencespb.WorkflowExecutionState ExecutionStateBlob *commonpb.DataBlob NextEventID int64 diff --git a/common/persistence/size.go b/common/persistence/size.go index d668d1e6a0d..77ddd69f32a 100644 --- a/common/persistence/size.go +++ b/common/persistence/size.go @@ -104,7 +104,7 @@ func statusOfInternalWorkflowMutation( return nil } - executionInfoSize := sizeOfBlob(mutation.ExecutionInfo) + executionInfoSize := sizeOfBlob(mutation.ExecutionInfoBlob) executionStateSize := sizeOfBlob(mutation.ExecutionStateBlob) activityInfoCount := len(mutation.UpsertActivityInfos) diff --git a/common/persistence/sql/execution_util.go b/common/persistence/sql/execution_util.go index 964921f7905..1645d99a577 100644 --- a/common/persistence/sql/execution_util.go +++ b/common/persistence/sql/execution_util.go @@ -86,7 +86,7 @@ func applyWorkflowMutationTx( tx, namespaceID, workflowID, - workflowMutation.ExecutionInfo, + workflowMutation.ExecutionInfoBlob, workflowMutation.ExecutionState, workflowMutation.NextEventID, lastWriteVersion, diff --git a/common/rpc.go b/common/rpc.go index 33a733a65f7..20a374a2711 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -38,7 +38,7 @@ type ( GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) GetGRPCListener() net.Listener GetRingpopChannel() *tchannel.Channel - CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn - CreateInternodeGRPCConnection(hostName string) *grpc.ClientConn + 
CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn + CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn } ) diff --git a/common/rpc/encryption/localStoreTlsProvider.go b/common/rpc/encryption/localStoreTlsProvider.go index 9f4556de16f..65a782fb5cd 100644 --- a/common/rpc/encryption/localStoreTlsProvider.go +++ b/common/rpc/encryption/localStoreTlsProvider.go @@ -51,17 +51,18 @@ type localStoreTlsProvider struct { settings *config.RootTLS - internodeCertProvider CertProvider - internodeClientCertProvider CertProvider - frontendCertProvider CertProvider - workerCertProvider CertProvider - - frontendPerHostCertProviderMap *localStorePerHostCertProviderMap - - cachedInternodeServerConfig *tls.Config - cachedInternodeClientConfig *tls.Config - cachedFrontendServerConfig *tls.Config - cachedFrontendClientConfig *tls.Config + internodeCertProvider CertProvider + internodeClientCertProvider CertProvider + frontendCertProvider CertProvider + workerCertProvider CertProvider + remoteClusterClientCertProvider map[string]CertProvider + frontendPerHostCertProviderMap *localStorePerHostCertProviderMap + + cachedInternodeServerConfig *tls.Config + cachedInternodeClientConfig *tls.Config + cachedFrontendServerConfig *tls.Config + cachedFrontendClientConfig *tls.Config + cachedRemoteClusterClientConfig map[string]*tls.Config ticker *time.Ticker logger log.Logger @@ -84,6 +85,11 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerProvider = internodeWorkerProvider } + remoteClusterClientCertProvider := make(map[string]CertProvider) + for hostname, groupTLS := range tlsConfig.RemoteClusters { + remoteClusterClientCertProvider[hostname] = certProviderFactory(&groupTLS, nil, nil, tlsConfig.RefreshInterval, logger) + } + provider := &localStoreTlsProvider{ internodeCertProvider: internodeProvider, internodeClientCertProvider: internodeProvider, @@ -91,10 +97,12 @@ func NewLocalStoreTlsProvider(tlsConfig *config.RootTLS, scope metrics.Scope, lo workerCertProvider: workerProvider, frontendPerHostCertProviderMap: newLocalStorePerHostCertProviderMap( tlsConfig.Frontend.PerHostOverrides, certProviderFactory, tlsConfig.RefreshInterval, logger), - RWMutex: sync.RWMutex{}, - settings: tlsConfig, - scope: scope, - logger: logger, + remoteClusterClientCertProvider: remoteClusterClientCertProvider, + RWMutex: sync.RWMutex{}, + settings: tlsConfig, + scope: scope, + logger: logger, + cachedRemoteClusterClientConfig: make(map[string]*tls.Config), } provider.initialize() return provider, nil @@ -130,7 +138,7 @@ func (s *localStoreTlsProvider) GetInternodeClientConfig() (*tls.Config, error) return newClientTLSConfig(s.internodeClientCertProvider, client.ServerName, s.settings.Internode.Server.RequireClientAuth, false, !client.DisableHostVerification) }, - s.settings.Internode.IsEnabled(), + s.settings.Internode.IsClientEnabled(), ) } @@ -143,7 +151,7 @@ func (s *localStoreTlsProvider) GetFrontendClientConfig() (*tls.Config, error) { useTLS = true } else { client = &s.settings.Frontend.Client - useTLS = s.settings.Frontend.IsEnabled() + useTLS = s.settings.Frontend.IsClientEnabled() } return s.getOrCreateConfig( &s.cachedFrontendClientConfig, @@ -155,13 +163,33 @@ func (s *localStoreTlsProvider) GetFrontendClientConfig() (*tls.Config, error) { ) } +func (s *localStoreTlsProvider) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + groupTLS, ok := s.settings.RemoteClusters[hostname] + if !ok { + return nil, nil + } + + return 
s.getOrCreateRemoteClusterClientConfig( + hostname, + func() (*tls.Config, error) { + return newClientTLSConfig( + s.remoteClusterClientCertProvider[hostname], + groupTLS.Client.ServerName, + groupTLS.Server.RequireClientAuth, + false, + !groupTLS.Client.DisableHostVerification) + }, + groupTLS.IsClientEnabled(), + ) +} + func (s *localStoreTlsProvider) GetFrontendServerConfig() (*tls.Config, error) { return s.getOrCreateConfig( &s.cachedFrontendServerConfig, func() (*tls.Config, error) { return newServerTLSConfig(s.frontendCertProvider, s.frontendPerHostCertProviderMap, &s.settings.Frontend, s.logger) }, - s.settings.Frontend.IsEnabled()) + s.settings.Frontend.IsServerEnabled()) } func (s *localStoreTlsProvider) GetInternodeServerConfig() (*tls.Config, error) { @@ -170,7 +198,7 @@ func (s *localStoreTlsProvider) GetInternodeServerConfig() (*tls.Config, error) func() (*tls.Config, error) { return newServerTLSConfig(s.internodeCertProvider, nil, &s.settings.Internode, s.logger) }, - s.settings.Internode.IsEnabled()) + s.settings.Internode.IsServerEnabled()) } func (s *localStoreTlsProvider) GetExpiringCerts(timeWindow time.Duration, @@ -239,6 +267,41 @@ func (s *localStoreTlsProvider) getOrCreateConfig( return *cachedConfig, nil } +func (s *localStoreTlsProvider) getOrCreateRemoteClusterClientConfig( + hostname string, + configConstructor tlsConfigConstructor, + isEnabled bool, +) (*tls.Config, error) { + if !isEnabled { + return nil, nil + } + + // Check if exists under a read lock first + s.RLock() + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + defer s.RUnlock() + return clientConfig, nil + } + // Not found, promote to write lock to initialize + s.RUnlock() + s.Lock() + defer s.Unlock() + // Check if someone got here first while waiting for write lock + if clientConfig, ok := s.cachedRemoteClusterClientConfig[hostname]; ok { + return clientConfig, nil + } + + // Load configuration + localConfig, err := configConstructor() + + if err != nil { + return nil, err + } + + s.cachedRemoteClusterClientConfig[hostname] = localConfig + return localConfig, nil +} + func newServerTLSConfig( certProvider CertProvider, perHostCertProviderMap PerHostCertProviderMap, @@ -321,8 +384,13 @@ func getServerTLSConfigFromCertProvider( logger), nil } -func newClientTLSConfig(clientProvider CertProvider, serverName string, isAuthRequired bool, - isWorker bool, enableHostVerification bool) (*tls.Config, error) { +func newClientTLSConfig( + clientProvider CertProvider, + serverName string, + isAuthRequired bool, + isWorker bool, + enableHostVerification bool, +) (*tls.Config, error) { // Optional ServerCA for client if not already trusted by host serverCa, err := clientProvider.FetchServerRootCAsForClient(isWorker) if err != nil { diff --git a/common/rpc/encryption/testDynamicTLSConfigProvider.go b/common/rpc/encryption/testDynamicTLSConfigProvider.go index 69e9662773d..279ad413433 100644 --- a/common/rpc/encryption/testDynamicTLSConfigProvider.go +++ b/common/rpc/encryption/testDynamicTLSConfigProvider.go @@ -72,6 +72,11 @@ func (t *TestDynamicTLSConfigProvider) GetExpiringCerts(timeWindow time.Duration panic("implement me") } +func (t *TestDynamicTLSConfigProvider) GetRemoteClusterClientConfig(hostName string) (*tls.Config, error) { + //TODO implement me + panic("implement me") +} + var _ TLSConfigProvider = (*TestDynamicTLSConfigProvider)(nil) func NewTestDynamicTLSConfigProvider( diff --git a/common/rpc/encryption/tlsFactory.go b/common/rpc/encryption/tlsFactory.go index 
3b97c8dc7c0..5e7c1bf9a6d 100644 --- a/common/rpc/encryption/tlsFactory.go +++ b/common/rpc/encryption/tlsFactory.go @@ -44,6 +44,7 @@ type ( GetInternodeClientConfig() (*tls.Config, error) GetFrontendServerConfig() (*tls.Config, error) GetFrontendClientConfig() (*tls.Config, error) + GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) GetExpiringCerts(timeWindow time.Duration) (expiring CertExpirationMap, expired CertExpirationMap, err error) } diff --git a/common/rpc/encryption/tls_config_test.go b/common/rpc/encryption/tls_config_test.go index 6b11c829942..808f1a41dfc 100644 --- a/common/rpc/encryption/tls_config_test.go +++ b/common/rpc/encryption/tls_config_test.go @@ -51,19 +51,27 @@ func (s *tlsConfigTest) SetupTest() { func (s *tlsConfigTest) TestIsEnabled() { emptyCfg := config.GroupTLS{} - s.False(emptyCfg.IsEnabled()) + s.False(emptyCfg.IsServerEnabled()) + s.False(emptyCfg.IsClientEnabled()) cfg := config.GroupTLS{Server: config.ServerTLS{KeyFile: "foo"}} - s.True(cfg.IsEnabled()) + s.True(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) cfg = config.GroupTLS{Server: config.ServerTLS{KeyData: "foo"}} - s.True(cfg.IsEnabled()) + s.True(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{RootCAFiles: []string{"bar"}}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{RootCAData: []string{"bar"}}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{ForceTLS: true}} - s.True(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.True(cfg.IsClientEnabled()) cfg = config.GroupTLS{Client: config.ClientTLS{ForceTLS: false}} - s.False(cfg.IsEnabled()) + s.False(cfg.IsServerEnabled()) + s.False(cfg.IsClientEnabled()) + } func (s *tlsConfigTest) TestIsSystemWorker() { diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 297144275b2..6938501fe5c 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -31,23 +31,24 @@ import ( "sync" "github.com/uber/tchannel-go" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/rpc/encryption" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // RPCFactory is an implementation of service.RPCFactory interface type RPCFactory struct { - config *config.RPC - serviceName string - logger log.Logger - dc *dynamicconfig.Collection + config *config.RPC + serviceName string + logger log.Logger + dc *dynamicconfig.Collection + clusterMetadata *cluster.Config sync.Mutex grpcListener net.Listener @@ -57,13 +58,21 @@ type RPCFactory struct { // NewFactory builds a new RPCFactory // conforming to the underlying configuration -func NewFactory(cfg *config.RPC, sName string, logger log.Logger, tlsProvider encryption.TLSConfigProvider, dc *dynamicconfig.Collection) *RPCFactory { +func NewFactory( + cfg *config.RPC, + sName string, + logger log.Logger, + tlsProvider encryption.TLSConfigProvider, + dc *dynamicconfig.Collection, + clusterMetadata *cluster.Config, +) *RPCFactory { return &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - dc: dc, - tlsFactory: tlsProvider, + config: cfg, + serviceName: sName, + 
logger: logger, + dc: dc, + tlsFactory: tlsProvider, + clusterMetadata: clusterMetadata, } } @@ -92,6 +101,14 @@ func (d *RPCFactory) GetFrontendClientTlsConfig() (*tls.Config, error) { return nil, nil } +func (d *RPCFactory) GetRemoteClusterClientConfig(hostname string) (*tls.Config, error) { + if d.tlsFactory != nil { + return d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + + return nil, nil +} + func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) { var opts []grpc.ServerOption @@ -237,18 +254,29 @@ func getListenIP(cfg *config.RPC, logger log.Logger) net.IP { } // CreateFrontendGRPCConnection creates connection for gRPC calls -func (d *RPCFactory) CreateFrontendGRPCConnection(hostName string) *grpc.ClientConn { +func (d *RPCFactory) CreateFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn { var tlsClientConfig *tls.Config var err error if d.tlsFactory != nil { - tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + currCluster := d.clusterMetadata.ClusterInformation[d.clusterMetadata.CurrentClusterName] + + if currCluster.RPCAddress == rpcAddress { + tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() + } else { + hostname, _, err2 := net.SplitHostPort(rpcAddress) + if err2 != nil { + d.logger.Fatal("Invalid rpcAddress for remote cluster", tag.Error(err2)) + } + tlsClientConfig, err = d.tlsFactory.GetRemoteClusterClientConfig(hostname) + } + if err != nil { d.logger.Fatal("Failed to create tls config for gRPC connection", tag.Error(err)) return nil } } - return d.dial(hostName, tlsClientConfig) + return d.dial(rpcAddress, tlsClientConfig) } // CreateInternodeGRPCConnection creates connection for gRPC calls diff --git a/common/rpc/test/rpc_common_test.go b/common/rpc/test/rpc_common_test.go index accc5c21f4c..2ecdfd698d0 100644 --- a/common/rpc/test/rpc_common_test.go +++ b/common/rpc/test/rpc_common_test.go @@ -28,19 +28,19 @@ import ( "context" "crypto/tls" "math/rand" + "net" "strings" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/peer" - "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - "google.golang.org/grpc/examples/helloworld/helloworld" - + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/log" "go.temporal.io/server/common/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/examples/helloworld/helloworld" + "google.golang.org/grpc/peer" ) // HelloServer is used to implement helloworld.GreeterServer. 
@@ -53,6 +53,7 @@ type ServerUsageType int32 const ( Frontend ServerUsageType = iota Internode + RemoteCluster ) const ( @@ -82,6 +83,10 @@ var ( BroadcastAddress: localhostIPv4, }, } + clusterMetadata = &cluster.Config{ + CurrentClusterName: "test", + ClusterInformation: map[string]cluster.ClusterInformation{"test": {RPCAddress: localhostIPv4 + ":1234"}}, + } ) func startHelloWorldServer(s suite.Suite, factory *TestFactory) (*grpc.Server, string) { @@ -166,15 +171,21 @@ func dialHelloAndGetTLSInfo( logger := log.NewNoopLogger() var cfg *tls.Config var err error - if serverType == Internode { + switch serverType { + case Internode: cfg, err = clientFactory.GetInternodeClientTlsConfig() - } else { + case Frontend: cfg, err = clientFactory.GetFrontendClientTlsConfig() + case RemoteCluster: + host, _, err := net.SplitHostPort(hostport) + s.NoError(err) + cfg, err = clientFactory.GetRemoteClusterClientConfig(host) } - s.NoError(err) + clientConn, err := rpc.Dial(hostport, cfg, logger) s.NoError(err) + client := helloworld.NewGreeterClient(clientConn) request := &helloworld.HelloRequest{Name: convert.Uint64ToString(rand.Uint64())} diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index c570eb53dc7..6827b2b4230 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -76,6 +76,8 @@ type localStoreRPCSuite struct { internodeDynamicTLSFactory *TestFactory internodeMutualTLSRPCRefreshFactory *TestFactory frontendMutualTLSRPCRefreshFactory *TestFactory + remoteClusterMutualTLSRPCFactory *TestFactory + frontendConfigRootCAForceTLSFactory *TestFactory internodeCertDir string frontendCertDir string @@ -101,6 +103,7 @@ type localStoreRPCSuite struct { frontendConfigMutualTLS config.GroupTLS frontendConfigPerHostOverrides config.GroupTLS frontendConfigRootCAOnly config.GroupTLS + frontendConfigRootCAForceTLS config.GroupTLS frontendConfigAltRootCAOnly config.GroupTLS frontendConfigSystemWorker config.WorkerTLS frontendConfigMutualTLSRefresh config.GroupTLS @@ -136,7 +139,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, nil, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -201,6 +204,9 @@ func (s *localStoreRPCSuite) SetupSuite() { RootCAData: []string{convertFileToBase64(s.frontendChain.CaPubFile)}, }, } + s.frontendConfigRootCAForceTLS = s.frontendConfigRootCAOnly + s.frontendConfigRootCAForceTLS.Client.ForceTLS = true + s.frontendConfigAltRootCAOnly = config.GroupTLS{ Server: config.ServerTLS{ RequireClientAuth: true, @@ -319,24 +325,39 @@ func (s *localStoreRPCSuite) setupFrontend() { }, } + localStoreRootCAForceTLS := &config.Global{ + Membership: s.membershipConfig, + TLS: config.RootTLS{ + Frontend: s.frontendConfigRootCAForceTLS, + }, + } + + localStoreMutualTLSRemoteCluster := &config.Global{ + Membership: s.membershipConfig, + TLS: config.RootTLS{ + Frontend: s.frontendConfigPerHostOverrides, + RemoteClusters: map[string]config.GroupTLS{localhostIPv4: s.frontendConfigPerHostOverrides}, + }, + } + provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) 
s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, nil, s.logger, nil) s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -350,11 +371,23 @@ func (s *localStoreRPCSuite) setupFrontend() { s.frontendRollingCerts, s.dynamicCACertPool, s.wrongCACertPool) - dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection()) + dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) s.frontendMutualTLSRPCRefreshFactory = f(frontendMutualTLSRefreshFactory) + + provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, nil, s.logger, nil) + s.NoError(err) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + s.NotNil(frontendServerTLSFactory) + s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) + + provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, nil, s.logger, nil) + s.NoError(err) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) + s.NotNil(remoteClusterMutualTLSRPCFactory) + s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } func (s *localStoreRPCSuite) setupInternode() { @@ -388,22 +421,22 @@ func (s *localStoreRPCSuite) setupInternode() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + 
internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, nil, s.logger, nil) s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection()) + internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), clusterMetadata) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) @@ -436,16 +469,16 @@ func (s *localStoreRPCSuite) setupInternodeRingpop() { provider, err := encryption.NewTLSConfigProviderFromConfig(ringpopMutualTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopMutualTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryA) - ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopMutualTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopMutualTLSFactoryB) provider, err = encryption.NewTLSConfigProviderFromConfig(ringpopServerTLS.TLS, nil, s.logger, nil) s.NoError(err) - ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc) + ringpopServerTLSFactoryA := rpc.NewFactory(rpcCfgA, "tester-A", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryA) - ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc) + ringpopServerTLSFactoryB := rpc.NewFactory(rpcCfgB, "tester-B", s.logger, provider, dc, clusterMetadata) s.NotNil(ringpopServerTLSFactoryB) s.ringpopMutualTLSRPCFactoryA = i(ringpopMutualTLSFactoryA) @@ -562,6 +595,10 @@ func i(r *rpc.RPCFactory) *TestFactory { return &TestFactory{serverUsage: Internode, RPCFactory: r} } +func r(r *rpc.RPCFactory) *TestFactory { + return &TestFactory{serverUsage: RemoteCluster, RPCFactory: r} +} + func convertFileToBase64(file string) string { fileBytes, err := os.ReadFile(file) if err != nil { @@ -587,6 +624,10 @@ func (s *localStoreRPCSuite) TestMutualTLSFrontendToFrontend() { runHelloWorldTest(s.Suite, localhostIPv4, s.frontendMutualTLSRPCFactory, s.frontendMutualTLSRPCFactory, true) } +func (s *localStoreRPCSuite) TestMutualTLSFrontendToRemoteCluster() { + runHelloWorldTest(s.Suite, localhostIPv4, 
s.remoteClusterMutualTLSRPCFactory, s.remoteClusterMutualTLSRPCFactory, true) +} + func (s *localStoreRPCSuite) TestMutualTLSButClientInsecure() { runHelloWorldTest(s.Suite, localhostIPv4, s.internodeMutualTLSRPCFactory, s.insecureRPCFactory, false) } @@ -789,3 +830,9 @@ func runRingpopTLSTest(s suite.Suite, logger log.Logger, serverA *TestFactory, s s.NoError(err) } } + +func (s *localStoreRPCSuite) TestClientForceTLS() { + options, err := s.frontendConfigRootCAForceTLSFactory.RPCFactory.GetFrontendGRPCServerOptions() + s.NoError(err) + s.Nil(options) +} diff --git a/common/util.go b/common/util.go index 196b71e2ace..21bc461ed41 100644 --- a/common/util.go +++ b/common/util.go @@ -30,6 +30,7 @@ import ( "fmt" "math/rand" "sort" + "strings" "sync" "time" @@ -471,6 +472,17 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error { if policy.GetMaximumAttempts() < 0 { return serviceerror.NewInvalidArgument("MaximumAttempts cannot be negative on retry policy.") } + + for _, nrt := range policy.NonRetryableErrorTypes { + if strings.HasPrefix(nrt, TimeoutFailureTypePrefix) { + timeoutTypeValue := nrt[len(TimeoutFailureTypePrefix):] + timeoutType, ok := enumspb.TimeoutType_value[timeoutTypeValue] + if !ok || enumspb.TimeoutType(timeoutType) == enumspb.TIMEOUT_TYPE_UNSPECIFIED { + return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid timeout type value: %v.", timeoutTypeValue)) + } + } + } + return nil } diff --git a/common/util_test.go b/common/util_test.go index fe5a524b4a6..01ecfd31e12 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/dynamicconfig" @@ -105,6 +106,42 @@ func TestValidateRetryPolicy(t *testing.T) { wantErr: true, wantErrString: "MaximumAttempts cannot be negative on retry policy.", }, + { + name: "timeout nonretryable error - valid type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String(), + }, + }, + wantErr: false, + wantErrString: "", + }, + { + name: "timeout nonretryable error - unspecified type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_UNSPECIFIED.String(), + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: Unspecified.", + }, + { + name: "timeout nonretryable error - unknown type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + "unknown", + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: unknown.", + }, } for _, tt := range testCases { diff --git a/host/client_integration_test.go b/host/client_integration_test.go index 6991ed0bbe6..d76f92decc1 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -585,18 +585,16 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() { //s.printHistory(id, workflowRun.GetRunID()) } +// This test simulates workflow try to complete itself while there is buffered event. 
+// Event sequence: +// 1st WorkflowTask runs a local activity. +// While the local activity is running, a signal is received by the server. +// After the signal is received, the local activity completes, and the workflow drains the signal channel (no signal yet) and tries to complete the workflow. +// The server fails the complete request because there is an unhandled signal. +// The server schedules a new workflow task. +// The workflow runs the local activity again, drains the signal channel (now with one signal), and completes the workflow. +// The server completes the workflow as requested. func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { - /* - Event sequence: - 1st WorkflowTask runs a local activity. - While local activity is running, a signal is received by server. - After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow. - Server failed the complete request because there is unhandled signal. - Server rescheduled a new workflow task. - Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow. - Server complete workflow as requested. - */ - sigReadyToSendChan := make(chan struct{}, 1) sigSendDoneChan := make(chan struct{}) localActivityFn := func(ctx context.Context) error { @@ -694,6 +692,70 @@ func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) } +// This test simulates a workflow generating a command with invalid attributes. +// The server is expected to fail the workflow task and schedule a retry immediately for the first attempt, +// but if the workflow task keeps failing, the server will drop the task and wait for the timeout before scheduling additional retries. +// This is the same behavior the SDK used to implement, but it is now done on the server. +func (s *clientIntegrationSuite) Test_InvalidCommandAttribute() { + activityFn := func(ctx context.Context) error { + return nil + } + + var calledTime []time.Time + workflowFn := func(ctx workflow.Context) error { + calledTime = append(calledTime, time.Now().UTC()) + ao := workflow.ActivityOptions{} // invalid activity option without StartToClose timeout + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, activityFn).Get(ctx, nil) + return err + } + + s.worker.RegisterWorkflow(workflowFn) + s.worker.RegisterActivity(activityFn) + + id := "integration-test-invalid-command-attributes" + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: id, + TaskQueue: s.taskQueue, + // With a 3s TaskTimeout and a 5s RunTimeout, we expect to see a total of 3 attempts: + // the first attempt, followed by an immediate retry, followed by a timeout and a 3rd attempt after the WorkflowTaskTimeout.
+ WorkflowTaskTimeout: 3 * time.Second, + WorkflowRunTimeout: 5 * time.Second, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn) + if err != nil { + s.Logger.Fatal("Start workflow failed with err", tag.Error(err)) + } + + s.NotNil(workflowRun) + s.True(workflowRun.GetRunID() != "") + + // wait until workflow close (it will be timeout) + err = workflowRun.Get(ctx, nil) + s.Error(err) + s.Contains(err.Error(), "timeout") + + // verify event sequence + expectedHistory := []enumspb.EventType{ + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT, + } + s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) + + // assert workflow task retried 3 times + s.Equal(3, len(calledTime)) + + s.True(calledTime[1].Sub(calledTime[0]) < time.Second) // retry immediately + s.True(calledTime[2].Sub(calledTime[1]) > time.Second*3) // retry after WorkflowTaskTimeout +} + func (s *clientIntegrationSuite) Test_BufferedQuery() { localActivityFn := func(ctx context.Context) error { time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered diff --git a/service/history/handler.go b/service/history/handler.go index 1c30376a7de..03031cde558 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -1096,6 +1096,10 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if namespaceID == "" { return nil, h.convertError(errNamespaceNotSet) @@ -1160,6 +1164,10 @@ func (h *Handler) SyncActivity(ctx context.Context, request *historyservice.Sync return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if request.GetNamespaceId() == "" || uuid.Parse(request.GetNamespaceId()) == nil { return nil, h.convertError(errNamespaceNotSet) @@ -1195,6 +1203,9 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } var wg sync.WaitGroup wg.Add(len(request.Tokens)) @@ -1248,6 +1259,9 @@ func (h *Handler) GetDLQReplicationMessages(ctx context.Context, request *histor if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } taskInfoPerShard := map[int32][]*replicationspb.ReplicationTaskInfo{} // do batch based on workflow ID and run ID @@ -1487,6 +1501,9 @@ func (h *Handler) GetReplicationStatus( if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } resp := &historyservice.GetReplicationStatusResponse{} for _, shardID := range h.controller.ShardIDs() { @@ -1525,6 +1542,13 @@ func (h *Handler) convertError(err error) error { return err } +func (h *Handler) validateReplicationConfig() error { + if !h.clusterMetadata.IsGlobalNamespaceEnabled() { + return serviceerror.NewUnavailable("The cluster has global namespace disabled. 
The operation is not supported.") + } + return nil +} + func validateTaskToken(taskToken *tokenspb.Task) error { if taskToken.GetWorkflowId() == "" { return errWorkflowIDNotSet diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index 55dda181ea5..0dc01954696 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -492,7 +492,7 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTasks() error { return nil } - p.logger.Info("cleaning up replication task queue", tag.ReadLevel(*minAckedTaskID)) + p.logger.Debug("cleaning up replication task queue", tag.ReadLevel(*minAckedTaskID)) p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupCount) p.metricsClient.Scope( metrics.ReplicationTaskFetcherScope, diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index b6ac2a2bf7d..1beed7d74b8 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -1472,6 +1472,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error { s.rLock() if s.state >= contextStateStopping { + s.rUnlock() return errStoppingContext } diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 2bf7d2d55b6..fab78a6f3fe 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -241,7 +241,8 @@ Loop: break Loop } - timeoutFailure := failure.NewTimeoutFailure("activity timeout", timerSequenceID.TimerType) + failureMsg := fmt.Sprintf("activity %v timeout", timerSequenceID.TimerType.String()) + timeoutFailure := failure.NewTimeoutFailure(failureMsg, timerSequenceID.TimerType) var retryState enumspb.RetryState if retryState, err = mutableState.RetryActivity( activityInfo, diff --git a/service/history/workflow/cache.go b/service/history/workflow/cache.go index 5b5e9c54080..6af597ff3dc 100644 --- a/service/history/workflow/cache.go +++ b/service/history/workflow/cache.go @@ -30,6 +30,7 @@ import ( "context" "sync/atomic" "time" + "unicode/utf8" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" @@ -234,6 +235,11 @@ func (c *CacheImpl) validateWorkflowExecutionInfo( return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.") } + if !utf8.ValidString(execution.GetWorkflowId()) { + // We know workflow cannot exist with invalid utf8 string as WorkflowID. 
+ return serviceerror.NewNotFound("Workflow not exists.") + } + // RunID is not provided, lets try to retrieve the RunID for current active execution if execution.GetRunId() == "" { response, err := c.getCurrentExecutionWithRetry(&persistence.GetCurrentExecutionRequest{ diff --git a/service/history/workflow/retry.go b/service/history/workflow/retry.go index 59c4866bc98..3355f2e160c 100644 --- a/service/history/workflow/retry.go +++ b/service/history/workflow/retry.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" workflowspb "go.temporal.io/server/api/workflow/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" @@ -118,8 +119,16 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { } if failure.GetTimeoutFailureInfo() != nil { - return failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || - failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_HEARTBEAT + timeoutType := failure.GetTimeoutFailureInfo().GetTimeoutType() + if timeoutType == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || + timeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT { + return !matchNonRetryableTypes( + common.TimeoutFailureTypePrefix+timeoutType.String(), + nonRetryableTypes, + ) + } + + return false } if failure.GetServerFailureInfo() != nil { @@ -131,16 +140,26 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { return false } - failureType := failure.GetApplicationFailureInfo().GetType() - for _, nrt := range nonRetryableTypes { - if nrt == failureType { - return false - } - } + return !matchNonRetryableTypes( + failure.GetApplicationFailureInfo().GetType(), + nonRetryableTypes, + ) } return true } +func matchNonRetryableTypes( + failureType string, + nonRetryableTypes []string, +) bool { + for _, nrt := range nonRetryableTypes { + if nrt == failureType { + return true + } + } + return false +} + // Helpers for creating new retry/cron workflows: func SetupNewWorkflowForRetryOrCron( diff --git a/service/history/workflow/retry_test.go b/service/history/workflow/retry_test.go index f5adb21bcf6..e06597ca013 100644 --- a/service/history/workflow/retry_test.go +++ b/service/history/workflow/retry_test.go @@ -33,6 +33,7 @@ import ( failurepb "go.temporal.io/api/failure/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/failure" @@ -65,6 +66,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -72,6 +74,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -79,6 +82,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: 
&failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -86,6 +90,9 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + "unknown timeout type string"})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_ServerFailureInfo{ServerFailureInfo: &failurepb.ServerFailureInfo{ diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 70c3fb0fc53..6c5480e143f 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -485,6 +485,10 @@ Update_History_Loop: tag.WorkflowID(token.GetWorkflowId()), tag.WorkflowRunID(token.GetRunId()), tag.WorkflowNamespaceID(namespaceID.String())) + if currentWorkflowTask.Attempt > 1 { + // drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout. + return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) + } msBuilder, err = handler.historyEngine.failWorkflowTask(weContext, scheduleID, startedID, wtFailedCause, request) if err != nil { return nil, err diff --git a/temporal/fx.go b/temporal/fx.go index 50d53b32314..10521956c0a 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -596,22 +596,15 @@ func ApplyClusterMetadataConfigProvider( // Allow updating cluster metadata if global namespace is disabled if !resp.IsGlobalNamespaceEnabled && clusterData.EnableGlobalNamespace { - resp.InitialFailoverVersion = clusterInfo.InitialFailoverVersion - resp.FailoverVersionIncrement = clusterData.FailoverVersionIncrement + currentMetadata := resp.ClusterMetadata + currentMetadata.IsGlobalNamespaceEnabled = clusterData.EnableGlobalNamespace + currentMetadata.InitialFailoverVersion = clusterInfo.InitialFailoverVersion + currentMetadata.FailoverVersionIncrement = clusterData.FailoverVersionIncrement applied, err = clusterMetadataManager.SaveClusterMetadata( &persistence.SaveClusterMetadataRequest{ - ClusterMetadata: persistencespb.ClusterMetadata{ - HistoryShardCount: resp.HistoryShardCount, - ClusterName: resp.ClusterName, - ClusterId: resp.ClusterId, - ClusterAddress: resp.ClusterAddress, - FailoverVersionIncrement: resp.FailoverVersionIncrement, - InitialFailoverVersion: resp.InitialFailoverVersion, - IsGlobalNamespaceEnabled: clusterData.EnableGlobalNamespace, - IsConnectionEnabled: resp.IsConnectionEnabled, - }, - Version: resp.Version, + ClusterMetadata: currentMetadata, + Version: resp.Version, }) if !applied || err != nil { return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while updating cluster metadata: %w", err) diff --git a/temporal/server_impl.go b/temporal/server_impl.go index 6a7c4850e09..21af9b10cf1 100644 --- a/temporal/server_impl.go +++ b/temporal/server_impl.go @@ -191,7 +191,7 @@ func newBootstrapParams( } svcCfg := cfg.Services[svcName] - rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc) + rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, logger, tlsConfigProvider, dc, clusterMetadata) params.RPCFactory = rpcFactory // Ringpop uses a different port to register handlers, this map is needed to resolve