Skip to content

Commit

Permalink
Merge #97270
Browse files Browse the repository at this point in the history
97270: server: simplify logic for logging failed pgwire cancel r=knz a=rafiss

Now that we only log when the rate limit is exceeded, it is simpler to move the log statement inside of the pgwire function that does the cancellation. This also removed the need for the server router to wait for all servers to respond to the request.

Informs #91386
Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Mar 7, 2023
2 parents 76d6719 + 2822cb9 commit fa0440b
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type onDemandServer interface {
getHTTPHandlerFn() http.HandlerFunc

// handleCancel processes a SQL async cancel query.
handleCancel(ctx context.Context, cancelKey pgwirecancel.BackendKeyData) error
handleCancel(ctx context.Context, cancelKey pgwirecancel.BackendKeyData)

// serveConn handles an incoming SQL connection.
serveConn(ctx context.Context, conn net.Conn, status pgwire.PreServeStatus) error
Expand Down
40 changes: 6 additions & 34 deletions pkg/server/server_controller_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// sqlMux redirects incoming SQL connections to the server selected
Expand All @@ -36,7 +34,6 @@ func (c *serverController) sqlMux(
// simply broadcast them to all servers. One of the servers will
// pick it up.
servers := c.getServers()
errCh := make(chan error, len(servers))
for i := range servers {
s := servers[i]
// We dispatch the request concurrently to all the servers.
Expand All @@ -53,37 +50,12 @@ func (c *serverController) sqlMux(
// servers to see and process the cancel at approximately the
// same time as every other.
if err := c.stopper.RunAsyncTask(ctx, "cancel", func(ctx context.Context) {
errCh <- s.server.handleCancel(ctx, status.CancelKey)
s.server.handleCancel(ctx, status.CancelKey)
}); err != nil {
return err
}
}
// Wait for the cancellation to be processed.
return c.stopper.RunAsyncTask(ctx, "wait-cancel", func(ctx context.Context) {
var err error
sawSuccess := false
for i := 0; i < len(servers); i++ {
select {
case thisErr := <-errCh:
err = errors.CombineErrors(err, thisErr)
sawSuccess = sawSuccess || thisErr == nil
case <-c.stopper.ShouldQuiesce():
return
}
}
if !sawSuccess {
// We don't want to log a warning if cancellation has succeeded.
_, rateLimited := errors.If(err, func(err error) (interface{}, bool) {
if respStatus := grpcstatus.Convert(err); respStatus.Code() == codes.ResourceExhausted {
return nil, true
}
return nil, false
})
if rateLimited {
log.Sessions.Warningf(ctx, "unexpected while handling pgwire cancellation request: %v", err)
}
}
})
return nil

case pgwire.PreServeReady:
tenantName := roachpb.TenantName(status.GetTenantName())
Expand All @@ -107,10 +79,10 @@ func (c *serverController) sqlMux(

func (t *systemServerWrapper) handleCancel(
ctx context.Context, cancelKey pgwirecancel.BackendKeyData,
) error {
) {
pgCtx := t.server.sqlServer.AnnotateCtx(context.Background())
pgCtx = logtags.AddTags(pgCtx, logtags.FromContext(ctx))
return t.server.sqlServer.pgServer.HandleCancel(pgCtx, cancelKey)
t.server.sqlServer.pgServer.HandleCancel(pgCtx, cancelKey)
}

func (t *systemServerWrapper) serveConn(
Expand All @@ -123,10 +95,10 @@ func (t *systemServerWrapper) serveConn(

func (t *tenantServerWrapper) handleCancel(
ctx context.Context, cancelKey pgwirecancel.BackendKeyData,
) error {
) {
pgCtx := t.server.sqlServer.AnnotateCtx(context.Background())
pgCtx = logtags.AddTags(pgCtx, logtags.FromContext(ctx))
return t.server.sqlServer.pgServer.HandleCancel(pgCtx, cancelKey)
t.server.sqlServer.pgServer.HandleCancel(pgCtx, cancelKey)
}

func (t *tenantServerWrapper) serveConn(
Expand Down
14 changes: 1 addition & 13 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ import (
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
sentry "github.com/getsentry/sentry-go"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// SQLServerWrapper is a utility struct that encapsulates
Expand Down Expand Up @@ -769,17 +767,7 @@ func (s *SQLServerWrapper) serveConn(
pgServer := s.PGServer()
switch status.State {
case pgwire.PreServeCancel:
if err := pgServer.HandleCancel(ctx, status.CancelKey); err != nil {
_, rateLimited := errors.If(err, func(err error) (interface{}, bool) {
if respStatus := grpcstatus.Convert(err); respStatus.Code() == codes.ResourceExhausted {
return nil, true
}
return nil, false
})
if rateLimited {
log.Sessions.Warningf(ctx, "unexpected while handling pgwire cancellation request: %v", err)
}
}
pgServer.HandleCancel(ctx, status.CancelKey)
return nil
case pgwire.PreServeReady:
return pgServer.ServeConn(ctx, conn, status)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func readCancelKeyAndCloseConn(
// - (*server/statusServer).cancelSemaphore
// - (*server/statusServer).CancelRequestByKey
// - (*server/serverController).sqlMux
func (s *Server) HandleCancel(ctx context.Context, cancelKey pgwirecancel.BackendKeyData) error {
func (s *Server) HandleCancel(ctx context.Context, cancelKey pgwirecancel.BackendKeyData) {
s.tenantMetrics.PGWireCancelTotalCount.Inc(1)

resp, err := func() (*serverpb.CancelQueryByKeyResponse, error) {
Expand All @@ -846,9 +846,9 @@ func (s *Server) HandleCancel(ctx context.Context, cancelKey pgwirecancel.Backen
} else if err != nil {
if respStatus := status.Convert(err); respStatus.Code() == codes.ResourceExhausted {
s.tenantMetrics.PGWireCancelIgnoredCount.Inc(1)
log.Sessions.Warningf(ctx, "unexpected while handling pgwire cancellation request: %v", err)
}
}
return err
}

// finalizeClientParameters "fills in" the session arguments with
Expand Down

0 comments on commit fa0440b

Please sign in to comment.