Skip to content

Commit

Permalink
Implemented V2 codec (Reader and writer)
Browse files Browse the repository at this point in the history
The new codec's read performance is almost double the old one, while the write perfomance is about the same.

The new codec supports NULL value, default value, Date, DateTime, and fixed length string

1) RowReader now can be reused
2) Only nullable fields have NULL flag bit
  • Loading branch information
sherman-the-tank committed Apr 6, 2020
1 parent ec97c12 commit 3b8ce36
Show file tree
Hide file tree
Showing 28 changed files with 2,888 additions and 1,464 deletions.
29 changes: 15 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -270,20 +270,6 @@ if(ENABLE_FUZZ_TEST)
endif()
endif()

# All thrift libraries
set(THRIFT_LIBRARIES
thriftcpp2
thrift
thriftprotocol
async
protocol
transport
concurrency
security
thriftfrozen2
thrift-core
)

macro(nebula_add_executable)
cmake_parse_arguments(
nebula_exec # prefix
Expand Down Expand Up @@ -364,6 +350,20 @@ include_directories(AFTER ${CMAKE_CURRENT_BINARY_DIR}/src/kvstore/plugins/hbase)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L ${NEBULA_THIRDPARTY_ROOT}/lib")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L ${NEBULA_THIRDPARTY_ROOT}/lib64")

# All thrift libraries
set(THRIFT_LIBRARIES
thriftcpp2
thrift
thriftprotocol
async
protocol
transport
concurrency
security
thriftfrozen2
thrift-core
)

set(ROCKSDB_LIBRARIES ${Rocksdb_LIBRARY})

# All compression libraries
Expand Down Expand Up @@ -412,6 +412,7 @@ macro(nebula_link_libraries target)
event
double-conversion
resolv
s2
${COMPRESSION_LIBRARIES}
${JEMALLOC_LIB}
${OPENSSL_SSL_LIBRARY}
Expand Down
3 changes: 3 additions & 0 deletions src/codec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ nebula_add_library(
codec_obj OBJECT
RowReader.cpp
RowReaderV1.cpp
RowReaderV2.cpp
RowWriterV2.cpp
RowReaderWrapper.cpp
)

nebula_add_subdirectory(test)
64 changes: 21 additions & 43 deletions src/codec/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include "base/Base.h"
#include "codec/RowReader.h"
#include "codec/RowReaderV1.h"
#include "codec/RowReaderWrapper.h"

namespace nebula {

Expand Down Expand Up @@ -73,12 +73,12 @@ std::unique_ptr<RowReader> RowReader::getTagPropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row) {
std::string row) {
CHECK_NOTNULL(schemaMan);
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
return std::unique_ptr<RowReader>(
new RowReaderV1(row, schemaMan->getTagSchema(space, tag, ver)));
new RowReaderV1(std::move(row), schemaMan->getTagSchema(space, tag, ver)));
}
// Invalid data
Expand All @@ -92,12 +92,12 @@ std::unique_ptr<RowReader> RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row) {
std::string row) {
CHECK_NOTNULL(schemaMan);
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
return std::unique_ptr<RowReader>(
new RowReaderV1(row, schemaMan->getEdgeSchema(space, edge, ver)));
new RowReaderV1(st::move(row), schemaMan->getEdgeSchema(space, edge, ver)));
}
// Invalid data
Expand All @@ -108,50 +108,28 @@ std::unique_ptr<RowReader> RowReader::getEdgePropReader(

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
std::shared_ptr<const meta::SchemaProviderIf> schema,
const meta::SchemaProviderIf* schema,
folly::StringPiece row) {
SchemaVer ver = getSchemaVer(row);
CHECK_EQ(ver, schema->getVersion());
return std::unique_ptr<RowReader>(new RowReaderV1(row, std::move(schema)));
}


// static
SchemaVer RowReader::getSchemaVer(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
if (reinterpret_cast<const char*>(it) == row.end()) {
LOG(ERROR) << "Row data is empty, so there is no schema version";
return 0;
}

// The first three bits indicate the number of bytes for the
// schema version. If the number is zero, no schema version
// presents
size_t verBytes = *(it++) >> 5;
int32_t ver = 0;
if (verBytes > 0) {
if (verBytes + 1 > row.size()) {
// Data is too short
LOG(ERROR) << "Row data is too short: " << toHexStr(row);
return 0;
}
// Schema Version is stored in Little Endian
for (size_t i = 0; i < verBytes; i++) {
ver |= (uint32_t(*(it++)) << (8 * i));
}
auto reader = std::make_unique<RowReaderWrapper>();
if (reader->reset(schema, row)) {
return reader;
} else {
LOG(ERROR) << "Failed to initiate the reader, most likely the data"
"is corrupted. The data is ["
<< toHexStr(row)
<< "]";
return std::unique_ptr<RowReader>();
}

return ver;
}


const Value& RowReader::getDefaultValue(const std::string& prop) {
auto field = schema_->field(prop);
if (!!field && field->hasDefault()) {
return field->defaultValue();
}
bool RowReader::resetImpl(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept {
schema_ = schema;
data_ = row;

return Value::null();
endIter_.reset(schema_->getNumFields());
return true;
}

} // namespace nebula
51 changes: 33 additions & 18 deletions src/codec/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ class RowReader {
Cell cell_;
size_t index_;

Iterator(const RowReader* reader, size_t index = 0)
explicit Iterator(const RowReader* reader)
: reader_(reader), cell_(this) {}

Iterator(const RowReader* reader, size_t index)
: reader_(reader), cell_(this), index_(index) {}

void reset(size_t index = 0) {
index_ = index;
}
};


Expand All @@ -67,58 +74,66 @@ class RowReader {
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row);
std::string row);
static std::unique_ptr<RowReader> getEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row);
std::string row);
*/

