Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions google-cloud-spanner-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,41 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<!--
  Imports the OpenTelemetry BOM so that io.opentelemetry artifacts declared in
  <dependencies> without an explicit <version> (opentelemetry-api,
  opentelemetry-sdk) resolve to the versions this BOM manages.
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.41.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.29.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>2.47.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.RetrySettings;
Expand Down Expand Up @@ -70,15 +71,21 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.encryption.CustomerManagedEncryption;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.cloud.trace.v1.TraceServiceClient;
import com.google.cloud.trace.v1.TraceServiceSettings;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.cloudtrace.v1.GetTraceRequest;
import com.google.devtools.cloudtrace.v1.Trace;
import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
Expand Down Expand Up @@ -152,6 +159,9 @@
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception {
// Try to commit
return null;
};
io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
Runnable runnable =
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format("Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
};
context.wrap(
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format(
"Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
});
LOGGER.log(
Level.INFO,
String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed));
Expand Down Expand Up @@ -815,6 +829,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
.setCredentials(credentials)
.setChannelProvider(channelProvider)
.setEnableServerSideTracing(true)
.setOpenTelemetry(WorkerProxy.openTelemetrySdk)
.setSessionPoolOption(sessionPoolOptions);

SpannerStubSettings.Builder stubSettingsBuilder =
Expand All @@ -838,6 +854,70 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
return optionsBuilder.build().getService();
}

private TraceServiceClient traceServiceClient;

/**
 * Returns the cached Cloud Trace client, building it on the first call.
 *
 * <p>Credentials are {@link NoCredentials} when {@code WorkerProxy.serviceKeyFile} is empty;
 * otherwise they are loaded from that key file. The method is {@code synchronized}, so the
 * null-check and the assignment to {@code traceServiceClient} happen under the instance lock.
 *
 * @return the trace service client stored in {@code traceServiceClient}
 * @throws IOException propagated from reading the key file, building the settings, or creating
 *     the client
 */
private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
  if (traceServiceClient == null) {
    // Resolve credentials first: anonymous when no key file is configured.
    final Credentials traceCredentials;
    if (!WorkerProxy.serviceKeyFile.isEmpty()) {
      byte[] keyFileBytes = FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile));
      traceCredentials =
          GoogleCredentials.fromStream(
              new ByteArrayInputStream(keyFileBytes), HTTP_TRANSPORT_FACTORY);
    } else {
      traceCredentials = NoCredentials.getInstance();
    }

    // Channel to the Cloud Trace endpoint on port 443.
    TransportChannelProvider traceChannelProvider =
        CloudUtil.newCloudTraceChannelProviderHelper(WorkerProxy.CLOUD_TRACE_ENDPOINT, 443);

    TraceServiceSettings settings =
        TraceServiceSettings.newBuilder()
            .setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT)
            .setCredentialsProvider(FixedCredentialsProvider.create(traceCredentials))
            .setTransportChannelProvider(traceChannelProvider)
            .build();

    traceServiceClient = TraceServiceClient.create(settings);
  }
  return traceServiceClient;
}

private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";

/**
 * Verifies an exported OpenTelemetry trace for the server side tracing feature.
 *
 * <p>Fetches the trace identified by {@code traceId} for {@code WorkerProxy.PROJECT_ID} and
 * scans its spans. The trace is considered invalid only when it contains a client-side
 * read-write or read-only transaction span but no span whose name starts with {@code
 * "Spanner."}.
 *
 * @param traceId id of the trace to look up
 * @return {@code false} if a transaction span is present without any {@code "Spanner."} span, or
 *     if an {@link IOException} occurs while obtaining the trace client; {@code true} otherwise
 */
public boolean verifyExportedEndToEndTrace(String traceId) {
  try {
    GetTraceRequest getTraceRequest =
        GetTraceRequest.newBuilder()
            .setProjectId(WorkerProxy.PROJECT_ID)
            .setTraceId(traceId)
            .build();
    Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
    boolean readWriteorReadOnlyTxnPresent = false;
    boolean spannerServerSideSpanPresent = false;
    for (TraceSpan span : trace.getSpansList()) {
      String spanName = span.getName();
      // Compare by value: span names come from the fetched trace, so a reference
      // comparison (==) against the constants would never match.
      if (READ_ONLY_TRANSACTION.equals(spanName) || READ_WRITE_TRANSACTION.equals(spanName)) {
        readWriteorReadOnlyTxnPresent = true;
      }
      if (spanName.startsWith("Spanner.")) {
        spannerServerSideSpanPresent = true;
      }
    }
    if (readWriteorReadOnlyTxnPresent && !spannerServerSideSpanPresent) {
      return false;
    }
  } catch (IOException e) {
    LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e);
    return false;
  }
  return true;
}

