diff --git a/.factory/automation.yml b/.factory/automation.yml index b455725cfe..ed0fa52b9a 100644 --- a/.factory/automation.yml +++ b/.factory/automation.yml @@ -95,7 +95,8 @@ build: source tool/test/start-cloud-servers.sh 3 && # use source to receive export vars bazel test //rust/tests --test_output=streamed --test_env=ROOT_CA=$ROOT_CA --test_arg=-- \ --test_arg=integration::queries::cloud \ - --test_arg=integration::runtimes && + --test_arg=integration::runtimes \ + --test_arg=integration::network && export CLOUD_FAILED= || export CLOUD_FAILED=1 tool/test/stop-cloud-servers.sh if [[ -n "$CLOUD_FAILED" ]]; then exit 1; fi @@ -272,7 +273,7 @@ build: tool/test/stop-cloud-servers.sh exit $TEST_SUCCESS - test-python-integration-core: + test-python-integration: image: vaticle-ubuntu-22.04 dependencies: - build @@ -285,29 +286,18 @@ build: bazel run @vaticle_dependencies//distribution/artifact:create-netrc tool/test/start-core-server.sh && - bazel test //python/tests/integration:test_connection --test_output=streamed --jobs=1 && bazel test //python/tests/integration:test_stream --test_output=streamed --jobs=1 && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + export CORE_FAILED= || export CORE_FAILED=1 tool/test/stop-core-server.sh - exit $TEST_SUCCESS + if [[ -n "$CORE_FAILED" ]]; then exit 1; fi -# test-python-integration-cloud-failover: -# machine: 4-core-16-gb -# image: vaticle-ubuntu-22.04 -# dependencies: -# - build -# type: foreground -# command: | -# export PATH="$HOME/.local/bin:$PATH" -# sudo apt-get update -# sudo apt install python3-pip -y -# python3 -m pip install -U pip -# python3 -m pip install -r python/requirements_dev.txt -# export ARTIFACT_USERNAME=$REPO_TYPEDB_USERNAME -# export ARTIFACT_PASSWORD=$REPO_TYPEDB_PASSWORD -# bazel run @vaticle_dependencies//tool/bazelinstall:remote_cache_setup.sh -# bazel run @vaticle_dependencies//distribution/artifact:create-netrc -# bazel test //python/tests/integration:test_cloud_failover --test_output=errors + source tool/test/start-cloud-servers.sh 3 && # use source to receive export vars + bazel test //python/tests/integration:test_connection --test_env=ROOT_CA=$ROOT_CA --test_output=streamed --jobs=1 && + # TODO currently broken test + # bazel test //python/tests/integration:test_cloud_failover --test_env=ROOT_CA=$ROOT_CA --test_output=streamed --jobs=1 && + export CLOUD_FAILED= || export CLOUD_FAILED=1 + tool/test/stop-cloud-servers.sh + if [[ -n "$CLOUD_FAILED" ]]; then exit 1; fi test-nodejs-integration: image: vaticle-ubuntu-22.04 @@ -322,29 +312,18 @@ build: cp -rL bazel-bin/nodejs/dist nodejs/. tool/test/start-core-server.sh && node nodejs/test/integration/test-concept.js && - node nodejs/test/integration/test-connection.js && node nodejs/test/integration/test-query.js && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + node nodejs/test/integration/test-connection-core.js && + export CORE_FAILED= || export CORE_FAILED=1 tool/test/stop-core-server.sh - exit $TEST_SUCCESS + if [[ -n "$CORE_FAILED" ]]; then exit 1; fi - test-nodejs-cloud-failover: - machine: 4-core-16-gb - image: vaticle-ubuntu-22.04 - dependencies: - - build - command: | - export ARTIFACT_USERNAME=$REPO_TYPEDB_USERNAME - export ARTIFACT_PASSWORD=$REPO_TYPEDB_PASSWORD - bazel run @vaticle_dependencies//distribution/artifact:create-netrc - bazel build //nodejs/... - cp -rL bazel-bin/nodejs/node_modules nodejs/. - cp -rL bazel-bin/nodejs/dist nodejs/. source tool/test/start-cloud-servers.sh 3 && # use source to receive export vars + node nodejs/test/integration/test-connection-cloud.js && node nodejs/test/integration/test-cloud-failover.js && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + export CLOUD_FAILED= || export CLOUD_FAILED=1 tool/test/stop-cloud-servers.sh - exit $TEST_SUCCESS + if [[ -n "$CLOUD_FAILED" ]]; then exit 1; fi test-nodejs-behaviour-core: image: vaticle-ubuntu-22.04 @@ -385,7 +364,7 @@ build: tool/test/stop-cloud-servers.sh exit $TEST_SUCCESS - test-cpp-integration-core: + test-cpp-integration: image: vaticle-ubuntu-22.04 dependencies: - build @@ -398,10 +377,16 @@ build: bazel run @vaticle_dependencies//tool/bazelinstall:remote_cache_setup.sh bazel run @vaticle_dependencies//distribution/artifact:create-netrc tool/test/start-core-server.sh && - bazel test //cpp/test/integration/... --test_output=streamed --jobs=1 && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + bazel test //cpp/test/integration:test-cpp-driver-core --test_output=streamed --jobs=1 && + export CORE_FAILED= || export CORE_FAILED=1 tool/test/stop-core-server.sh - exit $TEST_SUCCESS + if [[ -n "$CORE_FAILED" ]]; then exit 1; fi + + source tool/test/start-cloud-servers.sh 3 && # use source to receive export vars + bazel test //cpp/test/integration:test-cpp-driver-cloud --test_env=ROOT_CA=$ROOT_CA --test_output=streamed --jobs=1 && + export CLOUD_FAILED= || export CLOUD_FAILED=1 + tool/test/stop-cloud-servers.sh + if [[ -n "$CLOUD_FAILED" ]]; then exit 1; fi test-cpp-behaviour-core: image: vaticle-ubuntu-22.04 @@ -448,7 +433,7 @@ build: tool/test/stop-cloud-servers.sh exit $TEST_SUCCESS - test-csharp-integration-core: + test-csharp-integration: image: vaticle-ubuntu-22.04 dependencies: - build @@ -463,25 +448,16 @@ build: bazel test //csharp/Test/Integration/Data/... --test_output=streamed --jobs=1 && bazel test //csharp/Test/Integration/Marshal/... --test_output=streamed --jobs=1 && .factory/test-core.sh //csharp/Test/Integration/Examples/... --test_output=streamed --jobs=1 && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + export CORE_FAILED= || export CORE_FAILED=1 tool/test/stop-core-server.sh - exit $TEST_SUCCESS + if [[ -n "$CORE_FAILED" ]]; then exit 1; fi - test-csharp-integration-cloud: - image: vaticle-ubuntu-22.04 - dependencies: - - build - type: foreground - command: | - export ARTIFACT_USERNAME=$REPO_TYPEDB_USERNAME - export ARTIFACT_PASSWORD=$REPO_TYPEDB_PASSWORD - bazel run @vaticle_dependencies//tool/bazelinstall:remote_cache_setup.sh - bazel run @vaticle_dependencies//distribution/artifact:create-netrc - source tool/test/start-cloud-servers.sh && + source tool/test/start-cloud-servers.sh 3 && # use source to receive export vars + bazel test //csharp/Test/Integration/Network/... --test_env=ROOT_CA=$ROOT_CA --test_output=streamed --jobs=1 && .factory/test-cloud.sh //csharp/Test/Integration/Examples/... --test_env=ROOT_CA=$ROOT_CA --test_output=streamed --jobs=1 && - export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + export CLOUD_FAILED= || export CLOUD_FAILED=1 tool/test/stop-cloud-servers.sh - exit $TEST_SUCCESS + if [[ -n "$CLOUD_FAILED" ]]; then exit 1; fi test-csharp-behaviour-core: image: vaticle-ubuntu-22.04 diff --git a/c/docs/connection/connection.adoc b/c/docs/connection/connection.adoc index 4536233ae9..058c5e9323 100644 --- a/c/docs/connection/connection.adoc +++ b/c/docs/connection/connection.adoc @@ -102,6 +102,34 @@ a| `credential` a| The ``Credential`` to connect with a| `const struct Credentia .Returns `struct Connection*` +[#_connection_open_cloud_translated] +==== connection_open_cloud_translated + +[source,cpp] +---- +struct Connection* connection_open_cloud_translated(const char*const* advertised_addresses, const char*const* translated_addresses, const struct Credential* credential) +---- + + + +Open a TypeDB Driver to TypeDB Cloud server(s), using provided address translation, with the provided credential. + + +[caption=""] +.Input parameters +[cols=",,"] +[options="header"] +|=== +|Name |Description |Type +a| `advertised_addresses` a| A null-terminated array holding the address(es) the TypeDB server(s) are configured to advertise a| `const char*const*` +a| `translated_addresses` a| A null-terminated array holding the address(es) of the TypeDB server(s) the driver will connect to. This array _must_ have the same length as ``advertised_addresses`` a| `const char*const*` +a| `credential` a| The ``Credential`` to connect with a| `const struct Credential*` +|=== + +[caption=""] +.Returns +`struct Connection*` + [#_connection_open_core] ==== connection_open_core diff --git a/c/docs/connection/replica.adoc b/c/docs/connection/replica.adoc index 77cedf593f..f93370f3a4 100644 --- a/c/docs/connection/replica.adoc +++ b/c/docs/connection/replica.adoc @@ -31,17 +31,17 @@ Frees the native rust ``ReplicaInfo`` object .Returns `void` -[#_replica_info_get_address] -==== replica_info_get_address +[#_replica_info_get_server] +==== replica_info_get_server [source,cpp] ---- -char* replica_info_get_address(const struct ReplicaInfo* replica_info) +char* replica_info_get_server(const struct ReplicaInfo* replica_info) ---- -Retrieves the address of the server hosting this replica +The server hosting this replica [caption=""] .Returns diff --git a/c/src/connection.rs b/c/src/connection.rs index 6ccdfcfcef..f1b5babcf8 100644 --- a/c/src/connection.rs +++ b/c/src/connection.rs @@ -19,6 +19,7 @@ use std::{ffi::c_char, path::Path}; +use itertools::Itertools; use typedb_driver::{Connection, Credential}; use super::{ @@ -48,6 +49,24 @@ pub extern "C" fn connection_open_cloud( try_release(Connection::new_cloud(&addresses, borrow(credential).clone())) } +/// Open a TypeDB Driver to TypeDB Cloud server(s), using provided address translation, with +/// the provided credential. +/// +/// @param advertised_addresses A null-terminated array holding the address(es) the TypeDB server(s) +/// are configured to advertise +/// @param translated_addresses A null-terminated array holding the address(es) of the TypeDB server(s) +/// the driver will connect to. This array must have the same length as advertised_addresses +/// @param credential The Credential to connect with +#[no_mangle] +pub extern "C" fn connection_open_cloud_translated( + advertised_addresses: *const *const c_char, + translated_addresses: *const *const c_char, + credential: *const Credential, +) -> *mut Connection { + let addresses = string_array_view(advertised_addresses).zip_eq(string_array_view(translated_addresses)).collect(); + try_release(Connection::new_cloud_with_translation(addresses, borrow(credential).clone())) +} + /// Closes the driver. Before instantiating a new driver, the driver that’s currently open should first be closed. /// Closing a connction frees the underlying rust object. #[no_mangle] diff --git a/c/src/database.rs b/c/src/database.rs index aa65b984fb..4e3c77c0ce 100644 --- a/c/src/database.rs +++ b/c/src/database.rs @@ -107,10 +107,10 @@ pub extern "C" fn replica_info_drop(replica_info: *mut ReplicaInfo) { free(replica_info); } -/// Retrieves the address of the server hosting this replica +/// The server hosting this replica #[no_mangle] -pub extern "C" fn replica_info_get_address(replica_info: *const ReplicaInfo) -> *mut c_char { - release_string(borrow(replica_info).address.to_string()) +pub extern "C" fn replica_info_get_server(replica_info: *const ReplicaInfo) -> *mut c_char { + release_string(borrow(replica_info).server.to_string()) } /// Checks whether this is the primary replica of the raft cluster. diff --git a/c/swig/typedb_driver_csharp.swg b/c/swig/typedb_driver_csharp.swg index 9fc124eed8..1e2ae95417 100644 --- a/c/swig/typedb_driver_csharp.swg +++ b/c/swig/typedb_driver_csharp.swg @@ -191,7 +191,7 @@ %noexception user_get_username; %noexception user_get_password_expiry_seconds; -%noexception replica_info_get_address; +%noexception replica_info_get_server; %noexception replica_info_is_primary; %noexception replica_info_is_preferred; %noexception replica_info_get_term; @@ -620,27 +620,27 @@ %typemap( csin, pre=" - int arraySize = $csinput.Length; + int arraySize$csinput = $csinput.Length; global::System.IntPtr unmanaged$csinput = System.Runtime.InteropServices.Marshal.AllocHGlobal( - (arraySize + 1) * System.Runtime.InteropServices.Marshal.SizeOf()); + (arraySize$csinput + 1) * System.Runtime.InteropServices.Marshal.SizeOf()); unsafe { global::System.IntPtr* arrayPtr = (global::System.IntPtr*)unmanaged$csinput.ToPointer(); - for (int i = 0; i < arraySize; i++) + for (int i = 0; i < arraySize$csinput; i++) { arrayPtr[i] = global::System.Runtime.InteropServices.Marshal.StringToCoTaskMemAnsi($csinput[i]); } - arrayPtr[arraySize] = global::System.IntPtr.Zero; + arrayPtr[arraySize$csinput] = global::System.IntPtr.Zero; }", post=" unsafe { global::System.IntPtr* arrayPtr = (global::System.IntPtr*)unmanaged$csinput.ToPointer(); - for (int i = 0; i < arraySize; i++) + for (int i = 0; i < arraySize$csinput; i++) { global::System.Runtime.InteropServices.Marshal.FreeHGlobal(arrayPtr[i]); } diff --git a/c/swig/typedb_driver_java.swg b/c/swig/typedb_driver_java.swg index 0229b33011..37982673dd 100644 --- a/c/swig/typedb_driver_java.swg +++ b/c/swig/typedb_driver_java.swg @@ -136,7 +136,7 @@ %nojavaexception user_get_username; %nojavaexception user_get_password_expiry_seconds; -%nojavaexception replica_info_get_address; +%nojavaexception replica_info_get_server; %nojavaexception replica_info_is_primary; %nojavaexception replica_info_is_preferred; %nojavaexception replica_info_get_term; diff --git a/c/typedb_driver.i b/c/typedb_driver.i index cf4b8ebfdb..b49586ca75 100644 --- a/c/typedb_driver.i +++ b/c/typedb_driver.i @@ -343,6 +343,7 @@ void transaction_on_close_register(const Transaction* transaction, TransactionCa %newobject connection_open_core; %newobject connection_open_cloud; +%newobject connection_open_cloud_translated; %newobject credential_new; @@ -357,7 +358,7 @@ void transaction_on_close_register(const Transaction* transaction, TransactionCa %newobject database_get_primary_replica_info; %newobject database_get_replicas_info; -%newobject replica_info_get_address; +%newobject replica_info_get_server; %newobject replica_info_iterator_next; %newobject databases_all; diff --git a/cpp/docs/connection/Driver.adoc b/cpp/docs/connection/Driver.adoc index 18324459e7..c67f564232 100644 --- a/cpp/docs/connection/Driver.adoc +++ b/cpp/docs/connection/Driver.adoc @@ -44,12 +44,12 @@ Closes the driver. Before instantiating a new driver, the driver that’s curren driver.close() ---- -[#_static_Driver_TypeDBDrivercloudDriver___const_stdvector__stdstring____cloudAddresses__const_Credential__credential_] +[#_static_Driver_TypeDBDrivercloudDriver___const_stdvector__stdstring____addresses__const_Credential__credential_] ==== cloudDriver [source,cpp] ---- -static Driver TypeDB::Driver::cloudDriver(const std::vector< std::string >& cloudAddresses, const Credential& credential) +static Driver TypeDB::Driver::cloudDriver(const std::vector< std::string >& addresses, const Credential& credential) ---- @@ -63,7 +63,7 @@ Open a TypeDB Driver to TypeDB Cloud server(s) available at the provided address [options="header"] |=== |Name |Description |Type -a| `addresses` a| The address(es) of the TypeDB server(s) a| +a| `addresses` a| The address(es) of the TypeDB server(s) or translation map from addresses received from the TypeDB server(s) to addresses to be used by the driver for connection a| `const std::vector< std::string >&` a| `credential` a| The Credential to connect with a| `const Credential&` |=== @@ -78,12 +78,12 @@ a| `credential` a| The Credential to connect with a| `const Credential&` Driver::cloudDriver(addresses, credential); ---- -[#_static_Driver_TypeDBDrivercoreDriver___const_stdstring__coreAddress_] +[#_static_Driver_TypeDBDrivercoreDriver___const_stdstring__address_] ==== coreDriver [source,cpp] ---- -static Driver TypeDB::Driver::coreDriver(const std::string& coreAddress) +static Driver TypeDB::Driver::coreDriver(const std::string& address) ---- @@ -97,7 +97,7 @@ Open a TypeDB Driver to a TypeDB Core server available at the provided address. [options="header"] |=== |Name |Description |Type -a| `address` a| The address of the TypeDB server a| +a| `address` a| The address of the TypeDB server a| `const std::string&` |=== [caption=""] diff --git a/cpp/docs/connection/ReplicaInfo.adoc b/cpp/docs/connection/ReplicaInfo.adoc index 6cfb6ee7f9..28294967d0 100644 --- a/cpp/docs/connection/ReplicaInfo.adoc +++ b/cpp/docs/connection/ReplicaInfo.adoc @@ -8,53 +8,53 @@ The metadata and state of an individual raft replica of a database. // tag::methods[] -[#_stdstring_TypeDBReplicaInfoaddress___] -==== address +[#_bool_TypeDBReplicaInfoisPreferred___] +==== isPreferred [source,cpp] ---- -std::string TypeDB::ReplicaInfo::address() +bool TypeDB::ReplicaInfo::isPreferred() ---- -Retrieves the address of the server hosting this replica +Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. [caption=""] .Returns -`std::string` +`bool` -[#_bool_TypeDBReplicaInfoisPreferred___] -==== isPreferred +[#_bool_TypeDBReplicaInfoisPrimary___] +==== isPrimary [source,cpp] ---- -bool TypeDB::ReplicaInfo::isPreferred() +bool TypeDB::ReplicaInfo::isPrimary() ---- -Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. +Checks whether this is the primary replica of the raft cluster. [caption=""] .Returns `bool` -[#_bool_TypeDBReplicaInfoisPrimary___] -==== isPrimary +[#_stdstring_TypeDBReplicaInfoserver___] +==== server [source,cpp] ---- -bool TypeDB::ReplicaInfo::isPrimary() +std::string TypeDB::ReplicaInfo::server() ---- -Checks whether this is the primary replica of the raft cluster. +The server hosting this replica [caption=""] .Returns -`bool` +`std::string` [#_int64_t_TypeDBReplicaInfoterm___] ==== term diff --git a/cpp/include/typedb/connection/driver.hpp b/cpp/include/typedb/connection/driver.hpp index 79f93074b0..fdf84df209 100644 --- a/cpp/include/typedb/connection/driver.hpp +++ b/cpp/include/typedb/connection/driver.hpp @@ -24,6 +24,8 @@ #include "typedb/user/user_manager.hpp" #include +#include +#include // The namespace comment is needed to document enums. /** @@ -72,7 +74,7 @@ class Driver { * * @param address The address of the TypeDB server */ - static Driver coreDriver(const std::string& coreAddress); + static Driver coreDriver(const std::string& address); /** * Open a TypeDB Driver to TypeDB Cloud server(s) available at the provided addresses, using @@ -83,10 +85,12 @@ class Driver { * Driver::cloudDriver(addresses, credential); * * - * @param addresses The address(es) of the TypeDB server(s) + * @param addresses The address(es) of the TypeDB server(s) or translation map from addresses + * received from the TypeDB server(s) to addresses to be used by the driver for connection * @param credential The Credential to connect with */ - static Driver cloudDriver(const std::vector& cloudAddresses, const Credential& credential); + static Driver cloudDriver(const std::vector& addresses, const Credential& credential); + static Driver cloudDriver(const std::unordered_map& addressTranslation, const Credential& credential); Driver(const Driver&) = delete; Driver(Driver&& from) = default; diff --git a/cpp/include/typedb/database/database.hpp b/cpp/include/typedb/database/database.hpp index da00e24413..2c785311ac 100644 --- a/cpp/include/typedb/database/database.hpp +++ b/cpp/include/typedb/database/database.hpp @@ -39,9 +39,9 @@ class ReplicaInfo { ReplicaInfo& operator=(ReplicaInfo&&) = default; /** - * Retrieves the address of the server hosting this replica + * The server hosting this replica */ - std::string address(); + std::string server(); /** * Checks whether this is the primary replica of the raft cluster. diff --git a/cpp/lib/connection/driver.cpp b/cpp/lib/connection/driver.cpp index fb3629c935..4601386830 100644 --- a/cpp/lib/connection/driver.cpp +++ b/cpp/lib/connection/driver.cpp @@ -30,15 +30,15 @@ void Driver::initLogging() { _native::init_logging(); } -Driver Driver::coreDriver(const std::string& coreAddress) { - auto p = _native::connection_open_core(coreAddress.c_str()); +Driver Driver::coreDriver(const std::string& address) { + auto p = _native::connection_open_core(address.c_str()); DriverException::check_and_throw(); return Driver(p); } -Driver Driver::cloudDriver(const std::vector& cloudAddresses, const Credential& credential) { +Driver Driver::cloudDriver(const std::vector& addresses, const Credential& credential) { std::vector addressesNative; - for (auto& addr : cloudAddresses) + for (auto& addr : addresses) addressesNative.push_back(addr.c_str()); addressesNative.push_back(nullptr); auto p = _native::connection_open_cloud(addressesNative.data(), credential.getNative()); @@ -46,6 +46,24 @@ Driver Driver::cloudDriver(const std::vector& cloudAddresses, const return Driver(p); } +Driver Driver::cloudDriver(const std::unordered_map& addressTranslation, const Credential& credential) { + std::vector advertisedAddressesNative; + std::vector translatedAddressesNative; + for (auto& [advertised, translated] : addressTranslation) { + advertisedAddressesNative.push_back(advertised.c_str()); + translatedAddressesNative.push_back(translated.c_str()); + } + advertisedAddressesNative.push_back(nullptr); + translatedAddressesNative.push_back(nullptr); + auto p = _native::connection_open_cloud_translated( + advertisedAddressesNative.data(), + translatedAddressesNative.data(), + credential.getNative() + ); + DriverException::check_and_throw(); + return Driver(p); +} + Driver::Driver(_native::Connection* conn) noexcept : connectionNative(conn, _native::connection_close), databases(this->connectionNative.get()), diff --git a/cpp/lib/database/database.cpp b/cpp/lib/database/database.cpp index 045c5526be..22063d1153 100644 --- a/cpp/lib/database/database.cpp +++ b/cpp/lib/database/database.cpp @@ -31,9 +31,9 @@ namespace TypeDB { ReplicaInfo::ReplicaInfo(_native::ReplicaInfo* replicaInfoNative) : replicaInfoNative(replicaInfoNative, _native::replica_info_drop) {} -std::string ReplicaInfo::address() { +std::string ReplicaInfo::server() { CHECK_NATIVE(replicaInfoNative); - return Utils::stringFromNative(_native::replica_info_get_address(replicaInfoNative.get())); + return Utils::stringFromNative(_native::replica_info_get_server(replicaInfoNative.get())); } bool ReplicaInfo::isPrimary() { diff --git a/cpp/test/integration/BUILD b/cpp/test/integration/BUILD index 055ca63dc7..63c77a07bb 100644 --- a/cpp/test/integration/BUILD +++ b/cpp/test/integration/BUILD @@ -20,8 +20,23 @@ load("@vaticle_dependencies//builder/cpp:rules.bzl", "clang_format_test") load("//cpp:build_opts.bzl", "cxxopts") cc_test( - name = "test-cpp-driver", - srcs = ["test.cpp"], + name = "test-cpp-driver-core", + srcs = ["test_core.cpp"], + deps = [ + "//cpp:typedb-driver-cpp-import", + # External + "@gtest//:gtest", + "@gtest//:gtest_main" + ], + copts = cxxopts, + includes = ["include"], + env = {"RUST_BACKTRACE": "full"}, + linkstatic = True +) + +cc_test( + name = "test-cpp-driver-cloud", + srcs = ["test_cloud.cpp"], deps = [ "//cpp:typedb-driver-cpp-import", # External diff --git a/cpp/test/integration/test_cloud.cpp b/cpp/test/integration/test_cloud.cpp new file mode 100644 index 0000000000..9448a47632 --- /dev/null +++ b/cpp/test/integration/test_cloud.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "gtest/gtest.h" + +#include "typedb_driver.hpp" + +void delete_if_exists(const TypeDB::Driver& driver, const std::string& name) { + if (driver.databases.contains(name)) { + driver.databases.get(name).deleteDatabase(); + } +} + +TEST(TestConnection, TestAddressTranslation) { + std::string dbName = "hello_from_cpp"; + auto addressTranslation = std::unordered_map { + {"localhost:11729", "localhost:11729"}, + {"localhost:21729", "localhost:21729"}, + {"localhost:31729", "localhost:31729"}, + }; + auto credential = TypeDB::Credential("admin", "password", true, std::getenv("ROOT_CA")); + TypeDB::Driver driver = TypeDB::Driver::cloudDriver(addressTranslation, credential); + + delete_if_exists(driver, dbName); + driver.databases.create(dbName); + + auto sess = driver.session(dbName, TypeDB::SessionType::DATA); + auto tx = sess.transaction(TypeDB::TransactionType::WRITE); + + auto root = tx.concepts.getRootEntityType(); + int subtypeCount = 0; + for ([[maybe_unused]] auto& it : root->getSubtypes(tx)) + subtypeCount++; + ASSERT_EQ(1, subtypeCount); +} + diff --git a/cpp/test/integration/test.cpp b/cpp/test/integration/test_core.cpp similarity index 100% rename from cpp/test/integration/test.cpp rename to cpp/test/integration/test_core.cpp diff --git a/csharp/Api/Database/IDatabase.cs b/csharp/Api/Database/IDatabase.cs index 1a3d8b86fc..a3fca934bf 100644 --- a/csharp/Api/Database/IDatabase.cs +++ b/csharp/Api/Database/IDatabase.cs @@ -112,9 +112,9 @@ public interface IDatabase public interface IReplica { /** - * Retrieves the address of the server hosting this replica. + * The server hosting this replica. */ - public string Address { get; } + public string Server { get; } /** * The raft protocol ‘term’ of this replica. diff --git a/csharp/Connection/TypeDBDatabase.cs b/csharp/Connection/TypeDBDatabase.cs index 44a9dabd07..a4328d3ccb 100644 --- a/csharp/Connection/TypeDBDatabase.cs +++ b/csharp/Connection/TypeDBDatabase.cs @@ -169,7 +169,7 @@ public override string ToString() public class Replica : NativeObjectWrapper, IDatabase.IReplica { - private string? _address; + private string? _server; private long? _term; public Replica(Pinvoke.ReplicaInfo replicaInfo) @@ -186,9 +186,9 @@ public bool IsPreferred() return Pinvoke.typedb_driver.replica_info_is_preferred(NativeObject); } - public string Address + public string Server { - get { return _address ?? (_address = Pinvoke.typedb_driver.replica_info_get_address(NativeObject)); } + get { return _server ?? (_server = Pinvoke.typedb_driver.replica_info_get_server(NativeObject)); } } public long Term diff --git a/csharp/Connection/TypeDBDriver.cs b/csharp/Connection/TypeDBDriver.cs index d23c7cd920..acd44a947c 100644 --- a/csharp/Connection/TypeDBDriver.cs +++ b/csharp/Connection/TypeDBDriver.cs @@ -41,6 +41,10 @@ public TypeDBDriver(ICollection initAddresses, TypeDBCredential credenti : this(OpenCloud(initAddresses, credential)) {} + public TypeDBDriver(IDictionary addressTranslation, TypeDBCredential credential) + : this(OpenCloud(addressTranslation, credential)) + {} + private TypeDBDriver(Pinvoke.Connection connection) : base(connection) { @@ -72,6 +76,27 @@ private static Pinvoke.Connection OpenCloud(ICollection initAddresses, T } } + private static Pinvoke.Connection OpenCloud(IDictionary addressTranslation, TypeDBCredential credential) + { + try + { + string[] advertisedAddresses = new string[addressTranslation.Count]; + string[] translatedAddresses = new string[addressTranslation.Count]; + int index = 0; + foreach (KeyValuePair translation in addressTranslation) + { + advertisedAddresses[index] = translation.Key; + translatedAddresses[index] = translation.Value; + index++; + } + return Pinvoke.typedb_driver.connection_open_cloud_translated(advertisedAddresses, translatedAddresses, credential.NativeObject); + } + catch (Pinvoke.Error e) + { + throw new TypeDBDriverException(e); + } + } + public bool IsOpen() { return Pinvoke.typedb_driver.connection_is_open(NativeObject); diff --git a/csharp/Drivers.cs b/csharp/Drivers.cs index eb26c215ff..0a6b415035 100644 --- a/csharp/Drivers.cs +++ b/csharp/Drivers.cs @@ -70,12 +70,18 @@ public static ITypeDBDriver CloudDriver(string address, TypeDBCredential credent * Drivers.CloudDriver(addresses, credential); * * - * @param addresses The address(es) of the TypeDB server(s) + * @param addresses The address(es) of the TypeDB server(s) or translation map from addresses + * received from the TypeDB server(s) to addresses to be used by the driver for connection * @param credential The credential to connect with */ public static ITypeDBDriver CloudDriver(ICollection addresses, TypeDBCredential credential) { return new TypeDBDriver(addresses, credential); } + + public static ITypeDBDriver CloudDriver(IDictionary addressTranslation, TypeDBCredential credential) + { + return new TypeDBDriver(addressTranslation, credential); + } } } diff --git a/csharp/Test/Integration/Network/AddressTranslationTest.cs b/csharp/Test/Integration/Network/AddressTranslationTest.cs new file mode 100644 index 0000000000..12ec449ba1 --- /dev/null +++ b/csharp/Test/Integration/Network/AddressTranslationTest.cs @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Linq; + +using TypeDB.Driver; +using TypeDB.Driver.Api; +using TypeDB.Driver.Common; +using static TypeDB.Driver.Api.IThingType; +using static TypeDB.Driver.Api.IThingType.Annotation; + +namespace TypeDB.Driver.Test.Integration +{ + [TestFixture] + public class AddressTranslationTestFixture + { + [Test] + public void TestCloudConnectionWithTranslation() + { + try + { + IDictionary addressTranslation = new Dictionary() { + {"localhost:11729", "localhost:11729"}, + {"localhost:21729", "localhost:21729"}, + {"localhost:31729", "localhost:31729"}, + }; + + TypeDBCredential connectCredential = new TypeDBCredential( + "admin", + "password", + Environment.GetEnvironmentVariable("ROOT_CA")!); + + using (ITypeDBDriver driver = Drivers.CloudDriver(addressTranslation, connectCredential)) + { + driver.Databases.Create("typedb"); + using (ITypeDBSession session = driver.Session("typedb", SessionType.Data)) + { + using (ITypeDBTransaction transaction = session.Transaction(TransactionType.Write)) + { + IEntityType root = transaction.Concepts.RootEntityType; + Assert.IsNotNull(root); + Assert.AreEqual(1, root.GetSubtypes(transaction).Count()); + } + } + driver.Databases.Get("typedb").Delete(); + } + } + catch (TypeDBDriverException e) + { + Console.WriteLine($"Caught TypeDB Driver Exception: {e}"); + Assert.Fail(); + } + } + } +} + diff --git a/csharp/Test/Integration/Network/BUILD b/csharp/Test/Integration/Network/BUILD new file mode 100644 index 0000000000..4532438cab --- /dev/null +++ b/csharp/Test/Integration/Network/BUILD @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +load("//csharp:build_opts.bzl", "nullable_context", "target_frameworks", "targeting_packs") +load("//csharp/Test:rules.bzl", "csharp_integration_test") +load("@vaticle_dependencies//tool/checkstyle:rules.bzl", "checkstyle_test") + +csharp_integration_test( + name = "address-translation-cloud", + srcs = ["AddressTranslationTest.cs"], + deps = [ + "//csharp:driver-csharp", + "//csharp/Api:api", + "//csharp/Common:common", + ], + target_frameworks = target_frameworks, + targeting_packs = targeting_packs, +) + +checkstyle_test( + name = "checkstyle", + include = glob(["*"]), + license_type = "apache-header", +) + diff --git a/csharp/docs/connection/Drivers.adoc b/csharp/docs/connection/Drivers.adoc index 954b714579..c647fe15f2 100644 --- a/csharp/docs/connection/Drivers.adoc +++ b/csharp/docs/connection/Drivers.adoc @@ -57,7 +57,7 @@ Open a TypeDB Driver to TypeDB Cloud server(s) available at the provided address [options="header"] |=== |Name |Description |Type -a| `addresses` a| The address(es) of the TypeDB server(s) a| `ICollection< string >` +a| `addresses` a| The address(es) of the TypeDB server(s) or translation map from addresses received from the TypeDB server(s) to addresses to be used by the driver for connection a| `ICollection< string >` a| `credential` a| The credential to connect with a| `TypeDBCredential` |=== diff --git a/csharp/docs/connection/IReplica.adoc b/csharp/docs/connection/IReplica.adoc index edf44d3b13..2210d40e13 100644 --- a/csharp/docs/connection/IReplica.adoc +++ b/csharp/docs/connection/IReplica.adoc @@ -8,53 +8,53 @@ The metadata and state of an individual raft replica of a database. // tag::methods[] -[#_string_TypeDB_Driver_Api_IDatabase_IReplica_Address] -==== Address +[#_bool_TypeDB_Driver_Api_IDatabase_IReplica_IsPreferred___] +==== IsPreferred [source,cs] ---- -string TypeDB.Driver.Api.IDatabase.IReplica.Address +bool IsPreferred() ---- -Retrieves the address of the server hosting this replica. +Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. [caption=""] .Returns -`string` +`bool` -[#_bool_TypeDB_Driver_Api_IDatabase_IReplica_IsPreferred___] -==== IsPreferred +[#_bool_TypeDB_Driver_Api_IDatabase_IReplica_IsPrimary___] +==== IsPrimary [source,cs] ---- -bool IsPreferred() +bool IsPrimary() ---- -Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. +Checks whether this is the primary replica of the raft cluster. [caption=""] .Returns `bool` -[#_bool_TypeDB_Driver_Api_IDatabase_IReplica_IsPrimary___] -==== IsPrimary +[#_string_TypeDB_Driver_Api_IDatabase_IReplica_Server] +==== Server [source,cs] ---- -bool IsPrimary() +string TypeDB.Driver.Api.IDatabase.IReplica.Server ---- -Checks whether this is the primary replica of the raft cluster. +The server hosting this replica. [caption=""] .Returns -`bool` +`string` [#_long_TypeDB_Driver_Api_IDatabase_IReplica_Term] ==== Term diff --git a/java/TypeDB.java b/java/TypeDB.java index f391e92576..7fc7511225 100644 --- a/java/TypeDB.java +++ b/java/TypeDB.java @@ -23,6 +23,7 @@ import com.vaticle.typedb.driver.api.TypeDBCredential; import com.vaticle.typedb.driver.connection.TypeDBDriverImpl; +import java.util.Map; import java.util.Set; import static com.vaticle.typedb.common.collection.Collections.set; @@ -75,4 +76,21 @@ public static TypeDBDriver cloudDriver(String address, TypeDBCredential credenti public static TypeDBDriver cloudDriver(Set addresses, TypeDBCredential credential) { return new TypeDBDriverImpl(addresses, credential); } + + /** + * Open a TypeDB Driver to TypeDB Cloud server(s), using provided address translation, with + * the provided credential. + * + *

