Skip to content

Commit

Permalink
Merge pull request #966 from dcantah/ncproxy-oc
Browse files Browse the repository at this point in the history
Add open census spans for ncproxy + go mod vendor
  • Loading branch information
dcantah authored Mar 13, 2021
2 parents 57cae1d + d0a87ad commit 5281188
Show file tree
Hide file tree
Showing 49 changed files with 4,299 additions and 66 deletions.
18 changes: 17 additions & 1 deletion cmd/ncproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"syscall"
"time"

"github.com/Microsoft/go-winio/pkg/etwlogrus"
"github.com/Microsoft/hcsshim/cmd/ncproxy/nodenetsvc"
"github.com/Microsoft/hcsshim/internal/computeagent"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/oc"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
)

Expand All @@ -31,6 +35,18 @@ var (
)

func main() {
// Provider ID: cf9f01fe-87b3-568d-ecef-9f54b7c5ff70
// Hook isn't closed explicitly, as it will exist until process exit.
if hook, err := etwlogrus.NewHook("Microsoft.Virtualization.NCProxy"); err == nil {
logrus.AddHook(hook)
} else {
logrus.Error(err)
}

// Register our OpenCensus logrus exporter
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
trace.RegisterExporter(&oc.LogrusExporter{})

flag.Parse()
ctx := context.Background()
conf, err := loadConfig(*configPath)
Expand All @@ -50,7 +66,7 @@ func main() {
if conf.NodeNetSvcAddr != "" {
log.G(ctx).Debugf("connecting to NodeNetworkService at address %s", conf.NodeNetSvcAddr)

opts := []grpc.DialOption{grpc.WithInsecure()}
opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithStatsHandler(&ocgrpc.ClientHandler{})}
if conf.Timeout > 0 {
opts = append(opts, grpc.WithBlock(), grpc.WithTimeout(time.Duration(conf.Timeout)*time.Second))
}
Expand Down
170 changes: 106 additions & 64 deletions cmd/ncproxy/ncproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"github.com/Microsoft/hcsshim/cmd/ncproxy/nodenetsvc"
"github.com/Microsoft/hcsshim/hcn"
"github.com/Microsoft/hcsshim/internal/computeagent"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/ncproxyttrpc"
"github.com/Microsoft/hcsshim/internal/oc"
"github.com/Microsoft/hcsshim/internal/uvm"
"github.com/Microsoft/hcsshim/pkg/octtrpc"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -29,12 +30,15 @@ type grpcService struct {

var _ ncproxygrpc.NetworkConfigProxyServer = &grpcService{}

func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest) (*ncproxygrpc.AddNICResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"containerID": req.ContainerID,
"endpointName": req.EndpointName,
"nicID": req.NicID,
}).Info("AddNIC request")
func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest) (_ *ncproxygrpc.AddNICResponse, err error) {
ctx, span := trace.StartSpan(ctx, "AddNIC")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("containerID", req.ContainerID),
trace.StringAttribute("endpointName", req.EndpointName),
trace.StringAttribute("nicID", req.NicID))

if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -53,12 +57,15 @@ func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest
return nil, status.Errorf(codes.FailedPrecondition, "No shim registered for namespace `%s`", req.ContainerID)
}

func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICRequest) (*ncproxygrpc.DeleteNICResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"containerID": req.ContainerID,
"nicID": req.NicID,
"endpointName": req.EndpointName,
}).Info("DeleteNIC request")
func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICRequest) (_ *ncproxygrpc.DeleteNICResponse, err error) {
ctx, span := trace.StartSpan(ctx, "DeleteNIC")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("containerID", req.ContainerID),
trace.StringAttribute("endpointName", req.EndpointName),
trace.StringAttribute("nicID", req.NicID))

if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -83,19 +90,22 @@ func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICR
//
// HNS Methods
//
func (s *grpcService) CreateNetwork(ctx context.Context, req *ncproxygrpc.CreateNetworkRequest) (*ncproxygrpc.CreateNetworkResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"networkName": req.Name,
"type": req.Mode.String(),
"ipamType": req.IpamType,
}).Info("CreateNetwork request")
func (s *grpcService) CreateNetwork(ctx context.Context, req *ncproxygrpc.CreateNetworkRequest) (_ *ncproxygrpc.CreateNetworkResponse, err error) {
ctx, span := trace.StartSpan(ctx, "CreateNetwork")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("networkName", req.Name),
trace.StringAttribute("type", req.Mode.String()),
trace.StringAttribute("ipamType", req.IpamType.String()))

if req.Name == "" || req.Mode.String() == "" || req.IpamType.String() == "" || req.SwitchName == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
}

