Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/multi_tenancy] Implement client.bulk in SDK Client #3192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.ml.common.transport.undeploy;

import lombok.Getter;
import lombok.Setter;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -17,10 +19,15 @@ public class MLUndeployModelNodesRequest extends BaseNodesRequest<MLUndeployMode

@Getter
private String[] modelIds;
@Getter
@Setter
private String tenantId;

public MLUndeployModelNodesRequest(StreamInput in) throws IOException {
super(in);
this.modelIds = in.readOptionalStringArray();
// TODO: will do bwc check later.
this.tenantId = in.readOptionalString();
}

public MLUndeployModelNodesRequest(String[] nodeIds, String[] modelIds) {
Expand All @@ -36,6 +43,7 @@ public MLUndeployModelNodesRequest(DiscoveryNode... nodes) {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(modelIds);
out.writeOptionalString(tenantId);
}

}
23 changes: 23 additions & 0 deletions common/src/main/java/org/opensearch/sdk/AbstractSdkClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

public abstract class AbstractSdkClient implements SdkClientDelegate {

@SuppressWarnings({ "deprecation", "removal" })
protected <T> CompletionStage<T> executePrivilegedAsync(PrivilegedAction<T> action, Executor executor) {
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged(action), executor);
}
}
132 changes: 132 additions & 0 deletions common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;

public class BulkDataObjectRequest {

private final List<DataObjectRequest> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private String globalIndex;

/**
* Instantiate this request with a global index.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param globalIndex the index location for all the bulk requests as a default if not already specified
*/
public BulkDataObjectRequest(@Nullable String globalIndex) {
this.globalIndex = globalIndex;
}

/**
* Returns the list of requests in this bulk request.
* @return the requests list
*/
public List<DataObjectRequest> requests() {
return List.copyOf(this.requests);
}

/**
* Returns the indices being updated in this bulk request.
* @return the indices being updated
*/
public Set<String> getIndices() {
return Collections.unmodifiableSet(indices);
}

/**
* Add the given request to the {@link BulkDataObjectRequest}
* @param request The request to add
* @return the updated request object
*/
public BulkDataObjectRequest add(DataObjectRequest request) {
if (!request.isWriteRequest()) {
throw new IllegalArgumentException("No support for request [" + request.getClass().getName() + "]");
}
if (Strings.isNullOrEmpty(request.index())) {
if (Strings.isNullOrEmpty(globalIndex)) {
throw new IllegalArgumentException(
"Either the request [" + request.getClass().getName() + "] or the bulk request must specify an index."
);
}
indices.add(globalIndex);
request.index(globalIndex);
} else {
indices.add(request.index());
}
requests.add(request);
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public BulkDataObjectRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

/**
* Instantiate a builder for this object
* @return a builder instance
*/
public static Builder builder() {
return new Builder();
}

/**
* Class for constructing a Builder for this Request Object
*/
public static class Builder {
private String globalIndex = null;

/**
* Empty constructor to initialize
*/
protected Builder() {}

/**
* Add an index to this builder
* @param index the index to put the object
* @return the updated builder
*/
public Builder globalIndex(String index) {
this.globalIndex = index;
return this;
}

/**
* Builds the request
* @return A {@link BulkDataObjectRequest}
*/
public BulkDataObjectRequest build() {
return new BulkDataObjectRequest(this.globalIndex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.Arrays;

import org.opensearch.core.xcontent.XContentParser;

import static org.opensearch.action.bulk.BulkResponse.NO_INGEST_TOOK;

public class BulkDataObjectResponse {

private final DataObjectResponse[] responses;
private final long tookInMillis;
private final long ingestTookInMillis;
private final boolean failures;
private final XContentParser parser;

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, boolean failures, XContentParser parser) {
this(responses, tookInMillis, NO_INGEST_TOOK, failures, parser);
}

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, long ingestTookInMillis, boolean failures, XContentParser parser) {
this.responses = responses;
this.tookInMillis = tookInMillis;
this.ingestTookInMillis = ingestTookInMillis;
this.failures = failures;
this.parser = parser;
}

/**
* The items representing each action performed in the bulk operation (in the same order!).
* @return the responses in the same order requested
*/
public DataObjectResponse[] getResponses() {
return responses;
}

/**
* How long the bulk execution took. Excluding ingest preprocessing.
* @return the execution time in milliseconds
*/
public long getTookInMillis() {
return tookInMillis;
}

/**
* If ingest is enabled returns the bulk ingest preprocessing time. in milliseconds, otherwise -1 is returned.
* @return the ingest execution time in milliseconds
*/
public long getIngestTookInMillis() {
return ingestTookInMillis;
}

/**
* Has anything failed with the execution.
* @return true if any response failed, false otherwise
*/
public boolean hasFailures() {
return this.failures;
}

/**
* Returns the parser
* @return the parser
*/
public XContentParser parser() {
return this.parser;
}
}
128 changes: 128 additions & 0 deletions common/src/main/java/org/opensearch/sdk/DataObjectRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

public abstract class DataObjectRequest {

private String index;
private final String id;
private String tenantId;

/**
* Instantiate this request with an index and id.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param index the index location to delete the object
* @param id the document id
* @param tenantId the tenant id
*/
protected DataObjectRequest(String index, String id, String tenantId) {
this.index = index;
this.id = id;
this.tenantId = tenantId;
}

/**
* Returns the index
* @return the index
*/
public String index() {
return this.index;
}

/**
* Sets the index
* @param index The new index to set
*/
public void index(String index) {
this.index = index;
}

/**
* Returns the document id
* @return the id
*/
public String id() {
return this.id;
}

/**
* Returns the tenant id
* @return the tenantId
*/
public String tenantId() {
return this.tenantId;
}

/**
* Sets the tenant id
* @param tenantId The new tenant id to set
*/
public void tenantId(String tenantId) {
this.tenantId = tenantId;
}

/**
* Returns whether the subclass can be used in a {@link BulkDataObjectRequest}
* @return whether the subclass is a write request
*/
public abstract boolean isWriteRequest();

/**
* Superclass for common fields in subclass builders
*/
public static class Builder<T extends Builder<T>> {
protected String index = null;
protected String id = null;
protected String tenantId = null;

/**
* Empty constructor to initialize
*/
protected Builder() {}

/**
* Add an index to this builder
* @param index the index to put the object
* @return the updated builder
*/
public T index(String index) {
this.index = index;
return self();
}

/**
* Add an id to this builder
* @param id the document id
* @return the updated builder
*/
public T id(String id) {
this.id = id;
return self();
}

/**
* Add a tenant id to this builder
* @param tenantId the tenant id
* @return the updated builder
*/
public T tenantId(String tenantId) {
this.tenantId = tenantId;
return self();
}

/**
* Returns this builder as the parameterized type.
*/
@SuppressWarnings("unchecked")
protected T self() {
return (T) this;
}
}
}
Loading
Loading