From 0ac68fc2e2cfb05c22bd3076da5c65a00f8d5af8 Mon Sep 17 00:00:00 2001 From: Mike Baamonde Date: Tue, 21 Jun 2022 10:31:14 -0400 Subject: [PATCH] Create an ES client per simulated client instead of per worker. (#1516) Previously, Rally would instantiate an ES client per worker and scale its connection pool to match the number of simulated clients associated with that worker. This worker-level ES client would be shared across all of the simulated clients spawned by the worker. This meant that any simulated client and its siblings would have to use exactly the same ES transport options, as they all shared a single ES client object. There are use cases, however, where sibling clients may each need to set unique transport options (e.g. for authentication). This commit facilitates support for these lower-level overrides by creating a distinct ES client per simulated client, not per worker. --- esrally/driver/driver.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 255a8014c..f3f94c928 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1662,14 +1662,10 @@ def es_clients(all_hosts, all_client_options): es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async() return es - # Properly size the internal connection pool to match the number of expected clients but allow the user - # to override it if needed. - client_count = len(self.task_allocations) - es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").with_max_connections(client_count)) - self.logger.info("Task assertions enabled: %s", str(self.assertions_enabled)) runner.enable_assertions(self.assertions_enabled) + clients = [] aws = [] # A parameter source should only be created once per task - it is partitioned later on per client. params_per_task = {} @@ -1679,6 +1675,8 @@ def es_clients(all_hosts, all_client_options): param_source = track.operation_parameters(self.track, task) params_per_task[task] = param_source schedule = schedule_for(task_allocation, params_per_task[task]) + es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options")) + clients.append(es) async_executor = AsyncExecutor( client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error) ) @@ -1693,8 +1691,9 @@ def es_clients(all_hosts, all_client_options): await asyncio.get_event_loop().shutdown_asyncgens() shutdown_asyncgens_end = time.perf_counter() self.logger.info("Total time to shutdown asyncgens: %f seconds.", (shutdown_asyncgens_end - run_end)) - for e in es.values(): - await e.transport.close() + for c in clients: + for es in c.values(): + await es.close() transport_close_end = time.perf_counter() self.logger.info("Total time to close transports: %f seconds.", (shutdown_asyncgens_end - transport_close_end))