Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Dec 5, 2024
1 parent e0c64c9 commit 8506358
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 54 deletions.
59 changes: 36 additions & 23 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,36 +50,36 @@ type DataStateIdentity = edge_ctrl_pb.DataState_Identity
type Identity struct {
*DataStateIdentity
ServicePolicies map[string]struct{} `json:"servicePolicies"`
IdentityIndex uint64
ServiceSetIndex uint64
identityIndex uint64
serviceSetIndex uint64
}

type DataStateConfigType = edge_ctrl_pb.DataState_ConfigType

type ConfigType struct {
*DataStateConfigType
Index uint64
index uint64
}

type DataStateConfig = edge_ctrl_pb.DataState_Config

type Config struct {
*DataStateConfig
Index uint64
index uint64
}

type DataStateService = edge_ctrl_pb.DataState_Service

type Service struct {
*DataStateService
Index uint64
index uint64
}

type DataStatePostureCheck = edge_ctrl_pb.DataState_PostureCheck

type PostureCheck struct {
*DataStatePostureCheck
Index uint64
index uint64
}

type DataStateServicePolicy = edge_ctrl_pb.DataState_ServicePolicy
Expand Down Expand Up @@ -355,14 +355,14 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: map[string]struct{}{},
IdentityIndex: index,
identityIndex: index,
}
} else {
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: valueInMap.ServicePolicies,
IdentityIndex: index,
ServiceSetIndex: valueInMap.ServiceSetIndex,
identityIndex: index,
serviceSetIndex: valueInMap.serviceSetIndex,
}
}
return identity
Expand All @@ -382,10 +382,13 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p
func (rdm *RouterDataModel) HandleServiceEvent(index uint64, event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_Service) {
if event.Action == edge_ctrl_pb.DataState_Delete {
rdm.Services.Remove(model.Service.Id)
rdm.ServicePolicies.IterCb(func(key string, v *ServicePolicy) {
delete(v.Services, model.Service.Id)
})
} else {
rdm.Services.Set(model.Service.Id, &Service{
DataStateService: model.Service,
Index: index,
index: index,
})
}
}
Expand All @@ -399,7 +402,7 @@ func (rdm *RouterDataModel) HandleConfigTypeEvent(index uint64, event *edge_ctrl
} else {
rdm.ConfigTypes.Set(model.ConfigType.Id, &ConfigType{
DataStateConfigType: model.ConfigType,
Index: index,
index: index,
})
}
}
Expand All @@ -413,12 +416,12 @@ func (rdm *RouterDataModel) HandleConfigEvent(index uint64, event *edge_ctrl_pb.
} else {
rdm.Configs.Set(model.Config.Id, &Config{
DataStateConfig: model.Config,
Index: index,
index: index,
})
}
}

func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
servicePolicy := model.ServicePolicy
rdm.ServicePolicies.Upsert(servicePolicy.Id, nil, func(exist bool, valueInMap *ServicePolicy, newValue *ServicePolicy) *ServicePolicy {
if valueInMap == nil {
Expand All @@ -437,7 +440,7 @@ func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.Da
})
}

func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(_ *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
rdm.ServicePolicies.Remove(model.ServicePolicy.Id)
}

Expand All @@ -448,11 +451,11 @@ func (rdm *RouterDataModel) HandleServicePolicyEvent(event *edge_ctrl_pb.DataSta
pfxlog.Logger().WithField("policyId", model.ServicePolicy.Id).WithField("action", event.Action).Debug("applying service policy event")
switch event.Action {
case edge_ctrl_pb.DataState_Create:
rdm.applyUpdateServicePolicyEvent(event, model)
rdm.applyUpdateServicePolicyEvent(model)
case edge_ctrl_pb.DataState_Update:
rdm.applyUpdateServicePolicyEvent(event, model)
rdm.applyUpdateServicePolicyEvent(model)
case edge_ctrl_pb.DataState_Delete:
rdm.applyDeleteServicePolicyEvent(event, model)
rdm.applyDeleteServicePolicyEvent(model)
}
}

