Skip to content

Commit

Permalink
feat(spanner): enable client to server compression (#7899)
Browse files Browse the repository at this point in the history
* feat(spanner): enable client to server compression

* add unit tests
  • Loading branch information
rahul2393 authored May 19, 2023
1 parent 7b484cc commit 3a047d2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 11 deletions.
29 changes: 26 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"

vkit "cloud.google.com/go/spanner/apiv1"
Expand All @@ -51,6 +52,8 @@ const (
// requests need to route to leader.
routeToLeaderHeader = "x-goog-spanner-route-to-leader"

requestsCompressionHeader = "x-response-encoding"

// numChannels is the default value for NumChannels of client.
numChannels = 4
)
Expand Down Expand Up @@ -161,6 +164,18 @@ type ClientConfig struct {
// Logger is the logger to use for this client. If it is nil, all logging
// will be directed to the standard logger.
Logger *log.Logger

//
// Sets the compression to use for all gRPC calls. The compressor must be a valid name.
// This will enable compression both from the client to the
// server and from the server to the client.
//
// Supported values are:
// gzip: Enable gzip compression
// identity: Disable compression
//
// Default: identity
Compression string
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -209,7 +224,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
config.NumChannels = numChannels
}
// gRPC options.
allOpts := allClientOpts(config.NumChannels, opts...)
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
pool, err := gtransport.DialPool(ctx, allOpts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -237,8 +252,12 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
if config.incStep == 0 {
config.incStep = DefaultSessionPoolConfig.incStep
}
md := metadata.Pairs(resourcePrefixHeader, database)
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, metadata.Pairs(resourcePrefixHeader, database), config.Logger, config.CallOptions)
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.Logger, config.CallOptions)
// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand All @@ -263,13 +282,17 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
// Combines the default options from the generated client, the default options
// of the hand-written client and the user options to one list of options.
// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.ClientOption {
func allClientOpts(numChannels int, compression string, userOpts ...option.ClientOption) []option.ClientOption {
generatedDefaultOpts := vkit.DefaultClientOptions()
clientDefaultOpts := []option.ClientOption{
option.WithGRPCConnectionPool(numChannels),
option.WithUserAgent(fmt.Sprintf("spanner-go/v%s", internal.Version)),
internaloption.EnableDirectPath(true),
}
if compression == "gzip" {
userOpts = append(userOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name))))
}
allDefaultOpts := append(generatedDefaultOpts, clientDefaultOpts...)
return append(allDefaultOpts, userOpts...)
}
Expand Down
15 changes: 15 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
Expand Down Expand Up @@ -70,6 +71,20 @@ func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config Client
},
},
}
if config.Compression == gzip.Name {
grpcHeaderChecker.Checkers = append(grpcHeaderChecker.Checkers, &itestutil.HeaderChecker{
Key: "x-response-encoding",
ValuesValidator: func(token ...string) error {
if len(token) != 1 {
return status.Errorf(codes.Internal, "unexpected number of compression headers: %v", len(token))
}
if token[0] != gzip.Name {
return status.Errorf(codes.Internal, "unexpected compression: %v", token[0])
}
return nil
},
})
}
clientOptions = append(clientOptions, grpcHeaderChecker.CallOptions()...)
server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
opts = append(opts, clientOptions...)
Expand Down
20 changes: 13 additions & 7 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,9 +1462,15 @@ func TestIntegration_Reads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
_, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
defer cleanup()

client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, Compression: "gzip"})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
var ms []*Mutation
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -2271,7 +2277,7 @@ func TestIntegration_InvalidDatabase(t *testing.T) {
}
ctx := context.Background()
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
c, err := createClient(ctx, dbPath, SessionPoolConfig{})
c, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}})
// Client creation should succeed even if the database is invalid.
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3060,7 +3066,7 @@ func TestIntegration_BatchQuery(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -3144,7 +3150,7 @@ func TestIntegration_BatchRead(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -4376,7 +4382,7 @@ func prepareDBAndClient(ctx context.Context, t *testing.T, spc SessionPoolConfig
t.Fatalf("timeout creating testing table %v: %v", dbPath, err)
}
}
client, err := createClient(ctx, dbPath, spc)
client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
if err != nil {
t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down Expand Up @@ -4546,15 +4552,15 @@ func isNaN(x interface{}) bool {
}

// createClient creates Cloud Spanner data client.
func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
func createClient(ctx context.Context, dbPath string, config ClientConfig) (client *Client, err error) {
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
client, err = NewClientWithConfig(ctx, dbPath, config, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down
3 changes: 2 additions & 1 deletion spanner/pdml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -150,7 +151,7 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) {
}

ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client, Compression: gzip.Name})
defer teardown()

var err error
Expand Down

0 comments on commit 3a047d2

Please sign in to comment.