Skip to content

Commit

Permalink
refactor: implement value schema 1 (#735)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored May 26, 2021
1 parent a2c8355 commit 2c505b2
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/base/pegasus_value_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ class pegasus_value_generator
enum data_version
{
VERSION_0 = 0,
VERSION_1 = 1,
VERSION_COUNT,
VERSION_MAX = VERSION_0,
VERSION_MAX = VERSION_1,
/// TBD(zlw)
};

Expand Down
4 changes: 3 additions & 1 deletion src/base/test/value_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ TEST(value_schema_manager, get_value_schema)
uint32_t version;
bool schema_exist;
} tests[] = {
{pegasus::data_version::VERSION_0, true}, {pegasus::data_version::VERSION_MAX + 1, false},
{pegasus::data_version::VERSION_0, true},
{pegasus::data_version::VERSION_1, true},
{pegasus::data_version::VERSION_MAX + 1, false},
};

for (const auto &t : tests) {
Expand Down
7 changes: 6 additions & 1 deletion src/base/test/value_schema_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ TEST(value_schema, generate_and_extract)
{0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
{0, std::numeric_limits<uint32_t>::max(), 0, ""},
{0, 0, 0, "a"},

{1, 1000, 10001, ""},
{1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
{1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},
{1, 0, 0, "a"},
};

for (const auto &t : tests) {
Expand All @@ -96,7 +101,7 @@ TEST(value_schema, update_expire_ts)
uint32_t expire_ts;
uint32_t update_expire_ts;
} tests[] = {
{0, 1000, 10086},
{0, 1000, 10086}, {1, 1000, 10086},
};

for (const auto &t : tests) {
Expand Down
4 changes: 3 additions & 1 deletion src/base/value_schema_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "value_schema_manager.h"
#include "value_schema_v0.h"
#include "value_schema_v1.h"

namespace pegasus {
value_schema_manager::value_schema_manager()
Expand All @@ -27,7 +28,8 @@ value_schema_manager::value_schema_manager()
* If someone wants to add a new data version, he only need to implement the new value schema,
* and register it here.
*/
value_schema_manager::instance().register_schema(dsn::make_unique<value_schema_v0>());
register_schema(dsn::make_unique<value_schema_v0>());
register_schema(dsn::make_unique<value_schema_v1>());
}

void value_schema_manager::register_schema(std::unique_ptr<value_schema> schema)
Expand Down
6 changes: 2 additions & 4 deletions src/base/value_schema_v0.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view val
dsn::blob value_schema_v0::extract_user_data(std::string &&value)
{
auto ret = dsn::blob::create_from_bytes(std::move(value));
ret.range(sizeof(uint32_t));
return ret;
return ret.range(sizeof(uint32_t));
}

void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
Expand Down Expand Up @@ -90,8 +89,7 @@ void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
auto expire_field = static_cast<expire_timestamp_field *>(field.get());

auto new_expire_ts = expire_field->expire_ts;
new_expire_ts = dsn::endian::hton(new_expire_ts);
auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
}

Expand Down
4 changes: 1 addition & 3 deletions src/base/value_schema_v0.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include "pegasus_value_schema.h"

#include <dsn/utility/singleton.h>

namespace pegasus {
/**
* rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
Expand All @@ -37,7 +35,7 @@ class value_schema_v0 : public value_schema
dsn::blob extract_user_data(std::string &&value) override;
void update_field(std::string &value, std::unique_ptr<value_field> field) override;
rocksdb::SliceParts generate_value(const value_params &params) override;
data_version version() const override { return VERSION_0; }
data_version version() const override { return data_version::VERSION_0; }

private:
std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
Expand Down
113 changes: 113 additions & 0 deletions src/base/value_schema_v1.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "value_schema_v1.h"

#include <dsn/utility/endians.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/c/api_utilities.h>
#include <dsn/utility/smart_pointers.h>

namespace pegasus {
std::unique_ptr<value_field> value_schema_v1::extract_field(dsn::string_view value,
value_field_type type)
{
std::unique_ptr<value_field> field = nullptr;
switch (type) {
case value_field_type::EXPIRE_TIMESTAMP:
field = extract_timestamp(value);
break;
case value_field_type::TIME_TAG:
field = extract_time_tag(value);
break;
default:
dassert_f(false, "Unsupported field type: {}", type);
}
return field;
}

dsn::blob value_schema_v1::extract_user_data(std::string &&value)
{
auto ret = dsn::blob::create_from_bytes(std::move(value));
return ret.range(sizeof(uint32_t) + sizeof(uint64_t));
}

void value_schema_v1::update_field(std::string &value, std::unique_ptr<value_field> field)
{
auto type = field->type();
switch (field->type()) {
case value_field_type::EXPIRE_TIMESTAMP:
update_expire_ts(value, std::move(field));
break;
default:
dassert_f(false, "Unsupported update field type: {}", type);
}
}

rocksdb::SliceParts value_schema_v1::generate_value(const value_params &params)
{
auto expire_ts_field = static_cast<expire_timestamp_field *>(
params.fields[value_field_type::EXPIRE_TIMESTAMP].get());
auto timetag_field =
static_cast<time_tag_field *>(params.fields[value_field_type::TIME_TAG].get());
auto data_field =
static_cast<user_data_field *>(params.fields[value_field_type::USER_DATA].get());
if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr ||
timetag_field == nullptr)) {
dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP or TIME_TAG is not provided");
return {nullptr, 0};
}

params.write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
dsn::data_output(params.write_buf)
.write_u32(expire_ts_field->expire_ts)
.write_u64(timetag_field->time_tag);
params.write_slices.clear();
params.write_slices.emplace_back(params.write_buf.data(), params.write_buf.size());

dsn::string_view user_data = data_field->user_data;
if (user_data.length() > 0) {
params.write_slices.emplace_back(user_data.data(), user_data.length());
}
return {&params.write_slices[0], static_cast<int>(params.write_slices.size())};
}

std::unique_ptr<value_field> value_schema_v1::extract_timestamp(dsn::string_view value)
{
uint32_t expire_ts = dsn::data_input(value).read_u32();
return dsn::make_unique<expire_timestamp_field>(expire_ts);
}

std::unique_ptr<value_field> value_schema_v1::extract_time_tag(dsn::string_view value)
{
dsn::data_input input(value);
input.skip(sizeof(uint32_t));
return dsn::make_unique<time_tag_field>(input.read_u64());
}

void value_schema_v1::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
{
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
auto expire_field = static_cast<expire_timestamp_field *>(field.get());

auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
}

} // namespace pegasus
46 changes: 46 additions & 0 deletions src/base/value_schema_v1.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "pegasus_value_schema.h"

namespace pegasus {
/**
* rocksdb value: |- expire_ts(4bytes) -|- timetag(8 bytes) -|- user value(bytes) -|
*/
class value_schema_v1 : public value_schema
{
public:
value_schema_v1() = default;

std::unique_ptr<value_field> extract_field(dsn::string_view value,
value_field_type type) override;
dsn::blob extract_user_data(std::string &&value) override;
void update_field(std::string &value, std::unique_ptr<value_field> field) override;
rocksdb::SliceParts generate_value(const value_params &params) override;
data_version version() const override { return data_version::VERSION_1; }

protected:
std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
std::unique_ptr<value_field> extract_time_tag(dsn::string_view value);
void update_expire_ts(std::string &value, std::unique_ptr<value_field> field);
};

} // namespace pegasus

0 comments on commit 2c505b2

Please sign in to comment.