Skip to content

Commit

Permalink
Merge pull request pingcap#6 from Xuanwo/parquetx
Browse files Browse the repository at this point in the history
dumpling: Implement parquet support
  • Loading branch information
crazycs520 authored Jan 7, 2022
2 parents b089f1e + 92bad95 commit 15c2585
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 7 deletions.
1 change: 1 addition & 0 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ func adjustFileFormat(conf *Config) error {
return errors.Errorf("unsupported config.FileType '%s' when we specify --sql, please unset --filetype or set it to 'csv'", conf.FileType)
}
case FileFormatCSVString:
case FileFormatParquetString:
default:
return errors.Errorf("unknown config.FileType '%s'", conf.FileType)
}
Expand Down
2 changes: 2 additions & 0 deletions dumpling/export/ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type RowReceiverStringer interface {
type Stringer interface {
WriteToBuffer(*bytes.Buffer, bool)
WriteToBufferInCsv(*bytes.Buffer, bool, *csvOption)

Bytes() []byte
}

// RowReceiver is an interface which represents sql types that support bind address for *sql.Rows
Expand Down
45 changes: 45 additions & 0 deletions dumpling/export/sql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"database/sql"
"fmt"
"github.com/xitongsys/parquet-go/writer"
)

var colTypeRowReceiverMap = map[string]func() RowReceiverStringer{}
Expand Down Expand Up @@ -229,6 +230,26 @@ func (r RowReceiverArr) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash boo
}
}

// WriteToParquet implements Stringer.WriteToBufferInCsv
func (r RowReceiverArr) WriteToParquet(bf *writer.CSVWriter) int {
s := make([]*string, len(r.receivers))

size := 0

for i, receiver := range r.receivers {
bs := receiver.Bytes()
size += len(bs)
v := string(bs)
s[i] = &v
}

err := bf.WriteString(s)
if err != nil {
panic("failed to write to parquet")
}
return size
}

// SQLTypeNumber implements RowReceiverStringer which represents numeric type columns in database
type SQLTypeNumber struct {
SQLTypeString
Expand All @@ -252,6 +273,14 @@ func (s SQLTypeNumber) WriteToBufferInCsv(bf *bytes.Buffer, _ bool, opt *csvOpti
}
}

func (s SQLTypeNumber) Bytes() []byte {
if s.RawBytes != nil {
return s.RawBytes
} else {
return []byte(nullValue)
}
}

// SQLTypeString implements RowReceiverStringer which represents string type columns in database
type SQLTypeString struct {
sql.RawBytes
Expand Down Expand Up @@ -284,6 +313,14 @@ func (s *SQLTypeString) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash boo
}
}

func (s SQLTypeString) Bytes() []byte {
if s.RawBytes != nil {
return s.RawBytes
} else {
return []byte(nullValue)
}
}

// SQLTypeBytes implements RowReceiverStringer which represents bytes type columns in database
type SQLTypeBytes struct {
sql.RawBytes
Expand Down Expand Up @@ -313,3 +350,11 @@ func (s *SQLTypeBytes) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash bool
bf.WriteString(opt.nullValue)
}
}

func (s SQLTypeBytes) Bytes() []byte {
if s.RawBytes != nil {
return s.RawBytes
} else {
return []byte(nullValue)
}
}
2 changes: 2 additions & 0 deletions dumpling/export/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func NewWriter(tctx *tcontext.Context, id int64, config *Config, conn *sql.Conn,
sw.fileFmt = FileFormatSQLText
case FileFormatCSVString:
sw.fileFmt = FileFormatCSV
case FileFormatParquetString:
sw.fileFmt = FileFormatParquet
}
return sw
}
Expand Down
134 changes: 134 additions & 0 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"

"io"
"path"
"strings"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
"github.com/xitongsys/parquet-go/writer"
"go.uber.org/zap"

"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -395,6 +397,128 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
return counter, wp.Error()
}

const parquetFileLimit = 64 * 1024 * 1024 // 64MB

var parquetTypeMap = map[string]string{
"INT": "INT64",
"VARCHAR": "BYTE_ARRAY",
}

func WriteInsertInParquet(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, w storage.ExternalFileWriter) (u uint64, err error) {
fileRowIter := tblIR.Rows()
if !fileRowIter.HasNext() {
return 0, fileRowIter.Error()
}

// Build metadata that parquet needs.
md := make([]string, meta.ColumnCount())
for k, v := range meta.ColumnNames() {
ot := meta.ColumnTypes()[k]
pt, ok := parquetTypeMap[ot]
if !ok {
panic(fmt.Errorf("type %s is not supported", ot))
}
md[k] = fmt.Sprintf("name=%s, type=%s", v, pt)
}

bf := pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
parquetWriter, err := writer.NewCSVWriterFromWriter(md, bf, 4)
if err != nil {
panic(fmt.Errorf("init parquet writer: %v", err))
}

wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, cfg.Labels)

// use context.Background here to make sure writerPipe can deplete all the chunks in pipeline
ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
wp.Run(ctx)
wg.Done()
}()
defer func() {
cancel()
wg.Wait()
}()

var (
row = MakeRowReceiver(meta.ColumnTypes())
counter uint64
lastCounter uint64
selectedFields = meta.SelectedField()
)

