Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](inverted index) Optimize the compression of inverted index position information #242

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/core/CLucene/index/FieldInfos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,26 @@ void FieldInfos::addIndexed(const TCHAR** names, const bool storeTermVectors, co

void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool storeTermVectors,
const bool storePositionWithTermVector, const bool storeOffsetWithTermVector,
const bool omitNorms, const bool hasProx, const bool storePayloads) {
const bool omitNorms, const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
size_t i = 0;
while ( names[i] != NULL ){
add(names[i], isIndexed, storeTermVectors, storePositionWithTermVector,
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads, indexVersion);
++i;
}
}

FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads) {
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
FieldInfo* fi = fieldInfo(name);
if (fi == NULL) {
return addInternal(name, isIndexed, storeTermVector, storePositionWithTermVector,
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads,
indexVersion);
} else {
if (fi->isIndexed != isIndexed) {
fi->isIndexed = true; // once indexed, always index
Expand All @@ -164,6 +167,9 @@ FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool s
if (fi->storePayloads != storePayloads) {
fi->storePayloads = true;
}
if (fi->indexVersion_ != indexVersion) {
fi->indexVersion_ = indexVersion;
}
}
return fi;
}
Expand All @@ -172,10 +178,12 @@ FieldInfo* FieldInfos::addInternal(const TCHAR* name, const bool isIndexed,
const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads) {
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(), storeTermVector,
storePositionWithTermVector, storeOffsetWithTermVector,
omitNorms, hasProx, storePayloads);
fi->setIndexVersion(indexVersion);
byNumber.push_back(fi);
byName.put( fi->name, fi);
return fi;
Expand Down
1 change: 1 addition & 0 deletions src/core/CLucene/index/IndexVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
enum class IndexVersion {
kV0 = 0,
kV1 = 1,
kV2 = 2,

kNone
};
34 changes: 27 additions & 7 deletions src/core/CLucene/index/IndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "CLucene/analysis/AnalysisHeader.h"
#include "CLucene/analysis/Analyzers.h"
#include "CLucene/config/repl_wchar.h"
#include "CLucene/document/Document.h"
#include "CLucene/search/Similarity.h"
#include "CLucene/store/Directory.h"
Expand Down Expand Up @@ -1327,22 +1328,27 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
std::vector<lucene::index::IndexWriter *> destIndexWriterList;
std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList;
try {
// check hasProx
// check hasProx, indexVersion
bool hasProx = false;
IndexVersion indexVersion = IndexVersion::kV1;
{
if (!readers.empty()) {
IndexReader* reader = readers[0];
hasProx = reader->getFieldInfos()->hasProx();
indexVersion = reader->getFieldInfos()->getIndexVersion();
for (int32_t i = 1; i < readers.size(); i++) {
if (hasProx != readers[i]->getFieldInfos()->hasProx()) {
_CLTHROWA(CL_ERR_IllegalArgument, "src_dirs hasProx inconformity");
}
if (indexVersion != readers[i]->getFieldInfos()->getIndexVersion()) {
_CLTHROWA(CL_ERR_IllegalArgument, "src_dirs indexVersion inconformity");
}
}
}
}

/// merge fields
mergeFields(hasProx);
mergeFields(hasProx, indexVersion);

/// write fields and create files writers
for (int j = 0; j < numDestIndexes; j++) {
Expand Down Expand Up @@ -1390,7 +1396,7 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
}

/// merge terms
mergeTerms(hasProx);
mergeTerms(hasProx, indexVersion);

/// merge null_bitmap
mergeNullBitmap(srcNullBitmapValues, nullBitmapIndexOutputList);
Expand Down Expand Up @@ -1555,7 +1561,7 @@ void IndexWriter::compareIndexes(lucene::store::Directory *other) {
}
}

void IndexWriter::mergeFields(bool hasProx) {
void IndexWriter::mergeFields(bool hasProx, IndexVersion indexVersion) {
//Create a new FieldInfos
fieldInfos = _CLNEW FieldInfos();
//Condition check to see if fieldInfos points to a valid instance
Expand All @@ -1570,7 +1576,8 @@ void IndexWriter::mergeFields(bool hasProx) {
FieldInfo *fi = reader->getFieldInfos()->fieldInfo(j);
fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector,
fi->storePositionWithTermVector, fi->storeOffsetWithTermVector,
!reader->hasNorms(fi->name), hasProx, fi->storePayloads);
!reader->hasNorms(fi->name), hasProx, fi->storePayloads,
fi->indexVersion_);
}
}

Expand Down Expand Up @@ -1614,7 +1621,7 @@ class postingQueue : public CL_NS(util)::PriorityQueue<DestDoc*,CL_NS(util)::Del

};

