diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 5baff2bf4..92ba65d9a 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -94,6 +94,7 @@ def register_default_runners(): register_runner(workload.OperationType.StartTransform, Retry(StartTransform()), async_runner=True) register_runner(workload.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True) register_runner(workload.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True) + register_runner(workload.OperationType.CreateSearchPipeline, Retry(CreateSearchPipeline()), async_runner=True) def runner_for(operation_type): @@ -1102,6 +1103,14 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return "put-pipeline" +# TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474 +class CreateSearchPipeline(Runner): + async def __call__(self, opensearch, params): + endpoint = "/_search/pipeline/" + mandatory(params, "id", self) + await opensearch.transport.perform_request(method="PUT", url=endpoint, body=mandatory(params, "body", self)) + + def __repr__(self, *args, **kwargs): + return "create-search-pipeline" class Refresh(Runner): async def __call__(self, opensearch, params): diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 4a24d8291..4a1e20cf8 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -611,6 +611,7 @@ class OperationType(Enum): DeleteComposableTemplate = 1031 CreateComponentTemplate = 1032 DeleteComponentTemplate = 1033 + CreateSearchPipeline = 1040 @property def admin_op(self): @@ -710,6 +711,8 @@ def from_hyphenated_string(cls, v): return OperationType.DeletePointInTime elif v == "list-all-point-in-time": return OperationType.ListAllPointInTime + elif v == "create-search-pipeline": + return OperationType.CreateSearchPipeline else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 0e8e6c8f6..7b6030aec 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2147,6 +2147,65 @@ async def test_scroll_query_request_all_pages(self, opensearch): opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_search_pipeline_using_request_params(self, opensearch): + response = { + "timed_out": False, + "took": 62, + "hits": { + "total": { + "value": 2, + "relation": "eq" + }, + "hits": [ + { + "title": "some-doc-1" + }, + { + "title": "some-doc-2" + } + + ] + } + } + opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(response))) + + query_runner = runner.Query() + params = { + "index": "_all", + "cache": False, + "detailed-results": True, + "body": None, + "request-params": { + "q": "user:kimchy", + "search-pipeline": "test-search-pipeline" + } + } + + async with query_runner: + result = await query_runner(opensearch, params) + + self.assertEqual(1, result["weight"]) + self.assertEqual("ops", result["unit"]) + self.assertEqual(2, result["hits"]) + self.assertEqual("eq", result["hits_relation"]) + self.assertFalse(result["timed_out"]) + self.assertEqual(62, result["took"]) + self.assertFalse("error-type" in result) + + opensearch.transport.perform_request.assert_called_once_with( + "GET", + "/_all/_search", + params={ + "request_cache": "false", + "q": "user:kimchy", + 'search-pipeline': 'test-search-pipeline' + }, + body=params["body"], + headers=None + ) + opensearch.clear_scroll.assert_not_called() class PutPipelineRunnerTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @@ -5564,3 +5623,79 @@ def test_prefix_doesnt_exit(self): suffix = runner.remove_prefix(index_name, "unrelatedprefix") self.assertEqual(suffix, index_name) + + +class CreateSearchPipelineRunnerTests(TestCase): + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_create_search_pipeline(self, opensearch): + opensearch.transport.perform_request.return_value = as_future() + + r = runner.CreateSearchPipeline() + + params = { + "id": "test_pipeline", + "body": { + "request_processors": [ + { + "filter_query": { + "query": { + "match": { + "foo": "bar" + } + } + } + } + ], + "response_processors": [ + { + "rename_field": { + "field": "foo", + "target_field": "bar" + } + } + ] + } + } + + await r(opensearch, params) + + opensearch.transport.perform_request.assert_called_once_with(method='PUT', + url='/_search/pipeline/test_pipeline', + body=params["body"]) + + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_param_body_mandatory(self, opensearch): + opensearch.transport.perform_request.return_value = as_future() + + r = runner.CreateSearchPipeline() + + params = { + "id": "test_pipeline", + } + with self.assertRaisesRegex( + exceptions.DataError, + "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'body'. " + "Add it to your parameter source and try again."): + await r(opensearch, params) + + self.assertEqual(0, opensearch.transport.perform_request.call_count) + + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_param_id_mandatory(self, opensearch): + opensearch.transport.perform_request.return_value = as_future() + + r = runner.CreateSearchPipeline() + + params = { + "body": {} + } + with self.assertRaisesRegex( + exceptions.DataError, + "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'id'. " + "Add it to your parameter source and try again."): + await r(opensearch, params) + + self.assertEqual(0, opensearch.transport.perform_request.call_count)