Skip to content

Commit

Permalink
Fix responsive check. Fixes #2171. Add router-connections inspection. F…
Browse files Browse the repository at this point in the history
…ixes #2219. Improve can't load data file log message. Fixes #2195
  • Loading branch information
plorenz committed Jul 12, 2024
1 parent fca0bd7 commit ea109bb
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 41 deletions.
31 changes: 31 additions & 0 deletions common/inspect/controller_inspections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inspect

// ControllerInspectDetails is the JSON-serializable result of inspecting a
// router's controller connections. It holds one entry per controller.
type ControllerInspectDetails struct {
// Controllers maps a controller id to the details captured for that controller.
Controllers map[string]*ControllerInspectDetail `json:"controllers"`
}

// ControllerInspectDetail describes the state of a single controller
// connection. Each field is serialized to JSON under the name given in its tag.
type ControllerInspectDetail struct {
// ControllerId is the id of the controller this entry describes.
ControllerId string `json:"controllerId"`
// IsConnected reports whether the controller connection is currently connected.
IsConnected bool `json:"connected"`
// IsResponsive reports whether the controller is currently considered responsive.
IsResponsive bool `json:"responsive"`
// Address is the controller's address.
Address string `json:"address"`
// Latency is the controller latency, rendered as a string.
Latency string `json:"latency"`
// Version is the controller's version string; empty when no version is known.
Version string `json:"version"`
// TimeSinceLastContact is the time elapsed since the last contact with the
// controller, rendered as a string.
TimeSinceLastContact string `json:"timeSinceLastContact"`
}
10 changes: 8 additions & 2 deletions router/env/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type NetworkController interface {
isMoreResponsive(other NetworkController) bool
GetVersion() *versions.VersionInfo
TimeSinceLastContact() time.Duration
IsConnected() bool
}

type networkCtrl struct {
Expand Down Expand Up @@ -111,13 +112,18 @@ func (self *networkCtrl) CheckHeartBeat() {
} else if self.lastTx > 0 && self.lastRx < self.lastTx && (time.Now().UnixMilli()-self.lastTx) > 5000 {
// if we've sent a heartbeat and not gotten a response in over 5s, consider ourselves unresponsive
self.unresponsive.Store(true)
} else if connectable, ok := self.ch.Underlay().(interface{ IsConnected() bool }); ok && !connectable.IsConnected() {
self.unresponsive.Store(false)
} else if !self.IsConnected() {
self.unresponsive.Store(true)
} else {
self.unresponsive.Store(false)
}
}

// IsConnected reports whether the underlay of the controller channel says it
// is connected. An underlay that does not expose an IsConnected method is
// treated as not connected.
func (self *networkCtrl) IsConnected() bool {
	if underlay, supported := self.ch.Underlay().(interface{ IsConnected() bool }); supported {
		return underlay.IsConnected()
	}
	return false
}

func NewDefaultHeartbeatOptions() *HeartbeatOptions {
return &HeartbeatOptions{
HeartbeatOptions: *channel.DefaultHeartbeatOptions(),
Expand Down
26 changes: 26 additions & 0 deletions router/env/ctrls.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/transport/v2"
"github.com/openziti/ziti/common/inspect"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/pkg/errors"
"sync"
Expand All @@ -43,6 +44,7 @@ type NetworkControllers interface {
DefaultRequestTimeout() time.Duration
ForEach(f func(ctrlId string, ch channel.Channel))
Close() error
Inspect() *inspect.ControllerInspectDetails
}

type CtrlDialer func(address transport.Address, bindHandler channel.BindHandler) error
Expand Down Expand Up @@ -251,3 +253,27 @@ func (self *networkControllers) CloseAndRemoveByAddress(address string) {
}
}
}

