Skip to content

Commit

Permalink
sql: preliminary mechanism to track and limit SQL memory usage.
Browse files Browse the repository at this point in the history
This patch instruments SQL execution to track memory allocations whose
amounts depend on user input and current database contents (as opposed
to allocations dependent on other parameters e.g. statement size).

It does so by introducing a new MemoryUsageMonitor object intended
to be instantiated once per session. It is set up and teared down
with the lifecycle of a session.

Components can then link and report their memory usage to the monitor
via span objects. Spans can gate incremental allocations and keep
track of the cumulative allocations so that all can be released at
once.

This  is used to track and limits allocations:
- in `valueNode`,
- buckets in `groupNode`,
- sorted data in `sortNode`,
- temp results in `joinNode`,
- seen prefixes and suffixes in `distinctNode`,
- the Result arrays in Executor,
- prepared statements and prepared portals in pgwire.

A moderate amount of intelligence is implemented so as to avoid
computing sizes for every column of every row in a valueNode - the
combined size of all fixed-size columns is precomputed and counted at
once for every row.

This patch does not track the memory allocated for write intents in
the KV transaction object.

For troubleshooting the following mechanisms are provided:

- an env var COCKROACH_NOTEWORTHY_MEMORY_USAGE, if set, defines the
  minimum total allocated size for a monitor before it starts
  logging how much memory it is consuming.

- detailed allocation progress is logged at level 2 or above. To
  trace just SQL activity and memory allocation, one can use for
  example `--vmodule=executor=2,mem_usage=2`.
  • Loading branch information