static std::unique_ptr<RowReader> getRowReader(
std::shared_ptr<const meta::SchemaProviderIf> schema,
meta::SchemaProviderIf const* schema,
folly::StringPiece row);

virtual ~RowReader() = default;

virtual Value getValueByName(const std::string& prop) const noexcept = 0;
virtual Value getValueByIndex(const int64_t index) const noexcept = 0;

const Value& getDefaultValue(const std::string& prop);
virtual int32_t readerVer() const noexcept = 0;

// Return the number of bytes used for the header info
virtual size_t headerLen() const noexcept = 0;

virtual bool reset(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept = 0;

Iterator begin() const noexcept {
virtual Iterator begin() const noexcept {
return Iterator(this, 0);
}

const Iterator& end() const noexcept {
virtual const Iterator& end() const noexcept {
return endIter_;
}

SchemaVer schemaVer() const noexcept {
virtual SchemaVer schemaVer() const noexcept {
return schema_->getVersion();
}

size_t numFields() const noexcept {
virtual size_t numFields() const noexcept {
return schema_->getNumFields();
}

std::shared_ptr<const meta::SchemaProviderIf> getSchema() const {
virtual const meta::SchemaProviderIf* getSchema() const {
return schema_;
}

virtual const std::string getData() const {
return data_.toString();
}

protected:
std::shared_ptr<const meta::SchemaProviderIf> schema_;
meta::SchemaProviderIf const* schema_;
folly::StringPiece data_;

explicit RowReader(std::shared_ptr<const meta::SchemaProviderIf> schema,
folly::StringPiece row)
: schema_(schema)
, data_(row)
, endIter_(this, schema_->getNumFields()) {}
RowReader() : endIter_(this) {}

static SchemaVer getSchemaVer(folly::StringPiece row);
virtual bool resetImpl(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept;

private:
const Iterator endIter_;
Iterator endIter_;
};

} // namespace nebula
Expand Down
30 changes: 19 additions & 11 deletions src/codec/RowReaderV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

#define RR_GET_OFFSET() \
if (index >= static_cast<int64_t>(schema_->getNumFields())) { \
return NullType::BAD_DATA; \
return NullType::BAD_DATA; \
} \
int64_t offset = skipToField(index); \
if (offset < 0) { \
return NullType::BAD_DATA; \
return NullType::BAD_DATA; \
}

namespace nebula {
Expand All @@ -24,17 +24,25 @@ namespace nebula {
* class RowReaderV1
*
********************************************/
RowReaderV1::RowReaderV1(folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema)
: RowReader(schema, row) {
CHECK(!!schema_) << "A schema must be provided";
bool RowReaderV1::resetImpl(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept {
RowReader::resetImpl(schema, row);

if (processHeader(row)) {
DCHECK(schema_ != nullptr) << "A schema must be provided";

headerLen_ = 0;
numBytesForOffset_ = 0;
blockOffsets_.clear();
offsets_.clear();

if (processHeader(data_)) {
// data_.begin() points to the first field
data_.reset(row.begin() + headerLen_, row.size() - headerLen_);
data_ = data_.subpiece(headerLen_);
return true;
} else {
// Invalid data
LOG(FATAL) << "Invalid row data: " << toHexStr(row);
LOG(ERROR) << "Invalid row data: " << toHexStr(row);
return false;
}
}

Expand All @@ -45,7 +53,7 @@ bool RowReaderV1::processHeader(folly::StringPiece row) {
return false;
}

DCHECK(!!schema_) << "A schema must be provided";
DCHECK(schema_ != nullptr) << "A schema must be provided";

// The last three bits indicate the number of bytes for offsets
// The first three bits indicate the number of bytes for the
Expand Down Expand Up @@ -483,7 +491,7 @@ int32_t RowReaderV1::readString(int64_t offset, folly::StringPiece& v) const noe
return -1;
}

v = data_.subpiece(offset + intLen, strLen);
v = folly::StringPiece(data_.data() + offset + intLen, strLen);
return intLen + strLen;
}

Expand Down
25 changes: 21 additions & 4 deletions src/codec/RowReaderV1.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,38 @@

namespace nebula {

class RowReaderWrapper;

/**
* This class decodes the data from version 1.0
*/
class RowReaderV1 : public RowReader {
friend class RowReader;
friend class RowReaderWrapper;

FRIEND_TEST(RowReaderV1, headerInfo);
FRIEND_TEST(RowReaderV1, encodedData);
FRIEND_TEST(RowReaderV1, iterator);

public:
~RowReaderV1() = default;

Value getValueByName(const std::string& prop) const noexcept override;
Value getValueByIndex(const int64_t index) const noexcept override;

int32_t readerVer() const noexcept override {
return 1;
}

size_t headerLen() const noexcept override {
return headerLen_;
}

bool reset(meta::SchemaProviderIf const*, folly::StringPiece) noexcept override {
LOG(FATAL) << "Not implemented";
}

protected:
RowReaderV1(folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);
bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row)
noexcept override;

private:
int32_t headerLen_ = 0;
Expand All @@ -41,6 +56,8 @@ class RowReaderV1 : public RowReader {
mutable std::vector<int64_t> offsets_;

private:
RowReaderV1() = default;

// Process the row header infomation
// Returns false when the row data is invalid
bool processHeader(folly::StringPiece row);
Expand Down
Loading

0 comments on commit 3b8ce36

Please sign in to comment.