Skip to content

Commit

Permalink
ISSUE-242 Added schema version lifecycle statemachine APIs for users …
Browse files Browse the repository at this point in the history
…to add any custom states.

  - Added APIs required for clients including UI to get statemachine information and transition APIs.
  - Changed details column type to blob in schema_version_state table which gives mroe freedom to put whatever content by custom state implementors.
  • Loading branch information
satishd committed Sep 13, 2017
1 parent 31c4115 commit 7c9c221
Show file tree
Hide file tree
Showing 27 changed files with 1,128 additions and 393 deletions.
2 changes: 1 addition & 1 deletion bootstrap/sql/mysql/v002__create_schema_version_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS schema_version_state (
stateId TINYINT NOT NULL,
sequence INT NOT NULL,
timestamp BIGINT NOT NULL,
details VARCHAR(255),
details BLOB,
PRIMARY KEY (schemaVersionId, stateId, sequence),
UNIQUE KEY (id)
);
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS schema_version_state (
"stateId" SMALLINT NOT NULL,
"sequence" INT NOT NULL,
"timestamp" BIGINT NOT NULL,
"details" VARCHAR(255),
"details" BLOB,
PRIMARY KEY ("schemaVersionId", "stateId", "sequence"),
UNIQUE ("id")
);
4 changes: 2 additions & 2 deletions conf/registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ modules:
defaultSerializerClass: "com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotSerializer"
defaultDeserializerClass: "com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer"
# schema reviewer configuration
schemaReviewExecutor:
className: "com.hortonworks.registries.schemaregistry.state.DefaultSchemaReviewExecutor"
customSchemaStateExecutor:
className: "com.hortonworks.registries.schemaregistry.state.DefaultCustomSchemaStateExecutor"
props:
# schema cache properties
# inmemory schema versions cache size
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serde.SerDesException;
import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException;
import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState;
import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;

/**
* Basic service interface for schema registry which should be implemented by client and server interfaces.
Expand Down Expand Up @@ -246,11 +245,7 @@ default void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoun
throw new UnsupportedOperationException();
}

default void executeCustomState(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
throw new UnsupportedOperationException();
}
void transitionState(Long schemaVersionId, Byte targetStateId) throws SchemaNotFoundException, SchemaLifecycleException;

default List<SchemaVersionLifecycleState> getSchemaVersionLifecycleStates() {
throw new UnsupportedOperationException();
}
SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SchemaVersion(String schemaText, String description) {
public SchemaVersion(String schemaText, String description, Byte initialState) {
this.description = description;
this.schemaText = schemaText;
this.initialState = initialState != null ? initialState : SchemaVersionLifecycleStates.ENABLED.id();
this.initialState = initialState != null ? initialState : SchemaVersionLifecycleStates.ENABLED.getId();
}

public String getDescription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public SchemaVersionInfo(Long id,
this.version = version;
this.schemaText = schemaText;
this.timestamp = timestamp;
this.stateId = stateId == null ? SchemaVersionLifecycleStates.ENABLED.id() : stateId;
this.stateId = stateId == null ? SchemaVersionLifecycleStates.ENABLED.getId() : stateId;
}

public Long getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,15 @@
*/
package com.hortonworks.registries.schemaregistry.state;

import java.util.List;

