From 148a9bdc7c14011fab65c2d5ca16f39710dca073 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sat, 2 Jul 2016 11:58:00 +0000 Subject: [PATCH] sql: preliminary mechanism to track and limit SQL memory usage. This patch instruments SQL execution to track memory allocations whose amounts depend on user input and current database contents (as opposed to allocations dependent on other parameters e.g. statement size). It does so by introducing a new MemoryUsageMonitor object intended to be instantiated once per session. It is set up and teared down with the lifecycle of a session. Components can then link and report their memory usage to the monitor via span objects. Spans can gate incremental allocations and keep track of the cumulative allocations so that all can be released at once. This is used to track and limits allocations: - in `valueNode`, - buckets in `groupNode`, - sorted data in `sortNode`, - temp results in `joinNode`, - seen prefixes and suffixes in `distinctNode`, - the Result arrays in Executor, - prepared statements and prepared portals in pgwire. A moderate amount of intelligence is implemented so as to avoid computing sizes for every column of every row in a valueNode - the combined size of all fixed-size columns is precomputed and counted at once for every row. This patch does not track the memory allocated for write intents in the KV transaction object. For troubleshooting the following mechanisms are provided: - an env var COCKROACH_NOTEWORTHY_MEMORY_USAGE, if set, defines the minimum total allocated size for a monitor before it starts logging how much memory it is consuming. - detailed allocation progress is logged at level 2 or above. To trace just SQL activity and memory allocation, one can use for example `--vmodule=executor=2,mem_usage=2`. --- server/admin.go | 87 +++++++++++++------- server/admin_test.go | 26 +++++- sql/alter_table.go | 3 +- sql/copy.go | 30 +++++-- sql/create.go | 40 ++++++--- sql/data_source.go | 10 +-- sql/delete.go | 6 +- sql/dist_sql_node.go | 11 ++- sql/distinct.go | 52 ++++++++++-- sql/drop.go | 9 +- sql/empty.go | 3 +- sql/executor.go | 56 +++++++++---- sql/explain.go | 102 ++++++++++++++--------- sql/group.go | 79 ++++++++++++------ sql/group_test.go | 3 +- sql/information_schema.go | 46 +++++++---- sql/insert.go | 12 +-- sql/join.go | 7 +- sql/limit.go | 6 +- sql/mon/account.go | 109 +++++++++++++++++++++++++ sql/mon/account_test.go | 80 ++++++++++++++++++ sql/mon/mem_usage.go | 136 +++++++++++++++++++++++++++++++ sql/mon/mem_usage_test.go | 64 +++++++++++++++ sql/ordering.go | 4 +- sql/parser/datum.go | 78 ++++++++++++++++++ sql/pgwire/v3.go | 21 +++-- sql/plan.go | 6 +- sql/planner.go | 6 ++ sql/prepare.go | 81 +++++++++++++++--- sql/returning.go | 4 +- sql/row_container.go | 167 ++++++++++++++++++++++++++++++++++++++ sql/scan.go | 6 +- sql/select.go | 13 +-- sql/select_top.go | 7 +- sql/session.go | 8 ++ sql/session_mem_usage.go | 91 +++++++++++++++++++++ sql/show.go | 160 ++++++++++++++++++++++-------------- sql/sort.go | 67 +++++++++++---- sql/split.go | 5 +- sql/subquery.go | 10 ++- sql/table_join.go | 52 ++++++++---- sql/testdata/explain | 4 +- sql/trace.go | 39 ++++++--- sql/union.go | 15 +++- sql/update.go | 6 +- sql/values.go | 54 +++++++----- sql/values_test.go | 4 + sql/virtual_schema.go | 12 +-- util/roundup2.go | 30 +++++++ util/smalltrace.go | 47 +++++++++++ util/smalltrace_test.go | 37 +++++++++ 51 files changed, 1663 insertions(+), 348 deletions(-) create mode 100644 sql/mon/account.go create mode 100644 sql/mon/account_test.go create mode 100644 sql/mon/mem_usage.go create mode 100644 sql/mon/mem_usage_test.go create mode 100644 sql/row_container.go create mode 100644 sql/session_mem_usage.go create mode 100644 util/roundup2.go create mode 100644 util/smalltrace.go create mode 100644 util/smalltrace_test.go diff --git a/server/admin.go b/server/admin.go index 1575adef16e9..b60428237f9d 100644 --- a/server/admin.go +++ b/server/admin.go @@ -165,16 +165,20 @@ func (s *adminServer) Databases( ) (*serverpb.DatabasesResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() + r := s.server.sqlExecutor.ExecuteStatements(session, "SHOW DATABASES;", nil) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return nil, s.serverError(err) } var resp serverpb.DatabasesResponse - for _, row := range r.ResultList[0].Rows { - dbname, ok := row.Values[0].(*parser.DString) + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) + dbname, ok := row[0].(*parser.DString) if !ok { - return nil, s.serverErrorf("type assertion failed on db name: %T", row.Values[0]) + return nil, s.serverErrorf("type assertion failed on db name: %T", row[0]) } resp.Databases = append(resp.Databases, string(*dbname)) } @@ -189,6 +193,7 @@ func (s *adminServer) DatabaseDetails( ) (*serverpb.DatabaseDetailsResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() // Placeholders don't work with SHOW statements, so we need to manually // escape the database name. @@ -197,6 +202,7 @@ func (s *adminServer) DatabaseDetails( escDBName := parser.Name(req.Database).String() query := fmt.Sprintf("SHOW GRANTS ON DATABASE %s; SHOW TABLES FROM %s;", escDBName, escDBName) r := s.server.sqlExecutor.ExecuteStatements(session, query, nil) + defer r.Close() if err := s.firstNotFoundError(r.ResultList); err != nil { return nil, grpc.Errorf(codes.NotFound, "%s", err) } @@ -213,7 +219,8 @@ func (s *adminServer) DatabaseDetails( ) scanner := makeResultScanner(r.ResultList[0].Columns) - for _, row := range r.ResultList[0].Rows { + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) // Marshal grant, splitting comma-separated privileges into a proper slice. var grant serverpb.DatabaseDetailsResponse_Grant var privileges string @@ -235,7 +242,8 @@ func (s *adminServer) DatabaseDetails( if a, e := len(r.ResultList[1].Columns), 1; a != e { return nil, s.serverErrorf("show tables columns mismatch: %d != expected %d", a, e) } - for _, row := range r.ResultList[1].Rows { + for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ { + row := r.ResultList[1].Rows.At(i) var tableName string if err := scanner.Scan(row, tableCol, &tableName); err != nil { return nil, err @@ -254,6 +262,7 @@ func (s *adminServer) TableDetails( ) (*serverpb.TableDetailsResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() // TODO(cdo): Use real placeholders for the table and database names when we've extended our SQL // grammar to allow that. @@ -263,6 +272,7 @@ func (s *adminServer) TableDetails( query := fmt.Sprintf("SHOW COLUMNS FROM %s; SHOW INDEX FROM %s; SHOW GRANTS ON TABLE %s; SHOW CREATE TABLE %s;", escQualTable, escQualTable, escQualTable, escQualTable) r := s.server.sqlExecutor.ExecuteStatements(session, query, nil) + defer r.Close() if err := s.firstNotFoundError(r.ResultList); err != nil { return nil, grpc.Errorf(codes.NotFound, "%s", err) } @@ -286,7 +296,8 @@ func (s *adminServer) TableDetails( defaultCol = "Default" ) scanner := makeResultScanner(r.ResultList[0].Columns) - for _, row := range r.ResultList[0].Rows { + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) var col serverpb.TableDetailsResponse_Column if err := scanner.Scan(row, fieldCol, &col.Name); err != nil { return nil, err @@ -321,7 +332,8 @@ func (s *adminServer) TableDetails( storingCol = "Storing" ) scanner := makeResultScanner(r.ResultList[1].Columns) - for _, row := range r.ResultList[1].Rows { + for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ { + row := r.ResultList[1].Rows.At(i) // Marshal grant, splitting comma-separated privileges into a proper slice. var index serverpb.TableDetailsResponse_Index if err := scanner.Scan(row, nameCol, &index.Name); err != nil { @@ -353,7 +365,8 @@ func (s *adminServer) TableDetails( privilegesCol = "Privileges" ) scanner := makeResultScanner(r.ResultList[2].Columns) - for _, row := range r.ResultList[2].Rows { + for i, nRows := 0, r.ResultList[2].Rows.Len(); i < nRows; i++ { + row := r.ResultList[2].Rows.At(i) // Marshal grant, splitting comma-separated privileges into a proper slice. var grant serverpb.TableDetailsResponse_Grant var privileges string @@ -372,13 +385,13 @@ func (s *adminServer) TableDetails( { const createTableCol = "CreateTable" showResult := r.ResultList[3] - if len(showResult.Rows) != 1 { + if showResult.Rows.Len() != 1 { return nil, s.serverErrorf("CreateTable response not available.") } scanner := makeResultScanner(showResult.Columns) var createStmt string - if err := scanner.Scan(showResult.Rows[0], createTableCol, &createStmt); err != nil { + if err := scanner.Scan(showResult.Rows.At(0), createTableCol, &createStmt); err != nil { return nil, err } @@ -565,15 +578,19 @@ func (s *adminServer) TableStats(ctx context.Context, req *serverpb.TableStatsRe func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*serverpb.UsersResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() + query := "SELECT username FROM system.users" r := s.server.sqlExecutor.ExecuteStatements(session, query, nil) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return nil, s.serverError(err) } var resp serverpb.UsersResponse - for _, row := range r.ResultList[0].Rows { - resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row.Values[0].(*parser.DString))}) + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) + resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row[0].(*parser.DString))}) } return &resp, nil } @@ -586,6 +603,7 @@ func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*s func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) (*serverpb.EventsResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() // Execute the query. q := makeSQLQuery() @@ -604,6 +622,7 @@ func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) ( return nil, s.serverErrors(q.Errors()) } r := s.server.sqlExecutor.ExecuteStatements(session, q.String(), q.QueryArguments()) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return nil, s.serverError(err) } @@ -611,7 +630,8 @@ func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) ( // Marshal response. var resp serverpb.EventsResponse scanner := makeResultScanner(r.ResultList[0].Columns) - for _, row := range r.ResultList[0].Rows { + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) var event serverpb.EventsResponse_Event var ts time.Time if err := scanner.ScanIndex(row, 0, &ts); err != nil { @@ -660,24 +680,26 @@ func (s *adminServer) getUIData(session *sql.Session, user string, keys []string return nil, s.serverErrorf("error constructing query: %v", err) } r := s.server.sqlExecutor.ExecuteStatements(session, query.String(), query.QueryArguments()) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return nil, s.serverError(err) } // Marshal results. resp := serverpb.GetUIDataResponse{KeyValues: make(map[string]serverpb.GetUIDataResponse_Value)} - for _, row := range r.ResultList[0].Rows { - dKey, ok := row.Values[0].(*parser.DString) + for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ { + row := r.ResultList[0].Rows.At(i) + dKey, ok := row[0].(*parser.DString) if !ok { - return nil, s.serverErrorf("unexpected type for UI key: %T", row.Values[0]) + return nil, s.serverErrorf("unexpected type for UI key: %T", row[0]) } - dValue, ok := row.Values[1].(*parser.DBytes) + dValue, ok := row[1].(*parser.DBytes) if !ok { - return nil, s.serverErrorf("unexpected type for UI value: %T", row.Values[1]) + return nil, s.serverErrorf("unexpected type for UI value: %T", row[1]) } - dLastUpdated, ok := row.Values[2].(*parser.DTimestamp) + dLastUpdated, ok := row[2].(*parser.DTimestamp) if !ok { - return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row.Values[2]) + return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row[2]) } resp.KeyValues[string(*dKey)] = serverpb.GetUIDataResponse_Value{ @@ -697,11 +719,13 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() for key, val := range req.KeyValues { // Do an upsert of the key. We update each key in a separate transaction to // avoid long-running transactions and possible deadlocks. br := s.server.sqlExecutor.ExecuteStatements(session, "BEGIN;", nil) + defer br.Close() if err := s.checkQueryResults(br.ResultList, 1); err != nil { return nil, s.serverError(err) } @@ -720,6 +744,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ qargs.SetValue(`1`, parser.NewDString(string(val))) qargs.SetValue(`2`, parser.NewDString(key)) r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 2); err != nil { return nil, s.serverError(err) } @@ -732,6 +757,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ qargs.SetValue(`1`, parser.NewDString(key)) qargs.SetValue(`2`, parser.NewDBytes(parser.DBytes(val))) r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 2); err != nil { return nil, s.serverError(err) } @@ -753,6 +779,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ func (s *adminServer) GetUIData(ctx context.Context, req *serverpb.GetUIDataRequest) (*serverpb.GetUIDataResponse, error) { args := sql.SessionArgs{User: s.getUser(req)} session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil) + defer session.Finish() if len(req.Keys) == 0 { return nil, grpc.Errorf(codes.InvalidArgument, "keys cannot be empty") @@ -1123,17 +1150,17 @@ func makeResultScanner(cols []sql.ResultColumn) resultScanner { // IsNull returns whether the specified column of the given row contains // a SQL NULL value. -func (rs resultScanner) IsNull(row sql.ResultRow, col string) (bool, error) { +func (rs resultScanner) IsNull(row parser.DTuple, col string) (bool, error) { idx, ok := rs.colNameToIdx[col] if !ok { return false, errors.Errorf("result is missing column %s", col) } - return row.Values[idx] == parser.DNull, nil + return row[idx] == parser.DNull, nil } // ScanIndex scans the given column index of the given row into dst. -func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{}) error { - src := row.Values[index] +func (rs resultScanner) ScanIndex(row parser.DTuple, index int, dst interface{}) error { + src := row[index] switch d := dst.(type) { case *string: @@ -1195,7 +1222,7 @@ func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{}) } // Scan scans the column with the given name from the given row into dst. -func (rs resultScanner) Scan(row sql.ResultRow, colName string, dst interface{}) error { +func (rs resultScanner) Scan(row parser.DTuple, colName string, dst interface{}) error { idx, ok := rs.colNameToIdx[colName] if !ok { return errors.Errorf("result is missing column %s", colName) @@ -1222,18 +1249,19 @@ func (s *adminServer) queryZone( params := parser.NewPlaceholderInfo() params.SetValue(`1`, parser.NewDInt(parser.DInt(id))) r := s.server.sqlExecutor.ExecuteStatements(session, query, params) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return config.ZoneConfig{}, false, err } result := r.ResultList[0] - if len(result.Rows) == 0 { + if result.Rows.Len() == 0 { return config.ZoneConfig{}, false, nil } var zoneBytes []byte scanner := resultScanner{} - err := scanner.ScanIndex(result.Rows[0], 0, &zoneBytes) + err := scanner.ScanIndex(result.Rows.At(0), 0, &zoneBytes) if err != nil { return config.ZoneConfig{}, false, err } @@ -1270,18 +1298,19 @@ func (s *adminServer) queryNamespaceID( params.SetValue(`1`, parser.NewDInt(parser.DInt(parentID))) params.SetValue(`2`, parser.NewDString(name)) r := s.server.sqlExecutor.ExecuteStatements(session, query, params) + defer r.Close() if err := s.checkQueryResults(r.ResultList, 1); err != nil { return 0, err } result := r.ResultList[0] - if len(result.Rows) == 0 { + if result.Rows.Len() == 0 { return 0, errors.Errorf("namespace %s with ParentID %d not found", name, parentID) } var id int64 scanner := resultScanner{} - err := scanner.ScanIndex(result.Rows[0], 0, &id) + err := scanner.ScanIndex(result.Rows.At(0), 0, &id) if err != nil { return 0, err } diff --git a/server/admin_test.go b/server/admin_test.go index 53209efc97b6..8ccaf923ce27 100644 --- a/server/admin_test.go +++ b/server/admin_test.go @@ -224,8 +224,12 @@ func TestAdminAPIDatabases(t *testing.T) { const testdb = "test" session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + query := "CREATE DATABASE " + testdb createRes := ts.sqlExecutor.ExecuteStatements(session, query, nil) + defer createRes.Close() + if createRes.ResultList[0].Err != nil { t.Fatal(createRes.ResultList[0].Err) } @@ -255,6 +259,7 @@ func TestAdminAPIDatabases(t *testing.T) { testuser := "testuser" grantQuery := "GRANT " + strings.Join(privileges, ", ") + " ON DATABASE " + testdb + " TO " + testuser grantRes := s.(*TestServer).sqlExecutor.ExecuteStatements(session, grantQuery, nil) + defer grantRes.Close() if grantRes.ResultList[0].Err != nil { t.Fatal(grantRes.ResultList[0].Err) } @@ -349,6 +354,8 @@ func TestAdminAPITableDetails(t *testing.T) { session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + setupQueries := []string{ "CREATE DATABASE test", ` @@ -365,6 +372,7 @@ CREATE TABLE test.tbl ( for _, q := range setupQueries { res := ts.sqlExecutor.ExecuteStatements(session, q, nil) + defer res.Close() if res.ResultList[0].Err != nil { t.Fatalf("error executing '%s': %s", q, res.ResultList[0].Err) } @@ -443,6 +451,7 @@ CREATE TABLE test.tbl ( ) resSet := ts.sqlExecutor.ExecuteStatements(session, showCreateTableQuery, nil) + defer resSet.Close() res := resSet.ResultList[0] if res.Err != nil { t.Fatalf("error executing '%s': %s", showCreateTableQuery, res.Err) @@ -450,7 +459,7 @@ CREATE TABLE test.tbl ( scanner := makeResultScanner(res.Columns) var createStmt string - if err := scanner.Scan(res.Rows[0], createTableCol, &createStmt); err != nil { + if err := scanner.Scan(res.Rows.At(0), createTableCol, &createStmt); err != nil { t.Fatal(err) } @@ -515,7 +524,10 @@ func TestAdminAPITableDetailsForVirtualSchema(t *testing.T) { session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + resSet := ts.sqlExecutor.ExecuteStatements(session, showCreateTableQuery, nil) + defer resSet.Close() res := resSet.ResultList[0] if res.Err != nil { t.Fatalf("error executing '%s': %s", showCreateTableQuery, res.Err) @@ -523,7 +535,7 @@ func TestAdminAPITableDetailsForVirtualSchema(t *testing.T) { scanner := makeResultScanner(res.Columns) var createStmt string - if err := scanner.Scan(res.Rows[0], createTableCol, &createStmt); err != nil { + if err := scanner.Scan(res.Rows.At(0), createTableCol, &createStmt); err != nil { t.Fatal(err) } @@ -542,12 +554,15 @@ func TestAdminAPITableDetailsZone(t *testing.T) { // Create database and table. session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + setupQueries := []string{ "CREATE DATABASE test", "CREATE TABLE test.tbl (val STRING)", } for _, q := range setupQueries { res := ts.sqlExecutor.ExecuteStatements(session, q, nil) + defer res.Close() if res.ResultList[0].Err != nil { t.Fatalf("error executing '%s': %s", q, res.ResultList[0].Err) } @@ -581,6 +596,7 @@ func TestAdminAPITableDetailsZone(t *testing.T) { params.SetValue(`1`, parser.NewDInt(parser.DInt(id))) params.SetValue(`2`, parser.NewDBytes(parser.DBytes(zoneBytes))) res := ts.sqlExecutor.ExecuteStatements(session, query, params) + defer res.Close() if res.ResultList[0].Err != nil { t.Fatalf("error executing '%s': %s", query, res.ResultList[0].Err) } @@ -620,10 +636,13 @@ func TestAdminAPIUsers(t *testing.T) { // Create sample users. session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + query := ` INSERT INTO system.users (username, hashedPassword) VALUES ('admin', 'abc'), ('bob', 'xyz')` res := ts.sqlExecutor.ExecuteStatements(session, query, nil) + defer res.Close() if a, e := len(res.ResultList), 1; a != e { t.Fatalf("len(results) %d != %d", a, e) } else if res.ResultList[0].Err != nil { @@ -659,6 +678,8 @@ func TestAdminAPIEvents(t *testing.T) { session := sql.NewSession( context.Background(), sql.SessionArgs{User: security.RootUser}, ts.sqlExecutor, nil) + defer session.Finish() + setupQueries := []string{ "CREATE DATABASE api_test", "CREATE TABLE api_test.tbl1 (a INT)", @@ -669,6 +690,7 @@ func TestAdminAPIEvents(t *testing.T) { } for _, q := range setupQueries { res := ts.sqlExecutor.ExecuteStatements(session, q, nil) + defer res.Close() if res.ResultList[0].Err != nil { t.Fatalf("error executing '%s': %s", q, res.ResultList[0].Err) } diff --git a/sql/alter_table.go b/sql/alter_table.go index a98c16ad019b..b1edacc244ed 100644 --- a/sql/alter_table.go +++ b/sql/alter_table.go @@ -290,7 +290,8 @@ func (n *alterTableNode) Start() error { } func (n *alterTableNode) Next() (bool, error) { return false, nil } -func (n *alterTableNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *alterTableNode) Close() {} +func (n *alterTableNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *alterTableNode) Ordering() orderingInfo { return orderingInfo{} } func (n *alterTableNode) Values() parser.DTuple { return parser.DTuple{} } func (n *alterTableNode) DebugValues() debugValues { return debugValues{} } diff --git a/sql/copy.go b/sql/copy.go index cbed0728a581..ee87c42d843d 100644 --- a/sql/copy.go +++ b/sql/copy.go @@ -20,7 +20,9 @@ import ( "bytes" "fmt" "time" + "unsafe" + "github.com/cockroachdb/cockroach/sql/mon" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/privilege" ) @@ -42,12 +44,13 @@ type copyNode struct { p *planner table parser.TableExpr columns parser.UnresolvedNames - resultColumns []ResultColumn + resultColumns ResultColumns buf bytes.Buffer rows []*parser.Tuple + rowsMemAcc mon.MemoryAccount } -func (n *copyNode) Columns() []ResultColumn { return n.resultColumns } +func (n *copyNode) Columns() ResultColumns { return n.resultColumns } func (*copyNode) Ordering() orderingInfo { return orderingInfo{} } func (*copyNode) Values() parser.DTuple { return nil } func (*copyNode) ExplainTypes(_ func(string, string)) {} @@ -56,6 +59,10 @@ func (*copyNode) MarkDebug(_ explainMode) {} func (*copyNode) expandPlan() error { return nil } func (*copyNode) Next() (bool, error) { return false, nil } +func (n *copyNode) Close() { + n.p.session.CloseAccount(&n.rowsMemAcc) +} + func (*copyNode) ExplainPlan(_ bool) (name, description string, children []planNode) { return "copy", "-", nil } @@ -89,15 +96,16 @@ func (p *planner) CopyFrom(n *parser.CopyFrom, autoCommit bool) (planNode, error if err != nil { return nil, err } - cn.resultColumns = make([]ResultColumn, len(cols)) + cn.resultColumns = make(ResultColumns, len(cols)) for i, c := range cols { cn.resultColumns[i] = ResultColumn{Typ: c.Type.ToDatumType()} } cn.p = p + p.session.OpenAccount(&cn.rowsMemAcc) return cn, nil } -// Start +// Start implements the planNode interface. func (n *copyNode) Start() error { // Should never happen because the executor prevents non-COPY messages during // a COPY. @@ -235,9 +243,20 @@ func (n *copyNode) addRow(line []byte) error { if err != nil { return err } + + sz, _ := d.Size() + if err := n.p.session.GrowAccount(&n.rowsMemAcc, int64(sz)); err != nil { + return err + } + exprs[i] = d } - n.rows = append(n.rows, &parser.Tuple{Exprs: exprs}) + tuple := &parser.Tuple{Exprs: exprs} + if err := n.p.session.GrowAccount(&n.rowsMemAcc, int64(unsafe.Sizeof(*tuple))); err != nil { + return err + } + + n.rows = append(n.rows, tuple) return nil } @@ -351,6 +370,7 @@ func (p *planner) CopyData(n CopyDataBlock, autoCommit bool) (planNode, error) { vc := &parser.ValuesClause{Tuples: cf.rows} // Reuse the same backing array once the Insert is complete. cf.rows = cf.rows[:0] + p.session.ClearAccount(&cf.rowsMemAcc) in := parser.Insert{ Table: cf.table, diff --git a/sql/create.go b/sql/create.go index ac2b309746dd..0556604e19cd 100644 --- a/sql/create.go +++ b/sql/create.go @@ -98,7 +98,8 @@ func (n *createDatabaseNode) Start() error { } func (n *createDatabaseNode) Next() (bool, error) { return false, nil } -func (n *createDatabaseNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *createDatabaseNode) Close() {} +func (n *createDatabaseNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *createDatabaseNode) Ordering() orderingInfo { return orderingInfo{} } func (n *createDatabaseNode) Values() parser.DTuple { return parser.DTuple{} } func (n *createDatabaseNode) DebugValues() debugValues { return debugValues{} } @@ -216,7 +217,8 @@ func (n *createIndexNode) Start() error { } func (n *createIndexNode) Next() (bool, error) { return false, nil } -func (n *createIndexNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *createIndexNode) Close() {} +func (n *createIndexNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *createIndexNode) Ordering() orderingInfo { return orderingInfo{} } func (n *createIndexNode) Values() parser.DTuple { return parser.DTuple{} } func (n *createIndexNode) DebugValues() debugValues { return debugValues{} } @@ -270,6 +272,10 @@ func (p *planner) CreateTable(n *parser.CreateTable) (planNode, error) { var selectPlan planNode if n.As() { + // The selectPlan is needed to determine the set of columns to use + // to populate the new table descriptor in Start() below. We + // instantiate the selectPlan as early as here so that EXPLAIN has + // something useful to show about CREATE TABLE .. AS ... selectPlan, err = p.getSelectPlan(n) if err != nil { return nil, err @@ -285,14 +291,14 @@ func removeParens(sel parser.SelectStatement) (parser.SelectStatement, error) { case *parser.ParenSelect: return removeParens(ps.Select.Select) default: - return nil, errors.Errorf("Invalid Select type.") + return nil, errors.Errorf("invalid select type %T", sel) } } func (p *planner) getSelectPlan(n *parser.CreateTable) (planNode, error) { selNoParens, err := removeParens(n.AsSource.Select) if err != nil { - return nil, errors.Errorf("Invalid Select type.") + return nil, err } s, err := p.SelectClause(selNoParens.(*parser.SelectClause), n.AsSource.OrderBy, n.AsSource.Limit, []parser.Datum{}, 0) if err != nil { @@ -500,6 +506,17 @@ func (n *createTableNode) Start() error { if err != nil { return err } + + // TODO(knz): Ideally we would want to plug the selectPlan which + // was already computed as a data source into the insertNode. Now + // unfortunately this is not so easy: when this point is reached, + // selectPlan.expandPlan() has already been called, and + // insertPlan.expandPlan() below would cause a 2nd invocation and + // cause a panic. So instead we close this selectPlan and let the + // insertNode create it anew from the AsSource syntax node. + n.selectPlan.Close() + n.selectPlan = nil + desiredTypesFromSelect := make([]parser.Datum, len(resultColumns)) for i, col := range resultColumns { desiredTypesFromSelect[i] = col.Typ @@ -510,12 +527,10 @@ func (n *createTableNode) Start() error { return err } n.insertPlan = insertPlan - err = insertPlan.expandPlan() - if err != nil { + if err := insertPlan.expandPlan(); err != nil { return err } - err = insertPlan.Start() - if err != nil { + if err = insertPlan.Start(); err != nil { return err } // This loop is done here instead of in the Next method @@ -530,11 +545,14 @@ func (n *createTableNode) Start() error { return nil } -func (n *createTableNode) Next() (bool, error) { - return false, nil +func (n *createTableNode) Close() { + if n.insertPlan != nil { + n.insertPlan.Close() + } } -func (n *createTableNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *createTableNode) Next() (bool, error) { return false, nil } +func (n *createTableNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *createTableNode) Ordering() orderingInfo { return orderingInfo{} } func (n *createTableNode) Values() parser.DTuple { return parser.DTuple{} } func (n *createTableNode) DebugValues() debugValues { return debugValues{} } diff --git a/sql/data_source.go b/sql/data_source.go index 476064e9dcf1..0ed6efc632ba 100644 --- a/sql/data_source.go +++ b/sql/data_source.go @@ -112,7 +112,7 @@ import ( type dataSourceInfo struct { // sourceColumns match the plan.Columns() 1-to-1. However the column // names might be different if the statement renames them using AS. - sourceColumns []ResultColumn + sourceColumns ResultColumns // sourceAliases indicates to which table alias column ranges // belong. @@ -153,7 +153,7 @@ func fillColumnRange(firstIdx, lastIdx int) columnRange { // newSourceInfoForSingleTable creates a simple dataSourceInfo // which maps the same tableAlias to all columns. -func newSourceInfoForSingleTable(tn parser.TableName, columns []ResultColumn) *dataSourceInfo { +func newSourceInfoForSingleTable(tn parser.TableName, columns ResultColumns) *dataSourceInfo { norm := sqlbase.NormalizeTableName(tn) return &dataSourceInfo{ sourceColumns: columns, @@ -294,7 +294,7 @@ func (p *planner) getDataSource( if len(colAlias) > 0 { // Make a copy of the slice since we are about to modify the contents. - src.info.sourceColumns = append([]ResultColumn(nil), src.info.sourceColumns...) + src.info.sourceColumns = append(ResultColumns(nil), src.info.sourceColumns...) // The column aliases can only refer to explicit columns. for colIdx, aliasIdx := 0, 0; aliasIdx < len(colAlias); colIdx++ { @@ -328,7 +328,7 @@ func (p *planner) getDataSource( // expressions that correspond to the expansion of a star. func (src *dataSourceInfo) expandStar( v parser.VarName, qvals qvalMap, -) (columns []ResultColumn, exprs []parser.TypedExpr, err error) { +) (columns ResultColumns, exprs []parser.TypedExpr, err error) { if len(src.sourceColumns) == 0 { return nil, nil, fmt.Errorf("cannot use %q without a FROM clause", v) } @@ -598,7 +598,7 @@ func concatDataSourceInfos(left *dataSourceInfo, right *dataSourceInfo) (*dataSo aliases[k] = v } - columns := make([]ResultColumn, 0, len(left.sourceColumns)+len(right.sourceColumns)) + columns := make(ResultColumns, 0, len(left.sourceColumns)+len(right.sourceColumns)) columns = append(columns, left.sourceColumns...) columns = append(columns, right.sourceColumns...) diff --git a/sql/delete.go b/sql/delete.go index 20befffb3a71..1e2c0cc58a5e 100644 --- a/sql/delete.go +++ b/sql/delete.go @@ -133,6 +133,10 @@ func (d *deleteNode) Start() error { return d.run.tw.init(d.p.txn) } +func (d *deleteNode) Close() { + d.run.rows.Close() +} + func (d *deleteNode) FastPathResults() (int, bool) { if d.run.fastPath { return d.rh.rowCount, true @@ -213,7 +217,7 @@ func (d *deleteNode) fastDelete(scan *scanNode) error { return nil } -func (d *deleteNode) Columns() []ResultColumn { +func (d *deleteNode) Columns() ResultColumns { return d.rh.columns } diff --git a/sql/dist_sql_node.go b/sql/dist_sql_node.go index 9f493324733d..8ff5c5d1530e 100644 --- a/sql/dist_sql_node.go +++ b/sql/dist_sql_node.go @@ -30,7 +30,7 @@ import ( // distSQLNode is a planNode that receives results from a distsql flow (through // a RowChannel). type distSQLNode struct { - columns []ResultColumn + columns ResultColumns ordering orderingInfo // syncMode indicates the mode in which we run the associated flow. If true, @@ -62,12 +62,15 @@ func (n *distSQLNode) expandPlan() error { return nil func (n *distSQLNode) MarkDebug(explainMode) {} func (n *distSQLNode) DebugValues() debugValues { return debugValues{} } func (n *distSQLNode) Start() error { return nil } +func (n *distSQLNode) Close() { + // TODO(RaduBerinde) close the stream / release resources. +} func (n *distSQLNode) ExplainPlan(verbose bool) (name, description string, children []planNode) { return "distsql", "", nil } -func (n *distSQLNode) Columns() []ResultColumn { +func (n *distSQLNode) Columns() ResultColumns { return n.columns } @@ -76,7 +79,7 @@ func (n *distSQLNode) Ordering() orderingInfo { } func newDistSQLNode( - columns []ResultColumn, + columns ResultColumns, colMapping []uint32, ordering orderingInfo, srv *distsql.ServerImpl, @@ -182,7 +185,7 @@ func scanNodeToTableReaderSpec(n *scanNode) *distsql.TableReaderSpec { // refer to columns. We temporarily rename the scanNode columns to // (literally) "$0", "$1", ... and convert to a string. tmp := n.resultColumns - n.resultColumns = make([]ResultColumn, len(tmp)) + n.resultColumns = make(ResultColumns, len(tmp)) for i, orig := range tmp { n.resultColumns[i].Name = fmt.Sprintf("$%d", i) n.resultColumns[i].Typ = orig.Typ diff --git a/sql/distinct.go b/sql/distinct.go index c8694e80f47b..0356b0ce1bd3 100644 --- a/sql/distinct.go +++ b/sql/distinct.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/sql/mon" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/sqlbase" ) @@ -28,25 +29,33 @@ import ( // distinctNode de-duplicates rows returned by a wrapped planNode. type distinctNode struct { plan planNode + p *planner top *selectTopNode // All the columns that are part of the Sort. Set to nil if no-sort, or // sort used an expression that was not part of the requested column set. columnsInOrder []bool // Encoding of the columnsInOrder columns for the previous row. - prefixSeen []byte + prefixSeen []byte + prefixMemAcc mon.MemoryAccount + // Encoding of the non-columnInOrder columns for rows sharing the same // prefixSeen value. - suffixSeen map[string]struct{} - explain explainMode - debugVals debugValues + suffixSeen map[string]struct{} + suffixMemAcc mon.MemoryAccount + + explain explainMode + debugVals debugValues } // distinct constructs a distinctNode. -func (*planner) Distinct(n *parser.SelectClause) *distinctNode { +func (p *planner) Distinct(n *parser.SelectClause) *distinctNode { if !n.Distinct { return nil } - return &distinctNode{} + d := &distinctNode{p: p} + p.session.OpenAccount(&d.prefixMemAcc) + p.session.OpenAccount(&d.suffixMemAcc) + return d } // wrap connects the distinctNode to its source planNode. @@ -103,7 +112,7 @@ func (n *distinctNode) setTop(top *selectTopNode) { } } -func (n *distinctNode) Columns() []ResultColumn { +func (n *distinctNode) Columns() ResultColumns { if n.plan != nil { return n.plan.Columns() } @@ -129,6 +138,15 @@ func (n *distinctNode) DebugValues() debugValues { return n.debugVals } +func (n *distinctNode) addSuffixSeen(sKey string) error { + sz := int64(len(sKey)) + if err := n.p.session.GrowAccount(&n.suffixMemAcc, sz); err != nil { + return err + } + n.suffixSeen[sKey] = struct{}{} + return nil +} + func (n *distinctNode) Next() (bool, error) { for { next, err := n.plan.Next() @@ -152,11 +170,17 @@ func (n *distinctNode) Next() (bool, error) { // The prefix of the row which is ordered differs from the last row; // reset our seen set. if len(n.suffixSeen) > 0 { + n.p.session.ClearAccount(&n.suffixMemAcc) n.suffixSeen = make(map[string]struct{}) } + if err := n.p.session.ClearAccountAndAlloc(&n.prefixMemAcc, int64(len(prefix))); err != nil { + return false, err + } n.prefixSeen = prefix if suffix != nil { - n.suffixSeen[string(suffix)] = struct{}{} + if err := n.addSuffixSeen(string(suffix)); err != nil { + return false, err + } } return true, nil } @@ -166,7 +190,9 @@ func (n *distinctNode) Next() (bool, error) { if suffix != nil { sKey := string(suffix) if _, ok := n.suffixSeen[sKey]; !ok { - n.suffixSeen[sKey] = struct{}{} + if err := n.addSuffixSeen(sKey); err != nil { + return false, err + } return true, nil } } @@ -223,3 +249,11 @@ func (n *distinctNode) SetLimitHint(numRows int64, soft bool) { // Any limit becomes a "soft" limit underneath. n.plan.SetLimitHint(numRows, true) } + +func (n *distinctNode) Close() { + n.plan.Close() + n.prefixSeen = nil + n.suffixSeen = nil + n.p.session.CloseAccount(&n.suffixMemAcc) + n.p.session.CloseAccount(&n.prefixMemAcc) +} diff --git a/sql/drop.go b/sql/drop.go index 80b8467a934a..d9a5e21e164c 100644 --- a/sql/drop.go +++ b/sql/drop.go @@ -149,7 +149,8 @@ func (n *dropDatabaseNode) Start() error { } func (n *dropDatabaseNode) Next() (bool, error) { return false, nil } -func (n *dropDatabaseNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *dropDatabaseNode) Close() {} +func (n *dropDatabaseNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *dropDatabaseNode) Ordering() orderingInfo { return orderingInfo{} } func (n *dropDatabaseNode) Values() parser.DTuple { return parser.DTuple{} } func (n *dropDatabaseNode) DebugValues() debugValues { return debugValues{} } @@ -294,7 +295,8 @@ func (n *dropIndexNode) Start() error { } func (n *dropIndexNode) Next() (bool, error) { return false, nil } -func (n *dropIndexNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *dropIndexNode) Close() {} +func (n *dropIndexNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *dropIndexNode) Ordering() orderingInfo { return orderingInfo{} } func (n *dropIndexNode) Values() parser.DTuple { return parser.DTuple{} } func (n *dropIndexNode) DebugValues() debugValues { return debugValues{} } @@ -459,7 +461,8 @@ func (n *dropTableNode) Start() error { } func (n *dropTableNode) Next() (bool, error) { return false, nil } -func (n *dropTableNode) Columns() []ResultColumn { return make([]ResultColumn, 0) } +func (n *dropTableNode) Close() {} +func (n *dropTableNode) Columns() ResultColumns { return make(ResultColumns, 0) } func (n *dropTableNode) Ordering() orderingInfo { return orderingInfo{} } func (n *dropTableNode) Values() parser.DTuple { return parser.DTuple{} } func (n *dropTableNode) ExplainTypes(_ func(string, string)) {} diff --git a/sql/empty.go b/sql/empty.go index 409d379ea28c..f1c8d6435311 100644 --- a/sql/empty.go +++ b/sql/empty.go @@ -27,7 +27,7 @@ type emptyNode struct { results bool } -func (*emptyNode) Columns() []ResultColumn { return nil } +func (*emptyNode) Columns() ResultColumns { return nil } func (*emptyNode) Ordering() orderingInfo { return orderingInfo{} } func (*emptyNode) Values() parser.DTuple { return nil } func (*emptyNode) ExplainTypes(_ func(string, string)) {} @@ -35,6 +35,7 @@ func (*emptyNode) Start() error { return nil } func (*emptyNode) SetLimitHint(_ int64, _ bool) {} func (*emptyNode) MarkDebug(_ explainMode) {} func (*emptyNode) expandPlan() error { return nil } +func (*emptyNode) Close() {} func (*emptyNode) ExplainPlan(_ bool) (name, description string, children []planNode) { return "empty", "-", nil diff --git a/sql/executor.go b/sql/executor.go index 7c3721bc2493..5659e91c86eb 100644 --- a/sql/executor.go +++ b/sql/executor.go @@ -98,6 +98,13 @@ type StatementResults struct { Empty bool } +// Close ensures that the resources claimed by the results are released. +func (s *StatementResults) Close() { + for _, r := range s.ResultList { + r.Close() + } +} + // Result corresponds to the execution of a single SQL statement. type Result struct { Err error @@ -111,11 +118,20 @@ type Result struct { // the names and types of the columns returned in the result set in the order // specified in the SQL statement. The number of columns will equal the number // of values in each Row. - Columns []ResultColumn + Columns ResultColumns // Rows will be populated if the statement type is "Rows". It will contain // the result set of the result. // TODO(nvanbenschoten): Can this be streamed from the planNode? - Rows []ResultRow + Rows *RowContainer +} + +// Close ensures that the resources claimed by the result are released. +func (r *Result) Close() { + // The Rows pointer may be nil if the statement returned no rows or + // if an error occurred. + if r.Rows != nil { + r.Rows.Close() + } } // ResultColumn contains the name and type of a SQL "cell". @@ -127,10 +143,9 @@ type ResultColumn struct { hidden bool } -// ResultRow is a collection of values representing a row in a result. -type ResultRow struct { - Values []parser.Datum -} +// ResultColumns is the type used throughout the sql module to +// describe the column types of a table. +type ResultColumns []ResultColumn // An Executor executes SQL statements. // Executor is thread-safe. @@ -343,7 +358,7 @@ func (e *Executor) Prepare( query string, session *Session, pinfo parser.PlaceholderTypes, -) ([]ResultColumn, error) { +) (ResultColumns, error) { if log.V(2) { log.Infof(session.Ctx(), "preparing: %s", query) } else if traceSQL { @@ -388,6 +403,7 @@ func (e *Executor) Prepare( if plan == nil { return nil, nil } + defer plan.Close() cols := plan.Columns() for _, c := range cols { if err := checkResultDatum(c.Typ); err != nil { @@ -428,6 +444,7 @@ func (e *Executor) CopyDone(session *Session) StatementResults { // CopyEnd ends the COPY mode. Any buffered data is discarded. func (session *Session) CopyEnd() { + session.planner.copyFrom.Close() session.planner.copyFrom = nil } @@ -1017,6 +1034,10 @@ func (e *Executor) execStmtInOpenTxn( autoCommit := implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit result, err := e.execStmt(stmt, planMaker, autoCommit) if err != nil { + if result.Rows != nil { + result.Rows.Close() + result.Rows = nil + } if traceSQL { log.ErrEventf(txnState.txn.Context, "ERROR: %v", err) } @@ -1030,7 +1051,7 @@ func (e *Executor) execStmtInOpenTxn( case parser.RowsAffected: tResult.count = result.RowsAffected case parser.Rows: - tResult.count = len(result.Rows) + tResult.count = result.Rows.Len() } if traceSQL { log.Eventf(txnState.txn.Context, "%s done", tResult) @@ -1127,6 +1148,8 @@ func (e *Executor) execStmt( return result, err } + defer plan.Close() + if testDistSQL != 0 { if err := hackPlanToUseDistSQL(plan, testDistSQL == 1); err != nil { return result, err @@ -1155,9 +1178,10 @@ func (e *Executor) execStmt( return result, err } } + result.Rows = planMaker.NewRowContainer(result.Columns, 0) // valuesAlloc is used to allocate the backing storage for the - // ResultRow.Values slices in chunks. + // result row slices in chunks. var valuesAlloc []parser.Datum const maxChunkSize = 64 // Arbitrary, could use tuning. chunkSize := 4 // Arbitrary as well. @@ -1169,21 +1193,23 @@ func (e *Executor) execStmt( n := len(values) if len(valuesAlloc) < n { - valuesAlloc = make([]parser.Datum, len(result.Columns)*chunkSize) + valuesAlloc = make(parser.DTuple, len(result.Columns)*chunkSize) if chunkSize < maxChunkSize { chunkSize *= 2 } } - row := ResultRow{Values: valuesAlloc[:0:n]} + row := valuesAlloc[:0:n] valuesAlloc = valuesAlloc[n:] for _, val := range values { if err := checkResultDatum(val); err != nil { return result, err } - row.Values = append(row.Values, val) + row = append(row, val) + } + if err := result.Rows.AddRow(row); err != nil { + return result, err } - result.Rows = append(result.Rows, row) } if err != nil { return result, err @@ -1308,8 +1334,8 @@ func checkResultDatum(datum parser.Datum) error { } // makeResultColumns converts sqlbase.ColumnDescriptors to ResultColumns. -func makeResultColumns(colDescs []sqlbase.ColumnDescriptor) []ResultColumn { - cols := make([]ResultColumn, 0, len(colDescs)) +func makeResultColumns(colDescs []sqlbase.ColumnDescriptor) ResultColumns { + cols := make(ResultColumns, 0, len(colDescs)) for _, colDesc := range colDescs { // Convert the sqlbase.ColumnDescriptor to ResultColumn. typ := colDesc.Type.ToDatumType() diff --git a/sql/explain.go b/sql/explain.go index 9fa7a7ff6869..138acd304aa1 100644 --- a/sql/explain.go +++ b/sql/explain.go @@ -102,29 +102,39 @@ func (p *planner) Explain(n *parser.Explain, autoCommit bool) (planNode, error) return &explainDebugNode{plan}, nil case explainTypes: + columns := ResultColumns{ + {Name: "Level", Typ: parser.TypeInt}, + {Name: "Type", Typ: parser.TypeString}, + {Name: "Element", Typ: parser.TypeString}, + {Name: "Description", Typ: parser.TypeString}, + } node := &explainTypesNode{ plan: plan, expanded: expanded, - results: &valuesNode{ - columns: []ResultColumn{ - {Name: "Level", Typ: parser.TypeInt}, - {Name: "Type", Typ: parser.TypeString}, - {Name: "Element", Typ: parser.TypeString}, - {Name: "Description", Typ: parser.TypeString}, - }, - }, + results: p.newContainerValuesNode(columns, 0), } return node, nil case explainPlan: + columns := ResultColumns{ + {Name: "Level", Typ: parser.TypeInt}, + {Name: "Type", Typ: parser.TypeString}, + {Name: "Description", Typ: parser.TypeString}, + } + if verbose { + columns = append(columns, ResultColumn{Name: "Columns", Typ: parser.TypeString}) + columns = append(columns, ResultColumn{Name: "Ordering", Typ: parser.TypeString}) + } + node := &explainPlanNode{ verbose: verbose, plan: plan, + results: p.newContainerValuesNode(columns, 0), } return node, nil case explainTrace: - return makeTraceNode(plan, p.txn), nil + return p.makeTraceNode(plan, p.txn), nil default: return nil, fmt.Errorf("unsupported EXPLAIN mode: %d", mode) @@ -139,7 +149,7 @@ type explainTypesNode struct { func (e *explainTypesNode) ExplainTypes(fn func(string, string)) {} func (e *explainTypesNode) Next() (bool, error) { return e.results.Next() } -func (e *explainTypesNode) Columns() []ResultColumn { return e.results.Columns() } +func (e *explainTypesNode) Columns() ResultColumns { return e.results.Columns() } func (e *explainTypesNode) Ordering() orderingInfo { return e.results.Ordering() } func (e *explainTypesNode) Values() parser.DTuple { return e.results.Values() } func (e *explainTypesNode) DebugValues() debugValues { return e.results.DebugValues() } @@ -164,11 +174,15 @@ func (e *explainTypesNode) expandPlan() error { } func (e *explainTypesNode) Start() error { - populateTypes(e.results, e.plan, 0) - return nil + return populateTypes(e.results, e.plan, 0) +} + +func (e *explainTypesNode) Close() { + e.plan.Close() + e.results.Close() } -func formatColumns(cols []ResultColumn, printTypes bool) string { +func formatColumns(cols ResultColumns, printTypes bool) string { var buf bytes.Buffer buf.WriteByte('(') for i, rCol := range cols { @@ -188,7 +202,7 @@ func formatColumns(cols []ResultColumn, printTypes bool) string { return buf.String() } -func populateTypes(v *valuesNode, plan planNode, level int) { +func populateTypes(v *valuesNode, plan planNode, level int) error { name, _, children := plan.ExplainPlan(true) // Format the result column types. @@ -198,24 +212,39 @@ func populateTypes(v *valuesNode, plan planNode, level int) { parser.NewDString("result"), parser.NewDString(formatColumns(plan.Columns(), true)), } - v.rows = append(v.rows, row) + if err := v.rows.AddRow(row); err != nil { + return err + } // Format the node's typing details. + var err error regType := func(elt string, desc string) { + if err != nil { + return + } + row := parser.DTuple{ parser.NewDInt(parser.DInt(level)), parser.NewDString(name), parser.NewDString(elt), parser.NewDString(desc), } - v.rows = append(v.rows, row) + err = v.rows.AddRow(row) } plan.ExplainTypes(regType) + if err != nil { + return err + } + // Recurse into sub-nodes. for _, child := range children { - populateTypes(v, child, level+1) + if err := populateTypes(v, child, level+1); err != nil { + return err + } } + + return nil } type explainPlanNode struct { @@ -226,24 +255,13 @@ type explainPlanNode struct { func (e *explainPlanNode) ExplainTypes(fn func(string, string)) {} func (e *explainPlanNode) Next() (bool, error) { return e.results.Next() } -func (e *explainPlanNode) Columns() []ResultColumn { return e.results.Columns() } +func (e *explainPlanNode) Columns() ResultColumns { return e.results.Columns() } func (e *explainPlanNode) Ordering() orderingInfo { return e.results.Ordering() } func (e *explainPlanNode) Values() parser.DTuple { return e.results.Values() } func (e *explainPlanNode) DebugValues() debugValues { return debugValues{} } func (e *explainPlanNode) SetLimitHint(n int64, s bool) { e.results.SetLimitHint(n, s) } func (e *explainPlanNode) MarkDebug(mode explainMode) {} func (e *explainPlanNode) expandPlan() error { - columns := []ResultColumn{ - {Name: "Level", Typ: parser.TypeInt}, - {Name: "Type", Typ: parser.TypeString}, - {Name: "Description", Typ: parser.TypeString}, - } - if e.verbose { - columns = append(columns, ResultColumn{Name: "Columns", Typ: parser.TypeString}) - columns = append(columns, ResultColumn{Name: "Ordering", Typ: parser.TypeString}) - } - e.results = &valuesNode{columns: columns} - if err := e.plan.expandPlan(); err != nil { return err } @@ -259,11 +277,15 @@ func (e *explainPlanNode) ExplainPlan(v bool) (string, string, []planNode) { } func (e *explainPlanNode) Start() error { - populateExplain(e.verbose, e.results, e.plan, 0) - return nil + return populateExplain(e.verbose, e.results, e.plan, 0) } -func populateExplain(verbose bool, v *valuesNode, plan planNode, level int) { +func (e *explainPlanNode) Close() { + e.plan.Close() + e.results.Close() +} + +func populateExplain(verbose bool, v *valuesNode, plan planNode, level int) error { name, description, children := plan.ExplainPlan(verbose) row := parser.DTuple{ @@ -275,11 +297,16 @@ func populateExplain(verbose bool, v *valuesNode, plan planNode, level int) { row = append(row, parser.NewDString(formatColumns(plan.Columns(), false))) row = append(row, parser.NewDString(plan.Ordering().AsString(plan.Columns()))) } - v.rows = append(v.rows, row) + if err := v.rows.AddRow(row); err != nil { + return err + } for _, child := range children { - populateExplain(verbose, v, child, level+1) + if err := populateExplain(verbose, v, child, level+1); err != nil { + return err + } } + return nil } type debugValueType int @@ -359,15 +386,15 @@ type explainDebugNode struct { } // Columns for explainDebug mode. -var debugColumns = []ResultColumn{ +var debugColumns = ResultColumns{ {Name: "RowIdx", Typ: parser.TypeInt}, {Name: "Key", Typ: parser.TypeString}, {Name: "Value", Typ: parser.TypeString}, {Name: "Disposition", Typ: parser.TypeString}, } -func (*explainDebugNode) Columns() []ResultColumn { return debugColumns } -func (*explainDebugNode) Ordering() orderingInfo { return orderingInfo{} } +func (*explainDebugNode) Columns() ResultColumns { return debugColumns } +func (*explainDebugNode) Ordering() orderingInfo { return orderingInfo{} } func (n *explainDebugNode) expandPlan() error { if err := n.plan.expandPlan(); err != nil { @@ -379,6 +406,7 @@ func (n *explainDebugNode) expandPlan() error { func (n *explainDebugNode) Start() error { return n.plan.Start() } func (n *explainDebugNode) Next() (bool, error) { return n.plan.Next() } +func (n *explainDebugNode) Close() { n.plan.Close() } func (n *explainDebugNode) ExplainPlan(v bool) (name, description string, children []planNode) { return n.plan.ExplainPlan(v) diff --git a/sql/group.go b/sql/group.go index d527a4b9cc04..31265cb874fd 100644 --- a/sql/group.go +++ b/sql/group.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/sql/mon" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/sqlbase" "github.com/cockroachdb/cockroach/util/encoding" @@ -190,7 +191,7 @@ type groupNode struct { explain explainMode } -func (n *groupNode) Columns() []ResultColumn { +func (n *groupNode) Columns() ResultColumns { return n.values.Columns() } @@ -303,7 +304,7 @@ func (n *groupNode) Next() (bool, error) { // Feed the aggregateFuncHolders for this bucket the non-grouped values. for i, value := range aggregatedValues { - if err := n.funcs[i].add(encoded, value); err != nil { + if err := n.funcs[i].add(n.planner.session, encoded, value); err != nil { return false, err } } @@ -329,7 +330,7 @@ func (n *groupNode) computeAggregates() error { n.populated = true // Render the results. - n.values.rows = make([]parser.DTuple, 0, len(n.buckets)) + n.values.rows = n.planner.NewRowContainer(n.values.Columns(), len(n.buckets)) for k := range n.buckets { n.currentBucket = k @@ -354,7 +355,9 @@ func (n *groupNode) computeAggregates() error { row = append(row, res) } - n.values.rows = append(n.values.rows, row) + if err := n.values.rows.AddRow(row); err != nil { + return err + } } return nil } @@ -393,6 +396,15 @@ func (n *groupNode) ExplainTypes(regTypes func(string, string)) { func (*groupNode) SetLimitHint(_ int64, _ bool) {} +func (n *groupNode) Close() { + n.plan.Close() + for _, f := range n.funcs { + f.close(n.planner.session) + } + n.values.Close() + n.buckets = nil +} + // wrap the supplied planNode with the groupNode if grouping/aggregation is required. func (n *groupNode) wrap(plan planNode) planNode { if n == nil { @@ -504,13 +516,7 @@ func (v *extractAggregatesVisitor) VisitPre(expr parser.Expr) (recurse bool, new return false, expr } - f := &aggregateFuncHolder{ - expr: t, - arg: argExpr.(parser.TypedExpr), - create: agg, - group: v.n, - buckets: make(map[string]parser.AggregateFunc), - } + f := v.n.newAggregateFuncHolder(t, argExpr.(parser.TypedExpr), agg) if t.Type == parser.Distinct { f.seen = make(map[string]struct{}) } @@ -523,13 +529,7 @@ func (v *extractAggregatesVisitor) VisitPre(expr parser.Expr) (recurse bool, new t.colRef.get().Name) return true, expr } - f := &aggregateFuncHolder{ - expr: t, - arg: t, - create: parser.NewIdentAggregate, - group: v.n, - buckets: make(map[string]parser.AggregateFunc), - } + f := v.n.newAggregateFuncHolder(t, t, parser.NewIdentAggregate) v.n.funcs = append(v.n.funcs, f) return false, f } @@ -569,15 +569,38 @@ var _ parser.TypedExpr = &aggregateFuncHolder{} var _ parser.VariableExpr = &aggregateFuncHolder{} type aggregateFuncHolder struct { - expr parser.TypedExpr - arg parser.TypedExpr - create func() parser.AggregateFunc - group *groupNode - buckets map[string]parser.AggregateFunc - seen map[string]struct{} + expr parser.TypedExpr + arg parser.TypedExpr + create func() parser.AggregateFunc + group *groupNode + buckets map[string]parser.AggregateFunc + bucketsMemAcc mon.MemoryAccount + seen map[string]struct{} } -func (a *aggregateFuncHolder) add(bucket []byte, d parser.Datum) error { +func (n *groupNode) newAggregateFuncHolder( + expr, arg parser.TypedExpr, + create func() parser.AggregateFunc, +) *aggregateFuncHolder { + res := &aggregateFuncHolder{ + expr: expr, + arg: arg, + create: create, + group: n, + buckets: make(map[string]parser.AggregateFunc), + } + n.planner.session.OpenAccount(&res.bucketsMemAcc) + return res +} + +func (a *aggregateFuncHolder) close(s *Session) { + a.buckets = nil + a.seen = nil + a.group = nil + s.CloseAccount(&a.bucketsMemAcc) +} + +func (a *aggregateFuncHolder) add(s *Session, bucket []byte, d parser.Datum) error { // NB: the compiler *should* optimize `myMap[string(myBytes)]`. See: // https://github.com/golang/go/commit/f5f5a8b6209f84961687d993b93ea0d397f5d5bf @@ -590,6 +613,9 @@ func (a *aggregateFuncHolder) add(bucket []byte, d parser.Datum) error { // skip return nil } + if err := s.GrowAccount(&a.bucketsMemAcc, int64(len(encoded))); err != nil { + return err + } a.seen[string(encoded)] = struct{}{} } @@ -631,8 +657,7 @@ func (a *aggregateFuncHolder) Eval(ctx *parser.EvalContext) (parser.Datum, error datum := found.Result() - // This is almost certainly the identity. Oh well. - return datum.Eval(ctx) + return datum, nil } func (a *aggregateFuncHolder) ReturnType() parser.Datum { diff --git a/sql/group_test.go b/sql/group_test.go index 325789c745a8..fd5309311fb6 100644 --- a/sql/group_test.go +++ b/sql/group_test.go @@ -46,9 +46,10 @@ func TestDesiredAggregateOrder(t *testing.T) { {`(COUNT(a), MIN(a))`, nil}, {`(MIN(a+1))`, nil}, } + p := makePlanner() for _, d := range testData { expr, _ := parseAndNormalizeExpr(t, d.expr) - group := &groupNode{} + group := &groupNode{planner: p} _, err := extractAggregatesVisitor{n: group}.extract(expr) if err != nil { t.Fatal(err) diff --git a/sql/information_schema.go b/sql/information_schema.go index 13423753c00f..a4bd5cf50f7d 100644 --- a/sql/information_schema.go +++ b/sql/information_schema.go @@ -93,9 +93,9 @@ CREATE TABLE information_schema.columns ( DATETIME_PRECISION INT ); `, - populate: func(p *planner, addRow func(...parser.Datum)) error { + populate: func(p *planner, addRow func(...parser.Datum) error) error { return forEachTableDesc(p, - func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) { + func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error { // Table descriptors already holds columns in-order. visible := 0 for _, column := range table.Columns { @@ -103,7 +103,7 @@ CREATE TABLE information_schema.columns ( continue } visible++ - addRow( + if err := addRow( defString, // table_catalog parser.NewDString(db.Name), // table_schema parser.NewDString(table.Name), // table_name @@ -117,8 +117,11 @@ CREATE TABLE information_schema.columns ( numericPrecision(column.Type), // numeric_precision numericScale(column.Type), // numeric_scale datetimePrecision(column.Type), // datetime_precision - ) + ); err != nil { + return err + } } + return nil }, ) }, @@ -153,9 +156,9 @@ CREATE TABLE information_schema.schemata ( DEFAULT_CHARACTER_SET_NAME STRING NOT NULL DEFAULT '', SQL_PATH STRING );`, - populate: func(p *planner, addRow func(...parser.Datum)) error { - return forEachDatabaseDesc(p, func(db *sqlbase.DatabaseDescriptor) { - addRow( + populate: func(p *planner, addRow func(...parser.Datum) error) error { + return forEachDatabaseDesc(p, func(db *sqlbase.DatabaseDescriptor) error { + return addRow( defString, // catalog_name parser.NewDString(db.Name), // schema_name parser.DNull, // default_character_set_name @@ -183,9 +186,9 @@ CREATE TABLE information_schema.table_constraints ( TABLE_NAME STRING NOT NULL DEFAULT '', CONSTRAINT_TYPE STRING NOT NULL DEFAULT '' );`, - populate: func(p *planner, addRow func(...parser.Datum)) error { + populate: func(p *planner, addRow func(...parser.Datum) error) error { return forEachTableDesc(p, - func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) { + func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error { type constraint struct { name string typ parser.Datum @@ -215,15 +218,18 @@ CREATE TABLE information_schema.table_constraints ( }) } for _, c := range constraints { - addRow( + if err := addRow( defString, // constraint_catalog parser.NewDString(db.Name), // constraint_schema dStringOrNull(c.name), // constraint_name parser.NewDString(db.Name), // table_schema parser.NewDString(table.Name), // table_name c.typ, // constraint_type - ) + ); err != nil { + return err + } } + return nil }, ) }, @@ -243,14 +249,14 @@ CREATE TABLE information_schema.tables ( TABLE_TYPE STRING NOT NULL DEFAULT '', VERSION INT );`, - populate: func(p *planner, addRow func(...parser.Datum)) error { + populate: func(p *planner, addRow func(...parser.Datum) error) error { return forEachTableDesc(p, - func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) { + func(db *sqlbase.DatabaseDescriptor, table *sqlbase.TableDescriptor) error { tableType := tableTypeBaseTable if isVirtualDescriptor(table) { tableType = tableTypeSystemView } - addRow( + return addRow( defString, // table_catalog parser.NewDString(db.Name), // table_schema parser.NewDString(table.Name), // table_name @@ -275,7 +281,7 @@ func (dbs sortedDBDescs) Less(i, j int) bool { return dbs[i].Name < dbs[j].Name // will call fn with its descriptor. func forEachDatabaseDesc( p *planner, - fn func(*sqlbase.DatabaseDescriptor), + fn func(*sqlbase.DatabaseDescriptor) error, ) error { // Handle real schemas dbDescs, err := p.getAllDatabaseDescs() @@ -291,7 +297,9 @@ func forEachDatabaseDesc( sort.Sort(sortedDBDescs(dbDescs)) for _, db := range dbDescs { if userCanSeeDatabase(db, p.session.User) { - fn(db) + if err := fn(db); err != nil { + return err + } } } return nil @@ -303,7 +311,7 @@ func forEachDatabaseDesc( // database and table descriptor. func forEachTableDesc( p *planner, - fn func(*sqlbase.DatabaseDescriptor, *sqlbase.TableDescriptor), + fn func(*sqlbase.DatabaseDescriptor, *sqlbase.TableDescriptor) error, ) error { type dbDescTables struct { desc *sqlbase.DatabaseDescriptor @@ -371,7 +379,9 @@ func forEachTableDesc( for _, tableName := range dbTableNames { tableDesc := db.tables[tableName] if userCanSeeTable(tableDesc, p.session.User) { - fn(db.desc, tableDesc) + if err := fn(db.desc, tableDesc); err != nil { + return err + } } } } diff --git a/sql/insert.go b/sql/insert.go index ab9c79538b93..b3d0ec7cfb57 100644 --- a/sql/insert.go +++ b/sql/insert.go @@ -20,8 +20,6 @@ import ( "bytes" "fmt" - "golang.org/x/net/context" - "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/privilege" "github.com/cockroachdb/cockroach/sql/sqlbase" @@ -33,7 +31,6 @@ type insertNode struct { editNodeBase defaultExprs []parser.TypedExpr n *parser.Insert - insertRows parser.SelectStatement checkHelper checkHelper insertCols []sqlbase.ColumnDescriptor @@ -200,7 +197,6 @@ func (p *planner) Insert( n: n, editNodeBase: en, defaultExprs: defaultExprs, - insertRows: insertRows, insertCols: ri.insertCols, insertColIDtoRowIndex: ri.insertColIDtoRowIndex, tw: tw, @@ -257,8 +253,12 @@ func (n *insertNode) Start() error { return n.run.tw.init(n.p.txn) } +func (n *insertNode) Close() { + n.run.rows.Close() +} + func (n *insertNode) Next() (bool, error) { - ctx := context.TODO() + ctx := n.editNodeBase.p.ctx() if next, err := n.run.rows.Next(); !next { if err == nil { // We're done. Finish the batch. @@ -463,7 +463,7 @@ func makeDefaultExprs( return defaultExprs, nil } -func (n *insertNode) Columns() []ResultColumn { +func (n *insertNode) Columns() ResultColumns { return n.rh.columns } diff --git a/sql/join.go b/sql/join.go index e9e4c6b95bb9..a6ec75e52696 100644 --- a/sql/join.go +++ b/sql/join.go @@ -118,7 +118,7 @@ func (p *planner) makeIndexJoin(origScan *scanNode, exactPrefix int) (resultPlan }, indexScan } -func (n *indexJoinNode) Columns() []ResultColumn { +func (n *indexJoinNode) Columns() ResultColumns { return n.table.Columns() } @@ -244,3 +244,8 @@ func (n *indexJoinNode) ExplainTypes(_ func(string, string)) {} func (n *indexJoinNode) SetLimitHint(numRows int64, soft bool) { n.index.SetLimitHint(numRows, soft) } + +func (n *indexJoinNode) Close() { + n.index.Close() + n.table.Close() +} diff --git a/sql/limit.go b/sql/limit.go index 3cba4ea8959d..0c284c1bf28e 100644 --- a/sql/limit.go +++ b/sql/limit.go @@ -200,7 +200,7 @@ func (n *limitNode) setTop(top *selectTopNode) { } } -func (n *limitNode) Columns() []ResultColumn { +func (n *limitNode) Columns() ResultColumns { if n.plan != nil { return n.plan.Columns() } @@ -281,6 +281,10 @@ func (n *limitNode) ExplainPlan(_ bool) (string, string, []planNode) { return "limit", buf.String(), subplans } +func (n *limitNode) Close() { + n.plan.Close() +} + // getLimit computes the actual number of rows to request from the // data source to honour both the required count and offset together. // This also ensures that the resulting number of rows does not diff --git a/sql/mon/account.go b/sql/mon/account.go new file mode 100644 index 000000000000..d87367c2b16a --- /dev/null +++ b/sql/mon/account.go @@ -0,0 +1,109 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package mon + +import ( + "fmt" + + "golang.org/x/net/context" +) + +// MemoryAccount tracks the cumulated allocations for one client of +// MemoryUsageMonitor. This allows a client to release all the memory +// at once when it completes its work. +type MemoryAccount struct { + curAllocated int64 +} + +// OpenAccount creates a new empty account. +func (mm *MemoryUsageMonitor) OpenAccount(_ context.Context, acc *MemoryAccount) { + // TODO(knz): conditionally track accounts in the memory monitor + // (#9122). +} + +// OpenAndInitAccount creates a new account and pre-allocates some +// initial amount of memory. +func (mm *MemoryUsageMonitor) OpenAndInitAccount( + ctx context.Context, acc *MemoryAccount, initialAllocation int64, +) error { + mm.OpenAccount(ctx, acc) + return mm.GrowAccount(ctx, acc, initialAllocation) +} + +// GrowAccount requests a new allocation in an account. +func (mm *MemoryUsageMonitor) GrowAccount( + ctx context.Context, acc *MemoryAccount, extraSize int64, +) error { + if err := mm.reserveMemory(ctx, extraSize); err != nil { + return err + } + acc.curAllocated += extraSize + return nil +} + +// CloseAccount releases all the cumulated allocations of an account at once. +func (mm *MemoryUsageMonitor) CloseAccount(ctx context.Context, acc *MemoryAccount) { + mm.releaseMemory(ctx, acc.curAllocated) +} + +// ClearAccount releases all the cumulated allocations of an account at once +// and primes it for reuse. +func (mm *MemoryUsageMonitor) ClearAccount(ctx context.Context, acc *MemoryAccount) { + mm.releaseMemory(ctx, acc.curAllocated) + acc.curAllocated = 0 +} + +// ClearAccountAndAlloc releases all the cumulated allocations of an account +// at once and primes it for reuse, starting with a first allocation +// of the given size. The account is always closed even if the new +// allocation is refused. +func (mm *MemoryUsageMonitor) ClearAccountAndAlloc( + ctx context.Context, acc *MemoryAccount, newSize int64, +) error { + mm.releaseMemory(ctx, acc.curAllocated) + if err := mm.reserveMemory(ctx, newSize); err != nil { + return err + } + acc.curAllocated = newSize + return nil +} + +// ResizeItem requests a size change for an object already registered +// in an account. The reservation is not modified if the new allocation is +// refused, so that the caller can keep using the original item +// without an accounting error. This is better than calling ClearAccount +// then GrowAccount because if the Clear succeeds and the Grow fails +// the original item becomes invisible from the perspective of the +// monitor. +func (mm *MemoryUsageMonitor) ResizeItem( + ctx context.Context, acc *MemoryAccount, oldSize, newSize int64, +) error { + delta := newSize - oldSize + switch { + case delta > 0: + if err := mm.reserveMemory(ctx, delta); err != nil { + return err + } + case delta < 0: + if acc.curAllocated < -delta { + panic(fmt.Sprintf("no memory in account to release, current %d, free %d", acc.curAllocated, delta)) + } + mm.releaseMemory(ctx, -delta) + } + acc.curAllocated += delta + return nil +} diff --git a/sql/mon/account_test.go b/sql/mon/account_test.go new file mode 100644 index 000000000000..3466ccea0d51 --- /dev/null +++ b/sql/mon/account_test.go @@ -0,0 +1,80 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package mon + +import ( + "testing" + + "github.com/cockroachdb/cockroach/util/leaktest" + "golang.org/x/net/context" +) + +func TestMemAcc(t *testing.T) { + defer leaktest.AfterTest(t)() + + m := MemoryUsageMonitor{} + ctx := context.Background() + + var a1, a2 MemoryAccount + + m.StartMonitor() + m.OpenAccount(ctx, &a1) + m.OpenAccount(ctx, &a2) + + m.maxAllocatedBudget = 100 + + if err := m.GrowAccount(ctx, &a1, 10); err != nil { + t.Fatalf("monitor refused allocation: %v", err) + } + + if err := m.GrowAccount(ctx, &a2, 30); err != nil { + t.Fatalf("monitor refused allocation: %v", err) + } + + if err := m.GrowAccount(ctx, &a1, 61); err == nil { + t.Fatalf("monitor accepted excessive allocation") + } + + if err := m.GrowAccount(ctx, &a2, 61); err == nil { + t.Fatalf("monitor accepted excessive allocation") + } + + m.ClearAccount(ctx, &a1) + + if err := m.GrowAccount(ctx, &a2, 61); err != nil { + t.Fatalf("monitor refused allocation: %v", err) + } + + if err := m.ResizeItem(ctx, &a2, 50, 60); err == nil { + t.Fatalf("monitor accepted excessive allocation") + } + + if err := m.ResizeItem(ctx, &a1, 0, 5); err != nil { + t.Fatalf("monitor refused allocation: %v", err) + } + + if err := m.ClearAccountAndAlloc(ctx, &a2, 40); err != nil { + t.Fatalf("monitor refused reset + allocation: %v", err) + } + + m.CloseAccount(ctx, &a1) + m.CloseAccount(ctx, &a2) + + if m.curAllocated != 0 { + t.Fatal("closing spans leaves bytes in monitor") + } +} diff --git a/sql/mon/mem_usage.go b/sql/mon/mem_usage.go new file mode 100644 index 000000000000..41a96ef29880 --- /dev/null +++ b/sql/mon/mem_usage.go @@ -0,0 +1,136 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package mon + +import ( + "fmt" + "math" + + "github.com/cockroachdb/cockroach/util" + "github.com/cockroachdb/cockroach/util/envutil" + "github.com/cockroachdb/cockroach/util/humanizeutil" + "github.com/cockroachdb/cockroach/util/log" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// noteworthyMemoryUsageBytes is the minimum size tracked by a monitor +// before the monitor starts explicitly logging overall usage growth in the log. +var noteworthyMemoryUsageBytes = envutil.EnvOrDefaultInt64("COCKROACH_NOTEWORTHY_MEMORY_USAGE", 10000) + +// MemoryUsageMonitor defines an object that can track and limit +// memory usage by other CockroachDB components. +// +// This is currently used by the SQL session, see sql/session_mem_usage.go. +// +// The monitor must be set up via StartMonitor/StopMonitor just before +// and after a processing context. +// The various counters express sizes in bytes. +type MemoryUsageMonitor struct { + // curAllocated tracks the current amount of memory allocated for + // in-memory row storage. + curAllocated int64 + + // totalAllocated tracks the cumulated amount of memory allocated. + totalAllocated int64 + + // maxAllocated tracks the high water mark of allocations. + maxAllocated int64 + + // maxAllocatedBudget sets the allowable upper limit for allocations. + // Set to MaxInt64 to indicate no limit. + maxAllocatedBudget int64 +} + +// reserveMemory declares the intent to allocate a given number of +// bytes to this monitor. An error is returned if the intent is +// denied. +func (mm *MemoryUsageMonitor) reserveMemory(ctx context.Context, x int64) error { + // TODO(knz): This is here that a policy will restrict how large a + // query can become. + if mm.curAllocated > mm.maxAllocatedBudget-x { + err := errors.Errorf("memory budget exceeded: %d requested, %s already allocated", + x, humanizeutil.IBytes(mm.curAllocated)) + if log.V(2) { + log.Errorf(ctx, "%s - %s", err, util.GetSmallTrace(2)) + } + return err + } + mm.curAllocated += x + mm.totalAllocated += x + if mm.maxAllocated < mm.curAllocated { + mm.maxAllocated = mm.curAllocated + } + + // Report "large" queries to the log for further investigation. + if mm.curAllocated > noteworthyMemoryUsageBytes { + // We only report changes in binary magnitude of the size. This + // is to limit the amount of log messages when a size blowup is + // caused by many small allocations. + if util.RoundUpPowerOfTwo(mm.curAllocated) != util.RoundUpPowerOfTwo(mm.curAllocated-x) { + log.Infof(ctx, "memory usage increases to %s (+%d)", + humanizeutil.IBytes(mm.curAllocated), x) + } + } + + if log.V(2) { + log.Infof(ctx, "now at %d bytes (+%d) - %s", mm.curAllocated, x, util.GetSmallTrace(2)) + } + return nil +} + +// releaseMemory releases memory previously successfully registered +// via reserveMemory(). +func (mm *MemoryUsageMonitor) releaseMemory(ctx context.Context, x int64) { + if mm.curAllocated < x { + panic(fmt.Sprintf("no memory to release, current %d, free %d", mm.curAllocated, x)) + } + mm.curAllocated -= x + + if log.V(2) { + log.Infof(ctx, "now at %d bytes (-%d) - %s", mm.curAllocated, x, util.GetSmallTrace(2)) + } +} + +// StartMonitor begins a monitoring region. +func (mm *MemoryUsageMonitor) StartMonitor() { + if mm.curAllocated != 0 { + panic(fmt.Sprintf("monitor started with %d bytes left over", mm.curAllocated)) + } + mm.curAllocated = 0 + mm.maxAllocated = 0 + mm.totalAllocated = 0 + // TODO(knz): this is where we will use a policy to set a maximum. + mm.maxAllocatedBudget = math.MaxInt64 +} + +// StopMonitor completes a monitoring region. +func (mm *MemoryUsageMonitor) StopMonitor(ctx context.Context) { + if log.V(1) { + log.InfofDepth(ctx, 1, "memory usage max %s total %s", + humanizeutil.IBytes(mm.maxAllocated), + humanizeutil.IBytes(mm.totalAllocated)) + } + + if mm.curAllocated != 0 { + panic(fmt.Sprintf("unexpected leftover memory: %d bytes", mm.curAllocated)) + } + + // Disable the budget for further allocations, so that further SQL + // uses outside of monitor control get errors. + mm.maxAllocatedBudget = 0 +} diff --git a/sql/mon/mem_usage_test.go b/sql/mon/mem_usage_test.go new file mode 100644 index 000000000000..a494cac687ae --- /dev/null +++ b/sql/mon/mem_usage_test.go @@ -0,0 +1,64 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package mon + +import ( + "testing" + + "github.com/cockroachdb/cockroach/util/leaktest" + "golang.org/x/net/context" +) + +func TestMemoryUsageMonitor(t *testing.T) { + defer leaktest.AfterTest(t)() + + m := MemoryUsageMonitor{} + ctx := context.Background() + + if err := m.reserveMemory(ctx, 10); err == nil { + t.Fatal("monitor failed to reject non-monitored request") + } + + m.StartMonitor() + m.maxAllocatedBudget = 100 + + if err := m.reserveMemory(ctx, 10); err != nil { + t.Fatalf("monitor refused small allocation: %v", err) + } + if err := m.reserveMemory(ctx, 91); err == nil { + t.Fatalf("monitor accepted excessive allocation: %v", err) + } + if err := m.reserveMemory(ctx, 90); err != nil { + t.Fatalf("monitor refused top allocation: %v", err) + } + if m.curAllocated != 100 { + t.Fatalf("incorrect current allocation: got %d, expected %d", m.curAllocated, 100) + } + + m.releaseMemory(ctx, 90) // Should succeed without panic. + if m.curAllocated != 10 { + t.Fatalf("incorrect current allocation: got %d, expected %d", m.curAllocated, 10) + } + if m.maxAllocated != 100 { + t.Fatalf("incorrect max allocation: got %d, expected %d", m.maxAllocated, 100) + } + + m.releaseMemory(ctx, 10) // Should succeed without panic. + if m.curAllocated != 0 { + t.Fatalf("incorrect current allocation: got %d, expected %d", m.curAllocated, 0) + } +} diff --git a/sql/ordering.go b/sql/ordering.go index 35a6eae8d4c4..e3c9ef6e6ded 100644 --- a/sql/ordering.go +++ b/sql/ordering.go @@ -57,7 +57,7 @@ type orderingInfo struct { // Format pretty-prints the orderingInfo to a stream. // If columns is not nil, column names are printed instead of column indexes. -func (ord orderingInfo) Format(buf *bytes.Buffer, columns []ResultColumn) { +func (ord orderingInfo) Format(buf *bytes.Buffer, columns ResultColumns) { sep := "" // Print the exact match columns. We sort them to ensure @@ -104,7 +104,7 @@ func (ord orderingInfo) Format(buf *bytes.Buffer, columns []ResultColumn) { } // AsString pretty-prints the orderingInfo to a string. -func (ord orderingInfo) AsString(columns []ResultColumn) string { +func (ord orderingInfo) AsString(columns ResultColumns) string { var buf bytes.Buffer ord.Format(&buf, columns) return buf.String() diff --git a/sql/parser/datum.go b/sql/parser/datum.go index e5e820940b20..9a3e53aa190a 100644 --- a/sql/parser/datum.go +++ b/sql/parser/datum.go @@ -20,10 +20,12 @@ import ( "bytes" "fmt" "math" + "math/big" "sort" "strconv" "strings" "time" + "unsafe" "gopkg.in/inf.v0" @@ -82,6 +84,11 @@ type Datum interface { // IsMin returns true if the datum is equal to the minimum value the datum // type can hold. IsMin() bool + + // Size returns a lower bound on the Datum's size, in bytes. The + // second return value indicates whether the size is dependent on + // the particular value. + Size() (uintptr, bool) } // DBool is the boolean Datum. @@ -199,6 +206,11 @@ func (d *DBool) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(strconv.FormatBool(bool(*d))) } +// Size implements the Datum interface. +func (d *DBool) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DInt is the int Datum. type DInt int64 @@ -291,6 +303,11 @@ func (d *DInt) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(strconv.FormatInt(int64(*d), 10)) } +// Size implements the Datum interface. +func (d *DInt) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DFloat is the float Datum. type DFloat float64 @@ -392,6 +409,11 @@ func (d *DFloat) Format(buf *bytes.Buffer, f FmtFlags) { } } +// Size implements the Datum interface. +func (d *DFloat) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DDecimal is the decimal Datum. type DDecimal struct { inf.Dec @@ -475,6 +497,12 @@ func (d *DDecimal) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(d.Dec.String()) } +// Size implements the Datum interface. +func (d *DDecimal) Size() (uintptr, bool) { + intVal := d.Dec.UnscaledBig() + return unsafe.Sizeof(*d) + unsafe.Sizeof(*intVal) + uintptr(cap(intVal.Bits()))*unsafe.Sizeof(big.Word(0)), true +} + // DString is the string Datum. type DString string @@ -555,6 +583,11 @@ func (d *DString) Format(buf *bytes.Buffer, f FmtFlags) { encodeSQLString(buf, string(*d)) } +// Size implements the Datum interface. +func (d *DString) Size() (uintptr, bool) { + return unsafe.Sizeof(*d) + uintptr(len(*d)), true +} + // DBytes is the bytes Datum. The underlying type is a string because we want // the immutability, but this may contain arbitrary bytes. type DBytes string @@ -635,6 +668,11 @@ func (d *DBytes) Format(buf *bytes.Buffer, f FmtFlags) { encodeSQLBytes(buf, string(*d)) } +// Size implements the Datum interface. +func (d *DBytes) Size() (uintptr, bool) { + return unsafe.Sizeof(*d) + uintptr(len(*d)), true +} + // DDate is the date Datum represented as the number of days after // the Unix epoch. type DDate int64 @@ -756,6 +794,11 @@ func (d *DDate) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(time.Unix(int64(*d)*secondsInDay, 0).UTC().Format(dateFormat)) } +// Size implements the Datum interface. +func (d *DDate) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DTimestamp is the timestamp Datum. type DTimestamp struct { time.Time @@ -878,6 +921,11 @@ func (d *DTimestamp) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(d.UTC().Format(timestampNodeFormat)) } +// Size implements the Datum interface. +func (d *DTimestamp) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DTimestampTZ is the timestamp Datum that is rendered with session offset. type DTimestampTZ struct { time.Time @@ -972,6 +1020,11 @@ func (d *DTimestampTZ) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(d.UTC().Format(timestampNodeFormat)) } +// Size implements the Datum interface. +func (d *DTimestampTZ) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DInterval is the interval Datum. type DInterval struct { duration.Duration @@ -1115,6 +1168,11 @@ func (d *DInterval) Format(buf *bytes.Buffer, f FmtFlags) { } } +// Size implements the Datum interface. +func (d *DInterval) Size() (uintptr, bool) { + return unsafe.Sizeof(*d), false +} + // DTuple is the tuple Datum. type DTuple []Datum @@ -1280,6 +1338,16 @@ func (d *DTuple) makeUnique() { *d = (*d)[:n] } +// Size implements the Datum interface. +func (d *DTuple) Size() (uintptr, bool) { + sz := unsafe.Sizeof(*d) + for _, e := range *d { + dsz, _ := e.Size() + sz += dsz + } + return sz, true +} + // SortedDifference finds the elements of d which are not in other, // assuming that d and other are already sorted. func (d *DTuple) SortedDifference(other *DTuple) *DTuple { @@ -1362,6 +1430,11 @@ func (dNull) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString("NULL") } +// Size implements the Datum interface. +func (d dNull) Size() (uintptr, bool) { + return unsafe.Sizeof(d), false +} + var _ VariableExpr = &DPlaceholder{} // DPlaceholder is the named placeholder Datum. @@ -1438,6 +1511,11 @@ func (d *DPlaceholder) Format(buf *bytes.Buffer, f FmtFlags) { buf.WriteString(d.name) } +// Size implements the Datum interface. +func (d *DPlaceholder) Size() (uintptr, bool) { + return unsafe.Sizeof(*d) + uintptr(len(d.name)), true +} + // Temporary workaround for #3633, allowing comparisons between // heterogeneous types. // TODO(nvanbenschoten) Now that typing is improved, can we get rid of this? diff --git a/sql/pgwire/v3.go b/sql/pgwire/v3.go index e4547e7cac09..a948b0fa53fc 100644 --- a/sql/pgwire/v3.go +++ b/sql/pgwire/v3.go @@ -605,7 +605,10 @@ func (c *v3Conn) handleBind(buf *readBuffer) error { return c.sendInternalError(fmt.Sprintf("expected 0, 1, or %d for number of format codes, got %d", numColumns, numColumnFormatCodes)) } // Create the new PreparedPortal in the connection's Session. - portal := c.session.PreparedPortals.New(portalName, stmt, qargs) + portal, err := c.session.PreparedPortals.New(portalName, stmt, qargs) + if err != nil { + return err + } // Attach pgwire-specific metadata to the PreparedPortal. portal.ProtocolMeta = preparedPortalMeta{outFormats: columnFormatCodes} c.writeBuf.initMsg(serverMsgBindComplete) @@ -646,6 +649,7 @@ func (c *v3Conn) executeStatements( ) error { tracing.AnnotateTrace() results := c.executor.ExecuteStatements(c.session, stmts, pinfo) + defer results.Close() tracing.AnnotateTrace() if results.Empty { @@ -716,6 +720,7 @@ func (c *v3Conn) sendErrorWithCode(errCode string, errCtx sqlbase.SrcCtx, errToS return c.wr.Flush() } +// sendResponse sends the results as a query response. func (c *v3Conn) sendResponse(results sql.ResultList, formatCodes []formatCode, sendDescription bool, limit int) error { if len(results) == 0 { return c.sendCommandComplete(nil) @@ -727,8 +732,8 @@ func (c *v3Conn) sendResponse(results sql.ResultList, formatCodes []formatCode, } break } - if limit != 0 && len(result.Rows) > limit { - if err := c.sendInternalError(fmt.Sprintf("execute row count limits not supported: %d of %d", limit, len(result.Rows))); err != nil { + if limit != 0 && result.Rows != nil && result.Rows.Len() > limit { + if err := c.sendInternalError(fmt.Sprintf("execute row count limits not supported: %d of %d", limit, result.Rows.Len())); err != nil { return err } break @@ -759,10 +764,12 @@ func (c *v3Conn) sendResponse(results sql.ResultList, formatCodes []formatCode, } // Send DataRows. - for _, row := range result.Rows { + nRows := result.Rows.Len() + for rowIdx := 0; rowIdx < nRows; rowIdx++ { + row := result.Rows.At(rowIdx) c.writeBuf.initMsg(serverMsgDataRow) - c.writeBuf.putInt16(int16(len(row.Values))) - for i, col := range row.Values { + c.writeBuf.putInt16(int16(len(row))) + for i, col := range row { fmtCode := formatText if formatCodes != nil { fmtCode = formatCodes[i] @@ -783,7 +790,7 @@ func (c *v3Conn) sendResponse(results sql.ResultList, formatCodes []formatCode, // Send CommandComplete. tag = append(tag, ' ') - tag = strconv.AppendUint(tag, uint64(len(result.Rows)), 10) + tag = strconv.AppendUint(tag, uint64(result.Rows.Len()), 10) if err := c.sendCommandComplete(tag); err != nil { return err } diff --git a/sql/plan.go b/sql/plan.go index e9ca6befb704..1832ed09897c 100644 --- a/sql/plan.go +++ b/sql/plan.go @@ -122,7 +122,7 @@ type planNode interface { // Stable after expandPlan() (or makePlan). // Available after newPlan(), but may change on intermediate plan // nodes during expandPlan() due to index selection. - Columns() []ResultColumn + Columns() ResultColumns // The indexes of the columns the output is ordered by. // @@ -171,6 +171,9 @@ type planNode interface { // Available after Next() and MarkDebug(explainDebug), see // explain.go. DebugValues() debugValues + + // Close terminates the planNode execution and releases its resources. + Close() } // planNodeFastPath is implemented by nodes that can perform all their @@ -206,6 +209,7 @@ var _ planNode = &dropTableNode{} var _ planNode = &dropIndexNode{} var _ planNode = &alterTableNode{} var _ planNode = &joinNode{} +var _ planNode = &distSQLNode{} // makePlan implements the Planner interface. func (p *planner) makePlan(stmt parser.Statement, autoCommit bool) (planNode, error) { diff --git a/sql/planner.go b/sql/planner.go index 1a54d6b9ed5a..297adc767533 100644 --- a/sql/planner.go +++ b/sql/planner.go @@ -220,10 +220,13 @@ func (p *planner) query(sql string, args ...interface{}) (planNode, error) { // queryRow implements the queryRunner interface. func (p *planner) queryRow(sql string, args ...interface{}) (parser.DTuple, error) { + p.session.mon.StartMonitor() + defer p.session.mon.StopMonitor(p.ctx()) plan, err := p.query(sql, args...) if err != nil { return nil, err } + defer plan.Close() if err := plan.Start(); err != nil { return nil, err } @@ -243,10 +246,13 @@ func (p *planner) queryRow(sql string, args ...interface{}) (parser.DTuple, erro // exec implements the queryRunner interface. func (p *planner) exec(sql string, args ...interface{}) (int, error) { + p.session.mon.StartMonitor() + defer p.session.mon.StopMonitor(p.ctx()) plan, err := p.query(sql, args...) if err != nil { return 0, err } + defer plan.Close() if err := plan.Start(); err != nil { return 0, err } diff --git a/sql/prepare.go b/sql/prepare.go index 7f311da56f0f..66fd4cdf3052 100644 --- a/sql/prepare.go +++ b/sql/prepare.go @@ -17,8 +17,11 @@ package sql import ( + "unsafe" + "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/sql/mon" "github.com/cockroachdb/cockroach/sql/parser" ) @@ -27,10 +30,12 @@ import ( type PreparedStatement struct { Query string SQLTypes parser.PlaceholderTypes - Columns []ResultColumn + Columns ResultColumns portalNames map[string]struct{} ProtocolMeta interface{} // a field for protocol implementations to hang metadata off of. + + memAcc mon.MemoryAccount } // PreparedStatements is a mapping of PreparedStatement names to their @@ -68,17 +73,31 @@ func (ps PreparedStatements) New( name, query string, placeholderHints parser.PlaceholderTypes, ) (*PreparedStatement, error) { + stmt := &PreparedStatement{} + + // For now we are just counting the size of the query string and + // statement name. When we start storing the prepared query plan + // during prepare, this should be tallied up to the monitor as well. + sz := int64(uintptr(len(query)+len(name)) + unsafe.Sizeof(*stmt)) + if err := ps.session.OpenAndInitAccount(&stmt.memAcc, sz); err != nil { + return nil, err + } + // Prepare the query. This completes the typing of placeholders. cols, err := e.Prepare(query, ps.session, placeholderHints) if err != nil { + ps.session.CloseAccount(&stmt.memAcc) return nil, err } - stmt := &PreparedStatement{ - Query: query, - SQLTypes: placeholderHints, - Columns: cols, - portalNames: make(map[string]struct{}), + stmt.Query = query + stmt.SQLTypes = placeholderHints + stmt.Columns = cols + stmt.portalNames = make(map[string]struct{}) + + if prevStmt, ok := ps.Get(name); ok { + ps.session.CloseAccount(&prevStmt.memAcc) } + ps.stmts[name] = stmt return stmt, nil } @@ -87,18 +106,45 @@ func (ps PreparedStatements) New( // The method returns whether a statement with that name was found and removed. func (ps PreparedStatements) Delete(name string) bool { if stmt, ok := ps.Get(name); ok { - for portalName := range stmt.portalNames { - delete(ps.session.PreparedPortals.portals, portalName) + if ps.session.PreparedPortals.portals != nil { + for portalName := range stmt.portalNames { + if portal, ok := ps.session.PreparedPortals.Get(name); ok { + delete(ps.session.PreparedPortals.portals, portalName) + ps.session.CloseAccount(&portal.memAcc) + } + } } + ps.session.CloseAccount(&stmt.memAcc) delete(ps.stmts, name) return true } return false } +// closeAll de-registers all statements and portals from the monitor. +func (ps PreparedStatements) closeAll(s *Session) { + for _, stmt := range ps.stmts { + s.CloseAccount(&stmt.memAcc) + } + for _, portal := range s.PreparedPortals.portals { + s.CloseAccount(&portal.memAcc) + } +} + +// ClearStatementsAndPortals de-registers all statements and +// portals. Afterwards none can be added any more. +func (s *Session) ClearStatementsAndPortals() { + s.PreparedStatements.closeAll(s) + s.PreparedStatements.stmts = nil + s.PreparedPortals.portals = nil +} + // DeleteAll removes all PreparedStatements from the PreparedStatements. This will in turn // remove all PreparedPortals from the session's PreparedPortals. +// This is used by the "delete" message in the pgwire protocol; after DeleteAll +// statements and portals can be added again. func (ps PreparedStatements) DeleteAll() { + ps.closeAll(ps.session) ps.stmts = make(map[string]*PreparedStatement) ps.session.PreparedPortals.portals = make(map[string]*PreparedPortal) } @@ -109,6 +155,8 @@ type PreparedPortal struct { Qargs parser.QueryArguments ProtocolMeta interface{} // a field for protocol implementations to hang metadata off of. + + memAcc mon.MemoryAccount } // PreparedPortals is a mapping of PreparedPortal names to their corresponding @@ -140,14 +188,24 @@ func (pp PreparedPortals) Exists(name string) bool { // New creates a new PreparedPortal with the provided name and corresponding // PreparedStatement, binding the statement using the given QueryArguments. func (pp PreparedPortals) New(name string, stmt *PreparedStatement, qargs parser.QueryArguments, -) *PreparedPortal { - stmt.portalNames[name] = struct{}{} +) (*PreparedPortal, error) { portal := &PreparedPortal{ Stmt: stmt, Qargs: qargs, } + sz := int64(uintptr(len(name)) + unsafe.Sizeof(*portal)) + if err := pp.session.OpenAndInitAccount(&portal.memAcc, sz); err != nil { + return nil, err + } + + stmt.portalNames[name] = struct{}{} + + if prevPortal, ok := pp.Get(name); ok { + pp.session.CloseAccount(&prevPortal.memAcc) + } + pp.portals[name] = portal - return portal + return portal, nil } // Delete removes the PreparedPortal with the provided name from the PreparedPortals. @@ -155,6 +213,7 @@ func (pp PreparedPortals) New(name string, stmt *PreparedStatement, qargs parser func (pp PreparedPortals) Delete(name string) bool { if portal, ok := pp.Get(name); ok { delete(portal.Stmt.portalNames, name) + pp.session.CloseAccount(&portal.memAcc) delete(pp.portals, name) return true } diff --git a/sql/returning.go b/sql/returning.go index c883fc8d8bd8..2338f8030dc5 100644 --- a/sql/returning.go +++ b/sql/returning.go @@ -28,7 +28,7 @@ import ( type returningHelper struct { p *planner // Expected columns. - columns []ResultColumn + columns ResultColumns // Processed copies of expressions from ReturningExprs. exprs []parser.TypedExpr qvals qvalMap @@ -55,7 +55,7 @@ func (p *planner) makeReturningHelper( } } - rh.columns = make([]ResultColumn, 0, len(r)) + rh.columns = make(ResultColumns, 0, len(r)) aliasTableName := parser.TableName{TableName: parser.Name(alias)} rh.source = newSourceInfoForSingleTable(aliasTableName, makeResultColumns(tablecols)) rh.qvals = make(qvalMap) diff --git a/sql/row_container.go b/sql/row_container.go new file mode 100644 index 000000000000..579f595c885b --- /dev/null +++ b/sql/row_container.go @@ -0,0 +1,167 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package sql + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/sql/mon" + "github.com/cockroachdb/cockroach/sql/parser" +) + +// RowContainer is a container for rows of DTuples which tracks the +// approximate amount of memory allocated for row data. +// Rows must be added using AddRow(); once the work is done +// the Close() method must be called to release the allocated memory. +// +// TODO(knz): this does not currently track the amount of memory used +// for the outer array of DTuple references. +type RowContainer struct { + p *planner + rows []parser.DTuple + + // fixedColsSize is the sum of widths of fixed-width columns in a + // single row. + fixedColsSize int64 + // varSizedColumns indicates for which columns the datum size + // is variable. + varSizedColumns []int + + // memAcc tracks the current memory consumption of this + // RowContainer. + memAcc mon.MemoryAccount +} + +// NewRowContainer allocates a new row container. +// +// The rowCapacity argument indicates how many rows are to be +// expected; it is used to pre-allocate the outer array of row +// references, in the fashion of Go's capacity argument to the make() +// function. +// +// Note that we could, but do not (yet), report the size of the row +// container itself to the monitor in this constructor. This is +// because the various planNodes are not (yet) equipped to call +// Close() upon encountering errors in their constructor (all nodes +// initializing a RowContainer there) and SetLimitHint() (for sortNode +// which initializes a RowContainer there). This would be rather +// error-prone to implement consistently and hellishly difficult to +// test properly. The trade-off is that very large table schemas or +// column selections could cause unchecked and potentially dangerous +// memory growth. +func (p *planner) NewRowContainer(h ResultColumns, rowCapacity int) *RowContainer { + nCols := len(h) + + res := &RowContainer{ + p: p, + rows: make([]parser.DTuple, 0, rowCapacity), + varSizedColumns: make([]int, 0, nCols), + } + + p.session.OpenAccount(&res.memAcc) + + for i := 0; i < nCols; i++ { + sz, variable := h[i].Typ.Size() + if variable { + res.varSizedColumns = append(res.varSizedColumns, i) + } else { + res.fixedColsSize += int64(sz) + } + } + res.fixedColsSize += int64(unsafe.Sizeof(parser.Datum(nil)) * uintptr(nCols)) + + return res +} + +// Close releases the memory associated with the RowContainer. +func (c *RowContainer) Close() { + c.rows = nil + c.varSizedColumns = nil + c.p.session.CloseAccount(&c.memAcc) +} + +// rowSize computes the size of a single row. +func (c *RowContainer) rowSize(row parser.DTuple) int64 { + rsz := c.fixedColsSize + for _, i := range c.varSizedColumns { + sz, _ := row[i].Size() + rsz += int64(sz) + } + return rsz +} + +// AddRow attempts to insert a new row in the RowContainer. +// Returns an error if the allocation was denied by the MemoryUsageMonitor. +func (c *RowContainer) AddRow(row parser.DTuple) error { + if err := c.p.session.GrowAccount(&c.memAcc, c.rowSize(row)); err != nil { + return err + } + c.rows = append(c.rows, row) + return nil +} + +// Len reports the number of rows currently held in this RowContainer. +func (c *RowContainer) Len() int { + return len(c.rows) +} + +// At accesses a row at a specific index. +func (c *RowContainer) At(i int) parser.DTuple { + return c.rows[i] +} + +// Swap exchanges two rows. Used for sorting. +func (c *RowContainer) Swap(i, j int) { + c.rows[i], c.rows[j] = c.rows[j], c.rows[i] +} + +// PseudoPop retrieves a pointer to the last row, and decreases the +// visible size of the RowContainer. This is used for heap sorting in +// sql.sortNode. A pointer is returned to avoid an allocation when +// passing the DTuple as an interface{} to heap.Push(). +// Note that the pointer is only valid until the next call to AddRow. +// We use this for heap sorting in sort.go. +func (c *RowContainer) PseudoPop() *parser.DTuple { + idx := len(c.rows) - 1 + x := &(c.rows)[idx] + c.rows = c.rows[:idx] + return x +} + +// ResetLen cancels the effects of PseudoPop(), that is, it restores +// the visible size of the RowContainer to its actual size. +func (c *RowContainer) ResetLen(l int) { + c.rows = c.rows[:l] +} + +// Replace substitutes one row for another. This does query the +// MemoryUsageMonitor to determine whether the new row fits the +// allowance. +func (c *RowContainer) Replace(i int, newRow parser.DTuple) error { + newSz := c.rowSize(newRow) + oldSz := int64(0) + if c.rows[i] != nil { + oldSz = c.rowSize(c.rows[i]) + } + if newSz != oldSz { + if err := c.p.session.ResizeItem(&c.memAcc, oldSz, newSz); err != nil { + return err + } + } + c.rows[i] = newRow + return nil +} diff --git a/sql/scan.go b/sql/scan.go index 57b04a916623..f4feea6b653b 100644 --- a/sql/scan.go +++ b/sql/scan.go @@ -43,7 +43,7 @@ type scanNode struct { // The table columns, possibly including ones currently in schema changes. cols []sqlbase.ColumnDescriptor // There is a 1-1 correspondence between cols and resultColumns. - resultColumns []ResultColumn + resultColumns ResultColumns // Contains values for the current row. There is a 1-1 correspondence // between resultColumns and values in row. row parser.DTuple @@ -80,7 +80,7 @@ func (p *planner) Scan() *scanNode { return &scanNode{p: p} } -func (n *scanNode) Columns() []ResultColumn { +func (n *scanNode) Columns() ResultColumns { return n.resultColumns } @@ -139,6 +139,8 @@ func (n *scanNode) Start() error { return n.p.startSubqueryPlans(n.filter) } +func (n *scanNode) Close() {} + // initScan sets up the rowFetcher and starts a scan. func (n *scanNode) initScan() error { if len(n.spans) == 0 { diff --git a/sql/select.go b/sql/select.go index b524bf0aa10b..bc7c2926855c 100644 --- a/sql/select.go +++ b/sql/select.go @@ -59,7 +59,7 @@ type selectNode struct { // groupNode copies/extends the render array defined by initTargets() // will add extra selectNode renders for the aggregation sources. render []parser.TypedExpr - columns []ResultColumn + columns ResultColumns // The number of initial columns - before adding any internal render // targets for grouping, filtering or ordering. The original columns @@ -88,10 +88,7 @@ type selectNode struct { row parser.DTuple } -func (s *selectNode) Columns() []ResultColumn { - if s.explain == explainDebug { - return debugColumns - } +func (s *selectNode) Columns() ResultColumns { return s.columns } @@ -215,6 +212,10 @@ func (s *selectNode) SetLimitHint(numRows int64, soft bool) { s.source.plan.SetLimitHint(numRows, soft || s.filter != nil) } +func (s *selectNode) Close() { + s.source.plan.Close() +} + // Select selects rows from a SELECT/UNION/VALUES, ordering and/or limiting them. func (p *planner) Select(n *parser.Select, desiredTypes []parser.Datum, autoCommit bool) (planNode, error) { wrapped := n.Select @@ -496,7 +497,7 @@ func (s *selectNode) initWhere(where *parser.Where) error { // returned for each column. func checkRenderStar( target parser.SelectExpr, src *dataSourceInfo, qvals qvalMap, -) (isStar bool, columns []ResultColumn, exprs []parser.TypedExpr, err error) { +) (isStar bool, columns ResultColumns, exprs []parser.TypedExpr, err error) { v, ok := target.Expr.(parser.VarName) if !ok { return false, nil, nil, nil diff --git a/sql/select_top.go b/sql/select_top.go index 9d667aff4396..a8e5d31229fe 100644 --- a/sql/select_top.go +++ b/sql/select_top.go @@ -137,7 +137,7 @@ func (n *selectTopNode) ExplainPlan(v bool) (name, description string, subplans return "select", "", subplans } -func (n *selectTopNode) Columns() []ResultColumn { +func (n *selectTopNode) Columns() ResultColumns { // sort, group and source may have different ideas about the // result columns. Ask them in turn. // (We cannot ask n.plan because it may not be connected yet.) @@ -162,3 +162,8 @@ func (n *selectTopNode) Start() error { return n.plan.Start() } func (n *selectTopNode) Next() (bool, error) { return n.plan.Next() } func (n *selectTopNode) Values() parser.DTuple { return n.plan.Values() } func (n *selectTopNode) DebugValues() debugValues { return n.plan.DebugValues() } +func (n *selectTopNode) Close() { + if n.plan != nil { + n.plan.Close() + } +} diff --git a/sql/session.go b/sql/session.go index 608f0db6633e..71e7aa7ba806 100644 --- a/sql/session.go +++ b/sql/session.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/base" "github.com/cockroachdb/cockroach/internal/client" "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/sql/mon" "github.com/cockroachdb/cockroach/storage/engine/enginepb" "github.com/cockroachdb/cockroach/util/envutil" "github.com/cockroachdb/cockroach/util/log" @@ -64,6 +65,10 @@ type Session struct { DefaultIsolationLevel enginepb.IsolationType context context.Context cancel context.CancelFunc + + // mon tracks memory usage for SQL activity within this session. + // See the comments at the start of session_mem_usage.go for more details. + mon mon.MemoryUsageMonitor } // SessionArgs contains arguments for creating a new Session with NewSession(). @@ -101,6 +106,7 @@ func NewSession(ctx context.Context, args SessionArgs, e *Executor, remote net.A ctx = log.WithEventLog(ctx, fmt.Sprintf("sql [%s]", args.User), remoteStr) s.context, s.cancel = context.WithCancel(ctx) + s.mon.StartMonitor() return s } @@ -111,6 +117,8 @@ func (s *Session) Finish() { // addressed, there might be leases accumulated by preparing statements. s.planner.releaseLeases() log.FinishEventLog(s.context) + s.ClearStatementsAndPortals() + s.mon.StopMonitor(s.Ctx()) s.cancel() } diff --git a/sql/session_mem_usage.go b/sql/session_mem_usage.go new file mode 100644 index 000000000000..3c450df8cf0f --- /dev/null +++ b/sql/session_mem_usage.go @@ -0,0 +1,91 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package sql + +import "github.com/cockroachdb/cockroach/sql/mon" + +// Currently we bind an instance of MemoryUsageMonitor to each +// session, and the logical timespan for tracking memory usage is +// also bound to the entire duration of the session. +// +// The "logical timespan" is the duration between the point in time where +// to "begin" monitoring (set counters to 0) and where to "end" +// monitoring (check that if counters != 0 then there was a leak, and +// report that in logs/errors). +// +// Alternatives to define the logical timespan were considered and +// rejected: +// +// - binding to a single statement: fails to track transaction +// state including intents across a transaction. +// - binding to a single transaction attempt: idem. +// - binding to an entire transaction: fails to track the +// ResultList created by Executor.ExecuteStatements which +// stays alive after the transaction commits and until +// pgwire sends the ResultList back to the client. +// - binding to the duration of v3.go:handleExecute(): fails +// to track transaction state that spans across multiple +// separate execute messages. +// +// Ideally we would want a "magic" timespan that extends automatically +// from the start of a transaction to the point where all related +// Results (ResultList items) have been sent back to the +// client. However with this definition and the current code there can +// be multiple such "magic" timespans alive simultaneously. This is +// because a client can start a new transaction before it reads the +// ResultList of a previous transaction, e.g. if issuing `BEGIN; +// SELECT; COMMIT; BEGIN; SELECT; COMMIT` in one pgwire message. +// +// A way forward to implement this "magic" timespan would be to +// fix/implement #7775 (stream results from Executor to pgwire) and +// take care that the corresponding new streaming/pipeline logic +// passes a transaction-bound context to the monitor throughout. + +// OpenAccount interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) OpenAccount(acc *mon.MemoryAccount) { + s.mon.OpenAccount(s.Ctx(), acc) +} + +// OpenAndInitAccount interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) OpenAndInitAccount(acc *mon.MemoryAccount, initialAllocation int64) error { + return s.mon.OpenAndInitAccount(s.Ctx(), acc, initialAllocation) +} + +// GrowAccount interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) GrowAccount(acc *mon.MemoryAccount, extraSize int64) error { + return s.mon.GrowAccount(s.Ctx(), acc, extraSize) +} + +// CloseAccount interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) CloseAccount(acc *mon.MemoryAccount) { + s.mon.CloseAccount(s.Ctx(), acc) +} + +// ClearAccount interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) ClearAccount(acc *mon.MemoryAccount) { + s.mon.ClearAccount(s.Ctx(), acc) +} + +// ClearAccountAndAlloc interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) ClearAccountAndAlloc(acc *mon.MemoryAccount, newSize int64) error { + return s.mon.ClearAccountAndAlloc(s.Ctx(), acc, newSize) +} + +// ResizeItem interfaces between Session and mon.MemoryUsageMonitor. +func (s *Session) ResizeItem(acc *mon.MemoryAccount, oldSize, newSize int64) error { + return s.mon.ResizeItem(s.Ctx(), acc, oldSize, newSize) +} diff --git a/sql/show.go b/sql/show.go index 9ea6bab75416..78bfbe4e0146 100644 --- a/sql/show.go +++ b/sql/show.go @@ -32,25 +32,32 @@ import ( func (p *planner) Show(n *parser.Show) (planNode, error) { name := strings.ToUpper(n.Name) - v := &valuesNode{columns: []ResultColumn{{Name: name, Typ: parser.TypeString}}} + v := p.newContainerValuesNode(ResultColumns{{Name: name, Typ: parser.TypeString}}, 0) + var newRow parser.DTuple switch name { case `DATABASE`: - v.rows = append(v.rows, []parser.Datum{parser.NewDString(p.session.Database)}) + newRow = parser.DTuple{parser.NewDString(p.session.Database)} case `TIME ZONE`: - v.rows = append(v.rows, []parser.Datum{parser.NewDString(p.session.Location.String())}) + newRow = parser.DTuple{parser.NewDString(p.session.Location.String())} case `SYNTAX`: - v.rows = append(v.rows, []parser.Datum{parser.NewDString(parser.Syntax(p.session.Syntax).String())}) + newRow = parser.DTuple{parser.NewDString(parser.Syntax(p.session.Syntax).String())} case `DEFAULT_TRANSACTION_ISOLATION`: level := p.session.DefaultIsolationLevel.String() - v.rows = append(v.rows, []parser.Datum{parser.NewDString(level)}) + newRow = parser.DTuple{parser.NewDString(level)} case `TRANSACTION ISOLATION LEVEL`: - v.rows = append(v.rows, []parser.Datum{parser.NewDString(p.txn.Proto.Isolation.String())}) + newRow = parser.DTuple{parser.NewDString(p.txn.Proto.Isolation.String())} case `TRANSACTION PRIORITY`: - v.rows = append(v.rows, []parser.Datum{parser.NewDString(p.txn.UserPriority.String())}) + newRow = parser.DTuple{parser.NewDString(p.txn.UserPriority.String())} default: return nil, fmt.Errorf("unknown variable: %q", name) } + if newRow != nil { + if err := v.rows.AddRow(newRow); err != nil { + v.rows.Close() + return nil, err + } + } return v, nil } @@ -73,26 +80,29 @@ func (p *planner) ShowColumns(n *parser.ShowColumns) (planNode, error) { return nil, err } - v := &valuesNode{ - columns: []ResultColumn{ - {Name: "Field", Typ: parser.TypeString}, - {Name: "Type", Typ: parser.TypeString}, - {Name: "Null", Typ: parser.TypeBool}, - {Name: "Default", Typ: parser.TypeString}, - }, + columns := ResultColumns{ + {Name: "Field", Typ: parser.TypeString}, + {Name: "Type", Typ: parser.TypeString}, + {Name: "Null", Typ: parser.TypeBool}, + {Name: "Default", Typ: parser.TypeString}, } + v := p.newContainerValuesNode(columns, 0) for i, col := range desc.Columns { defaultExpr := parser.DNull if e := desc.Columns[i].DefaultExpr; e != nil { defaultExpr = parser.NewDString(*e) } - v.rows = append(v.rows, []parser.Datum{ + newRow := parser.DTuple{ parser.NewDString(desc.Columns[i].Name), parser.NewDString(col.Type.SQLString()), parser.MakeDBool(parser.DBool(desc.Columns[i].Nullable)), defaultExpr, - }) + } + if err := v.rows.AddRow(newRow); err != nil { + v.rows.Close() + return nil, err + } } return v, nil } @@ -134,12 +144,11 @@ func (p *planner) ShowCreateTable(n *parser.ShowCreateTable) (planNode, error) { return nil, err } - v := &valuesNode{ - columns: []ResultColumn{ - {Name: "Table", Typ: parser.TypeString}, - {Name: "CreateTable", Typ: parser.TypeString}, - }, + columns := ResultColumns{ + {Name: "Table", Typ: parser.TypeString}, + {Name: "CreateTable", Typ: parser.TypeString}, } + v := p.newContainerValuesNode(columns, 0) var buf bytes.Buffer fmt.Fprintf(&buf, "CREATE TABLE %s (", quoteNames(n.Table.String())) @@ -174,6 +183,7 @@ func (p *planner) ShowCreateTable(n *parser.ShowCreateTable) (planNode, error) { } interleave, err := p.showCreateInterleave(&idx) if err != nil { + v.rows.Close() return nil, err } fmt.Fprintf(&buf, ",\n\t%sINDEX %s (%s)%s%s", @@ -206,16 +216,21 @@ func (p *planner) ShowCreateTable(n *parser.ShowCreateTable) (planNode, error) { } buf.WriteString("\n)") + interleave, err := p.showCreateInterleave(&desc.PrimaryIndex) if err != nil { + v.rows.Close() return nil, err } buf.WriteString(interleave) - v.rows = append(v.rows, []parser.Datum{ + if err := v.rows.AddRow(parser.DTuple{ parser.NewDString(n.Table.String()), parser.NewDString(buf.String()), - }) + }); err != nil { + v.rows.Close() + return nil, err + } return v, nil } @@ -244,17 +259,24 @@ func (p *planner) ShowDatabases(n *parser.ShowDatabases) (planNode, error) { if err != nil { return nil, err } - v := &valuesNode{columns: []ResultColumn{{Name: "Database", Typ: parser.TypeString}}} + v := p.newContainerValuesNode(ResultColumns{{Name: "Database", Typ: parser.TypeString}}, 0) for db := range virtualSchemaMap { - v.rows = append(v.rows, []parser.Datum{parser.NewDString(db)}) + if err := v.rows.AddRow(parser.DTuple{parser.NewDString(db)}); err != nil { + v.rows.Close() + return nil, err + } } for _, row := range sr { _, name, err := encoding.DecodeUnsafeStringAscending( bytes.TrimPrefix(row.Key, prefix), nil) if err != nil { + v.rows.Close() + return nil, err + } + if err := v.rows.AddRow(parser.DTuple{parser.NewDString(name)}); err != nil { + v.rows.Close() return nil, err } - v.rows = append(v.rows, []parser.Datum{parser.NewDString(name)}) } return v, nil } @@ -278,13 +300,12 @@ func (p *planner) ShowGrants(n *parser.ShowGrants) (planNode, error) { objectType = "Table" } - v := &valuesNode{ - columns: []ResultColumn{ - {Name: objectType, Typ: parser.TypeString}, - {Name: "User", Typ: parser.TypeString}, - {Name: "Privileges", Typ: parser.TypeString}, - }, + columns := ResultColumns{ + {Name: objectType, Typ: parser.TypeString}, + {Name: "User", Typ: parser.TypeString}, + {Name: "Privileges", Typ: parser.TypeString}, } + v := p.newContainerValuesNode(columns, 0) var wantedUsers map[string]struct{} if len(n.Grantees) != 0 { wantedUsers = make(map[string]struct{}) @@ -301,11 +322,15 @@ func (p *planner) ShowGrants(n *parser.ShowGrants) (planNode, error) { continue } } - v.rows = append(v.rows, []parser.Datum{ + newRow := parser.DTuple{ parser.NewDString(descriptor.GetName()), parser.NewDString(userPriv.User), parser.NewDString(userPriv.Privileges), - }) + } + if err := v.rows.AddRow(newRow); err != nil { + v.rows.Close() + return nil, err + } } } return v, nil @@ -329,21 +354,20 @@ func (p *planner) ShowIndex(n *parser.ShowIndex) (planNode, error) { return nil, err } - v := &valuesNode{ - columns: []ResultColumn{ - {Name: "Table", Typ: parser.TypeString}, - {Name: "Name", Typ: parser.TypeString}, - {Name: "Unique", Typ: parser.TypeBool}, - {Name: "Seq", Typ: parser.TypeInt}, - {Name: "Column", Typ: parser.TypeString}, - {Name: "Direction", Typ: parser.TypeString}, - {Name: "Storing", Typ: parser.TypeBool}, - }, + columns := ResultColumns{ + {Name: "Table", Typ: parser.TypeString}, + {Name: "Name", Typ: parser.TypeString}, + {Name: "Unique", Typ: parser.TypeBool}, + {Name: "Seq", Typ: parser.TypeInt}, + {Name: "Column", Typ: parser.TypeString}, + {Name: "Direction", Typ: parser.TypeString}, + {Name: "Storing", Typ: parser.TypeBool}, } + v := p.newContainerValuesNode(columns, 0) appendRow := func(index sqlbase.IndexDescriptor, colName string, sequence int, - direction string, isStored bool) { - v.rows = append(v.rows, []parser.Datum{ + direction string, isStored bool) error { + newRow := parser.DTuple{ parser.NewDString(tn.Table()), parser.NewDString(index.Name), parser.MakeDBool(parser.DBool(index.Unique)), @@ -351,17 +375,24 @@ func (p *planner) ShowIndex(n *parser.ShowIndex) (planNode, error) { parser.NewDString(colName), parser.NewDString(direction), parser.MakeDBool(parser.DBool(isStored)), - }) + } + return v.rows.AddRow(newRow) } for _, index := range append([]sqlbase.IndexDescriptor{desc.PrimaryIndex}, desc.Indexes...) { sequence := 1 for i, col := range index.ColumnNames { - appendRow(index, col, sequence, index.ColumnDirections[i].String(), false) + if err := appendRow(index, col, sequence, index.ColumnDirections[i].String(), false); err != nil { + v.rows.Close() + return nil, err + } sequence++ } for _, col := range index.StoreColumnNames { - appendRow(index, col, sequence, "N/A", true) + if err := appendRow(index, col, sequence, "N/A", true); err != nil { + v.rows.Close() + return nil, err + } sequence++ } } @@ -386,15 +417,14 @@ func (p *planner) ShowConstraints(n *parser.ShowConstraints) (planNode, error) { return nil, err } - v := &valuesNode{ - columns: []ResultColumn{ - {Name: "Table", Typ: parser.TypeString}, - {Name: "Name", Typ: parser.TypeString}, - {Name: "Type", Typ: parser.TypeString}, - {Name: "Column(s)", Typ: parser.TypeString}, - {Name: "Details", Typ: parser.TypeString}, - }, + columns := ResultColumns{ + {Name: "Table", Typ: parser.TypeString}, + {Name: "Name", Typ: parser.TypeString}, + {Name: "Type", Typ: parser.TypeString}, + {Name: "Column(s)", Typ: parser.TypeString}, + {Name: "Details", Typ: parser.TypeString}, } + v := p.newContainerValuesNode(columns, 0) info, err := desc.GetConstraintInfo(p.txn) if err != nil { @@ -409,18 +439,23 @@ func (p *planner) ShowConstraints(n *parser.ShowConstraints) (planNode, error) { if c.Columns != nil { columnsDatum = parser.NewDString(strings.Join(c.Columns, ", ")) } - v.rows = append(v.rows, []parser.Datum{ + newRow := []parser.Datum{ parser.NewDString(tn.Table()), parser.NewDString(name), parser.NewDString(string(c.Kind)), columnsDatum, detailsDatum, - }) + } + if err := v.rows.AddRow(newRow); err != nil { + v.Close() + return nil, err + } } // Sort the results by constraint name. sort := &sortNode{ ctx: p.ctx(), + p: p, ordering: sqlbase.ColumnOrdering{ {ColIdx: 0, Direction: encoding.Ascending}, {ColIdx: 1, Direction: encoding.Ascending}, @@ -457,9 +492,12 @@ func (p *planner) ShowTables(n *parser.ShowTables) (planNode, error) { if err != nil { return nil, err } - v := &valuesNode{columns: []ResultColumn{{Name: "Table", Typ: parser.TypeString}}} + v := p.newContainerValuesNode(ResultColumns{{Name: "Table", Typ: parser.TypeString}}, len(tableNames)) for _, name := range tableNames { - v.rows = append(v.rows, []parser.Datum{parser.NewDString(name.Table())}) + if err := v.rows.AddRow(parser.DTuple{parser.NewDString(name.Table())}); err != nil { + v.rows.Close() + return nil, err + } } return v, nil diff --git a/sql/sort.go b/sql/sort.go index 68239aaa444e..2dc9f48cb1ac 100644 --- a/sql/sort.go +++ b/sql/sort.go @@ -36,8 +36,9 @@ import ( // sub-node. type sortNode struct { ctx context.Context + p *planner plan planNode - columns []ResultColumn + columns ResultColumns ordering sqlbase.ColumnOrdering needSort bool @@ -163,7 +164,7 @@ func (p *planner) orderBy(orderBy parser.OrderBy, n planNode) (*sortNode, error) ordering = append(ordering, sqlbase.ColumnOrderInfo{ColIdx: index, Direction: direction}) } - return &sortNode{ctx: p.ctx(), columns: columns, ordering: ordering}, nil + return &sortNode{ctx: p.ctx(), p: p, columns: columns, ordering: ordering}, nil } // colIndex takes an expression that refers to a column using an integer, verifies it refers to a @@ -193,7 +194,7 @@ func colIndex(numOriginalCols int, expr parser.Expr) (int, error) { } } -func (n *sortNode) Columns() []ResultColumn { +func (n *sortNode) Columns() ResultColumns { return n.columns } @@ -233,7 +234,7 @@ func (n *sortNode) ExplainPlan(_ bool) (name, description string, children []pla } var buf bytes.Buffer - var columns []ResultColumn + var columns ResultColumns if n.plan != nil { columns = n.plan.Columns() } @@ -259,7 +260,8 @@ func (n *sortNode) SetLimitHint(numRows int64, soft bool) { } else { // The special value math.MaxInt64 means "no limit". if numRows != math.MaxInt64 { - v := &valuesNode{ordering: n.ordering} + v := n.p.newContainerValuesNode(n.plan.Columns(), int(numRows)) + v.ordering = n.ordering if soft { n.sortStrategy = newIterativeSortStrategy(v) } else { @@ -322,7 +324,8 @@ func (n *sortNode) Next() (bool, error) { n.needSort = false break } else if n.sortStrategy == nil { - v := &valuesNode{ordering: n.ordering} + v := n.p.newContainerValuesNode(n.plan.Columns(), 0) + v.ordering = n.ordering n.sortStrategy = newSortAllStrategy(v) } @@ -349,7 +352,9 @@ func (n *sortNode) Next() (bool, error) { } values := n.plan.Values() - n.sortStrategy.Add(values) + if err := n.sortStrategy.Add(values); err != nil { + return false, err + } if n.explain == explainDebug { // Emit a "buffered" row. @@ -371,6 +376,16 @@ func (n *sortNode) Next() (bool, error) { return true, nil } +func (n *sortNode) Close() { + n.plan.Close() + if n.sortStrategy != nil { + n.sortStrategy.Close() + } + if n.valueIter != nil { + n.valueIter.Close() + } +} + // valueIterator provides iterative access to a value source's values and // debug values. It is a subset of the planNode interface, so all methods // should conform to the comments expressed in the planNode definition. @@ -378,6 +393,7 @@ type valueIterator interface { Next() (bool, error) Values() parser.DTuple DebugValues() debugValues + Close() } type sortingStrategy interface { @@ -385,7 +401,7 @@ type sortingStrategy interface { // Add adds a single value to the sortingStrategy. It guarantees that // if it decided to store the provided value, that it will make a deep // copy of it. - Add(parser.DTuple) + Add(parser.DTuple) error // Finish terminates the sorting strategy, allowing for postprocessing // after all values have been provided to the strategy. The method should // not be called more than once, and should only be called after all Add @@ -408,10 +424,10 @@ func newSortAllStrategy(vNode *valuesNode) sortingStrategy { } } -func (ss *sortAllStrategy) Add(values parser.DTuple) { +func (ss *sortAllStrategy) Add(values parser.DTuple) error { valuesCopy := make(parser.DTuple, len(values)) copy(valuesCopy, values) - ss.vNode.rows = append(ss.vNode.rows, valuesCopy) + return ss.vNode.rows.AddRow(valuesCopy) } func (ss *sortAllStrategy) Finish() { @@ -430,6 +446,10 @@ func (ss *sortAllStrategy) DebugValues() debugValues { return ss.vNode.DebugValues() } +func (ss *sortAllStrategy) Close() { + ss.vNode.Close() +} + // iterativeSortStrategy reads in all values into the wrapped valuesNode // and turns the underlying slice into a min-heap. It then pops a value // off of the heap for each call to Next, meaning that it only needs to @@ -452,10 +472,10 @@ func newIterativeSortStrategy(vNode *valuesNode) sortingStrategy { } } -func (ss *iterativeSortStrategy) Add(values parser.DTuple) { +func (ss *iterativeSortStrategy) Add(values parser.DTuple) error { valuesCopy := make(parser.DTuple, len(values)) copy(valuesCopy, values) - ss.vNode.rows = append(ss.vNode.rows, valuesCopy) + return ss.vNode.rows.AddRow(valuesCopy) } func (ss *iterativeSortStrategy) Finish() { @@ -484,6 +504,10 @@ func (ss *iterativeSortStrategy) DebugValues() debugValues { } } +func (ss *iterativeSortStrategy) Close() { + ss.vNode.Close() +} + // sortTopKStrategy creates a max-heap in its wrapped valuesNode and keeps // this heap populated with only the top k values seen. It accomplishes this // by comparing new values (before the deep copy) with the top of the heap. @@ -514,24 +538,29 @@ func newSortTopKStrategy(vNode *valuesNode, topK int64) sortingStrategy { return ss } -func (ss *sortTopKStrategy) Add(values parser.DTuple) { +func (ss *sortTopKStrategy) Add(values parser.DTuple) error { switch { case int64(ss.vNode.Len()) < ss.topK: // The first k values all go into the max-heap. valuesCopy := make(parser.DTuple, len(values)) copy(valuesCopy, values) - ss.vNode.PushValues(valuesCopy) - case ss.vNode.ValuesLess(values, ss.vNode.rows[0]): + if err := ss.vNode.PushValues(valuesCopy); err != nil { + return err + } + case ss.vNode.ValuesLess(values, ss.vNode.rows.At(0)): // Once the heap is full, only replace the top // value if a new value is less than it. If so // replace and fix the heap. valuesCopy := make(parser.DTuple, len(values)) copy(valuesCopy, values) - ss.vNode.rows[0] = valuesCopy + if err := ss.vNode.rows.Replace(0, valuesCopy); err != nil { + return err + } heap.Fix(ss.vNode, 0) } + return nil } func (ss *sortTopKStrategy) Finish() { @@ -542,7 +571,7 @@ func (ss *sortTopKStrategy) Finish() { for ss.vNode.Len() > 0 { heap.Pop(ss.vNode) } - ss.vNode.rows = ss.vNode.rows[:origLen] + ss.vNode.rows.ResetLen(origLen) } func (ss *sortTopKStrategy) Next() (bool, error) { @@ -557,6 +586,10 @@ func (ss *sortTopKStrategy) DebugValues() debugValues { return ss.vNode.DebugValues() } +func (ss *sortTopKStrategy) Close() { + ss.vNode.Close() +} + // TODO(pmattis): If the result set is large, we might need to perform the // sort on disk. There is no point in doing this while we're buffering the // entire result set in memory. If/when we start streaming results back to diff --git a/sql/split.go b/sql/split.go index 1250b4c52c75..2a09f50d693c 100644 --- a/sql/split.go +++ b/sql/split.go @@ -149,8 +149,8 @@ func (n *splitNode) Values() parser.DTuple { } } -func (*splitNode) Columns() []ResultColumn { - return []ResultColumn{ +func (*splitNode) Columns() ResultColumns { + return ResultColumns{ { Name: "key", Typ: parser.TypeBytes, @@ -162,6 +162,7 @@ func (*splitNode) Columns() []ResultColumn { } } +func (*splitNode) Close() {} func (*splitNode) Ordering() orderingInfo { return orderingInfo{} } func (*splitNode) ExplainTypes(_ func(string, string)) {} func (*splitNode) SetLimitHint(_ int64, _ bool) {} diff --git a/sql/subquery.go b/sql/subquery.go index ab3b11ffa001..7636c94449b9 100644 --- a/sql/subquery.go +++ b/sql/subquery.go @@ -112,6 +112,7 @@ func (s *subquery) doEval() (parser.Datum, error) { // For EXISTS expressions, all we want to know is if there is at least one // result. next, err := s.plan.Next() + s.plan.Close() if s.err = err; err != nil { return result, err } @@ -142,6 +143,7 @@ func (s *subquery) doEval() (parser.Datum, error) { rows = append(rows, &valuesCopy) } } + s.plan.Close() if s.err = err; err != nil { return result, err } @@ -152,11 +154,14 @@ func (s *subquery) doEval() (parser.Datum, error) { case execModeOneRow: result = parser.DNull - next, err := s.plan.Next() + hasRow, err := s.plan.Next() if s.err = err; err != nil { + s.plan.Close() return result, err } - if next { + if !hasRow { + s.plan.Close() + } else { values := s.plan.Values() switch len(values) { case 1: @@ -167,6 +172,7 @@ func (s *subquery) doEval() (parser.Datum, error) { result = &valuesCopy } another, err := s.plan.Next() + s.plan.Close() if s.err = err; err != nil { return result, err } diff --git a/sql/table_join.go b/sql/table_join.go index 66f92b1d6375..e3026c0dbf22 100644 --- a/sql/table_join.go +++ b/sql/table_join.go @@ -48,7 +48,7 @@ type joinNode struct { pred joinPredicate // columns contains the metadata for the results of this node. - columns []ResultColumn + columns ResultColumns // output contains the last generated row of results from this node. output parser.DTuple @@ -56,6 +56,7 @@ type joinNode struct { // rightRows contains a copy of all rows from the data source on the // right of the join. rightRows *valuesNode + // rightMatched remembers which of the right rows have matched in a // full outer join. rightMatched []bool @@ -290,7 +291,7 @@ func (p *usingPredicate) prepareRow(result parser.DTuple, leftRow parser.DTuple, // pickUsingColumn searches for a column whose name matches colName. // The column index and type are returned if found, otherwise an error // is reported. -func pickUsingColumn(cols []ResultColumn, colName string, context string) (int, parser.Datum, error) { +func pickUsingColumn(cols ResultColumns, colName string, context string) (int, parser.Datum, error) { idx := invalidColIdx for j, col := range cols { if col.hidden { @@ -323,7 +324,7 @@ func (p *planner) makeUsingPredicate( usedRight[i] = invalidColIdx } seenNames := make(map[string]struct{}) - columns := make([]ResultColumn, 0, len(left.sourceColumns)+len(right.sourceColumns)-len(colNames)) + columns := make(ResultColumns, 0, len(left.sourceColumns)+len(right.sourceColumns)-len(colNames)) // Find out which columns are involved in the USING clause. for i, unnormalizedColName := range colNames { @@ -502,12 +503,13 @@ func (p *planner) makeJoin( return planDataSource{ info: info, plan: &joinNode{ - joinType: typ, - left: left.plan, - right: right.plan, - pred: pred, - columns: info.sourceColumns, - swapped: swapped, + joinType: typ, + left: left.plan, + right: right.plan, + pred: pred, + columns: info.sourceColumns, + swapped: swapped, + rightRows: p.newContainerValuesNode(right.plan.Columns(), 0), }, }, nil } @@ -565,7 +567,7 @@ func (n *joinNode) ExplainPlan(v bool) (name, description string, children []pla } // Columns implements the planNode interface. -func (n *joinNode) Columns() []ResultColumn { return n.columns } +func (n *joinNode) Columns() ResultColumns { return n.columns } // Ordering implements the planNode interface. func (n *joinNode) Ordering() orderingInfo { return n.left.Ordering() } @@ -595,7 +597,6 @@ func (n *joinNode) Start() error { if n.explain != explainDebug { // Load all the rows from the right side in memory. - v := &valuesNode{} for { hasRow, err := n.right.Next() if err != nil { @@ -607,10 +608,13 @@ func (n *joinNode) Start() error { row := n.right.Values() newRow := make([]parser.Datum, len(row)) copy(newRow, row) - v.rows = append(v.rows, newRow) + if err := n.rightRows.rows.AddRow(newRow); err != nil { + return err + } } - if len(v.rows) > 0 { - n.rightRows = v + if n.rightRows.Len() == 0 { + n.rightRows.Close() + n.rightRows = nil } } @@ -628,7 +632,7 @@ func (n *joinNode) Start() error { // If needed, allocate an array of booleans to remember which // right rows have matched. if n.joinType == joinTypeOuterFull && n.rightRows != nil { - n.rightMatched = make([]bool, len(n.rightRows.rows)) + n.rightMatched = make([]bool, n.rightRows.rows.Len()) n.emptyLeft = make(parser.DTuple, len(n.left.Columns())) for i := range n.emptyLeft { n.emptyLeft[i] = parser.DNull @@ -669,7 +673,7 @@ func (n *joinNode) Next() (bool, error) { } nRightRows = 0 } else { - nRightRows = len(n.rightRows.rows) + nRightRows = n.rightRows.Len() } // We fetch one row at a time until we find one that passes the filter. @@ -685,7 +689,7 @@ func (n *joinNode) Next() (bool, error) { continue } leftRow = n.emptyLeft - rightRow = n.rightRows.rows[curRightIdx] + rightRow = n.rightRows.rows.At(curRightIdx) break } else { // Both right and left exhausted. @@ -701,6 +705,7 @@ func (n *joinNode) Next() (bool, error) { if !leftHasRow && n.rightMatched != nil { // Go through the remaining right rows. + n.left.Close() n.rightIdx = -1 continue } @@ -730,7 +735,7 @@ func (n *joinNode) Next() (bool, error) { emptyRight := false if nRightRows > 0 { - rightRow = n.rightRows.rows[curRightIdx] + rightRow = n.rightRows.rows.At(curRightIdx) n.rightIdx = curRightIdx + 1 } else { emptyRight = true @@ -778,3 +783,14 @@ func (n *joinNode) DebugValues() debugValues { } return res } + +// Close implements the planNode interface. +func (n *joinNode) Close() { + if n.rightRows != nil { + n.rightRows.Close() + n.rightRows = nil + } + n.rightMatched = nil + n.right.Close() + n.left.Close() +} diff --git a/sql/testdata/explain b/sql/testdata/explain index 2228110b0dea..2507f5689533 100644 --- a/sql/testdata/explain +++ b/sql/testdata/explain @@ -68,8 +68,8 @@ Level Type Description Columns 0 select ("Cumulative Time", Duration, "Span Pos", Operation, Event, RowIdx, Key, Value, Disposition) +9,+"Span Pos" 1 sort +9,+"Span Pos" ("Cumulative Time", Duration, "Span Pos", Operation, Event, RowIdx, Key, Value, Disposition) +9,+"Span Pos" 2 explain trace ("Cumulative Time", Duration, "Span Pos", Operation, Event, RowIdx, Key, Value, Disposition) -3 select (RowIdx, Key, Value, Disposition) -4 render/filter(debug) from () (RowIdx, Key, Value, Disposition) +3 select ("1") +4 render/filter(debug) from () ("1") 5 empty - () # Ensure that all relevant statement types can be explained diff --git a/sql/trace.go b/sql/trace.go index 296f36f3f732..22ebf66bcb0a 100644 --- a/sql/trace.go +++ b/sql/trace.go @@ -20,12 +20,11 @@ import ( "fmt" "time" - "golang.org/x/net/context" - "github.com/cockroachdb/cockroach/internal/client" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/sqlbase" "github.com/cockroachdb/cockroach/util/encoding" + "github.com/cockroachdb/cockroach/util/tracing" "github.com/opentracing/basictracer-go" "github.com/opentracing/opentracing-go" ) @@ -43,7 +42,7 @@ type explainTraceNode struct { lastPos int } -var traceColumns = append([]ResultColumn{ +var traceColumns = append(ResultColumns{ {Name: "Cumulative Time", Typ: parser.TypeString}, {Name: "Duration", Typ: parser.TypeString}, {Name: "Span Pos", Typ: parser.TypeInt}, @@ -56,25 +55,44 @@ var traceOrdering = sqlbase.ColumnOrdering{ {ColIdx: 2, Direction: encoding.Ascending}, /* Span pos */ } -func makeTraceNode(plan planNode, txn *client.Txn) planNode { +func (p *planner) makeTraceNode(plan planNode, txn *client.Txn) planNode { return &selectTopNode{ source: &explainTraceNode{ plan: plan, txn: txn, }, sort: &sortNode{ - // Don't use the planner context: this sort node is sorting the - // trace events themselves; we don't want any events from this sort - // node to show up in the EXPLAIN TRACE output. - ctx: context.Background(), + // Generally, sortNode uses its ctx field to log sorting + // details. However the user of EXPLAIN(TRACE) only wants + // details about the traced statement, not about the sortNode + // that does work on behalf of the EXPLAIN statement itself. So + // we connect this sortNode to a different context, so its log + // messages do not go to the planner's context which will be + // responsible to collect the trace. + + // TODO(andrei): I think ideally we would also use the planner's + // Span, but create a sub-span for the Sorting with a special + // tag that is ignored by the EXPLAIN TRACE logic. This way, + // this sorting would still appear in our debug tracing, it just + // wouldn't be reported. Of course, currently statements under + // EXPLAIN TRACE are not present in our normal debug tracing at + // all since we override the tracer, but I'm hoping to stop + // doing that. And even then I'm not completely sure how this + // would work exactly, since the sort node "wraps" the inner + // select node, but we can probably do something using + // opentracing's "follows-from" spans as opposed to + // "parent-child" spans when expressing this relationship + // between the sort and the select. + ctx: opentracing.ContextWithSpan(tracing.WithTracer(p.ctx(), nil), nil), + p: p, ordering: traceOrdering, columns: traceColumns, }, } } -func (*explainTraceNode) Columns() []ResultColumn { return traceColumns } -func (*explainTraceNode) Ordering() orderingInfo { return orderingInfo{} } +func (*explainTraceNode) Columns() ResultColumns { return traceColumns } +func (*explainTraceNode) Ordering() orderingInfo { return orderingInfo{} } func (n *explainTraceNode) expandPlan() error { if err := n.plan.expandPlan(); err != nil { @@ -86,6 +104,7 @@ func (n *explainTraceNode) expandPlan() error { } func (n *explainTraceNode) Start() error { return n.plan.Start() } +func (n *explainTraceNode) Close() { n.plan.Close() } func (n *explainTraceNode) Next() (bool, error) { first := n.rows == nil diff --git a/sql/union.go b/sql/union.go index d78f891e28ca..eda46b778747 100644 --- a/sql/union.go +++ b/sql/union.go @@ -136,8 +136,8 @@ type unionNode struct { debugVals debugValues } -func (n *unionNode) Columns() []ResultColumn { return n.left.Columns() } -func (n *unionNode) Ordering() orderingInfo { return orderingInfo{} } +func (n *unionNode) Columns() ResultColumns { return n.left.Columns() } +func (n *unionNode) Ordering() orderingInfo { return orderingInfo{} } func (n *unionNode) Values() parser.DTuple { switch { @@ -208,6 +208,7 @@ func (n *unionNode) readRight() (bool, error) { } n.rightDone = true + n.right.Close() return n.readLeft() } @@ -242,6 +243,7 @@ func (n *unionNode) readLeft() (bool, error) { return false, err } n.leftDone = true + n.left.Close() return false, nil } @@ -270,6 +272,15 @@ func (n *unionNode) Next() (bool, error) { } } +func (n *unionNode) Close() { + switch { + case !n.rightDone: + n.right.Close() + case !n.leftDone: + n.left.Close() + } +} + // unionNodeEmit represents the emitter logic for one of the six combinations of // UNION/INTERSECT/EXCEPT and ALL/DISTINCE. As right and then left are iterated, // state is kept and used to compute the set operation as well as distinctness. diff --git a/sql/update.go b/sql/update.go index 401a7007f997..862f45dcd64c 100644 --- a/sql/update.go +++ b/sql/update.go @@ -284,6 +284,10 @@ func (u *updateNode) Start() error { return u.run.tw.init(u.p.txn) } +func (u *updateNode) Close() { + u.run.rows.Close() +} + func (u *updateNode) Next() (bool, error) { next, err := u.run.rows.Next() if !next { @@ -381,7 +385,7 @@ func fillDefault( return expr } -func (u *updateNode) Columns() []ResultColumn { +func (u *updateNode) Columns() ResultColumns { return u.rh.columns } diff --git a/sql/values.go b/sql/values.go index 7415fc2be6fc..1e61d45900a9 100644 --- a/sql/values.go +++ b/sql/values.go @@ -31,16 +31,24 @@ import ( type valuesNode struct { n *parser.ValuesClause p *planner - columns []ResultColumn + columns ResultColumns ordering sqlbase.ColumnOrdering tuples [][]parser.TypedExpr - rows []parser.DTuple + rows *RowContainer desiredTypes []parser.Datum // This can be removed when we only type check once. nextRow int // The index of the next row. invertSorting bool // Inverts the sorting predicate. tmpValues parser.DTuple // Used to store temporary values. + err error // Used to propagate errors during heap operations. +} + +func (p *planner) newContainerValuesNode(columns ResultColumns, capacity int) *valuesNode { + return &valuesNode{ + columns: columns, + rows: p.NewRowContainer(columns, capacity), + } } func (p *planner) ValuesClause(n *parser.ValuesClause, desiredTypes []parser.Datum) (planNode, error) { @@ -58,7 +66,7 @@ func (p *planner) ValuesClause(n *parser.ValuesClause, desiredTypes []parser.Dat v.tuples = make([][]parser.TypedExpr, 0, len(n.Tuples)) tupleBuf := make([]parser.TypedExpr, len(n.Tuples)*numCols) - v.columns = make([]ResultColumn, 0, numCols) + v.columns = make(ResultColumns, 0, numCols) for num, tuple := range n.Tuples { if a, e := len(tuple.Exprs), numCols; a != e { @@ -128,8 +136,9 @@ func (n *valuesNode) Start() error { // others that create a valuesNode internally for storing results // from other planNodes), so its expressions need evaluting. // This may run subqueries. + n.rows = n.p.NewRowContainer(n.columns, len(n.n.Tuples)) + numCols := len(n.columns) - n.rows = make([]parser.DTuple, 0, len(n.n.Tuples)) rowBuf := make([]parser.Datum, len(n.n.Tuples)*numCols) for _, tupleRow := range n.tuples { // Chop off prefix of rowBuf and limit its capacity. @@ -147,13 +156,15 @@ func (n *valuesNode) Start() error { return err } } - n.rows = append(n.rows, row) + if err := n.rows.AddRow(row); err != nil { + return err + } } return nil } -func (n *valuesNode) Columns() []ResultColumn { +func (n *valuesNode) Columns() ResultColumns { return n.columns } @@ -162,37 +173,45 @@ func (n *valuesNode) Ordering() orderingInfo { } func (n *valuesNode) Values() parser.DTuple { - return n.rows[n.nextRow-1] + return n.rows.At(n.nextRow - 1) } func (*valuesNode) MarkDebug(_ explainMode) {} func (n *valuesNode) DebugValues() debugValues { + val := n.rows.At(n.nextRow - 1) return debugValues{ rowIdx: n.nextRow - 1, key: fmt.Sprintf("%d", n.nextRow-1), - value: n.rows[n.nextRow-1].String(), + value: val.String(), output: debugValueRow, } } func (n *valuesNode) Next() (bool, error) { - if n.nextRow >= len(n.rows) { + if n.nextRow >= n.rows.Len() { return false, nil } n.nextRow++ return true, nil } +func (n *valuesNode) Close() { + if n.rows != nil { + n.rows.Close() + n.rows = nil + } +} + func (n *valuesNode) Len() int { - return len(n.rows) + return n.rows.Len() } func (n *valuesNode) Less(i, j int) bool { // TODO(pmattis): An alternative to this type of field-based comparison would // be to construct a sort-key per row using encodeTableKey(). Using a // sort-key approach would likely fit better with a disk-based sort. - ra, rb := n.rows[i], n.rows[j] + ra, rb := n.rows.At(i), n.rows.At(j) return n.invertSorting != n.ValuesLess(ra, rb) } @@ -222,31 +241,28 @@ func (n *valuesNode) ValuesLess(ra, rb parser.DTuple) bool { } func (n *valuesNode) Swap(i, j int) { - n.rows[i], n.rows[j] = n.rows[j], n.rows[i] + n.rows.Swap(i, j) } var _ heap.Interface = (*valuesNode)(nil) // Push implements the heap.Interface interface. func (n *valuesNode) Push(x interface{}) { - n.rows = append(n.rows, n.tmpValues) + n.err = n.rows.AddRow(n.tmpValues) } // PushValues pushes the given DTuple value into the heap representation // of the valuesNode. -func (n *valuesNode) PushValues(values parser.DTuple) { +func (n *valuesNode) PushValues(values parser.DTuple) error { // Avoid passing slice through interface{} to avoid allocation. n.tmpValues = values heap.Push(n, nil) + return n.err } // Pop implements the heap.Interface interface. func (n *valuesNode) Pop() interface{} { - idx := len(n.rows) - 1 - // Returning a pointer to avoid an allocation when storing the slice in an interface{}. - x := &(n.rows)[idx] - n.rows = n.rows[:idx] - return x + return n.rows.PseudoPop() } // PopValues pops the top DTuple value off the heap representation diff --git a/sql/values_test.go b/sql/values_test.go index b0bdb161a335..2a8530e4fef3 100644 --- a/sql/values_test.go +++ b/sql/values_test.go @@ -30,12 +30,15 @@ import ( "github.com/cockroachdb/cockroach/util/leaktest" "github.com/cockroachdb/cockroach/util/timeutil" "github.com/pkg/errors" + "golang.org/x/net/context" ) func TestValues(t *testing.T) { defer leaktest.AfterTest(t)() p := makePlanner() + p.session.mon.StartMonitor() + defer p.session.mon.StopMonitor(context.Background()) vInt := int64(5) vNum := 3.14159 @@ -119,6 +122,7 @@ func TestValues(t *testing.T) { t.Errorf("%d: error_expected=%t, but got error %v", i, tc.ok, err) } if plan != nil { + defer plan.Close() if err := plan.expandPlan(); err != nil { t.Errorf("%d: unexpected error in expandPlan: %v", i, err) continue diff --git a/sql/virtual_schema.go b/sql/virtual_schema.go index 7984c9981f23..4a0baa0c92fe 100644 --- a/sql/virtual_schema.go +++ b/sql/virtual_schema.go @@ -44,7 +44,7 @@ type virtualSchema struct { // virtualSchemaTable represents a table within a virtualSchema. type virtualSchemaTable struct { schema string - populate func(p *planner, addRow func(...parser.Datum)) error + populate func(p *planner, addRow func(...parser.Datum) error) error } // virtualSchemas holds a slice of statically registered virtualSchema objects. @@ -93,21 +93,23 @@ type virtualTableEntry struct { // getValuesNode returns a new valuesNode for the virtual table using the // provided planner. func (e virtualTableEntry) getValuesNode(p *planner) (*valuesNode, error) { - v := &valuesNode{} + var columns ResultColumns for _, col := range e.desc.Columns { - v.columns = append(v.columns, ResultColumn{ + columns = append(columns, ResultColumn{ Name: col.Name, Typ: col.Type.ToDatumType(), }) } + v := p.newContainerValuesNode(columns, 0) - err := e.tableDef.populate(p, func(datum ...parser.Datum) { + err := e.tableDef.populate(p, func(datum ...parser.Datum) error { if r, c := len(datum), len(v.columns); r != c { panic(fmt.Sprintf("datum row count and column count differ: %d vs %d", r, c)) } - v.rows = append(v.rows, datum) + return v.rows.AddRow(datum) }) if err != nil { + v.Close() return nil, err } return v, nil diff --git a/util/roundup2.go b/util/roundup2.go new file mode 100644 index 000000000000..613f729a0c3e --- /dev/null +++ b/util/roundup2.go @@ -0,0 +1,30 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package util + +// RoundUpPowerOfTwo returns the first power of 2 greater or equal to the number. +// Source: http://graphics.stanford.edu/%7Eseander/bithacks.html#RoundUpPowerOf2 +func RoundUpPowerOfTwo(x int64) int64 { + x-- + x |= x >> 1 + x |= x >> 2 + x |= x >> 4 + x |= x >> 8 + x |= x >> 16 + x |= x >> 32 + return x + 1 +} diff --git a/util/smalltrace.go b/util/smalltrace.go new file mode 100644 index 000000000000..0010b7b25a02 --- /dev/null +++ b/util/smalltrace.go @@ -0,0 +1,47 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package util + +import ( + "runtime" + "strings" +) + +// GetTopCallers populates an array with the names of the topmost 5 +// caller functions in the stack after skipping a given number of +// frames. We use this to provide context to allocations in the logs +// with high verbosity. +func GetTopCallers(callers []string, skip int) { + var pc [5]uintptr + nCallers := runtime.Callers(skip+1, pc[:]) + for i := 0; i < nCallers; i++ { + name := runtime.FuncForPC(pc[i]).Name() + const crl = "github.com/cockroachdb/cockroach/" + if strings.HasPrefix(name, crl) { + name = name[len(crl):] + } + callers[i] = name + } +} + +// GetSmallTrace produces a ":"-separated single line containing the +// topmost 5 callers from a given skip level. +func GetSmallTrace(skip int) string { + var callers [5]string + GetTopCallers(callers[0:], skip+2) + return strings.Join(callers[0:], ":") +} diff --git a/util/smalltrace_test.go b/util/smalltrace_test.go new file mode 100644 index 000000000000..71b62c8ea1a0 --- /dev/null +++ b/util/smalltrace_test.go @@ -0,0 +1,37 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Raphael 'kena' Poss (knz@cockroachlabs.com) + +package util + +import ( + "strings" + "testing" +) + +func testSmallTrace2(t *testing.T) { + s := GetSmallTrace(2) + if !strings.Contains(s, "TestGenerateSmallTrace") { + t.Fatalf("trace not generated properly: %q", s) + } +} + +func testSmallTrace(t *testing.T) { + testSmallTrace2(t) +} + +func TestGenerateSmallTrace(t *testing.T) { + testSmallTrace(t) +}