Skip to content

Commit

Permalink
Merge pull request redpanda-data#23481 from andrwng/iceberg-value-bytes
Browse files Browse the repository at this point in the history
iceberg: value serialization to/from bytes
  • Loading branch information
andrwng authored Sep 25, 2024
2 parents ff5b7a2 + 4c3d26a commit 6f6c2cb
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/v/iceberg/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,23 @@ redpanda_cc_library(
"@avro",
],
)

redpanda_cc_library(
name = "values_bytes",
srcs = [
"values_bytes.cc",
],
hdrs = [
"values_bytes.h",
],
implementation_deps = [
"//src/v/bytes:iobuf_parser",
"//src/v/utils:uuid",
],
include_prefix = "iceberg",
deps = [
":datatypes",
":values",
"//src/v/bytes",
],
)
1 change: 1 addition & 0 deletions src/v/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ v_cc_library(
transform_utils.cc
values.cc
values_avro.cc
values_bytes.cc
DEPS
Avro::avro
v::bytes
Expand Down
16 changes: 16 additions & 0 deletions src/v/iceberg/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ redpanda_cc_gtest(
],
)

redpanda_cc_gtest(
name = "values_bytes_test",
timeout = "short",
srcs = [
"values_bytes_test.cc",
],
deps = [
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/iceberg:values",
"//src/v/iceberg:values_bytes",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
],
)

redpanda_cc_gtest(
name = "values_test",
timeout = "short",
Expand Down
1 change: 1 addition & 0 deletions src/v/iceberg/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rp_test(
transform_utils_test.cc
values_test.cc
values_avro_test.cc
values_bytes_test.cc
LIBRARIES
Avro::avro
Boost::iostreams
Expand Down
149 changes: 149 additions & 0 deletions src/v/iceberg/tests/values_bytes_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "bytes/iobuf.h"
#include "iceberg/values.h"
#include "iceberg/values_bytes.h"

#include <gtest/gtest.h>

using namespace iceberg;

TEST(ValueBytesTest, TestBool) {
{
boolean_value v{false};
iobuf buf;
buf.append("\0", 1);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), value_to_bytes(v));
ASSERT_EQ(value_from_bytes(boolean_type{}, v_bytes), value{v});
}
{
boolean_value v{true};
iobuf buf;
buf.append("\1", 1);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(boolean_type{}, v_bytes), value{v});
}
}

TEST(ValueBytesTest, TestInt) {
{
int_value v{2};
iobuf buf;
auto bytes = std::vector<uint8_t>{0x02, 0x00, 0x00, 0x00};
buf.append(bytes.data(), 4);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), value_to_bytes(v));
ASSERT_EQ(value_from_bytes(int_type{}, v_bytes), value{v});
}
{
int_value v{-2};
iobuf buf;
auto bytes = std::vector<uint8_t>{0xfe, 0xff, 0xff, 0xff};
buf.append(bytes.data(), 4);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(int_type{}, v_bytes), value{v});
}
}

TEST(ValueBytesTest, TestLong) {
{
long_value v{2};
iobuf buf;
auto bytes = std::vector<uint8_t>{
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
buf.append(bytes.data(), 8);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(long_type{}, v_bytes), value{v});
}
{
long_value v{-2};
iobuf buf;
auto bytes = std::vector<uint8_t>{
0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
buf.append(bytes.data(), 8);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(long_type{}, v_bytes), value{v});
}
}

TEST(ValueBytesTest, TestFloat) {
{
float_value v{2};
iobuf buf;
auto bytes = std::vector<uint8_t>{0x00, 0x00, 0x00, 0x40};
buf.append(bytes.data(), 4);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), value_to_bytes(v));
ASSERT_EQ(value_from_bytes(float_type{}, v_bytes), value{v});
}
{
float_value v{-2};
iobuf buf;
auto bytes = std::vector<uint8_t>{0x00, 0x00, 0x00, 0xc0};
buf.append(bytes.data(), 4);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(float_type{}, v_bytes), value{v});
}
}

TEST(ValueBytesTest, TestDouble) {
{
double_value v{2};
iobuf buf;
auto bytes = std::vector<uint8_t>{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40};
buf.append(bytes.data(), 8);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(double_type{}, v_bytes), value{v});
}
{
double_value v{-2};
iobuf buf;
auto bytes = std::vector<uint8_t>{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc0};
buf.append(bytes.data(), 8);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(double_type{}, v_bytes), value{v});
}
}

