Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for create search pipeline #363

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def register_default_runners():
register_runner(workload.OperationType.StartTransform, Retry(StartTransform()), async_runner=True)
register_runner(workload.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
register_runner(workload.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
register_runner(workload.OperationType.CreateSearchPipeline, Retry(CreateSearchPipeline()), async_runner=True)


def runner_for(operation_type):
Expand Down Expand Up @@ -1102,6 +1103,14 @@ async def __call__(self, opensearch, params):
def __repr__(self, *args, **kwargs):
    """Return the fixed label ``put-pipeline`` for this runner."""
    return "put-pipeline"

# TODO: Refactor this once the Python client supports search pipelines: https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
    """
    Creates a search pipeline by sending ``PUT /_search/pipeline/<id>``
    through the client's low-level transport.

    Expected entries in ``params``:

    * ``id``: name of the pipeline; appended to the endpoint path.
    * ``body``: pipeline definition sent as the request body.

    Both entries are looked up via ``mandatory``.
    """

    async def __call__(self, opensearch, params):
        # Resolve "id" (and build the URL) before touching "body" so that the
        # parameters are validated in the same order as they are used.
        pipeline_id = mandatory(params, "id", self)
        target_url = "/_search/pipeline/" + pipeline_id
        pipeline_definition = mandatory(params, "body", self)
        await opensearch.transport.perform_request(
            method="PUT",
            url=target_url,
            body=pipeline_definition,
        )

    def __repr__(self, *args, **kwargs):
        return "create-search-pipeline"

class Refresh(Runner):
async def __call__(self, opensearch, params):
Expand Down
3 changes: 3 additions & 0 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ class OperationType(Enum):
DeleteComposableTemplate = 1031
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
CreateSearchPipeline = 1040

@property
def admin_op(self):
Expand Down Expand Up @@ -710,6 +711,8 @@ def from_hyphenated_string(cls, v):
return OperationType.DeletePointInTime
elif v == "list-all-point-in-time":
return OperationType.ListAllPointInTime
elif v == "create-search-pipeline":
return OperationType.CreateSearchPipeline
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
135 changes: 135 additions & 0 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2147,6 +2147,65 @@ async def test_scroll_query_request_all_pages(self, opensearch):

opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]})

@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_search_pipeline_using_request_params(self, opensearch):
    """
    Verifies that a search pipeline named in ``request-params`` is forwarded
    as a query-string parameter of the search request issued by the query
    runner, alongside the other request parameters.
    """
    response = {
        "timed_out": False,
        "took": 62,
        "hits": {
            "total": {
                "value": 2,
                "relation": "eq"
            },
            "hits": [
                {
                    "title": "some-doc-1"
                },
                {
                    "title": "some-doc-2"
                }
            ]
        }
    }
    opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(response)))

    query_runner = runner.Query()
    params = {
        "index": "_all",
        "cache": False,
        "detailed-results": True,
        "body": None,
        "request-params": {
            "q": "user:kimchy",
            # OpenSearch selects a per-request search pipeline through the
            # ``search_pipeline`` (underscore) query parameter.
            "search_pipeline": "test-search-pipeline"
        }
    }

    async with query_runner:
        result = await query_runner(opensearch, params)

    self.assertEqual(1, result["weight"])
    self.assertEqual("ops", result["unit"])
    self.assertEqual(2, result["hits"])
    self.assertEqual("eq", result["hits_relation"])
    self.assertFalse(result["timed_out"])
    self.assertEqual(62, result["took"])
    self.assertNotIn("error-type", result)

    # The expected query string is spelled out literally rather than reusing
    # params["request-params"], so the assertion stays independent of the
    # dictionary handed to the runner.
    opensearch.transport.perform_request.assert_called_once_with(
        "GET",
        "/_all/_search",
        params={
            "request_cache": "false",
            "q": "user:kimchy",
            "search_pipeline": "test-search-pipeline"
        },
        body=params["body"],
        headers=None
    )
    # A plain (non-scroll) search must not clear any scroll context.
    opensearch.clear_scroll.assert_not_called()

class PutPipelineRunnerTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
Expand Down Expand Up @@ -5564,3 +5623,79 @@ def test_prefix_doesnt_exit(self):
suffix = runner.remove_prefix(index_name, "unrelatedprefix")

self.assertEqual(suffix, index_name)


class CreateSearchPipelineRunnerTests(TestCase):
    """Unit tests for the ``create-search-pipeline`` runner."""

    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_create_search_pipeline(self, opensearch):
        """A complete parameter set results in exactly one PUT request."""
        perform_request = opensearch.transport.perform_request
        perform_request.return_value = as_future()

        request_processors = [
            {
                "filter_query": {
                    "query": {
                        "match": {
                            "foo": "bar"
                        }
                    }
                }
            }
        ]
        response_processors = [
            {
                "rename_field": {
                    "field": "foo",
                    "target_field": "bar"
                }
            }
        ]
        pipeline_body = {
            "request_processors": request_processors,
            "response_processors": response_processors
        }
        params = {
            "id": "test_pipeline",
            "body": pipeline_body
        }

        pipeline_runner = runner.CreateSearchPipeline()
        await pipeline_runner(opensearch, params)

        perform_request.assert_called_once_with(
            method='PUT',
            url='/_search/pipeline/test_pipeline',
            body=pipeline_body
        )

    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_param_body_mandatory(self, opensearch):
        """Omitting ``body`` raises a DataError and sends no request."""
        perform_request = opensearch.transport.perform_request
        perform_request.return_value = as_future()

        pipeline_runner = runner.CreateSearchPipeline()
        params_without_body = {
            "id": "test_pipeline",
        }
        expected_error = (
            "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'body'. "
            "Add it to your parameter source and try again."
        )

        with self.assertRaisesRegex(exceptions.DataError, expected_error):
            await pipeline_runner(opensearch, params_without_body)

        self.assertEqual(0, perform_request.call_count)

    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_param_id_mandatory(self, opensearch):
        """Omitting ``id`` raises a DataError and sends no request."""
        perform_request = opensearch.transport.perform_request
        perform_request.return_value = as_future()

        pipeline_runner = runner.CreateSearchPipeline()
        params_without_id = {
            "body": {}
        }
        expected_error = (
            "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'id'. "
            "Add it to your parameter source and try again."
        )

        with self.assertRaisesRegex(exceptions.DataError, expected_error):
            await pipeline_runner(opensearch, params_without_id)

        self.assertEqual(0, perform_request.call_count)
Loading