diff --git a/chsql/src/duck_flock.cpp b/chsql/src/duck_flock.cpp index f27a11a..90e3588 100644 --- a/chsql/src/duck_flock.cpp +++ b/chsql/src/duck_flock.cpp @@ -1,8 +1,9 @@ #ifndef DUCK_FLOCK_H #define DUCK_FLOCK_H #include "chsql_extension.hpp" + namespace duckdb { - struct DuckFlockData : FunctionData{ + struct DuckFlockData : FunctionData { vector> conn; vector> results; unique_ptr Copy() const override { @@ -13,66 +14,123 @@ namespace duckdb { }; }; - - unique_ptr DuckFlockBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { + vector &return_types, vector &names) { auto data = make_uniq(); + + // Check for NULL input parameters + if (input.inputs.empty() || input.inputs.size() < 2) { + throw std::runtime_error("url_flock: missing required parameters"); + } + if (input.inputs[0].IsNull() || input.inputs[1].IsNull()) { + throw std::runtime_error("url_flock: NULL parameters are not allowed"); + } + auto strQuery = input.inputs[0].GetValue(); - vector flock; + if (strQuery.empty()) { + throw std::runtime_error("url_flock: empty query string"); + } + auto &raw_flock = ListValue::GetChildren(input.inputs[1]); + if (raw_flock.empty()) { + throw std::runtime_error("url_flock: empty flock list"); + } + + bool has_valid_result = false; + // Process each connection for (auto &duck : raw_flock) { - flock.push_back(duck.ToString()); - auto conn = make_uniq(*context.db); - conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); - auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))"); - if (req->HasError()) { - throw std::runtime_error("duck_flock: error: " + req->GetError()); + if (duck.IsNull() || duck.ToString().empty()) { + continue; + } + + try { + auto conn = make_uniq(*context.db); + if (!conn) { + continue; + } + + auto settingResult = conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); + if (settingResult->HasError()) { + continue; + } + + auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))"); + if (req->HasError()) { + continue; + } + + auto queryResult = req->Execute(strQuery.c_str(), duck.ToString()); + if (!queryResult || queryResult->HasError()) { + continue; + } + + // Store the first valid result's types and names + if (!has_valid_result) { + return_types.clear(); + copy(queryResult->types.begin(), queryResult->types.end(), back_inserter(return_types)); + names.clear(); + copy(queryResult->names.begin(), queryResult->names.end(), back_inserter(names)); + + if (return_types.empty()) { + throw std::runtime_error("url_flock: query must return at least one column"); + } + has_valid_result = true; + } + + data->conn.push_back(std::move(conn)); + data->results.push_back(std::move(queryResult)); + } catch (const std::exception &e) { + continue; } - data->conn.push_back(std::move(conn)); - data->results.push_back(std::move(req->Execute(strQuery.c_str(), duck.ToString()))); } - if (data->results[0]->HasError()) { - throw std::runtime_error("duck_flock: error: " + data->results[0]->GetError()); + + // Verify we have at least one valid result + if (!has_valid_result || data->results.empty()) { + throw std::runtime_error("url_flock: invalid or no results"); } - return_types.clear(); - copy(data->results[0]->types.begin(), data->results[0]->types.end(), back_inserter(return_types)); - names.clear(); - copy(data->results[0]->names.begin(), data->results[0]->names.end(), back_inserter(names)); + return std::move(data); } - void DuckFlockImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p, - DataChunk &output) { + void DuckFlockImplementation(ClientContext &context, TableFunctionInput &data_p, + DataChunk &output) { auto &data = data_p.bind_data->Cast(); + + if (data.results.empty()) { + return; + } + for (const auto &res : data.results) { + if (!res) { + continue; + } + ErrorData error_data; unique_ptr data_chunk = make_uniq(); - if (res->TryFetch(data_chunk, error_data)) { - if (data_chunk != nullptr) { - output.Append(*data_chunk); - return; + + try { + if (res->TryFetch(data_chunk, error_data)) { + if (data_chunk && !data_chunk->size() == 0) { + output.Append(*data_chunk); + return; + } } + } catch (...) { + continue; } } } TableFunction DuckFlockTableFunction() { - TableFunction f( - "url_flock", - {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, - DuckFlockImplementation, - DuckFlockBind, - nullptr, - nullptr - ); - return f; + TableFunction f( + "url_flock", + {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, + DuckFlockImplementation, + DuckFlockBind, + nullptr, + nullptr + ); + return f; } - - } - - - - #endif