Skip to content

Commit

Permalink
admin: tenant support jobs endpoints
Browse files Browse the repository at this point in the history
Previously, the Job() and Jobs() endpoints were not implemented on the
tenant admin server as there was not a need and we had split the
implementation into a tenant-only server.

Now that we want to ship the jobs page into CC Console and support it
for multi-tenant, the endpoints for the UI should work as expected.

This PR edits the `jobHelper` and `jobsHelper` helpers to be functions
instead of methods in `adminServer` which allows the implementation to
be shared between the two servers.

Release note: None

Release justification: low risk, high benefit addition to multi-tenant
  • Loading branch information
dhartunian committed Aug 30, 2022
1 parent e318993 commit 61d6af3
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 15 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ ALL_TESTS = [
"//pkg/ccl/oidcccl:oidcccl_test",
"//pkg/ccl/partitionccl:partitionccl_test",
"//pkg/ccl/schemachangerccl:schemachangerccl_test",
"//pkg/ccl/serverccl/adminccl:adminccl_test",
"//pkg/ccl/serverccl/diagnosticsccl:diagnosticsccl_test",
"//pkg/ccl/serverccl/statusccl:statusccl_test",
"//pkg/ccl/serverccl:serverccl_test",
Expand Down Expand Up @@ -717,6 +718,7 @@ GO_TARGETS = [
"//pkg/ccl/partitionccl:partitionccl_test",
"//pkg/ccl/schemachangerccl:schemachangerccl",
"//pkg/ccl/schemachangerccl:schemachangerccl_test",
"//pkg/ccl/serverccl/adminccl:adminccl_test",
"//pkg/ccl/serverccl/diagnosticsccl:diagnosticsccl_test",
"//pkg/ccl/serverccl/statusccl:statusccl_test",
"//pkg/ccl/serverccl:serverccl",
Expand Down Expand Up @@ -2149,6 +2151,7 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/partitionccl:get_x_data",
"//pkg/ccl/schemachangerccl:get_x_data",
"//pkg/ccl/serverccl:get_x_data",
"//pkg/ccl/serverccl/adminccl:get_x_data",
"//pkg/ccl/serverccl/diagnosticsccl:get_x_data",
"//pkg/ccl/serverccl/statusccl:get_x_data",
"//pkg/ccl/spanconfigccl/spanconfigkvaccessorccl:get_x_data",
Expand Down
30 changes: 30 additions & 0 deletions pkg/ccl/serverccl/adminccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "adminccl_test",
srcs = [
"main_test.go",
"tenant_admin_test.go",
],
deps = [
"//pkg/ccl",
"//pkg/ccl/serverccl",
"//pkg/ccl/utilccl",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/spanconfig",
"//pkg/sql/tests",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
34 changes: 34 additions & 0 deletions pkg/ccl/serverccl/adminccl/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package adminccl

import (
"os"
"testing"

_ "github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
defer utilccl.TestingEnableEnterprise()()
securityassets.SetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go
65 changes: 65 additions & 0 deletions pkg/ccl/serverccl/adminccl/tenant_admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package adminccl

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/serverccl"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestTenantAdminAPI(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The liveness session might expire before the stress race can finish.
skip.UnderStressRace(t, "expensive tests")

ctx := context.Background()

knobs := tests.CreateTestingKnobs()
knobs.SpanConfig = &spanconfig.TestingKnobs{
// Some of these subtests expect multiple (uncoalesced) tenant ranges.
StoreDisableCoalesceAdjacent: true,
}

testHelper := serverccl.NewTestTenantHelper(t, 3 /* tenantClusterSize */, knobs)
defer testHelper.Cleanup(ctx, t)

t.Run("tenant_jobs", func(t *testing.T) {
testJobsRPCs(ctx, t, testHelper)
})
}

func testJobsRPCs(ctx context.Context, t *testing.T, helper serverccl.TenantTestHelper) {
http := helper.TestCluster().TenantAdminHTTPClient(t, 1)
defer http.Close()

_ = helper.TestCluster().TenantConn(1).Exec(t, "CREATE TABLE test (id INT)")
_ = helper.TestCluster().TenantConn(1).Exec(t, "ALTER TABLE test ADD COLUMN name STRING")

jobsResp := serverpb.JobsResponse{}
http.GetJSON("/_admin/v1/jobs", &jobsResp)
require.NotEmpty(t, jobsResp.Jobs)

jobResp := serverpb.JobResponse{}
job := jobsResp.Jobs[0]
http.GetJSON(fmt.Sprintf("/_admin/v1/jobs/%d", job.ID), &jobResp)

require.Equal(t, jobResp.ID, job.ID)
}
45 changes: 30 additions & 15 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2089,7 +2089,14 @@ func (s *adminServer) Jobs(
return nil, serverError(ctx, err)
}

j, err := s.jobsHelper(ctx, req, userName)
j, err := jobsHelper(
ctx,
req,
userName,
s.server.sqlServer,
&s.server.cfg.BaseConfig,
&s.server.cfg.Settings.SV,
)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -2098,8 +2105,13 @@ func (s *adminServer) Jobs(

// Note that the function returns plain errors, and it is the caller's
// responsibility to convert them to serverErrors.
func (s *adminServer) jobsHelper(
ctx context.Context, req *serverpb.JobsRequest, userName username.SQLUsername,
func jobsHelper(
ctx context.Context,
req *serverpb.JobsRequest,
userName username.SQLUsername,
sqlServer *SQLServer,
cfg *BaseConfig,
sv *settings.Values,
) (_ *serverpb.JobsResponse, retErr error) {
retryRunningCondition := "status='running' AND next_run > now() AND num_runs > 1"
retryRevertingCondition := "status='reverting' AND next_run > now() AND num_runs > 1"
Expand Down Expand Up @@ -2138,7 +2150,7 @@ func (s *adminServer) jobsHelper(
if req.Limit > 0 {
q.Append(" LIMIT $", tree.DInt(req.Limit))
}
it, err := s.server.sqlServer.internalExecutor.QueryIteratorEx(
it, err := sqlServer.internalExecutor.QueryIteratorEx(
ctx, "admin-jobs", nil, /* txn */
sessiondata.InternalExecutorOverride{User: userName},
q.String(), q.QueryArguments()...,
Expand All @@ -2158,16 +2170,16 @@ func (s *adminServer) jobsHelper(
var resp serverpb.JobsResponse

now := timeutil.Now()
if s.server.cfg.TestingKnobs.Server != nil &&
s.server.cfg.TestingKnobs.Server.(*TestingKnobs).StubTimeNow != nil {
now = s.server.cfg.TestingKnobs.Server.(*TestingKnobs).StubTimeNow()
if cfg.TestingKnobs.Server != nil &&
cfg.TestingKnobs.Server.(*TestingKnobs).StubTimeNow != nil {
now = cfg.TestingKnobs.Server.(*TestingKnobs).StubTimeNow()
}
retentionDuration := func() time.Duration {
if s.server.cfg.TestingKnobs.Server != nil &&
s.server.cfg.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.RetentionTime != nil {
return *s.server.cfg.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.RetentionTime
if cfg.TestingKnobs.JobsTestingKnobs != nil &&
cfg.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.RetentionTime != nil {
return *cfg.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs).IntervalOverrides.RetentionTime
}
return jobs.RetentionTimeSetting.Get(&s.server.st.SV)
return jobs.RetentionTimeSetting.Get(sv)
}
resp.EarliestRetainedTime = now.Add(-retentionDuration())

Expand Down Expand Up @@ -2271,7 +2283,7 @@ func (s *adminServer) Job(
if err != nil {
return nil, serverError(ctx, err)
}
r, err := s.jobHelper(ctx, request, userName)
r, err := jobHelper(ctx, request, userName, s.server.sqlServer)
if err != nil {
return nil, serverError(ctx, err)
}
Expand All @@ -2280,8 +2292,11 @@ func (s *adminServer) Job(

// Note that the function returns plain errors, and it is the caller's
// responsibility to convert them to serverErrors.
func (s *adminServer) jobHelper(
ctx context.Context, request *serverpb.JobRequest, userName username.SQLUsername,
func jobHelper(
ctx context.Context,
request *serverpb.JobRequest,
userName username.SQLUsername,
sqlServer *SQLServer,
) (_ *serverpb.JobResponse, retErr error) {
const query = `
SELECT job_id, job_type, description, statement, user_name, descriptor_ids, status,
Expand All @@ -2291,7 +2306,7 @@ func (s *adminServer) jobHelper(
coordinator_id
FROM crdb_internal.jobs
WHERE job_id = $1`
row, cols, err := s.server.sqlServer.internalExecutor.QueryRowExWithCols(
row, cols, err := sqlServer.internalExecutor.QueryRowExWithCols(
ctx, "admin-job", nil,
sessiondata.InternalExecutorOverride{User: userName},
query,
Expand Down
40 changes: 40 additions & 0 deletions pkg/server/tenant_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,43 @@ func (t *tenantAdminServer) Drain(

return t.drain.handleDrain(ctx, req, stream)
}

func (t *tenantAdminServer) Jobs(
ctx context.Context, req *serverpb.JobsRequest,
) (_ *serverpb.JobsResponse, retErr error) {
ctx = t.AnnotateCtx(ctx)

userName, err := userFromContext(ctx)
if err != nil {
return nil, serverError(ctx, err)
}

j, err := jobsHelper(
ctx,
req,
userName,
t.sqlServer,
t.sqlServer.cfg,
&t.sqlServer.cfg.Settings.SV,
)
if err != nil {
return nil, serverError(ctx, err)
}
return j, nil
}

func (t *tenantAdminServer) Job(
ctx context.Context, request *serverpb.JobRequest,
) (_ *serverpb.JobResponse, retErr error) {
ctx = t.AnnotateCtx(ctx)

userName, err := userFromContext(ctx)
if err != nil {
return nil, serverError(ctx, err)
}
r, err := jobHelper(ctx, request, userName, t.sqlServer)
if err != nil {
return nil, serverError(ctx, err)
}
return r, nil
}

0 comments on commit 61d6af3

Please sign in to comment.