Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data models to be generic to data storage #2687

Merged
merged 3 commits into the base branch from the source branch
on May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ public CreateAsyncQueryResponse createAsyncQuery(
sparkExecutionEngineConfig.getSparkSubmitParameters(),
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId(),
dispatchQueryResponse.getDatasourceName(),
dispatchQueryResponse.getJobType(),
dispatchQueryResponse.getIndexName()));
AsyncQueryJobMetadata.builder()
.queryId(dispatchQueryResponse.getQueryId())
.applicationId(sparkExecutionEngineConfig.getApplicationId())
.jobId(dispatchQueryResponse.getJobId())
.resultIndex(dispatchQueryResponse.getResultIndex())
.sessionId(dispatchQueryResponse.getSessionId())
.datasourceName(dispatchQueryResponse.getDatasourceName())
.jobType(dispatchQueryResponse.getJobType())
.indexName(dispatchQueryResponse.getIndexName())
.build());
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,45 @@

package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.spark.execution.statestore.StateStore.createJobMetaData;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

private final StateStore stateStore;
private final AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
createJobMetaData(stateStore, queryId.getDataSourceName()).apply(asyncQueryJobMetadata);
stateStore.create(
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
return stateStore.get(
queryId.docId(),
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

package org.opensearch.sql.spark.asyncquery.model;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import lombok.Builder.Default;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** This class models all the metadata required for a job. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
Expand All @@ -27,113 +30,31 @@ public class AsyncQueryJobMetadata extends StateModel {
// since 2.13
// jobType could be null before OpenSearch 2.12. SparkQueryDispatcher use jobType to choose
// cancel query handler. if jobType is null, it will invoke BatchQueryHandler.cancel().
private final JobType jobType;
@Default private final JobType jobType = JobType.INTERACTIVE;
// null if JobType is null
private final String datasourceName;
// null if JobType is INTERACTIVE or null
private final String indexName;

@EqualsAndHashCode.Exclude private final long seqNo;
@EqualsAndHashCode.Exclude private final long primaryTerm;

public AsyncQueryJobMetadata(
AsyncQueryId queryId, String applicationId, String jobId, String resultIndex) {
this(
queryId,
applicationId,
jobId,
resultIndex,
null,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
jobType,
indexName,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

@Override
public String toString() {
return new Gson().toJson(this);
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
return new AsyncQueryJobMetadata(
copy.getQueryId(),
copy.getApplicationId(),
copy.getJobId(),
copy.getResultIndex(),
copy.getSessionId(),
copy.datasourceName,
copy.jobType,
copy.indexName,
seqNo,
primaryTerm);
AsyncQueryJobMetadata copy, ImmutableMap<String, Object> metadata) {
return builder()
.queryId(copy.queryId)
.applicationId(copy.getApplicationId())
.jobId(copy.getJobId())
.resultIndex(copy.getResultIndex())
.sessionId(copy.getSessionId())
.datasourceName(copy.datasourceName)
.jobType(copy.jobType)
.indexName(copy.indexName)
.metadata(metadata)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ private AsyncQueryId storeIndexDMLResult(
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
asyncQueryId.getId(),
status,
error,
dispatchQueryRequest.getDatasource(),
queryRunTime,
System.currentTimeMillis());
IndexDMLResult.builder()
.queryId(asyncQueryId.getId())
.status(status)
.error(error)
.datasourceName(dispatchQueryRequest.getDatasource())
.queryRunTime(queryRunTime)
.updateTime(System.currentTimeMillis())
.build();
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package org.opensearch.sql.spark.dispatcher.model;

import com.google.common.collect.ImmutableMap;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
public static final String DOC_ID_PREFIX = "index";
Expand All @@ -23,28 +25,20 @@ public class IndexDMLResult extends StateModel {
private final Long queryRunTime;
private final Long updateTime;

public static IndexDMLResult copy(IndexDMLResult copy, long seqNo, long primaryTerm) {
return new IndexDMLResult(
copy.queryId,
copy.status,
copy.error,
copy.datasourceName,
copy.queryRunTime,
copy.updateTime);
public static IndexDMLResult copy(IndexDMLResult copy, ImmutableMap<String, Object> metadata) {
return builder()
.queryId(copy.queryId)
.status(copy.status)
.error(copy.error)
.datasourceName(copy.datasourceName)
.queryRunTime(copy.queryRunTime)
.updateTime(copy.updateTime)
.metadata(metadata)
.build();
}

@Override
public String getId() {
return DOC_ID_PREFIX + queryId;
}

@Override
public long getSeqNo() {
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}

@Override
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import lombok.Builder;
import com.google.common.collect.ImmutableMap;
import lombok.Data;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
@SuperBuilder
public class SessionModel extends StateModel {

public static final String UNKNOWN = "unknown";
Expand All @@ -30,10 +30,7 @@ public class SessionModel extends StateModel {
private final String error;
private final long lastUpdateTime;

private final long seqNo;
private final long primaryTerm;

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
public static SessionModel of(SessionModel copy, ImmutableMap<String, Object> metadata) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
Expand All @@ -44,13 +41,12 @@ public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.metadata(metadata)
.build();
}

public static SessionModel copyWithState(
SessionModel copy, SessionState state, long seqNo, long primaryTerm) {
SessionModel copy, SessionState state, ImmutableMap<String, Object> metadata) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
Expand All @@ -61,8 +57,7 @@ public static SessionModel copyWithState(
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.metadata(metadata)
.build();
}

Expand All @@ -78,8 +73,6 @@ public static SessionModel initInteractiveSession(
.jobId(jobId)
.error(UNKNOWN)
.lastUpdateTime(System.currentTimeMillis())
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}

Expand Down
Loading
Loading