From ef658ffdc4b13ae688da65ec379e34553b3ca4d8 Mon Sep 17 00:00:00 2001 From: Victor Savkov Date: Sat, 2 Dec 2023 17:47:11 -0500 Subject: [PATCH] Release 0.1.0 --- README.md | 5 +- apps/aspike_server/README.md | 2 +- .../include/aspike_node_test.hrl | 11 + .../aspike_server/include/aspike_protocol.hrl | 154 ++++++ apps/aspike_server/include/aspike_status.hrl | 96 ++++ apps/aspike_server/lib/application.ex | 20 + apps/aspike_server/lib/aspike_server.ex | 37 +- .../lib/aspike_server_command.ex | 90 ++++ .../lib/aspike_server_processor.ex | 31 ++ apps/aspike_server/lib/aspike_status.ex | 8 + apps/aspike_server/mix.exs | 8 +- apps/aspike_server/src/aspike_receive.erl | 28 + .../src/aspike_server_processor.erl | 45 ++ .../src/aspike_server_protocol.erl | 119 +++++ .../aspike_server/test/aspike_server_test.exs | 490 +++++++++++++++++- .../.formatter.exs | 4 + apps/aspike_server_text_protocol/.gitignore | 26 + apps/aspike_server_text_protocol/README.md | 21 + .../lib/application.ex | 20 + .../lib/aspike_text_server.ex | 25 + .../lib/aspike_text_server_command.ex | 150 ++++++ .../lib/aspike_text_server_processor.ex | 46 ++ apps/aspike_server_text_protocol/mix.exs | 35 ++ .../test/aspike_text_server_command_test.exs | 27 + .../test/aspike_text_server_test.exs | 65 +++ .../test/test_helper.exs | 1 + apps/aspike_storage/README.md | 2 +- apps/aspike_storage/lib/aspike_ns.ex | 62 +++ apps/aspike_storage/lib/aspike_ns_registry.ex | 83 +++ apps/aspike_storage/lib/aspike_ns_sup.ex | 26 + apps/aspike_storage/lib/aspike_storage.ex | 21 +- apps/aspike_storage/lib/aspike_storage_sup.ex | 25 + apps/aspike_storage/mix.exs | 3 +- .../test/aspike_ns_registry_test.exs | 57 ++ apps/aspike_storage/test/aspike_ns_test.exs | 82 +++ .../test/aspike_storage_test.exs | 8 - mix.lock | 3 + 37 files changed, 1888 insertions(+), 48 deletions(-) create mode 100644 apps/aspike_server/include/aspike_node_test.hrl create mode 100644 apps/aspike_server/include/aspike_protocol.hrl create mode 100644 apps/aspike_server/include/aspike_status.hrl create mode 100644 apps/aspike_server/lib/application.ex create mode 100644 apps/aspike_server/lib/aspike_server_command.ex create mode 100644 apps/aspike_server/lib/aspike_server_processor.ex create mode 100644 apps/aspike_server/lib/aspike_status.ex create mode 100644 apps/aspike_server/src/aspike_receive.erl create mode 100644 apps/aspike_server/src/aspike_server_processor.erl create mode 100644 apps/aspike_server/src/aspike_server_protocol.erl create mode 100644 apps/aspike_server_text_protocol/.formatter.exs create mode 100644 apps/aspike_server_text_protocol/.gitignore create mode 100644 apps/aspike_server_text_protocol/README.md create mode 100644 apps/aspike_server_text_protocol/lib/application.ex create mode 100644 apps/aspike_server_text_protocol/lib/aspike_text_server.ex create mode 100644 apps/aspike_server_text_protocol/lib/aspike_text_server_command.ex create mode 100644 apps/aspike_server_text_protocol/lib/aspike_text_server_processor.ex create mode 100644 apps/aspike_server_text_protocol/mix.exs create mode 100644 apps/aspike_server_text_protocol/test/aspike_text_server_command_test.exs create mode 100644 apps/aspike_server_text_protocol/test/aspike_text_server_test.exs create mode 100644 apps/aspike_server_text_protocol/test/test_helper.exs create mode 100644 apps/aspike_storage/lib/aspike_ns.ex create mode 100644 apps/aspike_storage/lib/aspike_ns_registry.ex create mode 100644 apps/aspike_storage/lib/aspike_ns_sup.ex create mode 100644 apps/aspike_storage/lib/aspike_storage_sup.ex create mode 100644 apps/aspike_storage/test/aspike_ns_registry_test.exs create mode 100644 apps/aspike_storage/test/aspike_ns_test.exs delete mode 100644 apps/aspike_storage/test/aspike_storage_test.exs create mode 100644 mix.lock diff --git a/README.md b/README.md index aad3a16..a8112c7 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ -# AspikeServerUmbrella - -**TODO: Add description** +# Aerospike Server Emulator +*Uses Aerospike Binary Protocol* - https://github.com/vsavkov/aspike-protocol diff --git a/apps/aspike_server/README.md b/apps/aspike_server/README.md index f712aa8..e74ff26 100644 --- a/apps/aspike_server/README.md +++ b/apps/aspike_server/README.md @@ -1,6 +1,6 @@ # AspikeServer -**TODO: Add description** +**Binary Aerospike Cluster protocol - Aerospike Server Emulator, in Erlang/Elixir** ## Installation diff --git a/apps/aspike_server/include/aspike_node_test.hrl b/apps/aspike_server/include/aspike_node_test.hrl new file mode 100644 index 0000000..4074636 --- /dev/null +++ b/apps/aspike_server/include/aspike_node_test.hrl @@ -0,0 +1,11 @@ +-define(TEST_NODE_NAME_0, "A0"). +-define(TEST_ADDRESS, "127.0.0.1"). +-define(TEST_PORT, 62169). + +-define(TEST_USER1, <<"User1">>). +-define(TEST_PASSWORD1, "pass1"). +-define(TEST_PASSWORD1_CRYPT, <<"$2a$10$7EqJtq98hPqEX7fNZaFWoOOY1Ba9.gZNwHJkrSKJl7mXQyPCsCrQa">>). +-define(TEST_SESSION_TTL, 12345). +-define(TEST_SESSION_TOKEN1, <<"test_session_token1">>). + +-define(TEST_NAMESPACE, "Test-Namespace"). \ No newline at end of file diff --git a/apps/aspike_server/include/aspike_protocol.hrl b/apps/aspike_server/include/aspike_protocol.hrl new file mode 100644 index 0000000..7f2e76d --- /dev/null +++ b/apps/aspike_server/include/aspike_protocol.hrl @@ -0,0 +1,154 @@ +%% Proto header version +-define(AS_PROTO_VERSION, 2). + +%% Proto message types +-define(AS_INFO_MESSAGE_TYPE, 1). +-define(AS_ADMIN_MESSAGE_TYPE, 2). +-define(AS_MESSAGE_TYPE, 3). +-define(AS_COMPRESSED_MESSAGE_TYPE, 4). +-define(PROTO_SIZE_MAX, (128 * 1024 * 1024)). % 2^27 = 128 MB + +%% admin Commands +-define(LOGIN, 20). % 0x14 = 16#14. + +%% Field IDs in admin Commands +-define(USER, 0). +-define(CREDENTIAL, 3). +-define(SESSION_TOKEN, 5). +-define(SESSION_TTL, 6). + +-define(AS_PROTO_DISTANCE_BTW_SIZE_AND_FIELDS, 16). + +%% Field IDs +-define(AS_FIELD_NAMESPACE, 0). +-define(AS_FIELD_SETNAME, 1). +-define(AS_FIELD_KEY, 2). +-define(AS_FIELD_DIGEST, 4). +-define(AS_FIELD_TASK_ID, 7). +-define(AS_FIELD_SOCKET_TIMEOUT, 9). +-define(AS_FIELD_RPS, 10). +-define(AS_FIELD_PID_ARRAY, 11). +-define(AS_FIELD_DIGEST_ARRAY, 12). +-define(AS_FIELD_MAX_RECORDS, 13). +-define(AS_FIELD_BVAL_ARRAY, 15). +-define(AS_FIELD_INDEX_RANGE, 22). +-define(AS_FIELD_INDEX_CONTEXT, 23). +-define(AS_FIELD_INDEX_TYPE, 26). +-define(AS_FIELD_UDF_PACKAGE_NAME, 30). +-define(AS_FIELD_UDF_FUNCTION, 31). +-define(AS_FIELD_UDF_ARGLIST, 32). +-define(AS_FIELD_UDF_OP, 33). +-define(AS_FIELD_QUERY_BINS, 40). +-define(AS_FIELD_BATCH_INDEX, 41). +-define(AS_FIELD_FILTER, 43). + +%% Message info1 bits +-define(AS_MSG_INFO1_READ, (1 bsl 0)). % contains a read operation +-define(AS_MSG_INFO1_GET_ALL, (1 bsl 1)). % get all bins, period +-define(AS_MSG_INFO1_SHORT_QUERY, (1 bsl 2)). % short query +-define(AS_MSG_INFO1_BATCH_INDEX, (1 bsl 3)). % batch +-define(AS_MSG_INFO1_XDR, (1 bsl 4)). % operation is being performed by XDR +-define(AS_MSG_INFO1_GET_NOBINDATA, (1 bsl 5)). % do not get information about bins and its data +-define(AS_MSG_INFO1_READ_MODE_AP_ALL, (1 bsl 6)). % read mode all for AP namespaces. +-define(AS_MSG_INFO1_COMPRESS_RESPONSE, (1 bsl 7)). % tell server to compress it's response. + +%% Message info2 bits +-define(AS_MSG_INFO2_WRITE, (1 bsl 0)). % contains a write semantic +-define(AS_MSG_INFO2_DELETE, (1 bsl 1)). % delete record +-define(AS_MSG_INFO2_GENERATION, (1 bsl 2)). % pay attention to the generation +-define(AS_MSG_INFO2_GENERATION_GT, (1 bsl 3)). % apply write if new generation >= old, good for restore +-define(AS_MSG_INFO2_DURABLE_DELETE, (1 bsl 4)). % transaction resulting in record deletion leaves tombstone (Enterprise only). +-define(AS_MSG_INFO2_CREATE_ONLY, (1 bsl 5)). % write record only if it doesn't exist +%% (Note: Bit 6 is unused.) +-define(AS_MSG_INFO2_RESPOND_ALL_OPS, (1 bsl 7)). % return a result for every operation. + +%% Message info3 bits +-define(AS_MSG_INFO3_LAST, (1 bsl 0)). % this is the last of a multi-part message +-define(AS_MSG_INFO3_COMMIT_MASTER, (1 bsl 1)). % write commit level - bit 0 +%% On send: Do not return partition done in scan/query. +%% On receive: Specified partition is done in scan/query. +-define(AS_MSG_INFO3_PARTITION_DONE, (1 bsl 2)). +-define(AS_MSG_INFO3_UPDATE_ONLY, (1 bsl 3)). % update existing record only, do not create new record +-define(AS_MSG_INFO3_CREATE_OR_REPLACE, (1 bsl 4)). % completely replace existing record, or create new record +-define(AS_MSG_INFO3_REPLACE_ONLY, (1 bsl 5)). % completely replace existing record, do not create new record +-define(AS_MSG_INFO3_SC_READ_TYPE, (1 bsl 6)). % see aerospike-client-c for details +-define(AS_MSG_INFO3_SC_READ_RELAX, (1 bsl 7)). % see aerospike-client-c for details + + +-define(AS_OPERATOR_READ, 1). +-define(AS_OPERATOR_WRITE, 2). +-define(AS_OPERATOR_CDT_READ, 3). +-define(AS_OPERATOR_CDT_MODIFY, 4). +-define(AS_OPERATOR_MAP_READ, 5). +-define(AS_OPERATOR_MAP_MODIFY, 6). +-define(AS_OPERATOR_INCR, 7). +-define(AS_OPERATOR_EXP_READ, 8). +-define(AS_OPERATOR_EXP_MODIFY, 9). +-define(AS_OPERATOR_APPEND, 10). +-define(AS_OPERATOR_PREPEND, 11). +-define(AS_OPERATOR_TOUCH, 12). +-define(AS_OPERATOR_BIT_READ, 13). +-define(AS_OPERATOR_BIT_MODIFY, 14). +-define(AS_OPERATOR_DELETE, 15). +-define(AS_OPERATOR_HLL_READ, 16). +-define(AS_OPERATOR_HLL_MODIFY, 17). + +-define(KEY_DIGEST_SIZE, 160). % RIPEMD160 crypto:hash(ripemd160, ...) + + +-define(AS_BYTES_UNDEF, 0). +-define(AS_BYTES_INTEGER, 1). +-define(AS_BYTES_DOUBLE, 2). +-define(AS_BYTES_STRING, 3). +-define(AS_BYTES_BLOB, 4). +-define(AS_BYTES_JAVA, 7). +-define(AS_BYTES_CSHARP, 8). +-define(AS_BYTES_PYTHON, 9). +-define(AS_BYTES_RUBY, 10). +-define(AS_BYTES_PHP, 11). +-define(AS_BYTES_ERLANG, 12). +-define(AS_BYTES_BOOL, 17). +-define(AS_BYTES_HLL, 18). +-define(AS_BYTES_MAP, 19). +-define(AS_BYTES_LIST, 20). +-define(AS_BYTES_GEOJSON, 23). +-define(AS_BYTES_TYPE_MAX, 24). + +%% as_proto reflects binary (physical) layout +-record(as_proto, { + version :: aspike:uint8_t(), + type :: aspike:uint8_t(), + sz :: aspike:uint48_t() +}). + +%% as_msg reflects binary (physical) layout +-record(as_msg, { + header_sz :: aspike:uint8_t(), % number of uint8_ts in this header + info1 :: aspike:uint8_t(), % bitfield about this request + info2 :: aspike:uint8_t(), + info3 :: aspike:uint8_t(), + unused :: aspike:uint8_t(), + result_code :: aspike:uint8_t(), + generation :: aspike:uint32_t(), + record_ttl :: aspike:uint32_t(), + transaction_ttl :: aspike:uint32_t(), + n_fields :: aspike:uint16_t(), % size in uint8_ts + n_ops :: aspike:uint16_t() % number of operations + % followed by data that contain + % the fields, occupying n_fields bytes + % then the n_ops number of ops. +}). + +%% aspike_message_type_header is a logical representation of as_msg +-record(aspike_message_type_header, { + result_code :: aspike:uint8_t(), + n_fields :: aspike:uint16_t(), + n_bins :: aspike:uint16_t(), + ttl :: aspike:uint32_t(), + timeout :: aspike:uint32_t(), + read_attr :: aspike:uint8_t(), + write_attr :: aspike:uint8_t(), + info_attr :: aspike:uint8_t(), + generation :: aspike:uint32_t(), + unused :: aspike:uint8_t() +}). \ No newline at end of file diff --git a/apps/aspike_server/include/aspike_status.hrl b/apps/aspike_server/include/aspike_status.hrl new file mode 100644 index 0000000..c873d9a --- /dev/null +++ b/apps/aspike_server/include/aspike_status.hrl @@ -0,0 +1,96 @@ +%% Corresponds to as_status.h + +%% Success +-define(AEROSPIKE_OK, 0). + +%% Server Errors +-define(AEROSPIKE_ERR_SERVER, 1). +-define(AEROSPIKE_ERR_RECORD_NOT_FOUND, 2). +-define(AEROSPIKE_ERR_RECORD_GENERATION, 3). +-define(AEROSPIKE_ERR_REQUEST_INVALID, 4). +-define(AEROSPIKE_ERR_RECORD_EXISTS, 5). +-define(AEROSPIKE_ERR_BIN_EXISTS, 6). +-define(AEROSPIKE_ERR_CLUSTER_CHANGE, 7). +-define(AEROSPIKE_ERR_SERVER_FULL, 8). +-define(AEROSPIKE_ERR_TIMEOUT, 9). +-define(AEROSPIKE_ERR_ALWAYS_FORBIDDEN, 10). +-define(AEROSPIKE_ERR_CLUSTER, 11). +-define(AEROSPIKE_ERR_BIN_INCOMPATIBLE_TYPE, 12). +-define(AEROSPIKE_ERR_RECORD_TOO_BIG, 13). +-define(AEROSPIKE_ERR_RECORD_BUSY, 14). +-define(AEROSPIKE_ERR_SCAN_ABORTED, 15). +-define(AEROSPIKE_ERR_UNSUPPORTED_FEATURE, 16). +-define(AEROSPIKE_ERR_BIN_NOT_FOUND, 17). +-define(AEROSPIKE_ERR_DEVICE_OVERLOAD, 18). +-define(AEROSPIKE_ERR_RECORD_KEY_MISMATCH, 19). +-define(AEROSPIKE_ERR_NAMESPACE_NOT_FOUND, 20). +-define(AEROSPIKE_ERR_BIN_NAME, 21). +-define(AEROSPIKE_ERR_FAIL_FORBIDDEN, 22). +-define(AEROSPIKE_ERR_FAIL_ELEMENT_NOT_FOUND, 23). +-define(AEROSPIKE_ERR_FAIL_ELEMENT_EXISTS, 24). +-define(AEROSPIKE_ERR_ENTERPRISE_ONLY, 25). +-define(AEROSPIKE_ERR_OP_NOT_APPLICABLE, 26). +-define(AEROSPIKE_FILTERED_OUT, 27). +-define(AEROSPIKE_LOST_CONFLICT, 28). +-define(AEROSPIKE_QUERY_END, 50). +-define(AEROSPIKE_SECURITY_NOT_SUPPORTED, 51). +-define(AEROSPIKE_SECURITY_NOT_ENABLED, 52). +-define(AEROSPIKE_SECURITY_SCHEME_NOT_SUPPORTED, 53). +-define(AEROSPIKE_INVALID_COMMAND, 54). +-define(AEROSPIKE_INVALID_FIELD, 55). +-define(AEROSPIKE_ILLEGAL_STATE, 56). +-define(AEROSPIKE_INVALID_USER, 60). +-define(AEROSPIKE_USER_ALREADY_EXISTS, 61). +-define(AEROSPIKE_INVALID_PASSWORD, 62). +-define(AEROSPIKE_EXPIRED_PASSWORD, 63). +-define(AEROSPIKE_FORBIDDEN_PASSWORD, 64). +-define(AEROSPIKE_INVALID_CREDENTIAL, 65). +-define(AEROSPIKE_EXPIRED_SESSION, 66). +-define(AEROSPIKE_INVALID_ROLE, 70). +-define(AEROSPIKE_ROLE_ALREADY_EXISTS, 71). +-define(AEROSPIKE_INVALID_PRIVILEGE, 72). +-define(AEROSPIKE_INVALID_WHITELIST, 73). +-define(AEROSPIKE_QUOTAS_NOT_ENABLED, 74). +-define(AEROSPIKE_INVALID_QUOTA, 75). +-define(AEROSPIKE_NOT_AUTHENTICATED, 80). +-define(AEROSPIKE_ROLE_VIOLATION, 81). +-define(AEROSPIKE_NOT_WHITELISTED, 82). +-define(AEROSPIKE_QUOTA_EXCEEDED, 83). +-define(AEROSPIKE_ERR_UDF, 100). +-define(AEROSPIKE_ERR_BATCH_DISABLED, 150). +-define(AEROSPIKE_ERR_BATCH_MAX_REQUESTS_EXCEEDED, 151). +-define(AEROSPIKE_ERR_BATCH_QUEUES_FULL, 152). +-define(AEROSPIKE_ERR_GEO_INVALID_GEOJSON, 160). +-define(AEROSPIKE_ERR_INDEX_FOUND, 200). +-define(AEROSPIKE_ERR_INDEX_NOT_FOUND, 201). +-define(AEROSPIKE_ERR_INDEX_OOM, 202). +-define(AEROSPIKE_ERR_INDEX_NOT_READABLE, 203). +-define(AEROSPIKE_ERR_INDEX, 204). +-define(AEROSPIKE_ERR_INDEX_NAME_MAXLEN, 205). +-define(AEROSPIKE_ERR_INDEX_MAXCOUNT, 206). +-define(AEROSPIKE_ERR_QUERY_ABORTED, 210). +-define(AEROSPIKE_ERR_QUERY_QUEUE_FULL, 211). +-define(AEROSPIKE_ERR_QUERY_TIMEOUT, 212). +-define(AEROSPIKE_ERR_QUERY, 213). +-define(AEROSPIKE_ERR_UDF_NOT_FOUND, 1301). % likely internal Aerospike server error? +-define(AEROSPIKE_ERR_LUA_FILE_NOT_FOUND, 1302). % likely internal Aerospike server error? + +%% Client Errors. +%% These are C-client errors, irrelevant for Erlang client. +-define(AEROSPIKE_BATCH_FAILED, -16). +-define(AEROSPIKE_NO_RESPONSE, -15). +-define(AEROSPIKE_MAX_ERROR_RATE, -14). +-define(AEROSPIKE_USE_NORMAL_RETRY, -13). +-define(AEROSPIKE_ERR_MAX_RETRIES_EXCEEDED, -12). +-define(AEROSPIKE_ERR_ASYNC_QUEUE_FULL, -11). +-define(AEROSPIKE_ERR_CONNECTION, -10). +-define(AEROSPIKE_ERR_TLS_ERROR, -9). +-define(AEROSPIKE_ERR_INVALID_NODE, -8). +-define(AEROSPIKE_ERR_NO_MORE_CONNECTIONS, -7). +-define(AEROSPIKE_ERR_ASYNC_CONNECTION, -6). +-define(AEROSPIKE_ERR_CLIENT_ABORT, -5). +-define(AEROSPIKE_ERR_INVALID_HOST, -4). +-define(AEROSPIKE_NO_MORE_RECORDS, -3). +-define(AEROSPIKE_ERR_PARAM, -2). +-define(AEROSPIKE_ERR_CLIENT, -1). +-define(AEROSPIKE_ERR, -1). diff --git a/apps/aspike_server/lib/application.ex b/apps/aspike_server/lib/application.ex new file mode 100644 index 0000000..b6fd071 --- /dev/null +++ b/apps/aspike_server/lib/application.ex @@ -0,0 +1,20 @@ +defmodule Aspike.Server.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + {Task.Supervisor, name: Aspike.Server.ProcessorSupervisor}, + {Aspike.Server, 4041} + ] + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Aspike.Server.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/apps/aspike_server/lib/aspike_server.ex b/apps/aspike_server/lib/aspike_server.ex index 8ed3f89..ba82ff8 100644 --- a/apps/aspike_server/lib/aspike_server.ex +++ b/apps/aspike_server/lib/aspike_server.ex @@ -1,18 +1,29 @@ -defmodule AspikeServer do - @moduledoc """ - Documentation for `AspikeServer`. - """ +defmodule Aspike.Server do + @moduledoc false + use Task, restart: :transient + require Logger - @doc """ - Hello world. - - ## Examples + def start_link(arg) do + Task.start_link(__MODULE__, :accept, [arg]) + end - iex> AspikeServer.hello() - :world +def accept(port) do + {:ok, socket} = :gen_tcp.listen(port, + [:binary, + packet: :raw, + active: false, + reuseaddr: true, + backlog: 4096]) + Logger.info "Accepting connections on port #{port}" + loop_acceptor(socket) + end - """ - def hello do - :world + defp loop_acceptor(socket) do + {:ok, client} = :gen_tcp.accept(socket) + {:ok, pid} = Task.Supervisor.start_child( + Aspike.Server.ProcessorSupervisor, + Aspike.Server.Processor, :run, [client, <<>>]) + :ok = :gen_tcp.controlling_process(client, pid) + loop_acceptor(socket) end end diff --git a/apps/aspike_server/lib/aspike_server_command.ex b/apps/aspike_server/lib/aspike_server_command.ex new file mode 100644 index 0000000..098195b --- /dev/null +++ b/apps/aspike_server/lib/aspike_server_command.ex @@ -0,0 +1,90 @@ +defmodule Aspike.Server.Command do + require Logger + + def decode(data) do + :aspike_server_protocol.dec_request(data) + end + + def encode(response) do + :aspike_server_protocol.enc_response(response) + end + + # Returns: + # {login_response, #{?SESSION_TTL => ?TEST_SESSION_TTL, ?SESSION_TOKEN => ?TEST_SESSION_TOKEN1}} + # {login_response, no_password} + # {login_response, wrong_password} + # {login_response, no_user} + # {login_response, unknown_user} + def run({:login_request, _params} = request, _pid) do + :aspike_server_processor.process(request) + end + + # Returns: + # {:error, :namespace_not_found} + # {:put_response, :ok} + def run({:put_request, _params} = request, pid) do + {ns_erl, set, key, bvs} = :aspike_server_processor.process(request) + ns = to_string(ns_erl) + case lookup(pid, ns, + fn ns_pid -> Aspike.Ns.put(ns_pid, set, key, bvs) end) do + {:error, :not_found} -> {:error, :namespace_not_found} + :ok -> {:put_response, :ok} + end + end + + # Returns: + # {:error, :namespace_not_found} + # {:error, :record_not_found} + # {:get_response, [{bin, value}]} + def run({:get_request, _params} = request, pid) do + {ns_erl, set, key, _bvs} = :aspike_server_processor.process(request) + ns = to_string(ns_erl) + case lookup(pid, ns, + fn ns_pid -> Aspike.Ns.get(ns_pid, set, key) end) do + {:error, :not_found} -> {:error, :namespace_not_found} + nil -> {:error, :record_not_found} + found -> {:get_response, found} + end + end + + # Returns: + # {:error, :namespace_not_found} + # {:error, :record_not_found} + # {:remove_response, :ok} + def run({:remove_request, _params} = request, pid) do + {ns_erl, set, key} = :aspike_server_processor.process(request) + ns = to_string(ns_erl) + case lookup(pid, ns, + fn ns_pid -> Aspike.Ns.remove(ns_pid, set, key) end) do + {:error, :not_found} -> {:error, :namespace_not_found} + nil -> {:error, :record_not_found} + _ -> {:remove_response, :ok} + end + end + + # Returns: + # {:error, :namespace_not_found} + # {:error, :record_not_found} + # {:exists_response, ok} + def run({:exists_request, _params} = request, pid) do + {ns_erl, set, key} = :aspike_server_processor.process(request) + ns = to_string(ns_erl) + case lookup(pid, ns, + fn ns_pid -> Aspike.Ns.exists(ns_pid, set, key) end) do + {:error, :not_found} -> {:error, :namespace_not_found} + false -> {:error, :record_not_found} + true -> {:exists_response, :ok} + end + end + + def run(request, _pid) do + {:unknown_response, request} + end + + defp lookup(pid, ns, callback) do + case Aspike.Ns.Registry.lookup(pid, ns) do + {:ok, ns_pid} -> callback.(ns_pid) + :error -> {:error, :not_found} + end + end +end \ No newline at end of file diff --git a/apps/aspike_server/lib/aspike_server_processor.ex b/apps/aspike_server/lib/aspike_server_processor.ex new file mode 100644 index 0000000..824d658 --- /dev/null +++ b/apps/aspike_server/lib/aspike_server_processor.ex @@ -0,0 +1,31 @@ +defmodule Aspike.Server.Processor do + @moduledoc false + require Logger + + def run(socket, buffer) do + case :gen_tcp.recv(socket, 0) do + {:error, :closed} -> + :ok + {:ok, data} -> + buffer1 = buffer <> data + buffer2 = process(buffer1, socket) + run(socket, buffer2) + end + end + + defp process(data, socket) do + case Aspike.Server.Command.decode(data) do + :need_more -> data + {:error, reason} -> + Logger.error "Aspike.Server.Command.parse: error: #{reason}" + {:ok, decoded, rest} -> + response = Aspike.Server.Command.run(decoded, Aspike.Ns.Registry) + encoded = Aspike.Server.Command.encode(response) + case :gen_tcp.send(socket, encoded) do + {:error, :closed} -> + <<>> + :ok -> process(rest, socket) + end + end + end +end diff --git a/apps/aspike_server/lib/aspike_status.ex b/apps/aspike_server/lib/aspike_status.ex new file mode 100644 index 0000000..2f4bde4 --- /dev/null +++ b/apps/aspike_server/lib/aspike_status.ex @@ -0,0 +1,8 @@ +defmodule Aspike.Status do +# see https://stackoverflow.com/questions/33851536/how-do-you-define-constants-in-elixir-modules + @moduledoc false + + def ok, do: 0 + def err_NAMESPACE_NOT_FOUND, do: 20 + def err_RECORD_NOT_FOUND, do: 2 +end diff --git a/apps/aspike_server/mix.exs b/apps/aspike_server/mix.exs index 2a2cf37..2bd42c3 100644 --- a/apps/aspike_server/mix.exs +++ b/apps/aspike_server/mix.exs @@ -1,4 +1,4 @@ -defmodule AspikeServer.MixProject do +defmodule Aspike.Server.MixProject do use Mix.Project def project do @@ -18,7 +18,8 @@ defmodule AspikeServer.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger] + extra_applications: [:logger], + mod: {Aspike.Server.Application, []} ] end @@ -28,6 +29,9 @@ defmodule AspikeServer.MixProject do # {:dep_from_hexpm, "~> 0.3.0"}, # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}, # {:sibling_app_in_umbrella, in_umbrella: true} + {:aspike_storage, in_umbrella: true}, + {:aspike_server_text_protocol, in_umbrella: true}, + {:aspike_protocol, git: "git@github.com:vsavkov/aspike-protocol.git", tag: "v0.1.0"} ] end end diff --git a/apps/aspike_server/src/aspike_receive.erl b/apps/aspike_server/src/aspike_receive.erl new file mode 100644 index 0000000..cc9993a --- /dev/null +++ b/apps/aspike_server/src/aspike_receive.erl @@ -0,0 +1,28 @@ +-module(aspike_receive). +-include("../include/aspike_protocol.hrl"). + +%% API +-export([ + receive_response_admin/2, + receive_response_operation/2 +]). + +receive_response_admin(Socket, Timeout) -> + receive_proto(Socket, 8, Timeout, ?AS_ADMIN_MESSAGE_TYPE). + +receive_response_operation(Socket, Timeout) -> + receive_proto(Socket, 8, Timeout, ?AS_MESSAGE_TYPE). + +receive_proto(Socket, Header_size, Timeout, Type) -> + Ret_header = gen_tcp:recv(Socket, Header_size, Timeout), + case Ret_header of + {ok, Header} -> + <> + = Header, + Ret_data = gen_tcp:recv(Socket, Size, Timeout), + case Ret_data of + {ok, Data} -> <
>; + Other_data -> Other_data + end; + Other_header -> Other_header + end. diff --git a/apps/aspike_server/src/aspike_server_processor.erl b/apps/aspike_server/src/aspike_server_processor.erl new file mode 100644 index 0000000..ae789fb --- /dev/null +++ b/apps/aspike_server/src/aspike_server_processor.erl @@ -0,0 +1,45 @@ +-module(aspike_server_processor). +-include("../include/aspike_protocol.hrl"). +-include("../include/aspike_node_test.hrl"). +%%-include_lib("eunit/include/eunit.hrl"). + +%% +%% Aerospike server emulator +%% + +%% API +-export([ + process/1 +]). + +process({login_request, {_Command, Fields}}) -> + case proplists:get_value(?USER, Fields) of + ?TEST_USER1 -> + case proplists:get_value(?CREDENTIAL, Fields) of + ?TEST_PASSWORD1_CRYPT -> + Response = {login_response, + #{?SESSION_TTL => ?TEST_SESSION_TTL, + ?SESSION_TOKEN => ?TEST_SESSION_TOKEN1}}, + Response; + undefined -> {login_response, no_password}; + _ -> {login_response, wrong_password} + end; + undefined -> {login_response, no_user}; + _ -> {login_response, unknown_user} + end; + +process({put_request, {Namespace_str, Set_name, Key_digest, Bins}}) -> + Bvs = [{B, V} || {_Op, B, V} <- Bins], + {Namespace_str, Set_name, Key_digest, Bvs}; + +process({get_request, {Namespace_str, Set_name, Key_digest, Bins}}) -> + {Namespace_str, Set_name, Key_digest, Bins}; + +process({remove_request, {Namespace_str, Set_name, Key_digest}}) -> + {Namespace_str, Set_name, Key_digest}; + +process({exists_request, {Namespace_str, Set_name, Key_digest}}) -> + {Namespace_str, Set_name, Key_digest}; + +process(X) -> + {unknown_response, X}. diff --git a/apps/aspike_server/src/aspike_server_protocol.erl b/apps/aspike_server/src/aspike_server_protocol.erl new file mode 100644 index 0000000..34b36f5 --- /dev/null +++ b/apps/aspike_server/src/aspike_server_protocol.erl @@ -0,0 +1,119 @@ +-module(aspike_server_protocol). +-include("../include/aspike_protocol.hrl"). +-include("../include/aspike_status.hrl"). + +%% +%% Aerospike server emulator +%% + +-export([ + dec_request/1, + enc_response/1 +]). + +dec_request(<> = Data) -> + case aspike_protocol:dec_login_request(Data) of + need_more -> need_more; + {error, Reason} -> {error, {login_request, Reason}}; + {ok, Decoded, Rest} -> {ok, {login_request, Decoded}, Rest} + end; + +dec_request(<> = Data) -> + case aspike_protocol:dec_message_pkt(Data) of + need_more -> need_more; + {error, _Reason} = Err -> Err; + {ok, {_Version, _Type, Header_and_else}, _Rest1} -> + {ok, H, _Rest2} = aspike_protocol:dec_message_type_header(Header_and_else), + case message_type_request_type(H) of + undefined -> {error, unrecognized_message_type_request}; + put -> + case aspike_protocol:dec_put_request(Data) of + need_more -> need_more; + {error, Reason} -> {error, {put_request, Reason}}; + {ok, Decoded, Rest} -> {ok, {put_request, Decoded}, Rest} + end; + get -> + case aspike_protocol:dec_get_request(Data) of + need_more -> need_more; + {error, Reason} -> {error, {get_request, Reason}}; + {ok, Decoded, Rest} -> {ok, {get_request, Decoded}, Rest} + end; + remove -> + case aspike_protocol:dec_remove_request(Data) of + need_more -> need_more; + {error, Reason} -> {error, {remove_request, Reason}}; + {ok, Decoded, Rest} -> {ok, {remove_request, Decoded}, Rest} + end; + exists -> + case aspike_protocol:dec_exists_request(Data) of + need_more -> need_more; + {error, Reason} -> {error, {exists_request, Reason}}; + {ok, Decoded, Rest} -> {ok, {exists_request, Decoded}, Rest} + end + end + end; + +dec_request(<<>>) -> + need_more; +dec_request(<>) -> + need_more; + +dec_request(Data) -> + {error, {unknown_request, Data}}. + +enc_response({error, namespace_not_found}) -> + aspike_protocol:enc_error_response(?AEROSPIKE_ERR_NAMESPACE_NOT_FOUND); + +enc_response({error, record_not_found}) -> + aspike_protocol:enc_error_response(?AEROSPIKE_ERR_RECORD_NOT_FOUND); + +enc_response({login_response, no_password}) -> + aspike_protocol:enc_login_response(?AEROSPIKE_INVALID_CREDENTIAL, []); +enc_response({login_response, wrong_password}) -> + aspike_protocol:enc_login_response(?AEROSPIKE_INVALID_CREDENTIAL, []); +enc_response({login_response, no_user}) -> + aspike_protocol:enc_login_response(?AEROSPIKE_INVALID_USER, []); +enc_response({login_response, unknown_user}) -> + aspike_protocol:enc_login_response(?AEROSPIKE_INVALID_USER, []); +enc_response({login_response, #{?SESSION_TTL := Ttl, ?SESSION_TOKEN := Token}}) -> + aspike_protocol:enc_login_response(?AEROSPIKE_OK, [{session_token, Token}, {session_ttl, Ttl}]); + +enc_response({put_response, ok}) -> + aspike_protocol:enc_put_response(?AEROSPIKE_OK); + +enc_response({get_response, Bins}) -> + aspike_protocol:enc_get_response(?AEROSPIKE_OK, [], Bins); + +enc_response({remove_response, ok}) -> + aspike_protocol:enc_remove_response(?AEROSPIKE_OK); + +enc_response({remove_response, record_not_found}) -> + aspike_protocol:enc_remove_response(?AEROSPIKE_ERR_RECORD_NOT_FOUND); + +enc_response({exists_response, ok}) -> + aspike_protocol:enc_exists_response(?AEROSPIKE_OK); + +enc_response({exists_response, record_not_found}) -> + aspike_protocol:enc_exists_response(?AEROSPIKE_ERR_RECORD_NOT_FOUND); + +enc_response(X) -> + {error, {unknown_response, X}}. + +message_type_request_type(#aspike_message_type_header{ + write_attr = ?AS_MSG_INFO2_WRITE}) -> + put; +message_type_request_type(#aspike_message_type_header{ + read_attr = ?AS_MSG_INFO1_READ}) -> + get; +message_type_request_type(#aspike_message_type_header{ + read_attr = (?AS_MSG_INFO1_READ bor ?AS_MSG_INFO1_GET_ALL)}) -> + get; +message_type_request_type(#aspike_message_type_header{ + write_attr = (?AS_MSG_INFO2_WRITE bor ?AS_MSG_INFO2_DELETE)}) -> + remove; +message_type_request_type(#aspike_message_type_header{ + read_attr = (?AS_MSG_INFO1_READ bor ?AS_MSG_INFO1_GET_NOBINDATA)}) -> + exists; +message_type_request_type(_) -> undefined. diff --git a/apps/aspike_server/test/aspike_server_test.exs b/apps/aspike_server/test/aspike_server_test.exs index 9657bff..5b8c310 100644 --- a/apps/aspike_server/test/aspike_server_test.exs +++ b/apps/aspike_server/test/aspike_server_test.exs @@ -1,8 +1,490 @@ -defmodule AspikeServerTest do +defmodule Aspike.ServerTest do use ExUnit.Case - doctest AspikeServer + doctest Aspike.Server - test "greets the world" do - assert AspikeServer.hello() == :world + setup do + Application.stop(:aspike_storage) + :ok = Application.start(:aspike_storage) +# Application.ensure_all_started(:aspike_storage) end + + setup do + opts = [:binary, packet: :raw, active: false] + {:ok, socket} = :gen_tcp.connect('localhost', 4041, opts) + {:ok, socket: socket} + end + + setup do + opts = [:binary, packet: :line, active: false] + {:ok, socket_text} = :gen_tcp.connect('localhost', 4040, opts) + {:ok, socket_text: socket_text} + end + + setup do + {:ok, + namespace: "namespace-gateway", + set: "set-gateway", + bin: 'aspike-bin-1', + kp: "key-aspike-", + vp: "value-aspike-" + } + end + + setup do + {:ok, + test_user: <<"User1">>, + credential: 3, + test_password_crypt: <<"$2a$10$7EqJtq98hPqEX7fNZaFWoOOY1Ba9.gZNwHJkrSKJl7mXQyPCsCrQa">> + } + end + + setup do + {:ok, + session_token: <<"test_session_token1">>, + session_ttl: 12345 + } + end + + test "login", %{ + socket: socket, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl} do + + pkt_login = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + send = :gen_tcp.send(socket, pkt_login) + assert send == :ok + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + end + + test "put into not existing namespace", %{ + socket: socket, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: bin, + kp: kp, + vp: vp} do + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + put_request = :aspike_protocol.enc_put_request(namespace, set, key_digest, [{bin, value}]) + :ok = :gen_tcp.send(socket, put_request) + put_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, put_decoded, rest} = :aspike_protocol.dec_put_response(put_response) + assert put_decoded == Aspike.Status.err_NAMESPACE_NOT_FOUND + assert rest == <<>> + end + + test "put", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + put_request = :aspike_protocol.enc_put_request(namespace, set, key_digest, [{bin, value}]) + :ok = :gen_tcp.send(socket, put_request) + put_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, put_decoded, rest} = :aspike_protocol.dec_put_response(put_response) + assert put_decoded == Aspike.Status.ok + assert rest == <<>> + end + + test "get from not existing namespace", %{ + socket: socket, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: _bin, + kp: kp, + vp: vp} do + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, _value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + get_request = :aspike_protocol.enc_get_request(namespace, set, key_digest, []) + :ok = :gen_tcp.send(socket, get_request) + get_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, get_decoded, rest} = :aspike_protocol.dec_get_response(get_response) + assert elem(get_decoded, 0) == Aspike.Status.err_NAMESPACE_NOT_FOUND + assert rest == <<>> + end + + test "get non-existing record", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: _bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, _value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + get_request = :aspike_protocol.enc_get_request(namespace, set, key_digest, []) + :ok = :gen_tcp.send(socket, get_request) + get_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, get_decoded, rest} = :aspike_protocol.dec_get_response(get_response) + assert elem(get_decoded, 0) == Aspike.Status.err_RECORD_NOT_FOUND + assert rest == <<>> + end + + test "get existing record", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + + put_request = :aspike_protocol.enc_put_request(namespace, set, key_digest, [{bin, value}]) + :ok = :gen_tcp.send(socket, put_request) + put_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, put_decoded, rest} = :aspike_protocol.dec_put_response(put_response) + assert put_decoded == Aspike.Status.ok + assert rest == <<>> + + get_request = :aspike_protocol.enc_get_request(namespace, set, key_digest, []) + :ok = :gen_tcp.send(socket, get_request) + get_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, get_decoded, rest} = :aspike_protocol.dec_get_response(get_response) + assert elem(get_decoded, 0) == Aspike.Status.ok + assert elem(get_decoded, 1) == [] + assert elem(get_decoded, 2) == [{to_string(bin), value}] + assert rest == <<>> + end + + test "remove from not existing namespace", %{ + socket: socket, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: _bin, + kp: kp, + vp: vp} do + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, _value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + remove_request = :aspike_protocol.enc_remove_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, remove_request) + remove_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, remove_decoded, rest} = :aspike_protocol.dec_remove_response(remove_response) + assert remove_decoded == Aspike.Status.err_NAMESPACE_NOT_FOUND + assert rest == <<>> + end + + test "remove non-existing record", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: _bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, _value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + remove_request = :aspike_protocol.enc_remove_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, remove_request) + remove_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, remove_decoded, rest} = :aspike_protocol.dec_remove_response(remove_response) + assert remove_decoded == Aspike.Status.err_RECORD_NOT_FOUND + assert rest == <<>> + end + + test "remove existing record", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + + put_request = :aspike_protocol.enc_put_request(namespace, set, key_digest, [{bin, value}]) + :ok = :gen_tcp.send(socket, put_request) + put_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, put_decoded, rest} = :aspike_protocol.dec_put_response(put_response) + assert put_decoded == Aspike.Status.ok + assert rest == <<>> + + get_request = :aspike_protocol.enc_get_request(namespace, set, key_digest, []) + :ok = :gen_tcp.send(socket, get_request) + get_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, get_decoded, rest} = :aspike_protocol.dec_get_response(get_response) + assert elem(get_decoded, 0) == Aspike.Status.ok + assert elem(get_decoded, 1) == [] + assert elem(get_decoded, 2) == [{to_string(bin), value}] + assert rest == <<>> + + remove_request = :aspike_protocol.enc_remove_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, remove_request) + remove_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, remove_decoded, rest} = :aspike_protocol.dec_remove_response(remove_response) + assert remove_decoded == Aspike.Status.ok + assert rest == <<>> + + :ok = :gen_tcp.send(socket, get_request) + get_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, get_decoded, rest} = :aspike_protocol.dec_get_response(get_response) + assert elem(get_decoded, 0) == Aspike.Status.err_RECORD_NOT_FOUND + assert rest == <<>> + end + + test "exists in not existing namespace", %{ + socket: socket, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: _bin, + kp: kp, + vp: vp} do + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, _value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + exists_request = :aspike_protocol.enc_exists_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, exists_request) + exists_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, exists_decoded, rest} = :aspike_protocol.dec_exists_response(exists_response) + assert exists_decoded == Aspike.Status.err_NAMESPACE_NOT_FOUND + assert rest == <<>> + end + + test "exists record", %{ + socket: socket, + socket_text: socket_text, + test_user: test_user, + credential: credential, + test_password_crypt: test_password_crypt, + session_token: session_token, + session_ttl: session_ttl, + namespace: namespace, + set: set, + bin: bin, + kp: kp, + vp: vp} do + + assert send_and_recv(socket_text, "CREATE #{namespace}\r\n") == "OK\r\n" + + login_request = :aspike_protocol.enc_login_request(test_user, {credential, test_password_crypt}) + :ok = :gen_tcp.send(socket, login_request) + + response = :aspike_receive.receive_response_admin(socket, 1000) + {:ok, {status, fields}, <<>>} = :aspike_protocol.dec_login_response(response) + assert Aspike.Status.ok == status + assert length(fields) == 2 + assert fields[:session_token] == session_token + assert fields[:session_ttl] == session_ttl + + {key, value} = mk_kv(kp, vp) + key_digest = :aspike_protocol.digest(set, key) + + exists_request = :aspike_protocol.enc_exists_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, exists_request) + exists_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, exists_decoded, rest} = :aspike_protocol.dec_exists_response(exists_response) + assert exists_decoded == Aspike.Status.err_RECORD_NOT_FOUND + assert rest == <<>> + + put_request = :aspike_protocol.enc_put_request(namespace, set, key_digest, [{bin, value}]) + :ok = :gen_tcp.send(socket, put_request) + put_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, put_decoded, rest} = :aspike_protocol.dec_put_response(put_response) + assert put_decoded == Aspike.Status.ok + assert rest == <<>> + + :ok = :gen_tcp.send(socket, exists_request) + exists_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, exists_decoded, rest} = :aspike_protocol.dec_exists_response(exists_response) + assert exists_decoded == Aspike.Status.ok + assert rest == <<>> + + remove_request = :aspike_protocol.enc_remove_request(namespace, set, key_digest) + :ok = :gen_tcp.send(socket, remove_request) + remove_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, remove_decoded, rest} = :aspike_protocol.dec_remove_response(remove_response) + assert remove_decoded == Aspike.Status.ok + assert rest == <<>> + + :ok = :gen_tcp.send(socket, exists_request) + exists_response = :aspike_receive.receive_response_operation(socket, 1000) + {:ok, exists_decoded, rest} = :aspike_protocol.dec_exists_response(exists_response) + assert exists_decoded == Aspike.Status.err_RECORD_NOT_FOUND + assert rest == <<>> + end + + ## Key-Value + defp mk_kv(key_prefix, value_prefix) do + ts = to_string(:erlang.system_time(:millisecond)) + k = key_prefix <> ts + v = value_prefix <> ts + {k, v} + end + + defp send_and_recv(socket, command) do + :ok = :gen_tcp.send(socket, command) + {:ok, data} = :gen_tcp.recv(socket, 0, 1000) + data + end + end diff --git a/apps/aspike_server_text_protocol/.formatter.exs b/apps/aspike_server_text_protocol/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/apps/aspike_server_text_protocol/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/apps/aspike_server_text_protocol/.gitignore b/apps/aspike_server_text_protocol/.gitignore new file mode 100644 index 0000000..d02dcd3 --- /dev/null +++ b/apps/aspike_server_text_protocol/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +aspike_server-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/apps/aspike_server_text_protocol/README.md b/apps/aspike_server_text_protocol/README.md new file mode 100644 index 0000000..34322d4 --- /dev/null +++ b/apps/aspike_server_text_protocol/README.md @@ -0,0 +1,21 @@ +# AspikeServerTextProtocol + +**Text-based protocol for Aerospike Server Emulator Storage in Erlang/Elixir** + +## Installation + +If [available in Hex](https://hex.pm/docs/publish), the package can be installed +by adding `aspike_server` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:aspike_server, "~> 0.1.0"} + ] +end +``` + +Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc) +and published on [HexDocs](https://hexdocs.pm). Once published, the docs can +be found at . + diff --git a/apps/aspike_server_text_protocol/lib/application.ex b/apps/aspike_server_text_protocol/lib/application.ex new file mode 100644 index 0000000..9663b42 --- /dev/null +++ b/apps/aspike_server_text_protocol/lib/application.ex @@ -0,0 +1,20 @@ +defmodule Aspike.TextServer.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + {Task.Supervisor, name: Aspike.TextServer.ProcessorSupervisor}, + {Aspike.TextServer, 4040} + ] + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Aspike.TextServer.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/apps/aspike_server_text_protocol/lib/aspike_text_server.ex b/apps/aspike_server_text_protocol/lib/aspike_text_server.ex new file mode 100644 index 0000000..9cc8060 --- /dev/null +++ b/apps/aspike_server_text_protocol/lib/aspike_text_server.ex @@ -0,0 +1,25 @@ +defmodule Aspike.TextServer do + @moduledoc false + use Task, restart: :transient + require Logger + + def start_link(arg) do + Task.start_link(__MODULE__, :accept, [arg]) + end + + def accept(port) do + {:ok, socket} = :gen_tcp.listen(port, + [:binary, packet: :line, active: false, reuseaddr: true]) + Logger.info "Accepting connections on port #{port}" + loop_acceptor(socket) + end + + defp loop_acceptor(socket) do + {:ok, client} = :gen_tcp.accept(socket) + {:ok, pid} = Task.Supervisor.start_child( + Aspike.TextServer.ProcessorSupervisor, + Aspike.TextServer.Processor, :process, [client]) + :ok = :gen_tcp.controlling_process(client, pid) + loop_acceptor(socket) + end +end diff --git a/apps/aspike_server_text_protocol/lib/aspike_text_server_command.ex b/apps/aspike_server_text_protocol/lib/aspike_text_server_command.ex new file mode 100644 index 0000000..11395b2 --- /dev/null +++ b/apps/aspike_server_text_protocol/lib/aspike_text_server_command.ex @@ -0,0 +1,150 @@ +defmodule Aspike.TextServer.Command do + require Logger + @doc ~S""" + Parses the given `line` into a command. + + ## Examples + + iex> Aspike.TextServer.Command.parse "CREATE namespace_gateway\r\n" + {:ok, {:create, "namespace_gateway"}} + + iex> Aspike.TextServer.Command.parse "CREATE namespace_gateway \r\n" + {:ok, {:create, "namespace_gateway"}} + + iex> Aspike.TextServer.Command.parse "PUT namespace_gateway set_user key_1 bin_1 value_1\r\n" + {:ok, {:put, "namespace_gateway", "set_user", "key_1", [{"bin_1", "value_1"}]}} + + iex> Aspike.TextServer.Command.parse "GET namespace_gateway set_user key_1\r\n" + {:ok, {:get, "namespace_gateway", "set_user", "key_1"}} + + iex> Aspike.TextServer.Command.parse "GETFROMSET namespace_gateway set_user\r\n" + {:ok, {:get_from_set, "namespace_gateway", "set_user"}} + + iex> Aspike.TextServer.Command.parse "GETBYKEY namespace_gateway key_1\r\n" + {:ok, {:get_by_key, "namespace_gateway", "key_1"}} + + iex> Aspike.TextServer.Command.parse "REMOVE namespace_gateway set_user key_1\r\n" + {:ok, {:remove, "namespace_gateway", "set_user", "key_1"}} + + Unknown commands or commands with the wrong number of + arguments return an error: + + iex> Aspike.TextServer.Command.parse "UNKNOWN namespace_gateway key_1\r\n" + {:error, :unknown_command} + + iex> Aspike.TextServer.Command.parse "GET namespace_gateway\r\n" + {:error, :unknown_command} + + """ + def parse(line) do + case String.split(line) do + ["CREATE", ns] -> {:ok, {:create, ns}} + ["NAMESPACES"] -> {:ok, {:namespaces}} + ["SETS", ns] -> {:ok, {:sets, ns}} + ["GET", ns, set, key] -> {:ok, {:get, ns, set, key}} + ["GETFROMSET", ns, set] -> {:ok, {:get_from_set, ns, set}} + ["GETBYKEY", ns, key] -> {:ok, {:get_by_key, ns, key}} + ["PUT", ns, set, key, bin, value] -> {:ok, {:put, ns, set, key, [{bin, value}]}} + ["REMOVE", ns, set, key] -> {:ok, {:remove, ns, set, key}} + _ -> {:error, :unknown_command} + end + end + + @doc """ + Runs the given `command` on a server with `pid`. + """ + def run(command, pid) + + def run({:create, ns}, pid) do + case Aspike.Ns.Registry.create(pid, ns) do + :ok -> {:ok, "OK\r\n"} + :exists_already -> {:error, :exists_already} + end + end + + def run({:namespaces}, pid) do + xs = Aspike.Ns.Registry.list(pid) +# :io.format("[run] ~p~n", [xs]) + value = mk_string(xs) + {:ok, "#{value}\r\nOK\r\n"} + end + + def run({:sets, ns}, pid) do + lookup pid, ns, fn ns_pid -> + xs = Aspike.Ns.list(ns_pid) +# :io.format("[run] ~p~n", [xs]) + value = mk_string(xs) + {:ok, "#{value}\r\nOK\r\n"} + end + end + + def run({:get, ns, set, key}, pid) do + lookup pid, ns, fn ns_pid -> + value = case Aspike.Ns.get(ns_pid, set, key) do + nil -> "" + xs -> mk_string(xs) + end + {:ok, "#{value}\r\nOK\r\n"} + end + end + + def run({:get_from_set, ns, set}, pid) do + lookup pid, ns, fn ns_pid -> + value = case Aspike.Ns.get_from_set(ns_pid, set) do + nil -> "" + xs -> mk_string(xs) + end + {:ok, "#{value}\r\nOK\r\n"} + end + end + + def run({:get_by_key, ns, key}, pid) do + lookup pid, ns, fn ns_pid -> + value = case Aspike.Ns.get_by_key(ns_pid, key) do + nil -> "" + xs -> mk_string(xs) + end + {:ok, "#{value}\r\nOK\r\n"} + end + end + + def run({:put, ns, set, key, [{bin, value}]}, pid) do + lookup pid, ns, fn ns_pid -> + Aspike.Ns.put(ns_pid, set, key, [{bin, value}]) + {:ok, "OK\r\n"} + end + end + + def run({:remove, ns, set, key}, pid) do + lookup pid, ns, fn ns_pid -> + Aspike.Ns.remove(ns_pid, set, key) + {:ok, "OK\r\n"} + end + end + + defp lookup(pid, ns, callback) do + case Aspike.Ns.Registry.lookup(pid, ns) do + {:ok, ns_pid} -> callback.(ns_pid) + :error -> {:error, :not_found} + end + end + + defp mk_string(x) when is_binary(x) do + x + end + + defp mk_string({x, y}) do + "{#{x},#{y}}" + end + + defp mk_string(xs) when is_list(xs) do + "[" <> Enum.join((for x <- xs, do: mk_string(x)), ",") <> "]" + end + + defp mk_string(xs) when is_map(xs) do + "%{" <> + Enum.join((for {sk, vs} <- xs, do: + mk_string(sk) <> ":" <> mk_string(vs)), + ",") <> "}" + end +end diff --git a/apps/aspike_server_text_protocol/lib/aspike_text_server_processor.ex b/apps/aspike_server_text_protocol/lib/aspike_text_server_processor.ex new file mode 100644 index 0000000..f549569 --- /dev/null +++ b/apps/aspike_server_text_protocol/lib/aspike_text_server_processor.ex @@ -0,0 +1,46 @@ +defmodule Aspike.TextServer.Processor do + @moduledoc false + + def process(socket) do + msg = + with {:ok, data} <- read_line(socket), + {:ok, command} <- Aspike.TextServer.Command.parse(data), + do: Aspike.TextServer.Command.run(command, Aspike.Ns.Registry) + + write_line(socket, msg) + process(socket) + end + + defp read_line(socket) do + :gen_tcp.recv(socket, 0) + end + + defp write_line(socket, {:ok, text}) do + :gen_tcp.send(socket, text) + end + + defp write_line(socket, {:error, :unknown_command}) do + # Known error. Write to the client. + :gen_tcp.send(socket, "UNKNOWN COMMAND\r\n") + end + + defp write_line(socket, {:error, :exists_already}) do + # Known error. Write to the client. + :gen_tcp.send(socket, "EXISTS ALREADY\r\n") + end + + defp write_line(_socket, {:error, :closed}) do + # The connection was closed, exit politely. + exit(:shutdown) + end + + defp write_line(socket, {:error, :not_found}) do + :gen_tcp.send(socket, "NOT FOUND\r\n") + end + + defp write_line(socket, {:error, error}) do + # Unknown error. Write to the client and exit. + :gen_tcp.send(socket, "ERROR\r\n") + exit(error) + end +end diff --git a/apps/aspike_server_text_protocol/mix.exs b/apps/aspike_server_text_protocol/mix.exs new file mode 100644 index 0000000..1a3ec05 --- /dev/null +++ b/apps/aspike_server_text_protocol/mix.exs @@ -0,0 +1,35 @@ +defmodule Aspike.TextServer.MixProject do + use Mix.Project + + def project do + [ + app: :aspike_server_text_protocol, + version: "0.1.0", + build_path: "../../_build", + config_path: "../../config/config.exs", + deps_path: "../../deps", + lockfile: "../../mix.lock", + elixir: "~> 1.15", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {Aspike.TextServer.Application, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + # {:dep_from_hexpm, "~> 0.3.0"}, + # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}, + # {:sibling_app_in_umbrella, in_umbrella: true} + {:aspike_storage, in_umbrella: true} + ] + end +end diff --git a/apps/aspike_server_text_protocol/test/aspike_text_server_command_test.exs b/apps/aspike_server_text_protocol/test/aspike_text_server_command_test.exs new file mode 100644 index 0000000..4f2838a --- /dev/null +++ b/apps/aspike_server_text_protocol/test/aspike_text_server_command_test.exs @@ -0,0 +1,27 @@ +defmodule Aspike.TextServer.CommandTest do + use ExUnit.Case + + @moduletag :capture_log + doctest Aspike.TextServer.Command + + setup do + {:ok, ns_registry_pid} = Aspike.Ns.Registry.start_link(:test_registry) + {:ok, ns_registry_pid: ns_registry_pid} + end + + test "module exists" do + assert is_list(Aspike.TextServer.Command.module_info()) + end + + test "create namespace", %{ns_registry_pid: ns_registry_pid} do + assert Aspike.TextServer.Command.run({:create, "namespace_1"}, ns_registry_pid) + == {:ok, "OK\r\n"} + end + + test "create existing namespace", %{ns_registry_pid: ns_registry_pid} do + assert Aspike.TextServer.Command.run({:create, "namespace_1"}, ns_registry_pid) + == {:ok, "OK\r\n"} + assert Aspike.TextServer.Command.run({:create, "namespace_1"}, ns_registry_pid) + == {:error, :exists_already} + end +end diff --git a/apps/aspike_server_text_protocol/test/aspike_text_server_test.exs b/apps/aspike_server_text_protocol/test/aspike_text_server_test.exs new file mode 100644 index 0000000..ff7ae35 --- /dev/null +++ b/apps/aspike_server_text_protocol/test/aspike_text_server_test.exs @@ -0,0 +1,65 @@ +defmodule Aspike.TextServerTest do + use ExUnit.Case + @moduletag :capture_log + doctest Aspike.TextServer + + setup do + Application.stop(:aspike_storage) + :ok = Application.start(:aspike_storage) + end + + setup do + opts = [:binary, packet: :line, active: false] + {:ok, socket} = :gen_tcp.connect('localhost', 4040, opts) + {:ok, socket: socket} + end + + test "server interaction", %{socket: socket} do + assert send_and_recv(socket, "UNKNOWN namespace\r\n") == "UNKNOWN COMMAND\r\n" + + assert send_and_recv(socket, "GET ns_1 set_1 key_1\r\n") == "NOT FOUND\r\n" + + assert send_and_recv(socket, "CREATE ns_1\r\n") == "OK\r\n" + + assert send_and_recv(socket, "PUT ns_1 set_1 key_1 bin_1 value_1\r\n") == "OK\r\n" + + # GET returns two lines + assert send_and_recv(socket, "GET ns_1 set_1 key_1\r\n") == "[{bin_1,value_1}]\r\n" + assert send_and_recv(socket, "") == "OK\r\n" + + assert send_and_recv(socket, "REMOVE ns_1 set_1 key_1\r\n") == "OK\r\n" + + # GET returns two lines + assert send_and_recv(socket, "GET ns_1 set_1 key_1\r\n") == "\r\n" + assert send_and_recv(socket, "") == "OK\r\n" + + # GETSET tests + assert send_and_recv(socket, "PUT ns_1 set_1 key_1 bin_1 value_1\r\n") == "OK\r\n" + + # GETFROMSET returns two lines + assert send_and_recv(socket, "GETFROMSET ns_1 set_1\r\n") == + "%{{set_1,key_1}:[{bin_1,value_1}]}\r\n" + assert send_and_recv(socket, "") == "OK\r\n" + + assert send_and_recv(socket, "PUT ns_1 set_1 key_2 bin_1 value_2\r\n") == "OK\r\n" + assert send_and_recv(socket, "PUT ns_1 set_1 key_3 bin_3 value_3\r\n") == "OK\r\n" + + assert send_and_recv(socket, "PUT ns_1 set_2 key_2 bin_1 value_2\r\n") == "OK\r\n" + assert send_and_recv(socket, "PUT ns_1 set_2 key_3 bin_3 value_3\r\n") == "OK\r\n" + + assert send_and_recv(socket, "GETFROMSET ns_1 set_1\r\n") == + "%{{set_1,key_1}:[{bin_1,value_1}],{set_1,key_2}:[{bin_1,value_2}],{set_1,key_3}:[{bin_3,value_3}]}\r\n" + assert send_and_recv(socket, "") == "OK\r\n" + + # GETBYKEY test + assert send_and_recv(socket, "GETBYKEY ns_1 key_2\r\n") == + "%{{set_1,key_2}:[{bin_1,value_2}],{set_2,key_2}:[{bin_1,value_2}]}\r\n" + assert send_and_recv(socket, "") == "OK\r\n" + end + + defp send_and_recv(socket, command) do + :ok = :gen_tcp.send(socket, command) + {:ok, data} = :gen_tcp.recv(socket, 0, 1000) + data + end +end diff --git a/apps/aspike_server_text_protocol/test/test_helper.exs b/apps/aspike_server_text_protocol/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/apps/aspike_server_text_protocol/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start() diff --git a/apps/aspike_storage/README.md b/apps/aspike_storage/README.md index 73b705e..3982632 100644 --- a/apps/aspike_storage/README.md +++ b/apps/aspike_storage/README.md @@ -1,6 +1,6 @@ # AspikeStorage -**TODO: Add description** +**Storage for Aerospike Server Emulator in Erlang/Elixir** ## Installation diff --git a/apps/aspike_storage/lib/aspike_ns.ex b/apps/aspike_storage/lib/aspike_ns.ex new file mode 100644 index 0000000..e963b22 --- /dev/null +++ b/apps/aspike_storage/lib/aspike_ns.ex @@ -0,0 +1,62 @@ +defmodule Aspike.Ns do + + @doc """ + Starts a new namespace. + """ + def start_link do + Agent.start_link(fn -> %{} end) + end + + def list(namespace) do + keys = Agent.get(namespace, &Map.keys(&1)) + sets = Enum.map(Enum.uniq(Enum.map(keys, &elem(&1, 0))), &to_string(&1)) +# :io.format("[list] ~p~n", [sets]) + sets + end + + @doc """ + Gets a value from the (`namespace(pid)`, `set`) by `key`. + """ + def get(namespace, set, key) do + Agent.get(namespace, &Map.get(&1, {set, key})) + end + + @doc """ + Gets values from the (`namespace(pid)`, `set`). + """ + def get_from_set(namespace, set) do + Agent.get(namespace, &Map.filter(&1, fn {{s,_},_} -> s == set end)) + end + + @doc """ + Gets values from the `namespace(pid)` by `key`. + """ + def get_by_key(namespace, key) do + Agent.get(namespace, &Map.filter(&1, fn {{_,k},_} -> k == key end)) + end + + @doc """ + Puts `[{bin, value}]` pairs in the (`namespace(pid)`, `set`). + """ + def put(namespace, set, key, bin_value_pairs) do + Agent.update(namespace, &Map.put(&1, {set, key}, bin_value_pairs)) + end + + @doc """ + Removes `key` from (`namespace(pid)`, `set`). + + Returns the current value of `key`, if `key` exists. + """ + def remove(namespace, set, key) do + Agent.get_and_update(namespace, &Map.pop(&1, {set, key})) + end + + @doc """ + Checks if `key` exists in (`namespace(pid)`, `set`). + + Returns `true`, if `key` exists; `false` otherwise. + """ + def exists(namespace, set, key) do + Agent.get(namespace, &Map.has_key?(&1, {set, key})) + end +end diff --git a/apps/aspike_storage/lib/aspike_ns_registry.ex b/apps/aspike_storage/lib/aspike_ns_registry.ex new file mode 100644 index 0000000..9fc4f39 --- /dev/null +++ b/apps/aspike_storage/lib/aspike_ns_registry.ex @@ -0,0 +1,83 @@ +defmodule Aspike.Ns.Registry do + use GenServer + + ## Client API + + @doc """ + Starts the namespace registry. + """ + def start_link(name) do + GenServer.start_link(__MODULE__, name, name: name) + end + + @doc """ + Stops the namespace registry. + """ + def stop(server) do + GenServer.stop(server) + end + + @doc """ + Looks up the namespace pid for `namespace` stored in `server`. + + Returns `{:ok, pid}` if the namespace exists, `:error` otherwise. + """ + def lookup(server, namespace) when is_atom(server) do + case :ets.lookup(server, namespace) do + [{^namespace, pid}] -> {:ok, pid} + [] -> :error + end + end + + @doc """ + Ensures there is a namespace associated to the given `namespace` in `server`. + """ + def create(server, namespace) do + GenServer.call(server, {:create, namespace}) + end + + def list(server) do + GenServer.call(server, {:list}) + end + + ## GenServer callbacks + + def init(table) do + # nss = %{} # namespace -> pid + nss = :ets.new(table, + [:named_table, :protected, read_concurrency: true]) + refs = %{} # ref -> namespace + {:ok, {nss, refs}} + end + + def handle_call({:create, ns}, _from, {nss, refs} = state) do + case lookup(nss, ns) do + {:ok, _pid} -> + {:reply, :exists_already, state} + :error -> + {:ok, pid} = Aspike.Ns.Sup.start_ns + ref = Process.monitor(pid) + refs = Map.put(refs, ref, ns) + :ets.insert(nss, {ns, pid}) + {:reply, :ok, {nss, refs}} + end + end + + def handle_call({:list}, _from, {nss, _refs} = state) do + xs = :ets.tab2list(nss) +# :io.format("[List] ~p~n", [x]) + ys = Enum.map(xs, &elem(&1, 0)) +# :io.format("[List] ~p~n", [y]) + {:reply, ys, state} + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, {nss, refs}) do + {ns, refs} = Map.pop(refs, ref) + :ets.delete(nss, ns) + {:noreply, {nss, refs}} + end + + def handle_info(_msg, state) do + {:noreply, state} + end +end diff --git a/apps/aspike_storage/lib/aspike_ns_sup.ex b/apps/aspike_storage/lib/aspike_ns_sup.ex new file mode 100644 index 0000000..14c8337 --- /dev/null +++ b/apps/aspike_storage/lib/aspike_ns_sup.ex @@ -0,0 +1,26 @@ +defmodule Aspike.Ns.Sup do + use DynamicSupervisor + + @moduledoc false + + # A simple module attribute that stores the supervisor name + @name Aspike.Ns.Sup + + def start_link do + DynamicSupervisor.start_link(__MODULE__, :ok, name: @name) + end + + def start_ns do + spec = %{ + id: Aspike.Ns, + start: {Aspike.Ns, :start_link, []}, + restart: :temporary + } + DynamicSupervisor.start_child(__MODULE__, spec) + end + + @impl true + def init(:ok) do + DynamicSupervisor.init(strategy: :one_for_one) + end +end diff --git a/apps/aspike_storage/lib/aspike_storage.ex b/apps/aspike_storage/lib/aspike_storage.ex index 3937f3a..97b1c4e 100644 --- a/apps/aspike_storage/lib/aspike_storage.ex +++ b/apps/aspike_storage/lib/aspike_storage.ex @@ -1,18 +1,9 @@ -defmodule AspikeStorage do - @moduledoc """ - Documentation for `AspikeStorage`. - """ +defmodule Aspike.Storage do + @moduledoc false + use Application - @doc """ - Hello world. - - ## Examples - - iex> AspikeStorage.hello() - :world - - """ - def hello do - :world + def start(_type, _args) do + Aspike.Storage.Sup.start_link end + end diff --git a/apps/aspike_storage/lib/aspike_storage_sup.ex b/apps/aspike_storage/lib/aspike_storage_sup.ex new file mode 100644 index 0000000..8877f78 --- /dev/null +++ b/apps/aspike_storage/lib/aspike_storage_sup.ex @@ -0,0 +1,25 @@ +defmodule Aspike.Storage.Sup do + @moduledoc false + use Supervisor + + def start_link do + Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(:ok) do + children = [ + %{ + id: Aspike.Ns.Registry, + type: :worker, + start: {Aspike.Ns.Registry, :start_link, [Aspike.Ns.Registry]} + }, + %{ + id: Aspike.Ns.Sup, + type: :supervisor, + start: {Aspike.Ns.Sup, :start_link, []} + } + ] + Supervisor.init(children, strategy: :rest_for_one) + end +end diff --git a/apps/aspike_storage/mix.exs b/apps/aspike_storage/mix.exs index 6bc870e..6c3d39c 100644 --- a/apps/aspike_storage/mix.exs +++ b/apps/aspike_storage/mix.exs @@ -18,7 +18,8 @@ defmodule AspikeStorage.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger] + extra_applications: [:logger, :eex, :wx, :observer, :runtime_tools], + mod: {Aspike.Storage, []} ] end diff --git a/apps/aspike_storage/test/aspike_ns_registry_test.exs b/apps/aspike_storage/test/aspike_ns_registry_test.exs new file mode 100644 index 0000000..c9693ee --- /dev/null +++ b/apps/aspike_storage/test/aspike_ns_registry_test.exs @@ -0,0 +1,57 @@ +defmodule Aspike.Ns.RegistryTest do + use ExUnit.Case, async: true + +# alias AspikeNsRegistry +# +# @moduletag :capture_log + +# doctest Aspike.Ns.Registry + + setup context do + {:ok, _} = Aspike.Ns.Registry.start_link(context.test) + {:ok, registry: context.test} + end + + test "module exists" do + assert is_list(Aspike.Ns.Registry.module_info()) + end + + test "spawns namespaces", %{registry: registry} do + assert Aspike.Ns.Registry.lookup(registry, "namespace-1") == :error + + Aspike.Ns.Registry.create(registry, "namespace-1") + assert {:ok, ns} = Aspike.Ns.Registry.lookup(registry, "namespace-1") + assert is_pid(ns) + + Aspike.Ns.put(ns, "set-1", "key-1", [{"bin-1", "value-1"}]) + assert Aspike.Ns.get(ns, "set-1", "key-1") == [{"bin-1", "value-1"}] + end + + test "removes namespaces on exit", %{registry: registry} do + Aspike.Ns.Registry.create(registry, "namespace-1") + assert {:ok, ns} = Aspike.Ns.Registry.lookup(registry, "namespace-1") + assert is_pid(ns) + + Agent.stop(ns) + # Do a call to ensure the registry processed the DOWN message + _ = Aspike.Ns.Registry.create(registry, "bogus") + assert Aspike.Ns.Registry.lookup(registry, "namespace-1") == :error + end + + test "removes namespace on exit", %{registry: registry} do + Aspike.Ns.Registry.create(registry, "namespace-1") + assert {:ok, ns} = Aspike.Ns.Registry.lookup(registry, "namespace-1") + assert is_pid(ns) + + # Stop the namespace with non-normal reason + Process.exit(ns, :shutdown) + + # Wait until the namespace is dead + ref = Process.monitor(ns) + assert_receive {:DOWN, ^ref, _, _, _} + + # Do a call to ensure the registry processed the DOWN message + _ = Aspike.Ns.Registry.create(registry, "bogus") + assert Aspike.Ns.Registry.lookup(registry, "namespace-1") == :error + end +end diff --git a/apps/aspike_storage/test/aspike_ns_test.exs b/apps/aspike_storage/test/aspike_ns_test.exs new file mode 100644 index 0000000..3b93415 --- /dev/null +++ b/apps/aspike_storage/test/aspike_ns_test.exs @@ -0,0 +1,82 @@ +defmodule Aspike.NsTest do + use ExUnit.Case, async: true + +# alias AspikeNamespace +# +# @moduletag :capture_log + + doctest Aspike.Ns + + setup do + {:ok, namespace} = Aspike.Ns.start_link + {:ok, namespace: namespace} + end + + test "module exists" do + assert is_list(Aspike.Ns.module_info()) + end + + test "stores set, key, [{bin1, value1},...,{binN, valueN}] in namespace", + %{namespace: namespace} do + assert Aspike.Ns.get(namespace, "set1", <<"key1">>) == nil + + Aspike.Ns.put(namespace, "set1", <<"key1">>, [{"bin1", "value1"}]) + assert Aspike.Ns.get(namespace, "set1", <<"key1">>) == [{"bin1", "value1"}] + end + + test "retrieves all key-value pairs from set in namespace", + %{namespace: namespace} do + assert Aspike.Ns.get_from_set(namespace, "set1") == %{} + + Aspike.Ns.put(namespace, "set1", <<"s1key1">>, [{"s1bin1", "s1value1"}]) + Aspike.Ns.put(namespace, "set1", <<"s1key2">>, [{"s1bin1", "s1value2"}]) + Aspike.Ns.put(namespace, "set1", <<"s1key3">>, [{"s1bin2", "s1value3"}]) + + Aspike.Ns.put(namespace, "set2", <<"s2key1">>, [{"s2bin1", "s2value1"}]) + Aspike.Ns.put(namespace, "set2", <<"s2key2">>, [{"s2bin1", "s2value2"}]) + + assert Aspike.Ns.get_from_set(namespace, "set1") == + %{ + {"set1", <<"s1key1">>} => [{"s1bin1", "s1value1"}], + {"set1", <<"s1key2">>} => [{"s1bin1", "s1value2"}], + {"set1", <<"s1key3">>} => [{"s1bin2", "s1value3"}] + } + end + + test "retrieves all key-value pairs from namespace by key", + %{namespace: namespace} do + assert Aspike.Ns.get_by_key(namespace, "key1") == %{} + + Aspike.Ns.put(namespace, "set1", <<"key1">>, [{"s1bin1", "s1value1"}]) + Aspike.Ns.put(namespace, "set1", <<"key2">>, [{"s1bin1", "s1value2"}]) + Aspike.Ns.put(namespace, "set1", <<"key3">>, [{"s1bin2", "s1value3"}]) + + Aspike.Ns.put(namespace, "set2", <<"key1">>, [{"s2bin1", "s2value1"}]) + Aspike.Ns.put(namespace, "set2", <<"key2">>, [{"s2bin1", "s2value2"}]) + + assert Aspike.Ns.get_by_key(namespace, "key1") == + %{ + {"set1", <<"key1">>} => [{"s1bin1", "s1value1"}], + {"set2", <<"key1">>} => [{"s2bin1", "s2value1"}] + } + end + + test "removes key from set", + %{namespace: namespace} do + assert Aspike.Ns.get(namespace, "set1", <<"key1">>) == nil + + Aspike.Ns.put(namespace, "set1", <<"key1">>, [{"bin1", "value1"}]) + assert Aspike.Ns.get(namespace, "set1", <<"key1">>) == [{"bin1", "value1"}] + + Aspike.Ns.remove(namespace, "set1", <<"key1">>) + assert Aspike.Ns.get(namespace, "set1", <<"key1">>) == nil + end + + test "key exists", + %{namespace: namespace} do + assert Aspike.Ns.exists(namespace, "set1", <<"key1">>) == false + + Aspike.Ns.put(namespace, "set1", <<"key1">>, [{"bin1", "value1"}]) + assert Aspike.Ns.exists(namespace, "set1", <<"key1">>) == true + end +end diff --git a/apps/aspike_storage/test/aspike_storage_test.exs b/apps/aspike_storage/test/aspike_storage_test.exs deleted file mode 100644 index 1b600a3..0000000 --- a/apps/aspike_storage/test/aspike_storage_test.exs +++ /dev/null @@ -1,8 +0,0 @@ -defmodule AspikeStorageTest do - use ExUnit.Case - doctest AspikeStorage - - test "greets the world" do - assert AspikeStorage.hello() == :world - end -end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..0663f73 --- /dev/null +++ b/mix.lock @@ -0,0 +1,3 @@ +%{ + "aspike_protocol": {:git, "git@github.com:vsavkov/aspike-protocol.git", "b73c0a6189e4f25b03f1a82a02945573122f3561", [tag: "v0.1.0"]}, +}