From 2638f7862b76ef69216590e17bd2dbf1f992313c Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Thu, 14 Nov 2024 13:33:40 +0100 Subject: [PATCH 1/4] Include nidx-protos in images --- nucliadb/tests/search/fixtures.py | 4 ++-- pdm.lock | 6 ++---- pyproject.toml | 9 +++++---- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/nucliadb/tests/search/fixtures.py b/nucliadb/tests/search/fixtures.py index 6ae0c0c0f6..259d2a9cc0 100644 --- a/nucliadb/tests/search/fixtures.py +++ b/nucliadb/tests/search/fixtures.py @@ -205,9 +205,9 @@ async def wait_for_shard(knowledgebox_ingest: str, count: int) -> str: await txn.abort() checks: dict[str, bool] = {} - if os.environ.get("NIDX_ENABLED"): + nidx_api = get_nidx_api_client() + if nidx_api: checks[""] = False - nidx_api = get_nidx_api_client() req = GetShardRequest() req.shard_id.id = shard.shard for i in range(30): diff --git a/pdm.lock b/pdm.lock index 7f337417e1..84485cf013 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev", "nidx", "sdk", "sidecar"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:6978f28085ba2ae6fa7ff1e22eb326952cf82d99889685a307293f258a59afc9" +content_hash = "sha256:6d48f679c11d3bea635a4ad30770dd4f52172340a65bace9b60f863dfc77c0d2" [[metadata.targets]] requires_python = ">=3.9" @@ -2030,7 +2030,6 @@ files = [ name = "nidx-binding" version = "0.1.0" requires_python = ">=3.8" -editable = true path = "./nidx/nidx_binding" summary = "" groups = ["nidx"] @@ -2038,10 +2037,9 @@ groups = ["nidx"] [[package]] name = "nidx-protos" version = "0.0.1" -editable = true path = "./nidx/nidx_protos" summary = "" -groups = ["nidx"] +groups = ["default"] [[package]] name = "nkeys" diff --git a/pyproject.toml b/pyproject.toml index cf7c06c6ea..dc095cc5bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,12 +4,17 @@ requires-python = ">=3.9" version = "0.0.0" dependencies = [ "nucliadb @ file:///${PROJECT_ROOT}/nucliadb", + # TODO: We don't want to 
add nidx-protos as a nucliadb dependency yet + "nidx-protos @ file:///${PROJECT_ROOT}/nidx/nidx_protos", ] [project.optional-dependencies] sidecar = [ "nucliadb-sidecar @ file:///${PROJECT_ROOT}/nucliadb_sidecar", ] +nidx = [ + "nidx-binding @ file:///${PROJECT_ROOT}/nidx/nidx_binding", +] [tool.pdm.dev-dependencies] dev = [ # List all the packages we want to install as editable @@ -61,10 +66,6 @@ sdk = [ "-e file:///${PROJECT_ROOT}/nucliadb_dataset#egg=nucliadb-dataset", "requests-mock>=1.12.1", ] -nidx = [ - "-e file:///${PROJECT_ROOT}/nidx/nidx_binding", - "-e file:///${PROJECT_ROOT}/nidx/nidx_protos", -] [tool.setuptools] py-modules = [] From a2d110c578d750459e2a71d8371d3b1ba76679aa Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Thu, 14 Nov 2024 14:49:25 +0100 Subject: [PATCH 2/4] nidx enabled for all node operations --- nidx/Cargo.lock | 13 ++--- nidx/nidx_binding/Cargo.toml | 1 + nidx/nidx_binding/src/lib.rs | 7 ++- .../src/nucliadb/common/cluster/manager.py | 32 ++++++++--- .../src/nucliadb/common/cluster/rebalance.py | 5 +- nucliadb/src/nucliadb/common/nidx.py | 53 +++++++++++++------ .../src/nucliadb/ingest/consumer/auditing.py | 3 +- .../nucliadb/ingest/consumer/shard_creator.py | 2 +- nucliadb/src/nucliadb/ingest/orm/entities.py | 4 ++ .../nucliadb/search/api/v1/knowledgebox.py | 6 ++- .../src/nucliadb/search/requesters/utils.py | 9 +--- nucliadb/src/nucliadb/train/nodes.py | 3 +- .../common/cluster/test_manager.py | 10 ++-- .../nucliadb/integration/test_reindex.py | 6 ++- .../nucliadb/integration/test_vectorsets.py | 3 +- .../unit/common/cluster/test_cluster.py | 11 ++-- nucliadb/tests/search/fixtures.py | 1 - nucliadb/tests/search/node.py | 3 +- nucliadb_utils/src/nucliadb_utils/const.py | 1 + .../src/nucliadb_utils/featureflagging.py | 4 ++ 20 files changed, 117 insertions(+), 60 deletions(-) diff --git a/nidx/Cargo.lock b/nidx/Cargo.lock index 8d0cc27b22..ffda63e839 100644 --- a/nidx/Cargo.lock +++ b/nidx/Cargo.lock @@ -1200,9 +1200,9 @@ checksum 
= "0c2cdeb66e45e9f36bfad5bbdb4d2384e70936afbee843c6f6543f0c551ebb25" [[package]] name = "libc" -version = "0.2.159" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libm" @@ -1426,6 +1426,7 @@ dependencies = [ "nidx", "nidx_protos", "pyo3", + "tempfile", "tokio", "tonic", ] @@ -2331,9 +2332,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.37" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags", "errno", @@ -3112,9 +3113,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", diff --git a/nidx/nidx_binding/Cargo.toml b/nidx/nidx_binding/Cargo.toml index 5c8cccac66..e9f72e76bb 100644 --- a/nidx/nidx_binding/Cargo.toml +++ b/nidx/nidx_binding/Cargo.toml @@ -12,5 +12,6 @@ anyhow = "1.0.93" nidx = { version = "0.1.0", path = ".." 
} nidx_protos = { version = "0.1.0", path = "../nidx_protos" } pyo3 = "0.22.0" +tempfile = "3.14.0" tokio = "1.41.1" tonic = "0.12.3" diff --git a/nidx/nidx_binding/src/lib.rs b/nidx/nidx_binding/src/lib.rs index 90fcc63077..e82b860fac 100644 --- a/nidx/nidx_binding/src/lib.rs +++ b/nidx/nidx_binding/src/lib.rs @@ -13,9 +13,9 @@ use nidx::Settings; use nidx_protos::prost::*; use nidx_protos::IndexMessage; use std::collections::HashMap; -use std::path::Path; use std::sync::atomic::AtomicI64; use std::sync::Arc; +use tempfile::{tempdir, TempDir}; use tokio::runtime::Runtime; use tokio::sync::watch; @@ -39,6 +39,7 @@ pub struct NidxBinding { seq: SeqSource, runtime: Option, sync_watcher: watch::Receiver, + _searcher_work_dir: TempDir, } #[pymethods] @@ -99,8 +100,9 @@ impl NidxBinding { tokio::task::spawn(api_server.serve(api_service)); // Searcher API + let searcher_work_dir = tempdir()?; let (sync_reporter, sync_watcher) = watch::channel(SyncStatus::Syncing); - let searcher = SyncedSearcher::new(settings.metadata.clone(), Path::new("/tmp/searcher")); + let searcher = SyncedSearcher::new(settings.metadata.clone(), searcher_work_dir.path()); let searcher_api = SearchServer::new(settings.metadata.clone(), searcher.index_cache()); let searcher_server = GrpcServer::new("localhost:0").await?; let searcher_port = searcher_server.port()?; @@ -130,6 +132,7 @@ impl NidxBinding { seq, runtime: None, sync_watcher, + _searcher_work_dir: searcher_work_dir, }) } } diff --git a/nucliadb/src/nucliadb/common/cluster/manager.py b/nucliadb/src/nucliadb/common/cluster/manager.py index 78317c92bd..d872874cc8 100644 --- a/nucliadb/src/nucliadb/common/cluster/manager.py +++ b/nucliadb/src/nucliadb/common/cluster/manager.py @@ -36,7 +36,7 @@ ShardsNotFound, ) from nucliadb.common.maindb.driver import Transaction -from nucliadb.common.nidx import get_nidx, get_nidx_api_client +from nucliadb.common.nidx import NIDX_ENABLED, get_nidx, get_nidx_api_client, get_nidx_fake_node from 
nucliadb_protos import ( knowledgebox_pb2, nodereader_pb2, @@ -144,13 +144,17 @@ async def apply_for_all_shards( kbid: str, aw: Callable[[AbstractIndexNode, str], Awaitable[Any]], timeout: float, + *, + use_nidx: bool, use_read_replica_nodes: bool = False, ) -> list[Any]: shards = await self.get_shards_by_kbid(kbid) ops = [] for shard_obj in shards: - node, shard_id = choose_node(shard_obj, use_read_replica_nodes=use_read_replica_nodes) + node, shard_id = choose_node( + shard_obj, use_nidx=use_nidx, use_read_replica_nodes=use_read_replica_nodes + ) if shard_id is None: raise ShardNotFound("Found a node but not a shard") @@ -349,13 +353,9 @@ async def delete_resource( if nidx is not None: nidxpb: nodewriter_pb2.IndexMessage = nodewriter_pb2.IndexMessage() - nidxpb.node = node_id nidxpb.shard = shard.shard - nidxpb.txid = txid nidxpb.resource = uuid nidxpb.typemessage = nodewriter_pb2.TypeMessage.DELETION - nidxpb.partition = partition - nidxpb.kbid = kb await nidx.index(nidxpb) async def add_resource( @@ -439,8 +439,12 @@ async def _create_vectorset(node: AbstractIndexNode, shard_id: str): ) await self.apply_for_all_shards( - kbid, _create_vectorset, timeout=10, use_read_replica_nodes=False + kbid, _create_vectorset, timeout=10, use_nidx=False, use_read_replica_nodes=False ) + if NIDX_ENABLED: + await self.apply_for_all_shards( + kbid, _create_vectorset, timeout=10, use_nidx=True, use_read_replica_nodes=False + ) async def delete_vectorset(self, kbid: str, vectorset_id: str): """Delete a vectorset from all KB shards""" @@ -453,8 +457,12 @@ async def _delete_vectorset(node: AbstractIndexNode, shard_id: str): ) await self.apply_for_all_shards( - kbid, _delete_vectorset, timeout=10, use_read_replica_nodes=False + kbid, _delete_vectorset, timeout=10, use_nidx=False, use_read_replica_nodes=False ) + if NIDX_ENABLED: + await self.apply_for_all_shards( + kbid, _delete_vectorset, timeout=10, use_nidx=True, use_read_replica_nodes=False + ) class 
StandaloneKBShardManager(KBShardManager): @@ -590,6 +598,7 @@ def get_all_shard_nodes( def choose_node( shard: writer_pb2.ShardObject, *, + use_nidx: bool, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = False, ) -> tuple[AbstractIndexNode, str]: @@ -605,6 +614,13 @@ def choose_node( `target_shard_replicas` is the least preferent. """ + + # Use nidx if requested and enabled, fallback to node + if use_nidx: + fake_node = get_nidx_fake_node() + if fake_node: + return fake_node, shard.shard + target_shard_replicas = target_shard_replicas or [] shard_nodes = get_all_shard_nodes(shard, use_read_replicas=use_read_replica_nodes) diff --git a/nucliadb/src/nucliadb/common/cluster/rebalance.py b/nucliadb/src/nucliadb/common/cluster/rebalance.py index 88662688cc..653a16e4a4 100644 --- a/nucliadb/src/nucliadb/common/cluster/rebalance.py +++ b/nucliadb/src/nucliadb/common/cluster/rebalance.py @@ -51,7 +51,8 @@ async def get_shards_paragraphs(kbid: str) -> list[tuple[str, int]]: results = {} for shard_meta in kb_shards.shards: - node, shard_id = choose_node(shard_meta) + # Rebalance using node as source of truth. 
But it will rebalance nidx + node, shard_id = choose_node(shard_meta, use_nidx=False) shard_data: nodereader_pb2.Shard = await node.reader.GetShard( nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore ) @@ -101,7 +102,7 @@ async def move_set_of_kb_resources( from_shard = [s for s in kb_shards.shards if s.shard == from_shard_id][0] to_shard = [s for s in kb_shards.shards if s.shard == to_shard_id][0] - from_node, from_shard_replica_id = choose_node(from_shard) + from_node, from_shard_replica_id = choose_node(from_shard, use_nidx=False) search_response: nodereader_pb2.SearchResponse = await from_node.reader.Search( # type: ignore nodereader_pb2.SearchRequest( shard=from_shard_replica_id, diff --git a/nucliadb/src/nucliadb/common/nidx.py b/nucliadb/src/nucliadb/common/nidx.py index 6ec013028d..3e8cc6a171 100644 --- a/nucliadb/src/nucliadb/common/nidx.py +++ b/nucliadb/src/nucliadb/common/nidx.py @@ -21,6 +21,8 @@ import os from typing import Optional +from nidx_protos.nidx_pb2_grpc import NidxApiStub, NidxSearcherStub + from nucliadb.common.cluster.base import AbstractIndexNode from nucliadb.common.cluster.settings import settings from nucliadb.ingest.settings import DriverConfig @@ -35,15 +37,7 @@ from nucliadb_utils.storages.settings import settings as extended_storage_settings from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility -NIDX_INSTALLED = False -if os.environ.get("NIDX_ENABLED"): - try: - # TODO: Remove this ignore once nidx_protos is actually required - from nidx_protos.nidx_pb2_grpc import NidxApiStub, NidxSearcherStub # type: ignore - - NIDX_INSTALLED = True - except ImportError: - logger.info("nidx not installed") +NIDX_ENABLED = bool(os.environ.get("NIDX_ENABLED")) class NidxUtility: @@ -134,10 +128,10 @@ class NidxServiceUtility(NidxUtility): def __init__(self): if indexing_settings.index_nidx_subject is None: - raise ValueError("nidx subject needed for nidx utility") + raise 
ValueError("INDEX_NIDX_SUBJECT needed for nidx utility") if not settings.nidx_api_address or not settings.nidx_searcher_address: - raise ValueError("NIDX_API and NIDX_SEARCHER are required") + raise ValueError("NIDX_API_ADDRESS and NIDX_SEARCHER_ADDRESS are required") self.nats_connection_manager = NatsConnectionManager( service_name="NidxIndexer", @@ -165,7 +159,7 @@ async def index(self, writer: IndexMessage) -> int: async def start_nidx_utility() -> Optional[NidxUtility]: - if not NIDX_INSTALLED: + if not NIDX_ENABLED: return None nidx = get_nidx() @@ -211,9 +205,34 @@ def get_nidx_searcher_client() -> Optional["NidxSearcherStub"]: return None +# TODO: Remove the index node abstraction +class NodeNidxAdapter: + def __init__(self, api_client, searcher_client): + # API methods + self.GetShard = api_client.GetShard + self.NewShard = api_client.NewShard + self.DeleteShard = api_client.DeleteShard + self.ListShards = api_client.ListShards + self.AddVectorSet = api_client.AddVectorSet + self.RemoveVectorSet = api_client.RemoveVectorSet + self.ListVectorSets = api_client.ListVectorSets + self.GetMetadata = api_client.GetMetadata + + # Searcher methods + self.DocumentIds = searcher_client.DocumentIds + self.ParagraphIds = searcher_client.ParagraphIds + self.VectorIds = searcher_client.VectorIds + self.RelationIds = searcher_client.RelationIds + self.RelationEdges = searcher_client.RelationEdges + self.Search = searcher_client.Search + self.Suggest = searcher_client.Suggest + self.Paragraphs = searcher_client.Paragraphs + self.Documents = searcher_client.Documents + + class FakeNode(AbstractIndexNode): - def __init__(self, searcher_client): - self.client = searcher_client + def __init__(self, api_client, searcher_client): + self.client = NodeNidxAdapter(api_client, searcher_client) @property def reader(self): @@ -221,7 +240,7 @@ def reader(self): @property def writer(self): - return None + return self.client def is_read_replica(_): return False @@ -240,8 +259,8 @@ def 
primary_id(self): def get_nidx_fake_node() -> Optional[FakeNode]: - nidx = get_nidx_searcher_client() + nidx = get_nidx() if nidx: - return FakeNode(nidx) + return FakeNode(nidx.api_client, nidx.searcher_client) else: return None diff --git a/nucliadb/src/nucliadb/ingest/consumer/auditing.py b/nucliadb/src/nucliadb/ingest/consumer/auditing.py index 59a044ec67..a7550e6308 100644 --- a/nucliadb/src/nucliadb/ingest/consumer/auditing.py +++ b/nucliadb/src/nucliadb/ingest/consumer/auditing.py @@ -112,7 +112,8 @@ async def process_kb(self, kbid: str) -> None: total_paragraphs = 0 for shard_obj in shard_groups: - node, shard_id = choose_node(shard_obj) + # TODO: Uses node for auditing, don't want to suddenly change metrics + node, shard_id = choose_node(shard_obj, use_nidx=False) shard: nodereader_pb2.Shard = await node.reader.GetShard( nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore ) diff --git a/nucliadb/src/nucliadb/ingest/consumer/shard_creator.py b/nucliadb/src/nucliadb/ingest/consumer/shard_creator.py index 488c3be9be..6fd5ffb9c6 100644 --- a/nucliadb/src/nucliadb/ingest/consumer/shard_creator.py +++ b/nucliadb/src/nucliadb/ingest/consumer/shard_creator.py @@ -103,7 +103,7 @@ async def process_kb(self, kbid: str) -> None: async with locking.distributed_lock(locking.NEW_SHARD_LOCK.format(kbid=kbid)): # remember, a lock will do at least 1+ reads and 1 write. 
# with heavy writes, this adds some simple k/v pressure - node, shard_id = choose_node(current_shard) + node, shard_id = choose_node(current_shard, use_nidx=True) shard: nodereader_pb2.Shard = await node.reader.GetShard( nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore ) diff --git a/nucliadb/src/nucliadb/ingest/orm/entities.py b/nucliadb/src/nucliadb/ingest/orm/entities.py index 4c840d684e..400fcf1a78 100644 --- a/nucliadb/src/nucliadb/ingest/orm/entities.py +++ b/nucliadb/src/nucliadb/ingest/orm/entities.py @@ -54,6 +54,8 @@ from nucliadb_protos.utils_pb2 import RelationNode from nucliadb_protos.writer_pb2 import GetEntitiesResponse from nucliadb_telemetry import errors +from nucliadb_utils import const +from nucliadb_utils.utilities import has_feature from .exceptions import EntityManagementException @@ -223,6 +225,7 @@ async def do_entities_search(node: AbstractIndexNode, shard_id: str) -> Relation self.kbid, do_entities_search, settings.relation_search_timeout, + use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": self.kbid}), use_read_replica_nodes=self.use_read_replica_nodes, ) for result in results: @@ -324,6 +327,7 @@ async def query_indexed_entities_group_names(node: AbstractIndexNode, shard_id: self.kbid, query_indexed_entities_group_names, settings.relation_types_timeout, + use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": self.kbid}), use_read_replica_nodes=self.use_read_replica_nodes, ) for result in results: diff --git a/nucliadb/src/nucliadb/search/api/v1/knowledgebox.py b/nucliadb/src/nucliadb/search/api/v1/knowledgebox.py index f900e5928b..33145dfd1f 100644 --- a/nucliadb/src/nucliadb/search/api/v1/knowledgebox.py +++ b/nucliadb/src/nucliadb/search/api/v1/knowledgebox.py @@ -47,7 +47,9 @@ from nucliadb_protos.writer_pb2 import ShardObject as PBShardObject from nucliadb_protos.writer_pb2 import Shards from nucliadb_telemetry import errors +from nucliadb_utils import const 
from nucliadb_utils.authentication import requires, requires_one +from nucliadb_utils.utilities import has_feature MAX_PARAGRAPHS_FOR_SMALL_KB = 250_000 @@ -159,7 +161,9 @@ async def get_node_index_counts(kbid: str) -> tuple[IndexCounts, list[str]]: queried_shards = [] for shard_object in shard_groups: try: - node, shard_id = choose_node(shard_object) + node, shard_id = choose_node( + shard_object, use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": kbid}) + ) except KeyError: raise HTTPException( status_code=500, diff --git a/nucliadb/src/nucliadb/search/requesters/utils.py b/nucliadb/src/nucliadb/search/requesters/utils.py index c29f226bf6..4df6406ecc 100644 --- a/nucliadb/src/nucliadb/search/requesters/utils.py +++ b/nucliadb/src/nucliadb/search/requesters/utils.py @@ -31,7 +31,6 @@ from nucliadb.common.cluster.base import AbstractIndexNode from nucliadb.common.cluster.exceptions import ShardsNotFound from nucliadb.common.cluster.utils import get_shard_manager -from nucliadb.common.nidx import get_nidx_fake_node from nucliadb.search import logger from nucliadb.search.search.shards import ( query_shard, @@ -124,16 +123,10 @@ async def node_query( try: node, shard_id = cluster_manager.choose_node( shard_obj, + use_nidx=has_feature(const.Features.NIDX_READS, context={"kbid": kbid}), use_read_replica_nodes=use_read_replica_nodes, target_shard_replicas=target_shard_replicas, ) - - # Query with nidx if installed - fake_node = get_nidx_fake_node() - - if fake_node: - node = fake_node - shard_id = shard_obj.shard except KeyError: incomplete_results = True else: diff --git a/nucliadb/src/nucliadb/train/nodes.py b/nucliadb/src/nucliadb/train/nodes.py index 0698dca163..51bf89bc37 100644 --- a/nucliadb/src/nucliadb/train/nodes.py +++ b/nucliadb/src/nucliadb/train/nodes.py @@ -55,7 +55,8 @@ async def get_reader(self, kbid: str, shard: str) -> tuple[AbstractIndexNode, st except StopIteration: raise KeyError("Shard not found") - node_obj, shard_id = 
manager.choose_node(shard_object) + # TODO: Id streams not yet implemented in nidx + node_obj, shard_id = manager.choose_node(shard_object, use_nidx=False) return node_obj, shard_id async def get_kb_obj(self, txn: Transaction, kbid: str) -> Optional[KnowledgeBox]: diff --git a/nucliadb/tests/nucliadb/integration/common/cluster/test_manager.py b/nucliadb/tests/nucliadb/integration/common/cluster/test_manager.py index dc67c26024..2576ba8ad4 100644 --- a/nucliadb/tests/nucliadb/integration/common/cluster/test_manager.py +++ b/nucliadb/tests/nucliadb/integration/common/cluster/test_manager.py @@ -143,7 +143,7 @@ async def test_choose_node_always_prefer_the_same_node(shards, shard_index: int, shard = shards.shards[shard_index] node_ids = set() for i in range(100): - node, _ = manager.choose_node(shard) + node, _ = manager.choose_node(shard, use_nidx=False) node_ids.add(node.id) assert len(node_ids) == 1 @@ -155,14 +155,14 @@ async def test_choose_node_attempts_target_replicas_but_is_not_imperative(shards r1 = shard.replicas[1].shard.id n1 = shard.replicas[1].node - node, replica_id = manager.choose_node(shard, target_shard_replicas=[r0]) + node, replica_id = manager.choose_node(shard, use_nidx=False, target_shard_replicas=[r0]) assert replica_id == r0 assert node.id == n0 # Change the node-0 to a non-existent node id in order to # test the target_shard_replicas logic is not imperative shard.replicas[0].node = "I-do-not-exist" - node, replica_id = manager.choose_node(shard, target_shard_replicas=[r0]) + node, replica_id = manager.choose_node(shard, use_nidx=False, target_shard_replicas=[r0]) assert replica_id == r1 assert node.id == n1 @@ -174,7 +174,7 @@ async def test_choose_node_raises_if_no_nodes(shards): shard.replicas[1].node = "bar" with pytest.raises(NoHealthyNodeAvailable): - manager.choose_node(shard) + manager.choose_node(shard, use_nidx=False) @pytest.mark.asyncio @@ -188,7 +188,7 @@ async def test_apply_for_all_shards(fake_kbid: str, shards, maindb_driver: 
Drive async def fun(node: AbstractIndexNode, shard_id: str): nodes.append((shard_id, node.id)) - await shard_manager.apply_for_all_shards(kbid, fun, timeout=10) + await shard_manager.apply_for_all_shards(kbid, fun, timeout=10, use_nidx=False) nodes.sort() assert len(nodes) == 2 diff --git a/nucliadb/tests/nucliadb/integration/test_reindex.py b/nucliadb/tests/nucliadb/integration/test_reindex.py index 6dad1d4dab..1ac213b59d 100644 --- a/nucliadb/tests/nucliadb/integration/test_reindex.py +++ b/nucliadb/tests/nucliadb/integration/test_reindex.py @@ -79,7 +79,9 @@ async def clean_shard(resources: list[str], node: AbstractIndexNode, shard_repli ) shard_manager = KBShardManager() - results = await shard_manager.apply_for_all_shards(kbid, partial(clean_shard, [rid]), timeout=5) + results = await shard_manager.apply_for_all_shards( + kbid, partial(clean_shard, [rid]), timeout=5, use_nidx=False + ) for result in results: assert not isinstance(result, Exception) @@ -133,7 +135,7 @@ async def test_reindex_vector_duplication( shard_manager = KBShardManager() shards = await shard_manager.get_shards_by_kbid(kbid) assert len(shards) == 1 - node, shard_replica_id = manager.choose_node(shards[0]) + node, shard_replica_id = manager.choose_node(shards[0], use_nidx=False) ids_before = {} async with datamanagers.with_ro_transaction() as txn: diff --git a/nucliadb/tests/nucliadb/integration/test_vectorsets.py b/nucliadb/tests/nucliadb/integration/test_vectorsets.py index 76b16413a0..27bd8aec56 100644 --- a/nucliadb/tests/nucliadb/integration/test_vectorsets.py +++ b/nucliadb/tests/nucliadb/integration/test_vectorsets.py @@ -31,6 +31,7 @@ from nucliadb.common.cluster import manager from nucliadb.common.cluster.base import AbstractIndexNode from nucliadb.common.maindb.driver import Driver +from nucliadb.common.nidx import NIDX_ENABLED from nucliadb.ingest.orm.knowledgebox import KnowledgeBox from nucliadb.search.predict import DummyPredictEngine from nucliadb.search.requesters import 
utils @@ -74,7 +75,7 @@ async def test_vectorsets_work_on_a_kb_with_a_single_vectorset( shards = await manager.KBShardManager().get_shards_by_kbid(kbid) logic_shard = shards[0] - node, shard_id = manager.choose_node(logic_shard) + node, shard_id = manager.choose_node(logic_shard, use_nidx=NIDX_ENABLED) test_cases = [ # If there is just one vectorset, it should be used by default when diff --git a/nucliadb/tests/nucliadb/unit/common/cluster/test_cluster.py b/nucliadb/tests/nucliadb/unit/common/cluster/test_cluster.py index 4fcc845b51..5f1f622e81 100644 --- a/nucliadb/tests/nucliadb/unit/common/cluster/test_cluster.py +++ b/nucliadb/tests/nucliadb/unit/common/cluster/test_cluster.py @@ -126,13 +126,15 @@ def test_choose_node_with_two_primary_nodes(): node, _ = manager.choose_node( writer_pb2.ShardObject( replicas=[writer_pb2.ShardReplica(shard=writer_pb2.ShardCreated(id="123"), node="node-0")] - ) + ), + use_nidx=False, ) assert node.id == "node-0" node, _ = manager.choose_node( writer_pb2.ShardObject( replicas=[writer_pb2.ShardReplica(shard=writer_pb2.ShardCreated(id="123"), node="node-1")] - ) + ), + use_nidx=False, ) assert node.id == "node-1" @@ -152,6 +154,7 @@ def test_choose_node_with_two_read_replicas(): writer_pb2.ShardObject( replicas=[writer_pb2.ShardReplica(shard=writer_pb2.ShardCreated(id="123"), node="node-0")] ), + use_nidx=False, use_read_replica_nodes=True, ) assert node.id == "node-replica-0" @@ -159,6 +162,7 @@ def test_choose_node_with_two_read_replicas(): writer_pb2.ShardObject( replicas=[writer_pb2.ShardReplica(shard=writer_pb2.ShardCreated(id="123"), node="node-1")] ), + use_nidx=False, use_read_replica_nodes=True, ) assert node.id == "node-replica-1" @@ -182,6 +186,7 @@ def test_choose_node_no_healthy_node_available(): writer_pb2.ShardReplica(shard=writer_pb2.ShardCreated(id="123"), node="node-1") ] ), + use_nidx=False, use_read_replica_nodes=True, ) @@ -195,7 +200,7 @@ def repeated_choose_node( node_ids = [] for _ in range(count): - node, 
shard_id = manager.choose_node(shard, **kwargs) + node, shard_id = manager.choose_node(shard, use_nidx=False, **kwargs) shard_ids.append(shard_id) node_ids.append(node.id) diff --git a/nucliadb/tests/search/fixtures.py b/nucliadb/tests/search/fixtures.py index 259d2a9cc0..53ee2a9d0c 100644 --- a/nucliadb/tests/search/fixtures.py +++ b/nucliadb/tests/search/fixtures.py @@ -18,7 +18,6 @@ # along with this program. If not, see . import asyncio -import os from enum import Enum from typing import AsyncIterable, Optional diff --git a/nucliadb/tests/search/node.py b/nucliadb/tests/search/node.py index 1665453cf7..234a2a42c1 100644 --- a/nucliadb/tests/search/node.py +++ b/nucliadb/tests/search/node.py @@ -36,6 +36,7 @@ from pytest_docker_fixtures.containers._base import BaseImage # type: ignore from nucliadb.common.cluster.settings import settings as cluster_settings +from nucliadb.common.nidx import NIDX_ENABLED from nucliadb_protos.nodewriter_pb2 import EmptyQuery, ShardId from nucliadb_protos.nodewriter_pb2_grpc import NodeWriterStub from nucliadb_utils.tests.fixtures import get_testing_storage_backend @@ -466,7 +467,7 @@ def _node(natsd: str, node_storage): @pytest.fixture(scope="session") async def _nidx(natsd, nidx_storage, pg): - if not os.environ.get("NIDX_ENABLED"): + if not NIDX_ENABLED: yield return diff --git a/nucliadb_utils/src/nucliadb_utils/const.py b/nucliadb_utils/src/nucliadb_utils/const.py index 0b488d0283..354d7e5dbe 100644 --- a/nucliadb_utils/src/nucliadb_utils/const.py +++ b/nucliadb_utils/src/nucliadb_utils/const.py @@ -79,3 +79,4 @@ class Features: NATS_SYNC_ACK = "nucliadb_nats_sync_ack" LOG_REQUEST_PAYLOADS = "nucliadb_log_request_payloads" IGNORE_EXTRACTED_IN_SEARCH = "nucliadb_ignore_extracted_in_search" + NIDX_READS = "nucliadb_nidx_reads" diff --git a/nucliadb_utils/src/nucliadb_utils/featureflagging.py b/nucliadb_utils/src/nucliadb_utils/featureflagging.py index 70b9d2733f..09c18a12fb 100644 --- 
a/nucliadb_utils/src/nucliadb_utils/featureflagging.py +++ b/nucliadb_utils/src/nucliadb_utils/featureflagging.py @@ -65,6 +65,10 @@ class Settings(pydantic_settings.BaseSettings): "rollout": 0, "variants": {"environment": ["local"]}, }, + const.Features.NIDX_READS: { + "rollout": 0, + "variants": {"environment": ["local"]}, + }, } From 187e93a3979e334fb127330a1763a5fa2680656f Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Thu, 14 Nov 2024 16:03:55 +0100 Subject: [PATCH 3/4] nidx config for nucliadb shared --- charts/nucliadb_shared/templates/nucliadb.cm.yaml | 10 +++++++++- charts/nucliadb_shared/values.yaml | 5 +++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/charts/nucliadb_shared/templates/nucliadb.cm.yaml b/charts/nucliadb_shared/templates/nucliadb.cm.yaml index 590e15282b..1df3172b56 100644 --- a/charts/nucliadb_shared/templates/nucliadb.cm.yaml +++ b/charts/nucliadb_shared/templates/nucliadb.cm.yaml @@ -87,4 +87,12 @@ data: {{- if .Values.flag_settings_url }} FLAG_SETTINGS_URL: "{{ .Values.flag_settings_url }}" {{- end }} -{{- end }} \ No newline at end of file + +{{- with .Values.nidx }} + NIDX_ENABLED: "{{ .Values.flag_settings_url }}" + INDEX_NIDX_SUBJECT: {{ .nats_subject }} + NIDX_API_ADDRESS: {{ .api_address }} + NIDX_SEARCHER_ADDRES: {{ .searcher_address }} +{{- end }} + +{{- end }} diff --git a/charts/nucliadb_shared/values.yaml b/charts/nucliadb_shared/values.yaml index f2165b137f..4fefab6d1d 100644 --- a/charts/nucliadb_shared/values.yaml +++ b/charts/nucliadb_shared/values.yaml @@ -98,3 +98,8 @@ flag_settings_url: null encryption: secret_key: XX + +# nidx: +# api_address: +# searcher_address: +# nats_subject: From 1cc73814d0440983cbe357a941297d3c1cf6428b Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Thu, 14 Nov 2024 22:27:05 +0100 Subject: [PATCH 4/4] Fix config map --- charts/nucliadb_shared/templates/nucliadb.cm.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/charts/nucliadb_shared/templates/nucliadb.cm.yaml b/charts/nucliadb_shared/templates/nucliadb.cm.yaml index 1df3172b56..c35e9c167f 100644 --- a/charts/nucliadb_shared/templates/nucliadb.cm.yaml +++ b/charts/nucliadb_shared/templates/nucliadb.cm.yaml @@ -89,10 +89,10 @@ data: {{- end }} {{- with .Values.nidx }} - NIDX_ENABLED: "{{ .Values.flag_settings_url }}" - INDEX_NIDX_SUBJECT: {{ .nats_subject }} - NIDX_API_ADDRESS: {{ .api_address }} - NIDX_SEARCHER_ADDRES: {{ .searcher_address }} + NIDX_ENABLED: "1" + INDEX_NIDX_SUBJECT: "{{ .nats_subject }}" + NIDX_API_ADDRESS: "{{ .api_address }}" + NIDX_SEARCHER_ADDRESS: "{{ .searcher_address }}" {{- end }} {{- end }}