// Check if the network already exists, and if so return error.
_, err := hcn.GetNetworkByName(req.Name)
_, err = hcn.GetNetworkByName(req.Name)
if err == nil {
return nil, status.Errorf(codes.FailedPrecondition, "network with name %q already exists", req.Name)
}
Expand Down Expand Up @@ -167,13 +177,16 @@ func (s *grpcService) CreateNetwork(ctx context.Context, req *ncproxygrpc.Create
}, nil
}

func (s *grpcService) CreateEndpoint(ctx context.Context, req *ncproxygrpc.CreateEndpointRequest) (*ncproxygrpc.CreateEndpointResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"endpointName": req.Name,
"ipAddr": req.Ipaddress,
"macAddr": req.Macaddress,
"networkName": req.NetworkName,
}).Info("CreateEndpoint request")
func (s *grpcService) CreateEndpoint(ctx context.Context, req *ncproxygrpc.CreateEndpointRequest) (_ *ncproxygrpc.CreateEndpointResponse, err error) {
ctx, span := trace.StartSpan(ctx, "CreateEndpoint")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("macAddr", req.Macaddress),
trace.StringAttribute("endpointName", req.Name),
trace.StringAttribute("ipAddr", req.Ipaddress),
trace.StringAttribute("networkName", req.NetworkName))

if req.Name == "" || req.Ipaddress == "" || req.Macaddress == "" || req.NetworkName == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand Down Expand Up @@ -238,11 +251,14 @@ func (s *grpcService) CreateEndpoint(ctx context.Context, req *ncproxygrpc.Creat
}, nil
}

func (s *grpcService) AddEndpoint(ctx context.Context, req *ncproxygrpc.AddEndpointRequest) (*ncproxygrpc.AddEndpointResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"endpointName": req.Name,
"namespaceID": req.NamespaceID,
}).Info("AddEndpoint request")
func (s *grpcService) AddEndpoint(ctx context.Context, req *ncproxygrpc.AddEndpointRequest) (_ *ncproxygrpc.AddEndpointResponse, err error) {
ctx, span := trace.StartSpan(ctx, "AddEndpoint")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("endpointName", req.Name),
trace.StringAttribute("namespaceID", req.NamespaceID))

if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -262,10 +278,13 @@ func (s *grpcService) AddEndpoint(ctx context.Context, req *ncproxygrpc.AddEndpo
return &ncproxygrpc.AddEndpointResponse{}, nil
}

func (s *grpcService) DeleteEndpoint(ctx context.Context, req *ncproxygrpc.DeleteEndpointRequest) (*ncproxygrpc.DeleteEndpointResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"endpointName": req.Name,
}).Info("DeleteEndpoint request")
func (s *grpcService) DeleteEndpoint(ctx context.Context, req *ncproxygrpc.DeleteEndpointRequest) (_ *ncproxygrpc.DeleteEndpointResponse, err error) {
ctx, span := trace.StartSpan(ctx, "DeleteEndpoint")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("endpointName", req.Name))

if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -285,10 +304,13 @@ func (s *grpcService) DeleteEndpoint(ctx context.Context, req *ncproxygrpc.Delet
return &ncproxygrpc.DeleteEndpointResponse{}, nil
}

func (s *grpcService) DeleteNetwork(ctx context.Context, req *ncproxygrpc.DeleteNetworkRequest) (*ncproxygrpc.DeleteNetworkResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"networkName": req.Name,
}).Info("DeleteNetwork request")
func (s *grpcService) DeleteNetwork(ctx context.Context, req *ncproxygrpc.DeleteNetworkRequest) (_ *ncproxygrpc.DeleteNetworkResponse, err error) {
ctx, span := trace.StartSpan(ctx, "DeleteNetwork")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("networkName", req.Name))

if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -308,10 +330,13 @@ func (s *grpcService) DeleteNetwork(ctx context.Context, req *ncproxygrpc.Delete
return &ncproxygrpc.DeleteNetworkResponse{}, nil
}

func (s *grpcService) GetEndpoint(ctx context.Context, req *ncproxygrpc.GetEndpointRequest) (*ncproxygrpc.GetEndpointResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"endpointName": req.Name,
}).Info("GetEndpoint request")
func (s *grpcService) GetEndpoint(ctx context.Context, req *ncproxygrpc.GetEndpointRequest) (_ *ncproxygrpc.GetEndpointResponse, err error) {
ctx, span := trace.StartSpan(ctx, "GetEndpoint")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("endpointName", req.Name))

if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -333,8 +358,10 @@ func (s *grpcService) GetEndpoint(ctx context.Context, req *ncproxygrpc.GetEndpo
}, nil
}

