From cd7712c492c24d6bcbfd2770e23bb867df8b75ff Mon Sep 17 00:00:00 2001 From: Guy Arbitman Date: Tue, 28 Nov 2023 12:33:38 +0200 Subject: [PATCH] usm: tests: Make request slower for protocol classification (#21100) * usm: tests: Make request slower for protocol classification * usm: tests: Make request slower for protocol classification --- pkg/network/tracer/testutil/grpc/server.go | 18 ++++++-- pkg/network/tracer/tracer_usm_linux_test.go | 49 +++++++++------------ 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/pkg/network/tracer/testutil/grpc/server.go b/pkg/network/tracer/testutil/grpc/server.go index dae23aa6eec62..94e65a75f829b 100644 --- a/pkg/network/tracer/testutil/grpc/server.go +++ b/pkg/network/tracer/testutil/grpc/server.go @@ -226,10 +226,17 @@ func NewServer(addr string) (*Server, error) { return nil, err } + server := NewServerWithoutBind() + server.lis = lis + server.Address = lis.Addr().String() + + return server, nil +} + +// NewServerWithoutBind returns a new instance of the gRPC server. +func NewServerWithoutBind() *Server { server := &Server{ - Address: lis.Addr().String(), grpcSrv: grpc.NewServer(grpc.MaxRecvMsgSize(100*1024*1024), grpc.MaxSendMsgSize(100*1024*1024)), - lis: lis, routeNotes: make(map[string][]*routeguide.RouteNote), } @@ -238,7 +245,12 @@ func NewServer(addr string) (*Server, error) { routeguide.RegisterRouteGuideServer(server.grpcSrv, server) pbStream.RegisterMathServer(server.grpcSrv, server) - return server, nil + return server +} + +// GetGRPCServer returns the gRPC server. +func (s *Server) GetGRPCServer() *grpc.Server { + return s.grpcSrv } func (s *Server) Stop() { diff --git a/pkg/network/tracer/tracer_usm_linux_test.go b/pkg/network/tracer/tracer_usm_linux_test.go index 7b31b6b03db69..5807ef53961de 100644 --- a/pkg/network/tracer/tracer_usm_linux_test.go +++ b/pkg/network/tracer/tracer_usm_linux_test.go @@ -8,7 +8,6 @@ package tracer import ( - "bufio" "bytes" "context" "crypto/tls" @@ -27,6 +26,7 @@ import ( "testing" "time" + gorilla "github.com/gorilla/mux" krpretty "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,6 +51,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/kprobe" "github.com/DataDog/datadog-agent/pkg/network/tracer/testutil/grpc" "github.com/DataDog/datadog-agent/pkg/network/usm/utils" + grpc2 "github.com/DataDog/datadog-agent/pkg/util/grpc" ) func httpSupported() bool { @@ -598,9 +599,6 @@ func (s *USMSuite) TestProtocolClassification() { func testProtocolConnectionProtocolMapCleanup(t *testing.T, tr *Tracer, clientHost, targetHost, serverHost string) { t.Run("protocol cleanup", func(t *testing.T) { - if tr.ebpfTracer.Type() == connection.TracerTypeFentry { - t.Skip("protocol classification not supported for fentry tracer") - } t.Cleanup(func() { tr.ebpfTracer.Pause() }) dialer := &net.Dialer{ @@ -624,17 +622,21 @@ func testProtocolConnectionProtocolMapCleanup(t *testing.T, tr *Tracer, clientHo initTracerState(t, tr) require.NoError(t, tr.ebpfTracer.Resume()) - HTTPServer := NewTCPServerOnAddress(serverHost, func(c net.Conn) { - r := bufio.NewReader(c) - input, err := r.ReadBytes(byte('\n')) - if err == nil { - c.Write(input) - } - c.Close() + mux := gorilla.NewRouter() + mux.Handle("/test", nethttp.DefaultServeMux) + grpcHandler := grpc.NewServerWithoutBind() + + lis, err := net.Listen("tcp", serverHost) + require.NoError(t, err) + srv := grpc2.NewMuxedGRPCServer(serverHost, nil, grpcHandler.GetGRPCServer(), mux) + srv.Addr = lis.Addr().String() + + go srv.Serve(lis) + t.Cleanup(func() { + _ = srv.Shutdown(context.Background()) + _ = lis.Close() }) - t.Cleanup(HTTPServer.Shutdown) - require.NoError(t, HTTPServer.Run()) - _, port, err := net.SplitHostPort(HTTPServer.address) + _, port, err := net.SplitHostPort(srv.Addr) require.NoError(t, err) targetAddr := net.JoinHostPort(targetHost, port) @@ -644,19 +646,13 @@ func testProtocolConnectionProtocolMapCleanup(t *testing.T, tr *Tracer, clientHo DialContext: dialer.DialContext, }, } - resp, err := client.Get("http://" + targetAddr + "/test") - if err == nil { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - } + resp, err := client.Post("http://"+targetAddr+"/test", "text/plain", bytes.NewReader(bytes.Repeat([]byte("test"), 100))) + require.NoError(t, err) + io.Copy(io.Discard, resp.Body) + resp.Body.Close() client.CloseIdleConnections() - waitForConnectionsWithProtocol(t, tr, targetAddr, HTTPServer.address, &protocols.Stack{Application: protocols.HTTP}) - HTTPServer.Shutdown() - - gRPCServer, err := grpc.NewServer(HTTPServer.address) - require.NoError(t, err) - gRPCServer.Run() + waitForConnectionsWithProtocol(t, tr, targetAddr, srv.Addr, &protocols.Stack{Application: protocols.HTTP}) grpcClient, err := grpc.NewClient(targetAddr, grpc.Options{ CustomDialer: dialer, @@ -664,8 +660,7 @@ func testProtocolConnectionProtocolMapCleanup(t *testing.T, tr *Tracer, clientHo require.NoError(t, err) defer grpcClient.Close() _ = grpcClient.HandleUnary(context.Background(), "test") - gRPCServer.Stop() - waitForConnectionsWithProtocol(t, tr, targetAddr, gRPCServer.Address, &protocols.Stack{Api: protocols.GRPC, Application: protocols.HTTP2}) + waitForConnectionsWithProtocol(t, tr, targetAddr, srv.Addr, &protocols.Stack{Api: protocols.GRPC, Application: protocols.HTTP2}) }) }