Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh command support #16

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ SpiceClient client = SpiceClient.builder()
Retries are performed for connection and system internal errors. It is the SDK user's responsibility to properly
handle other errors, for example RESOURCE_EXHAUSTED (HTTP 429).

### Spice.ai Runtime commands
sgrebnov marked this conversation as resolved.
Show resolved Hide resolved

#### Accelerated dataset refresh

Use `refresh` method to perform [Accelerated Dataset](https://docs.spiceai.org/components/data-accelerators) refresh. See full [dataset refresh example](/src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java).

```java
SpiceClient client = SpiceClient.builder()
..
.build();

client.refresh("taxi_trips")

```

## 🤝 Connect with us

Use [issues](https://github.com/spiceai/spice-java/issues), [[email protected]](mailto:[email protected]) or [Discord](https://discord.gg/kZnTfneP5u) to send us feedback, suggestion or if you need help installing or using the library.
30 changes: 30 additions & 0 deletions src/main/java/ai/spice/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,23 @@ public class Config {
public static final String CLOUD_FLIGHT_ADDRESS;
/** Local flight address */
public static final String LOCAL_FLIGHT_ADDRESS;
/** Cloud HTTP address */
public static final String CLOUD_HTTP_ADDRESS;
/** Local HTTP address */
public static final String LOCAL_HTTP_ADDRESS;

static {
CLOUD_FLIGHT_ADDRESS = System.getenv("SPICE_FLIGHT_URL") != null ? System.getenv("SPICE_FLIGHT_URL")
: "https://flight.spiceai.io:443";

LOCAL_FLIGHT_ADDRESS = System.getenv("SPICE_FLIGHT_URL") != null ? System.getenv("SPICE_FLIGHT_URL")
: "http://localhost:50051";

CLOUD_HTTP_ADDRESS = System.getenv("SPICE_HTTP_URL") != null ? System.getenv("SPICE_HTTP_URL")
: "https://data.spiceai.io";

LOCAL_HTTP_ADDRESS = System.getenv("SPICE_HTTP_URL") != null ? System.getenv("SPICE_HTTP_URL")
: "http://localhost:8090";
}

/**
Expand All @@ -62,4 +72,24 @@ public static URI getLocalFlightAddressUri() throws URISyntaxException {
public static URI getCloudFlightAddressUri() throws URISyntaxException {
return new URI(CLOUD_FLIGHT_ADDRESS);
}

/**
* Returns the local HTTP address
*
* @return URI of the local HTTP address.
* @throws URISyntaxException if the string could not be parsed as a URI.
*/
public static URI getLocalHttpAddressUri() throws URISyntaxException {
return new URI(LOCAL_HTTP_ADDRESS);
}

/**
* Returns the cloud HTTP address
*
* @return URI of the cloud HTTP address.
* @throws URISyntaxException if the string could not be parsed as a URI.
*/
public static URI getCloudHttpAddressUri() throws URISyntaxException {
return new URI(CLOUD_HTTP_ADDRESS);
}
}
45 changes: 43 additions & 2 deletions src/main/java/ai/spice/SpiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ of this software and associated documentation files (the "Software"), to deal

package ai.spice;

import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ExecutionException;

import org.apache.arrow.flight.CallStatus;
Expand Down Expand Up @@ -57,6 +61,7 @@ public class SpiceClient implements AutoCloseable {
private String appId;
private String apiKey;
private URI flightAddress;
private URI httpAddress;
private int maxRetries;
private FlightSqlClient flightClient;
private CredentialCallOption authCallOptions = null;
Expand All @@ -80,12 +85,15 @@ public static SpiceClientBuilder builder() throws URISyntaxException {
* services
* @param flightAddress the URI of the flight address for connecting to Spice.ai
* services
* @param httpAddress the URI of the Spice.ai runtime HTTP address
*
* @param maxRetries the maximum number of connection retries for the client
*/
public SpiceClient(String appId, String apiKey, URI flightAddress, int maxRetries) {
public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddress, int maxRetries) {
this.appId = appId;
this.apiKey = apiKey;
this.maxRetries = maxRetries;
this.httpAddress = httpAddress;

// Arrow Flight requires URI to be grpc protocol, convert http/https for
// convinience
Expand Down Expand Up @@ -129,7 +137,40 @@ public FlightStream query(String sql) throws ExecutionException {
return this.queryInternalWithRetry(sql);
} catch (RetryException e) {
Throwable err = e.getLastFailedAttempt().getExceptionCause();
throw new ExecutionException("Failed to execute query due to error: " + err.getMessage(), err);
throw new ExecutionException("Failed to execute query due to error: " + err.toString(), err);
}
}

public void refresh(String dataset) throws ExecutionException {
if (Strings.isNullOrEmpty(dataset)) {
throw new IllegalArgumentException("No dataset name provided");
}

try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(new URI(String.format("%s/v1/datasets/%s/acceleration/refresh", this.httpAddress, dataset)))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.noBody())
.build();

HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

if (response.statusCode() != 201) {
throw new ExecutionException(
String.format("Failed to trigger dataset refresh. Status Code: %d, Response: %s",
response.statusCode(),
response.body()),
null);
}
} catch (ExecutionException e) {
// no need to wrap ExecutionException
throw e;
} catch (ConnectException err) {
throw new ExecutionException(
String.format("The Spice runtime is unavailable at %s. Is it running?", this.httpAddress), err);
} catch (Exception err) {
throw new ExecutionException("Failed to trigger dataset refresh due to error: " + err.toString(), err);
}
}

Expand Down
19 changes: 18 additions & 1 deletion src/main/java/ai/spice/SpiceClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SpiceClientBuilder {
private String appId;
private String apiKey;
private URI flightAddress;
private URI httpAddress;
private int maxRetries = 3;

/**
Expand All @@ -44,6 +45,7 @@ public class SpiceClientBuilder {
*/
SpiceClientBuilder() throws URISyntaxException {
this.flightAddress = Config.getLocalFlightAddressUri();
this.httpAddress = Config.getLocalHttpAddressUri();
}

/**
Expand All @@ -60,6 +62,20 @@ public SpiceClientBuilder withFlightAddress(URI flightAddress) {
return this;
}

/**
* Sets the client's HTTP address
*
* @param httpAddress The URI of the HTTP address
* @return The current instance of SpiceClientBuilder for method chaining.
*/
public SpiceClientBuilder withHttpAddress(URI httpAddress) {
if (httpAddress == null) {
throw new IllegalArgumentException("httpAddress can't be null");
}
this.httpAddress = httpAddress;
return this;
}

/**
* Sets the client's Api Key.
*
Expand Down Expand Up @@ -90,6 +106,7 @@ public SpiceClientBuilder withApiKey(String apiKey) {
*/
public SpiceClientBuilder withSpiceCloud() throws URISyntaxException {
this.flightAddress = Config.getCloudFlightAddressUri();
this.httpAddress = Config.getCloudHttpAddressUri();
return this;
}

Expand All @@ -113,6 +130,6 @@ public SpiceClientBuilder withMaxRetries(int maxRetries) {
* @return The SpiceClient instance
*/
public SpiceClient build() {
return new SpiceClient(appId, apiKey, flightAddress, maxRetries);
return new SpiceClient(appId, apiKey, flightAddress, httpAddress, maxRetries);
}
}
63 changes: 63 additions & 0 deletions src/main/java/ai/spice/example/ExampleDatasetRefreshSpiceOSS.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2024 The Spice.ai OSS Authors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

package ai.spice.example;

import java.net.URI;

import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.VectorSchemaRoot;

import ai.spice.SpiceClient;

/**
* Example of using SDK with Spice.ai OSS (Local)
* _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="ai.spice.example.ExampleDatasetRefreshSpiceOSS"
*
* Requires local Spice OSS running. Follow the quickstart
* https://github.com/spiceai/spiceai?tab=readme-ov-file#%EF%B8%8F-quickstart-local-machine.
*/
public class ExampleDatasetRefreshSpiceOSS {

public static void main(String[] args) {
try (SpiceClient client = SpiceClient.builder()
.withFlightAddress(URI.create("http://localhost:50051"))
.withHttpAddress(URI.create("http://localhost:8090"))
.build()) {

client.refresh("taxi_trips");
System.out.println("Dataset refresh triggered for taxi_trips");

System.out.println("Query taxi_trips dataset");
FlightStream stream = client.query("SELECT * FROM taxi_trips LIMIT 1;");

while (stream.next()) {
try (VectorSchemaRoot batches = stream.getRoot()) {
System.out.println(batches.contentToTSVString());
}
}

} catch (Exception e) {
System.err.println("An unexpected error occurred: " + e.getMessage());
}
}
}
23 changes: 20 additions & 3 deletions src/test/java/ai/spice/FlightQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ of this software and associated documentation files (the "Software"), to deal

import junit.framework.TestCase;

public class FlightQueryTest
extends TestCase
{
public class FlightQueryTest
extends TestCase {
public void testQuerySpiceCloudPlatform() throws ExecutionException, InterruptedException {
try {
String apiKey = System.getenv("API_KEY");
Expand Down Expand Up @@ -96,4 +95,22 @@ public void testQuerySpiceOSS() throws ExecutionException, InterruptedException
}
}

public void testRefreshSpiceOSS() throws ExecutionException, InterruptedException {
try {
SpiceClient spiceClient = SpiceClient.builder()
.build();

spiceClient.refresh("taxi_trips");

try {
spiceClient.refresh("taxi_trips_does_not_exist");
fail("Should throw exception when unable to refresh dataset");
} catch (Exception e) {
assertTrue("Should correctly pass response message when unable to refresh table",
e.getMessage().contains("\"message\":"));
}
} catch (Exception e) {
fail("Should not throw exception: " + e.getMessage());
}
}
}