Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: LOAD DATA INFILE support asterisk matching #42050

Merged
merged 9 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,13 @@ func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func
if opt == nil {
opt = &WalkOption{}
}
if len(opt.ObjPrefix) != 0 {
return errors.New("azure storage not support ObjPrefix for now")
}
prefix := path.Join(s.options.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
if len(opt.ObjPrefix) != 0 {
prefix += opt.ObjPrefix
}

listOption := &azblob.ContainerListBlobFlatSegmentOptions{Prefix: &prefix}
for {
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,14 @@ func (s *GCSStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}
if len(opt.ObjPrefix) != 0 {
return errors.New("gcs storage not support ObjPrefix for now")
}
prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
if len(opt.ObjPrefix) != 0 {
prefix += opt.ObjPrefix
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
err := query.SetAttrSelection([]string{"Name", "Size"})
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
type WalkOption struct {
// walk on SubDir of specify directory
SubDir string
// ObjPrefix used fo prefix search in storage.
// it can save lots of time when we want find specify prefix objects in storage.
// ObjPrefix used for prefix search in storage. Note that only some storage
// backends support it.
// It can save lots of time when we want to find objects with a specific prefix in storage.
// For example, we have 10000 <Hash>.sst files and 10 backupmeta.(\d+) files.
// We can use ObjPrefix = "backupmeta" to retrieve all meta files quickly.
ObjPrefix string
Expand Down
23 changes: 15 additions & 8 deletions executor/asyncloaddata/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ func (s *mockGCSSuite) TestInternalStatus() {
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t.tsv",
Name: "t1.tsv",
},
Content: []byte(`1
2`),
Content: []byte(`1`),
})

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t2.tsv",
},
Content: []byte(`2`),
})

ctx := context.Background()
Expand Down Expand Up @@ -118,7 +125,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
expected := &JobInfo{
JobID: id,
User: "test-load@test-host",
DataSource: fmt.Sprintf("gs://test-tsv/t.tsv?endpoint=%s", gcsEndpoint),
DataSource: fmt.Sprintf("gs://test-tsv/t*.tsv?endpoint=%s", gcsEndpoint),
TableSchema: "load_tsv",
TableName: "t",
ImportMode: "logical",
Expand All @@ -141,15 +148,15 @@ func (s *mockGCSSuite) TestInternalStatus() {
// tk2 @ 0:08
info, err = GetJobInfo(ctx, tk2.Session(), id)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":0,"LoadedRowCnt":1}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":0,"LoadedRowCnt":1}`
require.Equal(s.T(), expected, info)
// tk @ 0:09
// commit one task and sleep 3 seconds
time.Sleep(3 * time.Second)
// tk2 @ 0:11
info, err = GetJobInfo(ctx, tk2.Session(), id)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":2,"LoadedRowCnt":2}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)
// tk @ 0:12
// finish job
Expand All @@ -159,7 +166,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
require.NoError(s.T(), err)
expected.Status = JobFinished
expected.StatusMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"
expected.Progress = `{"SourceFileSize":3,"LoadedFileSize":3,"LoadedRowCnt":2}`
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)
}()

Expand All @@ -183,7 +190,7 @@ func (s *mockGCSSuite) TestInternalStatus() {
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(3000)`)
s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(3000)`)
s.tk.MustExec("SET SESSION tidb_dml_batch_size = 1;")
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s'
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t*.tsv?endpoint=%s'
INTO TABLE load_tsv.t;`, gcsEndpoint)
s.tk.MustExec(sql)
wg.Wait()
Expand Down
Loading