Skip to content

Commit

Permalink
prototype: push clickhouse cdc directly
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 9, 2024
1 parent 8ae44d3 commit d78e065
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *ClickHouseConnector) avroSyncMethod(flowJobName string) *ClickHouseAvro
return NewClickHouseAvroSyncMethod(qrepConfig, c)
}

func (c *ClickHouseConnector) syncRecordsViaAvro(
func (c *ClickHouseConnector) syncRecords(
ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
syncBatchID int64,
Expand All @@ -87,14 +87,32 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

avroSyncer := c.avroSyncMethod(req.FlowJobName)
numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName, syncBatchID)
batch, err := c.database.PrepareBatch(ctx, "INSERT INTO "+c.getRawTableName(req.FlowJobName))
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to begin inserting batch: %w", err)
}
numRecords := 0
row := make([]any, 0, 8)
for record := range stream.Records {
row = row[:0]
for _, qv := range record {
switch val := qv.(type) {
case qvalue.QValueString:
row = append(row, val.Val)
case qvalue.QValueInt64:
row = append(row, val.Val)
}
}
if err := batch.Append(row...); err != nil {
return nil, fmt.Errorf("failed to append to batch")

Check failure on line 107 in flow/connectors/clickhouse/cdc.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Errorf can be replaced with errors.New (perfsprint)
}
numRecords += 1
}
if err := batch.Send(); err != nil {
return nil, fmt.Errorf("failed to send batch: %w", err)
}

err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

Expand All @@ -108,13 +126,12 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
}

func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
res, err := c.syncRecords(ctx, req, req.SyncBatchID)
if err != nil {
return nil, err
}

err = c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID)
if err != nil {
if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
}
Expand Down

0 comments on commit d78e065

Please sign in to comment.