Skip to content

Commit

Permalink
Merge pull request #8691 from knz/sized-rows
Browse files Browse the repository at this point in the history
sql: preliminary mechanism to track and limit SQL memory usage.
  • Loading branch information
knz authored Sep 10, 2016
2 parents 1e61d60 + 1e04a49 commit f48730d
Show file tree
Hide file tree
Showing 51 changed files with 1,741 additions and 372 deletions.
87 changes: 58 additions & 29 deletions server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,20 @@ func (s *adminServer) Databases(
) (*serverpb.DatabasesResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

r := s.server.sqlExecutor.ExecuteStatements(session, "SHOW DATABASES;", nil)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

var resp serverpb.DatabasesResponse
for _, row := range r.ResultList[0].Rows {
dbname, ok := row.Values[0].(*parser.DString)
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
dbname, ok := row[0].(*parser.DString)
if !ok {
return nil, s.serverErrorf("type assertion failed on db name: %T", row.Values[0])
return nil, s.serverErrorf("type assertion failed on db name: %T", row[0])
}
resp.Databases = append(resp.Databases, string(*dbname))
}
Expand All @@ -189,6 +193,7 @@ func (s *adminServer) DatabaseDetails(
) (*serverpb.DatabaseDetailsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// Placeholders don't work with SHOW statements, so we need to manually
// escape the database name.
Expand All @@ -197,6 +202,7 @@ func (s *adminServer) DatabaseDetails(
escDBName := parser.Name(req.Database).String()
query := fmt.Sprintf("SHOW GRANTS ON DATABASE %s; SHOW TABLES FROM %s;", escDBName, escDBName)
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.firstNotFoundError(r.ResultList); err != nil {
return nil, grpc.Errorf(codes.NotFound, "%s", err)
}
Expand All @@ -213,7 +219,8 @@ func (s *adminServer) DatabaseDetails(
)

scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var grant serverpb.DatabaseDetailsResponse_Grant
var privileges string
Expand All @@ -235,7 +242,8 @@ func (s *adminServer) DatabaseDetails(
if a, e := len(r.ResultList[1].Columns), 1; a != e {
return nil, s.serverErrorf("show tables columns mismatch: %d != expected %d", a, e)
}
for _, row := range r.ResultList[1].Rows {
for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ {
row := r.ResultList[1].Rows.At(i)
var tableName string
if err := scanner.Scan(row, tableCol, &tableName); err != nil {
return nil, err
Expand Down Expand Up @@ -280,6 +288,7 @@ func (s *adminServer) TableDetails(
) (*serverpb.TableDetailsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// TODO(cdo): Use real placeholders for the table and database names when we've extended our SQL
// grammar to allow that.
Expand All @@ -289,6 +298,7 @@ func (s *adminServer) TableDetails(
query := fmt.Sprintf("SHOW COLUMNS FROM %s; SHOW INDEX FROM %s; SHOW GRANTS ON TABLE %s; SHOW CREATE TABLE %s;",
escQualTable, escQualTable, escQualTable, escQualTable)
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.firstNotFoundError(r.ResultList); err != nil {
return nil, grpc.Errorf(codes.NotFound, "%s", err)
}
Expand All @@ -312,7 +322,8 @@ func (s *adminServer) TableDetails(
defaultCol = "Default"
)
scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
var col serverpb.TableDetailsResponse_Column
if err := scanner.Scan(row, fieldCol, &col.Name); err != nil {
return nil, err
Expand Down Expand Up @@ -347,7 +358,8 @@ func (s *adminServer) TableDetails(
storingCol = "Storing"
)
scanner := makeResultScanner(r.ResultList[1].Columns)
for _, row := range r.ResultList[1].Rows {
for i, nRows := 0, r.ResultList[1].Rows.Len(); i < nRows; i++ {
row := r.ResultList[1].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var index serverpb.TableDetailsResponse_Index
if err := scanner.Scan(row, nameCol, &index.Name); err != nil {
Expand Down Expand Up @@ -379,7 +391,8 @@ func (s *adminServer) TableDetails(
privilegesCol = "Privileges"
)
scanner := makeResultScanner(r.ResultList[2].Columns)
for _, row := range r.ResultList[2].Rows {
for i, nRows := 0, r.ResultList[2].Rows.Len(); i < nRows; i++ {
row := r.ResultList[2].Rows.At(i)
// Marshal grant, splitting comma-separated privileges into a proper slice.
var grant serverpb.TableDetailsResponse_Grant
var privileges string
Expand All @@ -398,13 +411,13 @@ func (s *adminServer) TableDetails(
{
const createTableCol = "CreateTable"
showResult := r.ResultList[3]
if len(showResult.Rows) != 1 {
if showResult.Rows.Len() != 1 {
return nil, s.serverErrorf("CreateTable response not available.")
}

scanner := makeResultScanner(showResult.Columns)
var createStmt string
if err := scanner.Scan(showResult.Rows[0], createTableCol, &createStmt); err != nil {
if err := scanner.Scan(showResult.Rows.At(0), createTableCol, &createStmt); err != nil {
return nil, err
}

Expand Down Expand Up @@ -594,15 +607,19 @@ func (s *adminServer) TableStats(ctx context.Context, req *serverpb.TableStatsRe
func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*serverpb.UsersResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

query := "SELECT username FROM system.users"
r := s.server.sqlExecutor.ExecuteStatements(session, query, nil)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

var resp serverpb.UsersResponse
for _, row := range r.ResultList[0].Rows {
resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row.Values[0].(*parser.DString))})
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
resp.Users = append(resp.Users, serverpb.UsersResponse_User{Username: string(*row[0].(*parser.DString))})
}
return &resp, nil
}
Expand All @@ -615,6 +632,7 @@ func (s *adminServer) Users(ctx context.Context, req *serverpb.UsersRequest) (*s
func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) (*serverpb.EventsResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

// Execute the query.
q := makeSQLQuery()
Expand All @@ -633,14 +651,16 @@ func (s *adminServer) Events(ctx context.Context, req *serverpb.EventsRequest) (
return nil, s.serverErrors(q.Errors())
}
r := s.server.sqlExecutor.ExecuteStatements(session, q.String(), q.QueryArguments())
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

// Marshal response.
var resp serverpb.EventsResponse
scanner := makeResultScanner(r.ResultList[0].Columns)
for _, row := range r.ResultList[0].Rows {
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
var event serverpb.EventsResponse_Event
var ts time.Time
if err := scanner.ScanIndex(row, 0, &ts); err != nil {
Expand Down Expand Up @@ -689,24 +709,26 @@ func (s *adminServer) getUIData(session *sql.Session, user string, keys []string
return nil, s.serverErrorf("error constructing query: %v", err)
}
r := s.server.sqlExecutor.ExecuteStatements(session, query.String(), query.QueryArguments())
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return nil, s.serverError(err)
}

// Marshal results.
resp := serverpb.GetUIDataResponse{KeyValues: make(map[string]serverpb.GetUIDataResponse_Value)}
for _, row := range r.ResultList[0].Rows {
dKey, ok := row.Values[0].(*parser.DString)
for i, nRows := 0, r.ResultList[0].Rows.Len(); i < nRows; i++ {
row := r.ResultList[0].Rows.At(i)
dKey, ok := row[0].(*parser.DString)
if !ok {
return nil, s.serverErrorf("unexpected type for UI key: %T", row.Values[0])
return nil, s.serverErrorf("unexpected type for UI key: %T", row[0])
}
dValue, ok := row.Values[1].(*parser.DBytes)
dValue, ok := row[1].(*parser.DBytes)
if !ok {
return nil, s.serverErrorf("unexpected type for UI value: %T", row.Values[1])
return nil, s.serverErrorf("unexpected type for UI value: %T", row[1])
}
dLastUpdated, ok := row.Values[2].(*parser.DTimestamp)
dLastUpdated, ok := row[2].(*parser.DTimestamp)
if !ok {
return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row.Values[2])
return nil, s.serverErrorf("unexpected type for UI lastUpdated: %T", row[2])
}

resp.KeyValues[string(*dKey)] = serverpb.GetUIDataResponse_Value{
Expand All @@ -726,11 +748,13 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ

args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

for key, val := range req.KeyValues {
// Do an upsert of the key. We update each key in a separate transaction to
// avoid long-running transactions and possible deadlocks.
br := s.server.sqlExecutor.ExecuteStatements(session, "BEGIN;", nil)
defer br.Close()
if err := s.checkQueryResults(br.ResultList, 1); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -749,6 +773,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
qargs.SetValue(`1`, parser.NewDString(string(val)))
qargs.SetValue(`2`, parser.NewDString(key))
r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 2); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -761,6 +786,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
qargs.SetValue(`1`, parser.NewDString(key))
qargs.SetValue(`2`, parser.NewDBytes(parser.DBytes(val)))
r := s.server.sqlExecutor.ExecuteStatements(session, query, qargs)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 2); err != nil {
return nil, s.serverError(err)
}
Expand All @@ -782,6 +808,7 @@ func (s *adminServer) SetUIData(ctx context.Context, req *serverpb.SetUIDataRequ
func (s *adminServer) GetUIData(ctx context.Context, req *serverpb.GetUIDataRequest) (*serverpb.GetUIDataResponse, error) {
args := sql.SessionArgs{User: s.getUser(req)}
session := sql.NewSession(ctx, args, s.server.sqlExecutor, nil)
defer session.Finish()

if len(req.Keys) == 0 {
return nil, grpc.Errorf(codes.InvalidArgument, "keys cannot be empty")
Expand Down Expand Up @@ -1152,17 +1179,17 @@ func makeResultScanner(cols []sql.ResultColumn) resultScanner {

// IsNull returns whether the specified column of the given row contains
// a SQL NULL value.
func (rs resultScanner) IsNull(row sql.ResultRow, col string) (bool, error) {
func (rs resultScanner) IsNull(row parser.DTuple, col string) (bool, error) {
idx, ok := rs.colNameToIdx[col]
if !ok {
return false, errors.Errorf("result is missing column %s", col)
}
return row.Values[idx] == parser.DNull, nil
return row[idx] == parser.DNull, nil
}

// ScanIndex scans the given column index of the given row into dst.
func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{}) error {
src := row.Values[index]
func (rs resultScanner) ScanIndex(row parser.DTuple, index int, dst interface{}) error {
src := row[index]

switch d := dst.(type) {
case *string:
Expand Down Expand Up @@ -1224,7 +1251,7 @@ func (rs resultScanner) ScanIndex(row sql.ResultRow, index int, dst interface{})
}

// Scan scans the column with the given name from the given row into dst.
func (rs resultScanner) Scan(row sql.ResultRow, colName string, dst interface{}) error {
func (rs resultScanner) Scan(row parser.DTuple, colName string, dst interface{}) error {
idx, ok := rs.colNameToIdx[colName]
if !ok {
return errors.Errorf("result is missing column %s", colName)
Expand All @@ -1251,18 +1278,19 @@ func (s *adminServer) queryZone(
params := parser.NewPlaceholderInfo()
params.SetValue(`1`, parser.NewDInt(parser.DInt(id)))
r := s.server.sqlExecutor.ExecuteStatements(session, query, params)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return config.ZoneConfig{}, false, err
}

result := r.ResultList[0]
if len(result.Rows) == 0 {
if result.Rows.Len() == 0 {
return config.ZoneConfig{}, false, nil
}

var zoneBytes []byte
scanner := resultScanner{}
err := scanner.ScanIndex(result.Rows[0], 0, &zoneBytes)
err := scanner.ScanIndex(result.Rows.At(0), 0, &zoneBytes)
if err != nil {
return config.ZoneConfig{}, false, err
}
Expand Down Expand Up @@ -1299,18 +1327,19 @@ func (s *adminServer) queryNamespaceID(
params.SetValue(`1`, parser.NewDInt(parser.DInt(parentID)))
params.SetValue(`2`, parser.NewDString(name))
r := s.server.sqlExecutor.ExecuteStatements(session, query, params)
defer r.Close()
if err := s.checkQueryResults(r.ResultList, 1); err != nil {
return 0, err
}

result := r.ResultList[0]
if len(result.Rows) == 0 {
if result.Rows.Len() == 0 {
return 0, errors.Errorf("namespace %s with ParentID %d not found", name, parentID)
}

var id int64
scanner := resultScanner{}
err := scanner.ScanIndex(result.Rows[0], 0, &id)
err := scanner.ScanIndex(result.Rows.At(0), 0, &id)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit f48730d

Please sign in to comment.