Skip to content

Commit

Permalink
feat: Improvements (#1675)
Browse files Browse the repository at this point in the history
* fix(agentctl): Validate view type and show dump errors

Signed-off-by: Ondrej Fabry <[email protected]>

* Fix flowprobe feature key function

Signed-off-by: Ondrej Fabry <[email protected]>

* Add missing config (ipfix, teib) to ConfigData

Signed-off-by: Ondrej Fabry <[email protected]>

* Accept grpc.ClientConnInterface in remoteclient

Signed-off-by: Ondrej Fabry <[email protected]>

* Improve logged info and stop plugins gracefully

Signed-off-by: Ondrej Fabry <[email protected]>

* Prefix name of integration tests related to routes

Signed-off-by: Ondrej Fabry <[email protected]>

* Handle encodings in format flag

Signed-off-by: Ondrej Fabry <[email protected]>

* Add e2e test for VPP TAP tunnel

Signed-off-by: Ondrej Fabry <[email protected]>

* Print output from executed commands in e2e tests

Signed-off-by: Ondrej Fabry <[email protected]>
  • Loading branch information
ondrej-fabry authored Jul 6, 2020
1 parent e9aa355 commit 6ba0dbb
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 162 deletions.
2 changes: 1 addition & 1 deletion client/remoteclient/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type grpcClient struct {
}

// NewClientGRPC returns new instance that uses given service client for requests.
func NewClientGRPC(conn *grpc.ClientConn) client.ConfigClient {
func NewClientGRPC(conn grpc.ClientConnInterface) client.ConfigClient {
manager := generic.NewManagerServiceClient(conn)
meta := generic.NewMetaServiceClient(conn)
return &grpcClient{
Expand Down
20 changes: 17 additions & 3 deletions cmd/agentctl/commands/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,21 @@ type DumpOptions struct {
}

func runDump(cli agentcli.Cli, opts DumpOptions) error {
dumpView := opts.View

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var dumpView string
switch strings.ToLower(opts.View) {
case "cached", "cache", "":
dumpView = "cached"
case "nb", "northbound":
dumpView = "NB"
case "sb", "southbound":
dumpView = "SB"
default:
return fmt.Errorf("invalid view type: %q", opts.View)
}

allModels, err := cli.Client().ModelList(ctx, types.ModelListOptions{
Class: "config",
})
Expand All @@ -91,18 +101,22 @@ func runDump(cli agentcli.Cli, opts DumpOptions) error {
return fmt.Errorf("no models found for %q", opts.Models)
}

var errs Errors
var dumps []api.KVWithMetadata
for _, keyPrefix := range keyPrefixes {
dump, err := cli.Client().SchedulerDump(ctx, types.SchedulerDumpOptions{
KeyPrefix: keyPrefix,
View: dumpView,
})
if err != nil {
logging.Debug(fmt.Errorf("dump for %s failed: %v", keyPrefix, err))
errs = append(errs, fmt.Errorf("dump for %s failed: %v", keyPrefix, err))
continue
}
dumps = append(dumps, dump...)
}
if errs != nil {
logging.Debugf("dumped with %d errors\n%v", len(errs), errs)
}

sort.Sort(dumpByKey(dumps))

Expand Down
30 changes: 16 additions & 14 deletions cmd/agentctl/commands/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io"
"strings"
"text/template"
"time"

Expand All @@ -38,21 +39,22 @@ func formatAsTemplate(w io.Writer, format string, data interface{}) error {
t := template.New("format")
t.Funcs(tmplFuncs)

if format == "json" {
format = "{{json .}}"
} else if format == "yaml" {
format = "{{yaml .}}"
} else if format == "proto" {
format = "{{proto .}}"
}

if _, err := t.Parse(format); err != nil {
return fmt.Errorf("parsing format template failed: %v", err)
}

var b bytes.Buffer
if err := t.Execute(&b, data); err != nil {
return fmt.Errorf("executing format template failed: %v", err)

switch strings.ToLower(format) {
case "json":
b.WriteString(jsonTmpl(data))
case "yaml", "yml":
b.WriteString(yamlTmpl(data))
case "proto":
b.WriteString(protoTmpl(data))
default:
if _, err := t.Parse(format); err != nil {
return fmt.Errorf("parsing format template failed: %v", err)
}
if err := t.Execute(&b, data); err != nil {
return fmt.Errorf("executing format template failed: %v", err)
}
}

_, err := b.WriteTo(w)
Expand Down
93 changes: 64 additions & 29 deletions plugins/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package orchestrator
import (
"os"
"strings"
"sync"

"github.com/golang/protobuf/proto"
"go.ligato.io/cn-infra/v2/datasync"
Expand All @@ -43,16 +44,18 @@ var (
type Plugin struct {
Deps

*dispatcher
manager *genericService

reflection bool

// datasync channels
changeChan chan datasync.ChangeEvent
resyncChan chan datasync.ResyncEvent
watchDataReg datasync.WatchRegistration

reflection bool

*dispatcher
wg sync.WaitGroup
quit chan struct{}
}

// Deps represents dependencies for the plugin.
Expand All @@ -67,6 +70,8 @@ type Deps struct {

// Init registers the service to GRPC server.
func (p *Plugin) Init() (err error) {
p.quit = make(chan struct{})

p.dispatcher = &dispatcher{
log: logging.DefaultRegistry.NewLogger("dispatcher"),
db: newMemStore(),
Expand All @@ -80,28 +85,33 @@ func (p *Plugin) Init() (err error) {
}

if grpcServer := p.GRPC.GetServer(); grpcServer != nil {
p.Log.Debugf("registering generic manager and meta service")
generic.RegisterManagerServiceServer(grpcServer, p.manager)
generic.RegisterMetaServiceServer(grpcServer, p.manager)

// register grpc services for reflection
if p.reflection {
p.Log.Infof("registering grpc reflection service")
p.Log.Debugf("registering grpc reflection service")
reflection.Register(grpcServer)
}
} else {
p.log.Infof("grpc server not available")
p.log.Infof("grpc server is not available")
}

nbPrefixes := p.kvs.GetRegisteredNBKeyPrefixes()
if len(nbPrefixes) > 0 {
p.log.Infof("Watch starting for %d registered NB prefixes", len(nbPrefixes))
} else {
p.log.Warnf("No registered NB prefixes found in KVScheduler (ensure that all KVDescriptors are registered before this)")
p.Log.Infof("Found %d registered models", len(models.RegisteredModels()))
for _, model := range models.RegisteredModels() {
p.debugf("- model: %+v", *model.Spec())
}

var prefixes []string
for _, prefix := range nbPrefixes {
p.log.Debugf("- watching NB prefix: %s", prefix)
prefixes = append(prefixes, prefix)
if nbPrefixes := p.kvs.GetRegisteredNBKeyPrefixes(); len(nbPrefixes) > 0 {
p.log.Infof("Watching %d key prefixes from KVScheduler", len(nbPrefixes))
for _, prefix := range nbPrefixes {
p.debugf("- prefix: %s", prefix)
prefixes = append(prefixes, prefix)
}
} else {
p.log.Warnf("No key prefixes found in KVScheduler (ensure that all KVDescriptors are registered before this)")
}

// initialize datasync channels
Expand All @@ -119,15 +129,26 @@ func (p *Plugin) Init() (err error) {

// AfterInit subscribes to known NB prefixes.
func (p *Plugin) AfterInit() (err error) {
// watch datasync events
p.wg.Add(1)
go p.watchEvents()

statusChan := make(chan *kvscheduler.BaseValueStatus, 100)
p.kvs.WatchValueStatus(statusChan, nil)

// watch KVSchedular status changes
p.wg.Add(1)
go p.watchStatus(statusChan)

return nil
}

func (p *Plugin) Close() (err error) {
close(p.quit)
p.wg.Wait()
return nil
}

// InitialSync will start initial synchronization with downstream.
func (p *Plugin) InitialSync() {
// FIXME: KVScheduler needs to have some type of sync that only refreshes state from SB
Expand All @@ -142,6 +163,11 @@ func (p *Plugin) InitialSync() {
}

func (p *Plugin) watchEvents() {
defer p.wg.Done()

p.Log.Debugf("watching datasync events")
defer p.Log.Debugf("done watching datasync events")

for {
select {
case e := <-p.changeChan:
Expand Down Expand Up @@ -232,30 +258,22 @@ func (p *Plugin) watchEvents() {
_, err := p.PushData(ctx, kvPairs)
e.Done(err)

case <-p.quit:
return
}
}
}

// UnmarshalLazyValue is helper function for unmarshalling from datasync.LazyValue.
func UnmarshalLazyValue(key string, lazy datasync.LazyValue) (proto.Message, error) {
model, err := models.GetModelForKey(key)
if err != nil {
return nil, err
}
instance := model.NewInstance()
// try to deserialize the value into instance
if err := lazy.GetValue(instance); err != nil {
return nil, err
}
return instance, nil
}

func (p *Plugin) watchStatus(ch <-chan *kvscheduler.BaseValueStatus) {
defer p.wg.Done()

p.Log.Debugf("watching status changes")
defer p.Log.Debugf("done watching status events")

for {
select {
case s := <-ch:

p.debugf("STATUS: %15s %v ===> %v (%v) %v",
p.debugf("incoming status change: %15s %v ===> %v (%v) %v",
s.Value.State, s.Value.Details, s.Value.Key, s.Value.LastOperation, s.Value.Error)
for _, dv := range s.DerivedValues {
p.debugf(" \t%15s %v ---> %v (%v) %v",
Expand All @@ -267,6 +285,9 @@ func (p *Plugin) watchStatus(ch <-chan *kvscheduler.BaseValueStatus) {
{Key: s.Value.Key, Status: s.Value},
})
}

case <-p.quit:
return
}
}
}
Expand Down Expand Up @@ -294,3 +315,17 @@ func (p *Plugin) debugf(f string, a ...interface{}) {
p.log.Debugf(f, a...)
}
}

// UnmarshalLazyValue is helper function for unmarshalling from datasync.LazyValue.
func UnmarshalLazyValue(key string, lazy datasync.LazyValue) (proto.Message, error) {
model, err := models.GetModelForKey(key)
if err != nil {
return nil, err
}
instance := model.NewInstance()
// try to deserialize the value into instance
if err := lazy.GetValue(instance); err != nil {
return nil, err
}
return instance, nil
}
4 changes: 1 addition & 3 deletions plugins/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,16 @@ func (p *Plugin) periodicUpdates() {
defer p.wg.Done()

p.Log.Debugf("starting periodic updates (%v)", p.updatePeriod)
defer p.Log.Debugf("stopping periodic updates")

tick := time.NewTicker(p.updatePeriod)
for {
select {
// Delay period between updates
case <-tick.C:
ctx := context.Background()
p.updatePrometheus(ctx)

// Plugin has stopped.
case <-p.quit:
p.Log.Debugf("stopping periodic updates")
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions proto/ligato/vpp/ipfix/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ func FlowprobeParamsKey() string {

// FlowprobeFeatureKey returns the prefix used in ETCD
// to store vpp Flowprobe feature config.
func FlowprobeFeatureKey() string {
return models.Key(&FlowProbeFeature{})
func FlowprobeFeatureKey(iface string) string {
return models.Key(&FlowProbeFeature{Interface: iface})
}
Loading

0 comments on commit 6ba0dbb

Please sign in to comment.