Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nidx dev deploy #2632

Merged
merged 4 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion charts/nucliadb_shared/templates/nucliadb.cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,12 @@ data:
{{- if .Values.flag_settings_url }}
FLAG_SETTINGS_URL: "{{ .Values.flag_settings_url }}"
{{- end }}
{{- end }}

{{- with .Values.nidx }}
javitonino marked this conversation as resolved.
Show resolved Hide resolved
NIDX_ENABLED: "true"
INDEX_NIDX_SUBJECT: {{ .nats_subject }}
NIDX_API_ADDRESS: {{ .api_address }}
NIDX_SEARCHER_ADDRESS: {{ .searcher_address }}
{{- end }}

{{- end }}
5 changes: 5 additions & 0 deletions charts/nucliadb_shared/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,8 @@ flag_settings_url: null

encryption:
secret_key: XX

# nidx:
# api_address:
# searcher_address:
# nats_subject:
13 changes: 7 additions & 6 deletions nidx/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nidx/nidx_binding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ anyhow = "1.0.93"
nidx = { version = "0.1.0", path = ".." }
nidx_protos = { version = "0.1.0", path = "../nidx_protos" }
pyo3 = "0.22.0"
tempfile = "3.14.0"
tokio = "1.41.1"
tonic = "0.12.3"
7 changes: 5 additions & 2 deletions nidx/nidx_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use nidx::Settings;
use nidx_protos::prost::*;
use nidx_protos::IndexMessage;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use tokio::runtime::Runtime;
use tokio::sync::watch;

Expand All @@ -39,6 +39,7 @@ pub struct NidxBinding {
seq: SeqSource,
runtime: Option<Runtime>,
sync_watcher: watch::Receiver<SyncStatus>,
_searcher_work_dir: TempDir,
}

