Skip to content

Commit

Permalink
fix(spanner): parallelize session deletion when closing pool (#3701)
Browse files Browse the repository at this point in the history
Closing the session pool when closing a Spanner client should be quick and should ignore any errors that are caused by an unresponsive backend or by loss of connectivity. Go does not support fire-and-forget RPC invocations. Instead this solution uses parallel invocations of the `DeleteSession` RPC with short timeouts that are ignored if they occur.

Fixes #3685
  • Loading branch information
olavloite authored Feb 16, 2021
1 parent 73faf8e commit 75ac7d2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
4 changes: 3 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ func getQueryOptions(opts QueryOptions) QueryOptions {
// Close closes the client.
func (c *Client) Close() {
if c.idleSessions != nil {
c.idleSessions.close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.idleSessions.close(ctx)
}
c.sc.close()
}
Expand Down
33 changes: 33 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,3 +2367,36 @@ func TestClient_Single_Read_WithNumericKey(t *testing.T) {
t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
}

func TestClient_CloseWithUnresponsiveBackend(t *testing.T) {
t.Parallel()

minOpened := uint64(5)
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: minOpened,
},
})
defer teardown()
sp := client.idleSessions

waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if uint64(sp.idleList.Len()) != minOpened {
return fmt.Errorf("num open sessions mismatch\nWant: %d\nGot: %d", sp.MinOpened, sp.numOpened)
}
return nil
})
server.TestSpanner.Freeze()
defer server.TestSpanner.Unfreeze()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
sp.close(ctx)

if w, g := context.DeadlineExceeded, ctx.Err(); w != g {
t.Fatalf("context error mismatch\nWant: %v\nGot: %v", w, g)
}
}
29 changes: 23 additions & 6 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,19 @@ func (s *session) recycle() {
// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
return s.destroyWithContext(ctx, isExpire)
}

func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
// Remove s from session pool.
if !s.pool.remove(s, isExpire) {
return false
}
// Unregister s from healthcheck queue.
s.pool.hc.unregister(s)
// Remove s from Cloud Spanner service.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
s.delete(ctx)
return true
}
Expand All @@ -345,7 +349,9 @@ func (s *session) delete(ctx context.Context) {
// Ignore the error because even if we fail to explicitly destroy the
// session, it will be eventually garbage collected by Cloud Spanner.
err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
if err != nil {
// Do not log DeadlineExceeded errors when deleting sessions, as these do
// not indicate anything the user can or should act upon.
if err != nil && ErrCode(err) != codes.DeadlineExceeded {
logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err)
}
}
Expand Down Expand Up @@ -725,8 +731,11 @@ func (p *sessionPool) isValid() bool {
return p.valid
}

// close marks the session pool as closed.
func (p *sessionPool) close() {
// close marks the session pool as closed and deletes all sessions in parallel.
// Any errors that are returned by the Delete RPC are logged but otherwise
// ignored, except for DeadlineExceeded errors, which are ignored and not
// logged.
func (p *sessionPool) close(ctx context.Context) {
if p == nil {
return
}
Expand All @@ -743,9 +752,17 @@ func (p *sessionPool) close() {
allSessions := make([]*session, len(p.hc.queue.sessions))
copy(allSessions, p.hc.queue.sessions)
p.hc.mu.Unlock()
wg := sync.WaitGroup{}
for _, s := range allSessions {
s.destroy(false)
wg.Add(1)
go deleteSession(ctx, s, &wg)
}
wg.Wait()
}

func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
defer wg.Done()
s.destroyWithContext(ctx, false)
}

// errInvalidSessionPool is the error for using an invalid session pool.
Expand Down
6 changes: 3 additions & 3 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,7 @@ func TestSessionHealthCheck(t *testing.T) {
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sp.close()
sp.close(context.Background())
if sh.session.isValid() {
t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
}
Expand Down Expand Up @@ -1454,7 +1454,7 @@ func TestStressSessionPool(t *testing.T) {
t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
}
}
sp.close()
sp.close(context.Background())
mockSessions = server.TestSpanner.DumpSessions()
for id, b := range hcSessions {
if b && mockSessions[id] {
Expand All @@ -1477,7 +1477,7 @@ func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int,
if idx%10 == 0 && j >= 900 {
// Close the pool in selected set of workers during the
// middle of the test.
pool.close()
pool.close(context.Background())
}
// Take a write sessions ~ 20% of the times.
takeWrite := rand.Intn(5) == 4
Expand Down

0 comments on commit 75ac7d2

Please sign in to comment.