Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for the List/Set type #5914

Merged
merged 36 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4ac8f7e
Delete logs directory
YZW00 Jul 31, 2024
c584979
Delete data directory
YZW00 Jul 31, 2024
9ebe28f
Delete etc directory
YZW00 Jul 31, 2024
be0b1d4
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
14149ba
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
8ae1389
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Jul 31, 2024
744c33c
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
2ba2e04
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
9278b60
Delete share/resources directory
YZW00 Aug 1, 2024
6f9b0ce
Merged local changes into New_ListSet
YZW00 Aug 1, 2024
3562880
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Aug 1, 2024
24ca613
Merged local changes into New_ListSet
YZW00 Aug 21, 2024
d020725
Merge branch 'master' into New_ListSet
Salieri-004 Aug 22, 2024
85da782
New changes into New_ListSet
YZW00 Aug 25, 2024
bcb6a50
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Aug 25, 2024
8f165f9
New changes into New_ListSet
YZW00 Aug 25, 2024
3710e0f
New changes into New_ListSet
YZW00 Aug 25, 2024
98792f5
New changes into New_ListSet
YZW00 Aug 25, 2024
bf47108
New changes into New_ListSet
YZW00 Aug 25, 2024
7929ad1
New changes into New_ListSet
YZW00 Aug 27, 2024
59fcb12
New changes into New_ListSet
YZW00 Aug 31, 2024
b6052b7
Merge branch 'master' into New_ListSet
Salieri-004 Sep 24, 2024
75c0408
Latest changes into New_ListSet
YZW00 Sep 29, 2024
9b3fa01
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Sep 29, 2024
051b677
Merge branch 'master' into New_ListSet
YZW00 Oct 6, 2024
373d7a0
Merge branch 'master' into New_ListSet
YZW00 Oct 12, 2024
c956252
Latest changes into New_ListSet
YZW00 Oct 12, 2024
5e57d83
Latest changes into New_ListSet
YZW00 Oct 12, 2024
26b6ce7
Latest changes into New_ListSet
YZW00 Oct 15, 2024
a828dd6
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Oct 15, 2024
9e59530
Latest changes into New_ListSet
YZW00 Oct 15, 2024
0724a77
Latest changes into New_ListSet
YZW00 Oct 24, 2024
7bc462f
Merge branch 'master' into New_ListSet
YZW00 Oct 24, 2024
b112fa8
Latest changes into New_ListSet
YZW00 Oct 24, 2024
2658e17
Latest changes into New_ListSet
YZW00 Oct 24, 2024
32669b8
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 96 additions & 1 deletion src/codec/RowReaderV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@
*
* This source code is licensed under Apache 2.0 License.
*/

#include "codec/RowReaderV2.h"