#[pymethods]
Expand Down Expand Up @@ -99,8 +100,9 @@ impl NidxBinding {
tokio::task::spawn(api_server.serve(api_service));

// Searcher API
let searcher_work_dir = tempdir()?;
let (sync_reporter, sync_watcher) = watch::channel(SyncStatus::Syncing);
let searcher = SyncedSearcher::new(settings.metadata.clone(), Path::new("/tmp/searcher"));
let searcher = SyncedSearcher::new(settings.metadata.clone(), searcher_work_dir.path());
let searcher_api = SearchServer::new(settings.metadata.clone(), searcher.index_cache());
let searcher_server = GrpcServer::new("localhost:0").await?;
let searcher_port = searcher_server.port()?;
Expand Down Expand Up @@ -130,6 +132,7 @@ impl NidxBinding {
seq,
runtime: None,
sync_watcher,
_searcher_work_dir: searcher_work_dir,
})
}
}
Expand Down
32 changes: 24 additions & 8 deletions nucliadb/src/nucliadb/common/cluster/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
ShardsNotFound,
)
from nucliadb.common.maindb.driver import Transaction
from nucliadb.common.nidx import get_nidx, get_nidx_api_client
from nucliadb.common.nidx import NIDX_ENABLED, get_nidx, get_nidx_api_client, get_nidx_fake_node
from nucliadb_protos import (
knowledgebox_pb2,
nodereader_pb2,
Expand Down Expand Up @@ -144,13 +144,17 @@ async def apply_for_all_shards(
kbid: str,
aw: Callable[[AbstractIndexNode, str], Awaitable[Any]],
timeout: float,
*,
use_nidx: bool,
use_read_replica_nodes: bool = False,
) -> list[Any]:
shards = await self.get_shards_by_kbid(kbid)
ops = []

for shard_obj in shards:
node, shard_id = choose_node(shard_obj, use_read_replica_nodes=use_read_replica_nodes)
node, shard_id = choose_node(
shard_obj, use_nidx=use_nidx, use_read_replica_nodes=use_read_replica_nodes
)
if shard_id is None:
raise ShardNotFound("Found a node but not a shard")

Expand Down Expand Up @@ -349,13 +353,9 @@ async def delete_resource(

if nidx is not None:
nidxpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage()
nidxpb.node = node_id
nidxpb.shard = shard.shard
nidxpb.txid = txid
nidxpb.resource = uuid
nidxpb.typemessage = nodewriter_pb2.TypeMessage.DELETION
nidxpb.partition = partition
nidxpb.kbid = kb
await nidx.index(nidxpb)

async def add_resource(
Expand Down Expand Up @@ -439,8 +439,12 @@ async def _create_vectorset(node: AbstractIndexNode, shard_id: str):
)

await self.apply_for_all_shards(
kbid, _create_vectorset, timeout=10, use_read_replica_nodes=False
kbid, _create_vectorset, timeout=10, use_nidx=False, use_read_replica_nodes=False
)
if NIDX_ENABLED:
await self.apply_for_all_shards(
kbid, _create_vectorset, timeout=10, use_nidx=True, use_read_replica_nodes=False
)

async def delete_vectorset(self, kbid: str, vectorset_id: str):
"""Delete a vectorset from all KB shards"""
Expand All @@ -453,8 +457,12 @@ async def _delete_vectorset(node: AbstractIndexNode, shard_id: str):
)

await self.apply_for_all_shards(
kbid, _delete_vectorset, timeout=10, use_read_replica_nodes=False
kbid, _delete_vectorset, timeout=10, use_nidx=False, use_read_replica_nodes=False
)
if NIDX_ENABLED:
await self.apply_for_all_shards(
kbid, _delete_vectorset, timeout=10, use_nidx=True, use_read_replica_nodes=False
)


class StandaloneKBShardManager(KBShardManager):
Expand Down Expand Up @@ -590,6 +598,7 @@ def get_all_shard_nodes(
def choose_node(
shard: writer_pb2.ShardObject,
*,
use_nidx: bool,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = False,
) -> tuple[AbstractIndexNode, str]:
Expand All @@ -605,6 +614,13 @@ def choose_node(
`target_shard_replicas` is the least preferent.

"""

# Use nidx if requested and enabled, fallback to node
if use_nidx:
fake_node = get_nidx_fake_node()
if fake_node:
return fake_node, shard.shard

target_shard_replicas = target_shard_replicas or []

shard_nodes = get_all_shard_nodes(shard, use_read_replicas=use_read_replica_nodes)
Expand Down
5 changes: 3 additions & 2 deletions nucliadb/src/nucliadb/common/cluster/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ async def get_shards_paragraphs(kbid: str) -> list[tuple[str, int]]:

results = {}
for shard_meta in kb_shards.shards:
node, shard_id = choose_node(shard_meta)
# Rebalance using node as source of truth. But it will rebalance nidx
node, shard_id = choose_node(shard_meta, use_nidx=False)
shard_data: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
Expand Down Expand Up @@ -101,7 +102,7 @@ async def move_set_of_kb_resources(
from_shard = [s for s in kb_shards.shards if s.shard == from_shard_id][0]
to_shard = [s for s in kb_shards.shards if s.shard == to_shard_id][0]

from_node, from_shard_replica_id = choose_node(from_shard)
from_node, from_shard_replica_id = choose_node(from_shard, use_nidx=False)
search_response: nodereader_pb2.SearchResponse = await from_node.reader.Search( # type: ignore
nodereader_pb2.SearchRequest(
shard=from_shard_replica_id,
Expand Down
53 changes: 36 additions & 17 deletions nucliadb/src/nucliadb/common/nidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import os
from typing import Optional

from nidx_protos.nidx_pb2_grpc import NidxApiStub, NidxSearcherStub

from nucliadb.common.cluster.base import AbstractIndexNode
from nucliadb.common.cluster.settings import settings
from nucliadb.ingest.settings import DriverConfig
Expand All @@ -35,15 +37,7 @@
from nucliadb_utils.storages.settings import settings as extended_storage_settings
from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility

NIDX_INSTALLED = False
if os.environ.get("NIDX_ENABLED"):
try:
# TODO: Remove this ignore once nidx_protos is actually required
from nidx_protos.nidx_pb2_grpc import NidxApiStub, NidxSearcherStub # type: ignore

NIDX_INSTALLED = True
except ImportError:
logger.info("nidx not installed")
NIDX_ENABLED = bool(os.environ.get("NIDX_ENABLED"))


class NidxUtility:
Expand Down Expand Up @@ -134,10 +128,10 @@ class NidxServiceUtility(NidxUtility):

def __init__(self):
if indexing_settings.index_nidx_subject is None:
raise ValueError("nidx subject needed for nidx utility")
raise ValueError("INDEX_NIDX_SUBJECT needed for nidx utility")

if not settings.nidx_api_address or not settings.nidx_searcher_address:
raise ValueError("NIDX_API and NIDX_SEARCHER are required")
raise ValueError("NIDX_API_ADDRESS and NIDX_SEARCHER_ADDRESS are required")

self.nats_connection_manager = NatsConnectionManager(
service_name="NidxIndexer",
Expand Down Expand Up @@ -165,7 +159,7 @@ async def index(self, writer: IndexMessage) -> int:


async def start_nidx_utility() -> Optional[NidxUtility]:
if not NIDX_INSTALLED:
if not NIDX_ENABLED:
return None

nidx = get_nidx()
Expand Down Expand Up @@ -211,17 +205,42 @@ def get_nidx_searcher_client() -> Optional["NidxSearcherStub"]:
return None


# TODO: Remove the index node abstraction
class NodeNidxAdapter:
def __init__(self, api_client, searcher_client):
# API methods
self.GetShard = api_client.GetShard
self.NewShard = api_client.NewShard
self.DeleteShard = api_client.DeleteShard
self.ListShards = api_client.ListShards
self.AddVectorSet = api_client.AddVectorSet
self.RemoveVectorSet = api_client.RemoveVectorSet
self.ListVectorSets = api_client.ListVectorSets
self.GetMetadata = api_client.GetMetadata

# Searcher methods
self.DocumentIds = searcher_client.DocumentIds
self.ParagraphIds = searcher_client.ParagraphIds
self.VectorIds = searcher_client.VectorIds
self.RelationIds = searcher_client.RelationIds
self.RelationEdges = searcher_client.RelationEdges
self.Search = searcher_client.Search
self.Suggest = searcher_client.Suggest
self.Paragraphs = searcher_client.Paragraphs
self.Documents = searcher_client.Documents


class FakeNode(AbstractIndexNode):
def __init__(self, searcher_client):
self.client = searcher_client
def __init__(self, api_client, searcher_client):
self.client = NodeNidxAdapter(api_client, searcher_client)

@property
def reader(self):
return self.client

@property
def writer(self):
return None
return self.client

def is_read_replica(_):
return False
Expand All @@ -240,8 +259,8 @@ def primary_id(self):


def get_nidx_fake_node() -> Optional[FakeNode]:
nidx = get_nidx_searcher_client()
nidx = get_nidx()
if nidx:
return FakeNode(nidx)
return FakeNode(nidx.api_client, nidx.searcher_client)
else:
return None
3 changes: 2 additions & 1 deletion nucliadb/src/nucliadb/ingest/consumer/auditing.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ async def process_kb(self, kbid: str) -> None:
total_paragraphs = 0

for shard_obj in shard_groups:
node, shard_id = choose_node(shard_obj)
# TODO: Uses node for auditing, don't want to suddenly change metrics
node, shard_id = choose_node(shard_obj, use_nidx=False)
shard: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/ingest/consumer/shard_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def process_kb(self, kbid: str) -> None:
async with locking.distributed_lock(locking.NEW_SHARD_LOCK.format(kbid=kbid)):
# remember, a lock will do at least 1+ reads and 1 write.
# with heavy writes, this adds some simple k/v pressure
node, shard_id = choose_node(current_shard)
node, shard_id = choose_node(current_shard, use_nidx=True)
shard: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
Expand Down
4 changes: 4 additions & 0 deletions nucliadb/src/nucliadb/ingest/orm/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
from nucliadb_protos.utils_pb2 import RelationNode
from nucliadb_protos.writer_pb2 import GetEntitiesResponse
from nucliadb_telemetry import errors
from nucliadb_utils import const
from nucliadb_utils.utilities import has_feature

from .exceptions import EntityManagementException

Expand Down Expand Up @@ -223,6 +225,7 @@ async def do_entities_search(node: AbstractIndexNode, shard_id: str) -> Relation
self.kbid,
do_entities_search,
settings.relation_search_timeout,
use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": self.kbid}),
use_read_replica_nodes=self.use_read_replica_nodes,
)
for result in results:
Expand Down Expand Up @@ -324,6 +327,7 @@ async def query_indexed_entities_group_names(node: AbstractIndexNode, shard_id:
self.kbid,
query_indexed_entities_group_names,
settings.relation_types_timeout,
use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": self.kbid}),
use_read_replica_nodes=self.use_read_replica_nodes,
)
for result in results:
Expand Down
6 changes: 5 additions & 1 deletion nucliadb/src/nucliadb/search/api/v1/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
from nucliadb_protos.writer_pb2 import ShardObject as PBShardObject
from nucliadb_protos.writer_pb2 import Shards
from nucliadb_telemetry import errors
from nucliadb_utils import const
from nucliadb_utils.authentication import requires, requires_one
from nucliadb_utils.utilities import has_feature

MAX_PARAGRAPHS_FOR_SMALL_KB = 250_000

Expand Down Expand Up @@ -159,7 +161,9 @@ async def get_node_index_counts(kbid: str) -> tuple[IndexCounts, list[str]]:
queried_shards = []
for shard_object in shard_groups:
try:
node, shard_id = choose_node(shard_object)
node, shard_id = choose_node(
shard_object, use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": kbid})
)
except KeyError:
raise HTTPException(
status_code=500,
Expand Down
Loading
Loading