From bfc18cebf6ae5318aea352f674f98e44d98a4730 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Tue, 10 Dec 2024 17:53:57 +0000
Subject: [PATCH] post rebase fixes

---
 flow/connectors/clickhouse/clickhouse.go |  36 ++------
 flow/e2e/clickhouse/clickhouse.go        | 101 ++++++++++-------
 2 files changed, 53 insertions(+), 84 deletions(-)

diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 8a7950fc3d..76f12c6673 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -11,6 +11,7 @@ import (
 	"maps"
 	"net/url"
 	"slices"
+	"strconv"
 	"strings"
 	"time"
 
@@ -221,34 +222,14 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
 		tlsSetting.RootCAs = caPool
 	}
 
-	/*
-		// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
-		settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)}
-		if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
-			return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
-		} else if maxInsertThreads != 0 {
-			settings["max_insert_threads"] = maxInsertThreads
-		}
+	// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
+	settings := []ch.Setting{{Key: "select_sequential_consistency", Value: "1"}}
+	if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
+		return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
+	} else if maxInsertThreads != 0 {
+		settings = append(settings, ch.Setting{Key: "max_insert_threads", Value: strconv.FormatInt(maxInsertThreads, 10)})
+	}
 
-		conn, err := clickhouse.Open(&clickhouse.Options{
-			Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
-			Auth: clickhouse.Auth{
-				Database: config.Database,
-				Username: config.User,
-				Password: config.Password,
-			},
-			TLS:         tlsSetting,
-			Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4},
-			ClientInfo: clickhouse.ClientInfo{
-				Products: []struct {
-					Name    string
-					Version string
-				}{
-					{Name: "peerdb"},
-				},
-			},
-			Settings: settings,
-	*/
 	conn, err := ch.Dial(ctx, ch.Options{
 		Address:  fmt.Sprintf("%s:%d", config.Host, config.Port),
 		Database: config.Database,
@@ -259,6 +240,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
 		ClientName:  "peerdb",
 		DialTimeout: 3600 * time.Second,
 		ReadTimeout: 3600 * time.Second,
+		Settings:    settings,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect to ClickHouse peer: %w", err)
diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go
index d342227944..c623ddd005 100644
--- a/flow/e2e/clickhouse/clickhouse.go
+++ b/flow/e2e/clickhouse/clickhouse.go
@@ -3,9 +3,9 @@ package e2e_clickhouse
 import (
 	"context"
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"math/big"
+	"slices"
 	"strconv"
 	"strings"
 	"testing"
@@ -96,21 +96,10 @@ func (s ClickHouseSuite) Teardown() {
 	e2e.TearDownPostgres(s)
 }
 
-// from clickhouse-go lib/column/bigint.go
-func endianSwap(src []byte, not bool) {
-	for i := range len(src) / 2 {
-		if not {
-			src[i], src[len(src)-i-1] = ^src[len(src)-i-1], ^src[i]
-		} else {
-			src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i]
-		}
-	}
-}
-
 // from clickhouse-go lib/column/bigint.go
 func rawToBigInt(v []byte, signed bool) *big.Int {
 	// LittleEndian to BigEndian
-	endianSwap(v, false)
+	slices.Reverse(v)
 	lt := new(big.Int)
 	if signed && len(v) > 0 && v[0]&0x80 != 0 {
 		// [0] ^ will +1
@@ -130,23 +119,24 @@ func rawToBigInt(v []byte, signed bool) *big.Int {
 func decimalRow(col chproto.ColResult, i int) decimal.Decimal {
 	typ := string(col.Type())
 	lparam := strings.LastIndex(typ, "(")
-	if lparam == -1 {
-		panic("no ( in " + typ)
-	}
-	params := typ[lparam+1:]
-	rparam := strings.Index(params, ")")
-	if rparam == -1 {
-		panic("no ) in params " + params + " of " + typ)
-	}
-	params = typ[:rparam]
-	_, scaleStr, ok := strings.Cut(params, ",")
-	if !ok {
-		panic("no , in params " + params + " of " + typ)
-	}
-	scaleStr = strings.TrimSpace(scaleStr)
-	scale, err := strconv.Atoi(scaleStr)
-	if err != nil {
-		panic("failed to parse scale " + scaleStr + ": " + err.Error())
+	scale := 0
+	if lparam != -1 {
+		params := typ[lparam+1:]
+		rparam := strings.Index(params, ")")
+		if rparam == -1 {
+			panic("no ) in params " + params + " of " + typ)
+		}
+		params = params[:rparam]
+		_, scaleStr, ok := strings.Cut(params, ",")
+		if !ok {
+			panic("no , in params " + params + " of " + typ)
+		}
+		scaleStr = strings.TrimSpace(scaleStr)
+		var err error
+		scale, err = strconv.Atoi(scaleStr)
+		if err != nil {
+			panic("failed to parse scale " + scaleStr + ": " + err.Error())
+		}
 	}
 
 	var value decimal.Decimal
@@ -184,43 +174,24 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
 	}
 	defer ch.Close()
 
-	firstCol, _, _ := strings.Cut(cols, ",")
-	if firstCol == "" {
-		return nil, errors.New("no columns specified")
-	}
 	batch := &model.QRecordBatch{}
 	var schema chproto.Results
 	if err := ch.Do(context.Background(), chgo.Query{
-		Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`,
-			cols, table, firstCol),
+		Body:   fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table),
 		Result: schema.Auto(),
 		OnResult: func(ctx context.Context, block chproto.Block) error {
 			if len(batch.Schema.Fields) == 0 {
-				for _, col := range schema {
-					nullable := strings.HasPrefix(string(col.Data.Type()), "Nullable(")
-					var qkind qvalue.QValueKind
-					switch col.Data.Type() {
-					case "String", "Nullable(String)":
-						qkind = qvalue.QValueKindString
-					case "Int32", "Nullable(Int32)":
-						qkind = qvalue.QValueKindInt32
-					case "DateTime64(6)", "Nullable(DateTime64(6))":
-						qkind = qvalue.QValueKindTimestamp
-					case "Date32", "Nullable(Date32)":
-						qkind = qvalue.QValueKindDate
-					default:
-						if strings.Contains(string(col.Data.Type()), "Decimal") {
-							qkind = qvalue.QValueKindNumeric
-						} else {
-							return fmt.Errorf("failed to resolve QValueKind for %s", col.Data.Type())
-						}
-					}
+				tableSchema, err := connclickhouse.GetTableSchemaForTable(table, schema)
+				if err != nil {
+					return err
+				}
+				for _, col := range tableSchema.Columns {
 					batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{
 						Name:      col.Name,
-						Type:      qkind,
+						Type:      qvalue.QValueKind(col.Type),
 						Precision: 0,
 						Scale:     0,
-						Nullable:  nullable,
+						Nullable:  col.Nullable,
 					})
 				}
 			}
@@ -237,6 +208,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
 				}
 			case *chproto.ColStr:
 				qrow = append(qrow, qvalue.QValueString{Val: v.Row(idx)})
+			case *chproto.ColNullable[int16]:
+				if v.Nulls[idx] != 0 {
+					qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt16))
+				} else {
+					qrow = append(qrow, qvalue.QValueInt16{Val: v.Values.Row(idx)})
+				}
+			case *chproto.ColInt16:
+				qrow = append(qrow, qvalue.QValueInt16{Val: v.Row(idx)})
 			case *chproto.ColNullable[int32]:
 				if v.Nulls[idx] != 0 {
 					qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32))
@@ -245,6 +224,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
 				}
 			case *chproto.ColInt32:
 				qrow = append(qrow, qvalue.QValueInt32{Val: v.Row(idx)})
+			case *chproto.ColNullable[int64]:
+				if v.Nulls[idx] != 0 {
+					qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt64))
+				} else {
+					qrow = append(qrow, qvalue.QValueInt64{Val: v.Values.Row(idx)})
+				}
+			case *chproto.ColInt64:
+				qrow = append(qrow, qvalue.QValueInt64{Val: v.Row(idx)})
 			case *chproto.ColNullable[time.Time]:
 				if v.Nulls[idx] != 0 {
 					qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp))
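
Reviewer notes on the hunks above; the snippets below are sketches, not part of the commit.

On the Connect() change: clickhouse-go took Settings as a map with arbitrary value types (hence the old uint64(1)), while ch-go models each setting as a ch.Setting struct with string Key and Value fields, which is why max_insert_threads is now rendered through strconv.FormatInt. A minimal standalone sketch of that wiring; the address and thread count are placeholders, not values from this patch:

	package main

	import (
		"context"
		"strconv"

		"github.com/ClickHouse/ch-go"
	)

	// dial mirrors the settings handling in the patch, in isolation.
	func dial(ctx context.Context, maxInsertThreads int64) (*ch.Client, error) {
		settings := []ch.Setting{{Key: "select_sequential_consistency", Value: "1"}}
		if maxInsertThreads != 0 {
			// ch-go settings are string-valued, so format the integer explicitly
			settings = append(settings, ch.Setting{Key: "max_insert_threads", Value: strconv.FormatInt(maxInsertThreads, 10)})
		}
		return ch.Dial(ctx, ch.Options{
			Address:  "localhost:9000", // placeholder address
			Settings: settings,
		})
	}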
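On rawToBigInt: the hunk cuts off inside the signed branch, so for reference here is a plausible completion of the clickhouse-go lib/column/bigint.go conversion it credits; everything after the truncated if is reconstructed, not quoted from the patch. ClickHouse sends (U)Int128/256 as fixed-width little-endian bytes, big.Int.SetBytes wants a big-endian magnitude, and a set high bit on a signed value is undone via two's complement (invert, add one, negate):

	import (
		"math/big"
		"slices"
	)

	func rawToBigInt(v []byte, signed bool) *big.Int {
		slices.Reverse(v) // little-endian wire order -> big-endian for SetBytes
		lt := new(big.Int)
		if signed && len(v) > 0 && v[0]&0x80 != 0 {
			// negative two's complement: invert all bytes, add one, negate
			for i := range v {
				v[i] = ^v[i]
			}
			lt.SetBytes(v)
			lt.Add(lt, big.NewInt(1))
			lt.Neg(lt)
		} else {
			lt.SetBytes(v)
		}
		return lt
	}

As a sanity check, the sixteen 0xff bytes of an Int128 -1 invert to all zeros, gain one, and negate back to -1.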
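On the decimalRow rework: scale now defaults to 0 when the type carries no parameter list, instead of panicking on a missing "(". Worked through by hand: for "Nullable(Decimal(38, 9))", strings.LastIndex finds the inner "(", params trims to "38, 9", and strings.Cut plus TrimSpace yield scale 9, while a parameter-free type like "Int128" takes the scale-0 path. A hypothetical table-driven check of that behavior; scaleOf is a stand-in for the parsing logic above, not a function in this patch:

	func TestScaleParsing(t *testing.T) {
		for typ, want := range map[string]int{
			"Decimal(38, 9)":           9,
			"Nullable(Decimal(76, 2))": 2,
			"Int128":                   0, // no "(", so scale defaults to 0
		} {
			if got := scaleOf(typ); got != want {
				t.Errorf("%s: scale %d, want %d", typ, got, want)
			}
		}
	}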
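On the new ColNullable cases: ch-go's chproto.ColNullable[T] keeps a null mask (Nulls, a byte per row) and a values column (Values) side by side, both read positionally, which is exactly what the v.Nulls[idx] / v.Values.Row(idx) pairs above do. A hand-built sketch of that layout, assuming chproto is github.com/ClickHouse/ch-go/proto as elsewhere in this file; real code would let the server block fill these columns rather than constructing them literally:

	// Row 0 holds 7, row 1 is NULL.
	col := &chproto.ColNullable[int64]{Values: new(chproto.ColInt64)}
	col.Nulls = append(col.Nulls, 0) // 0 marks a present value
	col.Values.Append(7)
	col.Nulls = append(col.Nulls, 1) // nonzero marks NULL
	col.Values.Append(0)             // NULL rows still occupy a zero value slot
	for i := 0; i < len(col.Nulls); i++ {
		if col.Nulls[i] != 0 {
			fmt.Println("NULL")
		} else {
			fmt.Println(col.Values.Row(i)) // prints 7 for row 0
		}
	}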