Skip to content

Commit

Permalink
Merge pull request #17206 from petermattis/pmattis/query-id-type
Browse files Browse the repository at this point in the history
sql: change query ID type from string to Uint128
  • Loading branch information
petermattis authored Jul 25, 2017
2 parents 20b5f0d + b0b53f9 commit 3758800
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 21 deletions.
25 changes: 16 additions & 9 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,29 +330,36 @@ type DistSQLPlannerTestingKnobs struct {

// QueryRegistry holds a map of all queries executing with this node as its gateway.
type QueryRegistry struct {
// TODO(peter): Investigate if it is worthwhile to shard based on the query
// ID in order to reduce contention on this lock.
syncutil.Mutex
store map[string]*queryMeta
store map[uint128.Uint128]*queryMeta
}

// MakeQueryRegistry instantiates a new, empty query registry.
func MakeQueryRegistry() *QueryRegistry {
return &QueryRegistry{store: make(map[string]*queryMeta)}
return &QueryRegistry{store: make(map[uint128.Uint128]*queryMeta)}
}

func (r *QueryRegistry) register(queryID string, query *queryMeta) {
func (r *QueryRegistry) register(queryID uint128.Uint128, query *queryMeta) {
r.Lock()
defer r.Unlock()
r.store[queryID] = query
r.Unlock()
}

func (r *QueryRegistry) deregister(queryID string) {
func (r *QueryRegistry) deregister(queryID uint128.Uint128) {
r.Lock()
defer r.Unlock()
delete(r.store, queryID)
r.Unlock()
}

// Cancel looks up the associated query in the registry and cancels it (if it is cancellable).
func (r *QueryRegistry) Cancel(queryID string, username string) (bool, error) {
func (r *QueryRegistry) Cancel(queryIDStr string, username string) (bool, error) {
queryID, err := uint128.FromString(queryIDStr)
if err != nil {
return false, fmt.Errorf("query ID %s malformed: %s", queryID, err)
}

r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -1084,7 +1091,7 @@ func (e *Executor) execStmtsInCurrentTxn(

txnState.schemaChangers.curStatementIdx = i

queryID := e.generateQueryID().GetString()
queryID := e.generateQueryID()

queryMeta := &queryMeta{
start: session.phaseTimes[sessionEndParse],
Expand All @@ -1100,7 +1107,7 @@ func (e *Executor) execStmtsInCurrentTxn(
queryMeta.ctx = session.Ctx()

session.addActiveQuery(queryID, queryMeta)
defer session.removeActiveQuery(stmt.queryID)
defer session.removeActiveQuery(queryID)
e.cfg.QueryRegistry.register(queryID, queryMeta)
defer e.cfg.QueryRegistry.deregister(queryID)

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
)

// PreparedStatement is a SQL statement that has been parsed and the types
Expand Down Expand Up @@ -61,7 +62,7 @@ func (p *PreparedStatement) close(ctx context.Context, s *Session) {
type Statement struct {
AST parser.Statement
ExpectedTypes sqlbase.ResultColumns
queryID string
queryID uint128.Uint128
queryMeta *queryMeta
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/run_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (n *cancelQueryNode) Start(params runParams) error {

request := &serverpb.CancelQueryRequest{
NodeId: fmt.Sprintf("%d", nodeID),
QueryID: queryID.GetString(),
QueryID: queryIDString,
Username: n.p.session.User,
}

Expand All @@ -76,7 +76,7 @@ func (n *cancelQueryNode) Start(params runParams) error {
}

if !response.Cancelled {
return fmt.Errorf("Could not cancel query %s: %s", queryID.GetString(), response.Error)
return fmt.Errorf("Could not cancel query %s: %s", queryID, response.Error)
}

return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -318,7 +319,7 @@ type Session struct {
//

// ActiveQueries contains all queries in flight.
ActiveQueries map[string]*queryMeta
ActiveQueries map[uint128.Uint128]*queryMeta
}

//
Expand Down Expand Up @@ -441,7 +442,7 @@ func NewSession(
s.PreparedStatements = makePreparedStatements(s)
s.PreparedPortals = makePreparedPortals(s)
s.Tracing.session = s
s.mu.ActiveQueries = make(map[string]*queryMeta)
s.mu.ActiveQueries = make(map[uint128.Uint128]*queryMeta)

remoteStr := "<admin>"
if remote != nil {
Expand Down Expand Up @@ -649,7 +650,7 @@ func (s *Session) setTestingVerifyMetadata(fn func(config.SystemConfig) error) {

// addActiveQuery adds a running query to the session's internal store of active
// queries. Called from executor's execStmt and execStmtInParallel.
func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) {
func (s *Session) addActiveQuery(queryID uint128.Uint128, queryMeta *queryMeta) {
s.mu.Lock()
s.mu.ActiveQueries[queryID] = queryMeta
queryMeta.session = s
Expand All @@ -658,15 +659,15 @@ func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) {

// removeActiveQuery removes a query from a session's internal store of active
// queries. Called when a query finishes execution.
func (s *Session) removeActiveQuery(queryID string) {
func (s *Session) removeActiveQuery(queryID uint128.Uint128) {
s.mu.Lock()
delete(s.mu.ActiveQueries, queryID)
s.mu.Unlock()
}

// setQueryExecutionMode is called upon start of execution of a query, and sets
// the query's metadata to indicate whether it's distributed or not.
func (s *Session) setQueryExecutionMode(queryID string, isDistributed bool) {
func (s *Session) setQueryExecutionMode(queryID uint128.Uint128, isDistributed bool) {
s.mu.Lock()
queryMeta := s.mu.ActiveQueries[queryID]
queryMeta.phase = executing
Expand Down Expand Up @@ -708,7 +709,7 @@ func (s *Session) serialize() serverpb.Session {
sql += "…"
}
activeQueries = append(activeQueries, serverpb.ActiveQuery{
ID: id,
ID: id.String(),
Start: query.start.UTC(),
Sql: sql,
IsDistributed: query.isDistributed,
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/uint128/uint128.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (u Uint128) GetBytes() []byte {
return buf
}

// GetString returns a hexadecimal string representation.
func (u Uint128) GetString() string {
// String returns a hexadecimal string representation.
func (u Uint128) String() string {
return hex.EncodeToString(u.GetBytes())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/uint128/uint128_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestString(t *testing.T) {

i, _ := FromString(s)

if s != i.GetString() {
if s != i.String() {
t.Errorf("incorrect string representation for num: %v", i)
}
}
Expand Down

0 comments on commit 3758800

Please sign in to comment.