Skip to content

Commit

Permalink
post rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 10, 2024
1 parent 97b5d04 commit bfc18ce
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 84 deletions.
36 changes: 9 additions & 27 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"maps"
"net/url"
"slices"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -221,34 +222,14 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
tlsSetting.RootCAs = caPool
}

/*
// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)}
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings["max_insert_threads"] = maxInsertThreads
}
// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
settings := []ch.Setting{{Key: "select_sequential_consistency", Value: "1"}}
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings = append(settings, ch.Setting{Key: "max_insert_threads", Value: strconv.FormatInt(maxInsertThreads, 10)})
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Database: config.Database,
Username: config.User,
Password: config.Password,
},
TLS: tlsSetting,
Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "peerdb"},
},
},
Settings: settings,
*/
conn, err := ch.Dial(ctx, ch.Options{
Address: fmt.Sprintf("%s:%d", config.Host, config.Port),
Database: config.Database,
Expand All @@ -259,6 +240,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
ClientName: "peerdb",
DialTimeout: 3600 * time.Second,
ReadTimeout: 3600 * time.Second,
Settings: settings,
})
if err != nil {
return nil, fmt.Errorf("failed to connect to ClickHouse peer: %w", err)
Expand Down
101 changes: 44 additions & 57 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package e2e_clickhouse
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/big"
"slices"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -96,21 +96,10 @@ func (s ClickHouseSuite) Teardown() {
e2e.TearDownPostgres(s)
}

// endianSwap reverses the byte order of src in place, optionally applying a
// bitwise NOT to every byte it moves.
//
// Adapted from clickhouse-go lib/column/bigint.go.
//
// Bytes are exchanged pairwise from the two ends towards the centre. When not
// is true, each exchanged byte is complemented as it is written to its new
// position. For an odd-length slice the middle byte has no partner, so it is
// left exactly as it was (neither moved nor complemented). Empty and
// single-byte slices are returned unchanged.
func endianSwap(src []byte, not bool) {
	head, tail := 0, len(src)-1
	for head < tail {
		front, back := src[head], src[tail]
		if not {
			// Complement both bytes before they trade places.
			front, back = ^front, ^back
		}
		src[head], src[tail] = back, front
		head++
		tail--
	}
}

// from clickhouse-go lib/column/bigint.go
func rawToBigInt(v []byte, signed bool) *big.Int {
// LittleEndian to BigEndian
endianSwap(v, false)
slices.Reverse(v)
lt := new(big.Int)
if signed && len(v) > 0 && v[0]&0x80 != 0 {
// [0] ^ will +1
Expand All @@ -130,23 +119,24 @@ func rawToBigInt(v []byte, signed bool) *big.Int {
func decimalRow(col chproto.ColResult, i int) decimal.Decimal {
typ := string(col.Type())
lparam := strings.LastIndex(typ, "(")
if lparam == -1 {
panic("no ( in " + typ)
}
params := typ[lparam+1:]
rparam := strings.Index(params, ")")
if rparam == -1 {
panic("no ) in params " + params + " of " + typ)
}
params = typ[:rparam]
_, scaleStr, ok := strings.Cut(params, ",")
if !ok {
panic("no , in params " + params + " of " + typ)
}
scaleStr = strings.TrimSpace(scaleStr)
scale, err := strconv.Atoi(scaleStr)
if err != nil {
panic("failed to parse scale " + scaleStr + ": " + err.Error())
scale := 0
if lparam != -1 {
params := typ[lparam+1:]
rparam := strings.Index(params, ")")
if rparam == -1 {
panic("no ) in params " + params + " of " + typ)
}
params = typ[:rparam]
_, scaleStr, ok := strings.Cut(params, ",")
if !ok {
panic("no , in params " + params + " of " + typ)
}
scaleStr = strings.TrimSpace(scaleStr)
var err error
scale, err = strconv.Atoi(scaleStr)
if err != nil {
panic("failed to parse scale " + scaleStr + ": " + err.Error())
}
}

var value decimal.Decimal
Expand Down Expand Up @@ -184,43 +174,24 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
}
defer ch.Close()

firstCol, _, _ := strings.Cut(cols, ",")
if firstCol == "" {
return nil, errors.New("no columns specified")
}
batch := &model.QRecordBatch{}
var schema chproto.Results
if err := ch.Do(context.Background(), chgo.Query{
Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`,
cols, table, firstCol),
Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table),
Result: schema.Auto(),
OnResult: func(ctx context.Context, block chproto.Block) error {
if len(batch.Schema.Fields) == 0 {
for _, col := range schema {
nullable := strings.HasPrefix(string(col.Data.Type()), "Nullable(")
var qkind qvalue.QValueKind
switch col.Data.Type() {
case "String", "Nullable(String)":
qkind = qvalue.QValueKindString
case "Int32", "Nullable(Int32)":
qkind = qvalue.QValueKindInt32
case "DateTime64(6)", "Nullable(DateTime64(6))":
qkind = qvalue.QValueKindTimestamp
case "Date32", "Nullable(Date32)":
qkind = qvalue.QValueKindDate
default:
if strings.Contains(string(col.Data.Type()), "Decimal") {
qkind = qvalue.QValueKindNumeric
} else {
return fmt.Errorf("failed to resolve QValueKind for %s", col.Data.Type())
}
}
tableSchema, err := connclickhouse.GetTableSchemaForTable(table, schema)
if err != nil {
return err
}
for _, col := range tableSchema.Columns {
batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{
Name: col.Name,
Type: qkind,
Type: qvalue.QValueKind(col.Type),
Precision: 0,
Scale: 0,
Nullable: nullable,
Nullable: col.Nullable,
})
}
}
Expand All @@ -237,6 +208,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
}
case *chproto.ColStr:
qrow = append(qrow, qvalue.QValueString{Val: v.Row(idx)})
case *chproto.ColNullable[int16]:
if v.Nulls[idx] != 0 {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32))
} else {
qrow = append(qrow, qvalue.QValueInt16{Val: v.Values.Row(idx)})
}
case *chproto.ColInt16:
qrow = append(qrow, qvalue.QValueInt16{Val: v.Row(idx)})
case *chproto.ColNullable[int32]:
if v.Nulls[idx] != 0 {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32))
Expand All @@ -245,6 +224,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
}
case *chproto.ColInt32:
qrow = append(qrow, qvalue.QValueInt32{Val: v.Row(idx)})
case *chproto.ColNullable[int64]:
if v.Nulls[idx] != 0 {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32))
} else {
qrow = append(qrow, qvalue.QValueInt64{Val: v.Values.Row(idx)})
}
case *chproto.ColInt64:
qrow = append(qrow, qvalue.QValueInt64{Val: v.Row(idx)})
case *chproto.ColNullable[time.Time]:
if v.Nulls[idx] != 0 {
qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp))
Expand Down

0 comments on commit bfc18ce

Please sign in to comment.