Skip to content

Commit

Permalink
Add Flint index metadata log and transaction support (#110)
Browse files Browse the repository at this point in the history
* Add initial impl with basic IT

Signed-off-by: Chen Dai <[email protected]>

* Move to Flint core

Signed-off-by: Chen Dai <[email protected]>

* Switch to new transaction API

Signed-off-by: Chen Dai <[email protected]>

* Add java doc

Signed-off-by: Chen Dai <[email protected]>

* Refactor OS query method

Signed-off-by: Chen Dai <[email protected]>

* Add more IT for exceptional case

Signed-off-by: Chen Dai <[email protected]>

* Add no transaction class to avoid impact on existing IT

Signed-off-by: Chen Dai <[email protected]>

* Make Flint metadata log entry case class and use copy

Signed-off-by: Chen Dai <[email protected]>

* Add logging in FlintSpark, FlintClient and Transaction layer

Signed-off-by: Chen Dai <[email protected]>

* Add index state enum

Signed-off-by: Chen Dai <[email protected]>

* Add IT base suite

Signed-off-by: Chen Dai <[email protected]>

* Extract OS logic to metadata log

Signed-off-by: Chen Dai <[email protected]>

* Update javadoc and more logging

Signed-off-by: Chen Dai <[email protected]>

* Add more IT with skipping index

Signed-off-by: Chen Dai <[email protected]>

* Refactor and add logging for manual test

Signed-off-by: Chen Dai <[email protected]>

* Refactor and add logging for manual test

Signed-off-by: Chen Dai <[email protected]>

* Read metadata log index name from Spark conf

Signed-off-by: Chen Dai <[email protected]>

* Refactor OS add and update with write method

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Pass datasource name to metadata log

Signed-off-by: Chen Dai <[email protected]>

* Store latest id in Flint metadata

Signed-off-by: Chen Dai <[email protected]>

* Add comments

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 31, 2023
1 parent ad9989e commit 3e93f77
Show file tree
Hide file tree
Showing 15 changed files with 972 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

Expand All @@ -18,6 +19,15 @@
*/
public interface FlintClient {

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in future
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);

/**
* Create a Flint index with the metadata given.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ case class FlintMetadata(
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry */
latestId: Option[String] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

Expand All @@ -58,6 +60,9 @@ case class FlintMetadata(
.field("source", source)
.field("indexedColumns", indexedColumns)

if (latestId.isDefined) {
builder.field("latestId", latestId.get)
}
optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}
Expand Down Expand Up @@ -219,14 +224,14 @@ object FlintMetadata {
def build(): FlintMetadata = {
FlintMetadata(
if (version == null) current() else version,
name,
kind,
source,
indexedColumns,
options,
properties,
schema,
indexSettings)
name = name,
kind = kind,
source = source,
indexedColumns = indexedColumns,
options = options,
properties = properties,
schema = schema,
indexSettings = indexSettings)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

/**
* Default optimistic transaction implementation that captures the basic workflow for
* transaction support by optimistic locking.
*
* @param <T> result type
*/
public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T> {

private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());

/**
* Data source name. TODO: remove this in future.
*/
private final String dataSourceName;

/**
* Flint metadata log
*/
private final FlintMetadataLog<FlintMetadataLogEntry> metadataLog;

private Predicate<FlintMetadataLogEntry> initialCondition = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> transientAction = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

public DefaultOptimisticTransaction(
String dataSourceName,
FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.dataSourceName = dataSourceName;
this.metadataLog = metadataLog;
}

@Override
public DefaultOptimisticTransaction<T> initialLog(
Predicate<FlintMetadataLogEntry> initialCondition) {
this.initialCondition = initialCondition;
return this;
}

@Override
public DefaultOptimisticTransaction<T> transientLog(
Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
this.transientAction = action;
return this;
}

@Override
public DefaultOptimisticTransaction<T> finalLog(
Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
this.finalAction = action;
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
Objects.requireNonNull(initialCondition);
Objects.requireNonNull(finalAction);

// Get the latest log and create if not exists
FlintMetadataLogEntry latest =
metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));

// Perform initial log check
if (initialCondition.test(latest)) {

// Append optional transient log
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));
}

// Perform operation
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
return result;
} else {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
}
}

private FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
IndexState$.MODULE$.EMPTY(),
dataSourceName,
"");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.Optional;

/**
* Flint metadata log that provides transactional support on write API based on different storage.
*/
public interface FlintMetadataLog<T> {

/**
* Add a new log entry to the metadata log.
*
* @param logEntry log entry
* @return log entry after add
*/
T add(T logEntry);

/**
* Get the latest log entry in the metadata log.
*
* @return latest log entry
*/
Optional<T> getLatest();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
* implementation specific field, such as seqNo, primaryTerm, dataSource to properties.
*
* @param id
* log entry id
* @param seqNo
* OpenSearch sequence number
* @param primaryTerm
* OpenSearch primary term
* @param state
* Flint index state
* @param dataSource
* OpenSearch data source associated //TODO: remove?
* @param error
* error details if in error state
*/
case class FlintMetadataLogEntry(
id: String,
seqNo: Long,
primaryTerm: Long,
state: IndexState,
dataSource: String, // TODO: get from Spark conf
error: String) {

def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
id,
seqNo,
primaryTerm,
IndexState.from(map.get("state").asInstanceOf[String]),
map.get("dataSourceName").asInstanceOf[String],
map.get("error").asInstanceOf[String])
}

def toJson: String = {
// Implicitly populate latest appId, jobId and timestamp whenever persist
s"""
|{
| "version": "1.0",
| "type": "flintindexstate",
| "state": "$state",
| "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}",
| "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}",
| "dataSourceName": "$dataSource",
| "lastUpdateTime": "${System.currentTimeMillis()}",
| "error": "$error"
|}
|""".stripMargin
}
}

object FlintMetadataLogEntry {

/**
* Flint index state enum.
*/
object IndexState extends Enumeration {
type IndexState = Value
val EMPTY: IndexState.Value = Value("empty")
val CREATING: IndexState.Value = Value("creating")
val ACTIVE: IndexState.Value = Value("active")
val REFRESHING: IndexState.Value = Value("refreshing")
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
IndexState.values
.find(_.toString.equalsIgnoreCase(s))
.getOrElse(IndexState.UNKNOWN)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* Optimistic transaction interface that represents a state transition on the state machine.
* In particular, this abstraction is trying to express:
* initial log (precondition)
* => transient log (with pending operation to do)
* => final log (after operation succeeds)
* For example, "empty" => creating (operation is to create index) => active
*
* @param <T> result type
*/
public interface OptimisticTransaction<T> {

/**
* @param initialCondition initial precondition that the subsequent transition and action can proceed
* @return this transaction
*/
OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition);

/**
* @param action action to generate transient log entry
* @return this transaction
*/
OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* @param action action to generate final log entry
* @return this transaction
*/
OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* Execute the given operation with the given log transition above.
*
* @param operation operation
* @return result
*/
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
}
Loading

0 comments on commit 3e93f77

Please sign in to comment.