Skip to content

Commit

Permalink
[usm] add SHOW operation to postgres monitor (#29469)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuri-lipnesh authored and grantseltzer committed Oct 4, 2024
1 parent 9643163 commit 1952f89
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 35 deletions.
4 changes: 3 additions & 1 deletion pkg/network/encoding/marshal/usm_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (e *postgresEncoder) encodeData(connectionData *USMConnectionData[postgres.
staticTags |= stats.StaticTags
e.postgresAggregationsBuilder.AddAggregations(func(builder *model.DatabaseStatsBuilder) {
builder.SetPostgres(func(statsBuilder *model.PostgresStatsBuilder) {
statsBuilder.SetTableName(key.TableName)
statsBuilder.SetTableName(key.Parameters)
statsBuilder.SetOperation(uint64(toPostgresModelOperation(key.Operation)))
if latencies := stats.Latencies; latencies != nil {
blob, _ := proto.Marshal(latencies.ToProto())
Expand Down Expand Up @@ -106,6 +106,8 @@ func toPostgresModelOperation(op postgres.Operation) model.PostgresOperation {
return model.PostgresOperation_PostgresAlterOp
case postgres.TruncateTableOP:
return model.PostgresOperation_PostgresTruncateOp
case postgres.ShowOP:
return model.PostgresOperation_PostgresShowOp
default:
return model.PostgresOperation_PostgresUnknownOp
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/network/protocols/postgres/debugging/debugging.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ type address struct {
Port uint16
}

// key represents a (client, server, table name) tuple.
// key represents a (client, server, parameters: table name or runtime parameter) tuple.
type key struct {
Client address
Server address
TableName string
Client address
Server address
Parameters string
}

// Stats consolidates request count and latency information for a certain status code
Expand Down Expand Up @@ -58,7 +58,7 @@ func Postgres(stats map[postgres.Key]*postgres.RequestStat) []RequestSummary {
IP: serverAddr.String(),
Port: k.DstPort,
},
TableName: k.TableName,
Parameters: k.Parameters,
}
if _, ok := resMap[tempKey]; !ok {
resMap[tempKey] = make(map[string]Stats)
Expand Down
52 changes: 40 additions & 12 deletions pkg/network/protocols/postgres/model_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
// EmptyParameters represents the case where the non-empty query has no parameters
EmptyParameters = "EMPTY_PARAMETERS"
)

// EventWrapper wraps an ebpf event and provides additional methods to extract information from it.
// We use this wrapper to avoid recomputing the same values (operation and table name) multiple times.
type EventWrapper struct {
*ebpf.EbpfEvent

operationSet bool
operation Operation
tableNameSet bool
tableName string
normalizer *sqllexer.Normalizer
operationSet bool
operation Operation
parametersSet bool
parameters string
normalizer *sqllexer.Normalizer
}

// NewEventWrapper creates a new EventWrapper from an ebpf event.
Expand Down Expand Up @@ -73,6 +78,25 @@ func (e *EventWrapper) Operation() Operation {
return e.operation
}

// extractParameters returns the string following the command
func (e *EventWrapper) extractParameters() string {
b := getFragment(&e.Tx)
idxParam := bytes.IndexByte(b, ' ') // trim the string to a space, it will give the parameter
if idxParam == -1 {
return EmptyParameters
}
idxParam++

idxEnd := bytes.IndexByte(b[idxParam:], '\x00') // trim trailing nulls
if idxEnd == 0 {
return EmptyParameters
}
if idxEnd != -1 {
return string(b[idxParam : idxParam+idxEnd])
}
return string(b[idxParam:])
}

var re = regexp.MustCompile(`(?i)if\s+exists`)

// extractTableName extracts the table name from the query.
Expand All @@ -97,14 +121,18 @@ func (e *EventWrapper) extractTableName() string {

}

// TableName returns the name of the table the query is operating on.
func (e *EventWrapper) TableName() string {
if !e.tableNameSet {
e.tableName = e.extractTableName()
e.tableNameSet = true
// Parameters returns the table name or run-time parameter.
func (e *EventWrapper) Parameters() string {
if !e.parametersSet {
if e.operation == ShowOP {
e.parameters = e.extractParameters()
} else {
e.parameters = e.extractTableName()
}
e.parametersSet = true
}

return e.tableName
return e.parameters
}

// RequestLatency returns the latency of the request in nanoseconds
Expand All @@ -125,6 +153,6 @@ ebpfTx{
// String returns a string representation of the underlying event
func (e *EventWrapper) String() string {
var output strings.Builder
output.WriteString(fmt.Sprintf(template, e.Operation(), e.TableName(), e.RequestLatency()))
output.WriteString(fmt.Sprintf(template, e.Operation(), e.Parameters(), e.RequestLatency()))
return output.String()
}
6 changes: 6 additions & 0 deletions pkg/network/protocols/postgres/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
AlterTableOP
// TruncateTableOP represents a TRUNCATE operation.
TruncateTableOP
// ShowOP represents a command SHOW
ShowOP
)

// String returns the string representation of the operation.
Expand All @@ -50,6 +52,8 @@ func (op Operation) String() string {
return "DELETE"
case AlterTableOP:
return "ALTER"
case ShowOP:
return "SHOW"
default:
return "UNKNOWN"
}
Expand All @@ -74,6 +78,8 @@ func FromString(op string) Operation {
return DeleteTableOP
case "ALTER":
return AlterTableOP
case "SHOW":
return ShowOP
default:
return UnknownOP
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/network/protocols/postgres/stats_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import (

// Key is an identifier for a group of Postgres transactions
type Key struct {
Operation Operation
TableName string
Operation Operation
Parameters string
types.ConnectionKey
}

// NewKey creates a new postgres key
func NewKey(saddr, daddr util.Address, sport, dport uint16, operation Operation, tableName string) Key {
func NewKey(saddr, daddr util.Address, sport, dport uint16, operation Operation, parameters string) Key {
return Key{
ConnectionKey: types.NewConnectionKey(saddr, daddr, sport, dport),
Operation: operation,
TableName: tableName,
Parameters: parameters,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/network/protocols/postgres/statskeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *StatKeeper) Process(tx *EventWrapper) {

key := Key{
Operation: tx.Operation(),
TableName: tx.TableName(),
Parameters: tx.Parameters(),
ConnectionKey: tx.ConnTuple(),
}
requestStats, ok := s.stats[key]
Expand Down
10 changes: 5 additions & 5 deletions pkg/network/protocols/postgres/statskeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ func TestStatKeeperProcess(t *testing.T) {
Response_last_seen: 10,
},
},
operationSet: true,
operation: SelectOP,
tableNameSet: true,
tableName: "dummy",
operationSet: true,
operation: SelectOP,
parametersSet: true,
parameters: "dummy",
})
}

require.Equal(t, 1, len(s.stats))
for k, stat := range s.stats {
require.Equal(t, "dummy", k.TableName)
require.Equal(t, "dummy", k.Parameters)
require.Equal(t, SelectOP, k.Operation)
require.Equal(t, 20, stat.Count)
require.Equal(t, float64(20), stat.Latencies.GetCount())
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/protocols/postgres/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (t *Telemetry) Count(tx *ebpf.EbpfEvent, eventWrapper *EventWrapper) {
t.failedOperationExtraction.Add(1)
state = operationNotFound
}
if eventWrapper.TableName() == "UNKNOWN" {
if eventWrapper.Parameters() == "UNKNOWN" {
t.failedTableNameExtraction.Add(1)
if state == operationNotFound {
state = tableAndOpNotFound
Expand Down
106 changes: 100 additions & 6 deletions pkg/network/usm/postgres_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ func testDecoding(t *testing.T, isTLS bool) {
}, isTLS)
},
},
// This test validates that the SHOW command is currently not supported.
{
name: "show command",
preMonitorSetup: func(t *testing.T, ctx pgTestContext) {
Expand All @@ -497,8 +496,8 @@ func testDecoding(t *testing.T, isTLS bool) {
},
validation: func(t *testing.T, _ pgTestContext, monitor *Monitor) {
validatePostgres(t, monitor, map[string]map[postgres.Operation]int{
"UNKNOWN": {
postgres.UnknownOP: adjustCount(1),
"search_path": {
postgres.ShowOP: adjustCount(1),
},
}, isTLS)
},
Expand Down Expand Up @@ -724,11 +723,106 @@ func validatePostgres(t *testing.T, monitor *Monitor, expectedStats map[string]m
if hasTLSTag != tls {
continue
}
if _, ok := found[key.TableName]; !ok {
found[key.TableName] = make(map[postgres.Operation]int)
if _, ok := found[key.Parameters]; !ok {
found[key.Parameters] = make(map[postgres.Operation]int)
}
found[key.TableName][key.Operation] += stats.Count
found[key.Parameters][key.Operation] += stats.Count
}
return reflect.DeepEqual(expectedStats, found)
}, time.Second*5, time.Millisecond*100, "Expected to find a %v stats, instead captured %v", &expectedStats, &found)
}

func (s *postgresProtocolParsingSuite) TestExtractParameters() {
t := s.T()

units := []struct {
name string
expected string
event ebpf.EbpfEvent
}{
{
name: "query_size longer than the actual length of the content",
expected: "version and status",
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: createFragment([]byte("SHOW version and status")),
Original_query_size: 64,
},
},
},
{
name: "query_size shorter than the actual length of the content",
expected: "param1 param2",
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: createFragment([]byte("SHOW param1 param2 param3")),
Original_query_size: 18,
},
},
},
{
name: "the query has no parameters",
expected: postgres.EmptyParameters,
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: createFragment([]byte("SHOW ")),
Original_query_size: 10,
},
},
},
{
name: "command has trailing zeros",
expected: "param",
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 'p', 'a', 'r', 'a', 'm', 0, 0, 0},
Original_query_size: 13,
},
},
},
{
name: "malformed command with wrong query_size",
expected: postgres.EmptyParameters,
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 0, 0, 'a', ' ', 'b', 'c', 0, 0, 0},
Original_query_size: 14,
},
},
},
{
name: "empty parameters with spaces and nils",
expected: postgres.EmptyParameters,
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 0, ' ', 0, ' ', 0, 0, 0},
Original_query_size: 12,
},
},
},
{
name: "parameters with control codes only",
expected: "\x01\x02\x03\x04\x05",
event: ebpf.EbpfEvent{
Tx: ebpf.EbpfTx{
Request_fragment: [ebpf.BufferSize]byte{'S', 'H', 'O', 'W', ' ', 1, 2, 3, 4, 5},
Original_query_size: 10,
},
},
},
}
for _, unit := range units {
t.Run(unit.name, func(t *testing.T) {
e := postgres.NewEventWrapper(&unit.event)
require.NotNil(t, e)
e.Operation()
require.Equal(t, unit.expected, e.Parameters())
})
}
}

func createFragment(fragment []byte) [ebpf.BufferSize]byte {
var b [ebpf.BufferSize]byte
copy(b[:], fragment)
return b
}

0 comments on commit 1952f89

Please sign in to comment.