Skip to content

Commit

Permalink
Update TigerBeetle to 0.15.4
Browse files Browse the repository at this point in the history
  • Loading branch information
rbino committed Aug 4, 2024
1 parent 7735f3c commit f407ecc
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 100 deletions.
31 changes: 12 additions & 19 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
otp: ["25.3"]
elixir: ["1.14"]
zig: ["0.11.0"]
zig: ["0.13.0"]
steps:
- name: Clone the repository
uses: actions/checkout@v2
Expand Down Expand Up @@ -102,30 +102,23 @@ jobs:
with:
submodules: recursive

- name: Fetch TigerBeetle
- name: Fetch TigerBeetle for Linux
if: runner.os == 'Linux'
working-directory: ./src/tigerbeetle
run: |
curl -Lo tigerbeetle.zip https://linux.tigerbeetle.com && unzip tigerbeetle.zip
- name: Fetch TigerBeetle for Mac
if: runner.os == 'macOS'
working-directory: ./src/tigerbeetle
run: |
git fetch --tags --force --quiet
version="$(git describe --tags)"
os="$(uname)"
arch="$(uname -m)"
if [ "$os" = "Darwin" ]; then
arch="universal"
os="macos"
elif [ "$os" = "Linux" ]; then
os="linux"
else
echo "Unsupported OS."
exit 1
fi
curl -sLO "https://github.com/tigerbeetle/tigerbeetle/releases/download/$version/tigerbeetle-$arch-$os.zip"
unzip -qo "tigerbeetle-$arch-$os.zip"
curl -Lo tigerbeetle.zip https://mac.tigerbeetle.com && unzip tigerbeetle.zip
- name: Start TigerBeetle
working-directory: ./src/tigerbeetle
run: |
./tigerbeetle format --cluster=0 --replica=0 --replica-count=1 0_0.tigerbeetle
./tigerbeetle start --addresses=3000 0_0.tigerbeetle &
./tigerbeetle format --cluster=0 --replica=0 --replica-count=1 --development 0_0.tigerbeetle
./tigerbeetle start --addresses=3000 --development 0_0.tigerbeetle &
- name: Install OTP and Elixir
if: runner.os == 'Linux'
Expand Down
1 change: 0 additions & 1 deletion bench/benchmark.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ alias TigerBeetlex.TransferBatch
name: :tb,
cluster_id: <<0::128>>,
addresses: ["3000"],
concurrency_max: 1,
partitions: 1
)

Expand Down
8 changes: 4 additions & 4 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@ pub fn build(b: *std.Build) void {
"-noshell",
};

break :blk b.exec(&argv);
break :blk b.run(&argv);
};

const lib = b.addSharedLibrary(.{
.name = "tigerbeetlex",
// In this case the main source file is merely a path, however, in more
// complicated build scripts, this could be a generated file.
.root_source_file = .{ .path = "src/tigerbeetlex.zig" },
.root_source_file = .{ .src_path = .{ .owner = b, .sub_path = "src/tigerbeetlex.zig" } },
.target = target,
.optimize = optimize,
.link_libc = true,
});
lib.addSystemIncludePath(.{ .path = erts_include_dir });
lib.addSystemIncludePath(.{ .cwd_relative = erts_include_dir });
// This is needed to avoid errors on MacOS when loading the NIF
lib.linker_allow_shlib_undefined = true;