func (s *grpcService) GetEndpoints(ctx context.Context, req *ncproxygrpc.GetEndpointsRequest) (*ncproxygrpc.GetEndpointsResponse, error) {
log.G(ctx).Info("GetEndpoints request")
func (s *grpcService) GetEndpoints(ctx context.Context, req *ncproxygrpc.GetEndpointsRequest) (_ *ncproxygrpc.GetEndpointsResponse, err error) {
ctx, span := trace.StartSpan(ctx, "GetEndpoints")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

rawEndpoints, err := hcn.ListEndpoints()
if err != nil {
Expand All @@ -356,10 +383,13 @@ func (s *grpcService) GetEndpoints(ctx context.Context, req *ncproxygrpc.GetEndp
}, nil
}

func (s *grpcService) GetNetwork(ctx context.Context, req *ncproxygrpc.GetNetworkRequest) (*ncproxygrpc.GetNetworkResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"networkName": req.Name,
}).Info("GetNetwork request")
func (s *grpcService) GetNetwork(ctx context.Context, req *ncproxygrpc.GetNetworkRequest) (_ *ncproxygrpc.GetNetworkResponse, err error) {
ctx, span := trace.StartSpan(ctx, "GetNetwork")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("networkName", req.Name))

if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
Expand All @@ -379,8 +409,10 @@ func (s *grpcService) GetNetwork(ctx context.Context, req *ncproxygrpc.GetNetwor
}, nil
}

func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetworksRequest) (*ncproxygrpc.GetNetworksResponse, error) {
log.G(ctx).Info("GetNetworks request")
func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetworksRequest) (_ *ncproxygrpc.GetNetworksResponse, err error) {
ctx, span := trace.StartSpan(ctx, "GetNetworks")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

rawNetworks, err := hcn.ListNetworks()
if err != nil {
Expand All @@ -407,17 +439,24 @@ type ttrpcService struct {
m sync.Mutex
}

func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttrpc.RegisterComputeAgentRequest) (*ncproxyttrpc.RegisterComputeAgentResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"containerID": req.ContainerID,
"agentAddress": req.AgentAddress,
}).Info("RegisterComputeAgent request")
func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttrpc.RegisterComputeAgentRequest) (_ *ncproxyttrpc.RegisterComputeAgentResponse, err error) {
ctx, span := trace.StartSpan(ctx, "RegisterComputeAgent")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("containerID", req.ContainerID),
trace.StringAttribute("agentAddress", req.AgentAddress))

conn, err := winio.DialPipe(req.AgentAddress, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to compute agent service")
}
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { conn.Close() }))
client := ttrpc.NewClient(
conn,
ttrpc.WithUnaryClientInterceptor(octtrpc.ClientInterceptor()),
ttrpc.WithOnClose(func() { conn.Close() }),
)
// Add to global client map if connection succeeds. Don't check if there's already a map entry
// just overwrite as the client may have changed the address of the config agent.
s.m.Lock()
Expand All @@ -426,11 +465,14 @@ func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttr
return &ncproxyttrpc.RegisterComputeAgentResponse{}, nil
}

func (s *ttrpcService) ConfigureNetworking(ctx context.Context, req *ncproxyttrpc.ConfigureNetworkingInternalRequest) (*ncproxyttrpc.ConfigureNetworkingInternalResponse, error) {
log.G(ctx).WithFields(logrus.Fields{
"containerID": req.ContainerID,
"RequestType": req.RequestType,
}).Info("ConfigureNetworking request")
func (s *ttrpcService) ConfigureNetworking(ctx context.Context, req *ncproxyttrpc.ConfigureNetworkingInternalRequest) (_ *ncproxyttrpc.ConfigureNetworkingInternalResponse, err error) {
ctx, span := trace.StartSpan(ctx, "ConfigureNetworking")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

span.AddAttributes(
trace.StringAttribute("containerID", req.ContainerID),
trace.StringAttribute("agentAddress", req.RequestType.String()))

if req.ContainerID == "" {
return nil, status.Error(codes.InvalidArgument, "ContainerID is empty")
Expand Down
3 changes: 2 additions & 1 deletion cmd/ncproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Microsoft/hcsshim/pkg/octtrpc"
"github.com/containerd/ttrpc"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
)

Expand All @@ -28,7 +29,7 @@ func newServer(ctx context.Context, conf *config) (*server, error) {
return nil, err
}
return &server{
grpc: grpc.NewServer(),
grpc: grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})),
ttrpc: ttrpcServer,
conf: conf,
}, nil
Expand Down
Loading

0 comments on commit 5281188

Please sign in to comment.