Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add keepalive-expiry #32

Merged
merged 2 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
self.is_http11 = False
self.is_http2 = False
self.connect_failed = False
self.expires_at: Optional[float] = None
self.backend = AutoBackend()

@property
Expand Down
51 changes: 46 additions & 5 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,29 @@ class AsyncConnectionPool(AsyncHTTPTransport):
**Parameters:**

* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **keepalive_expiry** - `Optional[float]` - The maximum time to allow before closing a keep-alive connection.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
self,
ssl_context: SSLContext = None,
max_connections: int = None,
max_keepalive: int = None,
keepalive_expiry: float = None,
http2: bool = False,
):
self.ssl_context = SSLContext() if ssl_context is None else ssl_context
self.max_connections = max_connections
self.max_keepalive = max_keepalive
self.keepalive_expiry = keepalive_expiry
self.http2 = http2
self.connections: Dict[Origin, Set[AsyncHTTPConnection]] = {}
self.thread_lock = ThreadLock()
self.backend = AutoBackend()
self.next_keepalive_check = 0.0

@property
def connection_semaphore(self) -> AsyncSemaphore:
Expand All @@ -111,6 +118,9 @@ async def request(
timeout = {} if timeout is None else timeout
origin = url[:3]

if self.keepalive_expiry is not None:
await self._keepalive_sweep()

connection: Optional[AsyncHTTPConnection] = None
while connection is None:
connection = await self._get_connection_from_pool(origin)
Expand Down Expand Up @@ -191,6 +201,7 @@ async def _get_connection_from_pool(
# Mark the connection as READY before we return it, to indicate
# that if it is HTTP/1.1 then it should not be re-acquired.
reuse_connection.mark_as_ready()
reuse_connection.expires_at = None
elif self.http2 and pending_connection is not None and not seen_http11:
# If we have a PENDING connection, and no HTTP/1.1 connections
# on this origin, then we can attempt to share the connection.
Expand All @@ -209,16 +220,19 @@ async def _response_closed(self, connection: AsyncHTTPConnection):
async with self.thread_lock:
if connection.state == ConnectionState.CLOSED:
remove_from_pool = True
elif (
connection.state == ConnectionState.IDLE
and self.max_keepalive is not None
):
elif connection.state == ConnectionState.IDLE:
num_connections = sum(
[len(conns) for conns in self.connections.values()]
)
if num_connections > self.max_keepalive:
if (
self.max_keepalive is not None
and num_connections > self.max_keepalive
):
remove_from_pool = True
close_connection = True
elif self.keepalive_expiry is not None:
now = self.backend.time()
connection.expires_at = now + self.keepalive_expiry

if remove_from_pool:
if connection in self.connections.get(connection.origin, set()):
Expand All @@ -230,6 +244,33 @@ async def _response_closed(self, connection: AsyncHTTPConnection):
if close_connection:
await connection.close()

async def _keepalive_sweep(self):
assert self.keepalive_expiry is not None

now = self.backend.time()
if now < self.next_keepalive_check:
return

self.next_keepalive_check = now + 1.0
connections_to_close = set()

async with self.thread_lock:
for connection_set in list(self.connections.values()):
for connection in list(connection_set):
if (
connection.state == ConnectionState.IDLE
and connection.expires_at is not None
and now > connection.expires_at
):
connections_to_close.add(connection)
self.connection_semaphore.release()
self.connections[connection.origin].remove(connection)
if not self.connections[connection.origin]:
del self.connections[connection.origin]

for connection in connections_to_close:
await connection.close()

async def close(self) -> None:
connections_to_close = set()

Expand Down
8 changes: 7 additions & 1 deletion httpcore/_async/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class AsyncHTTPProxy(AsyncConnectionPool):

* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of proxy headers to include.
* **proxy_mode** - `Optional[str]` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **proxy_mode** - `str` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
Expand All @@ -53,6 +56,9 @@ async def request(
stream: AsyncByteStream = None,
timeout: TimeoutDict = None,
) -> Tuple[bytes, int, bytes, Headers, AsyncByteStream]:
if self.keepalive_expiry is not None:
await self._keepalive_sweep()

if (
self.proxy_mode == "DEFAULT" and url[0] == b"http"
) or self.proxy_mode == "FORWARD_ONLY":
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_backends/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,7 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return Semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
loop = asyncio.get_event_loop()
return loop.time()
3 changes: 3 additions & 0 deletions httpcore/_backends/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return self.backend.create_semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return self.backend.time()
3 changes: 3 additions & 0 deletions httpcore/_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
raise NotImplementedError() # pragma: no cover

def time(self) -> float:
raise NotImplementedError() # pragma: no cover
4 changes: 4 additions & 0 deletions httpcore/_backends/sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import select
import socket
import threading
import time
from ssl import SSLContext
from types import TracebackType
from typing import Dict, Optional, Type
Expand Down Expand Up @@ -145,3 +146,6 @@ def create_lock(self) -> SyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> SyncSemaphore:
return SyncSemaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return time.monotonic()
3 changes: 3 additions & 0 deletions httpcore/_backends/trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return Semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return trio.current_time()
1 change: 1 addition & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
self.is_http11 = False
self.is_http2 = False
self.connect_failed = False
self.expires_at: Optional[float] = None
self.backend = SyncBackend()

@property
Expand Down
51 changes: 46 additions & 5 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,29 @@ class SyncConnectionPool(SyncHTTPTransport):
**Parameters:**

* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **keepalive_expiry** - `Optional[float]` - The maximum time to allow before closing a keep-alive connection.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
self,
ssl_context: SSLContext = None,
max_connections: int = None,
max_keepalive: int = None,
keepalive_expiry: float = None,
http2: bool = False,
):
self.ssl_context = SSLContext() if ssl_context is None else ssl_context
self.max_connections = max_connections
self.max_keepalive = max_keepalive
self.keepalive_expiry = keepalive_expiry
self.http2 = http2
self.connections: Dict[Origin, Set[SyncHTTPConnection]] = {}
self.thread_lock = ThreadLock()
self.backend = SyncBackend()
self.next_keepalive_check = 0.0

@property
def connection_semaphore(self) -> SyncSemaphore:
Expand All @@ -111,6 +118,9 @@ def request(
timeout = {} if timeout is None else timeout
origin = url[:3]

if self.keepalive_expiry is not None:
self._keepalive_sweep()

connection: Optional[SyncHTTPConnection] = None
while connection is None:
connection = self._get_connection_from_pool(origin)
Expand Down Expand Up @@ -191,6 +201,7 @@ def _get_connection_from_pool(
# Mark the connection as READY before we return it, to indicate
# that if it is HTTP/1.1 then it should not be re-acquired.
reuse_connection.mark_as_ready()
reuse_connection.expires_at = None
elif self.http2 and pending_connection is not None and not seen_http11:
# If we have a PENDING connection, and no HTTP/1.1 connections
# on this origin, then we can attempt to share the connection.
Expand All @@ -209,16 +220,19 @@ def _response_closed(self, connection: SyncHTTPConnection):
with self.thread_lock:
if connection.state == ConnectionState.CLOSED:
remove_from_pool = True
elif (
connection.state == ConnectionState.IDLE
and self.max_keepalive is not None
):
elif connection.state == ConnectionState.IDLE:
num_connections = sum(
[len(conns) for conns in self.connections.values()]
)
if num_connections > self.max_keepalive:
if (
self.max_keepalive is not None
and num_connections > self.max_keepalive
):
remove_from_pool = True
close_connection = True
elif self.keepalive_expiry is not None:
now = self.backend.time()
connection.expires_at = now + self.keepalive_expiry

if remove_from_pool:
if connection in self.connections.get(connection.origin, set()):
Expand All @@ -230,6 +244,33 @@ def _response_closed(self, connection: SyncHTTPConnection):
if close_connection:
connection.close()

def _keepalive_sweep(self):
assert self.keepalive_expiry is not None

now = self.backend.time()
if now < self.next_keepalive_check:
return

self.next_keepalive_check = now + 1.0
connections_to_close = set()

with self.thread_lock:
for connection_set in list(self.connections.values()):
for connection in list(connection_set):
if (
connection.state == ConnectionState.IDLE
and connection.expires_at is not None
and now > connection.expires_at
):
connections_to_close.add(connection)
self.connection_semaphore.release()
self.connections[connection.origin].remove(connection)
if not self.connections[connection.origin]:
del self.connections[connection.origin]

for connection in connections_to_close:
connection.close()

def close(self) -> None:
connections_to_close = set()

Expand Down
8 changes: 7 additions & 1 deletion httpcore/_sync/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class SyncHTTPProxy(SyncConnectionPool):

* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of proxy headers to include.
* **proxy_mode** - `Optional[str]` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **proxy_mode** - `str` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
Expand All @@ -53,6 +56,9 @@ def request(
stream: SyncByteStream = None,
timeout: TimeoutDict = None,
) -> Tuple[bytes, int, bytes, Headers, SyncByteStream]:
if self.keepalive_expiry is not None:
self._keepalive_sweep()

if (
self.proxy_mode == "DEFAULT" and url[0] == b"http"
) or self.proxy_mode == "FORWARD_ONLY":
Expand Down