diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 53a1434606..9f7defa612 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,7 +2,6 @@ package connclickhouse import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -18,8 +17,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,16 +35,13 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { - if block.Rows != 1 { - return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) - } - if block.Info.Overflows { - return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow") - } return nil }, }); err != nil { - return false, fmt.Errorf("error while reading result row: %w", err) + return false, fmt.Errorf("[clickhouse] checkIfTableExists: error in query: %w", err) + } + if len(existsC) != 1 { + return false, fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", len(existsC)) } return existsC[0] != 0, nil }