Skip to content

Commit

Permalink
Optimize the compression of inverted index position information
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Oct 17, 2024
1 parent 5a458e6 commit 1761933
Show file tree
Hide file tree
Showing 16 changed files with 514 additions and 25 deletions.
18 changes: 13 additions & 5 deletions src/core/CLucene/index/FieldInfos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,26 @@ void FieldInfos::addIndexed(const TCHAR** names, const bool storeTermVectors, co

// Registers every field name in a NULL-terminated array, applying the same
// set of flags to each one.
//
// Diff view: removed and added lines are shown together in this hunk. The
// signature line ending in `storePayloads) {` and the call line ending in
// `storePayloads);` are the pre-commit versions; the lines directly after
// each of them are the replacements that thread `indexVersion` through.
//
// names         array of field names; iteration stops at the first NULL entry
// indexVersion  forwarded unchanged to the single-name add() for every entry
// All other flags are likewise forwarded unchanged.
//
// NOTE(review): `names` itself is not checked for NULL before being indexed.
void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool storeTermVectors,
const bool storePositionWithTermVector, const bool storeOffsetWithTermVector,
const bool omitNorms, const bool hasProx, const bool storePayloads) {
const bool omitNorms, const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
size_t i = 0;
// Walk the array until the NULL terminator is reached.
while ( names[i] != NULL ){
add(names[i], isIndexed, storeTermVectors, storePositionWithTermVector,
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads, indexVersion);
++i;
}
}

FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads) {
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
FieldInfo* fi = fieldInfo(name);
if (fi == NULL) {
return addInternal(name, isIndexed, storeTermVector, storePositionWithTermVector,
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads,
indexVersion);
} else {
if (fi->isIndexed != isIndexed) {
fi->isIndexed = true; // once indexed, always index
Expand All @@ -164,6 +167,9 @@ FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool s
if (fi->storePayloads != storePayloads) {
fi->storePayloads = true;
}
if (fi->indexVersion_ != indexVersion) {
fi->indexVersion_ = indexVersion;
}
}
return fi;
}
Expand All @@ -172,10 +178,12 @@ FieldInfo* FieldInfos::addInternal(const TCHAR* name, const bool isIndexed,
const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads) {
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion) {
FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(), storeTermVector,
storePositionWithTermVector, storeOffsetWithTermVector,
omitNorms, hasProx, storePayloads);
fi->setIndexVersion(indexVersion);
byNumber.push_back(fi);
byName.put( fi->name, fi);
return fi;
Expand Down
1 change: 1 addition & 0 deletions src/core/CLucene/index/IndexVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Version tag for the index format. Code in this commit compares these values
// with ordering operators (`>= IndexVersion::kV1`, `>= IndexVersion::kV2`), so
// the numeric order of the enumerators is significant.
enum class IndexVersion {
// TermDocsBuffer takes the refillV0() path when the version is below kV1.
kV0 = 0,
// TermDocsBuffer takes the refillV1() path from this version upward.
kV1 = 1,
// From this version upward the writers buffer position deltas and emit them
// through PforUtil::encodePos() instead of writing one VInt per delta.
kV2 = 2,

// Has no explicit value, so it takes the next value after kV2.
// NOTE(review): this makes kNone compare >= kV2 (and >= kV1); confirm that
// kNone can never reach the version checks that use `>=`.
kNone
};
34 changes: 27 additions & 7 deletions src/core/CLucene/index/IndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "CLucene/analysis/AnalysisHeader.h"
#include "CLucene/analysis/Analyzers.h"
#include "CLucene/config/repl_wchar.h"
#include "CLucene/document/Document.h"
#include "CLucene/search/Similarity.h"
#include "CLucene/store/Directory.h"
Expand Down Expand Up @@ -1327,22 +1328,27 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
std::vector<lucene::index::IndexWriter *> destIndexWriterList;
std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList;
try {
// check hasProx
// check hasProx, indexVersion
bool hasProx = false;
IndexVersion indexVersion = IndexVersion::kV1;
{
if (!readers.empty()) {
IndexReader* reader = readers[0];
hasProx = reader->getFieldInfos()->hasProx();
indexVersion = reader->getFieldInfos()->getIndexVersion();
for (int32_t i = 1; i < readers.size(); i++) {
if (hasProx != readers[i]->getFieldInfos()->hasProx()) {
_CLTHROWA(CL_ERR_IllegalArgument, "src_dirs hasProx inconformity");
}
if (indexVersion != readers[i]->getFieldInfos()->getIndexVersion()) {
_CLTHROWA(CL_ERR_IllegalArgument, "src_dirs indexVersion inconformity");
}
}
}
}

/// merge fields
mergeFields(hasProx);
mergeFields(hasProx, indexVersion);

/// write fields and create files writers
for (int j = 0; j < numDestIndexes; j++) {
Expand Down Expand Up @@ -1390,7 +1396,7 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
}

/// merge terms
mergeTerms(hasProx);
mergeTerms(hasProx, indexVersion);

/// merge null_bitmap
mergeNullBitmap(srcNullBitmapValues, nullBitmapIndexOutputList);
Expand Down Expand Up @@ -1555,7 +1561,7 @@ void IndexWriter::compareIndexes(lucene::store::Directory *other) {
}
}

void IndexWriter::mergeFields(bool hasProx) {
void IndexWriter::mergeFields(bool hasProx, IndexVersion indexVersion) {
//Create a new FieldInfos
fieldInfos = _CLNEW FieldInfos();
//Condition check to see if fieldInfos points to a valid instance
Expand All @@ -1570,7 +1576,8 @@ void IndexWriter::mergeFields(bool hasProx) {
FieldInfo *fi = reader->getFieldInfos()->fieldInfo(j);
fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector,
fi->storePositionWithTermVector, fi->storeOffsetWithTermVector,
!reader->hasNorms(fi->name), hasProx, fi->storePayloads);
!reader->hasNorms(fi->name), hasProx, fi->storePayloads,
fi->indexVersion_);
}
}