// Inspect builds a snapshot of every known controller connection, keyed by
// controller id. For each controller it records whether it is connected and
// responsive, its address, latency, version and the time since last contact.
// The version is left empty when the controller reports no version info.
func (self *networkControllers) Inspect() *inspect.ControllerInspectDetails {
	ctrls := self.ctrls.AsMap()
	result := &inspect.ControllerInspectDetails{
		Controllers: make(map[string]*inspect.ControllerInspectDetail, len(ctrls)),
	}

	for id, ctrl := range ctrls {
		// Fetch the version info once so the nil check and the dereference
		// are guaranteed to look at the same value.
		version := ""
		if versionInfo := ctrl.GetVersion(); versionInfo != nil {
			version = versionInfo.Version
		}

		result.Controllers[id] = &inspect.ControllerInspectDetail{
			ControllerId:         id,
			IsConnected:          ctrl.IsConnected(),
			IsResponsive:         !ctrl.IsUnresponsive(),
			Address:              ctrl.Address(),
			Latency:              ctrl.Latency().String(),
			Version:              version,
			TimeSinceLastContact: ctrl.TimeSinceLastContact().String(),
		}
	}

	return result
}
56 changes: 19 additions & 37 deletions router/handler_ctrl/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,7 @@ func (context *inspectRequestContext) processLocal() {
context.appendValue(requested, debugz.GenerateStack())
} else if lc == "links" {
result := context.handler.env.GetXlinkRegistry().Inspect(time.Second)
js, err := json.Marshal(result)
if err != nil {
context.appendError(errors.Wrap(err, "failed to marshal links to json").Error())
} else {
context.appendValue(requested, string(js))
}
context.handleJsonResponse(requested, result)
} else if lc == "sdk-terminators" {
factory, _ := xgress.GlobalRegistry().Factory("edge")
if factory == nil {
Expand All @@ -109,42 +104,22 @@ func (context *inspectRequestContext) processLocal() {
continue
}
result := inspectable.Inspect(lc, time.Second)
js, err := json.Marshal(result)
if err != nil {
context.appendError(errors.Wrap(err, "failed to marshal sdk terminators to json").Error())
} else {
context.appendValue(requested, string(js))
}
context.handleJsonResponse(requested, result)
} else if strings.HasPrefix(lc, "circuit:") {
circuitId := requested[len("circuit:"):]
result := context.handler.fwd.InspectCircuit(circuitId, false)
if result != nil {
js, err := json.Marshal(result)
if err != nil {
context.appendError(errors.Wrap(err, "failed to marshal circuit report to json").Error())
} else {
context.appendValue(requested, string(js))
}
context.handleJsonResponse(requested, result)
}
} else if strings.HasPrefix(lc, "circuitandstacks:") {
circuitId := requested[len("circuitAndStacks:"):]
result := context.handler.fwd.InspectCircuit(circuitId, true)
if result != nil {
js, err := json.Marshal(result)
if err != nil {
context.appendError(errors.Wrap(err, "failed to marshal circuit report to json").Error())
} else {
context.appendValue(requested, string(js))
}
context.handleJsonResponse(requested, result)
}
} else if strings.HasPrefix(lc, "metrics") {
msg := context.handler.fwd.MetricsRegistry().PollWithoutUsageMetrics()
js, err := json.Marshal(msg)
if err != nil {
context.appendError(errors.Wrap(err, "failed to marshal metrics to json").Error())
} else {
context.appendValue(requested, string(js))
}
context.handleJsonResponse(requested, msg)
} else if lc == "config" {
js, err := context.handler.env.RenderJsonConfig()
if err != nil {
Expand All @@ -153,17 +128,24 @@ func (context *inspectRequestContext) processLocal() {
context.appendValue(requested, js)
}
} else if lc == "router-data-model" {
rdm := context.handler.env.GetRouterDataModel()
js, err := json.Marshal(rdm)
if err != nil {
context.appendError(errors.Wrap(err, "failed to router data model to json").Error())
} else {
context.appendValue(requested, string(js))
}
result := context.handler.env.GetRouterDataModel()
context.handleJsonResponse(requested, result)
} else if lc == "router-controllers" {
result := context.handler.env.GetNetworkControllers().Inspect()
context.handleJsonResponse(requested, result)
}
}
}

// handleJsonResponse marshals val to JSON. On success the JSON text is passed
// to appendValue under key; on failure the marshal error, wrapped with a
// message naming key, is passed to appendError instead.
func (context *inspectRequestContext) handleJsonResponse(key string, val interface{}) {
	js, err := json.Marshal(val)
	if err != nil {
		context.appendError(errors.Wrapf(err, "failed to marshal %s to json", key).Error())
		return
	}
	context.appendValue(key, string(js))
}

func (context *inspectRequestContext) sendResponse() {
body, err := proto.Marshal(context.response)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion router/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (sm *ManagerImpl) LoadRouterModel(filePath string) {
model, err := common.NewReceiverRouterDataModelFromFile(filePath, RouterDataModelListerBufferSize)

if err != nil {
pfxlog.Logger().WithError(err).Errorf("could not load router model from file [%s]", filePath)
pfxlog.Logger().WithError(err).Infof("could not load router model from file [%s]", filePath)
model = common.NewReceiverRouterDataModel(RouterDataModelListerBufferSize)
}

Expand Down
7 changes: 6 additions & 1 deletion router/xgress_edge/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/openziti/metrics"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/transport/v2"
"github.com/openziti/ziti/common"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/router"
"github.com/openziti/ziti/router/env"
Expand Down Expand Up @@ -118,7 +119,11 @@ func (factory *Factory) LoadConfig(configMap map[interface{}]interface{}) error

factory.edgeRouterConfig = config

factory.stateManager.LoadRouterModel(factory.edgeRouterConfig.Db)
if factory.routerConfig.Ha.Enabled {
factory.stateManager.LoadRouterModel(factory.edgeRouterConfig.Db)
} else {
factory.stateManager.SetRouterDataModel(common.NewReceiverRouterDataModel(state.RouterDataModelListerBufferSize))
}

go apiproxy.Start(config)

Expand Down
1 change: 1 addition & 0 deletions ziti/cmd/fabric/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func newInspectCmd(p common.OptionsProvider) *cobra.Command {
cmd.AddCommand(action.newInspectSubCmd(p, "sdk-terminators", "gets information from routers about their view of sdk terminators"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-messaging", "gets information about pending router peer updates and terminator validations"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-data-model", "gets information about the router data model"))
cmd.AddCommand(action.newInspectSubCmd(p, "router-controllers", "gets information about the state of a router's connections to its controllers"))

inspectCircuitsAction := &InspectCircuitsAction{InspectAction: *newInspectAction(p)}
cmd.AddCommand(inspectCircuitsAction.newCobraCmd())
Expand Down

0 comments on commit ea109bb

Please sign in to comment.