diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 7168eb78911d..b50c439b5eb9 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -18,6 +18,8 @@ package server import ( "context" "fmt" + "io" + "regexp" "sort" "strconv" "time" @@ -42,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -1110,6 +1113,177 @@ func (t *tenantStatusServer) Details( return resp, nil } +func (s *tenantStatusServer) LogFilesList( + ctx context.Context, req *serverpb.LogFilesListRequest, +) (*serverpb.LogFilesListResponse, error) { + ctx = propagateGatewayMetadata(ctx) + ctx = s.AnnotateCtx(ctx) + + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + // NB: not using serverError() here since the priv checker + // already returns a proper gRPC error status. + return nil, err + } + + instanceID, local, err := s.parseInstanceID(req.NodeId) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + instance, err := s.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + if !local { + status, err := s.dialPod(ctx, instance.InstanceID, instance.InstanceAddr) + if err != nil { + return nil, serverError(ctx, err) + } + return status.LogFilesList(ctx, req) + } + log.Flush() + logFiles, err := log.ListLogFiles() + if err != nil { + return nil, serverError(ctx, err) + } + return &serverpb.LogFilesListResponse{Files: logFiles}, nil +} + +func (s *tenantStatusServer) LogFile( + ctx context.Context, req *serverpb.LogFileRequest, +) (*serverpb.LogEntriesResponse, error) { + ctx = propagateGatewayMetadata(ctx) + ctx = s.AnnotateCtx(ctx) + + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + // NB: not using serverError() here since the priv checker + // already returns a proper gRPC error status. + return nil, err + } + + instanceID, local, err := s.parseInstanceID(req.NodeId) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + instance, err := s.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + if !local { + status, err := s.dialPod(ctx, instance.InstanceID, instance.InstanceAddr) + if err != nil { + return nil, serverError(ctx, err) + } + return status.LogFile(ctx, req) + } + + // Determine how to redact. + inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable) + + // Ensure that the latest log entries are available in files. + log.Flush() + + // Read the logs. + reader, err := log.GetLogReader(req.File) + if err != nil { + return nil, serverError(ctx, errors.Wrapf(err, "log file %q could not be opened", req.File)) + } + defer reader.Close() + + var resp serverpb.LogEntriesResponse + decoder, err := log.NewEntryDecoder(reader, inputEditMode) + if err != nil { + return nil, serverError(ctx, err) + } + for { + var entry logpb.Entry + if err := decoder.Decode(&entry); err != nil { + if err == io.EOF { + break + } + return nil, serverError(ctx, err) + } + resp.Entries = append(resp.Entries, entry) + } + + return &resp, nil +} + +func (s *tenantStatusServer) Logs( + ctx context.Context, req *serverpb.LogsRequest, +) (*serverpb.LogEntriesResponse, error) { + ctx = propagateGatewayMetadata(ctx) + ctx = s.AnnotateCtx(ctx) + + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + // NB: not using serverError() here since the priv checker + // already returns a proper gRPC error status. + return nil, err + } + + instanceID, local, err := s.parseInstanceID(req.NodeId) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + instance, err := s.sqlServer.sqlInstanceReader.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + if !local { + status, err := s.dialPod(ctx, instance.InstanceID, instance.InstanceAddr) + if err != nil { + return nil, serverError(ctx, err) + } + return status.Logs(ctx, req) + } + + // Determine how to redact. + inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable) + + // Select the time interval. + startTimestamp, err := parseInt64WithDefault( + req.StartTime, + timeutil.Now().AddDate(0, 0, -1).UnixNano()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "StartTime could not be parsed: %s", err) + } + + endTimestamp, err := parseInt64WithDefault(req.EndTime, timeutil.Now().UnixNano()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "EndTime could not be parsed: %s", err) + } + + if startTimestamp > endTimestamp { + return nil, status.Errorf(codes.InvalidArgument, "StartTime: %d should not be greater than endtime: %d", startTimestamp, endTimestamp) + } + + maxEntries, err := parseInt64WithDefault(req.Max, defaultMaxLogEntries) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Max could not be parsed: %s", err) + } + if maxEntries < 1 { + return nil, status.Errorf(codes.InvalidArgument, "Max: %d should be set to a value greater than 0", maxEntries) + } + + var regex *regexp.Regexp + if len(req.Pattern) > 0 { + if regex, err = regexp.Compile(req.Pattern); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "regex pattern could not be compiled: %s", err) + } + } + + // Ensure that the latest log entries are available in files. + log.Flush() + + // Read the logs. + entries, err := log.FetchEntriesFromFiles( + startTimestamp, endTimestamp, int(maxEntries), regex, inputEditMode) + if err != nil { + return nil, serverError(ctx, err) + } + + return &serverpb.LogEntriesResponse{Entries: entries}, nil +} + func (t *tenantStatusServer) NodesList( ctx context.Context, req *serverpb.NodesListRequest, ) (*serverpb.NodesListResponse, error) {