/**
*
*/
public abstract class AbstractInbuiltSchemaLifecycleState implements InbuiltSchemaVersionLifecycleState {
private final String name;
private final byte id;
private final String description;
private final List<SchemaVersionLifecycleState> nextStates;
public abstract class AbstractInbuiltSchemaLifecycleState extends BaseSchemaVersionLifecycleState implements InbuiltSchemaVersionLifecycleState {

protected AbstractInbuiltSchemaLifecycleState(String name,
byte id,
String description,
List<SchemaVersionLifecycleState> nextStates) {
this.name = name;
this.id = id;
this.description = description;
this.nextStates = nextStates;
}

@Override
public Byte id() {
return id;
}

@Override
public String name() {
return name;
}

@Override
public String description() {
return description;
}

@Override
public List<SchemaVersionLifecycleState> nextStates() {
return nextStates;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

AbstractInbuiltSchemaLifecycleState that = (AbstractInbuiltSchemaLifecycleState) o;

if (id != that.id) return false;
return name != null ? name.equals(that.name) : that.name == null;
}

@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (int) id;
return result;
String description) {
super(name, id, description);
}

@Override
public String toString() {
return "{" +
"name='" + name + '\'' +
", id=" + id +
", description='" + description + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2016 Hortonworks.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hortonworks.registries.schemaregistry.state;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;

/**
*
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseSchemaVersionLifecycleState implements SchemaVersionLifecycleState, Serializable {

private static final long serialVersionUID = 7503502751825893763L;

private String name;
private byte id;
private String description;

private BaseSchemaVersionLifecycleState() {
}

protected BaseSchemaVersionLifecycleState(String name,
byte id,
String description) {
this.name = name;
this.id = id;
this.description = description;
}

@Override
public Byte getId() {
return id;
}

@Override
public String getName() {
return name;
}

@Override
public String getDescription() {
return description;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

BaseSchemaVersionLifecycleState that = (BaseSchemaVersionLifecycleState) o;

if (id != that.id) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return description != null ? description.equals(that.description) : that.description == null;
}

@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (int) id;
result = 31 * result + (description != null ? description.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "BaseSchemaVersionLifecycleState{" +
"name='" + name + '\'' +
", id=" + id +
", description='" + description + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
import java.util.Map;

/**
* This interface should be implemented for having custom review process. This can be defined in registry.yaml for
* registration.
* This interface should be implemented for having custom review process and adding any custom states with the default
* state machine. This can be defined in registry.yaml for registration.
*
* This is still experimental API and not stable.
*/
public interface SchemaReviewExecutor {
public interface CustomSchemaStateExecutor {

/**
* Initialize with completion states of this schema review.
*
* @param successState state to be set when review is successful.
* @param retryState state to be set when review is failed.
* @param props any properties to be initialized with.
* @param builder this can be used to add any custom states and transitions.
* @param successStateId state to be set when review is successful.
* @param retryStateId state to be set when review is failed.
* @param props any properties to be initialized with.
*/
void init(SchemaVersionLifecycleState successState,
SchemaVersionLifecycleState retryState,
void init(SchemaVersionLifecycleStateMachine.Builder builder,
Byte successStateId,
Byte retryStateId,
Map<String, ?> props);

/**
Expand All @@ -45,5 +49,5 @@ void init(SchemaVersionLifecycleState successState,
* @throws SchemaLifecycleException when any lifecycle error occurs.
* @throws SchemaNotFoundException when the given schema version is not found.
*/
void execute(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException;
void executeReviewState(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
import java.util.Map;

/**
* This is default implementation of SchemaReviewExecutor which always leads to the success state.
* This is default implementation of CustomSchemaStateExecutor which always leads to the success state.
*/
public class DefaultSchemaReviewExecutor implements SchemaReviewExecutor {
public class DefaultCustomSchemaStateExecutor implements CustomSchemaStateExecutor {

private SchemaVersionLifecycleState successState;
private SchemaVersionLifecycleState retryState;

@Override
public void init(SchemaVersionLifecycleState successState,
SchemaVersionLifecycleState retryState,
public void init(SchemaVersionLifecycleStateMachine.Builder builder,
Byte successStateId,
Byte retryStateId,
Map<String, ?> props) {
this.successState = successState;
this.retryState = retryState;
this.successState = builder.getStates().get(successStateId);
this.retryState = builder.getStates().get(retryStateId);
}

@Override
public void execute(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
public void executeReviewState(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
schemaVersionLifecycleContext.setState(successState);
schemaVersionLifecycleContext.updateSchemaVersionState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,36 @@

import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Collection;

/**
*
*/
public interface InbuiltSchemaVersionLifecycleState extends SchemaVersionLifecycleState {

public default void startReview(SchemaVersionLifecycleContext schemaVersionLifecycleContext,
SchemaReviewExecutor schemaReviewExecutor) throws SchemaLifecycleException, SchemaNotFoundException {
default void startReview(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this);
}

public default void enable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException, IncompatibleSchemaException {
default void enable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, IncompatibleSchemaException, SchemaNotFoundException {
throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this);
}

public default void disable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
default void disable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this);

}

public default void archive(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
default void archive(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this);
}

public default void delete(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
default void delete(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException {
throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this);
}

Collection<Pair<SchemaVersionLifecycleStateTransition, SchemaVersionLifecycleStateAction>> getTransitionActions();

}
Loading

0 comments on commit 7c9c221

Please sign in to comment.