Skip to content

Commit 6b042db

Browse files
committed
Add server address and port for Spymemcached
1 parent 62d8ae3 commit 6b042db

File tree

10 files changed

+184
-36
lines changed

10 files changed

+184
-36
lines changed

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/BulkGetCompletionListener.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.ExecutionException;
1313
import javax.annotation.Nullable;
1414
import net.spy.memcached.MemcachedConnection;
15+
import net.spy.memcached.MemcachedNode;
1516
import net.spy.memcached.internal.BulkGetFuture;
1617

1718
public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>>
@@ -24,13 +25,43 @@ private BulkGetCompletionListener(Context parentContext, SpymemcachedRequest req
2425
@Nullable
2526
public static BulkGetCompletionListener create(
2627
Context parentContext, MemcachedConnection connection, String methodName) {
27-
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
28+
MemcachedNode handlingNode = getRepresentativeNodeFromConnection(connection);
29+
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
2830
if (!instrumenter().shouldStart(parentContext, request)) {
2931
return null;
3032
}
3133
return new BulkGetCompletionListener(parentContext, request);
3234
}
3335

36+
@Nullable
37+
private static MemcachedNode getRepresentativeNodeFromConnection(MemcachedConnection connection) {
38+
try {
39+
// Strategy: Get the "most representative" node for bulk operations
40+
// We choose the last active node in the list, which often represents
41+
// the most recently added or most stable node in the cluster
42+
java.util.Collection<net.spy.memcached.MemcachedNode> allNodes =
43+
connection.getLocator().getAll();
44+
45+
MemcachedNode lastActiveNode = null;
46+
MemcachedNode fallbackNode = null;
47+
48+
for (net.spy.memcached.MemcachedNode node : allNodes) {
49+
if (fallbackNode == null) {
50+
fallbackNode = node;
51+
}
52+
53+
if (node.isActive()) {
54+
lastActiveNode = node;
55+
}
56+
}
57+
58+
// Return the last active node, or fallback to the first node
59+
return lastActiveNode != null ? lastActiveNode : fallbackNode;
60+
} catch (RuntimeException e) {
61+
return null;
62+
}
63+
}
64+
3465
@Override
3566
public void onComplete(BulkGetFuture<?> future) {
3667
closeAsyncSpan(future);

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/GetCompletionListener.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@
55

66
package io.opentelemetry.javaagent.instrumentation.spymemcached;
77

8+
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.GET_FUTURE_OPERATION;
89
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.instrumenter;
910

1011
import io.opentelemetry.api.trace.Span;
1112
import io.opentelemetry.context.Context;
1213
import java.util.concurrent.ExecutionException;
1314
import javax.annotation.Nullable;
1415
import net.spy.memcached.MemcachedConnection;
16+
import net.spy.memcached.MemcachedNode;
1517
import net.spy.memcached.internal.GetFuture;
18+
import net.spy.memcached.ops.Operation;
1619

1720
public class GetCompletionListener extends CompletionListener<GetFuture<?>>
1821
implements net.spy.memcached.internal.GetCompletionListener {
@@ -23,14 +26,28 @@ private GetCompletionListener(Context parentContext, SpymemcachedRequest request
2326

2427
@Nullable
2528
public static GetCompletionListener create(
26-
Context parentContext, MemcachedConnection connection, String methodName) {
27-
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
29+
Context parentContext,
30+
MemcachedConnection connection,
31+
String methodName,
32+
GetFuture<?> future) {
33+
// Extract handling node from future before creating span
34+
MemcachedNode handlingNode = extractHandlingNodeFromFuture(future);
35+
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
2836
if (!instrumenter().shouldStart(parentContext, request)) {
2937
return null;
3038
}
3139
return new GetCompletionListener(parentContext, request);
3240
}
3341

42+
@Nullable
43+
private static MemcachedNode extractHandlingNodeFromFuture(GetFuture<?> future) {
44+
Operation operation = GET_FUTURE_OPERATION.get(future);
45+
if (operation != null) {
46+
return operation.getHandlingNode();
47+
}
48+
return null;
49+
}
50+
3451
@Override
3552
public void onComplete(GetFuture<?> future) {
3653
closeAsyncSpan(future);

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/MemcachedClientInstrumentation.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static void methodExit(
7676
if (future != null) {
7777
OperationCompletionListener listener =
7878
OperationCompletionListener.create(
79-
currentContext(), client.getConnection(), methodName);
79+
currentContext(), client.getConnection(), methodName, future);
8080
if (listener != null) {
8181
future.addListener(listener);
8282
}
@@ -106,7 +106,8 @@ public static void methodExit(
106106

107107
if (future != null) {
108108
GetCompletionListener listener =
109-
GetCompletionListener.create(currentContext(), client.getConnection(), methodName);
109+
GetCompletionListener.create(
110+
currentContext(), client.getConnection(), methodName, future);
110111
if (listener != null) {
111112
future.addListener(listener);
112113
}
@@ -156,15 +157,16 @@ private AdviceScope(CallDepth callDepth, @Nullable SyncCompletionListener listen
156157
this.listener = listener;
157158
}
158159

159-
public static AdviceScope start(MemcachedClient client, String methodName) {
160+
public static AdviceScope start(MemcachedClient client, String methodName, String key) {
160161
CallDepth callDepth = CallDepth.forClass(MemcachedClient.class);
161162
if (callDepth.getAndIncrement() > 0) {
162163
return new AdviceScope(callDepth, null);
163164
}
164165

165166
return new AdviceScope(
166167
callDepth,
167-
SyncCompletionListener.create(Context.current(), client.getConnection(), methodName));
168+
SyncCompletionListener.create(
169+
Context.current(), client.getConnection(), methodName, key));
168170
}
169171

170172
public void end(@Nullable Throwable throwable) {
@@ -178,8 +180,10 @@ public void end(@Nullable Throwable throwable) {
178180

179181
@Advice.OnMethodEnter(suppress = Throwable.class)
180182
public static AdviceScope methodEnter(
181-
@Advice.This MemcachedClient client, @Advice.Origin("#m") String methodName) {
182-
return AdviceScope.start(client, methodName);
183+
@Advice.This MemcachedClient client,
184+
@Advice.Origin("#m") String methodName,
185+
@Advice.Argument(0) String key) {
186+
return AdviceScope.start(client, methodName, key);
183187
}
184188

185189
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/OperationCompletionListener.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@
55

66
package io.opentelemetry.javaagent.instrumentation.spymemcached;
77

8+
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.OPERATION_FUTURE_OPERATION;
89
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.instrumenter;
910

1011
import io.opentelemetry.api.trace.Span;
1112
import io.opentelemetry.context.Context;
1213
import java.util.concurrent.ExecutionException;
1314
import javax.annotation.Nullable;
1415
import net.spy.memcached.MemcachedConnection;
16+
import net.spy.memcached.MemcachedNode;
1517
import net.spy.memcached.internal.OperationFuture;
18+
import net.spy.memcached.ops.Operation;
1619

1720
public class OperationCompletionListener extends CompletionListener<OperationFuture<?>>
1821
implements net.spy.memcached.internal.OperationCompletionListener {
@@ -23,14 +26,28 @@ private OperationCompletionListener(Context parentContext, SpymemcachedRequest r
2326

2427
@Nullable
2528
public static OperationCompletionListener create(
26-
Context parentContext, MemcachedConnection connection, String methodName) {
27-
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName);
29+
Context parentContext,
30+
MemcachedConnection connection,
31+
String methodName,
32+
OperationFuture<?> future) {
33+
// Extract handling node from future before creating span
34+
MemcachedNode handlingNode = extractHandlingNodeFromFuture(future);
35+
SpymemcachedRequest request = SpymemcachedRequest.create(connection, methodName, handlingNode);
2836
if (!instrumenter().shouldStart(parentContext, request)) {
2937
return null;
3038
}
3139
return new OperationCompletionListener(parentContext, request);
3240
}
3341

42+
@Nullable
43+
private static MemcachedNode extractHandlingNodeFromFuture(OperationFuture<?> future) {
44+
Operation operation = OPERATION_FUTURE_OPERATION.get(future);
45+
if (operation != null) {
46+
return operation.getHandlingNode();
47+
}
48+
return null;
49+
}
50+
3451
@Override
3552
public void onComplete(OperationFuture<?> future) {
3653
closeAsyncSpan(future);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spymemcached;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.GET_FUTURE_OPERATION;
9+
import static io.opentelemetry.javaagent.instrumentation.spymemcached.SpymemcachedSingletons.OPERATION_FUTURE_OPERATION;
10+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
17+
import net.bytebuddy.asm.Advice;
18+
import net.bytebuddy.description.type.TypeDescription;
19+
import net.bytebuddy.matcher.ElementMatcher;
20+
import net.spy.memcached.internal.GetFuture;
21+
import net.spy.memcached.internal.OperationFuture;
22+
import net.spy.memcached.ops.Operation;
23+
24+
public class SetOperationInstrumentation implements TypeInstrumentation {
25+
26+
@Override
27+
public ElementMatcher<TypeDescription> typeMatcher() {
28+
return namedOneOf(
29+
"net.spy.memcached.internal.OperationFuture", "net.spy.memcached.internal.GetFuture");
30+
}
31+
32+
@Override
33+
public void transform(TypeTransformer transformer) {
34+
transformer.applyAdviceToMethod(
35+
isMethod().and(named("setOperation")).and(takesArguments(1)),
36+
this.getClass().getName() + "$SetOperationAdvice");
37+
}
38+
39+
@SuppressWarnings("unused")
40+
public static class SetOperationAdvice {
41+
42+
@Advice.OnMethodExit(suppress = Throwable.class)
43+
public static void onExit(@Advice.This Object future, @Advice.Argument(0) Operation operation) {
44+
45+
if (future instanceof OperationFuture) {
46+
OPERATION_FUTURE_OPERATION.set((OperationFuture<?>) future, operation);
47+
} else if (future instanceof GetFuture) {
48+
GET_FUTURE_OPERATION.set((GetFuture<?>) future, operation);
49+
}
50+
}
51+
}
52+
}

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/SpymemcachedInstrumentationModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.spymemcached;
77

8-
import static java.util.Collections.singletonList;
8+
import static java.util.Arrays.asList;
99

1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -23,7 +23,7 @@ public SpymemcachedInstrumentationModule() {
2323

2424
@Override
2525
public List<TypeInstrumentation> typeInstrumentations() {
26-
return singletonList(new MemcachedClientInstrumentation());
26+
return asList(new MemcachedClientInstrumentation(), new SetOperationInstrumentation());
2727
}
2828

2929
@Override

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/SpymemcachedNetworkAttributesGetter.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,25 @@
55

66
package io.opentelemetry.javaagent.instrumentation.spymemcached;
77

8+
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
89
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter;
910
import java.net.InetSocketAddress;
1011
import java.net.SocketAddress;
11-
import java.util.Collection;
1212
import javax.annotation.Nullable;
13-
import net.spy.memcached.MemcachedConnection;
1413
import net.spy.memcached.MemcachedNode;
1514

1615
final class SpymemcachedNetworkAttributesGetter
17-
implements ServerAttributesGetter<SpymemcachedRequest> {
16+
implements ServerAttributesGetter<SpymemcachedRequest>,
17+
NetworkAttributesGetter<SpymemcachedRequest, Object> {
1818

1919
@Nullable
2020
@Override
2121
public String getServerAddress(SpymemcachedRequest request) {
22-
MemcachedConnection connection = request.getConnection();
23-
if (connection != null) {
24-
Collection<MemcachedNode> nodes = connection.getLocator().getAll();
25-
if (!nodes.isEmpty()) {
26-
SocketAddress socketAddress = nodes.iterator().next().getSocketAddress();
27-
if (socketAddress instanceof InetSocketAddress) {
28-
return ((InetSocketAddress) socketAddress).getHostString();
29-
}
22+
MemcachedNode handlingNode = request.getHandlingNode();
23+
if (handlingNode != null) {
24+
SocketAddress socketAddress = handlingNode.getSocketAddress();
25+
if (socketAddress instanceof InetSocketAddress) {
26+
return ((InetSocketAddress) socketAddress).getHostString();
3027
}
3128
}
3229
return null;
@@ -35,14 +32,11 @@ public String getServerAddress(SpymemcachedRequest request) {
3532
@Nullable
3633
@Override
3734
public Integer getServerPort(SpymemcachedRequest request) {
38-
MemcachedConnection connection = request.getConnection();
39-
if (connection != null) {
40-
Collection<MemcachedNode> nodes = connection.getLocator().getAll();
41-
if (!nodes.isEmpty()) {
42-
SocketAddress socketAddress = nodes.iterator().next().getSocketAddress();
43-
if (socketAddress instanceof InetSocketAddress) {
44-
return ((InetSocketAddress) socketAddress).getPort();
45-
}
35+
MemcachedNode handlingNode = request.getHandlingNode();
36+
if (handlingNode != null) {
37+
SocketAddress socketAddress = handlingNode.getSocketAddress();
38+
if (socketAddress instanceof InetSocketAddress) {
39+
return ((InetSocketAddress) socketAddress).getPort();
4640
}
4741
}
4842
return null;

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/SpymemcachedRequest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,25 @@
66
package io.opentelemetry.javaagent.instrumentation.spymemcached;
77

88
import com.google.auto.value.AutoValue;
9+
import javax.annotation.Nullable;
910
import net.spy.memcached.MemcachedConnection;
11+
import net.spy.memcached.MemcachedNode;
1012

1113
@AutoValue
1214
public abstract class SpymemcachedRequest {
1315

14-
public static SpymemcachedRequest create(MemcachedConnection connection, String statement) {
15-
return new AutoValue_SpymemcachedRequest(connection, statement);
16+
public static SpymemcachedRequest create(
17+
MemcachedConnection connection, String statement, @Nullable MemcachedNode handlingNode) {
18+
return new AutoValue_SpymemcachedRequest(connection, statement, handlingNode);
1619
}
1720

1821
public abstract MemcachedConnection getConnection();
1922

2023
public abstract String getStatement();
2124

25+
@Nullable
26+
public abstract MemcachedNode getHandlingNode();
27+
2228
public String dbOperation() {
2329
String statement = getStatement();
2430
if (statement.startsWith("async")) {

instrumentation/spymemcached-2.12/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spymemcached/SpymemcachedSingletons.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,22 @@
1111
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor;
1212
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1313
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
14+
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesExtractor;
1415
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor;
16+
import io.opentelemetry.instrumentation.api.util.VirtualField;
17+
import net.spy.memcached.internal.GetFuture;
18+
import net.spy.memcached.internal.OperationFuture;
19+
import net.spy.memcached.ops.Operation;
1520

1621
public final class SpymemcachedSingletons {
1722
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spymemcached-2.12";
1823

1924
private static final Instrumenter<SpymemcachedRequest, Object> INSTRUMENTER;
2025

26+
public static final VirtualField<OperationFuture<?>, Operation> OPERATION_FUTURE_OPERATION;
27+
28+
public static final VirtualField<GetFuture<?>, Operation> GET_FUTURE_OPERATION;
29+
2130
static {
2231
SpymemcachedAttributesGetter dbAttributesGetter = new SpymemcachedAttributesGetter();
2332
SpymemcachedNetworkAttributesGetter netAttributesGetter =
@@ -30,8 +39,12 @@ public final class SpymemcachedSingletons {
3039
DbClientSpanNameExtractor.create(dbAttributesGetter))
3140
.addAttributesExtractor(DbClientAttributesExtractor.create(dbAttributesGetter))
3241
.addAttributesExtractor(ServerAttributesExtractor.create(netAttributesGetter))
42+
.addAttributesExtractor(NetworkAttributesExtractor.create(netAttributesGetter))
3343
.addOperationMetrics(DbClientMetrics.get())
3444
.buildInstrumenter(SpanKindExtractor.alwaysClient());
45+
46+
OPERATION_FUTURE_OPERATION = VirtualField.find(OperationFuture.class, Operation.class);
47+
GET_FUTURE_OPERATION = VirtualField.find(GetFuture.class, Operation.class);
3548
}
3649

3750
public static Instrumenter<SpymemcachedRequest, Object> instrumenter() {

0 commit comments

Comments
 (0)