Skip to content

Commit

Permalink
Add connection monitoring (#386)
Browse files Browse the repository at this point in the history
* add connection monitoring and conn id generation

Signed-off-by: Nikita Skrynnik <[email protected]>

* fix ci issues

Signed-off-by: Nikita Skrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik authored Apr 1, 2022
1 parent ba3a77c commit 8f91426
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 9 deletions.
6 changes: 3 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ linters-settings:
goimports:
local-prefixes: github.com/networkservicemesh
gocyclo:
min-complexity: 15
min-complexity: 20
maligned:
suggest-new: true
dupl:
threshold: 150
funlen:
Lines: 180
Statements: 80
Lines: 225
Statements: 100
goconst:
min-len: 2
min-occurrences: 2
Expand Down
2 changes: 2 additions & 0 deletions internal/imports/imports_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package imports

import (
_ "context"
_ "fmt"
_ "github.com/antonfisher/nested-logrus-formatter"
_ "github.com/edwarnicke/debug"
_ "github.com/edwarnicke/grpcfd"
Expand All @@ -20,6 +21,7 @@ import (
_ "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry"
_ "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
_ "github.com/networkservicemesh/sdk/pkg/tools/awarenessgroups"
_ "github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
_ "github.com/networkservicemesh/sdk/pkg/tools/log"
_ "github.com/networkservicemesh/sdk/pkg/tools/log/logruslogger"
_ "github.com/networkservicemesh/sdk/pkg/tools/nsurl"
Expand Down
69 changes: 63 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main

import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -50,6 +51,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/retry"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/awarenessgroups"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/log/logruslogger"
"github.com/networkservicemesh/sdk/pkg/tools/nsurl"
Expand Down Expand Up @@ -190,7 +192,7 @@ func main() {
grpcfd.WithChainUnaryInterceptor(),
)

c := client.NewClient(
nsmClient := client.NewClient(
ctx,
client.WithClientURL(&config.ConnectTo),
client.WithName(config.Name),
Expand All @@ -208,36 +210,91 @@ func main() {
client.WithDialOptions(dialOptions...),
)

c = retry.NewClient(c, retry.WithTryTimeout(config.RequestTimeout))
nsmClient = retry.NewClient(nsmClient, retry.WithTryTimeout(config.RequestTimeout))

// ********************************************************************************
log.FromContext(ctx).Infof("executing phase 5: connect to all passed services (time since start: %s)", time.Since(starttime))
// Configure signal handling context
// ********************************************************************************
signalCtx, cancelSignalCtx := notifyContext(ctx)
defer cancelSignalCtx()

// ********************************************************************************
// Create Network Service Manager monitorClient
// ********************************************************************************
dialCtx, cancelDial := context.WithTimeout(signalCtx, config.DialTimeout)
defer cancelDial()

log.FromContext(ctx).Infof("NSC: Connecting to Network Service Manager %v", config.ConnectTo.String())
cc, err := grpc.DialContext(dialCtx, grpcutils.URLToTarget(&config.ConnectTo), dialOptions...)
if err != nil {
log.FromContext(ctx).Fatalf("failed dial to NSMgr: %v", err.Error())
}

monitorClient := networkservice.NewMonitorConnectionClient(cc)

// ********************************************************************************
log.FromContext(ctx).Infof("executing phase 5: connect to all passed services (time since start: %s)", time.Since(starttime))
// ********************************************************************************

for i := 0; i < len(config.NetworkServices); i++ {
u := nsurl.NSURL(config.NetworkServices[i])

id := fmt.Sprintf("%s-%d", config.Name, i)

monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, config.RequestTimeout)
defer cancelMonitor()

stream, err := monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: id,
},
},
})
if err != nil {
log.FromContext(ctx).Fatal(err.Error())
}

event, err := stream.Recv()
if err != nil {
log.FromContext(ctx).Fatal(err.Error())
}
cancelMonitor()

mech := u.Mechanism()
if mech.Type != memif.MECHANISM {
log.FromContext(ctx).Fatalf("mechanism type: %v is not supproted", mech.Type)
log.FromContext(ctx).Fatalf("mechanism type: %v is not supported", mech.Type)
}
request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: id,
NetworkService: u.NetworkService(),
Labels: u.Labels(),
},
MechanismPreferences: []*networkservice.Mechanism{
mech,
},
}

for _, conn := range event.Connections {
path := conn.GetPath()
if path.Index == 1 && path.PathSegments[0].Id == id && conn.Mechanism.Type == u.Mechanism().Type {
request.Connection = conn
request.Connection.Path.Index = 0
request.Connection.Id = id
break
}
}

resp, err := c.Request(ctx, request)
resp, err := nsmClient.Request(ctx, request)
if err != nil {
log.FromContext(ctx).Fatalf("request has failed: %v", err.Error())
}

defer func() {
closeCtx, cancelClose := context.WithTimeout(ctx, config.RequestTimeout)
defer cancelClose()
_, _ = c.Close(closeCtx, resp)
_, _ = nsmClient.Close(closeCtx, resp)
}()
}

Expand Down

0 comments on commit 8f91426

Please sign in to comment.