Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for the List/Set type #5914

Merged
merged 36 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4ac8f7e
Delete logs directory
YZW00 Jul 31, 2024
c584979
Delete data directory
YZW00 Jul 31, 2024
9ebe28f
Delete etc directory
YZW00 Jul 31, 2024
be0b1d4
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
14149ba
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
8ae1389
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Jul 31, 2024
744c33c
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
2ba2e04
Merged local changes into New_ListSet
YZW00 Jul 31, 2024
9278b60
Delete share/resources directory
YZW00 Aug 1, 2024
6f9b0ce
Merged local changes into New_ListSet
YZW00 Aug 1, 2024
3562880
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Aug 1, 2024
24ca613
Merged local changes into New_ListSet
YZW00 Aug 21, 2024
d020725
Merge branch 'master' into New_ListSet
Salieri-004 Aug 22, 2024
85da782
New changes into New_ListSet
YZW00 Aug 25, 2024
bcb6a50
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Aug 25, 2024
8f165f9
New changes into New_ListSet
YZW00 Aug 25, 2024
3710e0f
New changes into New_ListSet
YZW00 Aug 25, 2024
98792f5
New changes into New_ListSet
YZW00 Aug 25, 2024
bf47108
New changes into New_ListSet
YZW00 Aug 25, 2024
7929ad1
New changes into New_ListSet
YZW00 Aug 27, 2024
59fcb12
New changes into New_ListSet
YZW00 Aug 31, 2024
b6052b7
Merge branch 'master' into New_ListSet
Salieri-004 Sep 24, 2024
75c0408
Latest changes into New_ListSet
YZW00 Sep 29, 2024
9b3fa01
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Sep 29, 2024
051b677
Merge branch 'master' into New_ListSet
YZW00 Oct 6, 2024
373d7a0
Merge branch 'master' into New_ListSet
YZW00 Oct 12, 2024
c956252
Latest changes into New_ListSet
YZW00 Oct 12, 2024
5e57d83
Latest changes into New_ListSet
YZW00 Oct 12, 2024
26b6ce7
Latest changes into New_ListSet
YZW00 Oct 15, 2024
a828dd6
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Oct 15, 2024
9e59530
Latest changes into New_ListSet
YZW00 Oct 15, 2024
0724a77
Latest changes into New_ListSet
YZW00 Oct 24, 2024
7bc462f
Merge branch 'master' into New_ListSet
YZW00 Oct 24, 2024
b112fa8
Latest changes into New_ListSet
YZW00 Oct 24, 2024
2658e17
Latest changes into New_ListSet
YZW00 Oct 24, 2024
32669b8
Merge branch 'New_ListSet' of github.com:YZW00/Nebula-List_and_Set in…
YZW00 Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 0 additions & 90 deletions src/codec/RowReaderV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,51 +267,6 @@ Value RowReaderV2::getValueByIndex(const int64_t index) const {
return nebula::extractIntOrFloat<int32_t, List>(data_, offset);
case PropertyType::LIST_FLOAT:
return nebula::extractIntOrFloat<float, List>(data_, offset);
case PropertyType::LIST_LIST_STRING: {
int32_t numLists;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "Offset out of bounds for LIST_LIST_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&numLists, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
std::vector<List> listOfLists;
listOfLists.reserve(numLists);
for (int i = 0; i < numLists; ++i) {
int32_t listLen;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "List length out of bounds for LIST_LIST_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&listLen, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
std::vector<std::string> strings;
strings.reserve(listLen);
for (int j = 0; j < listLen; ++j) {
int32_t strLen;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "String length out of bounds for LIST_LIST_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&strLen, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
if (offset + strLen > data_.size()) {
LOG(ERROR) << "String offset out of bounds for LIST_LIST_STRING.";
return Value(NullType::BAD_DATA);
}
strings.push_back(std::string(&data_[offset], strLen));
offset += strLen;
}
List list = List::createFromVector(strings);
listOfLists.push_back(std::move(list));
}
List outerList = List::createFromVector(listOfLists);
return Value(std::move(outerList));
}
case PropertyType::LIST_LIST_INT:
return nebula::extractIntOrFloat<int64_t, List>(data_, offset);
case PropertyType::LIST_LIST_FLOAT:
return nebula::extractIntOrFloat<float, List>(data_, offset);
case PropertyType::SET_STRING: {
int32_t setOffset;
memcpy(reinterpret_cast<void*>(&setOffset), &data_[offset], sizeof(int32_t));
Expand Down Expand Up @@ -346,51 +301,6 @@ Value RowReaderV2::getValueByIndex(const int64_t index) const {
return nebula::extractIntOrFloat<int32_t, Set>(data_, offset);
case PropertyType::SET_FLOAT:
return nebula::extractIntOrFloat<float, Set>(data_, offset);
case PropertyType::SET_SET_STRING: {
int32_t numSets;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "Offset out of bounds for SET_SET_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&numSets, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
std::vector<Set> setsOfSets;
setsOfSets.reserve(numSets);
for (int i = 0; i < numSets; ++i) {
int32_t setLen;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "Set length out of bounds for SET_SET_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&setLen, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
std::vector<std::string> strings;
strings.reserve(setLen);
for (int j = 0; j < setLen; ++j) {
int32_t strLen;
if (offset + sizeof(int32_t) > data_.size()) {
LOG(ERROR) << "String length out of bounds for SET_SET_STRING.";
return Value(NullType::BAD_DATA);
}
memcpy(&strLen, &data_[offset], sizeof(int32_t));
offset += sizeof(int32_t);
if (offset + strLen > data_.size()) {
LOG(ERROR) << "String offset out of bounds for SET_SET_STRING.";
return Value(NullType::BAD_DATA);
}
strings.push_back(std::string(&data_[offset], strLen));
offset += strLen;
}
Set innerSet = Set::createFromVector(strings);
setsOfSets.push_back(std::move(innerSet));
}
Set outerSet = Set::createFromVector(setsOfSets);
return Value(std::move(outerSet));
}
case PropertyType::SET_SET_INT:
return nebula::extractIntOrFloat<int64_t, Set>(data_, offset);
case PropertyType::SET_SET_FLOAT:
return nebula::extractIntOrFloat<float, Set>(data_, offset);
case PropertyType::UNKNOWN:
break;
}
Expand Down
181 changes: 93 additions & 88 deletions src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,61 @@ namespace nebula {

using nebula::cpp2::PropertyType;

// Appends the serialized elements of a List or Set to `buffer`.
//
// Every element of `container.values` must have type `valueType`; the first
// element that does not makes the function log an error and return
// WriteResult::TYPE_MISMATCH. Elements appended before the failure are left in
// `buffer`.
//
// Encoding of each element:
//   STRING - int32_t byte length followed by the raw bytes
//   INT    - int32_t
//   FLOAT  - float
//
// When `isSetType` is true, an element whose value was already recorded in the
// matching `serialized*` set is skipped, and each newly written value is
// recorded there. When `isSetType` is false, every element is written and the
// `serialized*` sets are left untouched, so lists do not pay for hashing.
//
// This function writes elements only; it neither writes nor adjusts an element
// count. NOTE(review): callers that write the count beforehand should confirm
// it still matches when duplicates are skipped.
//
// Returns WriteResult::SUCCEEDED once all elements have been processed.
template <typename Container>
WriteResult writeContainer(const Container& container,
                           Value::Type valueType,
                           bool isSetType,
                           std::unordered_set<std::string>& serializedStrings,
                           std::unordered_set<int32_t>& serializedInts,
                           std::unordered_set<float>& serializedFloats,
                           std::string& buffer) {
  for (const auto& item : container.values) {
    if (item.type() != valueType) {
      LOG(ERROR) << "Type mismatch: Expected " << static_cast<int>(valueType) << " but got "
                 << static_cast<int>(item.type());
      return WriteResult::TYPE_MISMATCH;
    }
    switch (valueType) {
      case Value::Type::STRING: {
        // Bind by reference to avoid copying every string element.
        const std::string& str = item.getStr();
        // insert() reports whether the value was new, so a single hash lookup
        // both detects and records a duplicate.
        if (isSetType && !serializedStrings.insert(str).second) {
          continue;
        }
        const int32_t strLen = static_cast<int32_t>(str.size());
        buffer.append(reinterpret_cast<const char*>(&strLen), sizeof(int32_t));
        buffer.append(str.data(), strLen);
        break;
      }
      case Value::Type::INT: {
        // NOTE(review): the value is narrowed to 32 bits here; values outside
        // the int32_t range are not rejected. Confirm this is intended.
        const int32_t intVal = static_cast<int32_t>(item.getInt());
        if (isSetType && !serializedInts.insert(intVal).second) {
          continue;
        }
        buffer.append(reinterpret_cast<const char*>(&intVal), sizeof(int32_t));
        break;
      }
      case Value::Type::FLOAT: {
        const float floatVal = static_cast<float>(item.getFloat());
        if (isSetType && !serializedFloats.insert(floatVal).second) {
          continue;
        }
        buffer.append(reinterpret_cast<const char*>(&floatVal), sizeof(float));
        break;
      }
      default:
        LOG(ERROR) << "Unsupported value type: " << static_cast<int>(valueType);
        return WriteResult::TYPE_MISMATCH;
    }
  }

  return WriteResult::SUCCEEDED;
}

RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema)
: schema_(schema), numNullBytes_(0), approxStrLen_(0), finished_(false), outOfSpaceStr_(false) {
CHECK(!!schema_);
Expand Down Expand Up @@ -829,57 +884,34 @@ WriteResult RowWriterV2::write(ssize_t index, const Geography& v) {
std::string wkb = v.asWKB();
return write(index, folly::StringPiece(wkb), true);
}

WriteResult RowWriterV2::write(ssize_t index, const List& list) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();

if (isSet_[index]) {
outOfSpaceStr_ = true;
}

int32_t listSize = list.size();
int32_t listOffset = buf_.size();
buf_.append(reinterpret_cast<char*>(&listSize), sizeof(int32_t));

for (const auto& item : list.values) {
switch (item.type()) {
case Value::Type::STRING: {
if (field->type() != PropertyType::LIST_STRING) {
LOG(ERROR) << "Type mismatch: Expected LIST_STRING but got " << item.type()
<< " for field " << field->name();
return WriteResult::TYPE_MISMATCH;
}
std::string str = item.getStr();
int32_t strLen = str.size();
buf_.append(reinterpret_cast<char*>(&strLen), sizeof(int32_t));
buf_.append(str.data(), strLen);
break;
}
case Value::Type::INT: {
if (field->type() != PropertyType::LIST_INT) {
LOG(ERROR) << "Type mismatch: Expected LIST_INT but got " << item.type() << " for field "
<< field->name();
return WriteResult::TYPE_MISMATCH;
}
int32_t intVal = item.getInt();
buf_.append(reinterpret_cast<char*>(&intVal), sizeof(int32_t));
break;
}
case Value::Type::FLOAT: {
if (field->type() != PropertyType::LIST_FLOAT) {
LOG(ERROR) << "Type mismatch: Expected LIST_FLOAT but got " << item.type()
<< " for field " << field->name();
return WriteResult::TYPE_MISMATCH;
}
float floatVal = item.getFloat();
buf_.append(reinterpret_cast<char*>(&floatVal), sizeof(float));
break;
}
default:
LOG(ERROR) << "Type mismatch: Unexpected type " << item.type() << " for field "
<< field->name();
return WriteResult::TYPE_MISMATCH;
}
buf_.append(reinterpret_cast<const char*>(&listSize), sizeof(int32_t));
std::unordered_set<std::string> serializedStrings;
std::unordered_set<int32_t> serializedInts;
std::unordered_set<float> serializedFloats;
Value::Type valueType;
if (field->type() == PropertyType::LIST_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::LIST_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::LIST_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported list type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeContainer(
list, valueType, false, serializedStrings, serializedInts, serializedFloats, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}

memcpy(&buf_[offset], reinterpret_cast<void*>(&listOffset), sizeof(int32_t));
Expand All @@ -889,63 +921,36 @@ WriteResult RowWriterV2::write(ssize_t index, const List& list) {
isSet_[index] = true;
return WriteResult::SUCCEEDED;
}

WriteResult RowWriterV2::write(ssize_t index, const Set& set) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();

if (isSet_[index]) {
outOfSpaceStr_ = true;
}

int32_t setSize = set.size();
int32_t setOffset = buf_.size();
buf_.append(reinterpret_cast<char*>(&setSize), sizeof(int32_t));

buf_.append(reinterpret_cast<const char*>(&setSize), sizeof(int32_t));
std::unordered_set<std::string> serializedStrings;
std::unordered_set<int32_t> serializedInts;
std::unordered_set<float> serializedFloats;

for (const auto& item : set.values) {
switch (item.type()) {
case Value::Type::STRING: {
if (field->type() != PropertyType::SET_STRING) {
return WriteResult::TYPE_MISMATCH;
}
std::string str = item.getStr();
if (serializedStrings.find(str) == serializedStrings.end()) {
int32_t strLen = str.size();
buf_.append(reinterpret_cast<char*>(&strLen), sizeof(int32_t));
buf_.append(str.data(), strLen);
serializedStrings.insert(str);
}
break;
}
case Value::Type::INT: {
if (field->type() != PropertyType::SET_INT) {
return WriteResult::TYPE_MISMATCH;
}
int32_t intVal = item.getInt();
if (serializedInts.find(intVal) == serializedInts.end()) {
buf_.append(reinterpret_cast<char*>(&intVal), sizeof(int32_t));
serializedInts.insert(intVal);
}
break;
}
case Value::Type::FLOAT: {
if (field->type() != PropertyType::SET_FLOAT) {
return WriteResult::TYPE_MISMATCH;
}
float floatVal = item.getFloat();
if (serializedFloats.find(floatVal) == serializedFloats.end()) {
buf_.append(reinterpret_cast<char*>(&floatVal), sizeof(float));
serializedFloats.insert(floatVal);
}
break;
}
default:
return WriteResult::TYPE_MISMATCH;
}
Value::Type valueType;
if (field->type() == PropertyType::SET_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::SET_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::SET_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported set type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeContainer(
set, valueType, true, serializedStrings, serializedInts, serializedFloats, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}

memcpy(&buf_[offset], reinterpret_cast<void*>(&setOffset), sizeof(int32_t));
if (field->nullable()) {
clearNullBit(field->nullFlagPos());
Expand Down
4 changes: 4 additions & 0 deletions src/codec/RowWriterV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ class RowWriterV2 {
WriteResult write(ssize_t index, const Duration& v);

WriteResult write(ssize_t index, const Geography& v);
// Writes an ordered list of strings, integers, or floats to the field at `index`.
// The field type must be LIST_STRING, LIST_INT, or LIST_FLOAT.
WriteResult write(ssize_t index, const List& list);
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
// Writes an unordered set of strings, integers, or floats to the field at `index`.
// The field type must be SET_STRING, SET_INT, or SET_FLOAT.
WriteResult write(ssize_t index, const Set& set);
};

Expand Down
36 changes: 17 additions & 19 deletions src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1388,43 +1388,41 @@ FunctionManager::FunctionManager() {
if (args[0].get().isNull() || args[1].get().isNull() || args[2].get().isNull()) {
return Value::kNullValue;
}

if (args[0].get().isStr() && args[1].get().isStr() && args[2].get().isStr()) {
std::string origStr(args[0].get().getStr());
std::string search(args[1].get().getStr());
std::string newStr(args[2].get().getStr());
return boost::replace_all_copy(origStr, search, newStr);
std::string &origStr = const_cast<std::string &>(args[0].get().getStr());
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
std::string search = args[1].get().getStr();
std::string newStr = args[2].get().getStr();
boost::replace_all(origStr, search, newStr);
return origStr;
}

if (args[0].get().isList()) {
auto list = args[0].get().getList();
auto &list = const_cast<List &>(args[0].get().getList());
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
auto search = args[1].get();
auto newValue = args[2].get();
List result;
for (auto &item : list.values) {
if (item == search) {
result.values.emplace_back(newValue);
} else {
result.values.emplace_back(item);
item = newValue;
}
}
return result;
return list;
}

if (args[0].get().isSet()) {
auto set = args[0].get().getSet();
auto &set = const_cast<Set &>(args[0].get().getSet());
YZW00 marked this conversation as resolved.
Show resolved Hide resolved
auto search = args[1].get();
auto newValue = args[2].get();
Set result;
for (auto &item : set.values) {
if (item == search) {
result.values.insert(newValue);
} else {
result.values.insert(item);
}
if (set.values.erase(search)) {
set.values.insert(newValue);
}
return result;
return set;
}

return Value::kNullBadType;
};
}

{
auto &attr = functions_["erase"];
attr.minArity_ = 2;
Expand Down
Loading