Skip to content

Commit 22e6ab2

Browse files
authored
Transport improvements (#70)
1 parent 2a2d36b commit 22e6ab2

File tree

10 files changed

+601
-512
lines changed

10 files changed

+601
-512
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ repos:
4848
- id: no-commit-to-branch
4949
args: [--branch, dev, --branch, int, --branch, main]
5050
- repo: https://github.com/astral-sh/ruff-pre-commit
51-
rev: v0.14.3
51+
rev: v0.14.4
5252
hooks:
5353
- id: ruff
5454
args: [--fix, --exit-non-zero-on-fix]

.pyproject_generation/pyproject_custom.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "ghga_service_commons"
3-
version = "5.3.0"
3+
version = "6.0.0"
44
description = "A library that contains common functionality used in services of GHGA"
55
readme = "README.md"
66
authors = [

lock/requirements-dev.txt

Lines changed: 301 additions & 297 deletions
Large diffs are not rendered by default.

lock/requirements.txt

Lines changed: 162 additions & 158 deletions
Large diffs are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ classifiers = [
2323
"Intended Audience :: Developers",
2424
]
2525
name = "ghga_service_commons"
26-
version = "5.3.0"
26+
version = "6.0.0"
2727
description = "A library that contains common functionality used in services of GHGA"
2828
dependencies = [
2929
"pydantic >=2, <3",
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2021 - 2025 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
2+
# for the German Human Genome-Phenome Archive (GHGA)
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""Pluggable, custom and composable logic around HTTP calls."""
17+
18+
from .config import CompositeCacheConfig, CompositeConfig
19+
from .factory import (
20+
AsyncCacheTransport,
21+
AsyncRateLimitingTransport,
22+
AsyncRetryTransport,
23+
CompositeTransportFactory,
24+
)
25+
26+
__all__ = [
27+
"AsyncCacheTransport",
28+
"AsyncRateLimitingTransport",
29+
"AsyncRetryTransport",
30+
"CompositeCacheConfig",
31+
"CompositeConfig",
32+
"CompositeTransportFactory",
33+
]

src/ghga_service_commons/transports/config.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,32 @@ class CacheTransportConfig(BaseSettings):
2525
Currently only in memory storage is available.
2626
"""
2727

28-
cache_ttl: NonNegativeInt = Field(
28+
client_cache_capacity: PositiveInt = Field(
29+
default=128,
30+
description="Maximum number of entries to store in the cache. Older entries are evicted once this limit is reached.",
31+
)
32+
client_cache_ttl: NonNegativeInt = Field(
2933
default=60,
3034
description="Number of seconds after which a stored response is considered stale.",
3135
)
32-
cache_capacity: PositiveInt = Field(
33-
default=128,
34-
description="Maximum number of entries to store in the cache. Older entries are evicted once this limit is reached.",
36+
client_cacheable_methods: list[str] = Field(
37+
default=["POST", "GET"],
38+
description="HTTP methods for which responses are allowed to be cached.",
39+
)
40+
client_cacheable_status_codes: list[int] = Field(
41+
default=[200, 201],
42+
description="HTTP response status code for which responses are allowed to be cached.",
3543
)
3644

3745

3846
class RateLimitingTransportConfig(BaseSettings):
3947
"""Configuration options for a rate limiting HTTPTransport."""
4048

41-
jitter: NonNegativeFloat = Field(
49+
per_request_jitter: NonNegativeFloat = Field(
4250
default=0.0,
4351
description="Max amount of jitter (in seconds) to add to each request.",
4452
)
45-
reset_after: PositiveInt = Field(
53+
retry_after_applicable_for_num_requests: PositiveInt = Field(
4654
default=1,
4755
description="Amount of requests after which the stored delay from a 429 response is ignored again. "
4856
+ "Can be useful to adjust if concurrent requests are fired in quick succession.",
@@ -52,18 +60,23 @@ class RateLimitingTransportConfig(BaseSettings):
5260
class RetryTransportConfig(BaseSettings):
5361
"""Configuration options for an HTTPTransport providing retry logic."""
5462

55-
exponential_backoff_max: NonNegativeInt = Field(
63+
client_exponential_backoff_max: NonNegativeInt = Field(
5664
default=60,
5765
description="Maximum number of seconds to wait between retries when using"
5866
+ " exponential backoff retry strategies. The client timeout might need to be adjusted accordingly.",
5967
)
60-
max_retries: NonNegativeInt = Field(
68+
client_num_retries: NonNegativeInt = Field(
6169
default=3, description="Number of times to retry failed API calls."
6270
)
63-
retry_status_codes: list[NonNegativeInt] = Field(
71+
client_retry_status_codes: list[NonNegativeInt] = Field(
6472
default=[408, 429, 500, 502, 503, 504],
6573
description="List of status codes that should trigger retrying a request.",
6674
)
75+
client_reraise_from_retry_error: bool = Field(
76+
default=True,
77+
description="Specifies if the exception wrapped in the final RetryError is reraised "
78+
"or the RetryError is returned as is.",
79+
)
6780

6881

6982
class CompositeConfig(RateLimitingTransportConfig, RetryTransportConfig):

src/ghga_service_commons/transports/factory.py

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515

1616
"""Provides factories for different flavors of httpx.AsyncHTTPTransport."""
1717

18-
from hishel import AsyncCacheTransport, AsyncInMemoryStorage
19-
from httpx import AsyncHTTPTransport, Limits
18+
from hishel import AsyncCacheTransport, AsyncInMemoryStorage, Controller
19+
from httpx import AsyncBaseTransport, AsyncHTTPTransport, Limits
2020

2121
from .config import CompositeCacheConfig, CompositeConfig
2222
from .ratelimiting import AsyncRateLimitingTransport
@@ -28,11 +28,21 @@ class CompositeTransportFactory:
2828

2929
@classmethod
3030
def _create_common_transport_layers(
31-
cls, config: CompositeConfig, limits: Limits | None = None
31+
cls,
32+
config: CompositeConfig,
33+
base_transport: AsyncBaseTransport | None = None,
34+
limits: Limits | None = None,
3235
):
33-
"""Creates wrapped transports reused between different factory methods."""
36+
"""Creates wrapped transports reused between different factory methods.
37+
38+
If provided, limits are applied to the AsyncHTTPTransport instance this method creates.
39+
If provided, a custom base_transport class is used and any limits are ignored.
40+
Those have to be provided directly to the custom base_transport passed into this method.
41+
"""
3442
base_transport = (
35-
AsyncHTTPTransport(limits=limits) if limits else AsyncHTTPTransport()
43+
base_transport or AsyncHTTPTransport(limits=limits)
44+
if limits
45+
else AsyncHTTPTransport()
3646
)
3747
ratelimiting_transport = AsyncRateLimitingTransport(
3848
config=config, transport=base_transport
@@ -44,16 +54,44 @@ def _create_common_transport_layers(
4454

4555
@classmethod
4656
def create_ratelimiting_retry_transport(
47-
cls, config: CompositeConfig, limits: Limits | None = None
57+
cls,
58+
config: CompositeConfig,
59+
base_transport: AsyncBaseTransport | None = None,
60+
limits: Limits | None = None,
4861
) -> AsyncRetryTransport:
49-
"""Creates a retry transport, wrapping a rate limiting transport, wrapping an AsyncHTTPTransport."""
50-
return cls._create_common_transport_layers(config, limits=limits)
62+
"""Creates a retry transport, wrapping, in sequence, a rate limiting transport and AsyncHTTPTransport.
63+
64+
If provided, limits are applied to the wrapped AsyncHTTPTransport instance.
65+
If provided, a custom base_transport class is used and any limits are ignored.
66+
Those have to be provided directly to the custom base_transport passed into this method.
67+
"""
68+
return cls._create_common_transport_layers(
69+
config, base_transport=base_transport, limits=limits
70+
)
5171

5272
@classmethod
5373
def create_cached_ratelimiting_retry_transport(
54-
cls, config: CompositeCacheConfig, limits: Limits | None = None
74+
cls,
75+
config: CompositeCacheConfig,
76+
base_transport: AsyncBaseTransport | None = None,
77+
limits: Limits | None = None,
5578
) -> AsyncCacheTransport:
56-
"""Creates a retry transport, wrapping a rate limiting transport, wrapping a cache transport, wrapping an AsyncHTTPTransport."""
57-
storage = AsyncInMemoryStorage(ttl=config.cache_ttl)
58-
retry_transport = cls._create_common_transport_layers(config, limits=limits)
59-
return AsyncCacheTransport(transport=retry_transport, storage=storage)
79+
"""Creates a cache transport, wrapping, in sequence, a retry, rate limiting transport and AsyncHTTPTransport.
80+
81+
If provided, limits are applied to the wrapped AsyncHTTPTransport instance.
82+
If provided, a custom base_transport class is used and any limits are ignored.
83+
Those have to be provided directly to the custom base_transport passed into this method.
84+
"""
85+
retry_transport = cls._create_common_transport_layers(
86+
config, base_transport=base_transport, limits=limits
87+
)
88+
controller = Controller(
89+
cacheable_methods=config.client_cacheable_methods,
90+
cacheable_status_codes=config.client_cacheable_status_codes,
91+
)
92+
storage = AsyncInMemoryStorage(
93+
ttl=config.client_cache_ttl, capacity=config.client_cache_capacity
94+
)
95+
return AsyncCacheTransport(
96+
controller=controller, transport=retry_transport, storage=storage
97+
)

src/ghga_service_commons/transports/ratelimiting.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ class AsyncRateLimitingTransport(httpx.AsyncBaseTransport):
4444
def __init__(
4545
self, config: RateLimitingTransportConfig, transport: httpx.AsyncBaseTransport
4646
) -> None:
47-
self._jitter = config.jitter
47+
self._jitter = config.per_request_jitter
4848
self._transport = transport
4949
self._num_requests = 0
50-
self._reset_after: int = config.reset_after
50+
self._reset_after: int = config.retry_after_applicable_for_num_requests
5151
self._last_retry_after_received: float = 0
5252
self._wait_time: float = 0
5353

src/ghga_service_commons/transports/retry.py

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
import time
1919
from collections.abc import Callable
20+
from contextlib import suppress
2021
from logging import getLogger
2122
from types import TracebackType
2223
from typing import Any, Self
2324

2425
import httpx
25-
import tenacity
2626
from tenacity import (
2727
AsyncRetrying,
2828
RetryCallState,
@@ -39,12 +39,12 @@
3939

4040
def _default_wait_strategy(config: RetryTransportConfig):
4141
"""Wait strategy using exponential backoff, not waiting for 429 responses."""
42-
return wait_exponential_ignore_429(max=config.exponential_backoff_max)
42+
return wait_exponential_ignore_429(max=config.client_exponential_backoff_max)
4343

4444

4545
def _default_stop_strategy(config: RetryTransportConfig):
4646
"""Basic stop strategy aborting retrying after a configured number of attempts."""
47-
return stop_after_attempt(config.max_retries)
47+
return stop_after_attempt(config.client_num_retries)
4848

4949

5050
def _log_retry_stats(retry_state: RetryCallState):
@@ -65,13 +65,15 @@ def _log_retry_stats(retry_state: RetryCallState):
6565

6666
# Enrich with details from current attempt for debugging
6767
if outcome := retry_state.outcome:
68-
result = outcome.result()
69-
if isinstance(result, httpx.Response):
70-
stats["response_status_code"] = result.status_code
71-
stats["response_headers"] = result.headers
72-
elif isinstance(result, Exception):
73-
stats["exception_type"] = type(result)
74-
stats["exception_message"] = str(result)
68+
try:
69+
result = outcome.result()
70+
except Exception as exc:
71+
stats["exception_type"] = type(exc)
72+
stats["exception_message"] = str(exc)
73+
else:
74+
if isinstance(result, httpx.Response):
75+
stats["response_status_code"] = result.status_code
76+
stats["response_headers"] = result.headers
7577

7678
log.info(
7779
"Retry attempt number %i for function %s.",
@@ -90,14 +92,15 @@ class wait_exponential_ignore_429(wait_exponential): # noqa: N801
9092

9193
def __call__(self, retry_state: RetryCallState) -> float:
9294
"""Copied from base class and adjusted."""
93-
if (
94-
retry_state.outcome
95-
and (result := retry_state.outcome.result())
96-
and isinstance(result, httpx.Response)
97-
and result.status_code == 429
98-
and not result.headers.get("Should-Wait")
99-
):
100-
return 0
95+
if retry_state.outcome:
96+
with suppress(Exception):
97+
result = retry_state.outcome.result()
98+
if (
99+
isinstance(result, httpx.Response)
100+
and result.status_code == 429
101+
and not result.headers.get("Should-Wait")
102+
):
103+
return 0
101104
try:
102105
exp = self.exp_base ** (retry_state.attempt_number - 1)
103106
result = self.multiplier * exp
@@ -133,16 +136,9 @@ def __init__(
133136

134137
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
135138
"""Handles HTTP requests and adds retry logic around calls."""
136-
response = None
137-
try:
138-
response = await self._retry_handler(
139-
fn=self._transport.handle_async_request, request=request
140-
)
141-
except tenacity.RetryError as exc:
142-
# last_attempt is potentially an unawaited future, need to explicitly reraise
143-
# to get the correct inner instance displayed
144-
exc.reraise()
145-
return response
139+
return await self._retry_handler(
140+
fn=self._transport.handle_async_request, request=request
141+
)
146142

147143
async def aclose(self) -> None: # noqa: D102
148144
await self._transport.aclose()
@@ -167,7 +163,7 @@ def _configure_retry_handler(
167163
):
168164
"""Configure the AsyncRetrying instance that is used for handling retryable responses/exceptions."""
169165
return AsyncRetrying(
170-
reraise=True,
166+
reraise=config.client_reraise_from_retry_error,
171167
retry=(
172168
retry_if_exception_type(
173169
(
@@ -177,7 +173,8 @@ def _configure_retry_handler(
177173
)
178174
)
179175
| retry_if_result(
180-
lambda response: response.status_code in config.retry_status_codes
176+
lambda response: response.status_code
177+
in config.client_retry_status_codes
181178
)
182179
),
183180
stop=stop_strategy(config),

0 commit comments

Comments
 (0)