Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support compression when sending kv pairs to tikv #41164

Merged
merged 18 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//encoding/gzip",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//slices",
Expand Down
71 changes: 54 additions & 17 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"math"
"net"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -77,6 +78,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -130,18 +132,25 @@ type ImportClientFactory interface {
}

type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
tcpConcurrency int
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
tcpConcurrency int
compressionType config.CompressionType
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
func newImportClientFactoryImpl(
splitCli split.SplitClient,
tls *common.TLS,
tcpConcurrency int,
compressionType config.CompressionType,
) *importClientFactoryImpl {
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
tcpConcurrency: tcpConcurrency,
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
tcpConcurrency: tcpConcurrency,
compressionType: compressionType,
}
}

Expand All @@ -150,11 +159,14 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
var opts []grpc.DialOption
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
Expand All @@ -163,18 +175,34 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
addr,
opt,
opts = append(opts,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
)
cancel()
switch f.compressionType {
case config.CompressionNone:
// do nothing
case config.CompressionGzip:
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
default:
return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType)
}

failpoint.Inject("LoggingImportBytes", func() {
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", target)
if err != nil {
return nil, err
}
return &loggingConn{Conn: conn}, nil
}))
})

conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -200,6 +228,15 @@ func (f *importClientFactoryImpl) Close() {
f.conns.Close()
}

type loggingConn struct {
net.Conn
}

func (c loggingConn) Write(b []byte) (int, error) {
log.L().Debug("import write", zap.Int("bytes", len(b)))
return c.Conn.Write(b)
}

// Range record start and end key for localStoreDir.DB
// so we can write it to tikv in streaming
type Range struct {
Expand Down Expand Up @@ -479,7 +516,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency)
importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency, cfg.TikvImporter.CompressKVPairs)
duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone
keyAdapter := KeyAdapter(noopKeyAdapter{})
if duplicateDetection {
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,58 @@ func (dra DuplicateResolutionAlgorithm) String() string {
}
}

// CompressionType is the config type of compression algorithm.
type CompressionType int

const (
// CompressionNone means no compression.
CompressionNone CompressionType = iota
// CompressionGzip means gzip compression.
CompressionGzip
)

func (t *CompressionType) UnmarshalTOML(v interface{}) error {
if val, ok := v.(string); ok {
return t.FromStringValue(val)
}
return errors.Errorf("invalid compression-type '%v', please choose valid option between ['gzip']", v)
}

func (t CompressionType) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}

func (t *CompressionType) FromStringValue(s string) error {
switch strings.ToLower(s) {
case "":
*t = CompressionNone
case "gz", "gzip":
*t = CompressionGzip
default:
return errors.Errorf("invalid compression-type '%s', please choose valid option between ['gzip']", s)
}
return nil
}

func (t *CompressionType) MarshalJSON() ([]byte, error) {
return []byte(`"` + t.String() + `"`), nil
}

func (t *CompressionType) UnmarshalJSON(data []byte) error {
return t.FromStringValue(strings.Trim(string(data), `"`))
}

func (t CompressionType) String() string {
switch t {
case CompressionGzip:
return "gzip"
case CompressionNone:
return ""
default:
panic(fmt.Sprintf("invalid compression type '%d'", t))
}
}

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Expand Down Expand Up @@ -583,6 +635,7 @@ type TikvImporter struct {
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,3 +1144,17 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
}

func TestCompressionType(t *testing.T) {
var ct config.CompressionType
require.NoError(t, ct.FromStringValue(""))
require.Equal(t, config.CompressionNone, ct)
require.NoError(t, ct.FromStringValue("gzip"))
require.Equal(t, config.CompressionGzip, ct)
require.NoError(t, ct.FromStringValue("gz"))
require.Equal(t, config.CompressionGzip, ct)
require.EqualError(t, ct.FromStringValue("zstd"), "invalid compression-type 'zstd', please choose valid option between ['gzip']")

require.Equal(t, "", config.CompressionNone.String())
require.Equal(t, "gzip", config.CompressionGzip.String())
}
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 0 additions & 4 deletions br/tests/lightning_compress/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,3 @@ enable = true
schema = "tidb_lightning_checkpoint_test"
driver = "mysql"
keep-after-success = true

[tikv-importer]
send-kv-pairs=10
region-split-size = 1024
5 changes: 5 additions & 0 deletions br/tests/lightning_import_compress/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
backend = 'local'

[mydumper.csv]
header = false
6 changes: 6 additions & 0 deletions br/tests/lightning_import_compress/config_gz.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tikv-importer]
backend = 'local'
compress-kv-pairs = 'gz'

[mydumper.csv]
header = false
6 changes: 6 additions & 0 deletions br/tests/lightning_import_compress/config_gzip.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tikv-importer]
backend = 'local'
compress-kv-pairs = 'gzip'

[mydumper.csv]
header = false
56 changes: 56 additions & 0 deletions br/tests/lightning_import_compress/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/bash
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/LoggingImportBytes=return"

mkdir -p "$TEST_DIR/data"

cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
CREATE DATABASE test;
EOF
cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
CREATE TABLE test.t (id int primary key, a int, b int, c int);
EOF

# Generate 200k rows. Total size is about 5MiB.
for i in {1..200000}; do
echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv"
done

LOG_FILE1="$TEST_DIR/lightning-import-compress1.log"
LOG_FILE2="$TEST_DIR/lightning-import-compress2.log"
LOG_FILE3="$TEST_DIR/lightning-import-compress3.log"

run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml" --log-file "$LOG_FILE1" -L debug
run_sql 'DROP DATABASE test;'
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gz.toml" --log-file "$LOG_FILE2" -L debug
run_sql 'DROP DATABASE test;'
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gzip.toml" --log-file "$LOG_FILE3" -L debug

uncompress=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress1.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')
gzip=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress2.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')
gz=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress3.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')

echo "uncompress: ${uncompress}, gzip: ${gzip}, gz: ${gz}"
if [ "$uncompress" -le "$gzip" ] || [ "$uncompress" -le "$gz" ]; then
echo "compress is not working"
exit 1
fi