Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade ES client to 8.2.0 #1510

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 19 additions & 46 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import List, Optional

import aiohttp
import elasticsearch
import elastic_transport
from aiohttp import BaseConnector, RequestInfo
from aiohttp.client_proto import ResponseHandler
from aiohttp.helpers import BaseTimerContext
Expand Down Expand Up @@ -168,47 +168,19 @@ def response(self, path):
return body


class AIOHttpConnection(elasticsearch.AIOHttpConnection):
def __init__(
self,
host="localhost",
port=None,
http_auth=None,
use_ssl=False,
ssl_assert_fingerprint=None,
headers=None,
ssl_context=None,
http_compress=None,
cloud_id=None,
api_key=None,
opaque_id=None,
loop=None,
trace_config=None,
**kwargs,
):
super().__init__(
host=host,
port=port,
http_auth=http_auth,
use_ssl=use_ssl,
ssl_assert_fingerprint=ssl_assert_fingerprint,
# provided to the base class via `maxsize` to keep base class state consistent despite Rally
# calling the attribute differently.
maxsize=kwargs.get("max_connections", 0),
headers=headers,
ssl_context=ssl_context,
http_compress=http_compress,
cloud_id=cloud_id,
api_key=api_key,
opaque_id=opaque_id,
loop=loop,
**kwargs,
)
class RallyAiohttpHttpNode(elastic_transport.AiohttpHttpNode):
def __init__(self, config):
super().__init__(config)

self._loop = asyncio.get_running_loop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please assign self._loop to None here and only call asyncio.get_running_loop in _create_aiohttp_session like elastic-transport does?

https://sethmlarson.dev/blog/flask-async-views-and-async-globals explains why it's important to wait before calling this function.

self._limit = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not be setting self._limit as it's internal to the Python client. In fact elastic-transport switched:

  • from setting aiohttp's TCPConnector limit to setting limit_per_host and
  • from self._limit to self._connection_per_node.

So the code below should read limit_per_host=self._connections_per_node instead of limit=self._limit


self._trace_configs = [trace_config] if trace_config else None
self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", True)
client_options = config._extras.get("_rally_client_options")
if client_options:
self._trace_configs = client_options.get("trace_config")
self._enable_cleanup_closed = client_options.get("enable_cleanup_closed")

static_responses = kwargs.get("static_responses")
static_responses = client_options.get("static_responses")
self.use_static_responses = static_responses is not None

if self.use_static_responses:
Expand All @@ -223,21 +195,22 @@ def __init__(
self._request_class = aiohttp.ClientRequest
self._response_class = RawClientResponse

async def _create_aiohttp_session(self):
if self.loop is None:
self.loop = asyncio.get_running_loop()

def _create_aiohttp_session(self):
if self.use_static_responses:
connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed)
else:
connector = aiohttp.TCPConnector(
limit=self._limit, use_dns_cache=True, ssl_context=self._ssl_context, enable_cleanup_closed=self._enable_cleanup_closed
limit=self._limit,
use_dns_cache=True,
# May need to be just ssl=self._ssl_context
ssl_context=self._ssl_context,
Comment on lines +205 to +206
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please do that. Using ssl= is the preferred way since 3.0 (aio-libs/aiohttp#2626) and ssl_context= goes away in the next version, 4.0 (aio-libs/aiohttp#3548).

enable_cleanup_closed=self._enable_cleanup_closed,
)

self.session = aiohttp.ClientSession(
headers=self.headers,
auto_decompress=True,
loop=self.loop,
loop=self._loop,
cookie_jar=aiohttp.DummyCookieJar(),
request_class=self._request_class,
response_class=self._response_class,
Expand Down
Loading