Skip to content

Commit

Permalink
feat(agentctl): Add config.resync subcommand (with resync) (#1642)
Browse files Browse the repository at this point in the history
* Improve json encoding/decoding and txn summary format

- TxnType and ResyncType are now marshalled as strings (unmarshalling works for both)
- Added tests for json encoding/decoding of TxnType and ResyncType

Signed-off-by: Ondrej Fabry <[email protected]>

* Fix comments

Signed-off-by: Ondrej Fabry <[email protected]>

* Ensure proto is encoded to json properly

Signed-off-by: Ondrej Fabry <[email protected]>

* Improve recorded transaction formatting for json

Signed-off-by: Ondrej Fabry <[email protected]>

* Log content length for HTTP client

Signed-off-by: Ondrej Fabry <[email protected]>

* Add config subcommand with resync

Signed-off-by: Ondrej Fabry <[email protected]>
  • Loading branch information
ondrej-fabry authored Mar 18, 2020
1 parent ec2549c commit 83d2423
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 53 deletions.
5 changes: 5 additions & 0 deletions cmd/agentctl/api/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ type SchedulerDumpOptions struct {
type SchedulerValuesOptions struct {
KeyPrefix string
}

type SchedulerResyncOptions struct {
Retry bool
Verbose bool
}
3 changes: 2 additions & 1 deletion cmd/agentctl/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type APIClient interface {
Close() error
}

// SystemAPIClient defines API client methods for the system
// InfraAPIClient defines API client methods for the system
type InfraAPIClient interface {
Status(ctx context.Context) (*probe.ExposedStatus, error)
Ping(ctx context.Context) (types.Ping, error)
Expand All @@ -53,6 +53,7 @@ type ModelAPIClient interface {
type SchedulerAPIClient interface {
SchedulerDump(ctx context.Context, opts types.SchedulerDumpOptions) ([]api.KVWithMetadata, error)
SchedulerValues(ctx context.Context, opts types.SchedulerValuesOptions) ([]*kvscheduler.BaseValueStatus, error)
SchedulerResync(ctx context.Context, opts types.SchedulerResyncOptions) (*api.RecordedTxn, error)
}

// VppAPIClient defines API client methods for the VPP
Expand Down
8 changes: 4 additions & 4 deletions cmd/agentctl/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// serverResponse is a wrapper for http API responses.
type serverResponse struct {
body io.ReadCloser
contentLen int64
header http.Header
statusCode int
reqURL *url.URL
Expand Down Expand Up @@ -120,7 +121,7 @@ func (c *Client) doRequest(ctx context.Context, req *http.Request) (serverRespon
if err != nil {
logrus.Debugf("<= http response ERROR: %v", err)
} else {
logrus.Debugf("<= http response (statusCode %v)", serverResp.statusCode)
logrus.Debugf("<= http response %v (%d bytes)", serverResp.statusCode, serverResp.contentLen)
}
}()

Expand All @@ -140,15 +141,13 @@ func (c *Client) doRequest(ctx context.Context, req *http.Request) (serverRespon
case context.Canceled, context.DeadlineExceeded:
return serverResp, err
}

if nErr, ok := err.(*url.Error); ok {
if nErr, ok := nErr.Err.(*net.OpError); ok {
if os.IsPermission(nErr.Err) {
return serverResp, errors.Wrapf(err, "Got permission denied while trying to connect to the agent socket at %v", c.host)
}
}
}

if err, ok := err.(net.Error); ok {
if err.Timeout() {
return serverResp, ErrorConnectionFailed(c.host)
Expand All @@ -167,6 +166,7 @@ func (c *Client) doRequest(ctx context.Context, req *http.Request) (serverRespon
serverResp.statusCode = resp.StatusCode
serverResp.body = resp.Body
serverResp.header = resp.Header
serverResp.contentLen = resp.ContentLength
}
return serverResp, nil
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (c *Client) checkResponseErr(serverResp serverResponse) error {
if ct == "application/json" {
var errorResponse types.ErrorResponse
if err := json.Unmarshal(body, &errorResponse); err != nil {
return errors.Wrap(err, "Error reading JSON")
return errors.Wrap(err, "Error unmarshaling JSON body")
}
errorMsg = errorResponse.Message
} else {
Expand Down
35 changes: 34 additions & 1 deletion cmd/agentctl/client/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"reflect"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"go.ligato.io/cn-infra/v2/logging"

"go.ligato.io/vpp-agent/v3/cmd/agentctl/api/types"
"go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api"
Expand All @@ -24,7 +27,6 @@ func (c *Client) SchedulerDump(ctx context.Context, opts types.SchedulerDumpOpti
api.KVWithMetadata
Value ProtoWithName
}
var kvdump []KVWithMetadata

query := url.Values{}
query.Set("key-prefix", opts.KeyPrefix)
Expand All @@ -34,6 +36,8 @@ func (c *Client) SchedulerDump(ctx context.Context, opts types.SchedulerDumpOpti
if err != nil {
return nil, err
}

var kvdump []KVWithMetadata
if err := json.NewDecoder(resp.body).Decode(&kvdump); err != nil {
return nil, fmt.Errorf("decoding reply failed: %v", err)
}
Expand Down Expand Up @@ -73,3 +77,32 @@ func (c *Client) SchedulerValues(ctx context.Context, opts types.SchedulerValues

return status, nil
}

func (c *Client) SchedulerResync(ctx context.Context, opts types.SchedulerResyncOptions) (*api.RecordedTxn, error) {
query := url.Values{}
if opts.Retry {
query.Set("retry", "1")
}
if opts.Verbose {
query.Set("verbose", "1")
}

resp, err := c.post(ctx, "/scheduler/downstream-resync", query, nil, nil)
if err != nil {
return nil, err
}

body, err := ioutil.ReadAll(resp.body)
if err != nil {
return nil, err
}

logging.Debugf("body content:\n%s", body)

var rectxn api.RecordedTxn
if err := json.NewDecoder(bytes.NewReader(body)).Decode(&rectxn); err != nil {
return nil, fmt.Errorf("decoding reply failed: %v", err)
}

return &rectxn, nil
}
1 change: 1 addition & 0 deletions cmd/agentctl/commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewRoot(agentCli *cli.AgentCli) *Root {
// AddBaseCommands adds all base commands to cmd.
func AddBaseCommands(cmd *cobra.Command, cli cli.Cli) {
cmd.AddCommand(
NewConfigCommand(cli),
NewModelCommand(cli),
NewLogCommand(cli),
NewImportCommand(cli),
Expand Down
87 changes: 87 additions & 0 deletions cmd/agentctl/commands/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2019 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"context"

"github.com/spf13/cobra"

"go.ligato.io/vpp-agent/v3/cmd/agentctl/api/types"
agentcli "go.ligato.io/vpp-agent/v3/cmd/agentctl/cli"
)

func NewConfigCommand(cli agentcli.Cli) *cobra.Command {
cmd := &cobra.Command{
Use: "config",
Short: "Manage agent configuration",
}
cmd.AddCommand(
newConfigResyncCommand(cli),
)
return cmd
}

func newConfigResyncCommand(cli agentcli.Cli) *cobra.Command {
var (
opts ConfigResyncOptions
)
cmd := &cobra.Command{
Use: "resync",
Short: "Run config resync in agent",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return runConfigResync(cli, opts)
},
}
flags := cmd.Flags()
flags.StringVarP(&opts.Format, "format", "f", "", "Format output")
flags.BoolVar(&opts.Verbose, "verbose", false, "Run resync in verbose mode")
flags.BoolVar(&opts.Retry, "retry", false, "Run resync with retries")
return cmd
}

type ConfigResyncOptions struct {
Format string
Verbose bool
Retry bool
}

func runConfigResync(cli agentcli.Cli, opts ConfigResyncOptions) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rectxn, err := cli.Client().SchedulerResync(ctx, types.SchedulerResyncOptions{
Retry: opts.Retry,
Verbose: opts.Verbose,
})
if err != nil {
return err
}

format := opts.Format
if len(format) == 0 {
format = defaultFormatConfigResync
}

if err := formatAsTemplate(cli.Out(), format, rectxn); err != nil {
return err
}

return nil
}

// TODO: define default format with go template
const defaultFormatConfigResync = `json`
3 changes: 2 additions & 1 deletion pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func GetKey(x proto.Message) (string, error) {
return key, nil
}

// GetName
// GetName returns instance name for given model.
// It returns error if given model is not registered.
func GetName(x proto.Message) (string, error) {
model, err := GetModelFor(x)
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions plugins/kvscheduler/api/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api_test

import (
"encoding/json"
"testing"

"go.ligato.io/vpp-agent/v3/plugins/kvscheduler/api"
)

func TestTxnTypeEncode(t *testing.T) {
tests := []struct {
name string

txntype api.TxnType

expectOut string
expectErr error
}{
{"SBNotification", api.SBNotification, `"SBNotification"`, nil},
{"NBTransaction", api.NBTransaction, `"NBTransaction"`, nil},
{"RetryFailedOps", api.RetryFailedOps, `"RetryFailedOps"`, nil},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b, err := json.Marshal(test.txntype)
if err != test.expectErr {
t.Fatalf("expected error: %v, got %v", test.expectErr, err)
}
out := string(b)
if out != test.expectOut {
t.Fatalf("expected output: %q, got %q", test.expectOut, out)
}
})
}
}

func TestTxnTypeDecode(t *testing.T) {
tests := []struct {
name string

input string

expectTxnType api.TxnType
expectErr error
}{
{"RetryFailedOps", `"RetryFailedOps"`, api.RetryFailedOps, nil},
{"NBTransaction", `"NBTransaction"`, api.NBTransaction, nil},
{"1 (NBTransaction)", `1`, api.NBTransaction, nil},
{"0 (SBNotification)", `0`, api.SBNotification, nil},
{"invalid", `"INVALID"`, api.TxnType(-1), nil},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var txntype api.TxnType
err := json.Unmarshal([]byte(test.input), &txntype)
if err != test.expectErr {
t.Fatalf("expected error: %v, got %v", test.expectErr, err)
}
if txntype != test.expectTxnType {
t.Fatalf("expected TxnType: %v, got %v", test.expectTxnType, txntype)
}
})
}
}

func TestResyncTypeEncode(t *testing.T) {
tests := []struct {
name string

resynctype api.ResyncType

expectOut string
expectErr error
}{
{"FullResync", api.FullResync, `"FullResync"`, nil},
{"UpstreamResync", api.UpstreamResync, `"UpstreamResync"`, nil},
{"DownstreamResync", api.DownstreamResync, `"DownstreamResync"`, nil},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b, err := json.Marshal(test.resynctype)
if err != test.expectErr {
t.Fatalf("expected error: %v, got %v", test.expectErr, err)
}
out := string(b)
if out != test.expectOut {
t.Fatalf("expected output: %q, got %q", test.expectOut, out)
}
})
}
}

func TestResyncTypeDecode(t *testing.T) {
tests := []struct {
name string

input string

expectResyncType api.ResyncType
expectErr error
}{
{"FullResync", `"FullResync"`, api.FullResync, nil},
{"UpstreamResync", `"UpstreamResync"`, api.UpstreamResync, nil},
{"DownstreamResync", `"DownstreamResync"`, api.DownstreamResync, nil},
{"1 (FullResync)", `1`, api.FullResync, nil},
{"2 (UpstreamResync)", `2`, api.UpstreamResync, nil},
{"3 (DownstreamResync)", `3`, api.DownstreamResync, nil},
{"invalid", `"INVALID"`, api.ResyncType(0), nil},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var resyncType api.ResyncType
err := json.Unmarshal([]byte(test.input), &resyncType)
if err != test.expectErr {
t.Fatalf("expected error: %v, got %v", test.expectErr, err)
}
if resyncType != test.expectResyncType {
t.Fatalf("expected ResyncType: %v, got %v", test.expectResyncType, resyncType)
}
})
}
}
Loading

0 comments on commit 83d2423

Please sign in to comment.