Skip to content

Commit

Permalink
Add keepalive-expiry (#32)
Browse files Browse the repository at this point in the history
* Add keepalive-expiry

* Unasync
  • Loading branch information
tomchristie authored Feb 26, 2020
1 parent 6325cb9 commit 9a10c68
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 12 deletions.
1 change: 1 addition & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
self.is_http11 = False
self.is_http2 = False
self.connect_failed = False
self.expires_at: Optional[float] = None
self.backend = AutoBackend()

@property
Expand Down
51 changes: 46 additions & 5 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,29 @@ class AsyncConnectionPool(AsyncHTTPTransport):
**Parameters:**
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **keepalive_expiry** - `Optional[float]` - The maximum time to allow before closing a keep-alive connection.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
self,
ssl_context: SSLContext = None,
max_connections: int = None,
max_keepalive: int = None,
keepalive_expiry: float = None,
http2: bool = False,
):
self.ssl_context = SSLContext() if ssl_context is None else ssl_context
self.max_connections = max_connections
self.max_keepalive = max_keepalive
self.keepalive_expiry = keepalive_expiry
self.http2 = http2
self.connections: Dict[Origin, Set[AsyncHTTPConnection]] = {}
self.thread_lock = ThreadLock()
self.backend = AutoBackend()
self.next_keepalive_check = 0.0

