Skip to content

Commit

Permalink
fallback to regular sessions if backend returns Unimplemented error
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 committed Jul 9, 2024
1 parent 64c1ec4 commit 703ffae
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 26 deletions.
28 changes: 9 additions & 19 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,26 +394,16 @@ const (
func TestMain(m *testing.M) {
cleanup := initIntegrationTests()
defer cleanup()
for _, mx := range []string{"false", "true"} {
if mx == "true" {
os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS", mx)
// Multiplexing is not supported in emulator.
if isEmulatorEnvSet() {
continue
}
for _, dialect := range []adminpb.DatabaseDialect{adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL, adminpb.DatabaseDialect_POSTGRESQL} {
if isEmulatorEnvSet() && dialect == adminpb.DatabaseDialect_POSTGRESQL {
// PG tests are not supported in emulator
continue
}
for _, dialect := range []adminpb.DatabaseDialect{adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL, adminpb.DatabaseDialect_POSTGRESQL} {
if isEmulatorEnvSet() && dialect == adminpb.DatabaseDialect_POSTGRESQL {
// PG tests are not supported in emulator
continue
}
testDialect = dialect
log.Printf("running tests with dialect %v and multiplexing=%v and emulator=%v", dialect, mx, isEmulatorEnvSet())
res := m.Run()
if res != 0 {
cleanup()
os.Exit(res)
}
testDialect = dialect
res := m.Run()
if res != 0 {
cleanup()
os.Exit(res)
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,11 +707,15 @@ func (s *inMemSpannerServer) CreateSession(ctx context.Context, req *spannerpb.C
}
sessionName := s.generateSessionNameLocked(req.Database)
ts := getCurrentTimestamp()
var creatorRole string
var (
creatorRole string
isMultiplexed bool
)
if req.Session != nil {
creatorRole = req.Session.CreatorRole
isMultiplexed = req.Session.Multiplexed
}
session := &spannerpb.Session{Name: sessionName, CreateTime: ts, ApproximateLastUseTime: ts, CreatorRole: creatorRole, Multiplexed: req.Session.Multiplexed}
session := &spannerpb.Session{Name: sessionName, CreateTime: ts, ApproximateLastUseTime: ts, CreatorRole: creatorRole, Multiplexed: isMultiplexed}
s.totalSessionsCreated++
s.sessions[sessionName] = session
return session, nil
Expand Down
111 changes: 111 additions & 0 deletions spanner/kokoro/presubmit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/bin/bash
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License..

# TODO(deklerk): Add integration tests when it's secure to do so. b/64723143

# Fail on any error
set -eo pipefail

# Display commands being run
set -x

# cd to project dir on Kokoro instance
cd github/google-cloud-go

go version

export GOCLOUD_HOME=$KOKORO_ARTIFACTS_DIR/google-cloud-go/
export PATH="$GOPATH/bin:$PATH"
export GO111MODULE=on
export GOPROXY=https://proxy.golang.org

# Move code into artifacts dir
mkdir -p $GOCLOUD_HOME
git clone . $GOCLOUD_HOME
cd $GOCLOUD_HOME

try3() { eval "$*" || eval "$*" || eval "$*"; }

# All packages, including +build tools, are fetched.
try3 go mod download

set +e # Run all tests, don't stop after the first failure.
exit_code=0

case $JOB_TYPE in
integration-with-multiplexed-session )
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true
echo "running presubmits with multiplexed sessions enbled: $GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
;;
esac

# Run tests in the current directory and tee output to log file,
# to be pushed to GCS as artifact.
runPresubmitTests() {
if [[ $PWD == *"/internal/"* ]] ||
[[ $PWD == *"/third_party/"* ]]; then
# internal tools only expected to work with latest go version
return
fi

if [ -z ${RUN_INTEGRATION_TESTS} ]; then
GOWORK=off go test -race -v -timeout 15m -short ./... 2>&1 |
tee sponge_log.log
else
GOWORK=off go test -race -v -timeout 45m ./... 2>&1 |
tee sponge_log.log
fi

# Run integration tests against an emulator.
if [ -f "emulator_test.sh" ]; then
./emulator_test.sh
fi
# Takes the kokoro output log (raw stdout) and creates a machine-parseable
# xUnit XML file.
cat sponge_log.log |
go-junit-report -set-exit-code >sponge_log.xml
# Add the exit codes together so we exit non-zero if any module fails.
exit_code=$(($exit_code + $?))
if [[ $PWD != *"/internal/"* ]]; then
GOWORK=off go build ./...
fi
exit_code=$(($exit_code + $?))
}

SIGNIFICANT_CHANGES=$(git --no-pager diff --name-only origin/main...$KOKORO_GIT_COMMIT_google_cloud_go |
grep -Ev '(\.md$|^\.github|\.json$|\.yaml$)' | xargs dirname | sort -u || true)

if [ -z $SIGNIFICANT_CHANGES ]; then
echo "No changes detected, skipping tests"
exit 0
fi

# CHANGED_DIRS is the list of significant top-level directories that changed,
# but weren't deleted by the current PR. CHANGED_DIRS will be empty when run on main.
CHANGED_DIRS=$(echo "$SIGNIFICANT_CHANGES" | tr ' ' '\n' | cut -d/ -f1 | sort -u |
tr '\n' ' ' | xargs ls -d 2>/dev/null || true)

echo "Running tests only in changed submodules: $CHANGED_DIRS"
for d in $CHANGED_DIRS; do
# run tests only if spanner module is part of $CHANGED_DIRS
if [[ $d == *"spanner"* ]]; then
pushd $(dirname $d)
runPresubmitTests
popd
continue
fi
done

exit $exit_code
27 changes: 23 additions & 4 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,10 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool,
if config.usedSessionsRatioThreshold == 0 {
config.usedSessionsRatioThreshold = DefaultSessionPoolConfig.usedSessionsRatioThreshold
}

isMultiplexed := strings.ToLower(os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"))
if isMultiplexed != "" && isMultiplexed != "true" && isMultiplexed != "false" {
return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS must be either true or false")
}
pool := &sessionPool{
sc: sc,
valid: true,
Expand All @@ -688,7 +691,7 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool,
mw: newMaintenanceWindow(config.MaxOpened),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
otConfig: sc.otConfig,
enableMultiplexSession: os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS") == "true" && os.Getenv("GOOGLE_CLOUD_SPANNER_FORCE_DISABLE_MULTIPLEXED_SESSIONS") != "true",
enableMultiplexSession: isMultiplexed == "true",
}

_, instance, database, err := parseDatabaseName(sc.database)
Expand Down Expand Up @@ -1206,8 +1209,7 @@ func (p *sessionPool) takeMultiplexed(ctx context.Context) (*sessionHandle, erro
// Multiplexed session is available, get it.
s = p.multiplexedSession
trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
"Acquired session")
p.decNumSessionsLocked(ctx) // TODO: add tag to differentiate from normal session.
"Acquired multiplexed session")
}
if s != nil {
p.mu.Unlock()
Expand Down Expand Up @@ -1246,6 +1248,11 @@ func (p *sessionPool) takeMultiplexed(ctx context.Context) (*sessionHandle, erro
if p.multiplexedSessionCreationError != nil {
trace.TracePrintf(ctx, nil, "Error creating multiplexed session: %v", p.multiplexedSessionCreationError)
err := p.multiplexedSessionCreationError
if isUnimplementedError(err) {
p.enableMultiplexSession = false
p.mu.Unlock()
return p.take(ctx)
}
p.mu.Unlock()
return nil, err
}
Expand Down Expand Up @@ -1842,6 +1849,18 @@ func isSessionNotFoundError(err error) bool {
return strings.Contains(err.Error(), "Session not found")
}

// isUnimplementedError returns true if the error indicates that an gRPC call is
// aborted on the server side.
func isUnimplementedError(err error) bool {
if err == nil {
return false
}
if ErrCode(err) == codes.Unimplemented {
return true
}
return false
}

func isFailedInlineBeginTransaction(err error) bool {
if err == nil {
return false
Expand Down
6 changes: 5 additions & 1 deletion spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,11 @@ func TestTakeFromIdleList(t *testing.T) {
// Make sure maintainer keeps the idle sessions.
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10},
SessionPoolConfig: SessionPoolConfig{
MaxIdle: 10,
MaxOpened: 10,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
Expand Down

0 comments on commit 703ffae

Please sign in to comment.