Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions pkg/common/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ func (t *InstrumentedTransport) RoundTrip(req *http.Request) (*http.Response, er
if resp != nil {
// record latency and increment counter for non-200 status code
recordHTTPResponse(sanitizedURL, resp.StatusCode, duration.Seconds())

// wrap the response body to count bytes read
resp.Body = &responseSizeCounterReadCloser{
ReadCloser: resp.Body,
recordSizeFunc: func(size int) {
// Record response body size
recordResponseBodySize(sanitizedURL, size)
},
}
Comment on lines +138 to +146
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also start generating metrics for SaneHTTPClient. Just wanna make sure if this is intentional.

}

return resp, err
Expand All @@ -147,6 +156,35 @@ func NewInstrumentedTransport(T http.RoundTripper) *InstrumentedTransport {
return &InstrumentedTransport{T}
}

type responseSizeCounterReadCloser struct {
io.ReadCloser
bytesRead *int
recordSizeFunc func(int)
}

func (r *responseSizeCounterReadCloser) Read(p []byte) (n int, err error) {
n, err = r.ReadCloser.Read(p)
if r.bytesRead == nil {
r.bytesRead = new(int)
}
*r.bytesRead += n
return n, err
}

func (r *responseSizeCounterReadCloser) Close() error {
if r.recordSizeFunc != nil {
if r.bytesRead == nil {
// this means downstream code never consumed the body, we must drain the body to get its size
_, err := io.Copy(io.Discard, r)
if err != nil {
return err
}
}
r.recordSizeFunc(*r.bytesRead)
}
return r.ReadCloser.Close()
}

func ConstantResponseHttpClient(statusCode int, body string) *http.Client {
return &http.Client{
Timeout: DefaultResponseTimeout,
Expand Down Expand Up @@ -198,7 +236,7 @@ func WithRetryWaitMax(wait time.Duration) ClientOption {
func PinnedRetryableHttpClient() *http.Client {
httpClient := retryablehttp.NewClient()
httpClient.Logger = nil
httpClient.HTTPClient.Transport = NewCustomTransport(&http.Transport{
httpClient.HTTPClient.Transport = NewInstrumentedTransport(NewCustomTransport(&http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: PinnedCertPool(),
},
Expand All @@ -212,15 +250,15 @@ func PinnedRetryableHttpClient() *http.Client {
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
})
}))
return httpClient.StandardClient()
}

func RetryableHTTPClient(opts ...ClientOption) *http.Client {
httpClient := retryablehttp.NewClient()
httpClient.RetryMax = 3
httpClient.Logger = nil
httpClient.HTTPClient.Transport = NewCustomTransport(nil)
httpClient.HTTPClient.Transport = NewInstrumentedTransport(NewCustomTransport(nil))

for _, opt := range opts {
opt(httpClient)
Expand All @@ -234,7 +272,7 @@ func RetryableHTTPClientTimeout(timeOutSeconds int64, opts ...ClientOption) *htt
httpClient.RetryMax = 3
httpClient.Logger = nil
httpClient.HTTPClient.Timeout = time.Duration(timeOutSeconds) * time.Second
httpClient.HTTPClient.Transport = NewCustomTransport(nil)
httpClient.HTTPClient.Transport = NewInstrumentedTransport(NewCustomTransport(nil))

for _, opt := range opts {
opt(httpClient)
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/http_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ var (
},
[]string{"url", "status_code"},
)

httpResponseBodySizeBytes = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: "http_client",
Name: "response_body_size_bytes",
Help: "Size of HTTP response bodies in bytes, labeled by URL.",
Buckets: prometheus.ExponentialBuckets(100, 10, 5), // [100B, 1KB, 10KB, 100KB, 1MB]
},
[]string{"url"},
)
)

// sanitizeURL sanitizes a URL to avoid high cardinality metrics.
Expand Down Expand Up @@ -106,3 +117,8 @@ func recordHTTPResponse(sanitizedURL string, statusCode int, durationSeconds flo
func recordNetworkError(sanitizedURL string) {
httpNon200ResponsesTotal.WithLabelValues(sanitizedURL, "network_error").Inc()
}

// recordResponseBodySize records metrics for the size of an HTTP response body.
func recordResponseBodySize(sanitizedURL string, sizeBytes int) {
httpResponseBodySizeBytes.WithLabelValues(sanitizedURL).Observe(float64(sizeBytes))
}
119 changes: 119 additions & 0 deletions pkg/common/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"context"
"io"
"math"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -405,6 +406,76 @@ func TestSaneHttpClientMetrics(t *testing.T) {
}
}

func TestRetryableHttpClientMetrics(t *testing.T) {
// Create a test server that returns different status codes
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/success":
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("success"))
case "/error":
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("error"))
case "/notfound":
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("not found"))
default:
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("default"))
}
}))
defer server.Close()

// Create a RetryableHttpClient
client := RetryableHTTPClient()

testCases := []struct {
name string
path string
expectedStatusCode int
expectsNon200 bool
}{
{
name: "successful request",
path: "/success",
expectedStatusCode: 200,
expectsNon200: false,
},
{
name: "not found request",
path: "/notfound",
expectedStatusCode: 404,
expectsNon200: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var requestURL string
if strings.HasPrefix(tc.path, "http") {
requestURL = tc.path
} else {
requestURL = server.URL + tc.path
}

// Get initial metric values
sanitizedURL := sanitizeURL(requestURL)
initialRequestsTotal := testutil.ToFloat64(httpRequestsTotal.WithLabelValues(sanitizedURL))

// Make the request
resp, err := client.Get(requestURL)

require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, tc.expectedStatusCode, resp.StatusCode)

// Check that request counter was incremented
requestsTotal := testutil.ToFloat64(httpRequestsTotal.WithLabelValues(sanitizedURL))
assert.Equal(t, initialRequestsTotal+1, requestsTotal)
})
}
}

func TestInstrumentedTransport(t *testing.T) {
// Create a mock transport that we can control
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -439,3 +510,51 @@ func TestInstrumentedTransport(t *testing.T) {
// Note: Testing histogram metrics is complex due to the way Prometheus handles them
// The main thing is that the request completed successfully and counters were incremented
}

func TestResponseSizeCounterReadCloser(t *testing.T) {
bodyContent := "This is a test response body."
expectedSize := len(bodyContent)
resp := &http.Response{
Body: io.NopCloser(strings.NewReader(bodyContent)),
}

var recordedSize int
readCloser := &responseSizeCounterReadCloser{
ReadCloser: resp.Body,
recordSizeFunc: func(size int) {
recordedSize = size
},
}
// Read the entire body
_, err := io.ReadAll(readCloser)
require.NoError(t, err)

// Close the ReadCloser to trigger size recording
err = readCloser.Close()
require.NoError(t, err)

assert.Equal(t, expectedSize, recordedSize, "Response body size does not match expected")
}

func TestResponseSizeCounterReadCloser_UnreadBody(t *testing.T) {
bodyContent := "This is a test response body."
expectedSize := len(bodyContent)
resp := &http.Response{
Body: io.NopCloser(strings.NewReader(bodyContent)),
}

var recordedSize int
readCloser := &responseSizeCounterReadCloser{
ReadCloser: resp.Body,
recordSizeFunc: func(size int) {
recordedSize = size
},
}
// Do not read the body

// Close the ReadCloser to trigger size recording
err := readCloser.Close()
require.NoError(t, err)

assert.Equal(t, expectedSize, recordedSize, "Response body size does not match expected for unread body")
}