diff --git a/README.md b/README.md index d0ddf72..f1386db 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,21 @@ SpiceClient client = SpiceClient.builder() Retries are performed for connection and system internal errors. It is the SDK user's responsibility to properly handle other errors, for example RESOURCE_EXHAUSTED (HTTP 429). +### Spice.ai Runtime commands + +#### Accelerated dataset refresh + +Use `refresh` method to perform [Accelerated Dataset](https://docs.spiceai.org/components/data-accelerators) refresh. See full [dataset refresh example](/src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java). + +```java +SpiceClient client = SpiceClient.builder() + .. + .build(); + +client.refresh("taxi_trips") + +``` + ## 🤝 Connect with us Use [issues](https://github.com/spiceai/spice-java/issues), [hey@spice.ai](mailto:hey@spice.ai) or [Discord](https://discord.gg/kZnTfneP5u) to send us feedback, suggestion or if you need help installing or using the library. diff --git a/src/main/java/ai/spice/Config.java b/src/main/java/ai/spice/Config.java index db50ada..fc24d85 100644 --- a/src/main/java/ai/spice/Config.java +++ b/src/main/java/ai/spice/Config.java @@ -34,6 +34,10 @@ public class Config { public static final String CLOUD_FLIGHT_ADDRESS; /** Local flight address */ public static final String LOCAL_FLIGHT_ADDRESS; + /** Cloud HTTP address */ + public static final String CLOUD_HTTP_ADDRESS; + /** Local HTTP address */ + public static final String LOCAL_HTTP_ADDRESS; static { CLOUD_FLIGHT_ADDRESS = System.getenv("SPICE_FLIGHT_URL") != null ? System.getenv("SPICE_FLIGHT_URL") @@ -41,6 +45,12 @@ public class Config { LOCAL_FLIGHT_ADDRESS = System.getenv("SPICE_FLIGHT_URL") != null ? System.getenv("SPICE_FLIGHT_URL") : "http://localhost:50051"; + + CLOUD_HTTP_ADDRESS = System.getenv("SPICE_HTTP_URL") != null ? System.getenv("SPICE_HTTP_URL") + : "https://data.spiceai.io"; + + LOCAL_HTTP_ADDRESS = System.getenv("SPICE_HTTP_URL") != null ? System.getenv("SPICE_HTTP_URL") + : "http://localhost:8090"; } /** @@ -62,4 +72,24 @@ public static URI getLocalFlightAddressUri() throws URISyntaxException { public static URI getCloudFlightAddressUri() throws URISyntaxException { return new URI(CLOUD_FLIGHT_ADDRESS); } + + /** + * Returns the local HTTP address + * + * @return URI of the local HTTP address. + * @throws URISyntaxException if the string could not be parsed as a URI. + */ + public static URI getLocalHttpAddressUri() throws URISyntaxException { + return new URI(LOCAL_HTTP_ADDRESS); + } + + /** + * Returns the cloud HTTP address + * + * @return URI of the cloud HTTP address. + * @throws URISyntaxException if the string could not be parsed as a URI. + */ + public static URI getCloudHttpAddressUri() throws URISyntaxException { + return new URI(CLOUD_HTTP_ADDRESS); + } } \ No newline at end of file diff --git a/src/main/java/ai/spice/SpiceClient.java b/src/main/java/ai/spice/SpiceClient.java index c094a07..07842d0 100644 --- a/src/main/java/ai/spice/SpiceClient.java +++ b/src/main/java/ai/spice/SpiceClient.java @@ -22,8 +22,12 @@ of this software and associated documentation files (the "Software"), to deal package ai.spice; +import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.concurrent.ExecutionException; import org.apache.arrow.flight.CallStatus; @@ -57,6 +61,7 @@ public class SpiceClient implements AutoCloseable { private String appId; private String apiKey; private URI flightAddress; + private URI httpAddress; private int maxRetries; private FlightSqlClient flightClient; private CredentialCallOption authCallOptions = null; @@ -80,12 +85,15 @@ public static SpiceClientBuilder builder() throws URISyntaxException { * services * @param flightAddress the URI of the flight address for connecting to Spice.ai * services + * @param httpAddress the URI of the Spice.ai runtime HTTP address + * * @param maxRetries the maximum number of connection retries for the client */ - public SpiceClient(String appId, String apiKey, URI flightAddress, int maxRetries) { + public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddress, int maxRetries) { this.appId = appId; this.apiKey = apiKey; this.maxRetries = maxRetries; + this.httpAddress = httpAddress; // Arrow Flight requires URI to be grpc protocol, convert http/https for // convinience @@ -129,7 +137,40 @@ public FlightStream query(String sql) throws ExecutionException { return this.queryInternalWithRetry(sql); } catch (RetryException e) { Throwable err = e.getLastFailedAttempt().getExceptionCause(); - throw new ExecutionException("Failed to execute query due to error: " + err.getMessage(), err); + throw new ExecutionException("Failed to execute query due to error: " + err.toString(), err); + } + } + + public void refresh(String dataset) throws ExecutionException { + if (Strings.isNullOrEmpty(dataset)) { + throw new IllegalArgumentException("No dataset name provided"); + } + + try { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder() + .uri(new URI(String.format("%s/v1/datasets/%s/acceleration/refresh", this.httpAddress, dataset))) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.noBody()) + .build(); + + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() != 201) { + throw new ExecutionException( + String.format("Failed to trigger dataset refresh. Status Code: %d, Response: %s", + response.statusCode(), + response.body()), + null); + } + } catch (ExecutionException e) { + // no need to wrap ExecutionException + throw e; + } catch (ConnectException err) { + throw new ExecutionException( + String.format("The Spice runtime is unavailable at %s. Is it running?", this.httpAddress), err); + } catch (Exception err) { + throw new ExecutionException("Failed to trigger dataset refresh due to error: " + err.toString(), err); } } diff --git a/src/main/java/ai/spice/SpiceClientBuilder.java b/src/main/java/ai/spice/SpiceClientBuilder.java index 25b7ecc..08209ac 100644 --- a/src/main/java/ai/spice/SpiceClientBuilder.java +++ b/src/main/java/ai/spice/SpiceClientBuilder.java @@ -35,6 +35,7 @@ public class SpiceClientBuilder { private String appId; private String apiKey; private URI flightAddress; + private URI httpAddress; private int maxRetries = 3; /** @@ -44,6 +45,7 @@ public class SpiceClientBuilder { */ SpiceClientBuilder() throws URISyntaxException { this.flightAddress = Config.getLocalFlightAddressUri(); + this.httpAddress = Config.getLocalHttpAddressUri(); } /** @@ -60,6 +62,20 @@ public SpiceClientBuilder withFlightAddress(URI flightAddress) { return this; } + /** + * Sets the client's HTTP address + * + * @param httpAddress The URI of the HTTP address + * @return The current instance of SpiceClientBuilder for method chaining. + */ + public SpiceClientBuilder withHttpAddress(URI httpAddress) { + if (httpAddress == null) { + throw new IllegalArgumentException("httpAddress can't be null"); + } + this.httpAddress = httpAddress; + return this; + } + /** * Sets the client's Api Key. * @@ -90,6 +106,7 @@ public SpiceClientBuilder withApiKey(String apiKey) { */ public SpiceClientBuilder withSpiceCloud() throws URISyntaxException { this.flightAddress = Config.getCloudFlightAddressUri(); + this.httpAddress = Config.getCloudHttpAddressUri(); return this; } @@ -113,6 +130,6 @@ public SpiceClientBuilder withMaxRetries(int maxRetries) { * @return The SpiceClient instance */ public SpiceClient build() { - return new SpiceClient(appId, apiKey, flightAddress, maxRetries); + return new SpiceClient(appId, apiKey, flightAddress, httpAddress, maxRetries); } } \ No newline at end of file diff --git a/src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java b/src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java new file mode 100644 index 0000000..ae40dfc --- /dev/null +++ b/src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Spice.ai OSS Authors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package ai.spice.example; + +import java.net.URI; + +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.vector.VectorSchemaRoot; + +import ai.spice.SpiceClient; + +/** + * Example of using SDK with Spice.ai OSS (Local) + * _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="ai.spice.example.ExampleDatasetRefreshSpiceOSS" + * + * Requires local Spice OSS running. Follow the quickstart + * https://github.com/spiceai/spiceai?tab=readme-ov-file#%EF%B8%8F-quickstart-local-machine. + */ +public class ExampleDatasetRefreshSpiceOSS { + + public static void main(String[] args) { + try (SpiceClient client = SpiceClient.builder() + .withFlightAddress(URI.create("http://localhost:50051")) + .withHttpAddress(URI.create("http://localhost:8090")) + .build()) { + + client.refresh("taxi_trips"); + System.out.println("Dataset refresh triggered for taxi_trips"); + + System.out.println("Query taxi_trips dataset"); + FlightStream stream = client.query("SELECT * FROM taxi_trips LIMIT 1;"); + + while (stream.next()) { + try (VectorSchemaRoot batches = stream.getRoot()) { + System.out.println(batches.contentToTSVString()); + } + } + + } catch (Exception e) { + System.err.println("An unexpected error occurred: " + e.getMessage()); + } + } +} diff --git a/src/test/java/ai/spice/FlightQueryTest.java b/src/test/java/ai/spice/FlightQueryTest.java index 6749fc8..1fcdec2 100644 --- a/src/test/java/ai/spice/FlightQueryTest.java +++ b/src/test/java/ai/spice/FlightQueryTest.java @@ -31,9 +31,8 @@ of this software and associated documentation files (the "Software"), to deal import junit.framework.TestCase; -public class FlightQueryTest - extends TestCase -{ +public class FlightQueryTest + extends TestCase { public void testQuerySpiceCloudPlatform() throws ExecutionException, InterruptedException { try { String apiKey = System.getenv("API_KEY"); @@ -96,4 +95,22 @@ public void testQuerySpiceOSS() throws ExecutionException, InterruptedException } } + public void testRefreshSpiceOSS() throws ExecutionException, InterruptedException { + try { + SpiceClient spiceClient = SpiceClient.builder() + .build(); + + spiceClient.refresh("taxi_trips"); + + try { + spiceClient.refresh("taxi_trips_does_not_exist"); + fail("Should throw exception when unable to refresh dataset"); + } catch (Exception e) { + assertTrue("Should correctly pass response message when unable to refresh table", + e.getMessage().contains("\"message\":")); + } + } catch (Exception e) { + fail("Should not throw exception: " + e.getMessage()); + } + } }