Skip to content

Commit

Permalink
Sanitize cql in Apache Camel instrumentation (#3717)
Browse files Browse the repository at this point in the history
* sanitize cassandra

* use SemanticAttributes
  • Loading branch information
anosek-an authored Jul 29, 2021
1 parent e5ddbfe commit 117891a
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import spock.lang.Unroll

class SanitizationTest extends Specification {

@Unroll
def "sanitize cql #originalCql"() {

setup:
def decorator = new DbSpanDecorator("cql", "")
def exchange = Mock(Exchange) {
getIn() >> Mock(Message) {
getHeader("CamelCqlQuery") >> originalCql
}
}
def actualSanitized = decorator.getStatement(exchange, null)

expect:
actualSanitized == sanitizedCql

where:
originalCql | sanitizedCql
"FROM TABLE WHERE FIELD>=-1234" | "FROM TABLE WHERE FIELD>=?"
"SELECT Name, Phone.Number FROM Contact WHERE Address.State = 'NY'" | "SELECT Name, Phone.Number FROM Contact WHERE Address.State = ?"
"FROM col WHERE @Tag='Something'" | "FROM col WHERE @Tag=?"
}

@Unroll
def "sanitize jdbc #originalSql"() {

Expand Down
2 changes: 2 additions & 0 deletions instrumentation/apache-camel-2.20/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testLibrary("org.apache.camel:camel-jaxb-starter:$camelversion")
testLibrary("org.apache.camel:camel-undertow:$camelversion")
testLibrary("org.apache.camel:camel-aws:$camelversion")
testLibrary("org.apache.camel:camel-cassandraql:$camelversion")

testImplementation("org.springframework.boot:spring-boot-starter-test:1.5.17.RELEASE")
testImplementation("org.springframework.boot:spring-boot-starter:1.5.17.RELEASE")
Expand All @@ -43,6 +44,7 @@ dependencies {
testImplementation("org.elasticmq:elasticmq-rest-sqs_2.12:1.0.0")

testImplementation("org.testcontainers:localstack:${versions["org.testcontainers"]}")
testImplementation("org.testcontainers:cassandra:${versions["org.testcontainers"]}")

latestDepTestLibrary("org.apache.camel:camel-core:2.+")
latestDepTestLibrary("org.apache.camel:camel-spring-boot-starter:2.+")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,11 @@ public String getOperationName(

// visible for testing
String getStatement(Exchange exchange, Endpoint endpoint) {
// TODO: sanitize cql
switch (component) {
case "cql":
Object cqlObj = exchange.getIn().getHeader("CamelCqlQuery");
if (cqlObj != null) {
return cqlObj.toString();
}
Map<String, String> cqlParameters = toQueryParameters(endpoint.getEndpointUri());
if (cqlParameters.containsKey("cql")) {
return cqlParameters.get("cql");
return SqlStatementSanitizer.sanitize(cqlObj.toString()).getFullStatement();
}
return null;
case "jdbc":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel.decorators

import org.apache.camel.builder.RouteBuilder
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean

@SpringBootConfiguration
@EnableAutoConfiguration
class CassandraConfig {

@Bean
RouteBuilder serviceRoute() {
return new RouteBuilder() {

@Override
void configure() throws Exception {
from("direct:input")
.setHeader("CamelCqlQuery", simple("select * from test.users where id=1 ALLOW FILTERING"))
.toD("cql://{{cassandra.host}}:{{cassandra.port}}/test")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel.decorators

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Session
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.testcontainers.containers.CassandraContainer
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.RetryOnAddressAlreadyInUseTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.springframework.boot.SpringApplication
import org.springframework.context.ConfigurableApplicationContext
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

class CassandraTest extends AgentInstrumentationSpecification implements RetryOnAddressAlreadyInUseTrait {

@Shared
ConfigurableApplicationContext server
@Shared
GenericContainer cassandra
@Shared
Cluster cluster
@Shared
String host
@Shared
int port

Session session

def setupSpec() {
withRetryOnAddressAlreadyInUse({
setupSpecUnderRetry()
})
}

def setupSpecUnderRetry() {
cassandra = new CassandraContainer()
cassandra.withExposedPorts(9042)
cassandra.start()

port = cassandra.getFirstMappedPort()
host = cassandra.getHost()

cluster = cassandra.getCluster()

def app = new SpringApplication(CassandraConfig)
app.setDefaultProperties(["cassandra.host": host, "cassandra.port": port])
server = app.run()
}

def cleanupSpec() {
server?.close()
cluster?.close()
cassandra.stop()
}

def setup() {
session = cluster.connect()

session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};")
session.execute("CREATE TABLE IF NOT EXISTS test.users ( id int primary key, name text );")
session.execute("INSERT INTO test.users (id,name) VALUES (1, 'user1') IF NOT EXISTS;")
session.execute("INSERT INTO test.users (id,name) VALUES (2, 'user2') IF NOT EXISTS;")
}

def cleanup() {
session?.close()
}

def "test cassandra "() {

setup:
def camelContext = server.getBean(CamelContext)
ProducerTemplate template = camelContext.createProducerTemplate()

when:
def response = template.requestBody("direct:input", null)

then:
response.first().getString("name") == "user1"

assertTraces(1) {
trace(0, 2) {
span(0) {
kind INTERNAL
hasNoParent()
attributes {
"apache-camel.uri" "direct://input"
}
}
span(1){
kind CLIENT
attributes {
"apache-camel.uri" "cql://$host:$port/test"
"$SemanticAttributes.DB_NAME.key" "test"
"$SemanticAttributes.DB_STATEMENT.key" "select * from test.users where id=? ALLOW FILTERING"
"$SemanticAttributes.DB_SYSTEM.key" "cassandra"
}
}
}
}

}

}

0 comments on commit 117891a

Please sign in to comment.