Skip to content

Commit

Permalink
Merge pull request cockroachdb#96906 from JeffSwenson/backport22.2-91604
Browse files Browse the repository at this point in the history
release-22.2: tenant: add support to Log endpoints to status server
  • Loading branch information
jeffswenson authored Feb 10, 2023
2 parents 5d5b3b3 + 88368db commit df15537
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 2 deletions.
28 changes: 28 additions & 0 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,34 @@ func TestTenantStatusAPI(t *testing.T) {
t.Run("tenant_auth_statement", func(t *testing.T) {
testTenantAuthOnStatements(ctx, t, testHelper)
})

t.Run("tenant_logs", func(t *testing.T) {
testTenantLogs(ctx, t, testHelper)
})
}

func testTenantLogs(ctx context.Context, t *testing.T, helper serverccl.TenantTestHelper) {
tenantA := helper.TestCluster().TenantStatusSrv(0)

logsResp, err := tenantA.Logs(ctx, &serverpb.LogsRequest{
NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
Redact: false,
})
require.NoError(t, err)
require.NotEmpty(t, logsResp.Entries)

logsFilesListResp, err := tenantA.LogFilesList(ctx, &serverpb.LogFilesListRequest{
NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
})
require.NoError(t, err)
require.NotEmpty(t, logsFilesListResp.Files)

logsFileResp, err := tenantA.LogFile(ctx, &serverpb.LogFileRequest{
NodeId: helper.TestCluster().Tenant(0).GetTenant().SQLInstanceID().String(),
File: logsFilesListResp.Files[0].Name,
})
require.NoError(t, err)
require.NotEmpty(t, logsFileResp.Entries)
}

func TestTenantCannotSeeNonTenantStats(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/cli/testdata/zip/testzip_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ debug zip --concurrency=1 --cpu-profile-duration=1s /dev/null
[node 1] requesting goroutine dump list... received response... done
[node 1] 0 goroutine dumps found
[node 1] requesting log file ...
[node 1] requesting log file ...
[node 1] requesting log file ...
[node 1] 0 log file ...
[node 1] requesting ranges... received response...
[node 1] requesting ranges: last request failed: rpc error: ...
[node 1] requesting ranges: creating error output: debug/nodes/1/ranges.err.txt... done
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type SQLStatusServer interface {
TransactionContentionEvents(context.Context, *TransactionContentionEventsRequest) (*TransactionContentionEventsResponse, error)
NodesList(context.Context, *NodesListRequest) (*NodesListResponse, error)
ListExecutionInsights(context.Context, *ListExecutionInsightsRequest) (*ListExecutionInsightsResponse, error)
LogFilesList(context.Context, *LogFilesListRequest) (*LogFilesListResponse, error)
LogFile(context.Context, *LogFileRequest) (*LogEntriesResponse, error)
Logs(context.Context, *LogsRequest) (*LogEntriesResponse, error)
}

// OptionalNodesStatusServer is a StatusServer that is only optionally present
Expand Down
174 changes: 174 additions & 0 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package server
import (
"context"
"fmt"
"io"
"regexp"
"sort"
"strconv"
"time"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1099,6 +1102,177 @@ func (t *tenantStatusServer) Details(
return resp, nil
}

func (t *tenantStatusServer) LogFilesList(
ctx context.Context, req *serverpb.LogFilesListRequest,
) (*serverpb.LogFilesListResponse, error) {
ctx = propagateGatewayMetadata(ctx)
ctx = t.AnnotateCtx(ctx)

if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
// NB: not using serverError() here since the priv checker
// already returns a proper gRPC error status.
return nil, err
}

instanceID, local, err := t.parseInstanceID(req.NodeId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}
if !local {
status, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
if err != nil {
return nil, serverError(ctx, err)
}
return status.LogFilesList(ctx, req)
}
log.Flush()
logFiles, err := log.ListLogFiles()
if err != nil {
return nil, serverError(ctx, err)
}
return &serverpb.LogFilesListResponse{Files: logFiles}, nil
}

func (t *tenantStatusServer) LogFile(
ctx context.Context, req *serverpb.LogFileRequest,
) (*serverpb.LogEntriesResponse, error) {
ctx = propagateGatewayMetadata(ctx)
ctx = t.AnnotateCtx(ctx)

if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
// NB: not using serverError() here since the priv checker
// already returns a proper gRPC error status.
return nil, err
}

instanceID, local, err := t.parseInstanceID(req.NodeId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}
if !local {
status, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
if err != nil {
return nil, serverError(ctx, err)
}
return status.LogFile(ctx, req)
}

// Determine how to redact.
inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)

// Ensure that the latest log entries are available in files.
log.Flush()

// Read the logs.
reader, err := log.GetLogReader(req.File)
if err != nil {
return nil, serverError(ctx, errors.Wrapf(err, "log file %q could not be opened", req.File))
}
defer reader.Close()

var resp serverpb.LogEntriesResponse
decoder, err := log.NewEntryDecoder(reader, inputEditMode)
if err != nil {
return nil, serverError(ctx, err)
}
for {
var entry logpb.Entry
if err := decoder.Decode(&entry); err != nil {
if err == io.EOF {
break
}
return nil, serverError(ctx, err)
}
resp.Entries = append(resp.Entries, entry)
}

return &resp, nil
}

func (t *tenantStatusServer) Logs(
ctx context.Context, req *serverpb.LogsRequest,
) (*serverpb.LogEntriesResponse, error) {
ctx = propagateGatewayMetadata(ctx)
ctx = t.AnnotateCtx(ctx)

if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
// NB: not using serverError() here since the priv checker
// already returns a proper gRPC error status.
return nil, err
}

instanceID, local, err := t.parseInstanceID(req.NodeId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}
if !local {
status, err := t.dialPod(ctx, instance.InstanceID, instance.InstanceAddr)
if err != nil {
return nil, serverError(ctx, err)
}
return status.Logs(ctx, req)
}

// Determine how to redact.
inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)

// Select the time interval.
startTimestamp, err := parseInt64WithDefault(
req.StartTime,
timeutil.Now().AddDate(0, 0, -1).UnixNano())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "StartTime could not be parsed: %s", err)
}

endTimestamp, err := parseInt64WithDefault(req.EndTime, timeutil.Now().UnixNano())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "EndTime could not be parsed: %s", err)
}

if startTimestamp > endTimestamp {
return nil, status.Errorf(codes.InvalidArgument, "StartTime: %d should not be greater than endtime: %d", startTimestamp, endTimestamp)
}

maxEntries, err := parseInt64WithDefault(req.Max, defaultMaxLogEntries)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Max could not be parsed: %s", err)
}
if maxEntries < 1 {
return nil, status.Errorf(codes.InvalidArgument, "Max: %d should be set to a value greater than 0", maxEntries)
}

var regex *regexp.Regexp
if len(req.Pattern) > 0 {
if regex, err = regexp.Compile(req.Pattern); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "regex pattern could not be compiled: %s", err)
}
}

// Ensure that the latest log entries are available in files.
log.Flush()

// Read the logs.
entries, err := log.FetchEntriesFromFiles(
startTimestamp, endTimestamp, int(maxEntries), regex, inputEditMode)
if err != nil {
return nil, serverError(ctx, err)
}

return &serverpb.LogEntriesResponse{Entries: entries}, nil
}

func (t *tenantStatusServer) NodesList(
ctx context.Context, req *serverpb.NodesListRequest,
) (*serverpb.NodesListResponse, error) {
Expand Down

0 comments on commit df15537

Please sign in to comment.