Skip to content

Commit

Permalink
Issue #70 - Add Serverless Support (#73)
Browse files Browse the repository at this point in the history
* Initial support for OpenSearch Serverless, based simply on parsing URL

---------

Signed-off-by: currantw <[email protected]>
  • Loading branch information
currantw authored Nov 21, 2024
1 parent edd708a commit 1ac3e6d
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 58 deletions.
9 changes: 5 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ lib64/*
*.tlog
include/*
packages/*
# Visual Studio
.vs/*
# Visual Studio Code
.vscode/*
src/psqlodbc/psqlodbcBuilder/x64_*
src/*/x64
src/*/Win32
Expand Down Expand Up @@ -60,3 +56,8 @@ CTestTestfile.cmake
/src/PowerBIConnector/obj/
/src/PowerBIConnector/.vs/
src/vcpkg_installed/

# IDEs
.vs/*
.vscode/*
.idea/*
4 changes: 2 additions & 2 deletions docs/dev/BUILD_INSTRUCTIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ See [run_tests.md](./run_tests.md)

**BUILD_WITH_TESTS**

(Defaults to ON) If disabled, all tests and and test dependencies will be excluded from build which will optimize the installer package size. This option can set with the command line (using `-D`).
(Defaults to ON) If disabled, all tests and test dependencies will be excluded from build which will optimize the installer package size. This option can set with the command line (using `-D`).

### Working With SSL/TLS

Expand All @@ -119,7 +119,7 @@ If you plan to use OpenSearch Dashboards, as suggested for this project, you mus

### Setting up a DSN

A **D**ata **S**ouce **N**ame is used to store driver information in the system. By storing the information in the system, the information does not need to be specified each time the driver connects.
A **D**ata **S**ource **N**ame is used to store driver information in the system. By storing the information in the system, the information does not need to be specified each time the driver connects.

#### Windows

Expand Down
5 changes: 3 additions & 2 deletions src/TestRunner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
PERFORMANCE_INFO = "performance_info"
PERFORMANCE_RESULTS = "performance_results"
EXCLUDE_EXTENSION_LIST = (
".py", ".c", ".cmake", ".log",
".py", ".pyc", ".c", ".cmake", ".log",
".pdb", ".dll", ".sln", ".vcxproj", ".user",
".tlog", ".lastbuildstate", ".filters",
".obj", ".exp", ".lib", ".h", ".cpp", ".ilk")
".obj", ".o", ".d", ".exp", ".lib", ".h",
".cpp", ".ilk")
total_failures = 0
SYNC_START = "%%__PARSE__SYNC__START__%%"
SYNC_SEP = "%%__SEP__%%"
Expand Down
47 changes: 34 additions & 13 deletions src/sqlodbc/dlg_specific.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,41 @@ void makeConnectString(char *connect_string, const ConnInfo *ci, UWORD len) {
encode(ci->password, encoded_item, sizeof(encoded_item));
/* fundamental info */
nlen = MAX_CONNECT_STRING;

const char* connect_format_string =
"%s=%s;"
INI_SERVER "=%s;"
"database=OpenSearch;"
INI_PORT "=%s;"
INI_USERNAME_ABBR "=%s;"
INI_PASSWORD_ABBR "=%s;"
INI_AUTH_MODE "=%s;"
INI_REGION "=%s;"
INI_TUNNEL_HOST "=%s;"
INI_SSL_USE "=%d;"
INI_SSL_HOST_VERIFY "=%d;"
INI_LOG_LEVEL "=%d;"
INI_LOG_OUTPUT "=%s;"
INI_TIMEOUT "=%s;"
INI_FETCH_SIZE "=%s;";

