diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index 1d827b73ebbd..4a963ef18fc6 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -330,29 +330,36 @@ type DistSQLPlannerTestingKnobs struct { // QueryRegistry holds a map of all queries executing with this node as its gateway. type QueryRegistry struct { + // TODO(peter): Investigate if it is worthwhile to shard based on the query + // ID in order to reduce contention on this lock. syncutil.Mutex - store map[string]*queryMeta + store map[uint128.Uint128]*queryMeta } // MakeQueryRegistry instantiates a new, empty query registry. func MakeQueryRegistry() *QueryRegistry { - return &QueryRegistry{store: make(map[string]*queryMeta)} + return &QueryRegistry{store: make(map[uint128.Uint128]*queryMeta)} } -func (r *QueryRegistry) register(queryID string, query *queryMeta) { +func (r *QueryRegistry) register(queryID uint128.Uint128, query *queryMeta) { r.Lock() - defer r.Unlock() r.store[queryID] = query + r.Unlock() } -func (r *QueryRegistry) deregister(queryID string) { +func (r *QueryRegistry) deregister(queryID uint128.Uint128) { r.Lock() - defer r.Unlock() delete(r.store, queryID) + r.Unlock() } // Cancel looks up the associated query in the registry and cancels it (if it is cancellable). -func (r *QueryRegistry) Cancel(queryID string, username string) (bool, error) { +func (r *QueryRegistry) Cancel(queryIDStr string, username string) (bool, error) { + queryID, err := uint128.FromString(queryIDStr) + if err != nil { + return false, fmt.Errorf("query ID %s malformed: %s", queryID, err) + } + r.Lock() defer r.Unlock() @@ -1084,7 +1091,7 @@ func (e *Executor) execStmtsInCurrentTxn( txnState.schemaChangers.curStatementIdx = i - queryID := e.generateQueryID().GetString() + queryID := e.generateQueryID() queryMeta := &queryMeta{ start: session.phaseTimes[sessionEndParse], @@ -1100,7 +1107,7 @@ func (e *Executor) execStmtsInCurrentTxn( queryMeta.ctx = session.Ctx() session.addActiveQuery(queryID, queryMeta) - defer session.removeActiveQuery(stmt.queryID) + defer session.removeActiveQuery(queryID) e.cfg.QueryRegistry.register(queryID, queryMeta) defer e.cfg.QueryRegistry.deregister(queryID) diff --git a/pkg/sql/prepare.go b/pkg/sql/prepare.go index 7a65a9526fd8..111235ce9714 100644 --- a/pkg/sql/prepare.go +++ b/pkg/sql/prepare.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/uint128" ) // PreparedStatement is a SQL statement that has been parsed and the types @@ -61,7 +62,7 @@ func (p *PreparedStatement) close(ctx context.Context, s *Session) { type Statement struct { AST parser.Statement ExpectedTypes sqlbase.ResultColumns - queryID string + queryID uint128.Uint128 queryMeta *queryMeta } diff --git a/pkg/sql/run_control.go b/pkg/sql/run_control.go index 87dfcec40b23..d620147ec1d9 100644 --- a/pkg/sql/run_control.go +++ b/pkg/sql/run_control.go @@ -66,7 +66,7 @@ func (n *cancelQueryNode) Start(params runParams) error { request := &serverpb.CancelQueryRequest{ NodeId: fmt.Sprintf("%d", nodeID), - QueryID: queryID.GetString(), + QueryID: queryIDString, Username: n.p.session.User, } @@ -76,7 +76,7 @@ func (n *cancelQueryNode) Start(params runParams) error { } if !response.Cancelled { - return fmt.Errorf("Could not cancel query %s: %s", queryID.GetString(), response.Error) + return fmt.Errorf("Could not cancel query %s: %s", queryID, response.Error) } return nil diff --git a/pkg/sql/session.go b/pkg/sql/session.go index 2fbb0b8824d1..7109417504b5 100644 --- a/pkg/sql/session.go +++ b/pkg/sql/session.go @@ -49,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -318,7 +319,7 @@ type Session struct { // // ActiveQueries contains all queries in flight. - ActiveQueries map[string]*queryMeta + ActiveQueries map[uint128.Uint128]*queryMeta } // @@ -441,7 +442,7 @@ func NewSession( s.PreparedStatements = makePreparedStatements(s) s.PreparedPortals = makePreparedPortals(s) s.Tracing.session = s - s.mu.ActiveQueries = make(map[string]*queryMeta) + s.mu.ActiveQueries = make(map[uint128.Uint128]*queryMeta) remoteStr := "" if remote != nil { @@ -649,7 +650,7 @@ func (s *Session) setTestingVerifyMetadata(fn func(config.SystemConfig) error) { // addActiveQuery adds a running query to the session's internal store of active // queries. Called from executor's execStmt and execStmtInParallel. -func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) { +func (s *Session) addActiveQuery(queryID uint128.Uint128, queryMeta *queryMeta) { s.mu.Lock() s.mu.ActiveQueries[queryID] = queryMeta queryMeta.session = s @@ -658,7 +659,7 @@ func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) { // removeActiveQuery removes a query from a session's internal store of active // queries. Called when a query finishes execution. -func (s *Session) removeActiveQuery(queryID string) { +func (s *Session) removeActiveQuery(queryID uint128.Uint128) { s.mu.Lock() delete(s.mu.ActiveQueries, queryID) s.mu.Unlock() @@ -666,7 +667,7 @@ func (s *Session) removeActiveQuery(queryID string) { // setQueryExecutionMode is called upon start of execution of a query, and sets // the query's metadata to indicate whether it's distributed or not. -func (s *Session) setQueryExecutionMode(queryID string, isDistributed bool) { +func (s *Session) setQueryExecutionMode(queryID uint128.Uint128, isDistributed bool) { s.mu.Lock() queryMeta := s.mu.ActiveQueries[queryID] queryMeta.phase = executing @@ -708,7 +709,7 @@ func (s *Session) serialize() serverpb.Session { sql += "…" } activeQueries = append(activeQueries, serverpb.ActiveQuery{ - ID: id, + ID: id.String(), Start: query.start.UTC(), Sql: sql, IsDistributed: query.isDistributed, diff --git a/pkg/util/uint128/uint128.go b/pkg/util/uint128/uint128.go index ea0221497dcc..8698ff6a9d7d 100644 --- a/pkg/util/uint128/uint128.go +++ b/pkg/util/uint128/uint128.go @@ -37,8 +37,8 @@ func (u Uint128) GetBytes() []byte { return buf } -// GetString returns a hexadecimal string representation. -func (u Uint128) GetString() string { +// String returns a hexadecimal string representation. +func (u Uint128) String() string { return hex.EncodeToString(u.GetBytes()) } diff --git a/pkg/util/uint128/uint128_test.go b/pkg/util/uint128/uint128_test.go index ce3c48f129e6..3d0d444e7128 100644 --- a/pkg/util/uint128/uint128_test.go +++ b/pkg/util/uint128/uint128_test.go @@ -37,7 +37,7 @@ func TestString(t *testing.T) { i, _ := FromString(s) - if s != i.GetString() { + if s != i.String() { t.Errorf("incorrect string representation for num: %v", i) } }