Skip to content

Commit

Permalink
[Feature/multi_tenancy] Implement client.bulk in SDK Client (#3192)
Browse files Browse the repository at this point in the history
* Create DataObjectRequest superclass

Signed-off-by: Daniel Widdis <[email protected]>

* Add BulkDataObjectRequest and BulkDataObjectResponse

Signed-off-by: Daniel Widdis <[email protected]>

* Integrate with SdkClient and stub implementations

Signed-off-by: Daniel Widdis <[email protected]>

* Client implementations and tests for bulkDataObjectAsync

Signed-off-by: Daniel Widdis <[email protected]>

* Fix javadocs, refactor to hide default ingestTook value implementation details

Signed-off-by: Daniel Widdis <[email protected]>

* Add more parameters to DataObject requests and responses

Signed-off-by: Daniel Widdis <[email protected]>

* Implement client.bulk in Undeploy actions

Signed-off-by: Daniel Widdis <[email protected]>

* Pass tenantId all the way from Rest Undeploy to Transport Undeploy

Signed-off-by: Daniel Widdis <[email protected]>

* Remove globalTenantId and add logic to validate all bulk requests

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor RemoteClusterIndicesClient bulkDataObjectAsync

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor privileged execution call to a superclass

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor LocalClusterIndicesClient for DRY

Signed-off-by: Daniel Widdis <[email protected]>

* Fix MLSyncUpCronTests to account for asynchronous delay

Signed-off-by: Daniel Widdis <[email protected]>

* Add test coverage for new abstract superclass

Signed-off-by: Daniel Widdis <[email protected]>

* Need more delay on GHA tests due to more thread contention

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Nov 21, 2024
1 parent 72ec80e commit 3bbc700
Show file tree
Hide file tree
Showing 38 changed files with 2,399 additions and 877 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.ml.common.transport.undeploy;

import lombok.Getter;
import lombok.Setter;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -17,10 +19,15 @@ public class MLUndeployModelNodesRequest extends BaseNodesRequest<MLUndeployMode

@Getter
private String[] modelIds;
@Getter
@Setter
private String tenantId;

public MLUndeployModelNodesRequest(StreamInput in) throws IOException {
super(in);
this.modelIds = in.readOptionalStringArray();
// TODO: will do bwc check later.
this.tenantId = in.readOptionalString();
}

public MLUndeployModelNodesRequest(String[] nodeIds, String[] modelIds) {
Expand All @@ -36,6 +43,7 @@ public MLUndeployModelNodesRequest(DiscoveryNode... nodes) {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(modelIds);
out.writeOptionalString(tenantId);
}

}
23 changes: 23 additions & 0 deletions common/src/main/java/org/opensearch/sdk/AbstractSdkClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

public abstract class AbstractSdkClient implements SdkClientDelegate {

@SuppressWarnings({ "deprecation", "removal" })
protected <T> CompletionStage<T> executePrivilegedAsync(PrivilegedAction<T> action, Executor executor) {
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged(action), executor);
}
}
132 changes: 132 additions & 0 deletions common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;