@property
def connection_semaphore(self) -> AsyncSemaphore:
Expand All @@ -111,6 +118,9 @@ async def request(
timeout = {} if timeout is None else timeout
origin = url[:3]

if self.keepalive_expiry is not None:
await self._keepalive_sweep()

connection: Optional[AsyncHTTPConnection] = None
while connection is None:
connection = await self._get_connection_from_pool(origin)
Expand Down Expand Up @@ -191,6 +201,7 @@ async def _get_connection_from_pool(
# Mark the connection as READY before we return it, to indicate
# that if it is HTTP/1.1 then it should not be re-acquired.
reuse_connection.mark_as_ready()
reuse_connection.expires_at = None
elif self.http2 and pending_connection is not None and not seen_http11:
# If we have a PENDING connection, and no HTTP/1.1 connections
# on this origin, then we can attempt to share the connection.
Expand All @@ -209,16 +220,19 @@ async def _response_closed(self, connection: AsyncHTTPConnection):
async with self.thread_lock:
if connection.state == ConnectionState.CLOSED:
remove_from_pool = True
elif (
connection.state == ConnectionState.IDLE
and self.max_keepalive is not None
):
elif connection.state == ConnectionState.IDLE:
num_connections = sum(
[len(conns) for conns in self.connections.values()]
)
if num_connections > self.max_keepalive:
if (
self.max_keepalive is not None
and num_connections > self.max_keepalive
):
remove_from_pool = True
close_connection = True
elif self.keepalive_expiry is not None:
now = self.backend.time()
connection.expires_at = now + self.keepalive_expiry

if remove_from_pool:
if connection in self.connections.get(connection.origin, set()):
Expand All @@ -230,6 +244,33 @@ async def _response_closed(self, connection: AsyncHTTPConnection):
if close_connection:
await connection.close()

async def _keepalive_sweep(self):
assert self.keepalive_expiry is not None

now = self.backend.time()
if now < self.next_keepalive_check:
return

self.next_keepalive_check = now + 1.0
connections_to_close = set()

async with self.thread_lock:
for connection_set in list(self.connections.values()):
for connection in list(connection_set):
if (
connection.state == ConnectionState.IDLE
and connection.expires_at is not None
and now > connection.expires_at
):
connections_to_close.add(connection)
self.connection_semaphore.release()
self.connections[connection.origin].remove(connection)
if not self.connections[connection.origin]:
del self.connections[connection.origin]

for connection in connections_to_close:
await connection.close()

async def close(self) -> None:
connections_to_close = set()

Expand Down
8 changes: 7 additions & 1 deletion httpcore/_async/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class AsyncHTTPProxy(AsyncConnectionPool):
* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of proxy headers to include.
* **proxy_mode** - `Optional[str]` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **proxy_mode** - `str` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
Expand All @@ -53,6 +56,9 @@ async def request(
stream: AsyncByteStream = None,
timeout: TimeoutDict = None,
) -> Tuple[bytes, int, bytes, Headers, AsyncByteStream]:
if self.keepalive_expiry is not None:
await self._keepalive_sweep()

if (
self.proxy_mode == "DEFAULT" and url[0] == b"http"
) or self.proxy_mode == "FORWARD_ONLY":
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_backends/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,7 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return Semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
loop = asyncio.get_event_loop()
return loop.time()
3 changes: 3 additions & 0 deletions httpcore/_backends/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return self.backend.create_semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return self.backend.time()
3 changes: 3 additions & 0 deletions httpcore/_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
raise NotImplementedError() # pragma: no cover

def time(self) -> float:
raise NotImplementedError() # pragma: no cover
4 changes: 4 additions & 0 deletions httpcore/_backends/sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import select
import socket
import threading
import time
from ssl import SSLContext
from types import TracebackType
from typing import Dict, Optional, Type
Expand Down Expand Up @@ -145,3 +146,6 @@ def create_lock(self) -> SyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> SyncSemaphore:
return SyncSemaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return time.monotonic()
3 changes: 3 additions & 0 deletions httpcore/_backends/trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,6 @@ def create_lock(self) -> AsyncLock:

def create_semaphore(self, max_value: int, exc_class: type) -> AsyncSemaphore:
return Semaphore(max_value, exc_class=exc_class)

def time(self) -> float:
return trio.current_time()
1 change: 1 addition & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
self.is_http11 = False
self.is_http2 = False
self.connect_failed = False
self.expires_at: Optional[float] = None
self.backend = SyncBackend()

@property
Expand Down
51 changes: 46 additions & 5 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,29 @@ class SyncConnectionPool(SyncHTTPTransport):
**Parameters:**
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **keepalive_expiry** - `Optional[float]` - The maximum time to allow before closing a keep-alive connection.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
self,
ssl_context: SSLContext = None,
max_connections: int = None,
max_keepalive: int = None,
keepalive_expiry: float = None,
http2: bool = False,
):
self.ssl_context = SSLContext() if ssl_context is None else ssl_context
self.max_connections = max_connections
self.max_keepalive = max_keepalive
self.keepalive_expiry = keepalive_expiry
self.http2 = http2
self.connections: Dict[Origin, Set[SyncHTTPConnection]] = {}
self.thread_lock = ThreadLock()
self.backend = SyncBackend()
self.next_keepalive_check = 0.0

@property
def connection_semaphore(self) -> SyncSemaphore:
Expand All @@ -111,6 +118,9 @@ def request(
timeout = {} if timeout is None else timeout
origin = url[:3]

if self.keepalive_expiry is not None:
self._keepalive_sweep()

connection: Optional[SyncHTTPConnection] = None
while connection is None:
connection = self._get_connection_from_pool(origin)
Expand Down Expand Up @@ -191,6 +201,7 @@ def _get_connection_from_pool(
# Mark the connection as READY before we return it, to indicate
# that if it is HTTP/1.1 then it should not be re-acquired.
reuse_connection.mark_as_ready()
reuse_connection.expires_at = None
elif self.http2 and pending_connection is not None and not seen_http11:
# If we have a PENDING connection, and no HTTP/1.1 connections
# on this origin, then we can attempt to share the connection.
Expand All @@ -209,16 +220,19 @@ def _response_closed(self, connection: SyncHTTPConnection):
with self.thread_lock:
if connection.state == ConnectionState.CLOSED:
remove_from_pool = True
elif (
connection.state == ConnectionState.IDLE
and self.max_keepalive is not None
):
elif connection.state == ConnectionState.IDLE:
num_connections = sum(
[len(conns) for conns in self.connections.values()]
)
if num_connections > self.max_keepalive:
if (
self.max_keepalive is not None
and num_connections > self.max_keepalive
):
remove_from_pool = True
close_connection = True
elif self.keepalive_expiry is not None:
now = self.backend.time()
connection.expires_at = now + self.keepalive_expiry

if remove_from_pool:
if connection in self.connections.get(connection.origin, set()):
Expand All @@ -230,6 +244,33 @@ def _response_closed(self, connection: SyncHTTPConnection):
if close_connection:
connection.close()

def _keepalive_sweep(self):
assert self.keepalive_expiry is not None

now = self.backend.time()
if now < self.next_keepalive_check:
return

self.next_keepalive_check = now + 1.0
connections_to_close = set()

with self.thread_lock:
for connection_set in list(self.connections.values()):
for connection in list(connection_set):
if (
connection.state == ConnectionState.IDLE
and connection.expires_at is not None
and now > connection.expires_at
):
connections_to_close.add(connection)
self.connection_semaphore.release()
self.connections[connection.origin].remove(connection)
if not self.connections[connection.origin]:
del self.connections[connection.origin]

for connection in connections_to_close:
connection.close()

def close(self) -> None:
connections_to_close = set()

Expand Down
8 changes: 7 additions & 1 deletion httpcore/_sync/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class SyncHTTPProxy(SyncConnectionPool):
* **proxy_origin** - `Tuple[bytes, bytes, int]` - The address of the proxy service as a 3-tuple of (scheme, host, port).
* **proxy_headers** - `Optional[List[Tuple[bytes, bytes]]]` - A list of proxy headers to include.
* **proxy_mode** - `Optional[str]` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **proxy_mode** - `str` - A proxy mode to operate in. May be "DEFAULT", "FORWARD_ONLY", or "TUNNEL_ONLY".
* **ssl_context** - `Optional[SSLContext]` - An SSL context to use for verifying connections.
* **max_connections** - `Optional[int]` - The maximum number of concurrent connections to allow.
* **max_keepalive** - `Optional[int]` - The maximum number of connections to allow before closing keep-alive connections.
* **http2** - `bool` - Enable HTTP/2 support.
"""

def __init__(
Expand All @@ -53,6 +56,9 @@ def request(
stream: SyncByteStream = None,
timeout: TimeoutDict = None,
) -> Tuple[bytes, int, bytes, Headers, SyncByteStream]:
if self.keepalive_expiry is not None:
self._keepalive_sweep()

if (
self.proxy_mode == "DEFAULT" and url[0] == b"http"
) or self.proxy_mode == "FORWARD_ONLY":
Expand Down

0 comments on commit 9a10c68

Please sign in to comment.