/** Handle actions. */
public Status startHandlingRequest(
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
Expand All @@ -862,17 +942,20 @@ public Status startHandlingRequest(
useMultiplexedSession = false;
}

io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
actionThreadPool.execute(
() -> {
Status status =
executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
});
context.wrap(
() -> {
Status status =
executeAction(
outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
}));
return Status.OK;
}

Expand All @@ -883,7 +966,12 @@ private Status executeAction(
String dbPath,
boolean useMultiplexedSession,
ExecutionFlowContext executionContext) {

Tracer tracer =
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
String spanName = String.format("performaction_%s", action.getActionCase().toString());
LOGGER.log(Level.INFO, String.format("spanName: %s", spanName));
Span span = tracer.spanBuilder(spanName).startSpan();
Scope scope = span.makeCurrent();
try {
if (action.hasAdmin()) {
return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender);
Expand Down Expand Up @@ -956,11 +1044,15 @@ private Status executeAction(
ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action)));
}
} catch (Exception e) {
span.recordException(e);
LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage());
return outcomeSender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
} finally {
scope.close();
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.google.spanner.executor.v1.SpannerOptions;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -40,19 +44,37 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
// Ratio of operations to use multiplexed sessions.
private final double multiplexedSessionOperationsRatio;

// Count of checks performed to verify end to end traces using Cloud Trace APIs.
private int cloudTraceCheckCount;

// Maximum checks allowed to verify end to end traces using Cloud Trace APIs.
private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20;

/**
 * Creates the executor service implementation with a fresh {@link CloudClientExecutor} and a
 * zeroed Cloud Trace check counter.
 *
 * @param enableGrpcFaultInjector passed through to the {@link CloudClientExecutor} constructor
 * @param multiplexedSessionOperationsRatio ratio of operations to use multiplexed sessions
 */
public CloudExecutorImpl(
    boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
  this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
  this.cloudTraceCheckCount = 0;
  this.clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
}

/** Execute SpannerAsync action requests. */
@Override
public StreamObserver<SpannerAsyncActionRequest> executeActionAsync(
StreamObserver<SpannerAsyncActionResponse> responseObserver) {
// Create a top-level OpenTelemetry span for streaming request.
Tracer tracer =
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan();
Scope scope = span.makeCurrent();

final String traceId = span.getSpanContext().getTraceId();
final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled();
AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false);

CloudClientExecutor.ExecutionFlowContext executionContext =
clientExecutor.new ExecutionFlowContext(responseObserver);
return new StreamObserver<SpannerAsyncActionRequest>() {

@Override
public void onNext(SpannerAsyncActionRequest request) {
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));
Expand Down Expand Up @@ -86,6 +108,11 @@ public void onNext(SpannerAsyncActionRequest request) {
Level.INFO,
String.format("Updated request to set multiplexed session flag: \n%s", request));
}
// Remember whether this stream carried a READ or QUERY action. Strings are compared
// with equals(): reference comparison (==) only works if both sides are interned.
String actionName = request.getAction().getActionCase().toString();
if ("READ".equals(actionName) || "QUERY".equals(actionName)) {
  requestHasReadOrQueryAction.set(true);
}

Status status = clientExecutor.startHandlingRequest(request, executionContext);
if (!status.isOk()) {
LOGGER.log(
Expand All @@ -104,9 +131,26 @@ public void onError(Throwable t) {

/**
 * Finishes the stream after the client half-closes.
 *
 * <p>When the top-level span is sampled, the stream carried a READ or QUERY action, and fewer
 * than {@code MAX_CLOUD_TRACE_CHECK_LIMIT} checks have run, the exported trace is verified
 * first. A failed verification is reported through {@code executionContext.onError} and the
 * response stream is not completed normally. The scope and span opened for this stream are
 * released on every path.
 */
@Override
public void onCompleted() {
  try {
    if (isSampled
        && cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT
        && requestHasReadOrQueryAction.get()) {
      cloudTraceCheckCount++;
      if (!clientExecutor.verifyExportedEndToEndTrace(traceId)) {
        // Status.getCause() is null when only a description was set, which would hand
        // onError a null Throwable and drop the message; send the status as an exception.
        executionContext.onError(
            Status.INTERNAL
                .withDescription(
                    String.format(
                        "failed to verify end to end trace for trace_id: %s", traceId))
                .asRuntimeException());
        executionContext.cleanup();
        return;
      }
    }
    LOGGER.log(Level.INFO, "Client called Done, half closed");
    executionContext.cleanup();
    responseObserver.onCompleted();
  } finally {
    // Previously skipped on the verification-failure path, leaving the scope open and
    // the span never ended.
    // NOTE(review): the scope was made current by the thread that opened the stream;
    // confirm this callback runs on that same thread.
    scope.close();
    span.end();
  }
}
};
}
Expand Down
Loading