This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Merge pull request #350 from gnes-ai/rocksdb
feat(rocksdb): add RocksDB indexer
mergify[bot] authored Oct 21, 2019
2 parents 97efb59 + 839bd7e commit 69ea3fd
Showing 2 changed files with 124 additions and 0 deletions.
1 change: 1 addition & 0 deletions gnes/indexer/__init__.py
@@ -20,6 +20,7 @@
_cls2file_map = {
    'FaissIndexer': 'chunk.faiss',
    'LVDBIndexer': 'doc.leveldb',
    'RocksDBIndexer': 'doc.rocksdb',
    'AsyncLVDBIndexer': 'doc.leveldb',
    'NumpyIndexer': 'chunk.numpy',
    'BIndexer': 'chunk.bindexer',
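The one-line addition above registers the new class in GNES's name-to-module map, so the name 'RocksDBIndexer' resolves to gnes/indexer/doc/rocksdb.py. Below is a minimal sketch of the kind of lazy lookup such a map enables; the load_indexer_class helper is illustrative, not GNES's actual loader:

import importlib

_cls2file_map = {'RocksDBIndexer': 'doc.rocksdb'}  # excerpt from the map above

def load_indexer_class(name: str):
    # illustrative only: resolve 'doc.rocksdb' to gnes.indexer.doc.rocksdb
    module = importlib.import_module('gnes.indexer.' + _cls2file_map[name])
    return getattr(module, name)

RocksDBIndexer = load_indexer_class('RocksDBIndexer')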
123 changes: 123 additions & 0 deletions gnes/indexer/doc/rocksdb.py
@@ -0,0 +1,123 @@
# Tencent is pleased to support the open source community by making GNES available.
#
# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import gc
import pickle
from typing import List, Any

from ..base import BaseDocIndexer as BDI
from ...proto import gnes_pb2



class RocksDBIndexer(BDI):
    """Doc-level indexer that persists serialized gnes_pb2.Document messages in RocksDB."""

    def __init__(self, data_path: str,
                 drop_raw_data: bool = False,
                 drop_chunk_blob: bool = False,
                 read_only: bool = False,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.data_path = data_path
        self.drop_raw_data = drop_raw_data
        self.drop_chunk_blob = drop_chunk_blob
        self.read_only = read_only
        self.kwargs = kwargs

    def post_init(self):
        import rocksdb

        opts = rocksdb.Options()
        opts.create_if_missing = True
        opts.max_open_files = 300000
        opts.write_buffer_size = 67108864  # 64 MB memtable
        opts.max_write_buffer_number = 3
        opts.target_file_size_base = 67108864  # 64 MB per SST file

        opts.table_factory = rocksdb.BlockBasedTableFactory(
            filter_policy=rocksdb.BloomFilterPolicy(10),
            block_cache=rocksdb.LRUCache(2 * (1024 ** 3)),
            block_cache_compressed=rocksdb.LRUCache(500 * (1024 ** 2)))

        # remaining constructor kwargs are applied as rocksdb.Options overrides
        for key, value in self.kwargs.items():
            setattr(opts, key, value)

        self._db = rocksdb.DB(self.data_path, opts, read_only=self.read_only)

    @BDI.update_counter
    def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs):
        import rocksdb

        # batch all puts and flush them in a single synchronous write
        write_batch = rocksdb.WriteBatch()
        for k, d in zip(keys, docs):
            key_bytes = pickle.dumps(k)
            if self.drop_raw_data:
                d.ClearField('raw_data')
            if self.drop_chunk_blob:
                for c in d.chunks:
                    c.ClearField('blob')
            value_bytes = d.SerializeToString()
            write_batch.put(key_bytes, value_bytes)
        self._db.write(write_batch, sync=True)

    def query(self, keys: List[int], *args, **kwargs) -> List['gnes_pb2.Document']:
        query_keys = [pickle.dumps(k) for k in keys]

        # multi_get returns a dict mapping each key to its value, or None if missing
        values = self._db.multi_get(query_keys)

        docs = []
        for k in query_keys:
            v = values[k]
            if v is not None:
                _doc = gnes_pb2.Document()
                _doc.ParseFromString(v)
                docs.append(_doc)
            else:
                docs.append(None)
        return docs

    def scan(self, reversed_scan: bool = False):
        iterator = self._db.iterkeys()

        if reversed_scan:
            iterator.seek_to_last()
            iterator = reversed(iterator)
        else:
            iterator.seek_to_first()

        # note: keys are pickled ints, so iteration follows the byte order of
        # the pickled keys, not numeric doc-id order
        for key_bytes in iterator:
            doc_id = pickle.loads(key_bytes)
            value_bytes = self._db.get(key_bytes)
            pb_doc = gnes_pb2.Document()
            pb_doc.ParseFromString(value_bytes)
            yield doc_id, pb_doc

    def close(self):
        super().close()
        # drop the handle so the underlying DB is released and the directory unlocked
        try:
            del self._db
        except AttributeError:
            pass
        gc.collect()

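For context, a minimal usage sketch of the new indexer follows. It assumes the python-rocksdb package is installed; the data path and document contents are made up, and the proto field names are assumptions based on the fields the indexer touches:

from gnes.indexer.doc.rocksdb import RocksDBIndexer
from gnes.proto import gnes_pb2

indexer = RocksDBIndexer(data_path='/tmp/demo_rocksdb')
if not hasattr(indexer, '_db'):
    # GNES's base class may invoke post_init() automatically after
    # construction; call it explicitly only when it has not run yet
    indexer.post_init()

doc = gnes_pb2.Document()
doc.doc_id = 1  # assumed field name on gnes_pb2.Document

indexer.add([1], [doc])

results = indexer.query([1, 2])
assert results[1] is None  # key 2 was never added

for doc_id, pb_doc in indexer.scan():
    print(doc_id)

indexer.close()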