Examples

+ *
+     * TypeDB.cloudDriver(addressTranslation, credential);
+     * 
+ * + * @param addressTranslation Translation map from addresses received from the TypeDB server(s) + * to addresses to be used by the driver for connection + * @param credential The credential to connect with + */ + public static TypeDBDriver cloudDriver(Map addressTranslation, TypeDBCredential credential) { + return new TypeDBDriverImpl(addressTranslation, credential); + } } diff --git a/java/api/database/Database.java b/java/api/database/Database.java index 542d9039e0..a8b5c81fae 100644 --- a/java/api/database/Database.java +++ b/java/api/database/Database.java @@ -116,10 +116,10 @@ public interface Database { interface Replica { /** - * Retrieves the address of the server hosting this replica + * The server hosting this replica */ @CheckReturnValue - String address(); + String server(); /** * Checks whether this is the primary replica of the raft cluster. diff --git a/java/connection/TypeDBDatabaseImpl.java b/java/connection/TypeDBDatabaseImpl.java index 61fd7fd16a..5b7cf56d8e 100644 --- a/java/connection/TypeDBDatabaseImpl.java +++ b/java/connection/TypeDBDatabaseImpl.java @@ -37,7 +37,7 @@ import static com.vaticle.typedb.driver.jni.typedb_driver.database_rule_schema; import static com.vaticle.typedb.driver.jni.typedb_driver.database_schema; import static com.vaticle.typedb.driver.jni.typedb_driver.database_type_schema; -import static com.vaticle.typedb.driver.jni.typedb_driver.replica_info_get_address; +import static com.vaticle.typedb.driver.jni.typedb_driver.replica_info_get_server; import static com.vaticle.typedb.driver.jni.typedb_driver.replica_info_get_term; import static com.vaticle.typedb.driver.jni.typedb_driver.replica_info_is_preferred; import static com.vaticle.typedb.driver.jni.typedb_driver.replica_info_is_primary; @@ -127,8 +127,8 @@ public static class Replica extends NativeObject implements TypeDBDriver { @@ -49,6 +53,10 @@ public TypeDBDriverImpl(Set initAddresses, TypeDBCredential credential) this(openCloud(initAddresses, credential)); } + public TypeDBDriverImpl(Map addressTranslation, TypeDBCredential credential) throws TypeDBDriverException { + this(openCloud(addressTranslation, credential)); + } + private TypeDBDriverImpl(com.vaticle.typedb.driver.jni.Connection connection) { super(connection); databaseMgr = new TypeDBDatabaseManagerImpl(this.nativeObject); @@ -71,6 +79,24 @@ private static com.vaticle.typedb.driver.jni.Connection openCloud(Set in } } + private static com.vaticle.typedb.driver.jni.Connection openCloud(Map addressTranslation, TypeDBCredential credential) { + try { + List advertised = new ArrayList(); + List translated = new ArrayList(); + for (Map.Entry entry: addressTranslation.entrySet()) { + advertised.add(entry.getKey()); + translated.add(entry.getValue()); + } + return connection_open_cloud_translated( + advertised.toArray(new String[0]), + translated.toArray(new String[0]), + credential.nativeObject + ); + } catch (com.vaticle.typedb.driver.jni.Error e) { + throw new TypeDBDriverException(e); + } + } + @Override public boolean isOpen() { return connection_is_open(nativeObject); diff --git a/java/docs/connection/Database.Replica.adoc b/java/docs/connection/Database.Replica.adoc index 115a27b252..3f5426cf30 100644 --- a/java/docs/connection/Database.Replica.adoc +++ b/java/docs/connection/Database.Replica.adoc @@ -6,50 +6,50 @@ The metadata and state of an individual raft replica of a database. // tag::methods[] -[#_Database_Replica_address__] -==== address +[#_Database_Replica_isPreferred__] +==== isPreferred [source,java] ---- @CheckReturnValue -java.lang.String address() +boolean isPreferred() ---- -Retrieves the address of the server hosting this replica +Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. [caption=""] .Returns -`java.lang.String` +`boolean` -[#_Database_Replica_isPreferred__] -==== isPreferred +[#_Database_Replica_isPrimary__] +==== isPrimary [source,java] ---- @CheckReturnValue -boolean isPreferred() +boolean isPrimary() ---- -Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. +Checks whether this is the primary replica of the raft cluster. [caption=""] .Returns `boolean` -[#_Database_Replica_isPrimary__] -==== isPrimary +[#_Database_Replica_server__] +==== server [source,java] ---- @CheckReturnValue -boolean isPrimary() +java.lang.String server() ---- -Checks whether this is the primary replica of the raft cluster. +The server hosting this replica [caption=""] .Returns -`boolean` +`java.lang.String` [#_Database_Replica_term__] ==== term diff --git a/java/docs/connection/TypeDB.adoc b/java/docs/connection/TypeDB.adoc index 3608520d07..6411f63d1d 100644 --- a/java/docs/connection/TypeDB.adoc +++ b/java/docs/connection/TypeDB.adoc @@ -95,6 +95,39 @@ a| `credential` a| The credential to connect with a| `TypeDBCredential` TypeDB.cloudDriver(addresses, credential); ---- +[#_TypeDB_cloudDriver__java_util_Map_java_lang_String_​java_lang_String___TypeDBCredential] +==== cloudDriver + +[source,java] +---- +public static TypeDBDriver cloudDriver​(java.util.Map addressTranslation, + TypeDBCredential credential) +---- + +Open a TypeDB Driver to TypeDB Cloud server(s), using provided address translation, with the provided credential. + + +[caption=""] +.Input parameters +[cols=",,"] +[options="header"] +|=== +|Name |Description |Type +a| `addressTranslation` a| Translation map from addresses received from the TypeDB server(s) to addresses to be used by the driver for connection a| `java.util.Map` +a| `credential` a| The credential to connect with a| `TypeDBCredential` +|=== + +[caption=""] +.Returns +`public static TypeDBDriver` + +[caption=""] +.Code examples +[source,java] +---- +TypeDB.cloudDriver(addressTranslation, credential); +---- + [#_TypeDB_coreDriver__java_lang_String] ==== coreDriver diff --git a/java/test/integration/AddressTranslationTest.java b/java/test/integration/AddressTranslationTest.java new file mode 100644 index 0000000000..f25da6bbae --- /dev/null +++ b/java/test/integration/AddressTranslationTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.vaticle.typedb.driver.test.integration; + +import com.vaticle.typedb.cloud.tool.runner.TypeDBCloudRunner; +import com.vaticle.typedb.common.collection.Pair; +import com.vaticle.typedb.driver.TypeDB; +import com.vaticle.typedb.driver.api.TypeDBCredential; +import com.vaticle.typedb.driver.api.TypeDBDriver; +import com.vaticle.typedb.driver.api.TypeDBSession; +import com.vaticle.typedb.driver.api.TypeDBTransaction; +import com.vaticle.typedb.driver.api.concept.type.EntityType; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.vaticle.typedb.common.collection.Collections.map; +import static com.vaticle.typedb.common.collection.Collections.pair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +@SuppressWarnings("Duplicates") +public class AddressTranslationTest { + private static final Logger LOG = LoggerFactory.getLogger(AddressTranslationTest.class); + + private static final Map serverOptions = map( + pair("--diagnostics.reporting.errors", "false"), + pair("--diagnostics.reporting.statistics", "false"), + pair("--diagnostics.monitoring.enable", "false") + ); + private static final TypeDBCredential credential = new TypeDBCredential("admin", "password", false); + + @Test + public void testAddressTranslation() { + TypeDBCloudRunner typedb = TypeDBCloudRunner.create(Paths.get("."), 3, serverOptions); + typedb.start(); + Map addresses = typedb.externalAddresses().stream().map(address -> pair(address, address)) + .collect(Collectors.toMap(Pair::first, Pair::second)); + TypeDBDriver driver = TypeDB.cloudDriver(addresses, credential); + driver.databases().create("typedb"); + TypeDBSession session = driver.session("typedb", TypeDBSession.Type.DATA); + TypeDBTransaction tx = session.transaction(TypeDBTransaction.Type.WRITE); + EntityType root = tx.concepts().getRootEntityType(); + assertNotNull(root); + assertEquals(1, root.getSubtypes(tx).collect(Collectors.toList()).size()); + tx.close(); + session.close(); + driver.close(); + } +} diff --git a/java/test/integration/BUILD b/java/test/integration/BUILD index 5c7dacd507..14f6b44e56 100644 --- a/java/test/integration/BUILD +++ b/java/test/integration/BUILD @@ -44,6 +44,29 @@ typedb_java_test( ], ) +typedb_java_test( + name = "test-address-translation", + srcs = ["AddressTranslationTest.java"], + server_artifacts = { + "@vaticle_bazel_distribution//platform:is_linux_arm64": "@vaticle_typedb_cloud_artifact_linux-arm64//file", + "@vaticle_bazel_distribution//platform:is_linux_x86_64": "@vaticle_typedb_cloud_artifact_linux-x86_64//file", + "@vaticle_bazel_distribution//platform:is_mac_arm64": "@vaticle_typedb_cloud_artifact_mac-arm64//file", + "@vaticle_bazel_distribution//platform:is_mac_x86_64": "@vaticle_typedb_cloud_artifact_mac-x86_64//file", + "@vaticle_bazel_distribution//platform:is_windows_x86_64": "@vaticle_typedb_cloud_artifact_windows-x86_64//file", + }, + test_class = "com.vaticle.typedb.driver.test.integration.AddressTranslationTest", + deps = [ + # Internal dependencies + "//java:driver-java", + "//java/api", + + # External dependencies from @vaticle + "@vaticle_typeql//common/java:common", + "@maven//:org_slf4j_slf4j_api", + "@maven//:com_vaticle_typedb_typedb_cloud_runner", + ], +) + checkstyle_test( name = "checkstyle", include = glob(["*"]), diff --git a/nodejs/TypeDB.ts b/nodejs/TypeDB.ts index 633ea5925e..4b6372ef80 100644 --- a/nodejs/TypeDB.ts +++ b/nodejs/TypeDB.ts @@ -50,7 +50,7 @@ export namespace TypeDB { * const driver = TypeDB.cloudDriver(["127.0.0.1:11729"], new TypeDBCredential(username, password)); * ``` */ - export function cloudDriver(addresses: string | string[], credential: TypeDBCredential): Promise { + export function cloudDriver(addresses: string | string[] | Record, credential: TypeDBCredential): Promise { if (typeof addresses === 'string') addresses = [addresses]; return new TypeDBDriverImpl(addresses, credential).open(); } diff --git a/nodejs/api/connection/database/Database.ts b/nodejs/api/connection/database/Database.ts index d063d1aa58..035d62f045 100644 --- a/nodejs/api/connection/database/Database.ts +++ b/nodejs/api/connection/database/Database.ts @@ -69,7 +69,7 @@ export namespace Database { * If true, Operations which can be run on any replica will prefer to use this replica. */ readonly preferred: boolean; - /** The address of the server hosting this replica */ - readonly address: string; + /** The server hosting this replica */ + readonly server: string; } } diff --git a/nodejs/common/errors/ErrorMessage.ts b/nodejs/common/errors/ErrorMessage.ts index 6cf02d49aa..63b651449f 100644 --- a/nodejs/common/errors/ErrorMessage.ts +++ b/nodejs/common/errors/ErrorMessage.ts @@ -102,6 +102,7 @@ export namespace ErrorMessage { export const CLOUD_INVALID_ROOT_CA_PATH = new Driver(21, (args: Stringable[]) => `The provided Root CA path '${args[0]}' does not exist`); export const UNRECOGNISED_SESSION_TYPE = new Driver(22, (args: Stringable[]) => `Session type '${args[1]}' was not recognised.`); export const MISSING_PORT = new Driver(23, (args: Stringable[]) => `Invalid URL '${args[1]}': missing port.`); + export const ADDRESS_TRANSLATION_MISMATCH = new Driver(24, (args: Stringable[]) => `Address translation map does not match the server's advertised address list. User-provided servers not in the advertised list: {${args[0]}}. Advertised servers not mapped by user: {${args[1]}}.`); } export class Concept extends ErrorMessage { diff --git a/nodejs/connection/TypeDBDatabaseImpl.ts b/nodejs/connection/TypeDBDatabaseImpl.ts index e3d3df21e8..afa17a4548 100644 --- a/nodejs/connection/TypeDBDatabaseImpl.ts +++ b/nodejs/connection/TypeDBDatabaseImpl.ts @@ -128,7 +128,7 @@ export class TypeDBDatabaseImpl implements Database { async runOnAnyReplica(task: (serverDriver: ServerDriver, serverDatabase: ServerDatabase) => Promise): Promise { for (const replica of this.replicas) { try { - return await task(this._driver.serverDrivers.get(replica.address), replica.database); + return await task(this._driver.serverDrivers.get(replica.server), replica.database); } catch (e) { if (e instanceof TypeDBDriverError && UNABLE_TO_CONNECT === e.messageTemplate) { // TODO log @@ -144,7 +144,7 @@ export class TypeDBDatabaseImpl implements Database { } for (const _ of Array(PRIMARY_REPLICA_TASK_MAX_RETRIES)) { try { - return await task(this._driver.serverDrivers.get(this.primaryReplica.address), this.primaryReplica.database); + return await task(this._driver.serverDrivers.get(this.primaryReplica.server), this.primaryReplica.database); } catch (e) { if (e instanceof TypeDBDriverError && (UNABLE_TO_CONNECT === e.messageTemplate || CLOUD_REPLICA_NOT_PRIMARY === e.messageTemplate) @@ -176,14 +176,14 @@ export class TypeDBDatabaseImpl implements Database { } export class Replica implements Database.Replica { - private readonly _address: string; + private readonly _server: string; private readonly _database: ServerDatabase; private readonly _term: number; private readonly _isPrimary: boolean; private readonly _isPreferred: boolean; - private constructor(database: ServerDatabase, address: string, term: number, isPrimary: boolean, isPreferred: boolean) { - this._address = address; + private constructor(database: ServerDatabase, server: string, term: number, isPrimary: boolean, isPreferred: boolean) { + this._server = server; this._database = database; this._term = term; this._isPrimary = isPrimary; @@ -198,8 +198,8 @@ export class Replica implements Database.Replica { return this._database; } - get address(): string { - return this._address; + get server(): string { + return this._server; } get databaseName(): string { @@ -219,7 +219,7 @@ export class Replica implements Database.Replica { } toString(): string { - return `${this._address}/${this.databaseName}:${this._isPrimary ? "P" : "S"}:${this._term}`; + return `${this._server}/${this.databaseName}:${this._isPrimary ? "P" : "S"}:${this._term}`; } } diff --git a/nodejs/connection/TypeDBDriverImpl.ts b/nodejs/connection/TypeDBDriverImpl.ts index fc6dcc1ddc..c2f227bd97 100644 --- a/nodejs/connection/TypeDBDriverImpl.ts +++ b/nodejs/connection/TypeDBDriverImpl.ts @@ -37,12 +37,13 @@ import CLOUD_UNABLE_TO_CONNECT = ErrorMessage.Driver.CLOUD_UNABLE_TO_CONNECT; import SESSION_ID_EXISTS = ErrorMessage.Driver.SESSION_ID_EXISTS; import UNABLE_TO_CONNECT = ErrorMessage.Driver.UNABLE_TO_CONNECT; import MISSING_PORT = ErrorMessage.Driver.MISSING_PORT; +import ADDRESS_TRANSLATION_MISMATCH = ErrorMessage.Driver.ADDRESS_TRANSLATION_MISMATCH; export class TypeDBDriverImpl implements TypeDBDriver { private _isOpen: boolean; private readonly _isCloud: boolean; - private readonly _initAddresses: string[]; + private readonly _initAddresses: string[] | Record; private readonly _credential: TypeDBCredential; private _userManager: UserManagerImpl; @@ -53,10 +54,10 @@ export class TypeDBDriverImpl implements TypeDBDriver { private readonly _sessions: { [id: string]: TypeDBSessionImpl }; - constructor(addresses: string | string[], credential?: TypeDBCredential) { + constructor(addresses: string | string[] | Record, credential?: TypeDBCredential) { if (typeof addresses === 'string') addresses = [addresses]; - for (const address of addresses) + for (const [_, address] of Object.entries(addresses)) if (!/:\d+/.test(address)) throw new TypeDBDriverError(MISSING_PORT.message(address)); @@ -77,7 +78,7 @@ export class TypeDBDriverImpl implements TypeDBDriver { } private async openCore(): Promise { - const serverAddress = this._initAddresses[0]; + const serverAddress = (this._initAddresses as string[])[0]; const serverStub = new TypeDBStubImpl(serverAddress, this._credential); await serverStub.open(); const advertisedAddress = (await serverStub.serversAll(RequestBuilder.ServerManager.allReq())).servers[0].address; @@ -89,10 +90,38 @@ export class TypeDBDriverImpl implements TypeDBDriver { private async openCloud(): Promise { const serverAddresses = await this.fetchCloudServerAddresses(); const openReqs: Promise[] = [] - for (const addr of serverAddresses) { - const serverStub = new TypeDBStubImpl(addr, this._credential); + + let addressTranslation: Record; + if (Array.isArray(this._initAddresses)) { + addressTranslation = {}; + for (const address of serverAddresses) { + addressTranslation[address] = address; + } + } else { + addressTranslation = this._initAddresses; + const unknown = []; + for (const [advertised, _] of Object.entries(addressTranslation)) { + if (serverAddresses.indexOf(advertised) === -1) { + unknown.push(advertised); + } + } + const unmapped = []; + for (const advertisedAddress of serverAddresses) { + if (!(advertisedAddress in addressTranslation)) { + unmapped.push(advertisedAddress); + } + } + if (unknown.length > 0 || unmapped.length > 0) { + throw new TypeDBDriverError( + ADDRESS_TRANSLATION_MISMATCH.message(unknown.join(", "), unmapped.join(", ")) + ); + } + } + + for (const [serverID, address] of Object.entries(addressTranslation)) { + const serverStub = new TypeDBStubImpl(address, this._credential); openReqs.push(serverStub.open()); - this.serverDrivers.set(addr, new ServerDriver(addr, serverStub)); + this.serverDrivers.set(serverID, new ServerDriver(address, serverStub)); } try { await Promise.any(openReqs); @@ -105,7 +134,7 @@ export class TypeDBDriverImpl implements TypeDBDriver { } private async fetchCloudServerAddresses(): Promise { - for (const address of this._initAddresses) { + for (const [_, address] of Object.entries(this._initAddresses)) { try { const stub = new TypeDBStubImpl(address, this._credential); await stub.open(); @@ -116,7 +145,7 @@ export class TypeDBDriverImpl implements TypeDBDriver { console.error(`Fetching cloud servers from ${address} failed.`, e); } } - throw new TypeDBDriverError(CLOUD_UNABLE_TO_CONNECT.message(this._initAddresses.join(","))); + throw new TypeDBDriverError(CLOUD_UNABLE_TO_CONNECT.message(Object.values(this._initAddresses).join(","))); } isOpen(): boolean { diff --git a/nodejs/docs/connection/Replica.adoc b/nodejs/docs/connection/Replica.adoc index 3464de5cd9..e42d26308b 100644 --- a/nodejs/docs/connection/Replica.adoc +++ b/nodejs/docs/connection/Replica.adoc @@ -10,10 +10,10 @@ The metadata and state of an individual raft replica of a database. [options="header"] |=== |Name |Type |Description -a| `address` a| `string` a| The address of the server hosting this replica a| `databaseName` a| `string` a| The database for which this is a replica. a| `preferred` a| `boolean` a| Checks whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. a| `primary` a| `boolean` a| Checks whether this is the primary replica of the raft cluster. +a| `server` a| `string` a| The server hosting this replica a| `term` a| `number` a| The raft protocol ‘term’ of this replica. |=== // end::properties[] diff --git a/nodejs/docs/connection/TypeDB.adoc b/nodejs/docs/connection/TypeDB.adoc index 705cc9ad41..23e1f06596 100644 --- a/nodejs/docs/connection/TypeDB.adoc +++ b/nodejs/docs/connection/TypeDB.adoc @@ -13,7 +13,7 @@ a| `DEFAULT_ADDRESS` // end::enum_constants[] // tag::methods[] -[#_TypeDB_cloudDriver__addresses_string__string____credential_TypeDBCredential] +[#_TypeDB_cloudDriver__addresses_string__string____Record_string__string___credential_TypeDBCredential] ==== cloudDriver [source,nodejs] @@ -29,7 +29,7 @@ Creates a connection to TypeDB Cloud, authenticating with the provided credentia [options="header"] |=== |Name |Description |Type -a| `addresses` a| List of addresses of the individual TypeDB Cloud servers. As long one specified address is provided, the driver will discover the other addresses from that server. a| `string \| string[]` +a| `addresses` a| List of addresses of the individual TypeDB Cloud servers. As long one specified address is provided, the driver will discover the other addresses from that server. a| `string \| string[] \| Record` a| `credential` a| The credentials to log in, and encryption settings. See ``TypeDBCredential`` Examples ``const driver = TypeDB.cloudDriver(["127.0.0.1:11729"], new TypeDBCredential(username, password)); diff --git a/nodejs/test/integration/test-cloud-failover.js b/nodejs/test/integration/test-cloud-failover.js index 320d6a18a1..d766f7bb8a 100644 --- a/nodejs/test/integration/test-cloud-failover.js +++ b/nodejs/test/integration/test-cloud-failover.js @@ -115,7 +115,7 @@ async function run() { ); primaryReplica = await seekPrimaryReplica(driver.databases); console.info(`Stopping primary replica (test ${iteration}/10)...`); - const port = primaryReplica.address.substring(10,15); + const port = primaryReplica.server.substring(10,15); const primaryReplicaServerPID = getServerPID(port); console.info(`Primary replica is hosted by server with PID ${primaryReplicaServerPID}`); spawnSync("kill", ["-9", primaryReplicaServerPID]); @@ -129,7 +129,7 @@ async function run() { console.info(`Retrieved entity type with label '${person.label.scopedName}' from new primary replica`); assert(person.label.scopedName === "person"); await session.close(); - const idx = primaryReplica.address[10]; + const idx = primaryReplica.server[10]; serverStart(idx); await new Promise(resolve => setTimeout(resolve, 20000)); const spawned = getServerPID(`${idx}1729`); diff --git a/nodejs/test/integration/test-connection-cloud.js b/nodejs/test/integration/test-connection-cloud.js new file mode 100644 index 0000000000..5d589c5f57 --- /dev/null +++ b/nodejs/test/integration/test-connection-cloud.js @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const { TypeDB, SessionType, TransactionType, TypeDBCredential } = require("../../dist"); +const assert = require("assert"); + +async function run() { + try { + const driver = await TypeDB.cloudDriver( + { + "localhost:11729": "localhost:11729", + "localhost:21729": "localhost:21729", + "localhost:31729": "localhost:31729", + }, + new TypeDBCredential("admin", "password", process.env.ROOT_CA) + ); + + const dbs = await driver.databases.all(); + const typedb = dbs.find(x => x.name === "typedb"); + if (typedb) { + await typedb.delete(); + } + await driver.databases.create("typedb"); + const session = await driver.session("typedb", SessionType.DATA); + const tx = await session.transaction(TransactionType.WRITE); + + const root = await tx.concepts.getRootEntityType(); + const subtypes = await root.getSubtypes(tx).collect(); + assert(subtypes.length === 1); + + await tx.close(); + await session.close(); + await driver.close(); + } catch (err) { + console.error(`ERROR: ${err.stack || err}`); + process.exit(1); + } +} + +run(); + diff --git a/nodejs/test/integration/test-connection.js b/nodejs/test/integration/test-connection-core.js similarity index 100% rename from nodejs/test/integration/test-connection.js rename to nodejs/test/integration/test-connection-core.js diff --git a/python/docs/connection/Replica.adoc b/python/docs/connection/Replica.adoc index 72e8c9568d..d686bd4f8a 100644 --- a/python/docs/connection/Replica.adoc +++ b/python/docs/connection/Replica.adoc @@ -4,20 +4,6 @@ The metadata and state of an individual raft replica of a database. // tag::methods[] -[#_Replica_address__] -==== address - -[source,python] ----- -address() -> str ----- - -Retrieves address of the server hosting this replica - -[caption=""] -.Returns -`str` - [#_Replica_database__] ==== database @@ -60,6 +46,20 @@ Checks whether this is the primary replica of the raft cluster. .Returns `bool` +[#_Replica_server__] +==== server + +[source,python] +---- +server() -> str +---- + +The server hosting this replica + +[caption=""] +.Returns +`str` + [#_Replica_term__] ==== term diff --git a/python/docs/connection/TypeDB.adoc b/python/docs/connection/TypeDB.adoc index 2c6a39f781..49c8a01fb4 100644 --- a/python/docs/connection/TypeDB.adoc +++ b/python/docs/connection/TypeDB.adoc @@ -2,12 +2,12 @@ === TypeDB // tag::methods[] -[#_TypeDB_cloud_driver__addresses_Iterable_str___str__credential_TypeDBCredential] +[#_TypeDB_cloud_driver__addresses_Mapping_str__str___Iterable_str___str__credential_TypeDBCredential] ==== cloud_driver [source,python] ---- -static cloud_driver(addresses: Iterable[str] | str, credential: TypeDBCredential) -> TypeDBDriver +static cloud_driver(addresses: Mapping[str, str] | Iterable[str] | str, credential: TypeDBCredential) -> TypeDBDriver ---- Creates a connection to TypeDB Cloud, authenticating with the provided credentials. @@ -18,7 +18,7 @@ Creates a connection to TypeDB Cloud, authenticating with the provided credentia [options="header"] |=== |Name |Description |Type |Default Value -a| `addresses` a| – a| `Iterable[str] \| str` a| +a| `addresses` a| – a| `Mapping[str, str] \| Iterable[str] \| str` a| a| `credential` a| – a| `TypeDBCredential` a| |=== diff --git a/python/tests/integration/test_connection.py b/python/tests/integration/test_connection.py index bf9720ca17..0ddff99154 100644 --- a/python/tests/integration/test_connection.py +++ b/python/tests/integration/test_connection.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import os import unittest from unittest import TestCase @@ -22,6 +23,10 @@ from typedb.driver import * +TYPEDB = "typedb" +DATA = SessionType.DATA +WRITE = TransactionType.WRITE + class TestDebug(TestCase): @@ -29,6 +34,20 @@ def test_missing_port(self): assert_that(calling(lambda: TypeDB.core_driver("localhost")), raises(TypeDBDriverException)) + def test_address_translation(self): + address_translation = { + "localhost:11729": "localhost:11729", + "localhost:21729": "localhost:21729", + "localhost:31729": "localhost:31729" + } + credential = TypeDBCredential("admin", "password", tls_enabled=True, tls_root_ca_path=os.environ["ROOT_CA"]) + with TypeDB.cloud_driver(address_translation, credential) as driver: + if TYPEDB not in [db.name for db in driver.databases.all()]: + driver.databases.create(TYPEDB) + with driver.session(TYPEDB, DATA) as session, session.transaction(WRITE) as tx: + root = tx.concepts.get_root_entity_type() + assert_that(len(list(root.get_subtypes(tx))), equal_to(1)) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/typedb/api/connection/database.py b/python/typedb/api/connection/database.py index ee16b60ec8..923edaae81 100644 --- a/python/typedb/api/connection/database.py +++ b/python/typedb/api/connection/database.py @@ -154,9 +154,9 @@ def database(self) -> Database: pass @abstractmethod - def address(self) -> str: + def server(self) -> str: """ - Retrieves address of the server hosting this replica + The server hosting this replica :return: """ diff --git a/python/typedb/connection/database.py b/python/typedb/connection/database.py index e37e29cce5..b71d84d231 100644 --- a/python/typedb/connection/database.py +++ b/python/typedb/connection/database.py @@ -20,7 +20,7 @@ from typing import Optional from typedb.native_driver_wrapper import database_get_name, database_schema, database_delete, database_rule_schema, \ - database_type_schema, ReplicaInfo, replica_info_get_address, replica_info_is_primary, replica_info_is_preferred, \ + database_type_schema, ReplicaInfo, replica_info_get_server, replica_info_is_primary, replica_info_is_preferred, \ replica_info_get_term, database_get_replicas_info, database_get_primary_replica_info, \ database_get_preferred_replica_info, replica_info_iterator_next, Database as NativeDatabase, \ TypeDBDriverExceptionNative @@ -105,8 +105,8 @@ def __init__(self, replica_info: ReplicaInfo): def database(self) -> Database: pass - def address(self) -> str: - return replica_info_get_address(self._info) + def server(self) -> str: + return replica_info_get_server(self._info) def is_primary(self) -> bool: return replica_info_is_primary(self._info) diff --git a/python/typedb/connection/driver.py b/python/typedb/connection/driver.py index f3ae5e243f..bbaa52eb98 100644 --- a/python/typedb/connection/driver.py +++ b/python/typedb/connection/driver.py @@ -19,8 +19,8 @@ from typing import Optional, TYPE_CHECKING -from typedb.native_driver_wrapper import connection_open_core, connection_open_cloud, connection_is_open, \ - connection_force_close, Connection as NativeConnection, TypeDBDriverExceptionNative +from typedb.native_driver_wrapper import connection_open_core, connection_open_cloud, connection_open_cloud_translated, \ + connection_is_open, connection_force_close, Connection as NativeConnection, TypeDBDriverExceptionNative from typedb.api.connection.driver import TypeDBDriver from typedb.api.connection.options import TypeDBOptions @@ -38,10 +38,16 @@ class _Driver(TypeDBDriver, NativeWrapper[NativeConnection]): - def __init__(self, addresses: list[str], credential: Optional[TypeDBCredential] = None): + def __init__(self, addresses: list[str] | dict[str], credential: Optional[TypeDBCredential] = None): if credential: try: - native_connection = connection_open_cloud(addresses, credential.native_object) + if isinstance(addresses, list): + native_connection = connection_open_cloud(addresses, credential.native_object) + else: + advertised_addresses = list(addresses.keys()) + translated_addresses = [addresses[advertised] for advertised in advertised_addresses] + native_connection = connection_open_cloud_translated( + advertised_addresses, translated_addresses, credential.native_object) except TypeDBDriverExceptionNative as e: raise TypeDBDriverException.of(e) else: diff --git a/python/typedb/driver.py b/python/typedb/driver.py index 1acaaf5ff5..45a00413f1 100644 --- a/python/typedb/driver.py +++ b/python/typedb/driver.py @@ -15,7 +15,8 @@ # specific language governing permissions and limitations # under the License. -from typing import Iterable, Union +from collections.abc import Iterable, Mapping +from typing import Union from typedb.api.answer.concept_map import * # noqa # pylint: disable=unused-import from typedb.api.answer.concept_map_group import * # noqa # pylint: disable=unused-import @@ -67,7 +68,7 @@ def core_driver(address: str) -> TypeDBDriver: return _Driver([address]) @staticmethod - def cloud_driver(addresses: Union[Iterable[str], str], credential: TypeDBCredential) -> TypeDBDriver: + def cloud_driver(addresses: Union[Mapping[str, str], Iterable[str], str], credential: TypeDBCredential) -> TypeDBDriver: """ Creates a connection to TypeDB Cloud, authenticating with the provided credentials. @@ -77,5 +78,7 @@ def cloud_driver(addresses: Union[Iterable[str], str], credential: TypeDBCredent """ if isinstance(addresses, str): return _Driver([addresses], credential) + elif isinstance(addresses, Mapping): + return _Driver(dict(addresses), credential) else: return _Driver(list(addresses), credential) diff --git a/rust/docs/connection/Connection.adoc b/rust/docs/connection/Connection.adoc index 4a63a18e27..639d3222a1 100644 --- a/rust/docs/connection/Connection.adoc +++ b/rust/docs/connection/Connection.adoc @@ -128,6 +128,54 @@ Connection::new_cloud( ) ---- +[#_struct_Connection_new_cloud_with_translation__address_translation_HashMap_T__credential_Credential] +==== new_cloud_with_translation + +[source,rust] +---- +pub fn new_cloud_with_translation( + address_translation: HashMap, + credential: Credential +) -> Resultwhere + T: AsRef + Sync, + U: AsRef + Sync, +---- + +Creates a new TypeDB Cloud connection. + +[caption=""] +.Input parameters +[cols=",,"] +[options="header"] +|=== +|Name |Description |Type +a| `address_translation` a| Translation map from addresses received from the TypeDB server(s) to addresses to be used by the driver for connection a| `HashMapwhere + T: AsRef + Sync, + U: AsRef + Sync, +---- + +[caption=""] +.Code examples +[source,rust] +---- +Connection::new_cloud_with_translation( + [ + ("typedb-cloud.ext:11729", "localhost:11729"), + ("typedb-cloud.ext:21729", "localhost:21729"), + ("typedb-cloud.ext:31729", "localhost:31729"), + ].into(), + credential, +) +---- + [#_struct_Connection_new_core__address_impl_AsRef_str_] ==== new_core diff --git a/rust/docs/connection/ReplicaInfo.adoc b/rust/docs/connection/ReplicaInfo.adoc index 4afca870f3..f84cf2439f 100644 --- a/rust/docs/connection/ReplicaInfo.adoc +++ b/rust/docs/connection/ReplicaInfo.adoc @@ -14,9 +14,9 @@ The metadata and state of an individual raft replica of a database. [options="header"] |=== |Name |Type |Description -a| `address` a| `Address` a| The address of the server hosting this replica a| `is_preferred` a| `bool` a| Whether this is the preferred replica of the raft cluster. If true, Operations which can be run on any replica will prefer to use this replica. a| `is_primary` a| `bool` a| Whether this is the primary replica of the raft cluster. +a| `server` a| `String` a| The server hosting this replica a| `term` a| `i64` a| The raft protocol ‘term’ of this replica. |=== // end::properties[] diff --git a/rust/docs/errors/ConnectionError.adoc b/rust/docs/errors/ConnectionError.adoc index a054d17d61..4d243d8d39 100644 --- a/rust/docs/errors/ConnectionError.adoc +++ b/rust/docs/errors/ConnectionError.adoc @@ -8,6 +8,7 @@ [options="header"] |=== |Variant +a| `AddressTranslationMismatch` a| `BrokenPipe` a| `CloudAllNodesFailed` a| `CloudEndpointEncrypted` diff --git a/rust/docs/errors/InternalError.adoc b/rust/docs/errors/InternalError.adoc index a59d2cc829..78f7c480b5 100644 --- a/rust/docs/errors/InternalError.adoc +++ b/rust/docs/errors/InternalError.adoc @@ -13,7 +13,7 @@ a| `RecvError` a| `SendError` a| `UnexpectedRequestType` a| `UnexpectedResponseType` -a| `UnknownConnectionAddress` +a| `UnknownServer` |=== // end::enum_constants[] diff --git a/rust/src/common/error.rs b/rust/src/common/error.rs index d141e3d23a..359fab7ac0 100644 --- a/rust/src/common/error.rs +++ b/rust/src/common/error.rs @@ -17,12 +17,12 @@ * under the License. */ -use std::{error::Error as StdError, fmt}; +use std::{collections::HashSet, error::Error as StdError, fmt}; use tonic::{Code, Status}; use typeql::error_messages; -use super::{address::Address, RequestID}; +use super::RequestID; error_messages! { ConnectionError code: "CXN", type: "Connection Error", @@ -72,6 +72,8 @@ error_messages! { ConnectionError 22: "Connection failed. Please check the server is running and the address is accessible. Encrypted Cloud endpoints may also have misconfigured SSL certificates.", MissingPort { address: String } = 23: "Invalid URL '{address}': missing port.", + AddressTranslationMismatch { unknown: HashSet, unmapped: HashSet } = + 24: "Address translation map does not match the server's advertised address list. User-provided servers not in the advertised list: {unknown:?}. Advertised servers not mapped by user: {unmapped:?}.", } error_messages! { InternalError @@ -84,8 +86,8 @@ error_messages! { InternalError 3: "Unexpected request type for remote procedure call: {request_type}.", UnexpectedResponseType { response_type: String } = 4: "Unexpected response type for remote procedure call: {response_type}.", - UnknownConnectionAddress { address: Address } = - 5: "Received unrecognized address from the server: {address}.", + UnknownServer { server: String } = + 5: "Received replica at unrecognized server: {server}.", EnumOutOfBounds { value: i32, enum_name: &'static str } = 6: "Value '{value}' is out of bounds for enum '{enum_name}'.", } diff --git a/rust/src/common/info.rs b/rust/src/common/info.rs index 2ce6b74f33..3122fa050a 100644 --- a/rust/src/common/info.rs +++ b/rust/src/common/info.rs @@ -21,12 +21,10 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; -use super::{address::Address, SessionID}; -use crate::common::Callback; +use super::{Callback, SessionID}; #[derive(Clone, Debug)] pub(crate) struct SessionInfo { - pub(crate) address: Address, pub(crate) session_id: SessionID, pub(crate) network_latency: Duration, pub(crate) on_close_register_sink: UnboundedSender, @@ -41,8 +39,8 @@ pub(crate) struct DatabaseInfo { /// The metadata and state of an individual raft replica of a database. #[derive(Debug)] pub struct ReplicaInfo { - /// The address of the server hosting this replica - pub address: Address, + /// The server hosting this replica + pub server: String, /// Whether this is the primary replica of the raft cluster. pub is_primary: bool, /// Whether this is the preferred replica of the raft cluster. diff --git a/rust/src/connection/connection.rs b/rust/src/connection/connection.rs index 29f636d1db..d664e98aef 100644 --- a/rust/src/connection/connection.rs +++ b/rust/src/connection/connection.rs @@ -57,7 +57,7 @@ use crate::{ /// A connection to a TypeDB server which serves as the starting point for all interaction. #[derive(Clone)] pub struct Connection { - server_connections: HashMap, + server_connections: HashMap, background_runtime: Arc, username: Option, is_cloud: bool, @@ -76,18 +76,21 @@ impl Connection { /// Connection::new_core("127.0.0.1:1729") /// ``` pub fn new_core(address: impl AsRef) -> Result { - let address: Address = address.as_ref().parse()?; + let id = address.as_ref().to_string(); + let address: Address = id.parse()?; let background_runtime = Arc::new(BackgroundRuntime::new()?); - let mut server_connection = ServerConnection::new_core(background_runtime.clone(), address)?; - let address = server_connection + let server_connection = ServerConnection::new_core(background_runtime.clone(), address)?; + + let advertised_id = server_connection .servers_all()? .into_iter() .exactly_one() - .map_err(|e| ConnectionError::ServerConnectionFailedStatusError { error: e.to_string() })?; - server_connection.set_address(address.clone()); + .map_err(|e| ConnectionError::ServerConnectionFailedStatusError { error: e.to_string() })? + .to_string(); + match server_connection.validate() { Ok(()) => Ok(Self { - server_connections: [(address, server_connection)].into(), + server_connections: [(advertised_id, server_connection)].into(), background_runtime, username: None, is_cloud: false, @@ -121,14 +124,79 @@ impl Connection { pub fn new_cloud + Sync>(init_addresses: &[T], credential: Credential) -> Result { let background_runtime = Arc::new(BackgroundRuntime::new()?); - let init_addresses = init_addresses.iter().map(|addr| addr.as_ref().parse()).try_collect()?; - let addresses = Self::fetch_current_addresses(background_runtime.clone(), init_addresses, credential.clone())?; + let servers = Self::fetch_server_list(background_runtime.clone(), init_addresses, credential.clone())?; - let server_connections: HashMap = addresses + let server_to_address: HashMap = servers .into_iter() .map(|address| { - ServerConnection::new_cloud(background_runtime.clone(), address.clone(), credential.clone()) - .map(|server_connection| (address, server_connection)) + let id = address.clone(); + address.parse().map(|address| (id, address)) + }) + .try_collect()?; + + Self::new_cloud_impl(server_to_address, background_runtime, credential) + } + + /// Creates a new TypeDB Cloud connection. + /// + /// # Arguments + /// + /// * `address_translation` -- Translation map from addresses received from the TypeDB server(s) + /// to addresses to be used by the driver for connection + /// * `credential` -- User credential and TLS encryption setting + /// + /// # Examples + /// + /// ```rust + /// Connection::new_cloud_with_translation( + /// [ + /// ("typedb-cloud.ext:11729", "localhost:11729"), + /// ("typedb-cloud.ext:21729", "localhost:21729"), + /// ("typedb-cloud.ext:31729", "localhost:31729"), + /// ].into(), + /// credential, + /// ) + /// ``` + pub fn new_cloud_with_translation(address_translation: HashMap, credential: Credential) -> Result + where + T: AsRef + Sync, + U: AsRef + Sync, + { + let background_runtime = Arc::new(BackgroundRuntime::new()?); + + let servers = + Self::fetch_server_list(background_runtime.clone(), address_translation.values(), credential.clone())?; + + let server_to_address: HashMap = address_translation + .into_iter() + .map(|(id, address)| { + let id = id.as_ref().to_owned(); + address.as_ref().parse().map(|address| (id, address)) + }) + .try_collect()?; + + let translated: HashSet = server_to_address.keys().cloned().collect(); + let unknown = &translated - &servers; + let unmapped = &servers - &translated; + if !unknown.is_empty() || !unmapped.is_empty() { + return Err(ConnectionError::AddressTranslationMismatch { unknown, unmapped }.into()); + } + + debug_assert_eq!(servers, translated); + + Self::new_cloud_impl(server_to_address, background_runtime, credential) + } + + fn new_cloud_impl( + server_to_address: HashMap, + background_runtime: Arc, + credential: Credential, + ) -> Result { + let server_connections: HashMap = server_to_address + .into_iter() + .map(|(server_id, address)| { + ServerConnection::new_cloud(background_runtime.clone(), address, credential.clone()) + .map(|server_connection| (server_id, server_connection)) }) .try_collect()?; @@ -142,20 +210,20 @@ impl Connection { Ok(Self { server_connections, background_runtime, - username: Some(credential.username().to_string()), + username: Some(credential.username().to_owned()), is_cloud: true, }) } } - fn fetch_current_addresses( + fn fetch_server_list( background_runtime: Arc, - addresses: Vec
, + addresses: impl IntoIterator> + Clone, credential: Credential, - ) -> Result> { - for address in &addresses { + ) -> Result> { + for address in addresses.clone() { let server_connection = - ServerConnection::new_cloud(background_runtime.clone(), address.clone(), credential.clone()); + ServerConnection::new_cloud(background_runtime.clone(), address.as_ref().parse()?, credential.clone()); match server_connection { Ok(server_connection) => match server_connection.servers_all() { Ok(servers) => return Ok(servers.into_iter().collect()), @@ -171,7 +239,7 @@ impl Connection { } } Err(ConnectionError::ServerConnectionFailed { - addresses: addresses.into_iter().map(|a| a.to_string()).collect::>().join(","), + addresses: addresses.into_iter().map(|addr| addr.as_ref().to_owned()).join(", "), } .into()) } @@ -215,27 +283,25 @@ impl Connection { self.server_connections.len() } - pub(crate) fn addresses(&self) -> impl Iterator { - self.server_connections.keys() + pub(crate) fn servers(&self) -> impl Iterator { + self.server_connections.keys().map(String::as_str) } - pub(crate) fn connection(&self, address: &Address) -> Result<&ServerConnection> { - self.server_connections - .get(address) - .ok_or_else(|| InternalError::UnknownConnectionAddress { address: address.clone() }.into()) + pub(crate) fn connection(&self, id: &str) -> Option<&ServerConnection> { + self.server_connections.get(id) } - pub(crate) fn connections(&self) -> impl Iterator + '_ { - self.server_connections.values() + pub(crate) fn connections(&self) -> impl Iterator + '_ { + self.server_connections.iter().map(|(id, conn)| (id.as_str(), conn)) } pub(crate) fn username(&self) -> Option<&str> { - self.username.as_ref().map(|s| s.as_ref()) + self.username.as_deref() } pub(crate) fn unable_to_connect_error(&self) -> Error { Error::Connection(ConnectionError::ServerConnectionFailedStatusError { - error: self.addresses().map(Address::to_string).collect::>().join(", "), + error: self.servers().map(str::to_owned).collect_vec().join(", "), }) } } @@ -248,7 +314,6 @@ impl fmt::Debug for Connection { #[derive(Clone)] pub(crate) struct ServerConnection { - address: Address, background_runtime: Arc, open_sessions: Arc>>>, request_transmitter: Arc, @@ -256,14 +321,13 @@ pub(crate) struct ServerConnection { impl ServerConnection { fn new_core(background_runtime: Arc, address: Address) -> Result { - let request_transmitter = Arc::new(RPCTransmitter::start_core(address.clone(), &background_runtime)?); - Ok(Self { address, background_runtime, open_sessions: Default::default(), request_transmitter }) + let request_transmitter = Arc::new(RPCTransmitter::start_core(address, &background_runtime)?); + Ok(Self { background_runtime, open_sessions: Default::default(), request_transmitter }) } fn new_cloud(background_runtime: Arc, address: Address, credential: Credential) -> Result { - let request_transmitter = - Arc::new(RPCTransmitter::start_cloud(address.clone(), credential, &background_runtime)?); - Ok(Self { address, background_runtime, open_sessions: Default::default(), request_transmitter }) + let request_transmitter = Arc::new(RPCTransmitter::start_cloud(address, credential, &background_runtime)?); + Ok(Self { background_runtime, open_sessions: Default::default(), request_transmitter }) } pub(crate) fn validate(&self) -> Result { @@ -273,14 +337,6 @@ impl ServerConnection { } } - fn set_address(&mut self, address: Address) { - self.address = address; - } - - pub(crate) fn address(&self) -> &Address { - &self.address - } - #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] async fn request(&self, request: Request) -> Result { if !self.background_runtime.is_open() { @@ -304,7 +360,7 @@ impl ServerConnection { self.request_transmitter.force_close() } - pub(crate) fn servers_all(&self) -> Result> { + pub(crate) fn servers_all(&self) -> Result> { match self.request_blocking(Request::ServersAll)? { Response::ServersAll { servers } => Ok(servers), other => Err(InternalError::UnexpectedResponseType { response_type: format!("{other:?}") }.into()), @@ -392,7 +448,6 @@ impl ServerConnection { pulse_shutdown_source, )); Ok(SessionInfo { - address: self.address.clone(), session_id, network_latency: start.elapsed().saturating_sub(server_duration), on_close_register_sink, @@ -506,10 +561,7 @@ impl ServerConnection { impl fmt::Debug for ServerConnection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ServerConnection") - .field("address", &self.address) - .field("open_sessions", &self.open_sessions) - .finish() + f.debug_struct("ServerConnection").field("open_sessions", &self.open_sessions).finish() } } diff --git a/rust/src/connection/message.rs b/rust/src/connection/message.rs index a710dd748f..03bf77f2d7 100644 --- a/rust/src/connection/message.rs +++ b/rust/src/connection/message.rs @@ -26,7 +26,7 @@ use typeql::pattern::{Conjunction, Statement}; use crate::{ answer::{readable_concept, ConceptMap, ConceptMapGroup, ValueGroup}, - common::{address::Address, info::DatabaseInfo, RequestID, SessionID, IID}, + common::{info::DatabaseInfo, RequestID, SessionID, IID}, concept::{ Annotation, Attribute, AttributeType, Entity, EntityType, Relation, RelationType, RoleType, SchemaException, Thing, ThingType, Transitivity, Value, ValueType, @@ -73,7 +73,7 @@ pub(super) enum Response { ConnectionOpen, ServersAll { - servers: Vec
, + servers: Vec, }, DatabasesContains { diff --git a/rust/src/connection/network/proto/database.rs b/rust/src/connection/network/proto/database.rs index 471d116619..c9d9f0734a 100644 --- a/rust/src/connection/network/proto/database.rs +++ b/rust/src/connection/network/proto/database.rs @@ -17,31 +17,24 @@ * under the License. */ -use itertools::Itertools; use typedb_protocol::{database_replicas::Replica as ReplicaProto, DatabaseReplicas as DatabaseProto}; -use super::TryFromProto; -use crate::{ - common::info::{DatabaseInfo, ReplicaInfo}, - Result, -}; +use super::FromProto; +use crate::common::info::{DatabaseInfo, ReplicaInfo}; -impl TryFromProto for DatabaseInfo { - fn try_from_proto(proto: DatabaseProto) -> Result { - Ok(Self { - name: proto.name, - replicas: proto.replicas.into_iter().map(ReplicaInfo::try_from_proto).try_collect()?, - }) +impl FromProto for DatabaseInfo { + fn from_proto(proto: DatabaseProto) -> Self { + Self { name: proto.name, replicas: proto.replicas.into_iter().map(ReplicaInfo::from_proto).collect() } } } -impl TryFromProto for ReplicaInfo { - fn try_from_proto(proto: ReplicaProto) -> Result { - Ok(Self { - address: proto.address.parse()?, +impl FromProto for ReplicaInfo { + fn from_proto(proto: ReplicaProto) -> Self { + Self { + server: proto.address, // TODO should be eventually replaced by "server_id" or "server_name" in protocol is_primary: proto.primary, is_preferred: proto.preferred, term: proto.term, - }) + } } } diff --git a/rust/src/connection/network/proto/message.rs b/rust/src/connection/network/proto/message.rs index 7adad840f5..a94cc152bd 100644 --- a/rust/src/connection/network/proto/message.rs +++ b/rust/src/connection/network/proto/message.rs @@ -245,10 +245,10 @@ impl FromProto for Response { } } -impl TryFromProto for Response { - fn try_from_proto(proto: server_manager::all::Res) -> Result { - let servers = proto.servers.into_iter().map(|server| server.address.parse()).try_collect()?; - Ok(Self::ServersAll { servers }) +impl FromProto for Response { + fn from_proto(proto: server_manager::all::Res) -> Self { + let servers = proto.servers.into_iter().map(|server| server.address).collect(); + Self::ServersAll { servers } } } @@ -267,18 +267,16 @@ impl FromProto for Response { impl TryFromProto for Response { fn try_from_proto(proto: database_manager::get::Res) -> Result { Ok(Self::DatabaseGet { - database: DatabaseInfo::try_from_proto( + database: DatabaseInfo::from_proto( proto.database.ok_or(ConnectionError::MissingResponseField { field: "database" })?, - )?, + ), }) } } -impl TryFromProto for Response { - fn try_from_proto(proto: database_manager::all::Res) -> Result { - Ok(Self::DatabasesAll { - databases: proto.databases.into_iter().map(DatabaseInfo::try_from_proto).try_collect()?, - }) +impl FromProto for Response { + fn from_proto(proto: database_manager::all::Res) -> Self { + Self::DatabasesAll { databases: proto.databases.into_iter().map(DatabaseInfo::from_proto).collect() } } } diff --git a/rust/src/connection/network/transmitter/rpc.rs b/rust/src/connection/network/transmitter/rpc.rs index a867d305f3..2d66aa7ea0 100644 --- a/rust/src/connection/network/transmitter/rpc.rs +++ b/rust/src/connection/network/transmitter/rpc.rs @@ -121,7 +121,7 @@ impl RPCTransmitter { match request { Request::ConnectionOpen => rpc.connection_open(request.try_into_proto()?).await.map(Response::from_proto), - Request::ServersAll => rpc.servers_all(request.try_into_proto()?).await.and_then(Response::try_from_proto), + Request::ServersAll => rpc.servers_all(request.try_into_proto()?).await.map(Response::from_proto), Request::DatabasesContains { .. } => { rpc.databases_contains(request.try_into_proto()?).await.map(Response::from_proto) @@ -132,9 +132,7 @@ impl RPCTransmitter { Request::DatabaseGet { .. } => { rpc.databases_get(request.try_into_proto()?).await.and_then(Response::try_from_proto) } - Request::DatabasesAll => { - rpc.databases_all(request.try_into_proto()?).await.and_then(Response::try_from_proto) - } + Request::DatabasesAll => rpc.databases_all(request.try_into_proto()?).await.map(Response::from_proto), Request::DatabaseDelete { .. } => { rpc.database_delete(request.try_into_proto()?).await.map(Response::from_proto) diff --git a/rust/src/database/database.rs b/rust/src/database/database.rs index a8d8e8a178..8cca837e89 100644 --- a/rust/src/database/database.rs +++ b/rust/src/database/database.rs @@ -21,17 +21,16 @@ use std::future::Future; use std::{fmt, sync::RwLock, thread::sleep, time::Duration}; -use itertools::Itertools; use log::{debug, error}; use crate::{ common::{ - address::Address, error::ConnectionError, info::{DatabaseInfo, ReplicaInfo}, Error, Result, }, connection::ServerConnection, + error::InternalError, Connection, }; @@ -104,10 +103,6 @@ impl Database { self.preferred_replica().map(|replica| replica.to_info()) } - pub(super) fn connection(&self) -> &Connection { - &self.connection - } - /// Deletes this database. /// /// # Examples @@ -118,7 +113,7 @@ impl Database { /// ``` #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn delete(self) -> Result { - self.run_on_primary_replica(|database, _| database.delete()).await + self.run_on_primary_replica(|database| database.delete()).await } /// Returns a full schema text as a valid TypeQL define query string. @@ -131,7 +126,7 @@ impl Database { /// ``` #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn schema(&self) -> Result { - self.run_failsafe(|database, _| async move { database.schema().await }).await + self.run_failsafe(|database| async move { database.schema().await }).await } /// Returns the types in the schema as a valid TypeQL define query string. @@ -144,7 +139,7 @@ impl Database { /// ``` #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn type_schema(&self) -> Result { - self.run_failsafe(|database, _| async move { database.type_schema().await }).await + self.run_failsafe(|database| async move { database.type_schema().await }).await } /// Returns the rules in the schema as a valid TypeQL define query string. @@ -157,13 +152,13 @@ impl Database { /// ``` #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn rule_schema(&self) -> Result { - self.run_failsafe(|database, _| async move { database.rule_schema().await }).await + self.run_failsafe(|database| async move { database.rule_schema().await }).await } #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub(crate) async fn run_failsafe(&self, task: F) -> Result where - F: Fn(ServerDatabase, ServerConnection) -> P, + F: Fn(ServerDatabase) -> P, P: Future>, { match self.run_on_any_replica(&task).await { @@ -178,16 +173,16 @@ impl Database { #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub(super) async fn run_on_any_replica(&self, task: F) -> Result where - F: Fn(ServerDatabase, ServerConnection) -> P, + F: Fn(ServerDatabase) -> P, P: Future>, { let replicas = self.replicas.read().unwrap().clone(); for replica in replicas { - match task(replica.database.clone(), self.connection.connection(&replica.address)?.clone()).await { + match task(replica.database.clone()).await { Err(Error::Connection( ConnectionError::ServerConnectionFailedStatusError { .. } | ConnectionError::ConnectionFailed, )) => { - debug!("Unable to connect to {}. Attempting next server.", replica.address); + debug!("Unable to connect to {}. Attempting next server.", replica.server); } res => return res, } @@ -198,16 +193,14 @@ impl Database { #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub(super) async fn run_on_primary_replica(&self, task: F) -> Result where - F: Fn(ServerDatabase, ServerConnection) -> P, + F: Fn(ServerDatabase) -> P, P: Future>, { let mut primary_replica = if let Some(replica) = self.primary_replica() { replica } else { self.seek_primary_replica().await? }; for _ in 0..Self::PRIMARY_REPLICA_TASK_MAX_RETRIES { - match task(primary_replica.database.clone(), self.connection.connection(&primary_replica.address)?.clone()) - .await - { + match task(primary_replica.database.clone()).await { Err(Error::Connection( ConnectionError::CloudReplicaNotPrimary | ConnectionError::ServerConnectionFailedStatusError { .. } @@ -260,8 +253,8 @@ impl fmt::Debug for Database { /// The metadata and state of an individual raft replica of a database. #[derive(Clone)] pub(super) struct Replica { - /// Retrieves address of the server hosting this replica - address: Address, + /// The server hosting this replica + server: String, /// Retrieves the database name for which this is a replica database_name: String, /// Checks whether this is the primary replica of the raft cluster. @@ -277,7 +270,7 @@ pub(super) struct Replica { impl Replica { fn new(name: String, metadata: ReplicaInfo, server_connection: ServerConnection) -> Self { Self { - address: metadata.address, + server: metadata.server, database_name: name.clone(), is_primary: metadata.is_primary, term: metadata.term, @@ -291,15 +284,17 @@ impl Replica { .replicas .into_iter() .map(|replica| { - let server_connection = connection.connection(&replica.address)?.clone(); - Ok(Self::new(database_info.name.clone(), replica, server_connection)) + let server_connection = connection + .connection(&replica.server) + .ok_or_else(|| InternalError::UnknownServer { server: replica.server.clone() })?; + Ok(Self::new(database_info.name.clone(), replica, server_connection.clone())) }) - .try_collect() + .collect() } fn to_info(&self) -> ReplicaInfo { ReplicaInfo { - address: self.address.clone(), + server: self.server.clone(), is_primary: self.is_primary, is_preferred: self.is_preferred, term: self.term, @@ -308,7 +303,7 @@ impl Replica { #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] async fn fetch_all(name: String, connection: Connection) -> Result> { - for server_connection in connection.connections() { + for (server, server_connection) in connection.connections() { let res = server_connection.get_database_replicas(name.clone()).await; match res { Ok(info) => { @@ -321,8 +316,7 @@ impl Replica { )) => { error!( "Failed to fetch replica info for database '{}' from {}. Attempting next server.", - name, - server_connection.address() + name, server ); } Err(err) => return Err(err), @@ -335,7 +329,7 @@ impl Replica { impl fmt::Debug for Replica { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Replica") - .field("address", &self.address) + .field("server", &self.server) .field("database_name", &self.database_name) .field("is_primary", &self.is_primary) .field("term", &self.term) @@ -359,7 +353,7 @@ impl ServerDatabase { self.name.as_str() } - pub(super) fn connection(&self) -> &ServerConnection { + pub(crate) fn connection(&self) -> &ServerConnection { &self.connection } diff --git a/rust/src/database/database_manager.rs b/rust/src/database/database_manager.rs index c6676f6abe..df87df823a 100644 --- a/rust/src/database/database_manager.rs +++ b/rust/src/database/database_manager.rs @@ -109,12 +109,12 @@ impl DatabaseManager { #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn all(&self) -> Result> { let mut error_buffer = Vec::with_capacity(self.connection.server_count()); - for server_connection in self.connection.connections() { + for (server_id, server_connection) in self.connection.connections() { match server_connection.all_databases().await { Ok(list) => { return list.into_iter().map(|db_info| Database::new(db_info, self.connection.clone())).collect() } - Err(err) => error_buffer.push(format!("- {}: {}", server_connection.address(), err)), + Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)), } } Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })? @@ -127,20 +127,20 @@ impl DatabaseManager { P: Future>, { let mut error_buffer = Vec::with_capacity(self.connection.server_count()); - for server_connection in self.connection.connections() { + for (server_id, server_connection) in self.connection.connections() { match task(server_connection.clone(), name.clone()).await { Ok(res) => return Ok(res), Err(Error::Connection(ConnectionError::CloudReplicaNotPrimary)) => { return Database::get(name, self.connection.clone()) .await? - .run_on_primary_replica(|database, server_connection| { + .run_on_primary_replica(|database| { let task = &task; - async move { task(server_connection, database.name().to_owned()).await } + async move { task(database.connection().clone(), database.name().to_owned()).await } }) .await } err @ Err(Error::Connection(ConnectionError::ConnectionIsClosed)) => return err, - Err(err) => error_buffer.push(format!("- {}: {}", server_connection.address(), err)), + Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)), } } Err(ConnectionError::ServerConnectionFailedWithError { error: error_buffer.join("\n") })? diff --git a/rust/src/database/session.rs b/rust/src/database/session.rs index 4d772f1e40..60093bfe14 100644 --- a/rust/src/database/session.rs +++ b/rust/src/database/session.rs @@ -25,6 +25,7 @@ use log::warn; use crate::{ common::{error::ConnectionError, info::SessionInfo, Result, SessionType, TransactionType}, + connection::ServerConnection, Database, Options, Transaction, }; @@ -33,19 +34,25 @@ type Callback = Box; /// A session with a TypeDB database. pub struct Session { database: Database, - server_session_info: RwLock, + server_session: RwLock, session_type: SessionType, is_open: Arc>, on_close: Arc>>, on_reopen: Mutex>, } +#[derive(Clone, Debug)] +struct ServerSession { + connection: ServerConnection, + info: SessionInfo, +} + impl fmt::Debug for Session { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Session") .field("database", &self.database) .field("session_type", &self.session_type) - .field("server_session_info", &self.server_session_info) + .field("server_session", &self.server_session) .field("is_open", &self.is_open) .finish() } @@ -83,9 +90,11 @@ impl Session { /// ``` #[cfg_attr(feature = "sync", maybe_async::must_be_sync)] pub async fn new_with_options(database: Database, session_type: SessionType, options: Options) -> Result { - let server_session_info = database - .run_failsafe(|database, _| async move { - database.connection().open_session(database.name().to_owned(), session_type, options).await + let server_session = database + .run_failsafe(|database| async move { + let session_info = + database.connection().open_session(database.name().to_owned(), session_type, options).await?; + Ok(ServerSession { connection: database.connection().clone(), info: session_info }) }) .await?; @@ -94,12 +103,12 @@ impl Session { let is_open = is_open.clone(); move || is_open.store(false) })])); - register_persistent_on_close(&server_session_info, on_close.clone()); + register_persistent_on_close(&server_session.info, on_close.clone()); Ok(Self { database, session_type, - server_session_info: RwLock::new(server_session_info), + server_session: RwLock::new(server_session), is_open, on_close, on_reopen: Mutex::default(), @@ -142,9 +151,8 @@ impl Session { /// ``` pub fn force_close(&self) -> Result { if self.is_open.compare_exchange(true, false).is_ok() { - let session_info = self.server_session_info.write().unwrap(); - let connection = self.database.connection().connection(&session_info.address).unwrap(); - connection.close_session(session_info.session_id.clone())?; + let server_session = self.server_session.write().unwrap(); + server_session.connection.close_session(server_session.info.session_id.clone())?; } Ok(()) } @@ -165,8 +173,8 @@ impl Session { } fn on_server_session_close(&self, callback: impl FnOnce() + Send + 'static) { - let session_info = self.server_session_info.write().unwrap(); - session_info.on_close_register_sink.send(Box::new(callback)).ok(); + let server_session = self.server_session.write().unwrap(); + server_session.info.on_close_register_sink.send(Box::new(callback)).ok(); } /// Registers a callback function which will be executed when this session is reopened. @@ -188,8 +196,8 @@ impl Session { fn reopened(&self) { self.on_reopen.lock().unwrap().iter_mut().for_each(|callback| (callback)()); - let session_info = self.server_session_info.write().unwrap(); - register_persistent_on_close(&session_info, self.on_close.clone()); + let server_session = self.server_session.write().unwrap(); + register_persistent_on_close(&server_session.info, self.on_close.clone()); } /// Opens a transaction to perform read or write queries on the database connected to the session. @@ -222,8 +230,8 @@ impl Session { return Err(ConnectionError::SessionIsClosed.into()); } - let SessionInfo { address, session_id, network_latency, .. } = self.server_session_info.read().unwrap().clone(); - let server_connection = &self.database.connection().connection(&address)?; + let ServerSession { connection: server_connection, info: SessionInfo { session_id, network_latency, .. } } = + self.server_session.read().unwrap().clone(); let (transaction_stream, transaction_shutdown_sink) = match server_connection .open_transaction(session_id.clone(), transaction_type, options, network_latency) @@ -234,16 +242,16 @@ impl Session { self.is_open.store(false); server_connection.close_session(session_id).ok(); - let (session_info, (transaction_stream, transaction_shutdown_sink)) = self + let (server_session, (transaction_stream, transaction_shutdown_sink)) = self .database - .run_failsafe(|database, _| { + .run_failsafe(|database| { let session_type = self.session_type; async move { let connection = database.connection(); let database_name = database.name().to_owned(); let session_info = connection.open_session(database_name, session_type, options).await?; Ok(( - session_info.clone(), + ServerSession { connection: connection.clone(), info: session_info.clone() }, connection .open_transaction( session_info.session_id, @@ -256,7 +264,7 @@ impl Session { } }) .await?; - *self.server_session_info.write().unwrap() = session_info; + *self.server_session.write().unwrap() = server_session; self.is_open.store(true); self.reopened(); (transaction_stream, transaction_shutdown_sink) diff --git a/rust/src/user/user.rs b/rust/src/user/user.rs index a3f9582dbd..292cda4fd8 100644 --- a/rust/src/user/user.rs +++ b/rust/src/user/user.rs @@ -52,13 +52,13 @@ impl User { let password_old = password_old.into(); let password_new = password_new.into(); let mut error_buffer = Vec::with_capacity(connection.server_count()); - for server_connection in connection.connections() { + for (server_id, server_connection) in connection.connections() { match server_connection .update_user_password(self.username.clone(), password_old.clone(), password_new.clone()) .await { Ok(()) => return Ok(()), - Err(err) => error_buffer.push(format!("- {}: {}", server_connection.address(), err)), + Err(err) => error_buffer.push(format!("- {}: {}", server_id, err)), } } Err(ConnectionError::CloudAllNodesFailed { errors: error_buffer.join("\n") })? diff --git a/rust/src/user/user_manager.rs b/rust/src/user/user_manager.rs index fe48b9ca52..229013b618 100644 --- a/rust/src/user/user_manager.rs +++ b/rust/src/user/user_manager.rs @@ -190,7 +190,7 @@ impl UserManager { DatabaseManager::new(self.connection.clone()) .get(Self::SYSTEM_DB) .await? - .run_failsafe(|_, server_connection| task(server_connection)) + .run_failsafe(|database| task(database.connection().clone())) .await } } diff --git a/rust/tests/integration/mod.rs b/rust/tests/integration/mod.rs index 31cac660b8..fd618d7845 100644 --- a/rust/tests/integration/mod.rs +++ b/rust/tests/integration/mod.rs @@ -19,5 +19,6 @@ mod common; mod logic; +mod network; mod queries; mod runtimes; diff --git a/rust/tests/integration/network.rs b/rust/tests/integration/network.rs new file mode 100644 index 0000000000..250e4199ba --- /dev/null +++ b/rust/tests/integration/network.rs @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::path::PathBuf; + +use futures::StreamExt; +use serial_test::serial; +use typedb_driver::{Connection, Credential, DatabaseManager, Session, SessionType::Data, TransactionType::Write}; + +use super::common; + +#[tokio::test] +#[serial] +async fn address_translation() { + let connection = Connection::new_cloud_with_translation( + [ + ("localhost:11729", "localhost:11729"), + ("localhost:21729", "localhost:21729"), + ("localhost:31729", "localhost:31729"), + ] + .into(), + Credential::with_tls( + "admin", + "password", + Some(&PathBuf::from( + std::env::var("ROOT_CA").expect("ROOT_CA environment variable needs to be set for cloud tests to run"), + )), + ) + .unwrap(), + ) + .unwrap(); + + common::create_test_database_with_schema(connection.clone(), "define person sub entity;").await.unwrap(); + let databases = DatabaseManager::new(connection); + assert!(databases.contains(common::TEST_DATABASE).await.unwrap()); + + let session = Session::new(databases.get(common::TEST_DATABASE).await.unwrap(), Data).await.unwrap(); + let transaction = session.transaction(Write).await.unwrap(); + let answer_stream = transaction.query().get("match $x sub thing; get;").unwrap(); + let results: Vec<_> = answer_stream.collect().await; + transaction.commit().await.unwrap(); + assert_eq!(results.len(), 5); + assert!(results.into_iter().all(|res| res.is_ok())); +}