From 8e31f56b7c7b18631e60da152edb40d31c295fbf Mon Sep 17 00:00:00 2001
From: David Hartunian
Date: Tue, 8 Nov 2022 16:53:23 -0500
Subject: [PATCH] tenant: add support to Log endpoints to status server

Previously, the tenant status server did not support the log-file
related endpoints leading to missing logs in the debug.zip when
generating for a tenant server.

This commit migrates the implementations from the standard status
server into the tenant, adjusts for fanout to instances instead of
nodes, and leaves the rest as-is.

Resolves: #91992
Epic: CC-5168

Release note (ops change): generating a debug.zip for a tenant server
will now include logs in the zip file.
---
 .../serverccl/statusccl/tenant_status_test.go |  31 +++
 pkg/server/serverpb/status.go                 |   3 +
 pkg/server/tenant_status.go                   | 192 ++++++++++++++++++
 3 files changed, 226 insertions(+)

diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
index 98684fef719a..4f276e82f109 100644
--- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go
+++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -110,6 +110,37 @@ func TestTenantStatusAPI(t *testing.T) {
 	t.Run("tenant_auth_statement", func(t *testing.T) {
 		testTenantAuthOnStatements(ctx, t, testHelper)
 	})
+
+	t.Run("tenant_logs", func(t *testing.T) {
+		testTenantLogs(ctx, t, testHelper)
+	})
+}
+
+// testTenantLogs checks that the Logs, LogFilesList and LogFile endpoints of
+// tenant 0's status server return non-empty results when addressed to that
+// tenant's own SQL instance ID.
+func testTenantLogs(ctx context.Context, t *testing.T, helper serverccl.TenantTestHelper) {
+	tenantA := helper.TestCluster().TenantStatusSrv(0)
+
+	logsResp, err := tenantA.Logs(ctx, &serverpb.LogsRequest{
+		NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
+		Redact: false,
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, logsResp.Entries)
+
+	logsFilesListResp, err := tenantA.LogFilesList(ctx, &serverpb.LogFilesListRequest{
+		NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, logsFilesListResp.Files)
+
+	logsFileResp, err := tenantA.LogFile(ctx, &serverpb.LogFileRequest{
+		NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
+		File:   logsFilesListResp.Files[0].Name,
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, logsFileResp.Entries)
 }
 
 func TestTenantCannotSeeNonTenantStats(t *testing.T) {
diff --git a/pkg/server/serverpb/status.go b/pkg/server/serverpb/status.go
index 7be06399d29e..94d2ece6c2a6 100644
--- a/pkg/server/serverpb/status.go
+++ b/pkg/server/serverpb/status.go
@@ -41,6 +41,9 @@ type SQLStatusServer interface {
 	TransactionContentionEvents(context.Context, *TransactionContentionEventsRequest) (*TransactionContentionEventsResponse, error)
 	NodesList(context.Context, *NodesListRequest) (*NodesListResponse, error)
 	ListExecutionInsights(context.Context, *ListExecutionInsightsRequest) (*ListExecutionInsightsResponse, error)
+	LogFilesList(context.Context, *LogFilesListRequest) (*LogFilesListResponse, error)
+	LogFile(context.Context, *LogFileRequest) (*LogEntriesResponse, error)
+	Logs(context.Context, *LogsRequest) (*LogEntriesResponse, error)
 }
 
 // OptionalNodesStatusServer is a StatusServer that is only optionally present
diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go
index 7168eb78911d..26328ffe023f 100644
--- a/pkg/server/tenant_status.go
+++ b/pkg/server/tenant_status.go
@@ -18,6 +18,8 @@ package server
 import (
 	"context"
 	"fmt"
+	"io"
+	"regexp"
 	"sort"
 	"strconv"
 	"time"
@@ -42,6 +44,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -1110,6 +1113,195 @@ func (t *tenantStatusServer) Details(
 	return resp, nil
 }
 
+// LogFilesList returns the list of log files on the SQL instance identified
+// by req.NodeId. The caller must be an admin user. If the request targets a
+// different instance, it is forwarded to that instance via dialPod; otherwise
+// the logs are flushed and the local files are listed.
+func (t *tenantStatusServer) LogFilesList(
+	ctx context.Context, req *serverpb.LogFilesListRequest,
+) (*serverpb.LogFilesListResponse, error) {
+	ctx = propagateGatewayMetadata(ctx)
+	ctx = t.AnnotateCtx(ctx)
+
+	if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
+		// NB: not using serverError() here since the priv checker
+		// already returns a proper gRPC error status.
+		return nil, err
+	}
+
+	instanceID, local, err := t.parseInstanceID(req.NodeId)
+	if err != nil {
+		// The message is not a format string, so avoid status.Errorf.
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	instance, err := t.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID)
+	if err != nil {
+		return nil, err
+	}
+	if !local {
+		statusClient, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
+		if err != nil {
+			return nil, serverError(ctx, err)
+		}
+		return statusClient.LogFilesList(ctx, req)
+	}
+	log.Flush()
+	logFiles, err := log.ListLogFiles()
+	if err != nil {
+		return nil, serverError(ctx, err)
+	}
+	return &serverpb.LogFilesListResponse{Files: logFiles}, nil
+}
+
+// LogFile returns the entries of the log file req.File on the SQL instance
+// identified by req.NodeId. The caller must be an admin user. Requests for
+// another instance are forwarded via dialPod; otherwise the logs are flushed
+// and the file is decoded in full, using the edit mode selected by
+// req.Redact.
+func (t *tenantStatusServer) LogFile(
+	ctx context.Context, req *serverpb.LogFileRequest,
+) (*serverpb.LogEntriesResponse, error) {
+	ctx = propagateGatewayMetadata(ctx)
+	ctx = t.AnnotateCtx(ctx)
+
+	if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
+		// NB: not using serverError() here since the priv checker
+		// already returns a proper gRPC error status.
+		return nil, err
+	}
+
+	instanceID, local, err := t.parseInstanceID(req.NodeId)
+	if err != nil {
+		// The message is not a format string, so avoid status.Errorf.
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	instance, err := t.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID)
+	if err != nil {
+		return nil, err
+	}
+	if !local {
+		statusClient, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
+		if err != nil {
+			return nil, serverError(ctx, err)
+		}
+		return statusClient.LogFile(ctx, req)
+	}
+
+	// Determine how to redact.
+	inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)
+
+	// Ensure that the latest log entries are available in files.
+	log.Flush()
+
+	// Read the logs.
+	reader, err := log.GetLogReader(req.File)
+	if err != nil {
+		return nil, serverError(ctx, errors.Wrapf(err, "log file %q could not be opened", req.File))
+	}
+	defer reader.Close()
+
+	var resp serverpb.LogEntriesResponse
+	decoder, err := log.NewEntryDecoder(reader, inputEditMode)
+	if err != nil {
+		return nil, serverError(ctx, err)
+	}
+	for {
+		var entry logpb.Entry
+		if err := decoder.Decode(&entry); err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, serverError(ctx, err)
+		}
+		resp.Entries = append(resp.Entries, entry)
+	}
+
+	return &resp, nil
+}
+
+// Logs returns log entries from the SQL instance identified by req.NodeId.
+// The caller must be an admin user. Requests for another instance are
+// forwarded via dialPod. Locally, entries are fetched from the log files
+// between req.StartTime and req.EndTime (the defaults handed to
+// parseInt64WithDefault are one day ago and now), limited to req.Max entries
+// (default: defaultMaxLogEntries) and optionally filtered by req.Pattern.
+func (t *tenantStatusServer) Logs(
+	ctx context.Context, req *serverpb.LogsRequest,
+) (*serverpb.LogEntriesResponse, error) {
+	ctx = propagateGatewayMetadata(ctx)
+	ctx = t.AnnotateCtx(ctx)
+
+	if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
+		// NB: not using serverError() here since the priv checker
+		// already returns a proper gRPC error status.
+		return nil, err
+	}
+
+	instanceID, local, err := t.parseInstanceID(req.NodeId)
+	if err != nil {
+		// The message is not a format string, so avoid status.Errorf.
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	instance, err := t.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID)
+	if err != nil {
+		return nil, err
+	}
+	if !local {
+		statusClient, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
+		if err != nil {
+			return nil, serverError(ctx, err)
+		}
+		return statusClient.Logs(ctx, req)
+	}
+
+	// Determine how to redact.
+	inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)
+
+	// Select the time interval.
+	startTimestamp, err := parseInt64WithDefault(
+		req.StartTime,
+		timeutil.Now().AddDate(0, 0, -1).UnixNano())
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "StartTime could not be parsed: %s", err)
+	}
+
+	endTimestamp, err := parseInt64WithDefault(req.EndTime, timeutil.Now().UnixNano())
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "EndTime could not be parsed: %s", err)
+	}
+
+	if startTimestamp > endTimestamp {
+		return nil, status.Errorf(codes.InvalidArgument, "StartTime: %d should not be greater than endtime: %d", startTimestamp, endTimestamp)
+	}
+
+	maxEntries, err := parseInt64WithDefault(req.Max, defaultMaxLogEntries)
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "Max could not be parsed: %s", err)
+	}
+	if maxEntries < 1 {
+		return nil, status.Errorf(codes.InvalidArgument, "Max: %d should be set to a value greater than 0", maxEntries)
+	}
+
+	var regex *regexp.Regexp
+	if len(req.Pattern) > 0 {
+		if regex, err = regexp.Compile(req.Pattern); err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "regex pattern could not be compiled: %s", err)
+		}
+	}
+
+	// Ensure that the latest log entries are available in files.
+	log.Flush()
+
+	// Read the logs.
+	entries, err := log.FetchEntriesFromFiles(
+		startTimestamp, endTimestamp, int(maxEntries), regex, inputEditMode)
+	if err != nil {
+		return nil, serverError(ctx, err)
+	}
+
+	return &serverpb.LogEntriesResponse{Entries: entries}, nil
+}
+
 func (t *tenantStatusServer) NodesList(
 	ctx context.Context, req *serverpb.NodesListRequest,
 ) (*serverpb.NodesListResponse, error) {