Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: change query ID type from string to Uint128 #17206

Merged
merged 1 commit into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,29 +330,36 @@ type DistSQLPlannerTestingKnobs struct {

// QueryRegistry holds a map of all queries executing with this node as its gateway.
type QueryRegistry struct {
// TODO(peter): Investigate if it is worthwhile to shard based on the query
// ID in order to reduce contention on this lock.
syncutil.Mutex
store map[string]*queryMeta
store map[uint128.Uint128]*queryMeta
}

// MakeQueryRegistry instantiates a new, empty query registry.
func MakeQueryRegistry() *QueryRegistry {
return &QueryRegistry{store: make(map[string]*queryMeta)}
return &QueryRegistry{store: make(map[uint128.Uint128]*queryMeta)}
}

func (r *QueryRegistry) register(queryID string, query *queryMeta) {
func (r *QueryRegistry) register(queryID uint128.Uint128, query *queryMeta) {
r.Lock()
defer r.Unlock()
r.store[queryID] = query
r.Unlock()
}

func (r *QueryRegistry) deregister(queryID string) {
func (r *QueryRegistry) deregister(queryID uint128.Uint128) {
r.Lock()
defer r.Unlock()
delete(r.store, queryID)
r.Unlock()
}

// Cancel looks up the associated query in the registry and cancels it (if it is cancellable).
func (r *QueryRegistry) Cancel(queryID string, username string) (bool, error) {
func (r *QueryRegistry) Cancel(queryIDStr string, username string) (bool, error) {
queryID, err := uint128.FromString(queryIDStr)
if err != nil {
return false, fmt.Errorf("query ID %s malformed: %s", queryID, err)
}

r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -1084,7 +1091,7 @@ func (e *Executor) execStmtsInCurrentTxn(

txnState.schemaChangers.curStatementIdx = i

queryID := e.generateQueryID().GetString()
queryID := e.generateQueryID()

queryMeta := &queryMeta{
start: session.phaseTimes[sessionEndParse],
Expand All @@ -1100,7 +1107,7 @@ func (e *Executor) execStmtsInCurrentTxn(
queryMeta.ctx = session.Ctx()

session.addActiveQuery(queryID, queryMeta)
defer session.removeActiveQuery(stmt.queryID)
defer session.removeActiveQuery(queryID)
e.cfg.QueryRegistry.register(queryID, queryMeta)
defer e.cfg.QueryRegistry.deregister(queryID)

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
)

// PreparedStatement is a SQL statement that has been parsed and the types
Expand Down Expand Up @@ -61,7 +62,7 @@ func (p *PreparedStatement) close(ctx context.Context, s *Session) {
type Statement struct {
AST parser.Statement
ExpectedTypes sqlbase.ResultColumns
queryID string
queryID uint128.Uint128
queryMeta *queryMeta
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/run_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (n *cancelQueryNode) Start(params runParams) error {

request := &serverpb.CancelQueryRequest{
NodeId: fmt.Sprintf("%d", nodeID),
QueryID: queryID.GetString(),
QueryID: queryIDString,
Username: n.p.session.User,
}

Expand All @@ -76,7 +76,7 @@ func (n *cancelQueryNode) Start(params runParams) error {
}

if !response.Cancelled {
return fmt.Errorf("Could not cancel query %s: %s", queryID.GetString(), response.Error)
return fmt.Errorf("Could not cancel query %s: %s", queryID, response.Error)
}

return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -318,7 +319,7 @@ type Session struct {
//

// ActiveQueries contains all queries in flight.
ActiveQueries map[string]*queryMeta
ActiveQueries map[uint128.Uint128]*queryMeta
}

//
Expand Down Expand Up @@ -441,7 +442,7 @@ func NewSession(
s.PreparedStatements = makePreparedStatements(s)
s.PreparedPortals = makePreparedPortals(s)
s.Tracing.session = s
s.mu.ActiveQueries = make(map[string]*queryMeta)
s.mu.ActiveQueries = make(map[uint128.Uint128]*queryMeta)

remoteStr := "<admin>"
if remote != nil {
Expand Down Expand Up @@ -649,7 +650,7 @@ func (s *Session) setTestingVerifyMetadata(fn func(config.SystemConfig) error) {

// addActiveQuery adds a running query to the session's internal store of active
// queries. Called from executor's execStmt and execStmtInParallel.
func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) {
func (s *Session) addActiveQuery(queryID uint128.Uint128, queryMeta *queryMeta) {
s.mu.Lock()
s.mu.ActiveQueries[queryID] = queryMeta
queryMeta.session = s
Expand All @@ -658,15 +659,15 @@ func (s *Session) addActiveQuery(queryID string, queryMeta *queryMeta) {

// removeActiveQuery removes a query from a session's internal store of active
// queries. Called when a query finishes execution.
func (s *Session) removeActiveQuery(queryID string) {
func (s *Session) removeActiveQuery(queryID uint128.Uint128) {
s.mu.Lock()
delete(s.mu.ActiveQueries, queryID)
s.mu.Unlock()
}

// setQueryExecutionMode is called upon start of execution of a query, and sets
// the query's metadata to indicate whether it's distributed or not.
func (s *Session) setQueryExecutionMode(queryID string, isDistributed bool) {
func (s *Session) setQueryExecutionMode(queryID uint128.Uint128, isDistributed bool) {
s.mu.Lock()
queryMeta := s.mu.ActiveQueries[queryID]
queryMeta.phase = executing
Expand Down Expand Up @@ -708,7 +709,7 @@ func (s *Session) serialize() serverpb.Session {
sql += "…"
}
activeQueries = append(activeQueries, serverpb.ActiveQuery{
ID: id,
ID: id.String(),
Start: query.start.UTC(),
Sql: sql,
IsDistributed: query.isDistributed,
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/uint128/uint128.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (u Uint128) GetBytes() []byte {
return buf
}

// GetString returns a hexadecimal string representation.
func (u Uint128) GetString() string {
// String returns a hexadecimal string representation.
func (u Uint128) String() string {
return hex.EncodeToString(u.GetBytes())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/uint128/uint128_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestString(t *testing.T) {

i, _ := FromString(s)

if s != i.GetString() {
if s != i.String() {
t.Errorf("incorrect string representation for num: %v", i)
}
}
Expand Down