namespace nebula {

using nebula::cpp2::PropertyType;

template <typename T, typename Container>
Value extractIntOrFloat(const folly::StringPiece& data, size_t& offset) {
int32_t containerOffset;
memcpy(reinterpret_cast<void*>(&containerOffset), data.data() + offset, sizeof(int32_t));
if (static_cast<size_t>(containerOffset) >= data.size()) {
LOG(ERROR) << "Container offset out of bounds. Offset: " << containerOffset
<< ", Data size: " << data.size();
return Value::kNullValue;
}
int32_t containerSize;
memcpy(reinterpret_cast<void*>(&containerSize), data.data() + containerOffset, sizeof(int32_t));
containerOffset += sizeof(int32_t);
Container container;
for (int32_t i = 0; i < containerSize; ++i) {
T value;
if (static_cast<size_t>(containerOffset + sizeof(T)) > data.size()) {
LOG(ERROR) << "Reading beyond data bounds. Attempting to read at offset: " << containerOffset
<< ", Data size: " << data.size();
return Value::kNullValue;
}
memcpy(reinterpret_cast<void*>(&value), data.data() + containerOffset, sizeof(T));
containerOffset += sizeof(T);

if constexpr (std::is_same_v<Container, List>) {
container.values.emplace_back(Value(value));
} else if constexpr (std::is_same_v<Container, Set>) {
container.values.insert(Value(value));
}
}
return Value(std::move(container));
}

bool RowReaderV2::resetImpl(meta::NebulaSchemaProvider const* schema, folly::StringPiece row) {
schema_ = schema;
data_ = row;
Expand Down Expand Up @@ -206,6 +237,70 @@ Value RowReaderV2::getValueByIndex(const int64_t index) const {
}
return std::move(geogRet).value();
}
case PropertyType::LIST_STRING: {
int32_t listOffset;
memcpy(reinterpret_cast<void*>(&listOffset), &data_[offset], sizeof(int32_t));
if (static_cast<size_t>(listOffset) >= data_.size()) {
LOG(ERROR) << "List offset out of bounds for LIST_STRING.";
return Value::kNullValue;
}
int32_t listSize;
memcpy(reinterpret_cast<void*>(&listSize), &data_[listOffset], sizeof(int32_t));
listOffset += sizeof(int32_t);

List list;
for (int32_t i = 0; i < listSize; ++i) {
int32_t strLen;
memcpy(reinterpret_cast<void*>(&strLen), &data_[listOffset], sizeof(int32_t));
listOffset += sizeof(int32_t);
if (static_cast<size_t>(listOffset + strLen) > data_.size()) {
LOG(ERROR) << "String length out of bounds for LIST_STRING.";
return Value::kNullValue;
}
std::string str(&data_[listOffset], strLen);
listOffset += strLen;
list.values.emplace_back(str);
}
return Value(std::move(list));
}
case PropertyType::LIST_INT:
return nebula::extractIntOrFloat<int32_t, List>(data_, offset);
case PropertyType::LIST_FLOAT:
return nebula::extractIntOrFloat<float, List>(data_, offset);
case PropertyType::SET_STRING: {
int32_t setOffset;
memcpy(reinterpret_cast<void*>(&setOffset), &data_[offset], sizeof(int32_t));
if (static_cast<size_t>(setOffset) >= data_.size()) {
LOG(ERROR) << "Set offset out of bounds for SET_STRING.";
return Value::kNullValue;
}
int32_t setSize;
memcpy(reinterpret_cast<void*>(&setSize), &data_[setOffset], sizeof(int32_t));
setOffset += sizeof(int32_t);

Set set;
std::unordered_set<std::string> uniqueStrings;
for (int32_t i = 0; i < setSize; ++i) {
int32_t strLen;
memcpy(reinterpret_cast<void*>(&strLen), &data_[setOffset], sizeof(int32_t));
setOffset += sizeof(int32_t);
if (static_cast<size_t>(setOffset + strLen) > data_.size()) {
LOG(ERROR) << "String length out of bounds for SET_STRING.";
return Value::kNullValue;
}
std::string str(&data_[setOffset], strLen);
setOffset += strLen;
uniqueStrings.insert(std::move(str));
}
for (const auto& str : uniqueStrings) {
set.values.insert(Value(str));
}
return Value(std::move(set));
}
case PropertyType::SET_INT:
return nebula::extractIntOrFloat<int32_t, Set>(data_, offset);
case PropertyType::SET_FLOAT:
return nebula::extractIntOrFloat<float, Set>(data_, offset);
case PropertyType::UNKNOWN:
break;
}
Expand Down
146 changes: 145 additions & 1 deletion src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,61 @@ namespace nebula {

using nebula::cpp2::PropertyType;

// Template functions for handling different types (e.g. strings, ints, floats) in lists and
// collections
template <typename Container>
WriteResult writeContainer(const Container& container,
Value::Type valueType,
bool isSetType,
std::unordered_set<std::string>& serializedStrings,
Salieri-004 marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_set<int32_t>& serializedInts,
std::unordered_set<float>& serializedFloats,
std::string& buffer) {
for (const auto& item : container.values) {
if (item.type() != valueType) {
LOG(ERROR) << "Type mismatch: Expected " << static_cast<int>(valueType) << " but got "
<< static_cast<int>(item.type());
return WriteResult::TYPE_MISMATCH;
}
switch (valueType) {
case Value::Type::STRING: {
std::string str = item.getStr();
if (isSetType && serializedStrings.find(str) != serializedStrings.end()) {
continue;
}
int32_t strLen = str.size();
buffer.append(reinterpret_cast<const char*>(&strLen), sizeof(int32_t));
buffer.append(str.data(), strLen);
serializedStrings.insert(str);
break;
}
case Value::Type::INT: {
int32_t intVal = item.getInt();
if (isSetType && serializedInts.find(intVal) != serializedInts.end()) {
continue;
}
buffer.append(reinterpret_cast<const char*>(&intVal), sizeof(int32_t));
serializedInts.insert(intVal);
break;
}
case Value::Type::FLOAT: {
float floatVal = item.getFloat();
if (isSetType && serializedFloats.find(floatVal) != serializedFloats.end()) {
continue;
}
buffer.append(reinterpret_cast<const char*>(&floatVal), sizeof(float));
serializedFloats.insert(floatVal);
break;
}
default:
LOG(ERROR) << "Unsupported value type: " << static_cast<int>(valueType);
return WriteResult::TYPE_MISMATCH;
}
}

return WriteResult::SUCCEEDED;
}

RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema)
: schema_(schema), numNullBytes_(0), approxStrLen_(0), finished_(false), outOfSpaceStr_(false) {
CHECK(!!schema_);
Expand Down Expand Up @@ -138,6 +193,12 @@ RowWriterV2::RowWriterV2(RowReaderWrapper& reader) : RowWriterV2(reader.getSchem
case Value::Type::DURATION:
set(i, v.moveDuration());
break;
case Value::Type::LIST:
set(i, v.moveList());
break;
case Value::Type::SET:
set(i, v.moveSet());
break;
default:
LOG(FATAL) << "Invalid data: " << v << ", type: " << v.typeName();
isSet_[i] = false;
Expand Down Expand Up @@ -226,11 +287,14 @@ WriteResult RowWriterV2::setValue(ssize_t index, const Value& val) {
return write(index, val.getGeography());
case Value::Type::DURATION:
return write(index, val.getDuration());
case Value::Type::LIST:
return write(index, val.getList());
case Value::Type::SET:
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
return write(index, val.getSet());
default:
return WriteResult::TYPE_MISMATCH;
}
}

WriteResult RowWriterV2::setValue(const std::string& name, const Value& val) {
CHECK(!finished_) << "You have called finish()";
int64_t index = schema_->getFieldIndex(name);
Expand Down Expand Up @@ -821,6 +885,80 @@ WriteResult RowWriterV2::write(ssize_t index, const Geography& v) {
return write(index, folly::StringPiece(wkb), true);
}

WriteResult RowWriterV2::write(ssize_t index, const List& list) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();
if (isSet_[index]) {
outOfSpaceStr_ = true;
}
int32_t listSize = list.size();
int32_t listOffset = buf_.size();
buf_.append(reinterpret_cast<const char*>(&listSize), sizeof(int32_t));
std::unordered_set<std::string> serializedStrings;
std::unordered_set<int32_t> serializedInts;
std::unordered_set<float> serializedFloats;
Value::Type valueType;
if (field->type() == PropertyType::LIST_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::LIST_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::LIST_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported list type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeContainer(
list, valueType, false, serializedStrings, serializedInts, serializedFloats, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}

memcpy(&buf_[offset], reinterpret_cast<void*>(&listOffset), sizeof(int32_t));
if (field->nullable()) {
clearNullBit(field->nullFlagPos());
}
isSet_[index] = true;
return WriteResult::SUCCEEDED;
}

WriteResult RowWriterV2::write(ssize_t index, const Set& set) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();
if (isSet_[index]) {
outOfSpaceStr_ = true;
}
int32_t setSize = set.size();
int32_t setOffset = buf_.size();
buf_.append(reinterpret_cast<const char*>(&setSize), sizeof(int32_t));
std::unordered_set<std::string> serializedStrings;
std::unordered_set<int32_t> serializedInts;
std::unordered_set<float> serializedFloats;
Value::Type valueType;
if (field->type() == PropertyType::SET_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::SET_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::SET_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported set type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeContainer(
set, valueType, true, serializedStrings, serializedInts, serializedFloats, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}

memcpy(&buf_[offset], reinterpret_cast<void*>(&setOffset), sizeof(int32_t));
if (field->nullable()) {
clearNullBit(field->nullFlagPos());
}
isSet_[index] = true;
return WriteResult::SUCCEEDED;
}

WriteResult RowWriterV2::checkUnsetFields() {
DefaultValueContext expCtx;
for (size_t i = 0; i < schema_->getNumFields(); i++) {
Expand Down Expand Up @@ -868,6 +1006,12 @@ WriteResult RowWriterV2::checkUnsetFields() {
case Value::Type::DURATION:
r = write(i, defVal.getDuration());
break;
case Value::Type::LIST:
r = write(i, defVal.getList());
break;
case Value::Type::SET:
r = write(i, defVal.getSet());
break;
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
Expand Down
6 changes: 6 additions & 0 deletions src/codec/RowWriterV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ class RowWriterV2 {
WriteResult write(ssize_t index, const Duration& v);

WriteResult write(ssize_t index, const Geography& v);
// Supports storing ordered lists of strings, integers, and floats,
// including LIST_STRING, LIST_INT, and LIST_FLOAT.
WriteResult write(ssize_t index, const List& list);
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
// Supports storing unordered sets of strings, integers, and floats,
// including SET_STRING, SET_INT, and SET_FLOAT
WriteResult write(ssize_t index, const Set& set);
};

} // namespace nebula
Expand Down
14 changes: 14 additions & 0 deletions src/common/datatypes/List.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ struct List {
}
explicit List(const std::vector<Value>& l) : values(l) {}

// Template static factory method to create a list with specific types
template <typename T>
static List createFromVector(const std::vector<T>& items);

bool empty() const {
return values.empty();
}
Expand Down Expand Up @@ -102,6 +106,16 @@ inline std::ostream& operator<<(std::ostream& os, const List& l) {
return os << l.toString();
}

// Define using template static factory method
template <typename T>
inline List List::createFromVector(const std::vector<T>& items) {
std::vector<Value> values;
values.reserve(items.size());
for (const auto& item : items) {
values.emplace_back(Value(item));
}
return List(std::move(values));
}
} // namespace nebula

namespace std {
Expand Down
14 changes: 14 additions & 0 deletions src/common/datatypes/Set.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ struct Set {
values = std::move(value);
}

// Template Static Factory Method Declaration
template <typename T>
static Set createFromVector(const std::vector<T>& items);

void clear() {
values.clear();
}
Expand Down Expand Up @@ -68,6 +72,16 @@ struct Set {
inline std::ostream& operator<<(std::ostream& os, const Set& s) {
return os << s.toString();
}

// define using template static factory method
template <typename T>
inline Set Set::createFromVector(const std::vector<T>& items) {
std::unordered_set<Value> values;
for (const auto& item : items) {
values.emplace(Value(item));
}
return Set(std::move(values));
}
} // namespace nebula

namespace std {
Expand Down
Loading
Loading