// Do this so `lib` doesn't get prepended to the lib name, and `.so` is used as suffix also
// on MacOS, since it's required by the NIF loading mechanism.
// See https://github.com/ziglang/zig/issues/2231
const nif_so_install = b.addInstallFileWithDir(lib.getOutputSource(), .lib, "tigerbeetlex.so");
const nif_so_install = b.addInstallFileWithDir(lib.getEmittedBin(), .lib, "tigerbeetlex.so");
nif_so_install.step.dependOn(&lib.step);
b.getInstallStep().dependOn(&nif_so_install.step);
}
13 changes: 4 additions & 9 deletions lib/tigerbeetlex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,21 @@ defmodule TigerBeetlex do
- `addresses` (list of `String.t()`) - The list of node addresses. These can either be a single
digit (e.g. `"3000"`), which is interpreted as a port on `127.0.0.1`, an IP address + port (e.g.
`"127.0.0.1:3000"`), or just an IP address (e.g. `"127.0.0.1"`), which defaults to port `3001`.
- `concurrency_max` (`pos_integer/0`) - The maximum number of concurrent requests the client can
handle. 32 is a good default, and can be increased to 4096 if there's the need of increased
throughput.
## Examples
{:ok, client} = TigerBeetlex.connect(<<0::128>>, ["3000"], 32)
"""
@spec connect(
cluster_id :: Types.id_128(),
addresses :: [binary()],
concurrency_max :: pos_integer()
addresses :: [binary()]
) ::
{:ok, t()} | {:error, Types.client_init_error()}
def connect(<<_::128>> = cluster_id, addresses, concurrency_max)
when is_list(addresses) and is_integer(concurrency_max) and
concurrency_max > 0 do
def connect(<<_::128>> = cluster_id, addresses)
when is_list(addresses) do
joined_addresses = Enum.join(addresses, ",")

with {:ok, ref} <- NifAdapter.client_init(cluster_id, joined_addresses, concurrency_max) do
with {:ok, ref} <- NifAdapter.client_init(cluster_id, joined_addresses) do
{:ok, %__MODULE__{ref: ref}}
end
end
Expand Down
13 changes: 1 addition & 12 deletions lib/tigerbeetlex/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ defmodule TigerBeetlex.Connection do
interpreted as a port on `127.0.0.1`, an IP address + port (e.g. `"127.0.0.1:3000"`), or just
an IP address (e.g. `"127.0.0.1"`), which defaults to port `3001`.
"""
],
concurrency_max: [
type: :pos_integer,
required: true,
doc: """
The maximum number of concurrent requests the client can handle. 32 is a good default, and can
be increased to 4096 if there's the need of increased throughput.
"""
]
]