knz committed Sep 6, 2016
1 parent 1fe9e73 commit 148a9bd
Show file tree
Hide file tree
Showing 51 changed files with 1,663 additions and 348 deletions.
87 changes: 58 additions & 29 deletions server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,20 @@ func (s *adminServer) Databases(
) (*serverpb.DatabasesResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

r := s.server.sqlExecutor.ExecuteStatements(session, "SHOW DATABASES;", nil)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

var resp serverpb.DatabasesResponse
for _, row := range r.ResultList[0].Rows {
dbname, ok := row.Values[0].(*parser.DString)
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
dbname, ok := row[0].(*parser.DString)
if !ok {
return nil, s.serverErrorf("type assertion failed on db name: %T", row.Values[0])
return nil, s.serverErrorf("type assertion failed on db name: %T", row[0])
}
resp.Databases = append(resp.Databases, string(*dbname))
}
Expand All @@ -189,6 +193,7 @@ func (s *adminServer) DatabaseDetails(
) (*serverpb.DatabaseDetailsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// Placeholders don't work with SHOW statements, so we need to manually
// escape the database name.
Expand All @@ -197,6 +202,7 @@ func (s *adminServer) DatabaseDetails(
escDBName := parser.Name(req.Database).String()
query := fmt.Sprintf("SHOW GRANTS ON DATABASE %s; SHOW TABLES FROM %s;", escDBName, escDBName)
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.firstNotFoundError(r.ResultList); err != nil {
return nil, grpc.Errorf(codes.NotFound, "%s", err)
}
Expand All @@ -213,7 +219,8 @@ func (s *adminServer) DatabaseDetails(
)

scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var grant serverpb.DatabaseDetailsResponse_Grant
var privileges string
Expand All @@ -235,7 +242,8 @@ func (s *adminServer) DatabaseDetails(
if a, e := len(r.ResultList[1].Columns), 1; a != e {
return nil, s.serverErrorf("show tables columns mismatch: %d != expected %d", a, e)
}
for _, row := range r.ResultList[1].Rows {
for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ {
row := r.ResultList[1].Rows.At(i)
var tableName string
if err := scanner.Scan(row, tableCol, &tableName); err != nil {
return nil, err
Expand All @@ -254,6 +262,7 @@ func (s *adminServer) TableDetails(
) (*serverpb.TableDetailsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// TODO(cdo): Use real placeholders for the table and database names when we've extended our SQL
// grammar to allow that.
Expand All @@ -263,6 +272,7 @@ func (s *adminServer) TableDetails(
query := fmt.Sprintf("SHOW COLUMNS FROM %s; SHOW INDEX FROM %s; SHOW GRANTS ON TABLE %s; SHOW CREATE TABLE %s;",
escQualTable, escQualTable, escQualTable, escQualTable)
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.firstNotFoundError(r.ResultList); err != nil {
return nil, grpc.Errorf(codes.NotFound, "%s", err)
}
Expand All @@ -286,7 +296,8 @@ func (s *adminServer) TableDetails(
defaultCol = "Default"
)
scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
var col serverpb.TableDetailsResponse_Column
if err := scanner.Scan(row, fieldCol, &col.Name); err != nil {
return nil, err
Expand Down Expand Up @@ -321,7 +332,8 @@ func (s *adminServer) TableDetails(
storingCol = "Storing"
)
scanner := makeResultScanner(r.ResultList[1].Columns)
for _, row := range r.ResultList[1].Rows {
for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ {
row := r.ResultList[1].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var index serverpb.TableDetailsResponse_Index
if err := scanner.Scan(row, nameCol, &index.Name); err != nil {
Expand Down Expand Up @@ -353,7 +365,8 @@ func (s *adminServer) TableDetails(
privilegesCol = "Privileges"
)
scanner := makeResultScanner(r.ResultList[2].Columns)
for _, row := range r.ResultList[2].Rows {
for i, nRows := 0, r.ResultList[2].Rows.Len(); i < nRows; i++ {
row := r.ResultList[2].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var grant serverpb.TableDetailsResponse_Grant
var privileges string
Expand All @@ -372,13 +385,13 @@ func (s *adminServer) TableDetails(
{
const createTableCol = "CreateTable"
showResult := r.ResultList[3]
if len(showResult.Rows) != 1 {
if showResult.Rows.Len() != 1 {
return nil, s.serverErrorf("CreateTable response not available.")
}

scanner := makeResultScanner(showResult.Columns)
var createStmt string
if err := scanner.Scan(showResult.Rows[0], createTableCol, &createStmt); err != nil {
if err := scanner.Scan(showResult.Rows.At(0), createTableCol, &createStmt); err != nil {
return nil, err
}

Expand Down Expand Up @@ -565,15 +578,19 @@ func (s *adminServer) TableStats(ctx context.Context, req *serverpb.TableStatsRe
func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*serverpb.UsersResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

query := "SELECT username FROM system.users"
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

var resp serverpb.UsersResponse
for _, row := range r.ResultList[0].Rows {
resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row.Values[0].(*parser.DString))})
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row[0].(*parser.DString))})
}
return &resp, nil
}
Expand All @@ -586,6 +603,7 @@ func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*s
func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) (*serverpb.EventsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// Execute the query.
q := makeSQLQuery()
Expand All @@ -604,14 +622,16 @@ func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) (
return nil, s.serverErrors(q.Errors())
}
r := s.server.sqlExecutor.ExecuteStatements(session, q.String(), q.QueryArguments())
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

// Marshal response.
var resp serverpb.EventsResponse
scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
var event serverpb.EventsResponse_Event
var ts time.Time
if err := scanner.ScanIndex(row, 0, &ts); err != nil {
Expand Down Expand Up @@ -660,24 +680,26 @@ func (s *adminServer) getUIData(session *sql.Session, user string, keys []string
return nil, s.serverErrorf("error constructing query: %v", err)
}
r := s.server.sqlExecutor.ExecuteStatements(session, query.String(), query.QueryArguments())
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

// Marshal results.
resp := serverpb.GetUIDataResponse{KeyValues: make(map[string]serverpb.GetUIDataResponse_Value)}
for _, row := range r.ResultList[0].Rows {
dKey, ok := row.Values[0].(*parser.DString)
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
dKey, ok := row[0].(*parser.DString)
if !ok {
return nil, s.serverErrorf("unexpected type for UI key: %T", row.Values[0])
return nil, s.serverErrorf("unexpected type for UI key: %T", row[0])
}
dValue, ok := row.Values[1].(*parser.DBytes)
dValue, ok := row[1].(*parser.DBytes)
if !ok {
return nil, s.serverErrorf("unexpected type for UI value: %T", row.Values[1])
return nil, s.serverErrorf("unexpected type for UI value: %T", row[1])
}
dLastUpdated, ok := row.Values[2].(*parser.DTimestamp)
dLastUpdated, ok := row[2].(*parser.DTimestamp)
if !ok {
return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row.Values[2])
return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row[2])
}

resp.KeyValues[string(*dKey)] = serverpb.GetUIDataResponse_Value{
Expand All @@ -697,11 +719,13 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ

args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

for key, val := range req.KeyValues {
// Do an upsert of the key. We update each key in a separate transaction to
// avoid long-running transactions and possible deadlocks.
br := s.server.sqlExecutor.ExecuteStatements(session, "BEGIN;", nil)
defer br.Close()
if err := s.checkQueryResults(br.ResultList, 1); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -720,6 +744,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
qargs.SetValue(`1`, parser.NewDString(string(val)))
qargs.SetValue(`2`, parser.NewDString(key))
r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 2); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -732,6 +757,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
qargs.SetValue(`1`, parser.NewDString(key))
qargs.SetValue(`2`, parser.NewDBytes(parser.DBytes(val)))
r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 2); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -753,6 +779,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
func (s *adminServer) GetUIData(ctx context.Context, req *serverpb.GetUIDataRequest) (*serverpb.GetUIDataResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

if len(req.Keys) == 0 {
return nil, grpc.Errorf(codes.InvalidArgument, "keys cannot be empty")
Expand Down Expand Up @@ -1123,17 +1150,17 @@ func makeResultScanner(cols []sql.ResultColumn) resultScanner {

// IsNull returns whether the specified column of the given row contains
// a SQL NULL value.
func (rs resultScanner) IsNull(row sql.ResultRow, col string) (bool, error) {
func (rs resultScanner) IsNull(row parser.DTuple, col string) (bool, error) {
idx, ok := rs.colNameToIdx[col]
if !ok {
return false, errors.Errorf("result is missing column %s", col)
}
return row.Values[idx] == parser.DNull, nil
return row[idx] == parser.DNull, nil
}

// ScanIndex scans the given column index of the given row into dst.
func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{}) error {
src := row.Values[index]
func (rs resultScanner) ScanIndex(row parser.DTuple, index int, dst interface{}) error {
src := row[index]

switch d := dst.(type) {
case *string:
Expand Down Expand Up @@ -1195,7 +1222,7 @@ func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{})
}

// Scan scans the column with the given name from the given row into dst.
func (rs resultScanner) Scan(row sql.ResultRow, colName string, dst interface{}) error {
func (rs resultScanner) Scan(row parser.DTuple, colName string, dst interface{}) error {
idx, ok := rs.colNameToIdx[colName]
if !ok {
return errors.Errorf("result is missing column %s", colName)
Expand All @@ -1222,18 +1249,19 @@ func (s *adminServer) queryZone(
params := parser.NewPlaceholderInfo()
params.SetValue(`1`, parser.NewDInt(parser.DInt(id)))
r := s.server.sqlExecutor.ExecuteStatements(session, query, params)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return config.ZoneConfig{}, false, err
}

result := r.ResultList[0]
if len(result.Rows) == 0 {
if result.Rows.Len() == 0 {
return config.ZoneConfig{}, false, nil
}

var zoneBytes []byte
scanner := resultScanner{}
err := scanner.ScanIndex(result.Rows[0], 0, &zoneBytes)
err := scanner.ScanIndex(result.Rows.At(0), 0, &zoneBytes)
if err != nil {
return config.ZoneConfig{}, false, err
}
Expand Down Expand Up @@ -1270,18 +1298,19 @@ func (s *adminServer) queryNamespaceID(
params.SetValue(`1`, parser.NewDInt(parser.DInt(parentID)))
params.SetValue(`2`, parser.NewDString(name))
r := s.server.sqlExecutor.ExecuteStatements(session, query, params)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return 0, err
}

result := r.ResultList[0]
if len(result.Rows) == 0 {
if result.Rows.Len() == 0 {
return 0, errors.Errorf("namespace %s with ParentID %d not found", name, parentID)
}

var id int64
scanner := resultScanner{}
err := scanner.ScanIndex(result.Rows[0], 0, &id)
err := scanner.ScanIndex(result.Rows.At(0), 0, &id)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 148a9bd

Please sign in to comment.