importinto: fix import compressed server file (#48173)
close #48049
D3Hunter authored Nov 1, 2023
1 parent b99d1c4 commit 1ba524c
Showing 7 changed files with 158 additions and 94 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/importer/BUILD.bazel
@@ -88,7 +88,7 @@ go_test(
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 19,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/backend/encode",
10 changes: 8 additions & 2 deletions pkg/executor/importer/import.go
@@ -136,6 +136,13 @@ var (

// LoadDataReadBlockSize is exposed for test.
LoadDataReadBlockSize = int64(config.ReadBlockSize)

supportedSuffixForServerDisk = []string{
".csv", ".sql", ".parquet",
".gz", ".gzip",
".zstd", ".zst",
".snappy",
}
)

// GetKVStore returns a kv.Storage.
@@ -656,7 +663,6 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
p.MaxRecordedErrors = vInt
// todo: set a max value for this param?
}
if _, ok := specifiedOptions[detachedOption]; ok {
p.Detached = true
@@ -969,7 +975,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
}
// we add this check for security: we don't want users to import sensitive system files,
// most of which are readable text files without a suffix, such as /etc/passwd
if !slices.Contains([]string{".csv", ".sql", ".parquet"}, strings.ToLower(filepath.Ext(e.Path))) {
if !slices.Contains(supportedSuffixForServerDisk, strings.ToLower(filepath.Ext(e.Path))) {
return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
"the file suffix is not supported when import from server disk")
}
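
The key detail behind this change is that filepath.Ext returns only the final extension, so a compressed data file such as test.csv.gz yields ".gz" rather than ".csv.gz"; the previous hard-coded list therefore rejected compressed files on server disk. A minimal standalone sketch of how the check behaves after this commit (the variable name and the check mirror the diff above; this is a simplification, not the actual importer code):

package main

import (
	"fmt"
	"path/filepath"
	"slices"
	"strings"
)

// Mirrors the supportedSuffixForServerDisk variable added in import.go.
var supportedSuffixForServerDisk = []string{
	".csv", ".sql", ".parquet",
	".gz", ".gzip",
	".zstd", ".zst",
	".snappy",
}

func suffixAllowed(p string) bool {
	// filepath.Ext keeps only the last extension: "test.csv.gz" -> ".gz".
	return slices.Contains(supportedSuffixForServerDisk, strings.ToLower(filepath.Ext(p)))
}

func main() {
	fmt.Println(suffixAllowed("/data/test.csv"))    // true
	fmt.Println(suffixAllowed("/data/test.csv.gz")) // true after this fix; ".gz" was rejected before
	fmt.Println(suffixAllowed("/etc/passwd"))       // false: no recognized suffix
}
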
90 changes: 90 additions & 0 deletions pkg/executor/importer/import_test.go
@@ -18,6 +18,9 @@ import (
"context"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"runtime"
"testing"
"time"
@@ -34,9 +37,11 @@ import (
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestInitDefaultOptions(t *testing.T) {
@@ -301,3 +306,88 @@ func TestGetBackendWorkerConcurrency(t *testing.T) {
c.Plan.ThreadCnt = 123
require.Equal(t, 246, c.getBackendWorkerConcurrency())
}

func TestSupportedSuffixForServerDisk(t *testing.T) {
tempDir := t.TempDir()
ctx := context.Background()

fileName := filepath.Join(tempDir, "test.csv")
require.NoError(t, os.WriteFile(fileName, []byte{}, 0o644))
fileName2 := filepath.Join(tempDir, "test.csv.gz")
require.NoError(t, os.WriteFile(fileName2, []byte{}, 0o644))
c := LoadDataController{
Plan: &Plan{
Format: DataFormatCSV,
InImportInto: true,
},
logger: zap.NewExample(),
}
// no suffix
c.Path = filepath.Join(tempDir, "test")
require.ErrorIs(t, c.InitDataFiles(ctx), exeerrors.ErrLoadDataInvalidURI)
// unknown suffix
c.Path = filepath.Join(tempDir, "test.abc")
require.ErrorIs(t, c.InitDataFiles(ctx), exeerrors.ErrLoadDataInvalidURI)
c.Path = fileName
require.NoError(t, c.InitDataFiles(ctx))
c.Path = fileName2
require.NoError(t, c.InitDataFiles(ctx))

var allData []string
for i := 0; i < 3; i++ {
fileName := fmt.Sprintf("server-%d.csv", i)
var content []byte
rowCnt := 2
for j := 0; j < rowCnt; j++ {
content = append(content, []byte(fmt.Sprintf("%d,test-%d\n", i*rowCnt+j, i*rowCnt+j))...)
allData = append(allData, fmt.Sprintf("%d test-%d", i*rowCnt+j, i*rowCnt+j))
}
require.NoError(t, os.WriteFile(path.Join(tempDir, fileName), content, 0o644))
}
// directory without permission
require.NoError(t, os.MkdirAll(path.Join(tempDir, "no-perm"), 0o700))
require.NoError(t, os.WriteFile(path.Join(tempDir, "no-perm", "no-perm.csv"), []byte("1,1"), 0o644))
require.NoError(t, os.Chmod(path.Join(tempDir, "no-perm"), 0o000))
t.Cleanup(func() {
// make sure TempDir RemoveAll cleanup works
_ = os.Chmod(path.Join(tempDir, "no-perm"), 0o700)
})
// file without permission
require.NoError(t, os.WriteFile(path.Join(tempDir, "no-perm.csv"), []byte("1,1"), 0o644))
require.NoError(t, os.Chmod(path.Join(tempDir, "no-perm.csv"), 0o000))

// relative path
c.Path = "~/file.csv"
err2 := c.InitDataFiles(ctx)
require.ErrorIs(t, err2, exeerrors.ErrLoadDataInvalidURI)
require.ErrorContains(t, err2, "URI of data source is invalid")
// non-existent parent directory
c.Path = "/path/to/non/exists/file.csv"
err := c.InitDataFiles(ctx)
require.ErrorIs(t, err, exeerrors.ErrLoadDataInvalidURI)
require.ErrorContains(t, err, "no such file or directory")
// without permission to parent dir
c.Path = path.Join(tempDir, "no-perm", "no-perm.csv")
err = c.InitDataFiles(ctx)
require.ErrorIs(t, err, exeerrors.ErrLoadDataCantRead)
require.ErrorContains(t, err, "permission denied")
// file does not exist
c.Path = path.Join(tempDir, "not-exists.csv")
err = c.InitDataFiles(ctx)
require.ErrorIs(t, err, exeerrors.ErrLoadDataCantRead)
require.ErrorContains(t, err, "no such file or directory")
// file without permission
c.Path = path.Join(tempDir, "no-perm.csv")
err = c.InitDataFiles(ctx)
require.ErrorIs(t, err, exeerrors.ErrLoadDataCantRead)
require.ErrorContains(t, err, "permission denied")
// we don't have read access to 'no-perm' directory, so walk-dir fails
c.Path = path.Join(tempDir, "server-*.csv")
err = c.InitDataFiles(ctx)
require.ErrorIs(t, err, exeerrors.ErrLoadDataCantRead)
require.ErrorContains(t, err, "permission denied")
// grant read access to 'no-perm' directory, should be ok now.
require.NoError(t, os.Chmod(path.Join(tempDir, "no-perm"), 0o400))
c.Path = path.Join(tempDir, "server-*.csv")
require.NoError(t, c.InitDataFiles(ctx))
}
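
The permission-denied cases above rely on dropping directory permissions with os.Chmod and restoring them in t.Cleanup so that t.TempDir can still remove the directory afterwards; note that such assertions generally do not hold when the tests run as root, since root bypasses file-mode checks. A standalone sketch of that pattern under those assumptions (generic illustration, not part of this package):

package permtest

import (
	"os"
	"path/filepath"
	"testing"
)

func TestDirWithoutReadPermission(t *testing.T) {
	dir := filepath.Join(t.TempDir(), "no-perm")
	if err := os.MkdirAll(dir, 0o700); err != nil {
		t.Fatal(err)
	}
	// Drop all permissions to provoke "permission denied" on reads.
	if err := os.Chmod(dir, 0o000); err != nil {
		t.Fatal(err)
	}
	// Restore permissions so the temp-dir cleanup can remove the directory.
	t.Cleanup(func() { _ = os.Chmod(dir, 0o700) })

	if _, err := os.ReadDir(dir); err == nil {
		t.Skip("permissions not enforced (likely running as root)")
	}
}
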
1 change: 0 additions & 1 deletion tests/realtikvtest/importintotest/BUILD.bazel
@@ -5,7 +5,6 @@ go_test(
timeout = "long",
srcs = [
"detach_test.go",
"from_server_test.go",
"import_into_test.go",
"job_test.go",
"main_test.go",
90 changes: 0 additions & 90 deletions tests/realtikvtest/importintotest/from_server_test.go

This file was deleted.

1 change: 1 addition & 0 deletions tests/realtikvtest/importintotest3/BUILD.bazel
@@ -5,6 +5,7 @@ go_test(
timeout = "moderate",
srcs = [
"file_compression_test.go",
"from_server_test.go",
"main_test.go",
],
flaky = True,
58 changes: 58 additions & 0 deletions tests/realtikvtest/importintotest3/from_server_test.go
@@ -0,0 +1,58 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importintotest

import (
"fmt"
"os"
"path"

"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/testkit"
)

func (s *mockGCSSuite) TestImportFromServer() {
tempDir := s.T().TempDir()
var allData []string
for i := 0; i < 3; i++ {
fileName := fmt.Sprintf("server-%d.csv", i)
var content []byte
rowCnt := 2
for j := 0; j < rowCnt; j++ {
content = append(content, []byte(fmt.Sprintf("%d,test-%d\n", i*rowCnt+j, i*rowCnt+j))...)
allData = append(allData, fmt.Sprintf("%d test-%d", i*rowCnt+j, i*rowCnt+j))
}
s.NoError(os.WriteFile(path.Join(tempDir, fileName), content, 0o644))
}

s.prepareAndUseDB("from_server")
s.tk.MustExec("create table t (a bigint, b varchar(100));")

s.tk.MustQuery(fmt.Sprintf("IMPORT INTO t FROM '%s'", path.Join(tempDir, "server-0.csv")))
s.tk.MustQuery("SELECT * FROM t;").Sort().Check(testkit.Rows([]string{"0 test-0", "1 test-1"}...))

s.tk.MustExec("truncate table t")
s.tk.MustQuery(fmt.Sprintf("IMPORT INTO t FROM '%s'", path.Join(tempDir, "server-*.csv")))
s.tk.MustQuery("SELECT * FROM t;").Sort().Check(testkit.Rows(allData...))

// try a gzip file
s.NoError(os.WriteFile(
path.Join(tempDir, "test.csv.gz"),
s.getCompressedData(mydump.CompressionGZ, []byte("1,test1\n2,test2")),
0o644))
s.tk.MustExec("truncate table t")
s.tk.MustQuery(fmt.Sprintf("IMPORT INTO t FROM '%s'", path.Join(tempDir, "test.csv.gz")))
s.tk.MustQuery("SELECT * FROM t;").Sort().Check(testkit.Rows([]string{"1 test1", "2 test2"}...))
}
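
getCompressedData is a helper of this test suite whose definition is not shown in this diff. A rough standard-library equivalent for producing such a gzip fixture, assuming the helper simply gzip-compresses the given bytes (a sketch of that assumption, not the helper's actual implementation):

package main

import (
	"bytes"
	"compress/gzip"
	"os"
)

// writeGzipFile gzip-compresses data and writes it to path, producing a file
// that IMPORT INTO can now read from server disk thanks to the ".gz" suffix.
func writeGzipFile(path string, data []byte) error {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return err
	}
	if err := zw.Close(); err != nil {
		return err
	}
	return os.WriteFile(path, buf.Bytes(), 0o644)
}

func main() {
	if err := writeGzipFile("/tmp/test.csv.gz", []byte("1,test1\n2,test2")); err != nil {
		panic(err)
	}
}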
