Skip to content

Commit

Permalink
Merge branch 'master' into krv_annotation_value_restriction
Browse files Browse the repository at this point in the history
  • Loading branch information
KRVPerera committed Jun 17, 2020
2 parents 41858ae + 007580f commit 3751e75
Show file tree
Hide file tree
Showing 17 changed files with 751 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@
<Class name="org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClientHolder" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE,NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>
<Match>
<Class name="org.ballerinalang.observe.trace.extension.choreo.client.secret.AnonymousAppSecretHandler" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE,NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import io.jaegertracing.internal.Reference;
import io.jaegertracing.spi.Reporter;
import io.opentracing.References;
import org.ballerinalang.jvm.StringUtils;
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClient;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClientHolder;
import org.ballerinalang.observe.trace.extension.choreo.client.error.ChoreoClientException;
import org.ballerinalang.observe.trace.extension.choreo.logging.LogFactory;
import org.ballerinalang.observe.trace.extension.choreo.logging.Logger;
import org.ballerinalang.observe.trace.extension.choreo.model.ChoreoTraceSpan;
Expand All @@ -49,7 +52,14 @@ public class ChoreoJaegerReporter implements Reporter, AutoCloseable {
private int maxQueueSize;

public ChoreoJaegerReporter(int maxQueueSize) {
ChoreoClient choreoClient = ChoreoClientHolder.getChoreoClient(this);
ChoreoClient choreoClient = null;
try {
choreoClient = ChoreoClientHolder.getChoreoClient(this);
} catch (ChoreoClientException e) {
throw BValueCreator.createErrorValue(
StringUtils.fromString("Choreo client is not initialized. Please check Ballerina configurations."),
e.getMessage());
}
if (Objects.isNull(choreoClient)) {
throw new IllegalStateException("Choreo client is not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class Constants {

private Constants() {}

public static final String EXTENSION_NAME = "choreo";
public static final String CHOREO_EXTENSION_NAME = "choreo";

public static final String REPORTER_HOST_NAME_CONFIG = "reporter.hostname";
public static final String DEFAULT_REPORTER_HOSTNAME = "periscope.choreo.dev";
Expand All @@ -36,6 +36,6 @@ private Constants() {}
public static final String REPORTER_USE_SSL_CONFIG = "reporter.useSSL";
public static final boolean DEFAULT_REPORTER_USE_SSL = true;

public static final String APPLICATION_ID_CONFIG = "application.id";
public static final String DEFAULT_APPLICATION_ID = "";
public static final String APPLICATION_ID_CONFIG = "application.secret";
public static final String EMPTY_APPLICATION_SECRET = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClient;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClientHolder;
import org.ballerinalang.observe.trace.extension.choreo.client.error.ChoreoClientException;
import org.ballerinalang.observe.trace.extension.choreo.logging.LogFactory;
import org.ballerinalang.observe.trace.extension.choreo.logging.Logger;
import org.ballerinalang.observe.trace.extension.choreo.model.ChoreoMetric;
Expand All @@ -44,7 +45,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.ballerinalang.observe.trace.extension.choreo.Constants.EXTENSION_NAME;
import static org.ballerinalang.observe.trace.extension.choreo.Constants.CHOREO_EXTENSION_NAME;

/**
* Ballerina MetricReporter extension for Choreo cloud.
Expand All @@ -67,7 +68,15 @@ public class MetricsReporterExtension implements MetricReporter, AutoCloseable {

@Override
public void init() {
ChoreoClient choreoClient = ChoreoClientHolder.getChoreoClient(this);
ChoreoClient choreoClient = null;
try {
choreoClient = ChoreoClientHolder.getChoreoClient(this);
} catch (ChoreoClientException e) {
throw BValueCreator.createErrorValue(
StringUtils.fromString("Could not initialize the client. Please check Ballerina configurations."),
e.getMessage());
}

if (Objects.isNull(choreoClient)) {
throw BValueCreator.createErrorValue(StringUtils.fromString("Choreo client is not initialized"), null);
}
Expand All @@ -80,7 +89,7 @@ public void init() {

@Override
public String getName() {
return EXTENSION_NAME;
return CHOREO_EXTENSION_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.ballerinalang.jvm.values.api.BValueCreator;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClient;
import org.ballerinalang.observe.trace.extension.choreo.client.ChoreoClientHolder;
import org.ballerinalang.observe.trace.extension.choreo.client.error.ChoreoClientException;

import java.util.Objects;

import static org.ballerinalang.observe.trace.extension.choreo.Constants.EXTENSION_NAME;
import static org.ballerinalang.observe.trace.extension.choreo.Constants.CHOREO_EXTENSION_NAME;

/**
* This is the open tracing extension class for {@link OpenTracer}.
Expand All @@ -38,7 +39,14 @@ public class OpenTracerExtension implements OpenTracer {

@Override
public void init() {
choreoClient = ChoreoClientHolder.getChoreoClient();
try {
choreoClient = ChoreoClientHolder.getChoreoClient();
} catch (ChoreoClientException e) {
throw BValueCreator.createErrorValue(
StringUtils.fromString("Choreo client is not initialized. Please check Ballerina configurations."),
e.getMessage());
}

if (Objects.isNull(choreoClient)) {
throw BValueCreator.createErrorValue(
StringUtils.fromString("Choreo client is not initialized. Please check Ballerina configurations."),
Expand Down Expand Up @@ -69,7 +77,7 @@ public Tracer getTracer(String tracerName, String serviceName) {

@Override
public String getName() {
return EXTENSION_NAME;
return CHOREO_EXTENSION_NAME;
}

}
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
/*
* Copyright (c) 2020, WSO2 Inc. (http://wso2.com) All Rights Reserved.
* Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.ballerinalang.observe.trace.extension.choreo.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.ballerinalang.observe.trace.extension.choreo.client.error.ChoreoClientException;
import org.ballerinalang.observe.trace.extension.choreo.client.error.ChoreoErrors;
import org.ballerinalang.observe.trace.extension.choreo.gen.HandshakeGrpc;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass.HandshakeRequest;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass.HandshakeResponse;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass.PublishProgramRequest;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass.PublishAstRequest;
import org.ballerinalang.observe.trace.extension.choreo.gen.NegotiatorOuterClass.RegisterRequest;
import org.ballerinalang.observe.trace.extension.choreo.gen.TelemetryGrpc;
import org.ballerinalang.observe.trace.extension.choreo.gen.TelemetryOuterClass;
import org.ballerinalang.observe.trace.extension.choreo.logging.LogFactory;
Expand All @@ -42,50 +48,87 @@ public class ChoreoClient implements AutoCloseable {
private static final int SERVER_MAX_FRAME_SIZE_BYTES = 4 * 1024 * 1024 - MESSAGE_SIZE_BUFFER_BYTES;

private String id; // ID received from the handshake
private String instanceId;
private String appId;
private String nodeId;
private String version;
private String projectSecret;

private ManagedChannel channel;
private HandshakeGrpc.HandshakeBlockingStub handshakeClient;
private HandshakeGrpc.HandshakeBlockingStub registrationClient;
private TelemetryGrpc.TelemetryBlockingStub telemetryClient;
private Thread uploadingThread;

public ChoreoClient(String hostname, int port, boolean useSSL) {
public ChoreoClient(String hostname, int port, boolean useSSL, String projectSecret) {
LOGGER.info("initializing connection with observability backend " + hostname + ":" + port);

ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(hostname, port);
if (!useSSL) {
channelBuilder.usePlaintext();
}
channel = channelBuilder.build();
handshakeClient = HandshakeGrpc.newBlockingStub(channel);
registrationClient = HandshakeGrpc.newBlockingStub(channel);
telemetryClient = TelemetryGrpc.newBlockingStub(channel);
this.projectSecret = projectSecret;
}

public String register(final MetadataReader metadataReader, String instanceId, String appId) {
HandshakeRequest handshakeRequest = HandshakeRequest.newBuilder()
.setProgramHash(metadataReader.getAstHash())
.setUserId(instanceId)
.setApplicationId(appId)
.build();
HandshakeResponse handshakeResponse = handshakeClient.handshake(handshakeRequest);
this.id = handshakeResponse.getObservabilityId();
this.appId = appId;
boolean sendProgramJson = handshakeResponse.getSendProgramJson();
public RegisterResponse register(final MetadataReader metadataReader, String nodeId) throws
ChoreoClientException {
RegisterRequest handshakeRequest = RegisterRequest.newBuilder()
.setAstHash(metadataReader.getAstHash())
.setProjectSecret(projectSecret)
.setNodeId(nodeId)
.build();

NegotiatorOuterClass.RegisterResponse registerResponse = null;
try {
registerResponse = registrationClient.register(handshakeRequest);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.UNAVAILABLE.getCode()) {
throw ChoreoErrors.getUnavailableError();
}

throw e;
}

this.id = registerResponse.getObsId();
this.version = registerResponse.getVersion();
boolean sendProgramJson = registerResponse.getSendAst();

if (sendProgramJson) {
uploadingThread = new Thread(() -> {
PublishProgramRequest programRequest = PublishProgramRequest.newBuilder()
.setProgramJson(metadataReader.getAstData())
.setObservabilityId(id)
.build();
handshakeClient.withCompression("gzip").publishProgram(programRequest);
PublishAstRequest programRequest = PublishAstRequest.newBuilder()
.setAst(metadataReader.getAstData())
.setObsId(id)
.setProjectSecret(projectSecret)
.build();
registrationClient.withCompression("gzip").publishAst(programRequest);
// TODO add debug log to indicate success
}, "AST Uploading Thread");
uploadingThread.start();
}

this.instanceId = instanceId;
return handshakeResponse.getUrl();
this.nodeId = nodeId;
return new RegisterResponse(registerResponse.getObsUrl(), this.id);
}

/**
* Data holder for register response call.
*/
public static class RegisterResponse {
private String obsUrl;
private String obsId;

public RegisterResponse(String obsUrl, String obsId) {
this.obsUrl = obsUrl;
this.obsId = obsId;
}

public String getObsUrl() {
return obsUrl;
}

public String getObsId() {
return obsId;
}
}

public void publishMetrics(ChoreoMetric[] metrics) {
Expand All @@ -96,12 +139,13 @@ public void publishMetrics(ChoreoMetric[] metrics) {
int messageSize = 0;
while (i < metrics.length && messageSize < SERVER_MAX_FRAME_SIZE_BYTES) {
ChoreoMetric metric = metrics[i];
TelemetryOuterClass.Metric metricMessage = TelemetryOuterClass.Metric.newBuilder()
.setTimestamp(metric.getTimestamp())
.setName(metric.getName())
.setValue(metric.getValue())
.putAllTags(metric.getTags())
.build();
TelemetryOuterClass.Metric metricMessage
= TelemetryOuterClass.Metric.newBuilder()
.setTimestamp(metric.getTimestamp())
.setName(metric.getName())
.setValue(metric.getValue())
.putAllTags(metric.getTags())
.build();

int currentMessageSize = metricMessage.getSerializedSize();
if (currentMessageSize >= SERVER_MAX_FRAME_SIZE_BYTES) {
Expand All @@ -117,9 +161,10 @@ public void publishMetrics(ChoreoMetric[] metrics) {
}
}
telemetryClient.publishMetrics(requestBuilder.setObservabilityId(id)
.setInstanceId(instanceId)
.setAppId(appId)
.build());
.setNodeId(nodeId)
.setVersion(version)
.setProjectSecret(projectSecret)
.build());
}
}

Expand All @@ -131,14 +176,15 @@ public void publishTraceSpans(ChoreoTraceSpan[] traceSpans) {
int messageSize = 0;
while (i < traceSpans.length && messageSize < SERVER_MAX_FRAME_SIZE_BYTES) {
ChoreoTraceSpan traceSpan = traceSpans[i];
TelemetryOuterClass.TraceSpan.Builder traceSpanBuilder = TelemetryOuterClass.TraceSpan.newBuilder()
.setTraceId(traceSpan.getTraceId())
.setSpanId(traceSpan.getSpanId())
.setServiceName(traceSpan.getServiceName())
.setOperationName(traceSpan.getOperationName())
.setTimestamp(traceSpan.getTimestamp())
.setDuration(traceSpan.getDuration())
.putAllTags(traceSpan.getTags());
TelemetryOuterClass.TraceSpan.Builder traceSpanBuilder
= TelemetryOuterClass.TraceSpan.newBuilder()
.setTraceId(traceSpan.getTraceId())
.setSpanId(traceSpan.getSpanId())
.setServiceName(traceSpan.getServiceName())
.setOperationName(traceSpan.getOperationName())
.setTimestamp(traceSpan.getTimestamp())
.setDuration(traceSpan.getDuration())
.putAllTags(traceSpan.getTags());
for (ChoreoTraceSpan.Reference reference : traceSpan.getReferences()) {
traceSpanBuilder.addReferences(TelemetryOuterClass.TraceSpanReference.newBuilder()
.setTraceId(reference.getTraceId())
Expand All @@ -163,9 +209,10 @@ public void publishTraceSpans(ChoreoTraceSpan[] traceSpans) {
}
}
telemetryClient.publishTraces(requestBuilder.setObservabilityId(id)
.setInstanceId(instanceId)
.setAppId(appId)
.build());
.setNodeId(nodeId)
.setVersion(version)
.setProjectSecret(projectSecret)
.build());
}
}

Expand Down
Loading

0 comments on commit 3751e75

Please sign in to comment.