Skip to content

Commit

Permalink
sql,server: exposes TxnIDResolution rpc endpoint
Browse files Browse the repository at this point in the history
This commit introduces the new TxnIDResolution RPC endpoint. This RPC
endpoint utilizes the new txnidcache.Resolves interface to perform
transaction ID resolution.

Partially addresses cockroachdb#74487

Release note: None
  • Loading branch information
Azhng committed Feb 14, 2022
1 parent 8db642f commit a148762
Show file tree
Hide file tree
Showing 18 changed files with 266 additions and 22 deletions.
45 changes: 45 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -4278,6 +4278,51 @@ UserSQLRolesResponse returns a list of roles for the logged SQL user.



## TxnIDResolution





Support status: [reserved](#support-status)

#### Request Parameters




Request object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| CoordinatorNodeID | [int32](#cockroach.server.serverpb.TxnIDResolutionRequest-int32) | | | [reserved](#support-status) |
| txnIDs | [bytes](#cockroach.server.serverpb.TxnIDResolutionRequest-bytes) | repeated | | [reserved](#support-status) |







#### Response Parameters




Response object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| resolvedTxnIDs | [cockroach.sql.contentionpb.ResolvedTxnID](#cockroach.server.serverpb.TxnIDResolutionResponse-cockroach.sql.contentionpb.ResolvedTxnID) | repeated | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/statusccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -82,6 +83,10 @@ func TestTenantStatusAPI(t *testing.T) {
t.Run("index_usage_stats", func(t *testing.T) {
testIndexUsageForTenants(t, testHelper)
})

t.Run("txn_id_resolution", func(t *testing.T) {
testTxnIDResolutionRPC(ctx, t, testHelper)
})
}

func TestTenantCannotSeeNonTenantStats(t *testing.T) {
Expand Down Expand Up @@ -887,3 +892,53 @@ func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tena
require.Equal(t, false, cancelQueryResp.Canceled)
require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error)
}

// testTxnIDResolutionRPC tests the reachability of TxnIDResolution RPC. The
// underlying implementation correctness is tested within
// pkg/sql/contention/txnidcache.
func testTxnIDResolutionRPC(ctx context.Context, t *testing.T, helper *tenantTestHelper) {
run := func(sqlConn *sqlutils.SQLRunner, status serverpb.SQLStatusServer, coordinatorNodeID int32) {
sqlConn.Exec(t, "SET application_name='test1'")

sqlConn.Exec(t, "BEGIN")
result := sqlConn.QueryStr(t, `
SELECT id
FROM crdb_internal.node_transactions
WHERE application_name = 'test1'`)
require.Equal(t, 1 /* expected */, len(result),
"expected only one active txn, but there are %d active txns found", len(result))
txnID := uuid.FromStringOrNil(result[0][0])
require.False(t, uuid.Nil.Equal(txnID),
"expected a valid txnID, but %+v is found", result)
sqlConn.Exec(t, "COMMIT")

testutils.SucceedsWithin(t, func() error {
resp, err := status.TxnIDResolution(ctx, &serverpb.TxnIDResolutionRequest{
CoordinatorNodeID: coordinatorNodeID,
TxnIDs: []uuid.UUID{txnID},
})
require.NoError(t, err)
if len(resp.ResolvedTxnIDs) != 1 {
return errors.New("txnID not found")
}
require.True(t, resp.ResolvedTxnIDs[0].TxnID.Equal(txnID))
return nil
}, 3*time.Second)
}

t.Run("regular_cluster", func(t *testing.T) {
status :=
helper.hostCluster.Server(0 /* idx */).StatusServer().(serverpb.SQLStatusServer)
sqlConn := helper.hostCluster.ServerConn(0 /* idx */)
run(sqlutils.MakeSQLRunner(sqlConn), status, 1 /* coordinatorNodeID */)
})

t.Run("tenant_cluster", func(t *testing.T) {
// Select a different tenant status server here so a pod-to-pod RPC will
// happen.
status :=
helper.testCluster().tenantStatusSrv(2 /* idx */)
sqlConn := helper.testCluster().tenantConn(0 /* idx */)
run(sqlConn, status, 1 /* coordinatorNodeID */)
})
}
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ go_library(
"//pkg/sql/catalog/systemschema",
"//pkg/sql/colexec",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/contentionpb",
"//pkg/sql/descmetadata",
"//pkg/sql/distsql",
"//pkg/sql/execinfra",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SQLStatusServer interface {
ResetIndexUsageStats(context.Context, *ResetIndexUsageStatsRequest) (*ResetIndexUsageStatsResponse, error)
TableIndexStats(context.Context, *TableIndexStatsRequest) (*TableIndexStatsResponse, error)
UserSQLRoles(ctx context.Context, request *UserSQLRolesRequest) (*UserSQLRolesResponse, error)
TxnIDResolution(context.Context, *TxnIDResolutionRequest) (*TxnIDResolutionResponse, error)
}

// OptionalNodesStatusServer is a StatusServer that is only optionally present
Expand Down
20 changes: 20 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,24 @@ message UserSQLRolesResponse {
repeated string roles = 1;
}

// Request object for issuing Transaction ID Resolution.
message TxnIDResolutionRequest {
int32 CoordinatorNodeID = 1 [
(gogoproto.customname) = "CoordinatorNodeID"
];

repeated bytes txnIDs = 2 [
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.nullable) = false
];
}

// Response object for issuing Transaction ID Resolution.
message TxnIDResolutionResponse {
repeated cockroach.sql.contentionpb.ResolvedTxnID resolvedTxnIDs = 1 [
(gogoproto.nullable) = false];
}

service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -1875,4 +1893,6 @@ service Status {
get: "/_status/sqlroles"
};
}

rpc TxnIDResolution(TxnIDResolutionRequest) returns (TxnIDResolutionResponse) {}
}
72 changes: 72 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
Expand All @@ -69,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
raft "go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -366,6 +369,53 @@ func (b *baseStatusServer) ListLocalDistSQLFlows(
return response, nil
}