Expand Down Expand Up @@ -88,15 +80,13 @@ defmodule TigerBeetlex.Connection do
TigerBeetlex.Connection.start_link(
cluster_id: <<0::128>>,
addresses: ["3000"],
concurrency_max: 32
)
# Start a named TigerBeetlex connection
{:ok, pid} =
TigerBeetlex.Connection.start_link(
cluster_id: <<0::128>>,
addresses: ["3000"],
concurrency_max: 32,
name: :tb
)
"""
Expand All @@ -113,9 +103,8 @@ defmodule TigerBeetlex.Connection do

cluster_id = Keyword.fetch!(tigerbeetlex_opts, :cluster_id)
addresses = Keyword.fetch!(tigerbeetlex_opts, :addresses)
concurrency_max = Keyword.fetch!(tigerbeetlex_opts, :concurrency_max)

with {:ok, client} <- TigerBeetlex.connect(cluster_id, addresses, concurrency_max) do
with {:ok, client} <- TigerBeetlex.connect(cluster_id, addresses) do
start_opts = Keyword.merge(partition_supervisor_opts, child_spec: {Receiver, client})
PartitionSupervisor.start_link(start_opts)
end
Expand Down
5 changes: 2 additions & 3 deletions lib/tigerbeetlex/nif_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ defmodule TigerBeetlex.NifAdapter do

@spec client_init(
cluster_id :: Types.id_128(),
addresses :: binary(),
concurrency_max :: pos_integer()
addresses :: binary()
) ::
{:ok, Types.client()} | {:error, Types.client_init_error()}
def client_init(_cluster_id, _addresses, _concurrency_max) do
def client_init(_cluster_id, _addresses) do
:erlang.nif_error(:nif_not_loaded)
end

Expand Down
1 change: 0 additions & 1 deletion lib/tigerbeetlex/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ defmodule TigerBeetlex.Types do
@type tigerbeetlex_connection_start_option ::
{:cluster_id, non_neg_integer()}
| {:addresses, [String.t()]}
| {:concurrency_max, [pos_integer()]}

@type partition_supervisor_start_option :: {atom(), any()}

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule TigerBeetlex.MixProject do
app: :tigerbeetlex,
version: "0.1.0",
elixir: "~> 1.14",
install_zig: "0.11.0",
install_zig: "0.13.0",
zig_build_mode: zig_build_mode(Mix.env()),
compilers: [:build_dot_zig] ++ Mix.compilers(),
start_permanent: Mix.env() == :prod,
Expand Down
8 changes: 4 additions & 4 deletions src/batch.zig
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ pub fn append(
}
batch.len += 1;

// We need to pass the item as bytes and copy it with std.mem.copy
// We need to pass the item as bytes and copy it with @memcpy
// because we can't enforce a specifi alignment to the underlying
// ErlNifBinary, which becomes our slice of bytes

// Get a pointer to the memory backing the newly inserted item
const new_batch_item_bytes = std.mem.asBytes(&batch.items[batch.len - 1]);
// Fill it with the input item bytes
std.mem.copy(u8, new_batch_item_bytes, item_bytes);
@memcpy(new_batch_item_bytes, item_bytes);
}

return beam.make_ok(env);
Expand Down Expand Up @@ -106,14 +106,14 @@ pub fn replace(
return error.OutOfBounds;
}

// We need to pass the item as bytes and copy it with std.mem.copy
// We need to pass the item as bytes and copy it with @memcpy
// because we can't enforce a specific alignment to the underlying
// ErlNifBinary, which becomes our slice of bytes

// Get a pointer to the memory backing the newly inserted item
const batch_item_bytes = std.mem.asBytes(&batch.items[idx]);
// Fill it with the input item bytes
std.mem.copy(u8, batch_item_bytes, replacement_item_bytes);
@memcpy(batch_item_bytes, replacement_item_bytes);
}

return beam.make_ok(env);
Expand Down
9 changes: 7 additions & 2 deletions src/beam.zig
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub fn make_error_atom(env: Env, atom_str: []const u8) Term {
pub fn make_slice(env: Env, val: []const u8) Term {
var result: Term = undefined;
var bin: [*]u8 = @ptrCast(e.enif_make_new_binary(env, val.len, &result));
std.mem.copy(u8, bin[0..val.len], val);
@memcpy(bin[0..val.len], val);

return result;
}
Expand Down Expand Up @@ -140,7 +140,7 @@ pub fn get_u128(env: Env, src_term: Term) GetError!u128 {
// We represent the u128 as a 16 byte binary, little endian (required by TigerBeetle)
if (bin.len != required_length) return GetError.ArgumentError;

return std.mem.readIntLittle(u128, bin[0..required_length]);
return std.mem.readInt(u128, bin[0..required_length], .little);
}

/// Extract a u64 from a term
Expand Down Expand Up @@ -198,3 +198,8 @@ pub fn alloc_env() Env {
pub fn clear_env(env: Env) void {
e.enif_clear_env(env);
}

/// Frees a process independent environment
pub fn free_env(env: Env) void {
e.enif_free_env(env);
}
4 changes: 2 additions & 2 deletions src/beam/allocator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ fn large_beam_free(

fn aligned_alloc(len: usize, log2_align: u8) ?[*]u8 {
const alignment = @as(usize, 1) << @as(Allocator.Log2Align, @intCast(log2_align));
var unaligned_ptr: [*]u8 = @ptrCast(e.enif_alloc(len + alignment - 1 + @sizeOf(usize)) orelse return null);
const unaligned_ptr: [*]u8 = @ptrCast(e.enif_alloc(len + alignment - 1 + @sizeOf(usize)) orelse return null);
const unaligned_addr = @intFromPtr(unaligned_ptr);
const aligned_addr = std.mem.alignForward(usize, unaligned_addr + @sizeOf(usize), alignment);
var aligned_ptr = unaligned_ptr + (aligned_addr - unaligned_addr);
const aligned_ptr = unaligned_ptr + (aligned_addr - unaligned_addr);
get_header(aligned_ptr).* = unaligned_ptr;

return aligned_ptr;
Expand Down
2 changes: 1 addition & 1 deletion src/beam/resource.zig
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn Resource(comptime T: anytype, comptime deinit_fn: ?DeinitFn) type {

/// Allocates the memory of the resource
pub fn alloc() !Self {
var raw_ptr: ?*anyopaque = e.enif_alloc_resource(res_type(), @sizeOf(T));
const raw_ptr: ?*anyopaque = e.enif_alloc_resource(res_type(), @sizeOf(T));

if (raw_ptr) |p| {
return Self{ .raw_ptr = p };
Expand Down
43 changes: 23 additions & 20 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ const RequestContext = struct {
caller_pid: beam.Pid,
request_ref_binary: beam.Binary,
payload_raw_obj: *anyopaque,
client_raw_obj: *anyopaque,
};

pub fn init(env: beam.Env, cluster_id: u128, addresses: []const u8, concurrency_max: u32) beam.Term {
pub fn init(env: beam.Env, cluster_id: u128, addresses: []const u8) beam.Term {
const client: tb_client.tb_client_t = tb_client.init(
beam.general_purpose_allocator,
cluster_id,
addresses,
concurrency_max,
@intFromPtr(beam.alloc_env()),
on_completion,
) catch |err| switch (err) {
error.Unexpected => return beam.make_error_atom(env, "unexpected"),
error.OutOfMemory => return beam.make_error_atom(env, "out_of_memory"),
error.AddressInvalid => return beam.make_error_atom(env, "invalid_address"),
error.AddressLimitExceeded => return beam.make_error_atom(env, "address_limit_exceeded"),
error.ConcurrencyMaxInvalid => return beam.make_error_atom(env, "invalid_concurrency_max"),
error.SystemResources => return beam.make_error_atom(env, "system_resources"),
error.NetworkSubsystemFailed => return beam.make_error_atom(env, "network_subsystem"),
};
Expand All @@ -62,7 +61,7 @@ fn OperationBatchItemType(comptime operation: tb_client.tb_operation_t) type {
.create_accounts => Account,
.create_transfers => Transfer,
.lookup_accounts, .lookup_transfers => u128,
.get_account_transfers, .get_account_balances => @panic("TODO"),
.get_account_transfers, .get_account_balances, .query_accounts, .query_transfers => @panic("TODO"),
.pulse => unreachable,
};
}
Expand Down Expand Up @@ -117,16 +116,13 @@ fn submit(
const client = client_resource.value();
const payload = payload_resource.ptr_const();

var out_packet: ?*tb_client.tb_packet_t = undefined;
const status = tb_client.acquire_packet(client, &out_packet);
const packet = switch (status) {
.ok => if (out_packet) |pkt| pkt else @panic("acquire packet returned null"),
.concurrency_max_exceeded => return error.TooManyRequests,
.shutdown => return error.Shutdown,
const packet = beam.general_purpose_allocator.create(tb_client.tb_packet_t) catch {
return error.OutOfMemory;
};
errdefer tb_client.release_packet(client, packet);
errdefer beam.general_purpose_allocator.destroy(packet);

var ctx: *RequestContext = try beam.general_purpose_allocator.create(RequestContext);
errdefer beam.general_purpose_allocator.destroy(ctx);

// We're calling this from a process bound env so we expect not to fail
ctx.caller_pid = process.self(env) catch unreachable;
Expand All @@ -142,8 +138,13 @@ fn submit(
// collected until we release it
payload_resource.keep();

// We save the raw pointer in the context so we can release it later
// We do the same with the client to make sure we don't deinit it until all requests
// have been handled
client_resource.keep();

// We save the raw pointers in the context so we can release it later
ctx.payload_raw_obj = payload_resource.raw_ptr;
ctx.client_raw_obj = client_resource.raw_ptr;

packet.operation = @intFromEnum(operation);
packet.data = payload.items.ptr;
Expand All @@ -163,11 +164,14 @@ fn on_completion(
result_ptr: ?[*]const u8,
result_len: u32,
) callconv(.C) void {
var ctx: *RequestContext = @ptrCast(@alignCast(packet.user_data.?));
_ = client;
const ctx: *RequestContext = @ptrCast(@alignCast(packet.user_data.?));
defer beam.general_purpose_allocator.destroy(ctx);

// We don't need the payload anymore, let the garbage collector take care of it
resource.raw_release(ctx.payload_raw_obj);
// Same for the client
resource.raw_release(ctx.client_raw_obj);

const env: beam.Env = @ptrFromInt(context);
defer beam.clear_env(env);
Expand All @@ -180,6 +184,7 @@ fn on_completion(

const status = beam.make_u8(env, @intFromEnum(packet.status));
const operation = beam.make_u8(env, packet.operation);
beam.general_purpose_allocator.destroy(packet);
const result = if (result_ptr) |p|
beam.make_slice(env, p[0..result_len])
else
Expand All @@ -189,19 +194,17 @@ fn on_completion(
const tag = beam.make_atom(env, "tigerbeetlex_response");
const msg = beam.make_tuple(env, .{ tag, ref, response });

// We're done with the packet, put it back in the pool
tb_client.release_packet(client, packet);

// Send the result to the caller
process.send(caller_pid, env, msg) catch unreachable;
}

fn client_resource_deinit_fn(_: beam.Env, ptr: ?*anyopaque) callconv(.C) void {
if (ptr) |p| {
const cl: *Client = @ptrCast(@alignCast(p));
// TODO: this can now potentially block for a long time since it waits
// for all the requests to be drained, investigate what it is blocking
// and if this needs to be done in a separate thread
tb_client.deinit(cl.*);
defer tb_client.deinit(cl.*);

const completion_ctx = tb_client.completion_context(cl.*);
const env: beam.Env = @ptrFromInt(completion_ctx);
beam.free_env(env);
} else unreachable;
}
2 changes: 1 addition & 1 deletion src/tigerbeetle
Loading

0 comments on commit f407ecc

Please sign in to comment.