public class BulkDataObjectRequest {

private final List<DataObjectRequest> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private String globalIndex;

/**
* Instantiate this request with a global index.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param globalIndex the index location for all the bulk requests as a default if not already specified
*/
public BulkDataObjectRequest(@Nullable String globalIndex) {
this.globalIndex = globalIndex;
}

/**
* Returns the list of requests in this bulk request.
* @return the requests list
*/
public List<DataObjectRequest> requests() {
return List.copyOf(this.requests);
}

/**
* Returns the indices being updated in this bulk request.
* @return the indices being updated
*/
public Set<String> getIndices() {
return Collections.unmodifiableSet(indices);
}

/**
* Add the given request to the {@link BulkDataObjectRequest}
* @param request The request to add
* @return the updated request object
*/
public BulkDataObjectRequest add(DataObjectRequest request) {
if (!request.isWriteRequest()) {
throw new IllegalArgumentException("No support for request [" + request.getClass().getName() + "]");
}
if (Strings.isNullOrEmpty(request.index())) {
if (Strings.isNullOrEmpty(globalIndex)) {
throw new IllegalArgumentException(
"Either the request [" + request.getClass().getName() + "] or the bulk request must specify an index."
);
}
indices.add(globalIndex);
request.index(globalIndex);
} else {
indices.add(request.index());
}
requests.add(request);
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public BulkDataObjectRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}

/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
*/
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

/**
* Instantiate a builder for this object
* @return a builder instance
*/
public static Builder builder() {
return new Builder();
}

/**
* Class for constructing a Builder for this Request Object
*/
public static class Builder {
private String globalIndex = null;

/**
* Empty constructor to initialize
*/
protected Builder() {}

/**
* Add an index to this builder
* @param index the index to put the object
* @return the updated builder
*/
public Builder globalIndex(String index) {
this.globalIndex = index;
return this;
}

/**
* Builds the request
* @return A {@link BulkDataObjectRequest}
*/
public BulkDataObjectRequest build() {
return new BulkDataObjectRequest(this.globalIndex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.Arrays;

import org.opensearch.core.xcontent.XContentParser;

import static org.opensearch.action.bulk.BulkResponse.NO_INGEST_TOOK;

public class BulkDataObjectResponse {

private final DataObjectResponse[] responses;
private final long tookInMillis;
private final long ingestTookInMillis;
private final boolean failures;
private final XContentParser parser;

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, boolean failures, XContentParser parser) {
this(responses, tookInMillis, NO_INGEST_TOOK, failures, parser);
}

public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, long ingestTookInMillis, boolean failures, XContentParser parser) {
this.responses = responses;
this.tookInMillis = tookInMillis;
this.ingestTookInMillis = ingestTookInMillis;
this.failures = failures;
this.parser = parser;
}

/**
* The items representing each action performed in the bulk operation (in the same order!).
* @return the responses in the same order requested
*/
public DataObjectResponse[] getResponses() {
return responses;
}

/**
* How long the bulk execution took. Excluding ingest preprocessing.
* @return the execution time in milliseconds
*/
public long getTookInMillis() {
return tookInMillis;
}

/**
* If ingest is enabled returns the bulk ingest preprocessing time. in milliseconds, otherwise -1 is returned.
* @return the ingest execution time in milliseconds
*/
public long getIngestTookInMillis() {
return ingestTookInMillis;
}

/**
* Has anything failed with the execution.
* @return true if any response failed, false otherwise
*/
public boolean hasFailures() {
return this.failures;
}

/**
* Returns the parser
* @return the parser
*/
public XContentParser parser() {
return this.parser;
}
}
128 changes: 128 additions & 0 deletions common/src/main/java/org/opensearch/sdk/DataObjectRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

public abstract class DataObjectRequest {

private String index;
private final String id;
private String tenantId;

/**
* Instantiate this request with an index and id.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param index the index location to delete the object
* @param id the document id
* @param tenantId the tenant id
*/
protected DataObjectRequest(String index, String id, String tenantId) {
this.index = index;
this.id = id;
this.tenantId = tenantId;
}

/**
* Returns the index
* @return the index
*/
public String index() {
return this.index;
}

/**
* Sets the index
* @param index The new index to set
*/
public void index(String index) {
this.index = index;
}

/**
* Returns the document id
* @return the id
*/
public String id() {
return this.id;
}

/**
* Returns the tenant id
* @return the tenantId
*/
public String tenantId() {
return this.tenantId;
}

/**
* Sets the tenant id
* @param tenantId The new tenant id to set
*/
public void tenantId(String tenantId) {
this.tenantId = tenantId;
}

/**
* Returns whether the subclass can be used in a {@link BulkDataObjectRequest}
* @return whether the subclass is a write request
*/
public abstract boolean isWriteRequest();

/**
* Superclass for common fields in subclass builders
*/
public static class Builder<T extends Builder<T>> {
protected String index = null;
protected String id = null;
protected String tenantId = null;

/**
* Empty constructor to initialize
*/
protected Builder() {}

/**
* Add an index to this builder
* @param index the index to put the object
* @return the updated builder
*/
public T index(String index) {
this.index = index;
return self();
}

/**
* Add an id to this builder
* @param id the document id
* @return the updated builder
*/
public T id(String id) {
this.id = id;
return self();
}

/**
* Add a tenant id to this builder
* @param tenantId the tenant id
* @return the updated builder
*/
public T tenantId(String tenantId) {
this.tenantId = tenantId;
return self();
}

/**
* Returns this builder as the parameterized type.
*/
@SuppressWarnings("unchecked")
protected T self() {
return (T) this;
}
}
}
Loading

0 comments on commit 3bbc700

Please sign in to comment.