Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #70 - Add Serverless Support #73

Merged
merged 21 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b401f48
Initial support for OpenSearch Serverless, based simply on parsing URL
currantw Nov 13, 2024
b855e2d
Minor unrelated cleanup
currantw Nov 13, 2024
8491671
Initial updates to add `IsServerless` configuration option.
currantw Nov 14, 2024
c490706
Add `.idea/*` to project `.gitignore`
currantw Nov 14, 2024
0820b35
Fix small errors
currantw Nov 15, 2024
44336bf
Update documentation
currantw Nov 15, 2024
f39036d
Minor cleanuo
currantw Nov 15, 2024
29ac366
Fix format string error
currantw Nov 15, 2024
929088e
Exclude `.pyc` files from test runner, which supports Python in venv
currantw Nov 15, 2024
aa76d77
Add `is_serverless` member variable to avoid repeatedly determining it.
currantw Nov 15, 2024
63ffb0c
Also exclude `.o` and `.d` object files from test runner.
currantw Nov 16, 2024
7fcfa87
Rename configuration option from `IsServerless` to `ServerlessOverrid…
currantw Nov 18, 2024
f0b3abf
Add default `is_servrless` value to connection tests.
currantw Nov 18, 2024
caefd49
Revert unintentional change.
currantw Nov 18, 2024
ec3386a
Add clarifying comments.
currantw Nov 18, 2024
caf9d39
Add constant for OpenSearch distribution.
currantw Nov 19, 2024
4e55ffe
Rename from `is_serverless` to `is_aoss_serverless`.
currantw Nov 19, 2024
8e2f718
Update to first determine if serverless via URL, then default to usin…
currantw Nov 19, 2024
bed6886
Simplify logic, remove tertiary
currantw Nov 20, 2024
c5b48f7
Remove `ServerlessOverride` configuration option.
currantw Nov 21, 2024
effccef
Minor comment cleanup
currantw Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ lib64/*
*.tlog
include/*
packages/*
# Visual Studio
.vs/*
# Visual Studio Code
.vscode/*
src/psqlodbc/psqlodbcBuilder/x64_*
src/*/x64
src/*/Win32
Expand Down Expand Up @@ -60,3 +56,8 @@ CTestTestfile.cmake
/src/PowerBIConnector/obj/
/src/PowerBIConnector/.vs/
src/vcpkg_installed/

# IDEs
.vs/*
.vscode/*
.idea/*
currantw marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions docs/dev/BUILD_INSTRUCTIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ See [run_tests.md](./run_tests.md)

**BUILD_WITH_TESTS**

(Defaults to ON) If disabled, all tests and and test dependencies will be excluded from build which will optimize the installer package size. This option can set with the command line (using `-D`).
(Defaults to ON) If disabled, all tests and test dependencies will be excluded from build which will optimize the installer package size. This option can set with the command line (using `-D`).

### Working With SSL/TLS

Expand All @@ -119,7 +119,7 @@ If you plan to use OpenSearch Dashboards, as suggested for this project, you mus

### Setting up a DSN

A **D**ata **S**ouce **N**ame is used to store driver information in the system. By storing the information in the system, the information does not need to be specified each time the driver connects.
A **D**ata **S**ource **N**ame is used to store driver information in the system. By storing the information in the system, the information does not need to be specified each time the driver connects.

#### Windows

Expand Down
5 changes: 3 additions & 2 deletions src/TestRunner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
PERFORMANCE_INFO = "performance_info"
PERFORMANCE_RESULTS = "performance_results"
EXCLUDE_EXTENSION_LIST = (
".py", ".c", ".cmake", ".log",
".py", ".pyc", ".c", ".cmake", ".log",
".pdb", ".dll", ".sln", ".vcxproj", ".user",
".tlog", ".lastbuildstate", ".filters",
".obj", ".exp", ".lib", ".h", ".cpp", ".ilk")
".obj", ".o", ".d", ".exp", ".lib", ".h",
".cpp", ".ilk")
total_failures = 0
SYNC_START = "%%__PARSE__SYNC__START__%%"
SYNC_SEP = "%%__SEP__%%"
Expand Down
47 changes: 34 additions & 13 deletions src/sqlodbc/dlg_specific.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,41 @@ void makeConnectString(char *connect_string, const ConnInfo *ci, UWORD len) {
encode(ci->password, encoded_item, sizeof(encoded_item));
/* fundamental info */
nlen = MAX_CONNECT_STRING;

const char* connect_format_string =
"%s=%s;"
INI_SERVER "=%s;"
"database=OpenSearch;"
INI_PORT "=%s;"
INI_USERNAME_ABBR "=%s;"
INI_PASSWORD_ABBR "=%s;"
INI_AUTH_MODE "=%s;"
INI_REGION "=%s;"
INI_TUNNEL_HOST "=%s;"
INI_SSL_USE "=%d;"
INI_SSL_HOST_VERIFY "=%d;"
INI_LOG_LEVEL "=%d;"
INI_LOG_OUTPUT "=%s;"
INI_TIMEOUT "=%s;"
INI_FETCH_SIZE "=%s;";

olen = snprintf(
connect_string, nlen,
"%s=%s;" INI_SERVER
"=%s;"
"database=OpenSearch;" INI_PORT "=%s;" INI_USERNAME_ABBR
"=%s;" INI_PASSWORD_ABBR "=%s;" INI_AUTH_MODE "=%s;" INI_REGION
"=%s;" INI_TUNNEL_HOST "=%s;" INI_SSL_USE "=%d;" INI_SSL_HOST_VERIFY
"=%d;" INI_LOG_LEVEL "=%d;" INI_LOG_OUTPUT "=%s;" INI_TIMEOUT "=%s;"
INI_FETCH_SIZE "=%s;",
connect_format_string,
got_dsn ? "DSN" : "DRIVER", got_dsn ? ci->dsn : ci->drivername,
ci->server, ci->port, ci->username, encoded_item, ci->authtype,
ci->region, ci->tunnel_host, (int)ci->use_ssl, (int)ci->verify_server,
(int)ci->drivers.loglevel, ci->drivers.output_dir,
ci->response_timeout, ci->fetch_size);
ci->server,
ci->port,
ci->username,
encoded_item,
ci->authtype,
ci->region,
ci->tunnel_host,
(int)ci->use_ssl,
(int)ci->verify_server,
(int)ci->drivers.loglevel,
ci->drivers.output_dir,
ci->response_timeout,
ci->fetch_size);
if (olen < 0 || olen >= nlen) {
connect_string[0] = '\0';
return;
Expand Down Expand Up @@ -141,7 +162,7 @@ static void getCiDefaults(ConnInfo *ci) {
strncpy(ci->port, DEFAULT_PORT, SMALL_REGISTRY_LEN);
strncpy(ci->response_timeout, DEFAULT_RESPONSE_TIMEOUT_STR,
SMALL_REGISTRY_LEN);
strncpy(ci->fetch_size, DEFAULT_FETCH_SIZE_STR,
strncpy(ci->fetch_size, DEFAULT_FETCH_SIZE,
SMALL_REGISTRY_LEN);
strncpy(ci->authtype, DEFAULT_AUTHTYPE, MEDIUM_REGISTRY_LEN);
if (ci->password.name != NULL)
Expand Down Expand Up @@ -455,7 +476,7 @@ void CC_conninfo_init(ConnInfo *conninfo, UInt4 option) {
strncpy(conninfo->port, DEFAULT_PORT, SMALL_REGISTRY_LEN);
strncpy(conninfo->response_timeout, DEFAULT_RESPONSE_TIMEOUT_STR,
SMALL_REGISTRY_LEN);
strncpy(conninfo->fetch_size, DEFAULT_FETCH_SIZE_STR,
strncpy(conninfo->fetch_size, DEFAULT_FETCH_SIZE,
SMALL_REGISTRY_LEN);
strncpy(conninfo->authtype, DEFAULT_AUTHTYPE, MEDIUM_REGISTRY_LEN);
if (conninfo->password.name != NULL)
Expand Down
3 changes: 1 addition & 2 deletions src/sqlodbc/dlg_specific.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ extern "C" {
#define INI_TIMEOUT "responseTimeout"
#define INI_FETCH_SIZE "fetchSize"

#define DEFAULT_FETCH_SIZE -1
#define DEFAULT_FETCH_SIZE_STR "-1"
currantw marked this conversation as resolved.
Show resolved Hide resolved
#define DEFAULT_FETCH_SIZE "-1"
#define DEFAULT_RESPONSE_TIMEOUT 10 // Seconds
#define DEFAULT_RESPONSE_TIMEOUT_STR "10"
#define DEFAULT_AUTHTYPE "NONE"
Expand Down
113 changes: 81 additions & 32 deletions src/sqlodbc/opensearch_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@
#include <aws/core/http/HttpClient.h>
// clang-format on

#define SQL_ENDPOINT_ERROR_STR "Error"
static const std::string SQL_ENDPOINT_OPENSEARCH = "/_plugins/_sql";
static const std::string SQL_ENDPOINT_ELASTICSEARCH = "/_opendistro/_sql";
static const std::string SQL_ENDPOINT_ERROR = "Error";

static const std::string SERVICE_NAME_DEFAULT = "es";
static const std::string SERVICE_NAME_AOSS_SERVERLESS = "aoss";

static const std::string CREDENTIALS_PROFILE = "opensearchodbc";
static const std::string CREDENTIALS_PROVIDER_ALLOCATION_TAG =
"CREDENTIAL_PROVIDER";

static const std::string DISTRIBUTION_OPENSEARCH = "opensearch";

static const std::string ctype = "application/json";
static const std::string ALLOCATION_TAG = "AWS_SIGV4_AUTH";
static const std::string SERVICE_NAME = "es";
static const std::string ESODBC_PROFILE_NAME = "opensearchodbc";
static const std::string ERROR_MSG_PREFIX =
"[OpenSearch][SQL ODBC Driver][SQL Plugin] ";
static const std::string JSON_SCHEMA =
Expand Down Expand Up @@ -346,7 +354,7 @@ bool OpenSearchCommunication::CheckConnectionOptions() {
SetErrorDetails("Auth error", m_error_message,
ConnErrorType::CONN_ERROR_INVALID_AUTH);
}
} else if (m_rt_opts.conn.server == "") {
} else if (m_rt_opts.conn.server.empty()) {
m_error_message = "Host connection option was not specified.";
SetErrorDetails("Connection error", m_error_message,
ConnErrorType::CONN_ERROR_UNABLE_TO_ESTABLISH);
Expand All @@ -364,14 +372,14 @@ bool OpenSearchCommunication::CheckConnectionOptions() {
ConnErrorType::CONN_ERROR_UNABLE_TO_ESTABLISH);
}

if (m_error_message != "") {
if (!m_error_message.empty()) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
m_valid_connection_options = false;
return false;
} else {
LogMsg(OPENSEARCH_DEBUG, "Required connection option are valid.");
m_valid_connection_options = true;
}

LogMsg(OPENSEARCH_DEBUG, "Required connection option are valid.");
m_valid_connection_options = true;
return m_valid_connection_options;
}

Expand Down Expand Up @@ -400,7 +408,7 @@ OpenSearchCommunication::IssueRequest(
const std::string& fetch_size, const std::string& cursor) {
// Generate http request
Aws::Http::URI host(m_rt_opts.conn.server.c_str());
if (m_rt_opts.conn.port.length() > 0) {
if (!m_rt_opts.conn.port.empty()) {
host.SetPort((uint16_t) atoi(m_rt_opts.conn.port.c_str()));
}
host.SetPath(endpoint.c_str());
Expand Down Expand Up @@ -433,7 +441,9 @@ OpenSearchCommunication::IssueRequest(
}

// Handle authentication
if (m_rt_opts.auth.auth_type == AUTHTYPE_BASIC) {
const std::string& auth_type = m_rt_opts.auth.auth_type;

if (auth_type == AUTHTYPE_BASIC) {
std::string userpw_str =
m_rt_opts.auth.username + ":" + m_rt_opts.auth.password;
Aws::Utils::Array< unsigned char > userpw_arr(
Expand All @@ -442,17 +452,25 @@ OpenSearchCommunication::IssueRequest(
Aws::String hashed_userpw =
Aws::Utils::HashingUtils::Base64Encode(userpw_arr);
request->SetAuthorization("Basic " + hashed_userpw);
} else if (m_rt_opts.auth.auth_type == AUTHTYPE_IAM) {
}

else if (auth_type == AUTHTYPE_IAM) {
std::shared_ptr< Aws::Auth::ProfileConfigFileAWSCredentialsProvider >
credential_provider = Aws::MakeShared<
Aws::Auth::ProfileConfigFileAWSCredentialsProvider >(
ALLOCATION_TAG.c_str(), ESODBC_PROFILE_NAME.c_str());
CREDENTIALS_PROVIDER_ALLOCATION_TAG.c_str(),
CREDENTIALS_PROFILE.c_str());

const std::string& service_name =
is_aoss_serverless
? SERVICE_NAME_AOSS_SERVERLESS
: SERVICE_NAME_DEFAULT;

Aws::Client::AWSAuthV4Signer signer(credential_provider,
SERVICE_NAME.c_str(),
service_name.c_str(),
m_rt_opts.auth.region.c_str());

if (m_rt_opts.auth.tunnel_host.length() > 0) {
if (!m_rt_opts.auth.tunnel_host.empty()) {
request->SetHeaderValue("host",
Aws::Http::URI(m_rt_opts.auth.tunnel_host.c_str())
.GetAuthority()
Expand All @@ -475,7 +493,7 @@ bool OpenSearchCommunication::IsSQLPluginEnabled(std::shared_ptr< ErrorDetails >

/**
* @brief Queries server to determine SQL plugin availability.
*
*
* @return true : Successfully queried server for SQL plugin
* @return false : Failed to query server, no plugin available, exception was caught
*/
Expand Down Expand Up @@ -548,26 +566,37 @@ bool OpenSearchCommunication::CheckSQLPluginAvailability() {
}

bool OpenSearchCommunication::EstablishConnection() {
// Generate HttpClient Connection class if it does not exist

LogMsg(OPENSEARCH_ALL, "Attempting to establish DB connection.");

// Generate HttpClient Connection class if it does not exist
if (!m_http_client) {
InitializeConnection();
}

// check if the endpoint is initialized
if (sql_endpoint.empty()) {
SetSqlEndpoint();
// Set whether the connection is to OpenSearch serverless cluster.
SetIsAossServerless();

// Set the SQL endpoint to connect to. If this is a serverless connection,
// the SQL endpoint is always set correctly; if not, the endpoint is
// determined by sending a request to OpenSearch, which may result in an
// error.
SetSqlEndpoint();

if (is_aoss_serverless && (sql_endpoint == SQL_ENDPOINT_ERROR)) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
}

// Check whether SQL plugin has been installed and enabled in the
// OpenSearch server since the SQL plugin is a prerequisite to
// use this driver.
if((sql_endpoint != SQL_ENDPOINT_ERROR_STR) && CheckSQLPluginAvailability()) {
return true;
if(!CheckSQLPluginAvailability()) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
}
currantw marked this conversation as resolved.
Show resolved Hide resolved

LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
return true;
}

std::vector< std::string > OpenSearchCommunication::GetColumnsWithSelectQuery(
Expand Down Expand Up @@ -929,7 +958,8 @@ std::string OpenSearchCommunication::GetServerVersion() {
/**
* @brief Queries supplied URL to validate Server Distribution. Maintains
* backwards compatibility with opendistro distribution.
*
* Not compatible with OpenSearch Serverless.
*
* @return std::string : Server distribution name, returns "" on error
*/
std::string OpenSearchCommunication::GetServerDistribution() {
Expand Down Expand Up @@ -1046,17 +1076,36 @@ std::string OpenSearchCommunication::GetClusterName() {
}

/**
* @brief Sets URL endpoint for SQL plugin. On failure to
* determine appropriate endpoint, value is set to SQL_ENDPOINT_ERROR_STR
*
* @brief Sets URL endpoint for the SQL plugin.
* Sets it to SQL_ENDPOINT_ERROR if an appropriate
* endpoint could not be determined.
*/
void OpenSearchCommunication::SetSqlEndpoint() {

// Serverless Elasticsearch is not supported.
if (is_aoss_serverless) {
sql_endpoint = SQL_ENDPOINT_OPENSEARCH;
return;
}

std::string distribution = GetServerDistribution();
if (distribution.empty()) {
sql_endpoint = SQL_ENDPOINT_ERROR_STR;
} else if (distribution.compare("opensearch") == 0) {
sql_endpoint = "/_plugins/_sql";
sql_endpoint = SQL_ENDPOINT_ERROR;
} else if (distribution == DISTRIBUTION_OPENSEARCH) {
sql_endpoint = SQL_ENDPOINT_OPENSEARCH;
} else {
sql_endpoint = "/_opendistro/_sql";
sql_endpoint = SQL_ENDPOINT_ELASTICSEARCH;
}
}

/**
* @brief Sets flag indicating whether this is
* connecting to an OpenSearch Serverless cluster.
*/
void OpenSearchCommunication::SetIsAossServerless() {

// Treat the connection as serverless if the server URL corresponds to
// Amazon OpenSearch Serverless. Limitation: does not support serverless
// with proxy server URL.
is_aoss_serverless = m_rt_opts.conn.server.find("aoss.amazonaws.com") != std::string::npos;
}
5 changes: 4 additions & 1 deletion src/sqlodbc/opensearch_communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class OpenSearchCommunication {
void StopResultRetrieval();
std::vector< std::string > GetColumnsWithSelectQuery(
const std::string table_name);
void SetSqlEndpoint();

// the endpoint is set according to distribution (ES/OpenSearch)
std::string sql_endpoint;
Expand All @@ -82,13 +81,17 @@ class OpenSearchCommunication {
ConnErrorType error_type);
void SetErrorDetails(ErrorDetails details);

void SetIsAossServerless();
void SetSqlEndpoint();

// TODO #35 - Go through and add error messages on exit conditions
std::string m_error_message;
const std::vector< std::string > m_supported_client_encodings = {"UTF8"};

ConnStatusType m_status;
ConnErrorType m_error_type;
std::shared_ptr< ErrorDetails > m_error_details;
bool is_aoss_serverless;
bool m_valid_connection_options;
bool m_is_retrieving;
OpenSearchResultQueue m_result_queue;
Expand Down
4 changes: 2 additions & 2 deletions src/sqlodbc/opensearch_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ int LIBOPENSEARCH_connect(ConnectionClass *self) {
rt_opts.auth.tunnel_host.assign(self->connInfo.tunnel_host);

// Encryption
rt_opts.crypt.verify_server = (self->connInfo.verify_server == 1);
rt_opts.crypt.use_ssl = (self->connInfo.use_ssl == 1);
rt_opts.crypt.verify_server = (self->connInfo.verify_server == '1');
rt_opts.crypt.use_ssl = (self->connInfo.use_ssl == '1');

void *opensearchconn = OpenSearchConnectDBParams(rt_opts, FALSE, OPTION_COUNT);
if (opensearchconn == NULL) {
Expand Down
Loading