diff --git a/common/config/value.go b/common/config/value.go new file mode 100644 index 000000000..1869fd6c0 --- /dev/null +++ b/common/config/value.go @@ -0,0 +1,91 @@ +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package config + +import ( + "github.com/openziti/foundation/v2/concurrenz" + "sync" +) + +type Listener[T any] interface { + NotifyChanged(init bool, old T, new T) +} + +type ListenerFunc[T any] func(init bool, old T, new T) + +func (f ListenerFunc[T]) NotifyChanged(init bool, old T, new T) { + f(init, old, new) +} + +func NewConfigValue[T comparable]() *Value[T] { + return &Value[T]{ + notifyInitialized: make(chan struct{}), + } +} + +type Value[T comparable] struct { + lock sync.Mutex + initialized bool + notifyInitialized chan struct{} + value concurrenz.AtomicValue[T] + listeners concurrenz.CopyOnWriteSlice[Listener[T]] +} + +func (self *Value[T]) Store(value T) { + self.lock.Lock() + defer self.lock.Unlock() + + first := !self.initialized + old := self.value.Swap(value) + + if first || old != value { + for _, l := range self.listeners.Value() { + l.NotifyChanged(first, old, value) + } + } + + if first { + self.initialized = true + close(self.notifyInitialized) + } +} + +func (self *Value[T]) Load() T { + return self.value.Load() +} + +func (self *Value[T]) AddListener(listener Listener[T]) { + self.lock.Lock() + defer self.lock.Unlock() + + self.listeners.Append(listener) + + if self.initialized { + listener.NotifyChanged(true, self.Load(), self.Load()) + } +} + +func (self *Value[T]) RemoveListener(listener Listener[T]) { + self.lock.Lock() + defer self.lock.Unlock() + + self.listeners.Delete(listener) +} + +func (self *Value[T]) GetInitNotifyChannel() <-chan struct{} { + return self.notifyInitialized +} diff --git a/common/router_data_model.go b/common/router_data_model.go index 7d449b366..6537193b5 100644 --- a/common/router_data_model.go +++ b/common/router_data_model.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/michaelquigley/pfxlog" "github.com/openziti/foundation/v2/concurrenz" "github.com/openziti/ziti/common/pb/edge_ctrl_pb" @@ -49,36 +50,36 @@ type DataStateIdentity = edge_ctrl_pb.DataState_Identity type Identity struct { *DataStateIdentity ServicePolicies map[string]struct{} `json:"servicePolicies"` - IdentityIndex uint64 - ServiceSetIndex uint64 + identityIndex uint64 + serviceSetIndex uint64 } type DataStateConfigType = edge_ctrl_pb.DataState_ConfigType type ConfigType struct { *DataStateConfigType - Index uint64 + index uint64 } type DataStateConfig = edge_ctrl_pb.DataState_Config type Config struct { *DataStateConfig - Index uint64 + index uint64 } type DataStateService = edge_ctrl_pb.DataState_Service type Service struct { *DataStateService - Index uint64 + index uint64 } type DataStatePostureCheck = edge_ctrl_pb.DataState_PostureCheck type PostureCheck struct { *DataStatePostureCheck - Index uint64 + index uint64 } type DataStateServicePolicy = edge_ctrl_pb.DataState_ServicePolicy @@ -354,14 +355,14 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p identity = &Identity{ DataStateIdentity: model.Identity, ServicePolicies: map[string]struct{}{}, - IdentityIndex: index, + identityIndex: index, } } else { identity = &Identity{ DataStateIdentity: model.Identity, ServicePolicies: valueInMap.ServicePolicies, - IdentityIndex: index, - ServiceSetIndex: valueInMap.ServiceSetIndex, + identityIndex: index, + serviceSetIndex: valueInMap.serviceSetIndex, } } return identity @@ -381,10 +382,13 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p func (rdm *RouterDataModel) HandleServiceEvent(index uint64, event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_Service) { if event.Action == edge_ctrl_pb.DataState_Delete { rdm.Services.Remove(model.Service.Id) + rdm.ServicePolicies.IterCb(func(key string, v *ServicePolicy) { + delete(v.Services, model.Service.Id) + }) } else { rdm.Services.Set(model.Service.Id, &Service{ DataStateService: model.Service, - Index: index, + index: index, }) } } @@ -398,7 +402,7 @@ func (rdm *RouterDataModel) HandleConfigTypeEvent(index uint64, event *edge_ctrl } else { rdm.ConfigTypes.Set(model.ConfigType.Id, &ConfigType{ DataStateConfigType: model.ConfigType, - Index: index, + index: index, }) } } @@ -412,12 +416,12 @@ func (rdm *RouterDataModel) HandleConfigEvent(index uint64, event *edge_ctrl_pb. } else { rdm.Configs.Set(model.Config.Id, &Config{ DataStateConfig: model.Config, - Index: index, + index: index, }) } } -func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) { +func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) { servicePolicy := model.ServicePolicy rdm.ServicePolicies.Upsert(servicePolicy.Id, nil, func(exist bool, valueInMap *ServicePolicy, newValue *ServicePolicy) *ServicePolicy { if valueInMap == nil { @@ -436,7 +440,7 @@ func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.Da }) } -func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(_ *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) { +func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) { rdm.ServicePolicies.Remove(model.ServicePolicy.Id) } @@ -447,11 +451,11 @@ func (rdm *RouterDataModel) HandleServicePolicyEvent(event *edge_ctrl_pb.DataSta pfxlog.Logger().WithField("policyId", model.ServicePolicy.Id).WithField("action", event.Action).Debug("applying service policy event") switch event.Action { case edge_ctrl_pb.DataState_Create: - rdm.applyUpdateServicePolicyEvent(event, model) + rdm.applyUpdateServicePolicyEvent(model) case edge_ctrl_pb.DataState_Update: - rdm.applyUpdateServicePolicyEvent(event, model) + rdm.applyUpdateServicePolicyEvent(model) case edge_ctrl_pb.DataState_Delete: - rdm.applyDeleteServicePolicyEvent(event, model) + rdm.applyDeleteServicePolicyEvent(model) } } @@ -464,7 +468,7 @@ func (rdm *RouterDataModel) HandlePostureCheckEvent(index uint64, event *edge_ct } else { rdm.PostureChecks.Set(model.PostureCheck.Id, &PostureCheck{ DataStatePostureCheck: model.PostureCheck, - Index: index, + index: index, }) } } @@ -509,7 +513,7 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_ } else { delete(valueInMap.ServicePolicies, model.PolicyId) } - valueInMap.ServiceSetIndex = index + valueInMap.serviceSetIndex = index } return valueInMap }) @@ -593,7 +597,9 @@ func (rdm *RouterDataModel) recalculateCachedPublicKeys() { func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState { var events []*edge_ctrl_pb.DataState_Event - rdm.EventCache.WhileLocked(func(_ uint64, _ bool) { + var index uint64 + rdm.EventCache.WhileLocked(func(currentIndex uint64, _ bool) { + index = currentIndex rdm.ConfigTypes.IterCb(func(key string, v *ConfigType) { newEvent := &edge_ctrl_pb.DataState_Event{ Action: edge_ctrl_pb.DataState_Create, @@ -717,7 +723,8 @@ func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState { }) return &edge_ctrl_pb.DataState{ - Events: events, + Events: events, + EndIndex: index, } } @@ -1005,18 +1012,48 @@ func (rdm *RouterDataModel) Diff(o *RouterDataModel, sink DiffSink) { return } - diffType("configType", rdm.ConfigTypes, o.ConfigTypes, sink) - diffType("config", rdm.Configs, o.Configs, sink) - diffType("identity", rdm.Identities, o.Identities, sink) - diffType("service", rdm.Services, o.Services, sink) - diffType("service-policy", rdm.ServicePolicies, o.ServicePolicies, sink) - diffType("posture-check", rdm.PostureChecks, o.PostureChecks, sink) - diffType("public-keys", rdm.PublicKeys, o.PublicKeys, sink) - diffType("revocations", rdm.Revocations, o.Revocations, sink) - diffType("cached-public-keys", rdm.getPublicKeysAsCmap(), o.getPublicKeysAsCmap(), sink) + diffType("configType", rdm.ConfigTypes, o.ConfigTypes, sink, ConfigType{}, DataStateConfigType{}) + diffType("config", rdm.Configs, o.Configs, sink, Config{}, DataStateConfig{}) + diffType("identity", rdm.Identities, o.Identities, sink, Identity{}, DataStateIdentity{}) + diffType("service", rdm.Services, o.Services, sink, Service{}, DataStateService{}) + diffType("service-policy", rdm.ServicePolicies, o.ServicePolicies, sink, ServicePolicy{}, DataStateServicePolicy{}) + diffType("posture-check", rdm.PostureChecks, o.PostureChecks, sink, PostureCheck{}, DataStatePostureCheck{}) + diffType("public-keys", rdm.PublicKeys, o.PublicKeys, sink, edge_ctrl_pb.DataState_PublicKey{}) + diffType("revocations", rdm.Revocations, o.Revocations, sink, edge_ctrl_pb.DataState_Revocation{}) + diffMaps("cached-public-keys", rdm.getPublicKeysAsCmap(), o.getPublicKeysAsCmap(), sink, func(a, b crypto.PublicKey) []string { + if a == nil || b == nil { + return []string{fmt.Sprintf("cached public key is nil: orig: %v, dest: %v", a, a)} + } + return nil + }) +} + +type diffF[T any] func(a, b T) []string + +func diffMaps[T any](entityType string, m1, m2 cmap.ConcurrentMap[string, T], sink DiffSink, differ diffF[T]) { + hasMissing := false + m1.IterCb(func(key string, v T) { + v2, exists := m2.Get(key) + if !exists { + sink(entityType, key, DiffTypeSub, "entity missing") + hasMissing = true + } else { + for _, diff := range differ(v, v2) { + sink(entityType, key, DiffTypeMod, diff) + } + } + }) + + if m1.Count() != m2.Count() || hasMissing { + m2.IterCb(func(key string, v2 T) { + if _, exists := m1.Get(key); !exists { + sink(entityType, key, DiffTypeAdd, "entity unexpected") + } + }) + } } -func diffType[T any](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cmap.ConcurrentMap[string, T], sink DiffSink) { +func diffType[P any, T *P](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cmap.ConcurrentMap[string, T], sink DiffSink, ignoreTypes ...any) { diffReporter := &compareReporter{ f: func(key string, detail string) { sink(entityType, key, DiffTypeMod, detail) @@ -1032,7 +1069,7 @@ func diffType[T any](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cma hasMissing = true } else { diffReporter.key = key - cmp.Diff(v, v2, adapter) + cmp.Diff(v, v2, cmpopts.IgnoreUnexported(ignoreTypes...), adapter) } }) @@ -1065,7 +1102,16 @@ func (self *compareReporter) Report(result cmp.Result) { } if step != nil { vx, vy := step.Values() - err := fmt.Sprintf("%s mismatch. orig: %s, copy: %s", path.String(), vx.String(), vy.String()) + var x any + var y any + + if vx.IsValid() { + x = vx.Interface() + } + if vy.IsValid() { + y = vy.Interface() + } + err := fmt.Sprintf("%s mismatch. orig: %v, copy: %v", path.String(), x, y) self.f(self.key, err) } else { self.f(self.key, "programming error, empty path stack") diff --git a/common/subscriber.go b/common/subscriber.go index 31a0bc11a..091babaf8 100644 --- a/common/subscriber.go +++ b/common/subscriber.go @@ -36,7 +36,7 @@ type IdentityService struct { } func (self *IdentityService) Equals(other *IdentityService) bool { - if self.Service.Index != other.Service.Index { + if self.Service.index != other.Service.index { return false } @@ -67,10 +67,10 @@ func (self *IdentityService) Equals(other *IdentityService) bool { if !ok { return false } - if config.Config.Index != otherConfig.Config.Index { + if config.Config.index != otherConfig.Config.index { return false } - if config.ConfigType.Index != otherConfig.ConfigType.Index { + if config.ConfigType.index != otherConfig.ConfigType.index { return false } } @@ -102,7 +102,7 @@ func (self *IdentitySubscription) identityUpdated(rdm *RouterDataModel, identity var state *IdentityState self.Lock() if self.Identity != nil { - if identity.IdentityIndex > self.Identity.IdentityIndex { + if identity.identityIndex > self.Identity.identityIndex { self.Identity = identity notify = true } @@ -198,7 +198,7 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) { return } - if oldIdentity.IdentityIndex < newIdentity.IdentityIndex { + if oldIdentity.identityIndex < newIdentity.identityIndex { for _, subscriber := range self.Listeners.Value() { subscriber.NotifyIdentityEvent(state, EventIdentityUpdated) } @@ -235,7 +235,7 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) { checksChanged = true break } - if check.Index != newCheck.Index { + if check.index != newCheck.index { checksChanged = true break } @@ -294,7 +294,10 @@ type identityCreatedEvent struct { } func (self identityCreatedEvent) process(rdm *RouterDataModel) { - pfxlog.Logger().WithField("subs", rdm.subscriptions.Count()).WithField("identityId", self.identity.Id).Info("handling identity created event") + pfxlog.Logger(). + WithField("subs", rdm.subscriptions.Count()). + WithField("identityId", self.identity.Id). + Debug("handling identity created event") if sub, found := rdm.subscriptions.Get(self.identity.Id); found { state := sub.initialize(rdm, self.identity) diff --git a/controller/model/controller_manager.go b/controller/model/controller_manager.go index 78aa94b1d..6c4671642 100644 --- a/controller/model/controller_manager.go +++ b/controller/model/controller_manager.go @@ -126,6 +126,11 @@ func (self *ControllerManager) Unmarshall(bytes []byte) (*Controller, error) { return nil, err } + lastJoinedAt := time.Time{} + if msg.LastJoinedAt != nil { + lastJoinedAt = *pbTimeToTimePtr(msg.LastJoinedAt) + } + controller := &Controller{ BaseEntity: models.BaseEntity{ Id: msg.Id, @@ -136,7 +141,7 @@ func (self *ControllerManager) Unmarshall(bytes []byte) (*Controller, error) { CertPem: msg.CertPem, Fingerprint: msg.Fingerprint, IsOnline: msg.IsOnline, - LastJoinedAt: *pbTimeToTimePtr(msg.LastJoinedAt), + LastJoinedAt: lastJoinedAt, ApiAddresses: map[string][]ApiAddress{}, } diff --git a/controller/sync_strats/sync_instant.go b/controller/sync_strats/sync_instant.go index 886761576..2be42a631 100644 --- a/controller/sync_strats/sync_instant.go +++ b/controller/sync_strats/sync_instant.go @@ -612,11 +612,13 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe WithField("os", rtx.Router.VersionInfo.OS). WithField("arch", rtx.Router.VersionInfo.Arch) + var routerDataModelIndex uint64 if supported, ok := msg.Headers.GetBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel)); ok && supported { rtx.SupportsRouterModel = true if index, ok := msg.Headers.GetUint64Header(int32(edge_ctrl_pb.Header_RouterDataModelIndex)); ok { rtx.RouterModelIndex = &index + routerDataModelIndex = index } } @@ -641,7 +643,13 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe rtx.SetVersionInfo(*rtx.Router.VersionInfo) serverVersion := build.GetBuildInfo().Version() - logger.Infof("edge router sent hello with version [%s] to controller with version [%s]", respHello.Version, serverVersion) + + currentIndex, _ := strategy.CurrentIndex() + logger.WithField("routerIndex", routerDataModelIndex). + WithField("dataModelIndex", currentIndex). + WithField("routerVersion", respHello.Version). + WithField("serverVersion", serverVersion). + Info("edge router sent hello") strategy.queueClientHello(rtx) } @@ -721,6 +729,8 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) { rtx.RouterModelIndex = nil events, ok := strategy.RouterDataModel.ReplayFrom(*replayFrom) + logger.WithError(err).Infof("replaying %d router data model events to router", len(events)) + if ok { var err error for _, curEvent := range events { @@ -762,12 +772,12 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) { Error("could not send data state event for peers") } } - } - // no error sync is done, if err try full state - if err == nil { - rtx.SetSyncStatus(env.RouterSyncDone) - return + // no error sync is done, if err try full state + if err == nil { + rtx.SetSyncStatus(env.RouterSyncDone) + return + } } pfxlog.Logger().WithError(err).Error("could not send events for router sync, attempting full state") @@ -779,9 +789,8 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) { if dataState == nil { return } - dataState.EndIndex = strategy.indexProvider.CurrentIndex() - if err := strategy.sendDataState(rtx, dataState); err != nil { + if err = strategy.sendDataState(rtx, dataState); err != nil { logger.WithError(err).Error("failure sending full data state") rtx.SetSyncStatus(env.RouterSyncError) return @@ -1915,6 +1924,11 @@ func (strategy *InstantStrategy) inspect(val string) (bool, *string, error) { result := string(js) return true, &result, nil } + if val == "router-data-model-index" { + idx, _ := strategy.RouterDataModel.CurrentIndex() + strVal := fmt.Sprintf("%d", idx) + return true, &strVal, nil + } return false, nil, nil } diff --git a/router/config.go b/router/config.go index 1adeeb755..fbb7f4317 100644 --- a/router/config.go +++ b/router/config.go @@ -179,9 +179,6 @@ type Config struct { InitialDelay time.Duration } } - Ha struct { - Enabled bool - } ConnectEvents env.ConnectEventsConfig Proxy *transport.ProxyConfiguration Plugins []string @@ -201,10 +198,6 @@ func (config *Config) SetFlags(flags map[string]*pflag.Flag) { SetConfigMapFlags(config.src, flags) } -func (config *Config) IsHaEnabled() bool { - return config.Ha.Enabled -} - const ( TimeFormatYear = "2006" TimeFormatMonth = "01" @@ -818,16 +811,6 @@ func LoadConfig(path string) (*Config, error) { } } - if value, found := cfgmap[CtrlHaMapKey]; found { - if haMap, ok := value.(map[interface{}]interface{}); ok { - if enabledValue, found := haMap["enabled"]; found { - if enabled, ok := enabledValue.(bool); ok { - cfg.Ha.Enabled = enabled - } - } - } - } - cfg.ConnectEvents.Enabled = DefaultConnectEventsEnabled cfg.ConnectEvents.BatchInterval = DefaultConnectEventsBatchInterval cfg.ConnectEvents.FullSyncInterval = DefaultConnectEventsFullSyncInterval diff --git a/router/env/env.go b/router/env/env.go index b55fc90d7..9570c910a 100644 --- a/router/env/env.go +++ b/router/env/env.go @@ -24,6 +24,7 @@ import ( "github.com/openziti/identity" "github.com/openziti/metrics" "github.com/openziti/ziti/common" + "github.com/openziti/ziti/common/config" "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" "time" @@ -46,6 +47,7 @@ type RouterEnv interface { GetVersionInfo() versions.VersionProvider GetRouterDataModel() *common.RouterDataModel GetConnectEventsConfig() *ConnectEventsConfig + GetRouterDataModelEnabledConfig() *config.Value[bool] } type ConnectEventsConfig struct { diff --git a/router/env/xrctrl.go b/router/env/xrctrl.go index 86b017f1a..72ab0d120 100644 --- a/router/env/xrctrl.go +++ b/router/env/xrctrl.go @@ -33,5 +33,4 @@ type Xrctrl interface { Enabled() bool Run(env RouterEnv) error NotifyOfReconnect(ch channel.Channel) - GetTraceDecoders() []channel.TraceMessageDecoder } diff --git a/router/handler_ctrl/bind.go b/router/handler_ctrl/bind.go index 46e758096..51766d2c1 100644 --- a/router/handler_ctrl/bind.go +++ b/router/handler_ctrl/bind.go @@ -117,5 +117,11 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error { } } + if ok, _ := ctrl.GetVersion().HasMinimumVersion("1.3.0"); ok { + self.env.GetRouterDataModelEnabledConfig().Store(true) + } else { + self.env.GetRouterDataModelEnabledConfig().Store(false) + } + return nil } diff --git a/router/handler_ctrl/inspect.go b/router/handler_ctrl/inspect.go index 3eeab4134..2f2a86c32 100644 --- a/router/handler_ctrl/inspect.go +++ b/router/handler_ctrl/inspect.go @@ -131,6 +131,10 @@ func (context *inspectRequestContext) processLocal() { } else if lc == "router-data-model" { result := context.handler.env.GetRouterDataModel() context.handleJsonResponse(requested, result) + } else if lc == "router-data-model-index" { + idx, _ := context.handler.env.GetRouterDataModel().CurrentIndex() + strVal := fmt.Sprintf("%d", idx) + context.appendValue(requested, strVal) } else if lc == "router-controllers" { result := context.handler.env.GetNetworkControllers().Inspect() context.handleJsonResponse(requested, result) diff --git a/router/handler_edge_ctrl/hello.go b/router/handler_edge_ctrl/hello.go index 686c4da5d..484e89cd5 100644 --- a/router/handler_edge_ctrl/hello.go +++ b/router/handler_edge_ctrl/hello.go @@ -92,14 +92,11 @@ func (h *helloHandler) HandleReceive(msg *channel.Message, ch channel.Channel) { outMsg := protobufs.MarshalTyped(clientHello).ToSendable().Msg() - if h.stateManager.GetEnv().IsHaEnabled() { - if supported, ok := msg.Headers.GetBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel)); ok && supported { + if supported, ok := msg.Headers.GetBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel)); ok && supported { + outMsg.Headers.PutBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel), true) - outMsg.Headers.PutBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel), true) - - if index, ok := h.stateManager.RouterDataModel().CurrentIndex(); ok { - outMsg.Headers.PutUint64Header(int32(edge_ctrl_pb.Header_RouterDataModelIndex), index) - } + if index, ok := h.stateManager.RouterDataModel().CurrentIndex(); ok { + outMsg.Headers.PutUint64Header(int32(edge_ctrl_pb.Header_RouterDataModelIndex), index) } } diff --git a/router/router.go b/router/router.go index 07af30597..b4855da31 100644 --- a/router/router.go +++ b/router/router.go @@ -73,29 +73,29 @@ import ( ) type Router struct { - config *Config - ctrls env.NetworkControllers - ctrlBindhandler channel.BindHandler - faulter *forwarder.Faulter - forwarder *forwarder.Forwarder - xrctrls []env.Xrctrl - xlinkFactories map[string]xlink.Factory - xlinkListeners []xlink.Listener - xlinkDialers []xlink.Dialer - xlinkRegistry xlink.Registry - xgressListeners []xgress.Listener - linkDialerPool goroutines.Pool - rateLimiterPool goroutines.Pool - ctrlRateLimiter rate.AdaptiveRateLimitTracker - metricsRegistry metrics.UsageRegistry - shutdownC chan struct{} - shutdownDoneC chan struct{} - isShutdown atomic.Bool - metricsReporter metrics.Handler - versionProvider versions.VersionProvider - debugOperations map[byte]func(c *bufio.ReadWriter) error - stateManager state.Manager - + config *Config + ctrls env.NetworkControllers + ctrlBindhandler channel.BindHandler + faulter *forwarder.Faulter + forwarder *forwarder.Forwarder + xrctrls []env.Xrctrl + xlinkFactories map[string]xlink.Factory + xlinkListeners []xlink.Listener + xlinkDialers []xlink.Dialer + xlinkRegistry xlink.Registry + xgressListeners []xgress.Listener + linkDialerPool goroutines.Pool + rateLimiterPool goroutines.Pool + ctrlRateLimiter rate.AdaptiveRateLimitTracker + metricsRegistry metrics.UsageRegistry + shutdownC chan struct{} + shutdownDoneC chan struct{} + isShutdown atomic.Bool + metricsReporter metrics.Handler + versionProvider versions.VersionProvider + debugOperations map[byte]func(c *bufio.ReadWriter) error + stateManager state.Manager + rdmEnabled *config.Value[bool] xwebs []xweb.Instance xwebFactoryRegistry xweb.Registry agentBindHandlers []channel.BindHandler @@ -167,29 +167,33 @@ func (self *Router) GetRouterDataModel() *common.RouterDataModel { return self.stateManager.RouterDataModel() } -func (self *Router) IsHaEnabled() bool { - return self.config.Ha.Enabled +func (self *Router) IsRouterDataModelEnabled() bool { + return self.rdmEnabled.Load() +} + +func (self *Router) GetRouterDataModelEnabledConfig() *config.Value[bool] { + return self.rdmEnabled } func (self *Router) GetConnectEventsConfig() *env.ConnectEventsConfig { return &self.config.ConnectEvents } -func Create(config *Config, versionProvider versions.VersionProvider) *Router { +func Create(cfg *Config, versionProvider versions.VersionProvider) *Router { closeNotify := make(chan struct{}) - if config.Metrics.IntervalAgeThreshold != 0 { - metrics.SetIntervalAgeThreshold(config.Metrics.IntervalAgeThreshold) - logrus.Infof("set interval age threshold to '%v'", config.Metrics.IntervalAgeThreshold) + if cfg.Metrics.IntervalAgeThreshold != 0 { + metrics.SetIntervalAgeThreshold(cfg.Metrics.IntervalAgeThreshold) + logrus.Infof("set interval age threshold to '%v'", cfg.Metrics.IntervalAgeThreshold) } - env.IntervalSize = config.Metrics.ReportInterval - metricsRegistry := metrics.NewUsageRegistry(config.Id.Token, map[string]string{}, closeNotify) + env.IntervalSize = cfg.Metrics.ReportInterval + metricsRegistry := metrics.NewUsageRegistry(cfg.Id.Token, map[string]string{}, closeNotify) xgress.InitMetrics(metricsRegistry) linkDialerPoolConfig := goroutines.PoolConfig{ - QueueSize: uint32(config.Forwarder.LinkDial.QueueLength), + QueueSize: uint32(cfg.Forwarder.LinkDial.QueueLength), MinWorkers: 0, - MaxWorkers: uint32(config.Forwarder.LinkDial.WorkerCount), + MaxWorkers: uint32(cfg.Forwarder.LinkDial.WorkerCount), IdleTime: 30 * time.Second, CloseNotify: closeNotify, PanicHandler: func(err interface{}) { @@ -205,7 +209,7 @@ func Create(config *Config, versionProvider versions.VersionProvider) *Router { } router := &Router{ - config: config, + config: cfg, metricsRegistry: metricsRegistry, shutdownC: closeNotify, shutdownDoneC: make(chan struct{}), @@ -213,15 +217,16 @@ func Create(config *Config, versionProvider versions.VersionProvider) *Router { debugOperations: map[byte]func(c *bufio.ReadWriter) error{}, xwebFactoryRegistry: xweb.NewRegistryMap(), linkDialerPool: linkDialerPool, - ctrlRateLimiter: command.NewAdaptiveRateLimitTracker(config.Ctrl.RateLimit, metricsRegistry, closeNotify), + ctrlRateLimiter: command.NewAdaptiveRateLimitTracker(cfg.Ctrl.RateLimit, metricsRegistry, closeNotify), + rdmEnabled: config.NewConfigValue[bool](), } router.stateManager = state.NewManager(router) - router.ctrls = env.NewNetworkControllers(config.Ctrl.DefaultRequestTimeout, router.connectToController, &config.Ctrl.Heartbeats) + router.ctrls = env.NewNetworkControllers(cfg.Ctrl.DefaultRequestTimeout, router.connectToController, &cfg.Ctrl.Heartbeats) router.xlinkRegistry = link.NewLinkRegistry(router) - router.faulter = forwarder.NewFaulter(router.ctrls, config.Forwarder.FaultTxInterval, closeNotify) - router.forwarder = forwarder.NewForwarder(metricsRegistry, router.faulter, config.Forwarder, closeNotify) + router.faulter = forwarder.NewFaulter(router.ctrls, cfg.Forwarder.FaultTxInterval, closeNotify) + router.forwarder = forwarder.NewForwarder(metricsRegistry, router.faulter, cfg.Forwarder, closeNotify) router.forwarder.StartScanner(router.ctrls) xgress.InitPayloadIngester(closeNotify) diff --git a/router/state/manager.go b/router/state/manager.go index 9cee83ff2..bc8ba845c 100644 --- a/router/state/manager.go +++ b/router/state/manager.go @@ -58,7 +58,7 @@ type RemoveListener func() type DisconnectCB func(token string) type Env interface { - IsHaEnabled() bool + IsRouterDataModelEnabled() bool GetCloseNotify() <-chan struct{} DefaultRequestTimeout() time.Duration } @@ -503,7 +503,7 @@ func NewApiSessionFromToken(jwtToken *jwt.Token, accessClaims *common.AccessClai } func (sm *ManagerImpl) GetApiSession(token string) *ApiSession { - if sm.env.IsHaEnabled() && strings.HasPrefix(token, oidc_auth.JwtTokenPrefix) { + if strings.HasPrefix(token, oidc_auth.JwtTokenPrefix) { jwtToken, accessClaims, err := sm.ParseJwt(token) if err == nil { diff --git a/router/xgress_edge/factory.go b/router/xgress_edge/factory.go index 000085378..261c700e0 100644 --- a/router/xgress_edge/factory.go +++ b/router/xgress_edge/factory.go @@ -25,7 +25,6 @@ import ( "github.com/openziti/metrics" "github.com/openziti/sdk-golang/ziti/edge" "github.com/openziti/transport/v2" - "github.com/openziti/ziti/common" "github.com/openziti/ziti/common/inspect" "github.com/openziti/ziti/common/pb/edge_ctrl_pb" "github.com/openziti/ziti/router" @@ -103,10 +102,6 @@ func (factory *Factory) addReconnectionHandler(h reconnectionHandler) { factory.reconnectionHandlers.Append(h) } -func (factory *Factory) GetTraceDecoders() []channel.TraceMessageDecoder { - return nil -} - func (factory *Factory) Run(env env.RouterEnv) error { factory.stateManager.StartHeartbeat(env, factory.edgeRouterConfig.HeartbeatIntervalSeconds, env.GetCloseNotify()) @@ -141,12 +136,7 @@ func (factory *Factory) LoadConfig(configMap map[interface{}]interface{}) error edgeConfig.Tcfg["protocol"] = append(edgeConfig.Tcfg.Protocols(), "ziti-edge", "") factory.edgeRouterConfig = edgeConfig - - if factory.routerConfig.Ha.Enabled { - factory.stateManager.LoadRouterModel(factory.edgeRouterConfig.Db) - } else { - factory.stateManager.SetRouterDataModel(common.NewReceiverRouterDataModel(state.RouterDataModelListerBufferSize, factory.env.GetCloseNotify())) - } + factory.stateManager.LoadRouterModel(factory.edgeRouterConfig.Db) go apiproxy.Start(edgeConfig) diff --git a/router/xgress_edge/hosted.go b/router/xgress_edge/hosted.go index 17a5b418c..d1c6884bc 100644 --- a/router/xgress_edge/hosted.go +++ b/router/xgress_edge/hosted.go @@ -550,7 +550,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato InstanceSecret: terminator.instanceSecret, } - if self.stateManager.GetEnv().IsHaEnabled() && xgress_common.IsBearerToken(request.SessionToken) { + if xgress_common.IsBearerToken(request.SessionToken) { apiSession := self.stateManager.GetApiSessionFromCh(terminator.Channel) if apiSession == nil { diff --git a/router/xgress_edge/listener.go b/router/xgress_edge/listener.go index 3e81baee8..8045778a3 100644 --- a/router/xgress_edge/listener.go +++ b/router/xgress_edge/listener.go @@ -183,7 +183,7 @@ func (self *edgeClientConn) processConnect(manager state.Manager, req *channel.M PeerData: peerData, } - if manager.GetEnv().IsHaEnabled() && xgress_common.IsBearerToken(sessionToken) { + if xgress_common.IsBearerToken(sessionToken) { apiSession := manager.GetApiSessionFromCh(ch) if apiSession == nil { @@ -369,7 +369,7 @@ func (self *edgeClientConn) processBindV1(manager state.Manager, req *channel.Me InstanceSecret: terminatorIdentitySecret, } - if manager.GetEnv().IsHaEnabled() && xgress_common.IsBearerToken(sessionToken) { + if xgress_common.IsBearerToken(sessionToken) { apiSession := manager.GetApiSessionFromCh(ch) if apiSession == nil { @@ -596,7 +596,7 @@ func (self *edgeClientConn) processUpdateBind(manager state.Manager, req *channe TerminatorId: terminator.terminatorId, } - if manager.GetEnv().IsHaEnabled() && xgress_common.IsBearerToken(sessionToken) { + if xgress_common.IsBearerToken(sessionToken) { apiSession := manager.GetApiSessionFromCh(ch) request.ApiSessionToken = apiSession.Token } @@ -663,7 +663,7 @@ func (self *edgeClientConn) processHealthEvent(manager state.Manager, req *chann log = log.WithField("terminator", terminator.terminatorId).WithField("checkPassed", checkPassed) - if manager.GetEnv().IsHaEnabled() && xgress_common.IsBearerToken(sessionToken) { + if xgress_common.IsBearerToken(sessionToken) { apiSession := manager.GetApiSessionFromCh(ch) request.ApiSessionToken = apiSession.Token } diff --git a/router/xgress_edge_tunnel/fabric.go b/router/xgress_edge_tunnel/fabric.go index 4ad7b5aed..ea66db69a 100644 --- a/router/xgress_edge_tunnel/fabric.go +++ b/router/xgress_edge_tunnel/fabric.go @@ -235,7 +235,7 @@ func (self *fabricProvider) TunnelService(service tunnel.Service, terminatorInst return errors.New(errStr) } - if self.factory.routerConfig.Ha.Enabled { + if self.factory.stateManager.GetEnv().IsRouterDataModelEnabled() { return self.tunnelServiceV2(service, terminatorInstanceId, conn, halfClose, ctrlCh, peerData, keyPair) } diff --git a/router/xgress_edge_tunnel/factory.go b/router/xgress_edge_tunnel/factory.go index 617446e08..54bcf2019 100644 --- a/router/xgress_edge_tunnel/factory.go +++ b/router/xgress_edge_tunnel/factory.go @@ -29,7 +29,6 @@ import ( "github.com/openziti/ziti/router/handler_edge_ctrl" "github.com/openziti/ziti/router/state" "github.com/openziti/ziti/router/xgress" - "github.com/openziti/ziti/router/xgress_edge_tunnel_v2" "github.com/pkg/errors" "strings" "time" @@ -96,10 +95,10 @@ type XrctrlFactory interface { // NewFactory constructs a new Edge Xgress Tunnel Factory instance func NewFactory(env env.RouterEnv, routerConfig *router.Config, stateManager state.Manager) XrctrlFactory { - if routerConfig.Ha.Enabled { - return xgress_edge_tunnel_v2.NewFactory(env, routerConfig, stateManager) - } + return NewFactoryWrapper(env, routerConfig, stateManager) +} +func NewV1Factory(env env.RouterEnv, routerConfig *router.Config, stateManager state.Manager) XrctrlFactory { factory := &Factory{ id: env.GetRouterId(), routerConfig: routerConfig, diff --git a/router/xgress_edge_tunnel/factory_wrapper.go b/router/xgress_edge_tunnel/factory_wrapper.go new file mode 100644 index 000000000..3a7ed073a --- /dev/null +++ b/router/xgress_edge_tunnel/factory_wrapper.go @@ -0,0 +1,202 @@ +package xgress_edge_tunnel + +import ( + "errors" + "github.com/michaelquigley/pfxlog" + "github.com/openziti/channel/v3" + "github.com/openziti/foundation/v2/concurrenz" + "github.com/openziti/ziti/common" + "github.com/openziti/ziti/common/config" + "github.com/openziti/ziti/common/pb/edge_ctrl_pb" + "github.com/openziti/ziti/router" + "github.com/openziti/ziti/router/env" + "github.com/openziti/ziti/router/state" + "github.com/openziti/ziti/router/xgress" + "github.com/openziti/ziti/router/xgress_edge_tunnel_v2" + "time" +) + +type FactoryWrapper struct { + env env.RouterEnv + routerConfig *router.Config + stateManager state.Manager + initDone chan struct{} + delegate concurrenz.AtomicValue[XrctrlFactory] + + listenerOptions chan xgress.OptionsData + listenerArgs chan listenArgs +} + +func (self *FactoryWrapper) LoadConfig(map[interface{}]interface{}) error { + // both v1/v2 currently have empty LoadConfig methods. Will need to update this if that changes. + return nil +} + +func (self *FactoryWrapper) BindChannel(binding channel.Binding) error { + // v1 bindings + binding.AddReceiveHandlerF(int32(edge_ctrl_pb.ContentType_ServiceListType), self.handleV1ServiceListType) + binding.AddReceiveHandlerF(int32(edge_ctrl_pb.ContentType_CreateTunnelTerminatorResponseType), self.handleV1CreateTunnelTerminatorResponse) + + // v2 bindings + binding.AddReceiveHandlerF(int32(edge_ctrl_pb.ContentType_CreateTunnelTerminatorResponseV2Type), self.handleV2CreateTunnelTerminatorResponse) + + return nil +} + +func (self *FactoryWrapper) handleV1ServiceListType(msg *channel.Message, ch channel.Channel) { + if delegate := self.delegate.Load(); delegate != nil { + if v1, ok := delegate.(*Factory); ok { + v1.serviceListHandler.HandleReceive(msg, ch) + } + } +} + +func (self *FactoryWrapper) handleV1CreateTunnelTerminatorResponse(msg *channel.Message, ch channel.Channel) { + if delegate := self.delegate.Load(); delegate != nil { + if v1, ok := delegate.(*Factory); ok { + v1.tunneler.fabricProvider.HandleTunnelResponse(msg, ch) + } + } +} + +func (self *FactoryWrapper) handleV2CreateTunnelTerminatorResponse(msg *channel.Message, ch channel.Channel) { + if delegate := self.delegate.Load(); delegate != nil { + if v2, ok := delegate.(*xgress_edge_tunnel_v2.Factory); ok { + v2.HandleCreateTunnelTerminatorResponse(msg, ch) + } + } +} + +func (self *FactoryWrapper) Enabled() bool { + return true +} + +func (self *FactoryWrapper) Run(env.RouterEnv) error { + // we'll call run when initialization is complete + return nil +} + +func (self *FactoryWrapper) NotifyOfReconnect(ch channel.Channel) { + if delegate := self.delegate.Load(); delegate != nil { + delegate.NotifyOfReconnect(ch) + } +} + +func NewFactoryWrapper(env env.RouterEnv, routerConfig *router.Config, stateManager state.Manager) XrctrlFactory { + wrapper := &FactoryWrapper{ + env: env, + routerConfig: routerConfig, + stateManager: stateManager, + initDone: make(chan struct{}), + listenerOptions: make(chan xgress.OptionsData, 5), + listenerArgs: make(chan listenArgs, 5), + } + + env.GetRouterDataModelEnabledConfig().AddListener(config.ListenerFunc[bool](func(init bool, old bool, new bool) { + if !init && old != new { + pfxlog.Logger() + } + })) + + go func() { + defer close(wrapper.initDone) + + log := pfxlog.Logger() + + select { + case <-env.GetRouterDataModelEnabledConfig().GetInitNotifyChannel(): + case <-env.GetCloseNotify(): + return + } + + var factory XrctrlFactory + if env.GetRouterDataModelEnabledConfig().Load() { + log.Info("router data model enabled, using xgress_edge_tunnel_v2") + factory = xgress_edge_tunnel_v2.NewFactory(env, routerConfig, stateManager) + } else { + log.Info("router data model NOT enabled, using xgress_edge_tunnel") + factory = NewV1Factory(env, routerConfig, stateManager) + } + + wrapper.delegate.Store(factory) + + done := false + for !done { + select { + case options := <-wrapper.listenerOptions: + listener, err := factory.CreateListener(options) + if err != nil { + log.WithField("binding", common.TunnelBinding).WithError(err).Fatal("error creating listener") + return + } + + select { + case args := <-wrapper.listenerArgs: + args.delegate.delegate.Store(listener) + err = listener.Listen(args.address, args.bindHandler) + if err != nil { + log.WithField("binding", common.TunnelBinding).WithError(err).Fatal("error starting listener") + return + } + case <-time.After(time.Second * 5): + log.WithField("binding", common.TunnelBinding).WithError(err).Fatal("timeout waiting for start to be called on listener") + return + } + default: + done = true + } + + } + + _ = env.GetNetworkControllers().AnyValidCtrlChannel() + if err := factory.Run(env); err != nil { + log.WithError(err).Fatal("error starting") + } + }() + + return wrapper +} + +func (self *FactoryWrapper) CreateListener(optionsData xgress.OptionsData) (xgress.Listener, error) { + self.listenerOptions <- optionsData + return &delegatingListener{ + factory: self, + options: optionsData, + }, nil +} + +func (self *FactoryWrapper) CreateDialer(optionsData xgress.OptionsData) (xgress.Dialer, error) { + if delegate := self.delegate.Load(); delegate != nil { + return delegate.CreateDialer(optionsData) + } + + return nil, errors.New("initialization incomplete, unable to create dialer") +} + +type delegatingListener struct { + factory *FactoryWrapper + options xgress.OptionsData + delegate concurrenz.AtomicValue[xgress.Listener] +} + +type listenArgs struct { + address string + bindHandler xgress.BindHandler + delegate *delegatingListener +} + +func (self *delegatingListener) Listen(address string, bindHandler xgress.BindHandler) error { + self.factory.listenerArgs <- listenArgs{ + address: address, + bindHandler: bindHandler, + delegate: self, + } + return nil +} + +func (self *delegatingListener) Close() error { + if listener := self.delegate.Load(); listener != nil { + return listener.Close() + } + return nil +} diff --git a/router/xgress_edge_tunnel_v2/factory.go b/router/xgress_edge_tunnel_v2/factory.go index 6cb7a4247..6319a2f8d 100644 --- a/router/xgress_edge_tunnel_v2/factory.go +++ b/router/xgress_edge_tunnel_v2/factory.go @@ -26,7 +26,6 @@ import ( "github.com/openziti/ziti/common/pb/edge_ctrl_pb" "github.com/openziti/ziti/router" "github.com/openziti/ziti/router/env" - "github.com/openziti/ziti/router/handler_edge_ctrl" "github.com/openziti/ziti/router/state" "github.com/openziti/ziti/router/xgress" "github.com/pkg/errors" @@ -42,14 +41,13 @@ const ( ) type Factory struct { - id *identity.TokenId - ctrls env.NetworkControllers - routerConfig *router.Config - stateManager state.Manager - serviceListHandler *handler_edge_ctrl.ServiceListHandler - tunneler *tunneler - metricsRegistry metrics.UsageRegistry - env env.RouterEnv + id *identity.TokenId + ctrls env.NetworkControllers + routerConfig *router.Config + stateManager state.Manager + tunneler *tunneler + metricsRegistry metrics.UsageRegistry + env env.RouterEnv } func (self *Factory) NotifyOfReconnect(channel.Channel) { @@ -57,20 +55,19 @@ func (self *Factory) NotifyOfReconnect(channel.Channel) { self.tunneler.HandleReconnect() } -func (self *Factory) GetTraceDecoders() []channel.TraceMessageDecoder { - return nil -} - func (self *Factory) Enabled() bool { return true } func (self *Factory) BindChannel(binding channel.Binding) error { - binding.AddTypedReceiveHandler(self.serviceListHandler) binding.AddReceiveHandlerF(int32(edge_ctrl_pb.ContentType_CreateTunnelTerminatorResponseV2Type), self.tunneler.fabricProvider.HandleTunnelResponse) return nil } +func (self *Factory) HandleCreateTunnelTerminatorResponse(msg *channel.Message, ch channel.Channel) { + self.tunneler.fabricProvider.HandleTunnelResponse(msg, ch) +} + func (self *Factory) Run(env env.RouterEnv) error { self.ctrls = env.GetNetworkControllers() if self.tunneler.listenOptions != nil { diff --git a/tests/context.go b/tests/context.go index d08cde88e..6a4872c3f 100644 --- a/tests/context.go +++ b/tests/context.go @@ -530,9 +530,7 @@ func (ctx *TestContext) CreateEnrollAndStartEdgeRouter(roleAttributes ...string) func (ctx *TestContext) CreateEnrollAndStartHAEdgeRouter(roleAttributes ...string) *router.Router { ctx.shutdownRouters() ctx.createAndEnrollEdgeRouter(false, roleAttributes...) - return ctx.startEdgeRouter(func(r *router.Config) { - r.Ha.Enabled = true - }) + return ctx.startEdgeRouter(nil) } func (ctx *TestContext) startEdgeRouter(cfgTweaks func(*router.Config)) *router.Router { diff --git a/ziti/cmd/fabric/inspect.go b/ziti/cmd/fabric/inspect.go index 72961ccf1..a6406516d 100644 --- a/ziti/cmd/fabric/inspect.go +++ b/ziti/cmd/fabric/inspect.go @@ -35,6 +35,7 @@ func newInspectCmd(p common.OptionsProvider) *cobra.Command { cmd.AddCommand(action.newInspectSubCmd(p, "sdk-terminators", "gets information from routers about their view of sdk terminators")) cmd.AddCommand(action.newInspectSubCmd(p, "router-messaging", "gets information about pending router peer updates and terminator validations")) cmd.AddCommand(action.newInspectSubCmd(p, "router-data-model", "gets information about the router data model")) + cmd.AddCommand(action.newInspectSubCmd(p, "router-data-model-index", "gets current index of the router data model")) cmd.AddCommand(action.newInspectSubCmd(p, "router-controllers", "gets information about the state of a router's connections to its controllers")) cmd.AddCommand(action.newInspectSubCmd(p, "terminator-costs", "gets information about terminator dynamic costs")) cmd.AddCommand(action.newInspectSubCmd(p, inspectCommon.RouterIdentityConnectionStatusesKey, "gets information about controller identity state")) diff --git a/zititest/models/router-data-model-test/configs/router.yml.tmpl b/zititest/models/router-data-model-test/configs/router.yml.tmpl index 8e42c4598..a652ee6df 100644 --- a/zititest/models/router-data-model-test/configs/router.yml.tmpl +++ b/zititest/models/router-data-model-test/configs/router.yml.tmpl @@ -6,10 +6,8 @@ v: 3 enableDebugOps: true -{{if .Component.GetFlag "ha"}} ha: enabled: true -{{end}} identity: cert: /home/{{$ssh_username}}/fablab/cfg/{{$identity}}-client.cert diff --git a/zititest/models/router-data-model-test/main.go b/zititest/models/router-data-model-test/main.go index 86e387fcc..eaf8c6695 100644 --- a/zititest/models/router-data-model-test/main.go +++ b/zititest/models/router-data-model-test/main.go @@ -266,7 +266,7 @@ var m = &model.Model{ task := createNewService(ctrls.getCtrl("ctrl1")) tasks = append(tasks, task) } - return parallel.ExecuteLabeled(tasks, 2) + return parallel.ExecuteLabeled(tasks, 2, nil) })) workflow.AddAction(model.ActionFunc(func(run model.Run) error { @@ -280,7 +280,7 @@ var m = &model.Model{ task := createNewIdentity(ctrls.getCtrl("ctrl1")) tasks = append(tasks, task) } - return parallel.ExecuteLabeled(tasks, 2) + return parallel.ExecuteLabeled(tasks, 2, nil) })) workflow.AddAction(model.ActionFunc(func(run model.Run) error { @@ -291,10 +291,10 @@ var m = &model.Model{ var tasks []parallel.LabeledTask for range 100 { - task := createNewService(ctrls.getCtrl("ctrl1")) + task := createNewServicePolicy(ctrls.getCtrl("ctrl1")) tasks = append(tasks, task) } - return parallel.ExecuteLabeled(tasks, 2) + return parallel.ExecuteLabeled(tasks, 2, nil) })) workflow.AddAction(semaphore.Sleep(2 * time.Second)) diff --git a/zititest/models/router-data-model-test/validation.go b/zititest/models/router-data-model-test/validation.go index accd82040..c72961a56 100644 --- a/zititest/models/router-data-model-test/validation.go +++ b/zititest/models/router-data-model-test/validation.go @@ -26,12 +26,15 @@ import ( "github.com/openziti/edge-api/rest_model" "github.com/openziti/fablab/kernel/lib/parallel" "github.com/openziti/fablab/kernel/model" + "github.com/openziti/foundation/v2/errorz" "github.com/openziti/ziti/common/pb/mgmt_pb" + "github.com/openziti/ziti/ziti/util" "github.com/openziti/ziti/zitirest" "github.com/openziti/ziti/zititest/zitilab/chaos" "github.com/openziti/ziti/zititest/zitilab/models" "google.golang.org/protobuf/proto" "math/rand" + "strings" "sync" "time" ) @@ -117,7 +120,24 @@ func sowChaos(run model.Run) error { } chaos.Randomize(tasks) - return parallel.ExecuteLabeled(tasks, 2) + + retryPolicy := func(task parallel.LabeledTask, attempt int, err error) parallel.ErrorAction { + if strings.HasPrefix(task.Type(), "delete.") { + var apiErr util.ApiErrorPayload + if errors.As(err, &apiErr) { + if apiErr.GetPayload().Error.Code == errorz.NotFoundCode { + return parallel.ErrActionIgnore + } + } + } + if attempt > 3 { + return parallel.ErrActionReport + } + pfxlog.Logger().WithField("attempt", attempt).WithError(err).WithField("task", task.Label()).Error("action failed, retrying") + time.Sleep(time.Second) + return parallel.ErrActionRetry + } + return parallel.ExecuteLabeled(tasks, 2, retryPolicy) } func getRestartTasks(run model.Run, _ *CtrlClients) ([]parallel.LabeledTask, error) { @@ -135,7 +155,7 @@ func getRestartTasks(run model.Run, _ *CtrlClients) ([]parallel.LabeledTask, err return nil, err } for _, controller := range controllers { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("restart controller %s", controller.Id), func() error { + result = append(result, parallel.TaskWithLabel("restart.ctrl", fmt.Sprintf("restart controller %s", controller.Id), func() error { return chaos.RestartSelected(run, 1, controller) })) } @@ -148,7 +168,7 @@ func getRestartTasks(run model.Run, _ *CtrlClients) ([]parallel.LabeledTask, err return nil, err } for _, router := range routers { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("restart router %s", router.Id), func() error { + result = append(result, parallel.TaskWithLabel("restart.router", fmt.Sprintf("restart router %s", router.Id), func() error { return chaos.RestartSelected(run, 1, router) })) } @@ -204,13 +224,13 @@ func getServiceChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.LabeledTa var result []parallel.LabeledTask for i := 0; i < 5; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("delete service %s", *svcs[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("delete.service", fmt.Sprintf("delete service %s", *svcs[i].ID), func() error { return models.DeleteService(ctrls.getRandomCtrl(), *svcs[i].ID, 15*time.Second) })) } for i := 5; i < 10; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("modify service %s", *svcs[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("modify.service", fmt.Sprintf("modify service %s", *svcs[i].ID), func() error { svc := svcs[i] svc.RoleAttributes = getRoleAttributesAsAttrPtr(3) svc.Name = newId() @@ -226,7 +246,7 @@ func getServiceChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.LabeledTa } func getIdentityChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.LabeledTask, error) { - entities, err := models.ListIdentities(ctrls.getRandomCtrl(), "limit none", 15*time.Second) + entities, err := models.ListIdentities(ctrls.getRandomCtrl(), "not isAdmin limit none", 15*time.Second) if err != nil { return nil, err } @@ -235,13 +255,13 @@ func getIdentityChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.LabeledT var result []parallel.LabeledTask for i := 0; i < 5; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("delete identity %s", *entities[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("delete.identity", fmt.Sprintf("delete identity %s", *entities[i].ID), func() error { return models.DeleteIdentity(ctrls.getRandomCtrl(), *entities[i].ID, 15*time.Second) })) } for i := 5; i < 10; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("modify identity %s", *entities[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("modify.identity", fmt.Sprintf("modify identity %s", *entities[i].ID), func() error { entity := entities[i] entity.RoleAttributes = getRoleAttributesAsAttrPtr(3) entity.Name = newId() @@ -266,13 +286,13 @@ func getServicePolicyChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.Lab var result []parallel.LabeledTask for i := 0; i < 5; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("delete service policy %s", *entities[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("delete.service-policy", fmt.Sprintf("delete service policy %s", *entities[i].ID), func() error { return models.DeleteServicePolicy(ctrls.getRandomCtrl(), *entities[i].ID, 15*time.Second) })) } for i := 5; i < 10; i++ { - result = append(result, parallel.TaskWithLabel(fmt.Sprintf("modify service policy %s", *entities[i].ID), func() error { + result = append(result, parallel.TaskWithLabel("modify.service-policy", fmt.Sprintf("modify service policy %s", *entities[i].ID), func() error { entity := entities[i] entity.IdentityRoles = getRoles(3) entity.ServiceRoles = getRoles(3) @@ -290,7 +310,7 @@ func getServicePolicyChaosTasks(_ model.Run, ctrls *CtrlClients) ([]parallel.Lab } func createNewService(ctrl *zitirest.Clients) parallel.LabeledTask { - return parallel.TaskWithLabel("create new service", func() error { + return parallel.TaskWithLabel("create.service", "create new service", func() error { svc := &rest_model.ServiceCreate{ Configs: nil, EncryptionRequired: newBoolPtr(), @@ -305,7 +325,7 @@ func createNewService(ctrl *zitirest.Clients) parallel.LabeledTask { func createNewIdentity(ctrl *zitirest.Clients) parallel.LabeledTask { isAdmin := false identityType := rest_model.IdentityTypeDefault - return parallel.TaskWithLabel("create new identity", func() error { + return parallel.TaskWithLabel("create.identity", "create new identity", func() error { svc := &rest_model.IdentityCreate{ DefaultHostingCost: nil, DefaultHostingPrecedence: "", @@ -322,7 +342,7 @@ func createNewIdentity(ctrl *zitirest.Clients) parallel.LabeledTask { } func createNewServicePolicy(ctrl *zitirest.Clients) parallel.LabeledTask { - return parallel.TaskWithLabel("create new service policy", func() error { + return parallel.TaskWithLabel("create.service-policy", "create new service policy", func() error { anyOf := rest_model.SemanticAnyOf policyType := rest_model.DialBindDial if rand.Int()%2 == 0 {