func (b *baseStatusServer) localTxnIDResolution(
req *serverpb.TxnIDResolutionRequest,
) *serverpb.TxnIDResolutionResponse {
txnIDCache := b.sqlServer.pgServer.SQLServer.GetTxnIDCache()

unresolvedTxnIDs := make(map[uuid.UUID]struct{}, len(req.TxnIDs))
for _, txnID := range req.TxnIDs {
unresolvedTxnIDs[txnID] = struct{}{}
}

resp := &serverpb.TxnIDResolutionResponse{
ResolvedTxnIDs: make([]contentionpb.ResolvedTxnID, 0, len(req.TxnIDs)),
}

// Optimistically attempt to resolve txn IDs. However, if the optimistic
// attempt fails, we flush the txnID cache and try again.
unresolvedTxnIDs, resp.ResolvedTxnIDs =
resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs)

if len(unresolvedTxnIDs) > 0 {
txnIDCache.Flush()

// If there are any unresolved txn IDs at this point, we can simply
// discard them. The caller will perform retry accordingly based
// on the response we send back.
_, resp.ResolvedTxnIDs =
resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs)
}

return resp
}

func resolveTxnIDHelper(
txnIDCache *txnidcache.Cache, query map[uuid.UUID]struct{}, result []contentionpb.ResolvedTxnID,
) (remaining map[uuid.UUID]struct{}, _ []contentionpb.ResolvedTxnID) {
for txnID := range query {
if txnFingerprintID, found := txnIDCache.Lookup(txnID); found {
delete(query, txnID)
result = append(result, contentionpb.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: txnFingerprintID,
})
}
}
return query, result
}

// A statusServer provides a RESTful status API.
type statusServer struct {
*baseStatusServer
Expand Down Expand Up @@ -3027,3 +3077,25 @@ func (s *statusServer) JobStatus(

return &serverpb.JobStatusResponse{Job: res}, nil
}

func (s *statusServer) TxnIDResolution(
ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
ctx = s.AnnotateCtx(propagateGatewayMetadata(ctx))
if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil {
return nil, err
}

nodeID := roachpb.NodeID(req.CoordinatorNodeID)
local := nodeID == s.gossip.NodeID.Get()
if local {
return s.localTxnIDResolution(req), nil
}

statusClient, err := s.dialNode(ctx, nodeID)
if err != nil {
return nil, err
}

return statusClient.TxnIDResolution(ctx, req)
}
26 changes: 26 additions & 0 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,3 +1042,29 @@ func (t *tenantStatusServer) NodesList(
}
return &resp, err
}

func (t *tenantStatusServer) TxnIDResolution(
ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
ctx = t.AnnotateCtx(propagateGatewayMetadata(ctx))
if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
return nil, err
}

instanceID := base.SQLInstanceID(req.CoordinatorNodeID)
local := instanceID == t.sqlServer.SQLInstanceID()
if local {
return t.localTxnIDResolution(req), nil
}

instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}
statusClient, err := t.dialPod(ctx, instanceID, instance.InstanceAddr)
if err != nil {
return nil, err
}

return statusClient.TxnIDResolution(ctx, req)
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ go_library(
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/contentionpb",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand Down Expand Up @@ -1958,7 +1958,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
// Transaction fingerprint ID will be available once transaction finishes
// execution.
ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
})
Expand Down Expand Up @@ -2033,7 +2033,7 @@ func (ex *connExecutor) recordTransactionFinish(
}
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: ev.txnID,
TxnFingerprintID: transactionFingerprintID,
})
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/contention/contentionutils",
"//pkg/sql/contentionpb",
"//pkg/util/encoding",
"//pkg/util/metric",
"//pkg/util/stop",
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/contention/txnidcache/concurrent_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
)

const messageBlockSize = 1024

type messageBlock [messageBlockSize]ResolvedTxnID
type messageBlock [messageBlockSize]contentionpb.ResolvedTxnID

var blockPool = &sync.Pool{
New: func() interface{} {
Expand All @@ -27,7 +28,7 @@ var blockPool = &sync.Pool{
}

func (m *messageBlock) isFull() bool {
return m[messageBlockSize-1].valid()
return m[messageBlockSize-1].Valid()
}

// concurrentWriteBuffer is a data structure that optimizes for concurrent
Expand Down Expand Up @@ -71,7 +72,7 @@ func newConcurrentWriteBuffer(sink messageSink) *concurrentWriteBuffer {

// Record records a mapping from txnID to its corresponding transaction
// fingerprint ID. Record is safe to be used concurrently.
func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) {
func (c *concurrentWriteBuffer) Record(resolvedTxnID contentionpb.ResolvedTxnID) {
c.guard.AtomicWrite(func(writerIdx int64) {
c.guard.msgBlock[writerIdx] = resolvedTxnID
})
Expand Down
Loading

0 comments on commit a148762

Please sign in to comment.