defer func() {
if err != nil {
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", lastCounter),
zap.Uint64("finished size", wp.finishedFileSize),
log.ShortError(err))
SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
} else {
pCtx.L().Debug("finish dumping table(chunk)",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", counter),
zap.Uint64("finished size", wp.finishedFileSize))
summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
summary.CollectSuccessUnit("total rows", 1, counter)
}
}()

for fileRowIter.HasNext() {
if selectedFields != "" {
if err = fileRowIter.Decode(row); err != nil {
return counter, errors.Trace(err)
}
size := row.WriteToParquet(parquetWriter)
wp.currentFileSize += uint64(size)
}
counter++

// We treat parquet as a single file instead of multiple buffers.
if wp.currentFileSize >= parquetFileLimit {
select {
case <-pCtx.Done():
return counter, pCtx.Err()
case err = <-wp.errCh:
return counter, err
default:
AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
lastCounter = counter

break
}
}

fileRowIter.Next()
}

if wp.currentFileSize > 0 {
err = parquetWriter.WriteStop()
if err != nil {
panic(fmt.Errorf("stop parquet writer: %v", err))
}
wp.input <- bf
}

close(wp.input)
<-wp.closed
AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
lastCounter = counter
if err = fileRowIter.Error(); err != nil {
return counter, errors.Trace(err)
}
return counter, wp.Error()
}

func write(tctx *tcontext.Context, writer storage.ExternalFileWriter, str string) error {
_, err := writer.Write(tctx, []byte(str))
if err != nil {
Expand Down Expand Up @@ -583,13 +707,17 @@ const (
FileFormatSQLText
// FileFormatCSV indicates the given file type is csv type
FileFormatCSV
// FileFormatParquet indicates the given file type is parquet type
FileFormatParquet
)

const (
// FileFormatSQLTextString indicates the string/suffix of sql type file
FileFormatSQLTextString = "sql"
// FileFormatCSVString indicates the string/suffix of csv type file
FileFormatCSVString = "csv"
// FileFormatParquetString indicates the string/suffix of parquet type file
FileFormatParquetString = "parquet"
)

// String implement Stringer.String method.
Expand All @@ -599,6 +727,8 @@ func (f FileFormat) String() string {
return strings.ToUpper(FileFormatSQLTextString)
case FileFormatCSV:
return strings.ToUpper(FileFormatCSVString)
case FileFormatParquet:
return strings.ToUpper(FileFormatParquetString)
default:
return "unknown"
}
Expand All @@ -613,6 +743,8 @@ func (f FileFormat) Extension() string {
return FileFormatSQLTextString
case FileFormatCSV:
return FileFormatCSVString
case FileFormatParquet:
return FileFormatParquetString
default:
return "unknown_format"
}
Expand All @@ -625,6 +757,8 @@ func (f FileFormat) WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableM
return WriteInsert(pCtx, cfg, meta, tblIR, w)
case FileFormatCSV:
return WriteInsertInCsv(pCtx, cfg, meta, tblIR, w)
case FileFormatParquet:
return WriteInsertInParquet(pCtx, cfg, meta, tblIR, w)
default:
return 0, errors.Errorf("unknown file format")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ require (
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/atomic v1.9.0
Expand Down
19 changes: 13 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alvaroloes/enumer v1.1.2/go.mod h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 h1:byKBBF2CKWBjjA4J1ZL2JXttJULvWSl50LegTyRZ728=
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 h1:Jz3KVLYY5+JO7rDiX0sAuRGtuv2vG01r17Y9nLMWNUw=
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.2 h1:hY4rAyg7Eqbb27GB6gkhUKrRAuc8xRjlNtJq+LseKeY=
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/appleboy/gin-jwt/v2 v2.6.3/go.mod h1:MfPYA4ogzvOcVkRwAxT7quHOtQmVKDpTwxyUrC2DNw0=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down Expand Up @@ -327,6 +329,8 @@ github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38/go.mod h1:B4C85q
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -461,9 +465,9 @@ github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -565,6 +569,8 @@ github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoU
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8=
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/badger v1.5.1-0.20210831093107-2f6cb8008145 h1:t7sdxmfyZ3p9K7gD8t5B50TerzTvHuAPYt+VubTVKDY=
github.com/pingcap/badger v1.5.1-0.20210831093107-2f6cb8008145/go.mod h1:LyrqUOHZrUDf9oGi1yoz1+qw9ckSIhQb5eMa1acOLNQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
Expand Down Expand Up @@ -705,6 +711,7 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down Expand Up @@ -775,8 +782,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx0K/GyB0o2bww=
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 h1:tBbuFCtyJNKT+BFAv6qjvTFpVdy97IYNaBwGUXifIUs=
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8=
github.com/xitongsys/parquet-go v1.6.2 h1:MhCaXii4eqceKPu9BwrjLqyK10oX9WF+xGhwvwbw7xM=
github.com/xitongsys/parquet-go v1.6.2/go.mod h1:IulAQyalCm0rPiZVNnCgm/PCL64X2tdSVGMQ/UeKqWA=
github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA=
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k=
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE=
Expand Down

0 comments on commit 15c2585

Please sign in to comment.