Expand Down Expand Up @@ -1614,7 +1621,7 @@ class postingQueue : public CL_NS(util)::PriorityQueue<DestDoc*,CL_NS(util)::Del

};

void IndexWriter::mergeTerms(bool hasProx) {
void IndexWriter::mergeTerms(bool hasProx, IndexVersion indexVersion) {
auto queue = _CLNEW SegmentMergeQueue(readers.size());
auto numSrcIndexes = readers.size();
//std::vector<TermPositions *> postingsList(numSrcIndexes);
Expand Down Expand Up @@ -1667,6 +1674,7 @@ void IndexWriter::mergeTerms(bool hasProx) {

std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes);
std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes);
std::vector<std::vector<uint32_t>> posBuffers(numDestIndexes);
auto destPostingQueues = _CLNEW postingQueue(matchSize);
std::vector<DestDoc> destDocs(matchSize);

Expand Down Expand Up @@ -1758,6 +1766,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
auto proxOut = proxOutputList[destIdx];
auto& docDeltaBuffer = docDeltaBuffers[destIdx];
auto& freqBuffer = freqBuffers[destIdx];
auto& posBuffer = posBuffers[destIdx];
auto skipWriter = skipListWriterList[destIdx];
auto& df = dfs[destIdx];
auto& lastDoc = lastDocs[destIdx];
Expand All @@ -1776,6 +1785,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
encode(freqOut, docDeltaBuffer, true);
if (hasProx) {
encode(freqOut, freqBuffer, false);
if (indexVersion >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

skipWriter->setSkipData(lastDoc, false, -1);
Expand All @@ -1791,7 +1803,11 @@ void IndexWriter::mergeTerms(bool hasProx) {
for (int32_t i = 0; i < descPositions.size(); i++) {
int32_t position = descPositions[i];
int32_t delta = position - lastPosition;
proxOut->writeVInt(delta);
if (indexVersion >= IndexVersion::kV2) {
posBuffer.push_back(delta);
} else {
proxOut->writeVInt(delta);
}
lastPosition = position;
}
freqBuffer.push_back(destFreq);
Expand Down Expand Up @@ -1828,6 +1844,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
{
auto& docDeltaBuffer = docDeltaBuffers[i];
auto& freqBuffer = freqBuffers[i];
auto& posBuffer = posBuffers[i];

freqOutput->writeByte((char)CodeMode::kDefault);
freqOutput->writeVInt(docDeltaBuffer.size());
Expand All @@ -1851,6 +1868,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
if (indexVersion >= IndexVersion::kV2) {
PforUtil::encodePos(proxOutput, posBuffer);
}
}

int64_t skipPointer = skipListWriter->writeSkip(freqOutput);
Expand Down
5 changes: 3 additions & 2 deletions src/core/CLucene/index/IndexWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef _lucene_index_IndexWriter_
#define _lucene_index_IndexWriter_

#include "CLucene/index/IndexVersion.h"
#include "CLucene/util/VoidList.h"
#include "CLucene/util/Array.h"

Expand Down Expand Up @@ -320,11 +321,11 @@ class CLUCENE_EXPORT IndexWriter:LUCENE_BASE {
std::vector<uint32_t> dest_index_docs);

// create new fields info
void mergeFields(bool hasProx);
void mergeFields(bool hasProx, IndexVersion indexVersion);
// write fields info file
void writeFields(lucene::store::Directory* d, std::string segment);
// merge terms and write files
void mergeTerms(bool hasProx);
void mergeTerms(bool hasProx, IndexVersion indexVersion);
// merge null_bitmap
void mergeNullBitmap(std::vector<std::vector<uint32_t>> srcBitmapValues, std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList);

Expand Down
12 changes: 11 additions & 1 deletion src/core/CLucene/index/SDocumentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
encode(freqOut, docDeltaBuffer, true);
if (hasProx_) {
encode(freqOut, freqBuffer, false);
if (indexVersion_ >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

skipListWriter->setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
Expand Down Expand Up @@ -1253,7 +1256,11 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
for (int32_t j = 0; j < termDocFreq; j++) {
const int32_t code = prox.readVInt();
assert(0 == (code & 1));
proxOut->writeVInt(code >> 1);
if (indexVersion_ >= IndexVersion::kV2) {
posBuffer.push_back(code >> 1);
} else {
proxOut->writeVInt(code >> 1);
}
}
freqBuffer.push_back(termDocFreq);
}
Expand Down Expand Up @@ -1310,6 +1317,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
if (indexVersion_ >= IndexVersion::kV2) {
PforUtil::encodePos(proxOut, posBuffer);
}
}

int64_t skipPointer = skipListWriter->writeSkip(freqOut);
Expand Down
1 change: 1 addition & 0 deletions src/core/CLucene/index/SDocumentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SDocumentsWriter : public IDocumentsWriter {
std::string segment;// Current segment we are working on
std::vector<uint32_t> docDeltaBuffer;
std::vector<uint32_t> freqBuffer;
std::vector<uint32_t> posBuffer;
std::ostream* infoStream{};
int64_t ramBufferSize;
// Flush @ this number of docs. If ramBufferSize is
Expand Down
4 changes: 2 additions & 2 deletions src/core/CLucene/index/SegmentTermDocs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void TermDocsBuffer::refill() {
cur_doc_ = 0;
cur_freq_ = 0;

if (indexVersion_ == IndexVersion::kV1) {
if (indexVersion_ >= IndexVersion::kV1) {
size_ = refillV1();
} else {
size_ = refillV0();
Expand All @@ -199,7 +199,7 @@ void TermDocsBuffer::refill() {

void TermDocsBuffer::readRange(DocRange* docRange) {
int32_t size = 0;
if (indexVersion_ == IndexVersion::kV1) {
if (indexVersion_ >= IndexVersion::kV1) {
size = refillV1();
} else {
size = refillV0();
Expand Down
16 changes: 11 additions & 5 deletions src/core/CLucene/index/SegmentTermPositions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ CL_NS_DEF(index)
// Constructs a positions enumerator on top of SegmentTermDocs for the given
// segment reader. The prox stream is left NULL here; lazySkip() clones it from
// the parent on first use and then hands it to buffer_ via buffer_.reset().
//
// _parent  segment reader whose field infos supply the index version
SegmentTermPositions::SegmentTermPositions(const SegmentReader* _parent):
SegmentTermDocs(_parent), proxStream(NULL)// the proxStream will be cloned lazily when nextPosition() is called for the first time
,lazySkipPointer(-1), lazySkipProxCount(0)
// NOTE(review): _parent is dereferenced here, before the CND_CONDITION null
// check in the constructor body has a chance to run.
, indexVersion_(_parent->_fieldInfos->getIndexVersion())
// NOTE(review): members are initialized in declaration order, not in the
// order written here; this line relies on proxStream and indexVersion_ being
// declared before buffer_ in the class — confirm against the header.
, buffer_(proxStream, indexVersion_)
{
CND_CONDITION(_parent != NULL, "Parent is NULL");
}
Expand Down Expand Up @@ -64,14 +66,15 @@ int32_t SegmentTermPositions::nextPosition() {
}

int32_t SegmentTermPositions::readDeltaPosition() {
int32_t delta = proxStream->readVInt();
int32_t delta = buffer_.getPos();
if (currentFieldStoresPayloads) {
// if the current field stores payloads then
// the position delta is shifted one bit to the left.
// if the LSB is set, then we have to read the current
// payload length
if ((delta & 1) != 0) {
payloadLength = proxStream->readVInt();
// payloadLength = proxStream->readVInt();
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
}
delta = (int32_t)((uint32_t)delta >> (uint32_t)1);
needToLoadPayload = true;
Expand Down Expand Up @@ -122,7 +125,8 @@ void SegmentTermPositions::skipPositions(const int32_t n) {

// Handles a payload that was announced but not yet read. When one is pending
// (needToLoadPayload set and payloadLength > 0) the post-commit code raises
// CL_ERR_UnsupportedOperation via _CLTHROWA instead of seeking past it;
// otherwise it only clears needToLoadPayload.
//
// Diff view: the uncommented proxStream->seek(...) line below is the
// pre-commit statement; the commented-out copy and the _CLTHROWA line after it
// are its replacement.
void SegmentTermPositions::skipPayload() {
if (needToLoadPayload && payloadLength > 0) {
proxStream->seek(proxStream->getFilePointer() + payloadLength);
// proxStream->seek(proxStream->getFilePointer() + payloadLength);
// Presumably disabled because positions are now read through buffer_ rather
// than straight from proxStream — TODO confirm.
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
}
needToLoadPayload = false;
}
Expand All @@ -131,14 +135,15 @@ void SegmentTermPositions::lazySkip() {
if (proxStream == NULL) {
// clone lazily
proxStream = parent->proxStream->clone();
buffer_.reset(proxStream);
}

// we might have to skip the current payload
// if it was not read yet
skipPayload();

if (lazySkipPointer != -1) {
proxStream->seek(lazySkipPointer);
buffer_.seek(lazySkipPointer);
lazySkipPointer = -1;
}

Expand Down Expand Up @@ -166,7 +171,8 @@ uint8_t* SegmentTermPositions::getPayload(uint8_t* data) {
} else {
retArray = data;
}
proxStream->readBytes(retArray, payloadLength);
// proxStream->readBytes(retArray, payloadLength);
_CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not supported at the moment");
needToLoadPayload = false;
return retArray;
}
Expand Down
9 changes: 6 additions & 3 deletions src/core/CLucene/index/_FieldInfos.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ class CLUCENE_EXPORT FieldInfos :LUCENE_BASE{
// Adds every field name in `names` with the same flags. The definition shown
// in FieldInfos.cpp iterates until it reaches a NULL entry and calls the
// single-name add() for each one.
//
// Diff view: the line ending in `storePayloads = false);` is the pre-commit
// end of the declaration; the two lines after it replace it.
//
// indexVersion  new trailing parameter; defaults to IndexVersion::kV1, so
//               calls that omit it still compile.
void add(const TCHAR** names, const bool isIndexed, const bool storeTermVector = false,
const bool storePositionWithTermVector = false,
const bool storeOffsetWithTermVector = false, const bool omitNorms = false,
const bool hasProx = false, const bool storePayloads = false);
const bool hasProx = false, const bool storePayloads = false,
IndexVersion indexVersion = IndexVersion::kV1);

// Merges in information from another FieldInfos.
void add(FieldInfos* other);
Expand All @@ -167,13 +168,15 @@ class CLUCENE_EXPORT FieldInfos :LUCENE_BASE{
// Adds a single field, or updates the existing entry with the same name, and
// returns its FieldInfo. In the definition shown in FieldInfos.cpp an unknown
// name is created through addInternal(); for a known name the stored
// indexVersion_ is overwritten whenever it differs from `indexVersion`.
//
// Diff view: the line ending in `storePayloads = false);` is the pre-commit
// end of the declaration; the two lines after it replace it.
//
// indexVersion  new trailing parameter; defaults to IndexVersion::kV1, so
//               calls that omit it still compile.
FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool storeTermVector = false,
const bool storePositionWithTermVector = false,
const bool storeOffsetWithTermVector = false, const bool omitNorms = false,
const bool hasProx = false, const bool storePayloads = false);
const bool hasProx = false, const bool storePayloads = false,
IndexVersion indexVersion = IndexVersion::kV1);

// was void
// Creates a new FieldInfo for `name` and returns it. The definition shown in
// FieldInfos.cpp numbers it with the current byNumber.size(), applies
// `indexVersion` through setIndexVersion(), and records it in both byNumber
// and byName.
//
// Diff view: the line ending in `storePayloads);` is the pre-commit end of the
// declaration; the two lines after it replace it.
//
// indexVersion  new trailing parameter; defaults to IndexVersion::kV1.
FieldInfo* addInternal(const TCHAR* name, const bool isIndexed, const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool omitNorms,
const bool hasProx, const bool storePayloads);
const bool hasProx, const bool storePayloads,
IndexVersion indexVersion = IndexVersion::kV1);

int32_t fieldNumber(const TCHAR* fieldName)const;

Expand Down
Loading

0 comments on commit 1761933

Please sign in to comment.