Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: LOAD DATA support load one file from s3 and other OSS #40489

Merged
merged 16 commits into from
Jan 18, 2023
Merged
14 changes: 14 additions & 0 deletions br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func ParseRawURL(rawURL string) (*url.URL, error) {
return u, nil
}

// ParseBackendFromURL constructs a structured backend description from the
// *url.URL.
func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error) {
return parseBackend(u, "", options)
}

// ParseBackend constructs a structured backend description from the
// storage URL.
func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
Expand All @@ -45,6 +51,14 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack
if err != nil {
return nil, errors.Trace(err)
}
return parseBackend(u, rawURL, options)
}

func parseBackend(u *url.URL, rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
if rawURL == "" {
// try to handle hdfs for ParseBackendFromURL caller
rawURL = u.String()
}
switch u.Scheme {
case "":
absPath, err := filepath.Abs(rawURL)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds boo

// New creates an ExternalStorage with options.
func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions) (ExternalStorage, error) {
if opts == nil {
opts = &ExternalStorageOptions{}
}
switch backend := backend.Backend.(type) {
case *backuppb.StorageBackend_Local:
if backend.Local == nil {
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
OnDuplicate: v.OnDuplicate,
Ctx: b.ctx,
}
columnNames := loadDataInfo.initFieldMappings()
Expand All @@ -947,7 +948,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
IsLocal: v.IsLocal,
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataInfo: loadDataInfo,
}
Expand Down
181 changes: 170 additions & 11 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@
package executor

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -40,13 +47,15 @@ import (
var (
null = []byte("NULL")
taskQueueSize = 16 // the maximum number of pending tasks to commit in queue
// InTest is a flag that bypass gcs authentication in unit tests.
InTest bool
)

// LoadDataExec represents a load data executor.
type LoadDataExec struct {
baseExecutor

IsLocal bool
FileLocRef ast.FileLocRefTp
OnDuplicate ast.OnDuplicateKeyHandlingType
loadDataInfo *LoadDataInfo
}
Expand All @@ -55,29 +64,73 @@ type LoadDataExec struct {
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if !e.IsLocal {
if e.FileLocRef == ast.FileLocServer {
return errors.New("Load Data: don't support load data without local field")
}
e.loadDataInfo.OnDuplicate = e.OnDuplicate
// TODO: support lines terminated is "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return errors.New("Load Data: don't support load data terminated is nil")
}

sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option isn't closed normal")
}
if e.loadDataInfo.Path == "" {
return errors.New("Load Data: infile path is empty")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}

switch e.FileLocRef {
case ast.FileLocServer:
panic("FileLocServer should be handled earlier")
case ast.FileLocClient:
// let caller use handleQuerySpecial to read data in this connection
sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option wasn't closed normally")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
case ast.FileLocRemote:
return e.loadFromRemote(ctx)
}
return nil
}

func (e *LoadDataExec) loadFromRemote(ctx context.Context) error {
u, err := storage.ParseRawURL(e.loadDataInfo.Path)
if err != nil {
return err
}
var filename string
u.Path, filename = filepath.Split(u.Path)
b, err := storage.ParseBackendFromURL(u, nil)
if err != nil {
return err
}
if b.GetLocal() != nil {
return errors.Errorf("Load Data: don't support load data from tidb-server when set REMOTE, path %s", e.loadDataInfo.Path)
}

opt := &storage.ExternalStorageOptions{}
if InTest {
opt.NoCredentials = true
}
s, err := storage.New(ctx, b, opt)
if err != nil {
return err
}
fileReader, err := s.Open(ctx, filename)
if err != nil {
return err
}
defer fileReader.Close()
reader := bufio.NewReader(fileReader)

return e.loadDataInfo.Load(ctx, func() ([]byte, error) {
return reader.ReadBytes('\n')
})
}

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
Expand All @@ -103,6 +156,7 @@ type CommitTask struct {
}

// LoadDataInfo saves the information of loading data operation.
// TODO: rename it and remove unnecessary public methods.
type LoadDataInfo struct {
*InsertValues

Expand Down Expand Up @@ -132,6 +186,111 @@ type FieldMapping struct {
UserVar *ast.VariableExpr
}

// Load reads from readerFn and do load data job.
func (e *LoadDataInfo) Load(ctx context.Context, readerFn func() ([]byte, error)) error {
e.InitQueues()
e.SetMaxRowsInBatch(uint64(e.Ctx.GetSessionVars().DMLBatchSize))
e.StartStopWatcher()
// let stop watcher goroutine quit
defer e.ForceQuit()
err := sessiontxn.NewTxn(ctx, e.Ctx)
if err != nil {
return err
}
// processStream process input data, enqueue commit task
wg := new(sync.WaitGroup)
wg.Add(1)
go processStream(ctx, readerFn, e, wg)
err = e.CommitWork(ctx)
wg.Wait()
return err
}

// processStream process input stream from network
func processStream(ctx context.Context, readerFn func() ([]byte, error), loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) {
var err error
var shouldBreak bool
var prevData, curData []byte
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
wg.Done()
}()
for {
curData, err = readerFn()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
logutil.Logger(ctx).Error("read data for LOAD DATA failed", zap.Error(err))
break
}
err = nil
sleepymole marked this conversation as resolved.
Show resolved Hide resolved
}
if len(curData) == 0 {
loadDataInfo.Drained = true
shouldBreak = true
if len(prevData) == 0 {
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
}
if shouldBreak {
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
if err = loadDataInfo.EnqOneTask(ctx); err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
}

func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
prevData, reachLimit, err = loadDataInfo.InsertData(ctx, prevData, curData)
if err != nil {
return nil, err
}
if !reachLimit {
break
}
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return prevData, err
}
curData = prevData
prevData = nil
}
return prevData, nil
}

// reorderColumns reorder the e.insertColumns according to the order of columnNames
// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name.
func (e *LoadDataInfo) reorderColumns(columnNames []string) error {
Expand Down
18 changes: 18 additions & 0 deletions executor/loadremotetest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "loadremotetest_test",
srcs = [
"main_test.go",
"one_csv_test.go",
"util_test.go",
],
deps = [
"//executor",
"//kv",
"//testkit",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_stretchr_testify//suite",
"@org_uber_go_goleak//:goleak",
],
)
33 changes: 33 additions & 0 deletions executor/loadremotetest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadremotetest

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("net.(*netFD).connect.func2"),
}
goleak.VerifyTestMain(m, opts...)
}
Loading