Skip to content

Commit

Permalink
Add LOAD INTO VALUES (#11633)
Browse files Browse the repository at this point in the history
Provide a quick way to insert data,  example is as follows:
```sql
load data  inline format='csv', data='1\n2\n' into table t1;

load data  inline format='csv', data=$XXX$
1
2
$XXX$
into table t1;
```

Approved by: @ouyuanning, @iamlinjunhong, @m-schen, @aunjgr, @heni02, @fengttt
  • Loading branch information
nnsgmsone authored and sukki37 committed Sep 17, 2023
1 parent c7ee5ba commit ec022d1
Show file tree
Hide file tree
Showing 12 changed files with 6,398 additions and 5,697 deletions.
2,025 changes: 1,254 additions & 771 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pkg/sql/colexec/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
Expand Down Expand Up @@ -97,7 +98,7 @@ func Prepare(proc *process.Process, arg any) error {
}
param.IgnoreLineTag = int(param.Extern.Tail.IgnoredLines)
param.IgnoreLine = param.IgnoreLineTag
if len(param.FileList) == 0 {
if len(param.FileList) == 0 && param.Extern.ScanType != tree.INLINE {
logutil.Warnf("no such file '%s'", param.Extern.Filepath)
param.Fileparam.End = true
}
Expand Down Expand Up @@ -139,7 +140,7 @@ func Call(idx int, proc *process.Process, arg any, isFirst bool, isLast bool) (p
proc.SetInputBatch(nil)
return process.ExecStop, nil
}
if param.plh == nil {
if param.plh == nil && param.Extern.ScanType != tree.INLINE {
if param.Fileparam.FileIndex >= len(param.FileList) {
proc.SetInputBatch(nil)
return process.ExecStop, nil
Expand Down Expand Up @@ -282,6 +283,9 @@ func FilterFileList(ctx context.Context, node *plan.Node, proc *process.Process,
}

func readFile(param *ExternalParam, proc *process.Process) (io.ReadCloser, error) {
if param.Extern.ScanType == tree.INLINE {
return io.NopCloser(bytes.NewReader(util.UnsafeStringToBytes(param.Extern.Data))), nil
}
if param.Extern.Local {
return io.NopCloser(proc.LoadLocalReader), nil
}
Expand Down
47 changes: 43 additions & 4 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync/atomic"
"time"

mokafka "github.com/matrixorigin/matrixone/pkg/stream/adapter/kafka"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
Expand Down Expand Up @@ -1397,9 +1399,21 @@ func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope
ID2Addr[i] = mcpu - tmp
}
param := &tree.ExternParam{}
err := json.Unmarshal([]byte(n.TableDef.Createsql), param)
if err != nil {
return nil, err
if n.ExternScan == nil || n.ExternScan.Type != tree.INLINE {
err := json.Unmarshal([]byte(n.TableDef.Createsql), param)
if err != nil {
return nil, err
}
} else {
param.ScanType = int(n.ExternScan.Type)
param.Data = n.ExternScan.Data
param.Format = n.ExternScan.Format
param.Tail = new(tree.TailParameter)
param.Tail.IgnoredLines = n.ExternScan.IgnoredLines
param.Tail.Fields = &tree.Fields{
Terminated: n.ExternScan.Terminated,
EnclosedBy: n.ExternScan.EnclosedBy[0],
}
}
if param.ScanType == tree.S3 {
if err := plan2.InitS3Param(param); err != nil {
Expand All @@ -1418,6 +1432,8 @@ func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope
ID2Addr[i] = mcpu - tmp
}
}
} else if param.ScanType == tree.INLINE {
return c.compileExternValueScan(n, param)
} else {
if err := plan2.InitInfileParam(param); err != nil {
return nil, err
Expand All @@ -1426,6 +1442,7 @@ func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope

param.FileService = c.proc.FileService
param.Ctx = c.ctx
var err error
var fileList []string
var fileSize []int64
if !param.Local {
Expand Down Expand Up @@ -1463,7 +1480,6 @@ func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope

return []*Scope{ret}, nil
}

if param.Parallel && (external.GetCompressType(param, fileList[0]) != tree.NOCOMPRESS || param.Local) {
return c.compileExternScanParallel(n, param, fileList, fileSize)
}
Expand Down Expand Up @@ -1511,6 +1527,29 @@ func (c *Compile) compileExternScan(ctx context.Context, n *plan.Node) ([]*Scope
return ss, nil
}

func (c *Compile) compileExternValueScan(n *plan.Node, param *tree.ExternParam) ([]*Scope, error) {
ss := make([]*Scope, ncpu)
for i := 0; i < ncpu; i++ {
ss[i] = c.constructLoadMergeScope()
}
s := c.constructScopeForExternal(c.addr, false)
s.appendInstruction(vm.Instruction{
Op: vm.External,
Idx: c.anal.curr,
IsFirst: c.anal.isFirst,
Arg: constructExternal(n, param, c.ctx, nil, nil, nil),
})
_, arg := constructDispatchLocalAndRemote(0, ss, c.addr)
arg.FuncId = dispatch.SendToAnyLocalFunc
s.appendInstruction(vm.Instruction{
Op: vm.Dispatch,
Arg: arg,
})
ss[0].PreScopes = append(ss[0].PreScopes, s)
c.anal.isFirst = false
return ss, nil
}

// construct one thread to read the file data, then dispatch to mcpu thread to get the filedata for insert
func (c *Compile) compileExternScanParallel(n *plan.Node, param *tree.ExternParam, fileList []string, fileSize []int64) ([]*Scope, error) {
param.Parallel = false
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parsers/dialect/mysql/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func init() {
"in": IN,
"index": INDEX,
"indexes": INDEXES,
"inline": INLINE,
"infile": INFILE,
"inout": INOUT,
"inner": INNER,
Expand Down
Loading

0 comments on commit ec022d1

Please sign in to comment.