void IndexWriter::mergeTerms(bool hasProx) {
void IndexWriter::mergeTerms(bool hasProx, IndexVersion indexVersion) {
auto queue = _CLNEW SegmentMergeQueue(readers.size());
auto numSrcIndexes = readers.size();
//std::vector<TermPositions *> postingsList(numSrcIndexes);
Expand Down Expand Up @@ -1667,6 +1674,7 @@ void IndexWriter::mergeTerms(bool hasProx) {

std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes);
std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes);
std::vector<std::vector<uint32_t>> posBuffers(numDestIndexes);
auto destPostingQueues = _CLNEW postingQueue(matchSize);
std::vector<DestDoc> destDocs(matchSize);

Expand Down Expand Up @@ -1758,6 +1766,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
auto proxOut = proxOutputList[destIdx];
auto& docDeltaBuffer = docDeltaBuffers[destIdx];
auto& freqBuffer = freqBuffers[destIdx];
auto& posBuffer = posBuffers[destIdx];
auto skipWriter = skipListWriterList[destIdx];
auto& df = dfs[destIdx];
auto& lastDoc = lastDocs[destIdx];
Expand All @@ -1776,6 +1785,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
encode(freqOut, docDeltaBuffer, true);
if (hasProx) {
encode(freqOut, freqBuffer, false);
if (indexVersion >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

skipWriter->setSkipData(lastDoc, false, -1);
Expand All @@ -1791,7 +1803,11 @@ void IndexWriter::mergeTerms(bool hasProx) {
for (int32_t i = 0; i < descPositions.size(); i++) {
int32_t position = descPositions[i];
int32_t delta = position - lastPosition;
proxOut->writeVInt(delta);
if (indexVersion >= IndexVersion::kV2) {
posBuffer.push_back(delta);
} else {
proxOut->writeVInt(delta);
}
lastPosition = position;
}
freqBuffer.push_back(destFreq);
Expand Down Expand Up @@ -1828,6 +1844,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
{
auto& docDeltaBuffer = docDeltaBuffers[i];
auto& freqBuffer = freqBuffers[i];
auto& posBuffer = posBuffers[i];

freqOutput->writeByte((char)CodeMode::kDefault);
freqOutput->writeVInt(docDeltaBuffer.size());
Expand All @@ -1851,6 +1868,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
if (indexVersion >= IndexVersion::kV2) {
PforUtil::encodePos(proxOutput, posBuffer);
}
}

int64_t skipPointer = skipListWriter->writeSkip(freqOutput);
Expand Down
5 changes: 3 additions & 2 deletions src/core/CLucene/index/IndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef _lucene_index_IndexWriter_
#define _lucene_index_IndexWriter_

#include "CLucene/index/IndexVersion.h"
#include "CLucene/util/VoidList.h"
#include "CLucene/util/Array.h"

Expand Down Expand Up @@ -320,11 +321,11 @@ class CLUCENE_EXPORT IndexWriter:LUCENE_BASE {
std::vector<uint32_t> dest_index_docs);

// create new fields info
void mergeFields(bool hasProx);
void mergeFields(bool hasProx, IndexVersion indexVersion);
// write fields info file
void writeFields(lucene::store::Directory* d, std::string segment);
// merge terms and write files
void mergeTerms(bool hasProx);
void mergeTerms(bool hasProx, IndexVersion indexVersion);
// merge null_bitmap
void mergeNullBitmap(std::vector<std::vector<uint32_t>> srcBitmapValues, std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList);

Expand Down
12 changes: 11 additions & 1 deletion src/core/CLucene/index/SDocumentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
encode(freqOut, docDeltaBuffer, true);
if (hasProx_) {
encode(freqOut, freqBuffer, false);
if (indexVersion_ >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

skipListWriter->setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
Expand Down Expand Up @@ -1253,7 +1256,11 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
for (int32_t j = 0; j < termDocFreq; j++) {
const int32_t code = prox.readVInt();
assert(0 == (code & 1));
proxOut->writeVInt(code >> 1);
if (indexVersion_ >= IndexVersion::kV2) {
posBuffer.push_back(code >> 1);
} else {
proxOut->writeVInt(code >> 1);
}
}
freqBuffer.push_back(termDocFreq);
}
Expand Down Expand Up @@ -1310,6 +1317,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
if (indexVersion_ >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

int64_t skipPointer = skipListWriter->writeSkip(freqOut);
Expand Down
1 change: 1 addition & 0 deletions src/core/CLucene/index/SDocumentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SDocumentsWriter : public IDocumentsWriter {
std::string segment;// Current segment we are working on
std::vector<uint32_t> docDeltaBuffer;
std::vector<uint32_t> freqBuffer;
std::vector<uint32_t> posBuffer;
std::ostream* infoStream{};
int64_t ramBufferSize;
// Flush @ this number of docs. If rarmBufferSize is
Expand Down
4 changes: 2 additions & 2 deletions src/core/CLucene/index/SegmentTermDocs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void TermDocsBuffer::refill() {
cur_doc_ = 0;
cur_freq_ = 0;

if (indexVersion_ == IndexVersion::kV1) {
if (indexVersion_ >= IndexVersion::kV1) {
size_ = refillV1();
} else {
size_ = refillV0();
Expand All @@ -199,7 +199,7 @@ void TermDocsBuffer::refill() {

void TermDocsBuffer::readRange(DocRange* docRange) {
int32_t size = 0;
if (indexVersion_ == IndexVersion::kV1) {
if (indexVersion_ >= IndexVersion::kV1) {
size = refillV1();
} else {
size = refillV0();
Expand Down
16 changes: 11 additions & 5 deletions src/core/CLucene/index/SegmentTermPositions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ CL_NS_DEF(index)
SegmentTermPositions::SegmentTermPositions(const SegmentReader* _parent):
SegmentTermDocs(_parent), proxStream(NULL)// the proxStream will be cloned lazily when nextPosition() is called for the first time
,lazySkipPointer(-1), lazySkipProxCount(0)
, indexVersion_(_parent->_fieldInfos->getIndexVersion())
, buffer_(proxStream, indexVersion_)
{
CND_CONDITION(_parent != NULL, "Parent is NULL");
}
Expand Down Expand Up @@ -64,14 +66,15 @@ int32_t SegmentTermPositions::nextPosition() {
}

int32_t SegmentTermPositions::readDeltaPosition() {
int32_t delta = proxStream->readVInt();
int32_t delta = buffer_.getPos();
if (currentFieldStoresPayloads) {
// if the current field stores payloads then
// the position delta is shifted one bit to the left.
// if the LSB is set, then we have to read the current
// payload length
if ((delta & 1) != 0) {
payloadLength = proxStream->readVInt();
// payloadLength = proxStream->readVInt();
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
}
delta = (int32_t)((uint32_t)delta >> (uint32_t)1);
needToLoadPayload = true;
Expand Down Expand Up @@ -122,7 +125,8 @@ void SegmentTermPositions::skipPositions(const int32_t n) {

void SegmentTermPositions::skipPayload() {
if (needToLoadPayload && payloadLength > 0) {
proxStream->seek(proxStream->getFilePointer() + payloadLength);
// proxStream->seek(proxStream->getFilePointer() + payloadLength);
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
}
needToLoadPayload = false;
}
Expand All @@ -131,14 +135,15 @@ void SegmentTermPositions::lazySkip() {
if (proxStream == NULL) {
// clone lazily
proxStream = parent->proxStream->clone();
buffer_.reset(proxStream);
}

// we might have to skip the current payload
// if it was not read yet
skipPayload();

if (lazySkipPointer != -1) {
proxStream->seek(lazySkipPointer);
buffer_.seek(lazySkipPointer);
lazySkipPointer = -1;
}

Expand Down Expand Up @@ -166,7 +171,8 @@ uint8_t* SegmentTermPositions::getPayload(uint8_t* data) {
} else {
retArray = data;
}
proxStream->readBytes(retArray, payloadLength);
// proxStream->readBytes(retArray, payloadLength);
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
needToLoadPayload = false;
return retArray;
}
Expand Down
9 changes: 6 additions & 3 deletions src/core/CLucene/index/_FieldInfos.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ class CLUCENE_EXPORT FieldInfos :LUCENE_BASE{
void add(const TCHAR** names, const bool isIndexed, const bool storeTermVector = false,
const bool storePositionWithTermVector = false,
const bool storeOffsetWithTermVector = false, const bool omitNorms = false,
const bool hasProx = false, const bool storePayloads = false);
const bool hasProx = false, const bool storePayloads = false,
IndexVersion indexVersion = IndexVersion::kV1);

// Merges in information from another FieldInfos.
void add(FieldInfos* other);
Expand All @@ -167,13 +168,15 @@ class CLUCENE_EXPORT FieldInfos :LUCENE_BASE{
FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool storeTermVector = false,
const bool storePositionWithTermVector = false,
const bool storeOffsetWithTermVector = false, const bool omitNorms = false,
const bool hasProx = false, const bool storePayloads = false);
const bool hasProx = false, const bool storePayloads = false,
IndexVersion indexVersion = IndexVersion::kV1);

// was void
FieldInfo* addInternal(const TCHAR* name, const bool isIndexed, const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads);
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion = IndexVersion::kV1);

int32_t fieldNumber(const TCHAR* fieldName)const;

Expand Down
Loading
Loading