diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index d8067f2d58..a10bc49e64 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -18,8 +18,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,6 +36,9 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { + if block.Rows == 0 && block.Info.Overflows { + return nil + } if block.Rows != 1 { return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e7dc2bf688..9facc671a8 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -426,8 +426,7 @@ func GetByNameAs[T Connector](ctx context.Context, env map[string]string, catalo } func CloseConnector(ctx context.Context, conn Connector) { - err := conn.Close() - if err != nil { + if err := conn.Close(); err != nil { logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err)) } }