TEST(ValueBytesTest, TestUuid) {
uuid_value v{uuid_t::from_string("deadbeef-0000-0000-0000-000000000000")};
iobuf buf;
// clang-format off
auto bytes = std::vector<uint8_t>{
0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
// clang-format on
buf.append(bytes.data(), 16);
auto v_bytes = value_to_bytes(v);
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(value_from_bytes(uuid_type{}, v_bytes), value{v});
}

TEST(ValueBytesTest, TestString) {
string_value v{iobuf::from("abcd")};
iobuf buf;
auto bytes = std::vector<uint8_t>{0x61, 0x62, 0x63, 0x64};
buf.append(bytes.data(), 4);
auto v_bytes = value_to_bytes(std::move(v));
ASSERT_EQ(iobuf_to_bytes(buf), v_bytes);
ASSERT_EQ(
value_from_bytes(string_type{}, v_bytes),
value{string_value{iobuf::from("abcd")}});
}
157 changes: 157 additions & 0 deletions src/v/iceberg/values_bytes.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#include "iceberg/values_bytes.h"

#include "bytes/iobuf_parser.h"

#include <variant>

namespace iceberg {

namespace {

struct value_to_bytes_visitor {
template<typename T>
requires(std::is_arithmetic_v<std::decay_t<decltype(T::val)>>)
bytes operator()(const T& v) {
iobuf ret;
if constexpr (sizeof(T::val) == 1) {
ret.append(reinterpret_cast<const char*>(&v.val), 1);
} else if constexpr (std::is_same_v<decltype(T::val), float>) {
const auto le_t = htole32(std::bit_cast<std::uint32_t>(v.val));
ret.append(reinterpret_cast<const char*>(&le_t), sizeof(le_t));
} else if constexpr (std::is_same_v<decltype(T::val), double>) {
const auto le_t = htole64(std::bit_cast<std::uint64_t>(v.val));
ret.append(reinterpret_cast<const char*>(&le_t), sizeof(le_t));
} else {
const auto le_t = ss::cpu_to_le(v.val);
static_assert(sizeof(le_t) == sizeof(T::val));
ret.append(reinterpret_cast<const char*>(&le_t), sizeof(le_t));
}
return iobuf_to_bytes(ret);
}
bytes operator()(const string_value& v) { return iobuf_to_bytes(v.val); }
bytes operator()(const uuid_value& v) {
iobuf ret;
ret.append(v.val.to_vector().data(), uuid_t::length);
return iobuf_to_bytes(ret);
}
bytes operator()(const fixed_value& v) { return iobuf_to_bytes(v.val); }
bytes operator()(const binary_value& v) { return iobuf_to_bytes(v.val); }
bytes operator()(const decimal_value&) {
throw std::invalid_argument(
"XXX: decimals as bytes not implemented yet!");
}
};

struct value_from_bytes_visitor {
explicit value_from_bytes_visitor(const bytes& b)
: p(bytes_to_iobuf(b)) {}
iobuf_parser p;

void check_size(size_t expected_size) {
if (expected_size != p.bytes_left()) {
throw std::invalid_argument(fmt::format(
"Expected {} bytes, got {}", expected_size, p.bytes_left()));
}
}
template<typename T, typename V>
value make_int_val() {
check_size(sizeof(int32_t));
return V{ss::le_to_cpu(p.consume_type<int32_t>())};
}
template<typename T, typename V>
value make_long_val() {
check_size(sizeof(int64_t));
return V{ss::le_to_cpu(p.consume_type<int64_t>())};
}
template<typename T, typename V>
value make_iobuf_val() {
return V{p.copy(p.bytes_left())};
}

value operator()(const boolean_type&) {
check_size(sizeof(bool));
return boolean_value{p.consume_type<bool>()};
}
value operator()(const int_type&) {
return make_int_val<int_type, int_value>();
}
value operator()(const long_type&) {
return make_long_val<long_type, long_value>();
}
value operator()(const float_type&) {
check_size(sizeof(float));
return float_value{
std::bit_cast<float>(le32toh(p.consume_type<std::uint32_t>()))};
}
value operator()(const double_type&) {
check_size(sizeof(double));
return double_value{
std::bit_cast<double>(le64toh(p.consume_type<std::uint64_t>()))};
}
value operator()(const date_type&) {
return make_int_val<date_type, date_value>();
}
value operator()(const time_type&) {
return make_long_val<time_type, time_value>();
}
value operator()(const timestamp_type&) {
return make_long_val<timestamp_type, timestamp_value>();
}
value operator()(const timestamptz_type&) {
return make_long_val<timestamptz_type, timestamptz_value>();
}
value operator()(const decimal_type&) {
throw std::invalid_argument(
"XXX: decimals from bytes not implemented yet!");
}
value operator()(const uuid_type&) {
check_size(uuid_t::length);
auto bytes = p.peek_bytes(uuid_t::length);
std::vector<uint8_t> v;
v.reserve(bytes.size());
for (const auto b : bytes) {
v.emplace_back(b);
}
return uuid_value{uuid_t{v}};
}
value operator()(const string_type&) {
return make_iobuf_val<string_type, string_value>();
}
value operator()(const fixed_type&) {
return make_iobuf_val<fixed_type, fixed_value>();
}
value operator()(const binary_type&) {
return make_iobuf_val<binary_type, binary_value>();
}
};

} // namespace

bytes value_to_bytes(const value& v) {
// Only primitive types are expected to be serialized as bytes, e.g. as an
// upper/lower bound in a manifest.
if (!std::holds_alternative<primitive_value>(v)) {
throw std::invalid_argument(fmt::format(
"Can only translate primitive values to bytes, got {}", v));
}
return std::visit(value_to_bytes_visitor{}, std::get<primitive_value>(v));
}

value value_from_bytes(const field_type& type, const bytes& b) {
if (!std::holds_alternative<primitive_type>(type)) {
throw std::invalid_argument(fmt::format(
"Can only translate primitive values from bytes, got {}", type));
}
const auto& prim_type = std::get<primitive_type>(type);
return std::visit(value_from_bytes_visitor{b}, prim_type);
}

} // namespace iceberg
20 changes: 20 additions & 0 deletions src/v/iceberg/values_bytes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "bytes/bytes.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"

namespace iceberg {

bytes value_to_bytes(const value&);
value value_from_bytes(const field_type& type, const bytes&);

} // namespace iceberg

0 comments on commit 6f6c2cb

Please sign in to comment.