olen = snprintf(
connect_string, nlen,
"%s=%s;" INI_SERVER
"=%s;"
"database=OpenSearch;" INI_PORT "=%s;" INI_USERNAME_ABBR
"=%s;" INI_PASSWORD_ABBR "=%s;" INI_AUTH_MODE "=%s;" INI_REGION
"=%s;" INI_TUNNEL_HOST "=%s;" INI_SSL_USE "=%d;" INI_SSL_HOST_VERIFY
"=%d;" INI_LOG_LEVEL "=%d;" INI_LOG_OUTPUT "=%s;" INI_TIMEOUT "=%s;"
INI_FETCH_SIZE "=%s;",
connect_format_string,
got_dsn ? "DSN" : "DRIVER", got_dsn ? ci->dsn : ci->drivername,
ci->server, ci->port, ci->username, encoded_item, ci->authtype,
ci->region, ci->tunnel_host, (int)ci->use_ssl, (int)ci->verify_server,
(int)ci->drivers.loglevel, ci->drivers.output_dir,
ci->response_timeout, ci->fetch_size);
ci->server,
ci->port,
ci->username,
encoded_item,
ci->authtype,
ci->region,
ci->tunnel_host,
(int)ci->use_ssl,
(int)ci->verify_server,
(int)ci->drivers.loglevel,
ci->drivers.output_dir,
ci->response_timeout,
ci->fetch_size);
if (olen < 0 || olen >= nlen) {
connect_string[0] = '\0';
return;
Expand Down Expand Up @@ -141,7 +162,7 @@ static void getCiDefaults(ConnInfo *ci) {
strncpy(ci->port, DEFAULT_PORT, SMALL_REGISTRY_LEN);
strncpy(ci->response_timeout, DEFAULT_RESPONSE_TIMEOUT_STR,
SMALL_REGISTRY_LEN);
strncpy(ci->fetch_size, DEFAULT_FETCH_SIZE_STR,
strncpy(ci->fetch_size, DEFAULT_FETCH_SIZE,
SMALL_REGISTRY_LEN);
strncpy(ci->authtype, DEFAULT_AUTHTYPE, MEDIUM_REGISTRY_LEN);
if (ci->password.name != NULL)
Expand Down Expand Up @@ -455,7 +476,7 @@ void CC_conninfo_init(ConnInfo *conninfo, UInt4 option) {
strncpy(conninfo->port, DEFAULT_PORT, SMALL_REGISTRY_LEN);
strncpy(conninfo->response_timeout, DEFAULT_RESPONSE_TIMEOUT_STR,
SMALL_REGISTRY_LEN);
strncpy(conninfo->fetch_size, DEFAULT_FETCH_SIZE_STR,
strncpy(conninfo->fetch_size, DEFAULT_FETCH_SIZE,
SMALL_REGISTRY_LEN);
strncpy(conninfo->authtype, DEFAULT_AUTHTYPE, MEDIUM_REGISTRY_LEN);
if (conninfo->password.name != NULL)
Expand Down
3 changes: 1 addition & 2 deletions src/sqlodbc/dlg_specific.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ extern "C" {
#define INI_TIMEOUT "responseTimeout"
#define INI_FETCH_SIZE "fetchSize"

#define DEFAULT_FETCH_SIZE -1
#define DEFAULT_FETCH_SIZE_STR "-1"
#define DEFAULT_FETCH_SIZE "-1"
#define DEFAULT_RESPONSE_TIMEOUT 10 // Seconds
#define DEFAULT_RESPONSE_TIMEOUT_STR "10"
#define DEFAULT_AUTHTYPE "NONE"
Expand Down
113 changes: 81 additions & 32 deletions src/sqlodbc/opensearch_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@
#include <aws/core/http/HttpClient.h>
// clang-format on

#define SQL_ENDPOINT_ERROR_STR "Error"
static const std::string SQL_ENDPOINT_OPENSEARCH = "/_plugins/_sql";
static const std::string SQL_ENDPOINT_ELASTICSEARCH = "/_opendistro/_sql";
static const std::string SQL_ENDPOINT_ERROR = "Error";

static const std::string SERVICE_NAME_DEFAULT = "es";
static const std::string SERVICE_NAME_AOSS_SERVERLESS = "aoss";

static const std::string CREDENTIALS_PROFILE = "opensearchodbc";
static const std::string CREDENTIALS_PROVIDER_ALLOCATION_TAG =
"CREDENTIAL_PROVIDER";

static const std::string DISTRIBUTION_OPENSEARCH = "opensearch";

static const std::string ctype = "application/json";
static const std::string ALLOCATION_TAG = "AWS_SIGV4_AUTH";
static const std::string SERVICE_NAME = "es";
static const std::string ESODBC_PROFILE_NAME = "opensearchodbc";
static const std::string ERROR_MSG_PREFIX =
"[OpenSearch][SQL ODBC Driver][SQL Plugin] ";
static const std::string JSON_SCHEMA =
Expand Down Expand Up @@ -346,7 +354,7 @@ bool OpenSearchCommunication::CheckConnectionOptions() {
SetErrorDetails("Auth error", m_error_message,
ConnErrorType::CONN_ERROR_INVALID_AUTH);
}
} else if (m_rt_opts.conn.server == "") {
} else if (m_rt_opts.conn.server.empty()) {
m_error_message = "Host connection option was not specified.";
SetErrorDetails("Connection error", m_error_message,
ConnErrorType::CONN_ERROR_UNABLE_TO_ESTABLISH);
Expand All @@ -364,14 +372,14 @@ bool OpenSearchCommunication::CheckConnectionOptions() {
ConnErrorType::CONN_ERROR_UNABLE_TO_ESTABLISH);
}

if (m_error_message != "") {
if (!m_error_message.empty()) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
m_valid_connection_options = false;
return false;
} else {
LogMsg(OPENSEARCH_DEBUG, "Required connection option are valid.");
m_valid_connection_options = true;
}

LogMsg(OPENSEARCH_DEBUG, "Required connection option are valid.");
m_valid_connection_options = true;
return m_valid_connection_options;
}

Expand Down Expand Up @@ -400,7 +408,7 @@ OpenSearchCommunication::IssueRequest(
const std::string& fetch_size, const std::string& cursor) {
// Generate http request
Aws::Http::URI host(m_rt_opts.conn.server.c_str());
if (m_rt_opts.conn.port.length() > 0) {
if (!m_rt_opts.conn.port.empty()) {
host.SetPort((uint16_t) atoi(m_rt_opts.conn.port.c_str()));
}
host.SetPath(endpoint.c_str());
Expand Down Expand Up @@ -433,7 +441,9 @@ OpenSearchCommunication::IssueRequest(
}

// Handle authentication
if (m_rt_opts.auth.auth_type == AUTHTYPE_BASIC) {
const std::string& auth_type = m_rt_opts.auth.auth_type;

if (auth_type == AUTHTYPE_BASIC) {
std::string userpw_str =
m_rt_opts.auth.username + ":" + m_rt_opts.auth.password;
Aws::Utils::Array< unsigned char > userpw_arr(
Expand All @@ -442,17 +452,25 @@ OpenSearchCommunication::IssueRequest(
Aws::String hashed_userpw =
Aws::Utils::HashingUtils::Base64Encode(userpw_arr);
request->SetAuthorization("Basic " + hashed_userpw);
} else if (m_rt_opts.auth.auth_type == AUTHTYPE_IAM) {
}

else if (auth_type == AUTHTYPE_IAM) {
std::shared_ptr< Aws::Auth::ProfileConfigFileAWSCredentialsProvider >
credential_provider = Aws::MakeShared<
Aws::Auth::ProfileConfigFileAWSCredentialsProvider >(
ALLOCATION_TAG.c_str(), ESODBC_PROFILE_NAME.c_str());
CREDENTIALS_PROVIDER_ALLOCATION_TAG.c_str(),
CREDENTIALS_PROFILE.c_str());

const std::string& service_name =
is_aoss_serverless
? SERVICE_NAME_AOSS_SERVERLESS
: SERVICE_NAME_DEFAULT;

Aws::Client::AWSAuthV4Signer signer(credential_provider,
SERVICE_NAME.c_str(),
service_name.c_str(),
m_rt_opts.auth.region.c_str());

if (m_rt_opts.auth.tunnel_host.length() > 0) {
if (!m_rt_opts.auth.tunnel_host.empty()) {
request->SetHeaderValue("host",
Aws::Http::URI(m_rt_opts.auth.tunnel_host.c_str())
.GetAuthority()
Expand All @@ -475,7 +493,7 @@ bool OpenSearchCommunication::IsSQLPluginEnabled(std::shared_ptr< ErrorDetails >

/**
* @brief Queries server to determine SQL plugin availability.
*
*
* @return true : Successfully queried server for SQL plugin
* @return false : Failed to query server, no plugin available, exception was caught
*/
Expand Down Expand Up @@ -548,26 +566,37 @@ bool OpenSearchCommunication::CheckSQLPluginAvailability() {
}

bool OpenSearchCommunication::EstablishConnection() {
// Generate HttpClient Connection class if it does not exist

LogMsg(OPENSEARCH_ALL, "Attempting to establish DB connection.");

// Generate HttpClient Connection class if it does not exist
if (!m_http_client) {
InitializeConnection();
}

// check if the endpoint is initialized
if (sql_endpoint.empty()) {
SetSqlEndpoint();
// Set whether the connection is to OpenSearch serverless cluster.
SetIsAossServerless();

// Set the SQL endpoint to connect to. If this is a serverless connection,
// the SQL endpoint is always set correctly; if not, the endpoint is
// determined by sending a request to OpenSearch, which may result in an
// error.
SetSqlEndpoint();

if (is_aoss_serverless && (sql_endpoint == SQL_ENDPOINT_ERROR)) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
}

// Check whether SQL plugin has been installed and enabled in the
// OpenSearch server since the SQL plugin is a prerequisite to
// use this driver.
if((sql_endpoint != SQL_ENDPOINT_ERROR_STR) && CheckSQLPluginAvailability()) {
return true;
if(!CheckSQLPluginAvailability()) {
LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
}

LogMsg(OPENSEARCH_ERROR, m_error_message.c_str());
return false;
return true;
}

std::vector< std::string > OpenSearchCommunication::GetColumnsWithSelectQuery(
Expand Down Expand Up @@ -929,7 +958,8 @@ std::string OpenSearchCommunication::GetServerVersion() {
/**
* @brief Queries supplied URL to validate Server Distribution. Maintains
* backwards compatibility with opendistro distribution.
*
* Not compatible with OpenSearch Serverless.
*
* @return std::string : Server distribution name, returns "" on error
*/
std::string OpenSearchCommunication::GetServerDistribution() {
Expand Down Expand Up @@ -1046,17 +1076,36 @@ std::string OpenSearchCommunication::GetClusterName() {
}

/**
* @brief Sets URL endpoint for SQL plugin. On failure to
* determine appropriate endpoint, value is set to SQL_ENDPOINT_ERROR_STR
*
* @brief Sets URL endpoint for the SQL plugin.
* Sets it to SQL_ENDPOINT_ERROR if an appropriate
* endpoint could not be determined.
*/
void OpenSearchCommunication::SetSqlEndpoint() {

// Serverless Elasticsearch is not supported.
if (is_aoss_serverless) {
sql_endpoint = SQL_ENDPOINT_OPENSEARCH;
return;
}

std::string distribution = GetServerDistribution();
if (distribution.empty()) {
sql_endpoint = SQL_ENDPOINT_ERROR_STR;
} else if (distribution.compare("opensearch") == 0) {
sql_endpoint = "/_plugins/_sql";
sql_endpoint = SQL_ENDPOINT_ERROR;
} else if (distribution == DISTRIBUTION_OPENSEARCH) {
sql_endpoint = SQL_ENDPOINT_OPENSEARCH;
} else {
sql_endpoint = "/_opendistro/_sql";
sql_endpoint = SQL_ENDPOINT_ELASTICSEARCH;
}
}

/**
* @brief Sets flag indicating whether this is
* connecting to an OpenSearch Serverless cluster.
*/
void OpenSearchCommunication::SetIsAossServerless() {

// Treat the connection as serverless if the server URL corresponds to
// Amazon OpenSearch Serverless. Limitation: does not support serverless
// with proxy server URL.
is_aoss_serverless = m_rt_opts.conn.server.find("aoss.amazonaws.com") != std::string::npos;
}
5 changes: 4 additions & 1 deletion src/sqlodbc/opensearch_communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class OpenSearchCommunication {
void StopResultRetrieval();
std::vector< std::string > GetColumnsWithSelectQuery(
const std::string table_name);
void SetSqlEndpoint();

// the endpoint is set according to distribution (ES/OpenSearch)
std::string sql_endpoint;
Expand All @@ -82,13 +81,17 @@ class OpenSearchCommunication {
ConnErrorType error_type);
void SetErrorDetails(ErrorDetails details);

void SetIsAossServerless();
void SetSqlEndpoint();

// TODO #35 - Go through and add error messages on exit conditions
std::string m_error_message;
const std::vector< std::string > m_supported_client_encodings = {"UTF8"};

ConnStatusType m_status;
ConnErrorType m_error_type;
std::shared_ptr< ErrorDetails > m_error_details;
bool is_aoss_serverless;
bool m_valid_connection_options;
bool m_is_retrieving;
OpenSearchResultQueue m_result_queue;
Expand Down
4 changes: 2 additions & 2 deletions src/sqlodbc/opensearch_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ int LIBOPENSEARCH_connect(ConnectionClass *self) {
rt_opts.auth.tunnel_host.assign(self->connInfo.tunnel_host);

// Encryption
rt_opts.crypt.verify_server = (self->connInfo.verify_server == 1);
rt_opts.crypt.use_ssl = (self->connInfo.use_ssl == 1);
rt_opts.crypt.verify_server = (self->connInfo.verify_server == '1');
rt_opts.crypt.use_ssl = (self->connInfo.use_ssl == '1');

void *opensearchconn = OpenSearchConnectDBParams(rt_opts, FALSE, OPTION_COUNT);
if (opensearchconn == NULL) {
Expand Down

0 comments on commit 1ac3e6d

Please sign in to comment.