Skip to content

Commit

Permalink
Refactor RestClientTransport to allow using other http client librari…
Browse files Browse the repository at this point in the history
…es (#584)
  • Loading branch information
swallez authored Jun 13, 2023
1 parent 38ec037 commit 4c3a13b
Show file tree
Hide file tree
Showing 26 changed files with 2,435 additions and 464 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checkstyle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ 11 ]
java-version: [ 17 ]
steps:
- uses: actions/checkout@v2

Expand Down
1 change: 1 addition & 0 deletions example-transports/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This directory contains experimental implementations of the `TransportHttpClient` interface. They are to be used as examples and inspiration and should not be considered production-ready.
55 changes: 55 additions & 0 deletions example-transports/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

plugins {
java
`java-library`
`java-test-fixtures`
}

tasks.withType<Test> {
useJUnitPlatform()
}

java {
targetCompatibility = JavaVersion.VERSION_17
}


dependencies {
val jacksonVersion = "2.13.3"

api("io.netty", "netty-codec-http", "4.1.93.Final")

implementation(project(":java-client"))

// Apache 2.0
// https://github.com/FasterXML/jackson
testImplementation("com.fasterxml.jackson.core", "jackson-core", jacksonVersion)
testImplementation("com.fasterxml.jackson.core", "jackson-databind", jacksonVersion)

// EPL-2.0
// https://junit.org/junit5/
testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.0")

}
repositories {
mavenCentral()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.netty;

import co.elastic.clients.util.BinaryData;
import co.elastic.clients.util.NoCopyByteArrayOutputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class InputStreamBinaryData implements BinaryData {

private final String contentType;
private final InputStream inputStream;
private boolean consumed = false;

public InputStreamBinaryData(String contentType, InputStream inputStream) {
this.contentType = contentType;
this.inputStream = inputStream;
}

@Override
public String contentType() {
return contentType;
}

@Override
public void writeTo(OutputStream out) throws IOException {
consume();
try {
byte[] buffer = new byte[8192];
int len;
while ((len = inputStream.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
} finally {
inputStream.close();
}
}

@Override
public ByteBuffer asByteBuffer() throws IOException {
consume();
NoCopyByteArrayOutputStream baos = new NoCopyByteArrayOutputStream();
writeTo(baos);
return baos.asByteBuffer();
}

@Override
public InputStream asInputStream() throws IOException {
consume();
return inputStream;
}

@Override
public boolean isRepeatable() {
return false;
}

@Override
public long size() {
return -1;
}

private void consume() throws IllegalStateException {
if (consumed) {
throw new IllegalStateException("Data has already been consumed");
}
consumed = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.netty;

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransportBase;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.http.TransportHttpClient;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class NettyElasticsearchTransport extends ElasticsearchTransportBase {

public NettyElasticsearchTransport(TransportHttpClient.Node node, TransportOptions options, JsonpMapper jsonpMapper) {
super(new SingleNodeHttpClient(new NettyTransportHttpClient(), node), options, jsonpMapper);
}

public static class SingleNodeHttpClient implements TransportHttpClient {
private final TransportHttpClient client;
private final Node node;

public SingleNodeHttpClient(TransportHttpClient client, Node node) {
this.client = client;
this.node = node;
}

@Override
public TransportOptions createOptions(@Nullable TransportOptions options) {
return client.createOptions(options);
}

@Override
public Response performRequest(
String endpointId, @Nullable Node ignoredNode, Request request, TransportOptions options
) throws IOException {
return client.performRequest(endpointId, node, request, options);
}

@Override
public CompletableFuture<Response> performRequestAsync(
String endpointId, @Nullable Node ignoredNode, Request request, TransportOptions options
) {
return client.performRequestAsync(endpointId, node, request, options);
}

@Override
public void close() throws IOException {
client.close();
}
}
}
Loading

0 comments on commit 4c3a13b

Please sign in to comment.