Skip to content

Commit

Permalink
tweaked interface statistics
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Lavor <[email protected]>
  • Loading branch information
VladoLavor committed Jun 6, 2019
1 parent 3ec1870 commit 8715c96
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 84 deletions.
102 changes: 54 additions & 48 deletions plugins/vpp/ifplugin/ifplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ type IfPlugin struct {
defaultMtu uint32

// state data
publishStats bool
publishLock sync.Mutex
statusCheckReg bool
watchStatusReg datasync.WatchRegistration
Expand Down Expand Up @@ -139,10 +138,6 @@ func (p *IfPlugin) Init() error {
// Read config file and set all related fields
p.fromConfigFile()

// Fills nil dependencies with default values
p.publishStats = p.PublishStatistics != nil || p.NotifyStates != nil
p.fixNilPointers()

// VPP channel
if p.vppCh, err = p.GoVppmux.NewAPIChannel(); err != nil {
return errors.Errorf("failed to create GoVPP API channel: %v", err)
Expand Down Expand Up @@ -208,55 +203,62 @@ func (p *IfPlugin) Init() error {
}
p.dhcpDescriptor.WatchDHCPNotifications(p.ctx)

// interface state data
if p.publishStats {
// subscribe & watch for resync of interface state data

// -> Interface stats

var publishKVDB, publishMessaging bool
if p.PublishStatistics != nil {
publishKVDB = true
}
if p.NotifyStates != nil {
publishMessaging = true
}
p.fixNilPointers()

// subscribe and watch for resync events and remove obsolete KV statistics from DB
if publishKVDB {
p.resyncStatusChan = make(chan datasync.ResyncEvent)

p.wg.Add(1)
go p.watchStatusEvents()
if err = p.subscribeWatcher(); err != nil {
return err
}
}

// start interface state updater
p.ifStateChan = make(chan *interfaces.InterfaceNotification, 1000)

// start interface state publishing
p.wg.Add(1)
go p.publishIfStateEvents()

// Interface state updater
p.ifStateUpdater = &InterfaceStateUpdater{}

var n int
var t time.Time
ifNotifHandler := func(state *interfaces.InterfaceNotification) {
select {
case p.ifStateChan <- state:
// OK
default:
// full
if time.Since(t) > time.Second {
p.Log.Debugf("ifStateChan channel is full (%d)", n)
n = 0
} else {
n++
ifNotifHandler := func(state *interfaces.InterfaceNotification){}
if publishKVDB || publishMessaging {
p.ifStateChan = make(chan *interfaces.InterfaceNotification, 1000)
p.wg.Add(1)
go p.publishIfStateEvents()

var n int
var t time.Time
ifNotifHandler = func(state *interfaces.InterfaceNotification) {
select {
case p.ifStateChan <- state:
// OK
default:
// full
if time.Since(t) > time.Second {
p.Log.Debugf("ifStateChan channel is full (%d)", n)
n = 0
} else {
n++
}
t = time.Now()
}
t = time.Now()
}
}

err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.GoVppmux, p.intfIndex,
ifNotifHandler, p.publishStats)
// Start interface state updater
p.ifStateUpdater = &InterfaceStateUpdater{}
err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.GoVppmux, p.intfIndex, ifNotifHandler, publishKVDB || publishMessaging)
if err != nil {
return err
}

if p.publishStats {
if err = p.subscribeWatcher(); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -334,17 +336,21 @@ func (p *IfPlugin) fromConfigFile() {
return
}
if config != nil {
publishers := datasync.KVProtoWriters{}
for _, pub := range config.StatusPublishers {
db, found := p.Deps.DataSyncs[pub]
if !found {
p.Log.Warnf("Unknown status publisher %q from config", pub)
continue
// status publishers
if len(config.StatusPublishers) > 0 {
publishers := datasync.KVProtoWriters{}
for _, pub := range config.StatusPublishers {
db, found := p.Deps.DataSyncs[pub]
if !found {
p.Log.Warnf("Unknown status publisher %q from config", pub)
continue
}
publishers = append(publishers, db)
p.Log.Infof("Added status publisher %q from config", pub)
}
publishers = append(publishers, db)
p.Log.Infof("Added status publisher %q from config", pub)
p.Deps.PublishStatistics = publishers
}
p.Deps.PublishStatistics = publishers
// default mtu
if config.MTU != 0 {
p.defaultMtu = config.MTU
p.Log.Infof("Default MTU set to %v", p.defaultMtu)
Expand Down
55 changes: 27 additions & 28 deletions plugins/vpp/ifplugin/interface_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ifplugin

import (
"context"
"os"
"sync"
"time"

Expand All @@ -38,8 +37,6 @@ var (

// StateUpdateDelay defines delay before dumping states
StateUpdateDelay = time.Second * 3

disableInterfaceStats = os.Getenv("DISABLE_INTERFACE_STATS") != ""
)

// InterfaceStateUpdater holds state data of all VPP interfaces.
Expand All @@ -48,7 +45,8 @@ type InterfaceStateUpdater struct {

kvScheduler kvs.KVScheduler
swIfIndexes ifaceidx.IfaceMetadataIndex
publishIfState func(notification *intf.InterfaceNotification)
publishState bool
ifNotifHandler func(notification *intf.InterfaceNotification)

access sync.Mutex // lock for the state data map
ifState map[uint32]*intf.InterfaceState // swIfIndex to state data map
Expand All @@ -74,7 +72,7 @@ type InterfaceStateUpdater struct {
// Init members (channels, maps...) and start go routines
func (c *InterfaceStateUpdater) Init(ctx context.Context, logger logging.PluginLogger, kvScheduler kvs.KVScheduler,
goVppMux govppmux.StatsAPI, swIfIndexes ifaceidx.IfaceMetadataIndex,
publishIfState func(notification *intf.InterfaceNotification), readCounters bool) (err error) {
ifNotifHandler func(notification *intf.InterfaceNotification), publishState bool) (err error) {

// Logger
c.log = logger.NewLogger("if-state")
Expand All @@ -83,7 +81,8 @@ func (c *InterfaceStateUpdater) Init(ctx context.Context, logger logging.PluginL
c.swIfIndexes = swIfIndexes

c.kvScheduler = kvScheduler
c.publishIfState = publishIfState
c.ifNotifHandler = ifNotifHandler
c.publishState = publishState
c.ifState = make(map[uint32]*intf.InterfaceState)

c.ifsForUpdate = make(map[uint32]struct{})
Expand All @@ -97,39 +96,39 @@ func (c *InterfaceStateUpdater) Init(ctx context.Context, logger logging.PluginL
}

c.ifHandler = vppcalls.CompatibleInterfaceVppHandler(c.vppCh, logger.NewLogger("if-handler"))

c.ifMetaChan = make(chan ifaceidx.IfaceMetadataDto, 1000)
swIfIndexes.WatchInterfaces("ifplugin_ifstate", c.ifMetaChan)

c.ifEvents = make(chan *vppcalls.InterfaceEvent, 1000)

// Create child context
var childCtx context.Context
childCtx, c.cancel = context.WithCancel(ctx)

// Watch for incoming notifications
c.wg.Add(1)
go c.watchVPPNotifications(childCtx)
// Start watchers
if c.publishState {
// Watch for incoming notifications
c.ifMetaChan = make(chan ifaceidx.IfaceMetadataDto, 1000)
swIfIndexes.WatchInterfaces("ifplugin_ifstate", c.ifMetaChan)

// Periodically read VPP counters and combined counters for VPP statistics
if disableInterfaceStats {
c.log.Warnf("reading interface stats is disabled!")
} else if readCounters {
c.wg.Add(1)
go c.watchVPPNotifications(childCtx)

// Periodically read VPP counters and combined counters for VPP statistics
c.wg.Add(1)
go c.startReadingCounters(childCtx)
}

c.wg.Add(1)
go c.startUpdatingIfStateDetails(childCtx)
c.wg.Add(1)
go c.startUpdatingIfStateDetails(childCtx)
}

return nil
}

// AfterInit subscribes for watching VPP notifications on previously initialized channel
func (c *InterfaceStateUpdater) AfterInit() error {
err := c.subscribeVPPNotifications()
if err != nil {
return err
if c.publishState {
err := c.subscribeVPPNotifications()
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -248,7 +247,7 @@ func (c *InterfaceStateUpdater) doUpdatesIfStateDetails() {
return
}

// we dont want to lock during potentionally long dump call
// we dont want to lock during potentially long dump call
c.access.Unlock()

c.log.Debugf("running update for interface state details (%d)", len(c.ifsForUpdate))
Expand Down Expand Up @@ -325,7 +324,7 @@ func (c *InterfaceStateUpdater) processInterfaceStatEntry(ifCounters govppapi.In
OutBytes: ifCounters.TxBytes,
}

c.publishIfState(&intf.InterfaceNotification{
c.ifNotifHandler(&intf.InterfaceNotification{
Type: intf.InterfaceNotification_COUNTERS, State: ifState})
}

Expand All @@ -346,7 +345,7 @@ func (c *InterfaceStateUpdater) processIfStateEvent(notif *vppcalls.InterfaceEve
ifState.Name, ifState.IfIndex, notif)

// store data in ETCD
c.publishIfState(&intf.InterfaceNotification{
c.ifNotifHandler(&intf.InterfaceNotification{
Type: intf.InterfaceNotification_UPDOWN, State: ifState})
}

Expand Down Expand Up @@ -475,7 +474,7 @@ func (c *InterfaceStateUpdater) updateIfStateDetails(ifDetails *vppcalls.Interfa
ifState.Duplex = intf.InterfaceState_UNKNOWN_DUPLEX
}

c.publishIfState(&intf.InterfaceNotification{
c.ifNotifHandler(&intf.InterfaceNotification{
Type: intf.InterfaceNotification_UNKNOWN, State: ifState})
}

Expand All @@ -494,6 +493,6 @@ func (c *InterfaceStateUpdater) setIfStateDeleted(swIfIndex uint32, ifName strin
ifState.LastChange = time.Now().Unix()

// this can be post-processed by multiple plugins
c.publishIfState(&intf.InterfaceNotification{
c.ifNotifHandler(&intf.InterfaceNotification{
Type: intf.InterfaceNotification_UNKNOWN, State: ifState})
}
14 changes: 6 additions & 8 deletions plugins/vpp/ifplugin/publish_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,16 @@ func (p *IfPlugin) publishIfStateEvents() {

p.Log.Debugf("Publishing interface state: %+v", ifState)

if p.PublishStatistics != nil {
err := p.PublishStatistics.Put(key, ifState.State)
if err != nil {
if lastPublishErr == nil || lastPublishErr.Error() != err.Error() {
p.Log.Error(err)
}
err := p.PublishStatistics.Put(key, ifState.State)
if err != nil {
if lastPublishErr == nil || lastPublishErr.Error() != err.Error() {
p.Log.Error(err)
}
lastPublishErr = err
}
lastPublishErr = err

// Marshall data into JSON & send kafka message.
if p.NotifyStates != nil && ifState.Type == interfaces.InterfaceNotification_UPDOWN {
if ifState.Type == interfaces.InterfaceNotification_UPDOWN {
err := p.NotifyStates.Put(key, ifState.State)
if err != nil {
if lastNotifErr == nil || lastNotifErr.Error() != err.Error() {
Expand Down

0 comments on commit 8715c96

Please sign in to comment.