Expand All @@ -465,7 +468,7 @@ func (rdm *RouterDataModel) HandlePostureCheckEvent(index uint64, event *edge_ct
} else {
rdm.PostureChecks.Set(model.PostureCheck.Id, &PostureCheck{
DataStatePostureCheck: model.PostureCheck,
Index: index,
index: index,
})
}
}
Expand Down Expand Up @@ -510,7 +513,7 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_
} else {
delete(valueInMap.ServicePolicies, model.PolicyId)
}
valueInMap.ServiceSetIndex = index
valueInMap.serviceSetIndex = index
}
return valueInMap
})
Expand Down Expand Up @@ -594,7 +597,9 @@ func (rdm *RouterDataModel) recalculateCachedPublicKeys() {
func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
var events []*edge_ctrl_pb.DataState_Event

rdm.EventCache.WhileLocked(func(_ uint64, _ bool) {
var index uint64
rdm.EventCache.WhileLocked(func(currentIndex uint64, _ bool) {
index = currentIndex
rdm.ConfigTypes.IterCb(func(key string, v *ConfigType) {
newEvent := &edge_ctrl_pb.DataState_Event{
Action: edge_ctrl_pb.DataState_Create,
Expand Down Expand Up @@ -717,10 +722,9 @@ func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
})
})

idx, _ := rdm.CurrentIndex()
return &edge_ctrl_pb.DataState{
Events: events,
EndIndex: idx,
EndIndex: index,
}
}

Expand Down Expand Up @@ -1098,7 +1102,16 @@ func (self *compareReporter) Report(result cmp.Result) {
}
if step != nil {
vx, vy := step.Values()
err := fmt.Sprintf("%s mismatch. orig: %v, copy: %v", path.String(), vx.Interface(), vy.Interface())
var x any
var y any

if vx.IsValid() {
x = vx.Interface()
}
if vy.IsValid() {
y = vy.Interface()
}
err := fmt.Sprintf("%s mismatch. orig: %v, copy: %v", path.String(), x, y)
self.f(self.key, err)
} else {
self.f(self.key, "programming error, empty path stack")
Expand Down
12 changes: 6 additions & 6 deletions common/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type IdentityService struct {
}

func (self *IdentityService) Equals(other *IdentityService) bool {
if self.Service.Index != other.Service.Index {
if self.Service.index != other.Service.index {
return false
}

Expand Down Expand Up @@ -67,10 +67,10 @@ func (self *IdentityService) Equals(other *IdentityService) bool {
if !ok {
return false
}
if config.Config.Index != otherConfig.Config.Index {
if config.Config.index != otherConfig.Config.index {
return false
}
if config.ConfigType.Index != otherConfig.ConfigType.Index {
if config.ConfigType.index != otherConfig.ConfigType.index {
return false
}
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (self *IdentitySubscription) identityUpdated(rdm *RouterDataModel, identity
var state *IdentityState
self.Lock()
if self.Identity != nil {
if identity.IdentityIndex > self.Identity.IdentityIndex {
if identity.identityIndex > self.Identity.identityIndex {
self.Identity = identity
notify = true
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) {
return
}

if oldIdentity.IdentityIndex < newIdentity.IdentityIndex {
if oldIdentity.identityIndex < newIdentity.identityIndex {
for _, subscriber := range self.Listeners.Value() {
subscriber.NotifyIdentityEvent(state, EventIdentityUpdated)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) {
checksChanged = true
break
}
if check.Index != newCheck.Index {
if check.index != newCheck.index {
checksChanged = true
break
}
Expand Down
30 changes: 22 additions & 8 deletions controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,13 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe
WithField("os", rtx.Router.VersionInfo.OS).
WithField("arch", rtx.Router.VersionInfo.Arch)

var routerDataModelIndex uint64
if supported, ok := msg.Headers.GetBoolHeader(int32(edge_ctrl_pb.Header_RouterDataModel)); ok && supported {
rtx.SupportsRouterModel = true

if index, ok := msg.Headers.GetUint64Header(int32(edge_ctrl_pb.Header_RouterDataModelIndex)); ok {
rtx.RouterModelIndex = &index
routerDataModelIndex = index
}
}

Expand All @@ -641,7 +643,13 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe
rtx.SetVersionInfo(*rtx.Router.VersionInfo)

serverVersion := build.GetBuildInfo().Version()
logger.Infof("edge router sent hello with version [%s] to controller with version [%s]", respHello.Version, serverVersion)

currentIndex, _ := strategy.CurrentIndex()
logger.WithField("routerIndex", routerDataModelIndex).
WithField("dataModelIndex", currentIndex).
WithField("routerVersion", respHello.Version).
WithField("serverVersion", serverVersion).
Info("edge router sent hello")
strategy.queueClientHello(rtx)
}

Expand Down Expand Up @@ -721,6 +729,8 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) {
rtx.RouterModelIndex = nil
events, ok := strategy.RouterDataModel.ReplayFrom(*replayFrom)

logger.WithError(err).Infof("replaying %d router data model events to router", len(events))

if ok {
var err error
for _, curEvent := range events {
Expand Down Expand Up @@ -762,12 +772,12 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) {
Error("could not send data state event for peers")
}
}
}

// no error sync is done, if err try full state
if err == nil {
rtx.SetSyncStatus(env.RouterSyncDone)
return
// no error sync is done, if err try full state
if err == nil {
rtx.SetSyncStatus(env.RouterSyncDone)
return
}
}

pfxlog.Logger().WithError(err).Error("could not send events for router sync, attempting full state")
Expand All @@ -779,9 +789,8 @@ func (strategy *InstantStrategy) synchronize(rtx *RouterSender) {
if dataState == nil {
return
}
dataState.EndIndex = strategy.indexProvider.CurrentIndex()

if err := strategy.sendDataState(rtx, dataState); err != nil {
if err = strategy.sendDataState(rtx, dataState); err != nil {
logger.WithError(err).Error("failure sending full data state")
rtx.SetSyncStatus(env.RouterSyncError)
return
Expand Down Expand Up @@ -1915,6 +1924,11 @@ func (strategy *InstantStrategy) inspect(val string) (bool, *string, error) {
result := string(js)
return true, &result, nil
}
if val == "router-data-model-index" {
idx, _ := strategy.RouterDataModel.CurrentIndex()
strVal := fmt.Sprintf("%d", idx)
return true, &strVal, nil
}
return false, nil, nil
}

Expand Down
4 changes: 4 additions & 0 deletions router/handler_ctrl/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (context *inspectRequestContext) processLocal() {
} else if lc == "router-data-model" {
result := context.handler.env.GetRouterDataModel()
context.handleJsonResponse(requested, result)
} else if lc == "router-data-model-index" {
idx, _ := context.handler.env.GetRouterDataModel().CurrentIndex()
strVal := fmt.Sprintf("%d", idx)
context.appendValue(requested, strVal)
} else if lc == "router-controllers" {
result := context.handler.env.GetNetworkControllers().Inspect()
context.handleJsonResponse(requested, result)
Expand Down
1 change: 1 addition & 0 deletions ziti/cmd/fabric/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newInspectCmd(p common.OptionsProvider) *cobra.Command {
cmd.AddCommand(action.newInspectSubCmd(p, "sdk-terminators", "gets information from routers about their view of sdk terminators"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-messaging", "gets information about pending router peer updates and terminator validations"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-data-model", "gets information about the router data model"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-data-model-index", "gets current index of the router data model"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-controllers", "gets information about the state of a router's connections to its controllers"))
cmd.AddCommand(action.newInspectSubCmd(p, "terminator-costs", "gets information about terminator dynamic costs"))
cmd.AddCommand(action.newInspectSubCmd(p, inspectCommon.RouterIdentityConnectionStatusesKey, "gets information about controller identity state"))
Expand Down
8 changes: 4 additions & 4 deletions zititest/models/router-data-model-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ var m = &model.Model{
task := createNewService(ctrls.getCtrl("ctrl1"))
tasks = append(tasks, task)
}
return parallel.ExecuteLabeled(tasks, 2)
return parallel.ExecuteLabeled(tasks, 2, nil)
}))

workflow.AddAction(model.ActionFunc(func(run model.Run) error {
Expand All @@ -280,7 +280,7 @@ var m = &model.Model{
task := createNewIdentity(ctrls.getCtrl("ctrl1"))
tasks = append(tasks, task)
}
return parallel.ExecuteLabeled(tasks, 2)
return parallel.ExecuteLabeled(tasks, 2, nil)
}))

workflow.AddAction(model.ActionFunc(func(run model.Run) error {
Expand All @@ -291,10 +291,10 @@ var m = &model.Model{

var tasks []parallel.LabeledTask
for range 100 {
task := createNewService(ctrls.getCtrl("ctrl1"))
task := createNewServicePolicy(ctrls.getCtrl("ctrl1"))
tasks = append(tasks, task)
}
return parallel.ExecuteLabeled(tasks, 2)
return parallel.ExecuteLabeled(tasks, 2, nil)
}))

workflow.AddAction(semaphore.Sleep(2 * time.Second))
Expand Down
Loading

0 comments on commit 8506358

Please sign in to comment.