diff --git a/bootstrap/sql/mysql/v002__create_schema_version_state.sql b/bootstrap/sql/mysql/v002__create_schema_version_state.sql index c673018ab..4b5fabe65 100644 --- a/bootstrap/sql/mysql/v002__create_schema_version_state.sql +++ b/bootstrap/sql/mysql/v002__create_schema_version_state.sql @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS schema_version_state ( stateId TINYINT NOT NULL, sequence INT NOT NULL, timestamp BIGINT NOT NULL, - details VARCHAR(255), + details BLOB, PRIMARY KEY (schemaVersionId, stateId, sequence), UNIQUE KEY (id) ); \ No newline at end of file diff --git a/bootstrap/sql/postgresql/v002__create_schema_version_state.sql b/bootstrap/sql/postgresql/v002__create_schema_version_state.sql index 1edbd9f0b..bdbf78ebd 100644 --- a/bootstrap/sql/postgresql/v002__create_schema_version_state.sql +++ b/bootstrap/sql/postgresql/v002__create_schema_version_state.sql @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS schema_version_state ( "stateId" SMALLINT NOT NULL, "sequence" INT NOT NULL, "timestamp" BIGINT NOT NULL, - "details" VARCHAR(255), + "details" BLOB, PRIMARY KEY ("schemaVersionId", "stateId", "sequence"), UNIQUE ("id") ); \ No newline at end of file diff --git a/conf/registry.yaml b/conf/registry.yaml index 270146ca1..ed0a98fbd 100644 --- a/conf/registry.yaml +++ b/conf/registry.yaml @@ -8,8 +8,8 @@ modules: defaultSerializerClass: "com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotSerializer" defaultDeserializerClass: "com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer" # schema reviewer configuration - schemaReviewExecutor: - className: "com.hortonworks.registries.schemaregistry.state.DefaultSchemaReviewExecutor" + customSchemaStateExecutor: + className: "com.hortonworks.registries.schemaregistry.state.DefaultCustomSchemaStateExecutor" props: # schema cache properties # inmemory schema versions cache size diff --git a/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java index 92f63e76d..178b03a92 100644 --- a/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java +++ b/schema-registry/client/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java @@ -48,6 +48,7 @@ import com.hortonworks.registries.schemaregistry.serde.pull.PullSerializer; import com.hortonworks.registries.schemaregistry.serde.push.PushDeserializer; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; import org.apache.commons.io.IOUtils; import org.glassfish.jersey.SslConfigurator; import org.glassfish.jersey.client.ClientConfig; @@ -139,7 +140,7 @@ public class SchemaRegistryClient implements ISchemaRegistryClient { private static final String SCHEMAS_PATH = SCHEMA_REGISTRY_PATH + "/schemas/"; private static final String SCHEMA_PROVIDERS_PATH = SCHEMA_REGISTRY_PATH + "/schemaproviders/"; private static final String SCHEMAS_BY_ID_PATH = SCHEMA_REGISTRY_PATH + "/schemasById/"; - private static final String SCHEMA_VERSIONS_BY_ID_PATH = SCHEMAS_PATH + "versionsById/"; + private static final String SCHEMA_VERSIONS_PATH = SCHEMAS_PATH + "versions/"; private static final String FILES_PATH = SCHEMA_REGISTRY_PATH + "/files/"; private static final String SERIALIZERS_PATH = SCHEMA_REGISTRY_PATH + "/serdes/"; private static final String REGISTY_CLIENT_JAAS_SECTION = "RegistryClient"; @@ -180,7 +181,9 @@ public class SchemaRegistryClient implements ISchemaRegistryClient { /** * Creates {@link SchemaRegistryClient} instance with the given yaml config. + * * @param confFile config file which contains the configuration entries. + * * @throws IOException when any IOException occurs while reading the given confFile */ public SchemaRegistryClient(File confFile) throws IOException { @@ -188,7 +191,7 @@ public SchemaRegistryClient(File confFile) throws IOException { } private static Map buildConfFromFile(File confFile) throws IOException { - try(FileInputStream fis = new FileInputStream(confFile)) { + try (FileInputStream fis = new FileInputStream(confFile)) { return (Map) new Yaml().load(IOUtils.toString(fis, "UTF-8")); } } @@ -198,18 +201,18 @@ public SchemaRegistryClient(Map conf) { ClientConfig config = createClientConfig(conf); ClientBuilder clientBuilder = ClientBuilder.newBuilder() - .withConfig(config) - .property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE); - if(conf.containsKey(SSL_CONFIGURATION_KEY)) { - Map sslConfigurations = (Map) conf.get(SSL_CONFIGURATION_KEY); + .withConfig(config) + .property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE); + if (conf.containsKey(SSL_CONFIGURATION_KEY)) { + Map sslConfigurations = (Map) conf.get(SSL_CONFIGURATION_KEY); clientBuilder.sslContext(createSSLContext(sslConfigurations)); - if(sslConfigurations.containsKey(HOSTNAME_VERIFIER_CLASS_KEY)) { - HostnameVerifier hostNameVerifier= null; + if (sslConfigurations.containsKey(HOSTNAME_VERIFIER_CLASS_KEY)) { + HostnameVerifier hostNameVerifier = null; String hostNameVerifierClassName = sslConfigurations.get(HOSTNAME_VERIFIER_CLASS_KEY); try { hostNameVerifier = (HostnameVerifier) Class.forName(hostNameVerifierClassName).newInstance(); } catch (Exception e) { - throw new RuntimeException("Failed to instantiate hostNameVerifierClass : "+hostNameVerifierClassName, e); + throw new RuntimeException("Failed to instantiate hostNameVerifierClass : " + hostNameVerifierClassName, e); } clientBuilder.hostnameVerifier(hostNameVerifier); } @@ -220,7 +223,7 @@ public SchemaRegistryClient(Map conf) { // get list of urls and create given or default UrlSelector. urlSelector = createUrlSelector(); urlWithTargets = new ConcurrentHashMap<>(); - + classLoaderCache = new ClassLoaderCache(this); schemaVersionInfoCache = new SchemaVersionInfoCache(new SchemaVersionRetriever() { @@ -234,38 +237,44 @@ public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws Schem return doGetSchemaVersionInfo(key); } }, - ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(), - ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue()); + ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE + .name())).intValue(), + ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS + .name())).longValue()); SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = createSchemaMetadataFetcher(); - schemaMetadataCache = new SchemaMetadataCache(((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_SIZE.name())).longValue(), - ((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue(), + schemaMetadataCache = new SchemaMetadataCache(((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_SIZE + .name())).longValue(), + ((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS + .name())).longValue(), schemaMetadataFetcher); schemaTextCache = CacheBuilder.newBuilder() - .maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE.name())).longValue()) - .expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue(), - TimeUnit.MILLISECONDS) - .build(); + .maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE + .name())).longValue()) + .expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS + .name())).longValue(), + TimeUnit.MILLISECONDS) + .build(); } protected SSLContext createSSLContext(Map sslConfigurations) { SslConfigurator sslConfigurator = SslConfigurator.newInstance(); String keyPassword = "keyPassword"; sslConfigurator.keyStoreType(sslConfigurations.get("keyStoreType")) - .keyStoreFile(sslConfigurations.get("keyStorePath")) - .keyStorePassword(sslConfigurations.get("keyStorePassword")) - .trustStoreType(sslConfigurations.get("trustStoreType")) - .trustStoreFile(sslConfigurations.get("trustStorePath")) - .trustStorePassword(sslConfigurations.get("trustStorePassword")) - .keyStoreProvider(sslConfigurations.get("keyStoreProvider")) - .trustStoreProvider(sslConfigurations.get("trustStoreProvider")) - .keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm")) - .keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider")) - .trustManagerFactoryAlgorithm(sslConfigurations.get("trustManagerFactoryAlgorithm")) - .trustManagerFactoryProvider(sslConfigurations.get("trustManagerFactoryProvider")) - .securityProtocol(sslConfigurations.get("protocol")); - if(sslConfigurations.containsKey(keyPassword)) + .keyStoreFile(sslConfigurations.get("keyStorePath")) + .keyStorePassword(sslConfigurations.get("keyStorePassword")) + .trustStoreType(sslConfigurations.get("trustStoreType")) + .trustStoreFile(sslConfigurations.get("trustStorePath")) + .trustStorePassword(sslConfigurations.get("trustStorePassword")) + .keyStoreProvider(sslConfigurations.get("keyStoreProvider")) + .trustStoreProvider(sslConfigurations.get("trustStoreProvider")) + .keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm")) + .keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider")) + .trustManagerFactoryAlgorithm(sslConfigurations.get("trustManagerFactoryAlgorithm")) + .trustManagerFactoryProvider(sslConfigurations.get("trustManagerFactoryProvider")) + .securityProtocol(sslConfigurations.get("protocol")); + if (sslConfigurations.containsKey(keyPassword)) sslConfigurator.keyPassword(sslConfigurations.get(keyPassword)); return sslConfigurator.createSSLContext(); } @@ -275,7 +284,7 @@ private SchemaRegistryTargets currentSchemaRegistryTargets() { urlWithTargets.computeIfAbsent(url, s -> new SchemaRegistryTargets(client.target(s))); return urlWithTargets.get(url); } - + private static class SchemaRegistryTargets { private final WebTarget schemaProvidersTarget; private final WebTarget schemasTarget; @@ -284,14 +293,18 @@ private static class SchemaRegistryTargets { private final WebTarget searchFieldsTarget; private final WebTarget serializersTarget; private final WebTarget filesTarget; + private final WebTarget schemaVersionsTarget; private final WebTarget schemaVersionsByIdTarget; + private final WebTarget schemaVersionsStatesMachineTarget; SchemaRegistryTargets(WebTarget rootTarget) { + this.rootTarget = rootTarget; schemaProvidersTarget = rootTarget.path(SCHEMA_PROVIDERS_PATH); schemasTarget = rootTarget.path(SCHEMAS_PATH); schemasByIdTarget = rootTarget.path(SCHEMAS_BY_ID_PATH); - schemaVersionsByIdTarget = rootTarget.path(SCHEMA_VERSIONS_BY_ID_PATH); - this.rootTarget = rootTarget; + schemaVersionsByIdTarget = schemasTarget.path("versionsById"); + schemaVersionsTarget = rootTarget.path(SCHEMA_VERSIONS_PATH); + schemaVersionsStatesMachineTarget = schemaVersionsTarget.path("statemachine"); searchFieldsTarget = rootTarget.path(SEARCH_FIELDS); serializersTarget = rootTarget.path(SERIALIZERS_PATH); filesTarget = rootTarget.path(FILES_PATH); @@ -307,7 +320,9 @@ private UrlSelector createUrlSelector() { urlSelector = new LoadBalancedFailoverUrlSelector(rootCatalogURL); } else { try { - urlSelector = (UrlSelector) Class.forName(urlSelectorClass).getConstructor(String.class).newInstance(rootCatalogURL); + urlSelector = (UrlSelector) Class.forName(urlSelectorClass) + .getConstructor(String.class) + .newInstance(rootCatalogURL); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) { throw new RuntimeException(e); @@ -367,7 +382,8 @@ public Long registerSchemaMetadata(SchemaMetadata schemaMetadata) { @Override public Long addSchemaMetadata(SchemaMetadata schemaMetadata) { - SchemaMetadataInfo schemaMetadataInfo = schemaMetadataCache.getIfPresent(SchemaMetadataCache.Key.of(schemaMetadata.getName())); + SchemaMetadataInfo schemaMetadataInfo = schemaMetadataCache.getIfPresent(SchemaMetadataCache.Key.of(schemaMetadata + .getName())); if (schemaMetadataInfo == null) { return doRegisterSchemaMetadata(schemaMetadata, currentSchemaRegistryTargets().schemasTarget); } @@ -401,7 +417,7 @@ public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) { @Override public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion) throws - InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException { + InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException { // get it, if it exists in cache SchemaDigestEntry schemaDigestEntry = buildSchemaTextEntry(schemaVersion, schemaMetadata.getName()); SchemaIdVersion schemaIdVersion = schemaTextCache.getIfPresent(schemaDigestEntry); @@ -491,7 +507,8 @@ else if (cause instanceof IncompatibleSchemaException) { public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException { schemaVersionInfoCache.invalidateSchema(new SchemaVersionInfoCache.Key(schemaVersionKey)); - WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%s", schemaVersionKey.getSchemaName(), schemaVersionKey.getVersion())); + WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%s", schemaVersionKey + .getSchemaName(), schemaVersionKey.getVersion())); Response response = Subject.doAs(subject, new PrivilegedAction() { @Override public Response run() { @@ -512,7 +529,8 @@ private void handleDeleteSchemaResponse(Response response) throws SchemaNotFound } } - private SchemaIdVersion doAddSchemaVersion(String schemaName, SchemaVersion schemaVersion) throws IncompatibleSchemaException, InvalidSchemaException, SchemaNotFoundException { + private SchemaIdVersion doAddSchemaVersion(String schemaName, + SchemaVersion schemaVersion) throws IncompatibleSchemaException, InvalidSchemaException, SchemaNotFoundException { SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaName); if (schemaMetadataInfo == null) { throw new SchemaNotFoundException("Schema with name " + schemaName + " not found"); @@ -528,7 +546,8 @@ public Response run() { return handleSchemaIdVersionResponse(schemaMetadataInfo, response); } - private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaMetadataInfo, Response response) throws IncompatibleSchemaException, InvalidSchemaException { + private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaMetadataInfo, + Response response) throws IncompatibleSchemaException, InvalidSchemaException { int status = response.getStatus(); String msg = response.readEntity(String.class); if (status == Response.Status.BAD_REQUEST.getStatusCode() || status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) { @@ -545,7 +564,8 @@ private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaM Integer version = readEntity(msg, Integer.class); - SchemaVersionInfo schemaVersionInfo = doGetSchemaVersionInfo(new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata().getName(), version)); + SchemaVersionInfo schemaVersionInfo = doGetSchemaVersionInfo(new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata() + .getName(), version)); return new SchemaIdVersion(schemaMetadataInfo.getId(), version, schemaVersionInfo.getId()); } @@ -583,10 +603,13 @@ public SchemaVersionInfo getSchemaVersionInfo(SchemaVersionKey schemaVersionKey) } private SchemaVersionInfo doGetSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException { - if(schemaIdVersion.getSchemaVersionId() != null) { + if (schemaIdVersion.getSchemaVersionId() != null) { LOG.info("Getting schema version from target registry for [{}]", schemaIdVersion.getSchemaVersionId()); - return getEntity(currentSchemaRegistryTargets().schemaVersionsByIdTarget.path(schemaIdVersion.getSchemaVersionId().toString()), SchemaVersionInfo.class); - } else if (schemaIdVersion.getSchemaMetadataId() != null){ + return getEntity(currentSchemaRegistryTargets() + .schemaVersionsByIdTarget + .path(schemaIdVersion.getSchemaVersionId().toString()), + SchemaVersionInfo.class); + } else if (schemaIdVersion.getSchemaMetadataId() != null) { SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaIdVersion.getSchemaMetadataId()); SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata() .getName(), schemaIdVersion.getVersion()); @@ -600,7 +623,8 @@ private SchemaVersionInfo doGetSchemaVersionInfo(SchemaIdVersion schemaIdVersion private SchemaVersionInfo doGetSchemaVersionInfo(SchemaVersionKey schemaVersionKey) { LOG.info("Getting schema version from target registry for [{}]", schemaVersionKey); String schemaName = schemaVersionKey.getSchemaName(); - WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%d", schemaName, schemaVersionKey.getVersion())); + WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%d", schemaName, schemaVersionKey + .getVersion())); return getEntity(webTarget, SchemaVersionInfo.class); } @@ -623,10 +647,10 @@ private static String encode(String schemaName) { public void enableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException, IncompatibleSchemaException { try { - fetchSchemaVersionState(schemaVersionId, "enable"); + transitionSchemaVersionState(schemaVersionId, "enable"); } catch (SchemaLifecycleException e) { Throwable cause = e.getCause(); - if(cause != null && cause instanceof IncompatibleSchemaException) { + if (cause != null && cause instanceof IncompatibleSchemaException) { throw (IncompatibleSchemaException) cause; } throw e; @@ -635,29 +659,40 @@ public void enableSchemaVersion(Long schemaVersionId) @Override public void disableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - fetchSchemaVersionState(schemaVersionId, "disable"); + transitionSchemaVersionState(schemaVersionId, "disable"); } @Override public void deleteSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - fetchSchemaVersionState(schemaVersionId, "delete"); + transitionSchemaVersionState(schemaVersionId, "delete"); } @Override public void archiveSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - fetchSchemaVersionState(schemaVersionId, "archive"); + transitionSchemaVersionState(schemaVersionId, "archive"); } @Override public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - fetchSchemaVersionState(schemaVersionId, "startReview"); + transitionSchemaVersionState(schemaVersionId, "startReview"); + } + @Override + public void transitionState(Long schemaVersionId, + Byte targetStateId) throws SchemaNotFoundException, SchemaLifecycleException { + boolean result = transitionSchemaVersionState(schemaVersionId, targetStateId.toString()); + } + + @Override + public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() { + return getEntity(currentSchemaRegistryTargets().schemaVersionsStatesMachineTarget, + SchemaVersionLifecycleStateMachineInfo.class); } - private boolean fetchSchemaVersionState(Long schemaVersionId, - String operation) throws SchemaNotFoundException, SchemaLifecycleException { + private boolean transitionSchemaVersionState(Long schemaVersionId, + String operationOrTargetState) throws SchemaNotFoundException, SchemaLifecycleException { - WebTarget webTarget = currentSchemaRegistryTargets().schemaVersionsByIdTarget.path(schemaVersionId + "/state/" + operation); + WebTarget webTarget = currentSchemaRegistryTargets().schemaVersionsTarget.path(schemaVersionId + "/state/" + operationOrTargetState); Response response = Subject.doAs(subject, new PrivilegedAction() { @Override public Response run() { @@ -665,7 +700,7 @@ public Response run() { } }); - boolean result = handleSchemaLifeCycleResponse(response); + boolean result = handleSchemaLifeCycleResponse(response); // invalidate this entry from cache. schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(new SchemaIdVersion(schemaVersionId))); @@ -676,13 +711,13 @@ public Response run() { private boolean handleSchemaLifeCycleResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException { boolean result; int status = response.getStatus(); - if(status == Response.Status.OK.getStatusCode()) { + if (status == Response.Status.OK.getStatusCode()) { result = response.readEntity(Boolean.class); - } else if(status == Response.Status.NOT_FOUND.getStatusCode()) { + } else if (status == Response.Status.NOT_FOUND.getStatusCode()) { throw new SchemaNotFoundException(response.readEntity(String.class)); - } else if(status == Response.Status.BAD_REQUEST.getStatusCode()) { + } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) { CatalogResponse catalogResponse = readCatalogResponse(response.readEntity(String.class)); - if(catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode()) { + if (catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode()) { throw new SchemaLifecycleException(new IncompatibleSchemaException(catalogResponse.getResponseMessage())); } throw new SchemaLifecycleException(catalogResponse.getResponseMessage()); @@ -701,7 +736,8 @@ public Collection getAllVersions(String schemaName) throws Sc } @Override - public CompatibilityResult checkCompatibility(String schemaName, String toSchemaText) throws SchemaNotFoundException { + public CompatibilityResult checkCompatibility(String schemaName, + String toSchemaText) throws SchemaNotFoundException { WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/compatibility"); String response = Subject.doAs(subject, new PrivilegedAction() { @Override @@ -735,7 +771,8 @@ public String uploadFile(InputStream inputStream) { return Subject.doAs(subject, new PrivilegedAction() { @Override public String run() { - return currentSchemaRegistryTargets().filesTarget.request().post(Entity.entity(multiPart, MediaType.MULTIPART_FORM_DATA), String.class); + return currentSchemaRegistryTargets().filesTarget.request() + .post(Entity.entity(multiPart, MediaType.MULTIPART_FORM_DATA), String.class); } }); } @@ -745,7 +782,9 @@ public InputStream downloadFile(String fileId) { return Subject.doAs(subject, new PrivilegedAction() { @Override public InputStream run() { - return currentSchemaRegistryTargets().filesTarget.path("download/" + encode(fileId)).request().get(InputStream.class); + return currentSchemaRegistryTargets().filesTarget.path("download/" + encode(fileId)) + .request() + .get(InputStream.class); } }); } @@ -830,7 +869,8 @@ private T createInstance(SerDesInfo serDesInfo, boolean isSerializer) { T t; try { - String className = isSerializer ? serDesPair.getSerializerClassName() : serDesPair.getDeserializerClassName(); + String className = + isSerializer ? serDesPair.getSerializerClassName() : serDesPair.getDeserializerClassName(); Class clazz = (Class) Class.forName(className, true, classLoader); t = clazz.newInstance(); @@ -1100,7 +1140,8 @@ private Map> buildOptions(Field[] fields) { } public T getValue(String propertyKey) { - return (T) (config.containsKey(propertyKey) ? config.get(propertyKey) : options.get(propertyKey).defaultValue()); + return (T) (config.containsKey(propertyKey) ? config.get(propertyKey) + : options.get(propertyKey).defaultValue()); } public Map getConfig() { diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/ISchemaRegistryService.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/ISchemaRegistryService.java index 516bc631f..26de684ed 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/ISchemaRegistryService.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/ISchemaRegistryService.java @@ -20,12 +20,11 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import com.hortonworks.registries.schemaregistry.serde.SerDesException; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; -import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; import java.io.IOException; import java.io.InputStream; import java.util.Collection; -import java.util.List; /** * Basic service interface for schema registry which should be implemented by client and server interfaces. @@ -246,11 +245,7 @@ default void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoun throw new UnsupportedOperationException(); } - default void executeCustomState(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - throw new UnsupportedOperationException(); - } + void transitionState(Long schemaVersionId, Byte targetStateId) throws SchemaNotFoundException, SchemaLifecycleException; - default List getSchemaVersionLifecycleStates() { - throw new UnsupportedOperationException(); - } + SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo(); } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersion.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersion.java index 5b1f0baae..c7a95fd84 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersion.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersion.java @@ -44,7 +44,7 @@ public SchemaVersion(String schemaText, String description) { public SchemaVersion(String schemaText, String description, Byte initialState) { this.description = description; this.schemaText = schemaText; - this.initialState = initialState != null ? initialState : SchemaVersionLifecycleStates.ENABLED.id(); + this.initialState = initialState != null ? initialState : SchemaVersionLifecycleStates.ENABLED.getId(); } public String getDescription() { diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfo.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfo.java index 23aeeac89..5101520a8 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfo.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfo.java @@ -95,7 +95,7 @@ public SchemaVersionInfo(Long id, this.version = version; this.schemaText = schemaText; this.timestamp = timestamp; - this.stateId = stateId == null ? SchemaVersionLifecycleStates.ENABLED.id() : stateId; + this.stateId = stateId == null ? SchemaVersionLifecycleStates.ENABLED.getId() : stateId; } public Long getId() { diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/AbstractInbuiltSchemaLifecycleState.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/AbstractInbuiltSchemaLifecycleState.java index 0021a585b..4799eae55 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/AbstractInbuiltSchemaLifecycleState.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/AbstractInbuiltSchemaLifecycleState.java @@ -15,71 +15,15 @@ */ package com.hortonworks.registries.schemaregistry.state; -import java.util.List; - /** * */ -public abstract class AbstractInbuiltSchemaLifecycleState implements InbuiltSchemaVersionLifecycleState { - private final String name; - private final byte id; - private final String description; - private final List nextStates; +public abstract class AbstractInbuiltSchemaLifecycleState extends BaseSchemaVersionLifecycleState implements InbuiltSchemaVersionLifecycleState { protected AbstractInbuiltSchemaLifecycleState(String name, byte id, - String description, - List nextStates) { - this.name = name; - this.id = id; - this.description = description; - this.nextStates = nextStates; - } - - @Override - public Byte id() { - return id; - } - - @Override - public String name() { - return name; - } - - @Override - public String description() { - return description; - } - - @Override - public List nextStates() { - return nextStates; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - AbstractInbuiltSchemaLifecycleState that = (AbstractInbuiltSchemaLifecycleState) o; - - if (id != that.id) return false; - return name != null ? name.equals(that.name) : that.name == null; - } - - @Override - public int hashCode() { - int result = name != null ? name.hashCode() : 0; - result = 31 * result + (int) id; - return result; + String description) { + super(name, id, description); } - @Override - public String toString() { - return "{" + - "name='" + name + '\'' + - ", id=" + id + - ", description='" + description + '\'' + - '}'; - } } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/BaseSchemaVersionLifecycleState.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/BaseSchemaVersionLifecycleState.java new file mode 100644 index 000000000..fb1e86551 --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/BaseSchemaVersionLifecycleState.java @@ -0,0 +1,88 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.io.Serializable; + +/** + * + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class BaseSchemaVersionLifecycleState implements SchemaVersionLifecycleState, Serializable { + + private static final long serialVersionUID = 7503502751825893763L; + + private String name; + private byte id; + private String description; + + private BaseSchemaVersionLifecycleState() { + } + + protected BaseSchemaVersionLifecycleState(String name, + byte id, + String description) { + this.name = name; + this.id = id; + this.description = description; + } + + @Override + public Byte getId() { + return id; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BaseSchemaVersionLifecycleState that = (BaseSchemaVersionLifecycleState) o; + + if (id != that.id) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + return description != null ? description.equals(that.description) : that.description == null; + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (int) id; + result = 31 * result + (description != null ? description.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "BaseSchemaVersionLifecycleState{" + + "name='" + name + '\'' + + ", id=" + id + + ", description='" + description + '\'' + + '}'; + } +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaReviewExecutor.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/CustomSchemaStateExecutor.java similarity index 64% rename from schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaReviewExecutor.java rename to schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/CustomSchemaStateExecutor.java index 30c3a4bf2..dcdd29b5c 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaReviewExecutor.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/CustomSchemaStateExecutor.java @@ -20,20 +20,24 @@ import java.util.Map; /** - * This interface should be implemented for having custom review process. This can be defined in registry.yaml for - * registration. + * This interface should be implemented for having custom review process and adding any custom states with the default + * state machine. This can be defined in registry.yaml for registration. + * + * This is still experimental API and not stable. */ -public interface SchemaReviewExecutor { +public interface CustomSchemaStateExecutor { /** * Initialize with completion states of this schema review. * - * @param successState state to be set when review is successful. - * @param retryState state to be set when review is failed. - * @param props any properties to be initialized with. + * @param builder this can be used to add any custom states and transitions. + * @param successStateId state to be set when review is successful. + * @param retryStateId state to be set when review is failed. + * @param props any properties to be initialized with. */ - void init(SchemaVersionLifecycleState successState, - SchemaVersionLifecycleState retryState, + void init(SchemaVersionLifecycleStateMachine.Builder builder, + Byte successStateId, + Byte retryStateId, Map props); /** @@ -45,5 +49,5 @@ void init(SchemaVersionLifecycleState successState, * @throws SchemaLifecycleException when any lifecycle error occurs. * @throws SchemaNotFoundException when the given schema version is not found. */ - void execute(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException; + void executeReviewState(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException; } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultSchemaReviewExecutor.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultCustomSchemaStateExecutor.java similarity index 63% rename from schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultSchemaReviewExecutor.java rename to schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultCustomSchemaStateExecutor.java index cae010604..716e06b35 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultSchemaReviewExecutor.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/DefaultCustomSchemaStateExecutor.java @@ -20,23 +20,24 @@ import java.util.Map; /** - * This is default implementation of SchemaReviewExecutor which always leads to the success state. + * This is default implementation of CustomSchemaStateExecutor which always leads to the success state. */ -public class DefaultSchemaReviewExecutor implements SchemaReviewExecutor { +public class DefaultCustomSchemaStateExecutor implements CustomSchemaStateExecutor { private SchemaVersionLifecycleState successState; private SchemaVersionLifecycleState retryState; @Override - public void init(SchemaVersionLifecycleState successState, - SchemaVersionLifecycleState retryState, + public void init(SchemaVersionLifecycleStateMachine.Builder builder, + Byte successStateId, + Byte retryStateId, Map props) { - this.successState = successState; - this.retryState = retryState; + this.successState = builder.getStates().get(successStateId); + this.retryState = builder.getStates().get(retryStateId); } @Override - public void execute(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { + public void executeReviewState(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { schemaVersionLifecycleContext.setState(successState); schemaVersionLifecycleContext.updateSchemaVersionState(); } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/InbuiltSchemaVersionLifecycleState.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/InbuiltSchemaVersionLifecycleState.java index b6b127883..957f39718 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/InbuiltSchemaVersionLifecycleState.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/InbuiltSchemaVersionLifecycleState.java @@ -17,32 +17,36 @@ import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException; import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.Collection; /** * */ public interface InbuiltSchemaVersionLifecycleState extends SchemaVersionLifecycleState { - public default void startReview(SchemaVersionLifecycleContext schemaVersionLifecycleContext, - SchemaReviewExecutor schemaReviewExecutor) throws SchemaLifecycleException, SchemaNotFoundException { + default void startReview(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this); } - public default void enable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException, IncompatibleSchemaException { + default void enable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, IncompatibleSchemaException, SchemaNotFoundException { throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this); } - public default void disable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { + default void disable(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this); } - public default void archive(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { + default void archive(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this); } - public default void delete(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { + default void delete(SchemaVersionLifecycleContext schemaVersionLifecycleContext) throws SchemaLifecycleException, SchemaNotFoundException { throw new SchemaLifecycleException(" This operation is not supported for this instance: " + this); } + Collection> getTransitionActions(); + } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleContext.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleContext.java index 0051a02be..16d0bc818 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleContext.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleContext.java @@ -21,58 +21,102 @@ import java.util.concurrent.ConcurrentMap; /** - * + * This class contains contextual information and services required for transition from one state to other state. */ public class SchemaVersionLifecycleContext { private SchemaVersionLifecycleState state; private Long schemaVersionId; private Integer sequence; private SchemaVersionService schemaVersionService; - private SchemaVersionLifecycleStates.Registry schemaLifeCycleStatesRegistry; + private SchemaVersionLifecycleStateMachine schemaVersionLifecycleStateMachine; + private CustomSchemaStateExecutor customSchemaStateExecutor; private ConcurrentMap props = new ConcurrentHashMap<>(); public SchemaVersionLifecycleContext(Long schemaVersionId, Integer sequence, SchemaVersionService schemaVersionService, - SchemaVersionLifecycleStates.Registry schemaLifeCycleStatesRegistry) { + SchemaVersionLifecycleStateMachine schemaVersionLifecycleStateMachine, + CustomSchemaStateExecutor customSchemaStateExecutor) { this.schemaVersionId = schemaVersionId; this.sequence = sequence; this.schemaVersionService = schemaVersionService; - this.schemaLifeCycleStatesRegistry = schemaLifeCycleStatesRegistry; + this.schemaVersionLifecycleStateMachine = schemaVersionLifecycleStateMachine; + this.customSchemaStateExecutor = customSchemaStateExecutor; } + /** + * Sets the given state as target state for the given schema version identified by {@code schemaVersionId}. + * + * @param state target state. + */ public void setState(SchemaVersionLifecycleState state) { this.state = state; } + /** + * @return Target state to be set. + */ public SchemaVersionLifecycleState getState() { return state; } + /** + * @return schema version id for which state change has to happen. + */ public Long getSchemaVersionId() { return schemaVersionId; } + /** + * @return Current sequence of the schema version state updation. + */ public Integer getSequence() { return sequence; } + /** + * @return {@link SchemaVersionService} which can be used to have different operations with schema versions etc. + */ public SchemaVersionService getSchemaVersionService() { return schemaVersionService; } - public SchemaVersionLifecycleStates.Registry getSchemaLifeCycleStatesRegistry() { - return schemaLifeCycleStatesRegistry; + public CustomSchemaStateExecutor getCustomSchemaStateExecutor() { + return customSchemaStateExecutor; + } + + /** + * @return SchemaVersionLifecycleStateMachine registered for this SchemaRegistry instance. + */ + public SchemaVersionLifecycleStateMachine getSchemaLifeCycleStatesMachine() { + return schemaVersionLifecycleStateMachine; } + /** + * @param key key object + * @return any property registered for the given key. + */ public Object getProperty(String key) { return props.get(key); } + /** + * Set the given property value for the given property key. + * + * @param key key object + * @param value value object + * @return Any value that is already registered for the given key. + */ public Object setProperty(String key, String value) { return props.put(key, value); } + /** + * Updates the current context into the schema version state storage. + * + * @throws SchemaLifecycleException when any error occurs while updating the state. + * @throws SchemaNotFoundException when there is no schema/version found with the given {@code schemaVersionId}. + */ public void updateSchemaVersionState() throws SchemaLifecycleException, SchemaNotFoundException { schemaVersionService.updateSchemaVersionState(this); } @@ -84,7 +128,7 @@ public String toString() { ", schemaVersionId=" + schemaVersionId + ", sequence=" + sequence + ", schemaVersionService=" + schemaVersionService + - ", schemaLifeCycleStatesRegistry=" + schemaLifeCycleStatesRegistry + + ", schemaVersionLifecycleStateMachine=" + schemaVersionLifecycleStateMachine + ", props=" + props + '}'; } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleState.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleState.java index cac334dd8..dccaeaf34 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleState.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleState.java @@ -15,33 +15,28 @@ */ package com.hortonworks.registries.schemaregistry.state; -import java.util.List; +import java.io.Serializable; /** * Schema version life cycle state to be defined. */ public interface SchemaVersionLifecycleState { + Byte INBUILT_STATE_ID_MAX = 32; /** - * @return This state's identifier. Identifiers <= 32 are reserved and 33 to 127 can be used for any custom - * states. + * @return This state's identifier. Identifiers [0, 32] are reserved and other byte values can be used for + * any custom states. */ - Byte id(); + Byte getId(); /** * @return name of this state */ - String name(); + String getName(); /** * @return description about this state */ - String description(); + String getDescription(); - /** - * @return List of states that can lead from this state. - */ - default List nextStates() { - throw new UnsupportedOperationException(); - } } diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateAction.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateAction.java new file mode 100644 index 000000000..d626459d3 --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateAction.java @@ -0,0 +1,30 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +/** + * This class defines what needs to be executed on transition from one state to other state. + */ +public interface SchemaVersionLifecycleStateAction { + + /** + * @param context which gives the context about the state machine and any dependent services. + * + * @throws SchemaLifecycleException any Exceptions occur will be wrapper in this and consumers of this API + * should check for root cause and take appropriate action. + */ + public void execute(SchemaVersionLifecycleContext context) throws SchemaLifecycleException; +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachine.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachine.java new file mode 100644 index 000000000..0a73a2bcd --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachine.java @@ -0,0 +1,152 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +import org.apache.commons.lang3.tuple.Pair; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState.INBUILT_STATE_ID_MAX; + +/** + * This class represents schema version lifecycle state machine registered in SchemaRegistry server. Users can customize + * the state by configuring {@link CustomSchemaStateExecutor} and custom states and transitions can be configured using {@link Builder} + * in callbacks{@link CustomSchemaStateExecutor#init(Builder, Byte, Byte, Map)} + */ +public class SchemaVersionLifecycleStateMachine { + + private final Map states; + private final Map transitions; + + private SchemaVersionLifecycleStateMachine(Map states, + Map transitions) { + this.states = Collections.unmodifiableMap(states); + this.transitions = Collections.unmodifiableMap(transitions); + } + + public Map getStates() { + return states; + } + + public Map getTransitions() { + return transitions; + } + + public SchemaVersionLifecycleStateMachineInfo toConfig() { + return new SchemaVersionLifecycleStateMachineInfo(states.values(), transitions.keySet()); + } + + public static Builder newBuilder() { + return new Builder(); + } + + + public static class Builder { + ConcurrentMap states = new ConcurrentHashMap<>(); + ConcurrentMap transitionsWithActions = new ConcurrentHashMap<>(); + + public Builder() { + registerInBuiltStates(); + } + + private void registerInBuiltStates() { + List> transitionActions = new ArrayList<>(); + + Field[] declaredFields = SchemaVersionLifecycleStates.class.getDeclaredFields(); + for (Field field : declaredFields) { + if (Modifier.isFinal(field.getModifiers()) && + Modifier.isStatic(field.getModifiers()) && + InbuiltSchemaVersionLifecycleState.class.isAssignableFrom(field.getType())) { + InbuiltSchemaVersionLifecycleState state = null; + try { + state = (InbuiltSchemaVersionLifecycleState) field.get(null); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + register(state); + + transitionActions.addAll(state.getTransitionActions()); + } + } + + // register transitions for inbuilt states + for (Pair transitionAction : transitionActions) { + transition(transitionAction.getLeft(), transitionAction.getRight()); + } + } + + /** + * Registers the given state with REGISTRY. + * + * @param state state to be registered. + * + * @throws IllegalArgumentException if the given state is already registered. + */ + public void register(SchemaVersionLifecycleState state) { + checkForInbuiltStateIds(state); + SchemaVersionLifecycleState prevState = states.putIfAbsent(state.getId(), state); + if (prevState != null) { + throw new IllegalArgumentException("Given state is already registered as " + prevState); + } + } + + public Map getStates() { + return Collections.unmodifiableMap(states); + } + + public Builder transition(SchemaVersionLifecycleStateTransition transition, + SchemaVersionLifecycleStateAction action) { + Byte sourceStateId = transition.getSourceStateId(); + Byte targetStateId = transition.getTargetStateId(); + checkStatesRegistered(sourceStateId, targetStateId); + + SchemaVersionLifecycleStateAction existingTransitionAction = transitionsWithActions.putIfAbsent(transition, action); + if (existingTransitionAction != null) { + throw new IllegalArgumentException("Given transition already exists, from: [" + sourceStateId + "] to: [" + targetStateId + "]"); + } + + return this; + } + + private void checkForInbuiltStateIds(SchemaVersionLifecycleState state) { + if (!(state instanceof InbuiltSchemaVersionLifecycleState)) { + if (state.getId() <= INBUILT_STATE_ID_MAX) { + throw new IllegalArgumentException("Given custom state id should be more than 32"); + } + } + } + + private void checkStatesRegistered(Byte... stateIds) { + for (Byte stateId : stateIds) { + if (!this.states.containsKey(stateId)) { + throw new IllegalArgumentException("Given state [" + stateId + "] is not yet registered."); + } + } + } + + public SchemaVersionLifecycleStateMachine build() { + return new SchemaVersionLifecycleStateMachine(states, transitionsWithActions); + } + } + +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachineInfo.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachineInfo.java new file mode 100644 index 000000000..dc021add6 --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateMachineInfo.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.google.common.collect.Lists; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; + +/** + * This is a representation of {@link SchemaVersionLifecycleStateMachine} which can be serialized/deserialized with + * json format. This can be used by clients to understand the current schema version lifecycle state and act on + * accordingly. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public final class SchemaVersionLifecycleStateMachineInfo implements Serializable { + private static final long serialVersionUID = -3499349707355937574L; + + private Collection states; + private Collection transitions; + + private SchemaVersionLifecycleStateMachineInfo() { + } + + SchemaVersionLifecycleStateMachineInfo(Collection states, + Collection transitions) { + this.states = new ArrayList<>(states.size()); + this.transitions = Lists.newArrayList(transitions); + + for (SchemaVersionLifecycleState state : states) { + this.states.add(new BaseSchemaVersionLifecycleState(state.getName(), state.getId(), state.getDescription())); + } + } + + public Collection getStates() { + return states; + } + + public Collection getTransitions() { + return transitions; + } + + @Override + public String toString() { + return "Configuration{" + + "states=" + states + + ", transitions=" + transitions + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaVersionLifecycleStateMachineInfo that = (SchemaVersionLifecycleStateMachineInfo) o; + + if (states != null ? !states.equals(that.states) : that.states != null) return false; + return transitions != null ? transitions.equals(that.transitions) : that.transitions == null; + } + + @Override + public int hashCode() { + int result = states != null ? states.hashCode() : 0; + result = 31 * result + (transitions != null ? transitions.hashCode() : 0); + return result; + } + +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatePojo.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatePojo.java new file mode 100644 index 000000000..434e6e9cb --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatePojo.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.io.Serializable; + +/** + * This class is used for serialization/deserialiation of {@link SchemaVersionLifecycleState} instances with json format. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public final class SchemaVersionLifecycleStatePojo implements SchemaVersionLifecycleState, Serializable { + private static final long serialVersionUID = -9100601483425067672L; + + private Byte id; + private String name; + private String description; + + private SchemaVersionLifecycleStatePojo() { + } + + public SchemaVersionLifecycleStatePojo(SchemaVersionLifecycleState schemaVersionLifecycleState) { + this.id = schemaVersionLifecycleState.getId(); + this.name = schemaVersionLifecycleState.getName(); + this.description = schemaVersionLifecycleState.getDescription(); + } + + public Byte getId() { + return id; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaVersionLifecycleStatePojo that = (SchemaVersionLifecycleStatePojo) o; + + if (id != null ? !id.equals(that.id) : that.id != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; + return description != null ? description.equals(that.description) : that.description == null; + } + + @Override + public int hashCode() { + int result = id != null ? id.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + (description != null ? description.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "SchemaVersionLifecycleStatePojo{" + + "id=" + id + + ", name='" + name + '\'' + + ", description='" + description + '\'' + + '}'; + } +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateTransition.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateTransition.java new file mode 100644 index 000000000..5809dd012 --- /dev/null +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStateTransition.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hortonworks.registries.schemaregistry.state; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.io.Serializable; + +/** + * + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class SchemaVersionLifecycleStateTransition implements Serializable { + private static final long serialVersionUID = -3578936032929400872L; + + private Byte sourceStateId; + private Byte targetStateId; + private String name; + private String description; + + private SchemaVersionLifecycleStateTransition() { + } + + public SchemaVersionLifecycleStateTransition(Byte sourceStateId, Byte targetStateId) { + this(sourceStateId, targetStateId, null, null); + } + + SchemaVersionLifecycleStateTransition(Byte sourceStateId, Byte targetStateId, String name, String description) { + this.sourceStateId = sourceStateId; + this.targetStateId = targetStateId; + this.name = name; + this.description = description; + } + + public Byte getSourceStateId() { + return sourceStateId; + } + + public Byte getTargetStateId() { + return targetStateId; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + @Override + public String toString() { + return "Transition{" + + "sourceStateId=" + sourceStateId + + ", targetStateId=" + targetStateId + + ", name='" + name + '\'' + + ", description='" + description + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaVersionLifecycleStateTransition that = (SchemaVersionLifecycleStateTransition) o; + + if (sourceStateId != null ? !sourceStateId.equals(that.sourceStateId) : that.sourceStateId != null) + return false; + return targetStateId != null ? targetStateId.equals(that.targetStateId) : that.targetStateId == null; + } + + @Override + public int hashCode() { + int result = sourceStateId != null ? sourceStateId.hashCode() : 0; + result = 31 * result + (targetStateId != null ? targetStateId.hashCode() : 0); + return result; + } + +} diff --git a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStates.java b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStates.java index 8a743eba8..182097e80 100644 --- a/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStates.java +++ b/schema-registry/common/src/main/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStates.java @@ -23,19 +23,17 @@ import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException; import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import org.apache.commons.lang3.tuple.Pair; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; /** - * Schema version life cycle state flow of a specific version stored so that it can be seen how a specific version + * Schema version life cycle state flow is stored so that it can be seen how a specific version * changes its state till it finally reaches the terminal states. Once it reaches the terminal state, those entries * can be removed after a configured time. This may be useful for looking at changes of the schema version history. */ @@ -49,87 +47,30 @@ public final class SchemaVersionLifecycleStates { public static final InbuiltSchemaVersionLifecycleState DISABLED = new DisabledState(); public static final InbuiltSchemaVersionLifecycleState ARCHIVED = new ArchivedState(); public static final InbuiltSchemaVersionLifecycleState DELETED = new DeletedState(); - public static final SchemaVersionLifecycleState CUSTOM_STATE = new CustomState(); - - /** - * Registry of states which contain inbuilt ones and customs states can be registered with {@link #register(SchemaVersionLifecycleState)}. - */ - public static class Registry { - - ConcurrentMap states = new ConcurrentHashMap<>(); - - public Registry() { - registerInBuiltStates(); - } - - private void registerInBuiltStates() { - Field[] declaredFields = SchemaVersionLifecycleStates.class.getDeclaredFields(); - for (Field field : declaredFields) { - if (Modifier.isFinal(field.getModifiers()) && - Modifier.isStatic(field.getModifiers()) && - InbuiltSchemaVersionLifecycleState.class.isAssignableFrom(field.getType())) { - InbuiltSchemaVersionLifecycleState state = null; - try { - state = (InbuiltSchemaVersionLifecycleState) field.get(null); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - register(state); - } - } - } - - /** - * Registers the given state with REGISTRY. - * - * @param state state to be registered. - * - * @throws IllegalArgumentException if the given state is already registered. - */ - public void register(SchemaVersionLifecycleState state) { - SchemaVersionLifecycleState prevState = states.putIfAbsent(state.id(), state); - if (prevState == state) { - throw new IllegalArgumentException("Given state is already registered as " + prevState); - } - } - - /** - * @param stateId state id to be deregistered. - * - * @return Returns true if the given stateId exists and deregistered, false otherwise. - */ - public boolean deregister(Byte stateId) { - return states.remove(stateId) != null; - } - - public SchemaVersionLifecycleState get(Byte stateId) { - return states.get(stateId); - } - - } private static final class InitiatedState extends AbstractInbuiltSchemaLifecycleState { private InitiatedState() { - super("INITIATED", (byte) 1, "Schema version is initialized, It can either go to review or enabled states.", Lists - .newArrayList(START_REVIEW, ENABLED)); + super("INITIATED", (byte) 1, "Schema version is initialized, It can either go to review or enabled states."); } @Override - public void startReview(SchemaVersionLifecycleContext context, - SchemaReviewExecutor schemaReviewExecutor) + public void startReview(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(START_REVIEW); - // get review state executor and - context.updateSchemaVersionState(); + transitionToStartReview(context); } @Override public void enable(SchemaVersionLifecycleContext context) - throws SchemaLifecycleException, SchemaNotFoundException, IncompatibleSchemaException { + throws SchemaLifecycleException, IncompatibleSchemaException, SchemaNotFoundException { transitionToEnableState(context); } + @Override + public Collection> getTransitionActions() { + return Lists.newArrayList(createStartReviewTransitionActionPair(getId()), createArchiveTransitionAction(getId())); + } + @Override public String toString() { return "InitiatedState{" + super.toString() + "}"; @@ -141,14 +82,17 @@ private static final class StartReviewState extends AbstractInbuiltSchemaLifecyc // it should invoke custom state and return to success/failure state eventually. private StartReviewState() { - super("StartReview", (byte) 2, "Initiates the process for reviewing with the given custom state", Collections - .singletonList(CUSTOM_STATE)); + super("StartReview", (byte) 2, "Initiates the process for reviewing with the given custom state"); } - public void startReview(SchemaVersionLifecycleContext context, - SchemaReviewExecutor schemaReviewExecutor) + public void startReview(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - schemaReviewExecutor.execute(context); + transitionToStartReview(context); + } + + @Override + public Collection> getTransitionActions() { + return Collections.emptyList(); } @Override @@ -163,25 +107,25 @@ private static final class ChangesRequiredState extends AbstractInbuiltSchemaLif public ChangesRequiredState() { super("ChangesRequired", (byte) 3, - "Requires changes to be done in this schema", - Lists.newArrayList(START_REVIEW, DELETED)); + "Requires changes to be done in this schema" + ); } @Override - public void startReview(SchemaVersionLifecycleContext context, - SchemaReviewExecutor schemaReviewExecutor) + public void startReview(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(START_REVIEW); - // execute start review process, updation of the state should be done by schemaReviewExecutor - schemaReviewExecutor.execute(context); + transitionToStartReview(context); } @Override public void delete(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(DELETED); - context.getSchemaVersionService().deleteSchemaVersion(context.getSchemaVersionId()); - context.updateSchemaVersionState(); + transitionToDeleteState(context); + } + + @Override + public Collection> getTransitionActions() { + return Lists.newArrayList(createStartReviewTransitionActionPair(getId()), createDeleteTransitionActionPair(getId())); } @Override @@ -196,21 +140,26 @@ private static final class ReviewedState extends AbstractInbuiltSchemaLifecycleS private ReviewedState() { super("ReviewedState", (byte) 4, - "This schema version is successfully reviewed", - Lists.newArrayList(ENABLED, ARCHIVED)); + "This schema version is successfully reviewed" + ); } @Override public void enable(SchemaVersionLifecycleContext context) - throws SchemaLifecycleException, SchemaNotFoundException, IncompatibleSchemaException { + throws SchemaLifecycleException, IncompatibleSchemaException, SchemaNotFoundException { transitionToEnableState(context); } @Override public void archive(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(ARCHIVED); - context.updateSchemaVersionState(); + transitionToArchiveState(context); + } + + @Override + public Collection> + getTransitionActions() { + return Lists.newArrayList(createEnableTransitionAction(getId()), createArchiveTransitionAction(getId())); } @Override @@ -220,27 +169,129 @@ public String toString() { } + private static Pair + createDeleteTransitionActionPair( + Byte sourceStateId) { + return Pair.of(new SchemaVersionLifecycleStateTransition(sourceStateId, + DELETED.getId(), + "Delete", + "Deletes the schema version"), + context -> { + try { + transitionToDeleteState(context); + } catch (SchemaNotFoundException e) { + throw new SchemaLifecycleException(e); + } + }); + } + + private static Pair + createStartReviewTransitionActionPair(Byte sourceStateId) { + return Pair.of(new SchemaVersionLifecycleStateTransition(sourceStateId, START_REVIEW.getId(), + "StartReview", + "Starts review state"), + context -> { + try { + transitionToStartReview(context); + } catch (SchemaNotFoundException e) { + throw new SchemaLifecycleException(e); + } + }); + } + + + private static Pair + createDisableAction(Byte sourceStateId) { + return Pair.of(new SchemaVersionLifecycleStateTransition(sourceStateId, + DISABLED.getId(), + "Disable", + "Disables the schema version"), + context -> { + try { + transitionToDisableState(context); + } catch (SchemaNotFoundException e) { + throw new SchemaLifecycleException(e); + } + }); + } + + private static Pair + createArchiveTransitionAction(Byte sourceStateId) { + return Pair.of(new SchemaVersionLifecycleStateTransition(sourceStateId, + ARCHIVED.getId(), + "Archive", + "Archives the schema version"), + context -> { + try { + transitionToArchiveState(context); + } catch (SchemaNotFoundException e) { + throw new SchemaLifecycleException(e); + } + }); + } + + private static Pair + createEnableTransitionAction(Byte sourceStateId) { + return Pair.of(new SchemaVersionLifecycleStateTransition(sourceStateId, + ENABLED.getId(), + "Enable", + "Enables the schema version"), + context -> { + try { + transitionToEnableState(context); + } catch (SchemaNotFoundException | IncompatibleSchemaException e) { + throw new SchemaLifecycleException(e); + } + }); + } + + private static void transitionToDisableState(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { + context.setState(DISABLED); + context.updateSchemaVersionState(); + } + + private static void transitionToStartReview(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { + context.setState(START_REVIEW); + // execute start review process, updation of the state should be done by schemaReviewExecutor + context.getCustomSchemaStateExecutor().executeReviewState(context); + } + + private static void transitionToArchiveState(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { + context.setState(ARCHIVED); + context.updateSchemaVersionState(); + } + + private static void transitionToDeleteState(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { + context.setState(DELETED); + context.getSchemaVersionService().deleteSchemaVersion(context.getSchemaVersionId()); + context.updateSchemaVersionState(); + } + private static final class EnabledState extends AbstractInbuiltSchemaLifecycleState { private EnabledState() { super("Enabled", (byte) 5, - "Schema version is enabled", - Lists.newArrayList(DISABLED, ARCHIVED)); + "Schema version is enabled" + ); } @Override public void disable(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(DISABLED); - context.updateSchemaVersionState(); + transitionToDisableState(context); } @Override public void archive(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(ARCHIVED); - context.updateSchemaVersionState(); + transitionToArchiveState(context); + } + + @Override + public Collection> getTransitionActions() { + return Lists.newArrayList(createDisableAction(getId()), createArchiveTransitionAction(getId())); + } @Override @@ -254,23 +305,27 @@ private static final class DisabledState extends AbstractInbuiltSchemaLifecycleS private DisabledState() { super("Disabled", (byte) 6, - "Schema version is disabled", - Lists.newArrayList(ENABLED, ARCHIVED)); + "Schema version is disabled" + ); } @Override public void enable(SchemaVersionLifecycleContext context) - throws SchemaLifecycleException, SchemaNotFoundException, IncompatibleSchemaException { + throws SchemaLifecycleException, IncompatibleSchemaException, SchemaNotFoundException { transitionToEnableState(context); } @Override public void archive(SchemaVersionLifecycleContext context) throws SchemaLifecycleException, SchemaNotFoundException { - context.setState(ARCHIVED); - context.updateSchemaVersionState(); + transitionToArchiveState(context); } + @Override + public Collection> getTransitionActions() { + return Lists.newArrayList(createEnableTransitionAction(getId()), createArchiveTransitionAction(getId())); + + } @Override public String toString() { @@ -301,8 +356,8 @@ private static final class ArchivedState extends AbstractInbuiltSchemaLifecycleS private ArchivedState() { super("Archived", (byte) 7, - "Schema is archived and it is a terminal state", - Collections.emptyList()); + "Schema is archived and it is a terminal state" + ); } @Override @@ -310,6 +365,10 @@ public String toString() { return "ArchivedState{" + super.toString() + "}"; } + @Override + public Collection> getTransitionActions() { + return Collections.emptyList(); + } } private static final class DeletedState extends AbstractInbuiltSchemaLifecycleState { @@ -317,8 +376,8 @@ private static final class DeletedState extends AbstractInbuiltSchemaLifecycleSt private DeletedState() { super("Deleted", (byte) 8, - "Schema is deleted and it is a terminal state", - Collections.emptyList()); + "Schema is deleted and it is a terminal state" + ); } @Override @@ -326,22 +385,9 @@ public String toString() { return "DeletedState{" + super.toString() + "}"; } - } - - private static final class CustomState implements SchemaVersionLifecycleState { - @Override - public Byte id() { - return Byte.MIN_VALUE; - } - @Override - public String name() { - return "custom"; - } - - @Override - public String description() { - return "Custom state at which user runs its own logic to move state"; + public Collection> getTransitionActions() { + return Collections.emptyList(); } } @@ -361,10 +407,10 @@ public static void transitionToEnableState(SchemaVersionLifecycleContext context List allEnabledSchemaVersions = schemaVersionService.getAllSchemaVersions(schemaName) .stream() - .filter(x -> SchemaVersionLifecycleStates.ENABLED.id().equals(x.getStateId())) + .filter(x -> SchemaVersionLifecycleStates.ENABLED.getId().equals(x.getStateId())) .collect(Collectors.toList()); - if(!allEnabledSchemaVersions.isEmpty()) { + if (!allEnabledSchemaVersions.isEmpty()) { if (validationLevel.equals(SchemaValidationLevel.ALL)) { for (SchemaVersionInfo curSchemaVersionInfo : allEnabledSchemaVersions) { int curVersion = curSchemaVersionInfo.getVersion(); @@ -398,5 +444,4 @@ public static void transitionToEnableState(SchemaVersionLifecycleContext context context.setState(ENABLED); context.updateSchemaVersionState(); } - } diff --git a/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java b/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java index cb70c1799..dffada4d4 100644 --- a/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java +++ b/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/SchemaVersionInfoCacheTest.java @@ -41,7 +41,7 @@ public void testSchemaVersionCache() throws Exception { Integer version = 2; long schemaVersionId = 3L; SchemaVersionInfo schemaVersionInfo = new SchemaVersionInfo(schemaVersionId, schemaName, version, schemaMetadataId, "schema-text", System - .currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.id()); + .currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); SchemaIdVersion withVersionId = new SchemaIdVersion(schemaVersionId); SchemaIdVersion withMetaIdAndVersion = new SchemaIdVersion(schemaMetadataId, version); SchemaIdVersion withBoth = new SchemaIdVersion(schemaMetadataId, version, schemaVersionId); @@ -51,7 +51,7 @@ public void testSchemaVersionCache() throws Exception { allIdVersions.stream().forEach(x -> schemaIdWithVersionInfo.put(x, schemaVersionInfo)); SchemaVersionInfo otherSchemaVersionInfo = new SchemaVersionInfo(schemaVersionId + 1, "other-" + schemaName, version, schemaMetadataId+1L, "other-schema-text", System - .currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.id()); + .currentTimeMillis(), "schema-description", SchemaVersionLifecycleStates.ENABLED.getId()); SchemaIdVersion otherIdVersion = new SchemaIdVersion(otherSchemaVersionInfo.getId()); SchemaVersionKey otherSchemaVersionKey = new SchemaVersionKey(otherSchemaVersionInfo.getName(), otherSchemaVersionInfo .getVersion()); diff --git a/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatesTest.java b/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatesTest.java index 92e1c0ca9..3c0ea9868 100644 --- a/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatesTest.java +++ b/schema-registry/common/src/test/java/com/hortonworks/registries/schemaregistry/state/SchemaVersionLifecycleStatesTest.java @@ -23,7 +23,6 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -37,7 +36,6 @@ /** * */ -//@Ignore public class SchemaVersionLifecycleStatesTest { private static Logger LOG = LoggerFactory.getLogger(SchemaVersionLifecycleStatesTest.class); @@ -55,7 +53,7 @@ public void setup() { SchemaVersionInfo schemaVersionInfo = new SchemaVersionInfo(new Random().nextLong(), schemaMetadata.getName(), 1, schemaMetadataInfo.getId(), "{\"type\":\"string\"}", System.currentTimeMillis(), - "", SchemaVersionLifecycleStates.ENABLED.id()); + "", SchemaVersionLifecycleStates.ENABLED.getId()); SchemaVersionService schemaVersionServiceMock = new SchemaVersionService() { @Override @@ -91,7 +89,14 @@ public Collection getAllSchemaVersions(String schemaName) thr } }; - context = new SchemaVersionLifecycleContext(schemaVersionInfo.getId(), 1, schemaVersionServiceMock, new SchemaVersionLifecycleStates.Registry()); + SchemaVersionLifecycleStateMachine lifecycleStateMachine = SchemaVersionLifecycleStateMachine.newBuilder() + .build(); + + context = new SchemaVersionLifecycleContext(schemaVersionInfo.getId(), + 1, + schemaVersionServiceMock, + lifecycleStateMachine, + new DefaultCustomSchemaStateExecutor()); } @Test @@ -99,8 +104,8 @@ public void testInitiatedState() throws Exception { InbuiltSchemaVersionLifecycleState initiated = SchemaVersionLifecycleStates.INITIATED; - DefaultSchemaReviewExecutor schemaReviewExecutor = createDefaultSchemaReviewExecutor(); - initiated.startReview(context, schemaReviewExecutor); + DefaultCustomSchemaStateExecutor schemaReviewExecutor = createDefaultSchemaReviewExecutor(); + initiated.startReview(context); initiated.enable(context); checkDisableNotSupported(initiated, context); @@ -108,9 +113,12 @@ public void testInitiatedState() throws Exception { checkDeleteNotSupported(initiated, context); } - private DefaultSchemaReviewExecutor createDefaultSchemaReviewExecutor() { - DefaultSchemaReviewExecutor schemaReviewExecutor = new DefaultSchemaReviewExecutor(); - schemaReviewExecutor.init(SchemaVersionLifecycleStates.REVIEWED, SchemaVersionLifecycleStates.CHANGES_REQUIRED, Collections.emptyMap()); + private DefaultCustomSchemaStateExecutor createDefaultSchemaReviewExecutor() { + DefaultCustomSchemaStateExecutor schemaReviewExecutor = new DefaultCustomSchemaStateExecutor(); + schemaReviewExecutor.init(SchemaVersionLifecycleStateMachine.newBuilder(), + SchemaVersionLifecycleStates.REVIEWED.getId(), + SchemaVersionLifecycleStates.CHANGES_REQUIRED.getId(), + Collections.emptyMap()); return schemaReviewExecutor; } @@ -169,17 +177,16 @@ private void checkArchiveNotSupported(InbuiltSchemaVersionLifecycleState state, SchemaVersionLifecycleContext context) throws SchemaNotFoundException { try { state.archive(context); - Assert.fail(state.name() + " should not lead to archive state"); + Assert.fail(state.getName() + " should not lead to archive state"); } catch (SchemaLifecycleException e) { } } - private void checkDeleteNotSupported(InbuiltSchemaVersionLifecycleState state, SchemaVersionLifecycleContext context) throws SchemaNotFoundException { try { state.delete(context); - Assert.fail(state.name() + " should not lead to delete state"); + Assert.fail(state.getName() + " should not lead to delete state"); } catch (SchemaLifecycleException e) { } } @@ -188,7 +195,7 @@ private void checkDisableNotSupported(InbuiltSchemaVersionLifecycleState state, SchemaVersionLifecycleContext context) throws SchemaNotFoundException { try { state.disable(context); - Assert.fail(state.name() + " should not lead to disabled state"); + Assert.fail(state.getName() + " should not lead to disabled state"); } catch (SchemaLifecycleException e) { } } @@ -197,7 +204,7 @@ private void checkEnableNotSupported(InbuiltSchemaVersionLifecycleState state, SchemaVersionLifecycleContext context) throws SchemaNotFoundException, IncompatibleSchemaException { try { state.enable(context); - Assert.fail(state.name() + " should not lead to enable state"); + Assert.fail(state.getName() + " should not lead to enable state"); } catch (SchemaLifecycleException e) { } } @@ -205,8 +212,8 @@ private void checkEnableNotSupported(InbuiltSchemaVersionLifecycleState state, private void checkStartReviewNotSupported(InbuiltSchemaVersionLifecycleState state, SchemaVersionLifecycleContext context) throws SchemaNotFoundException { try { - state.startReview(context, createDefaultSchemaReviewExecutor()); - Assert.fail(state.name() + " should not lead to startReview state"); + state.startReview(context); + Assert.fail(state.getName() + " should not lead to startReview state"); } catch (SchemaLifecycleException e) { } } diff --git a/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/DefaultSchemaRegistry.java b/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/DefaultSchemaRegistry.java index 54c825081..eb95ed506 100644 --- a/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/DefaultSchemaRegistry.java +++ b/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/DefaultSchemaRegistry.java @@ -23,7 +23,7 @@ import com.hortonworks.registries.schemaregistry.errors.UnsupportedSchemaTypeException; import com.hortonworks.registries.schemaregistry.serde.SerDesException; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; -import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; import com.hortonworks.registries.storage.OrderByField; import com.hortonworks.registries.storage.Storable; import com.hortonworks.registries.storage.StorableKey; @@ -452,13 +452,12 @@ public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFound } @Override - public void executeCustomState(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - schemaVersionLifecycleManager.executeCustomState(schemaVersionId); + public void transitionState(Long schemaVersionId, Byte targetStateId) throws SchemaNotFoundException, SchemaLifecycleException { + schemaVersionLifecycleManager.executeState(schemaVersionId, targetStateId); } - public List getSchemaVersionLifecycleStates() { - //todo - return null; + public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() { + return schemaVersionLifecycleManager.getSchemaVersionLifecycleStateMachine().toConfig(); } @Override diff --git a/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionLifecycleManager.java b/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionLifecycleManager.java index 966b1e983..1af8d2b20 100644 --- a/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionLifecycleManager.java +++ b/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/SchemaVersionLifecycleManager.java @@ -24,11 +24,14 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import com.hortonworks.registries.schemaregistry.errors.UnsupportedSchemaTypeException; import com.hortonworks.registries.schemaregistry.state.InbuiltSchemaVersionLifecycleState; -import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleContext; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; +import com.hortonworks.registries.schemaregistry.state.CustomSchemaStateExecutor; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleContext; import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateAction; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachine; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateTransition; import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates; -import com.hortonworks.registries.schemaregistry.state.SchemaReviewExecutor; import com.hortonworks.registries.schemaregistry.state.SchemaVersionService; import com.hortonworks.registries.storage.OrderByField; import com.hortonworks.registries.storage.StorableKey; @@ -52,9 +55,11 @@ public class SchemaVersionLifecycleManager { private static final Logger LOG = LoggerFactory.getLogger(SchemaVersionLifecycleManager.class); - private static final String DEFAULT_SCHEMA_REVIEW_EXECUTOR_CLASS = "com.hortonworks.registries.schemaregistry.state.DefaultSchemaReviewExecutor"; + private static final String DEFAULT_SCHEMA_REVIEW_EXECUTOR_CLASS = "com.hortonworks.registries.schemaregistry.state.DefaultCustomSchemaStateExecutor"; public static final InbuiltSchemaVersionLifecycleState DEFAULT_VERSION_STATE = SchemaVersionLifecycleStates.INITIATED; - private SchemaReviewExecutor schemaReviewExecutor; + + private final SchemaVersionLifecycleStateMachine schemaVersionLifecycleStateMachine; + private CustomSchemaStateExecutor customSchemaStateExecutor; private SchemaVersionInfoCache schemaVersionInfoCache; private SchemaVersionRetriever schemaVersionRetriever; private SlotSynchronizer slotSynchronizer = new SlotSynchronizer<>(); @@ -64,14 +69,13 @@ public class SchemaVersionLifecycleManager { // todo remove this lock usage private Object addOrUpdateLock = new Object(); - private SchemaVersionLifecycleStates.Registry schemaLifeCycleStatesRegistry; public SchemaVersionLifecycleManager(StorageManager storageManager, Map props, DefaultSchemaRegistry.SchemaMetadataFetcher schemaMetadataFetcher) { this.storageManager = storageManager; this.schemaMetadataFetcher = schemaMetadataFetcher; - schemaLifeCycleStatesRegistry = new SchemaVersionLifecycleStates.Registry(); + SchemaVersionLifecycleStateMachine.Builder builder = SchemaVersionLifecycleStateMachine.newBuilder(); Options options = new Options(props); schemaVersionRetriever = createSchemaVersionRetriever(); @@ -81,27 +85,38 @@ public SchemaVersionLifecycleManager(StorageManager storageManager, options.getMaxSchemaCacheSize(), options.getSchemaExpiryInSecs()); - schemaReviewExecutor = createSchemaReviewExecutor(props); + customSchemaStateExecutor = createSchemaReviewExecutor(props, builder); + + schemaVersionLifecycleStateMachine = builder.build(); } - private SchemaReviewExecutor createSchemaReviewExecutor(Map props) { - Map schemaReviewExecConfig = (Map) props.getOrDefault("schemaReviewExecutor", + private CustomSchemaStateExecutor createSchemaReviewExecutor(Map props, + SchemaVersionLifecycleStateMachine.Builder builder) { + Map schemaReviewExecConfig = (Map) props.getOrDefault("customSchemaStateExecutor", Collections.emptyMap()); String className = (String) schemaReviewExecConfig.getOrDefault("className", DEFAULT_SCHEMA_REVIEW_EXECUTOR_CLASS); Map executorProps = (Map) schemaReviewExecConfig.getOrDefault("props", Collections.emptyMap()); - SchemaReviewExecutor schemaReviewExecutor; + CustomSchemaStateExecutor customSchemaStateExecutor; try { - schemaReviewExecutor = (SchemaReviewExecutor) Class.forName(className, - true, - Thread.currentThread().getContextClassLoader()) - .newInstance(); + customSchemaStateExecutor = (CustomSchemaStateExecutor) Class.forName(className, + true, + Thread.currentThread().getContextClassLoader()) + .newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - LOG.error("Error encountered while loading SchemaReviewExecutor [{}]", className, e); + LOG.error("Error encountered while loading class [{}]", className, e); throw new IllegalArgumentException(e); } - schemaReviewExecutor.init(SchemaVersionLifecycleStates.REVIEWED, SchemaVersionLifecycleStates.CHANGES_REQUIRED, executorProps); - return schemaReviewExecutor; + customSchemaStateExecutor.init(builder, + SchemaVersionLifecycleStates.REVIEWED.getId(), + SchemaVersionLifecycleStates.CHANGES_REQUIRED.getId(), + executorProps); + + return customSchemaStateExecutor; + } + + public SchemaVersionLifecycleStateMachine getSchemaVersionLifecycleStateMachine() { + return schemaVersionLifecycleStateMachine; } public SchemaVersionRetriever getSchemaVersionRetriever() { @@ -147,7 +162,8 @@ public SchemaIdVersion addSchemaVersion(String schemaName, if (schemaMetadataInfo != null) { SchemaMetadata schemaMetadata = schemaMetadataInfo.getSchemaMetadata(); // check whether the same schema text exists - schemaVersionInfo = findSchemaVersion(schemaMetadata.getType(), schemaVersion.getSchemaText(), schemaMetadataInfo.getId()); + schemaVersionInfo = findSchemaVersion(schemaMetadata.getType(), schemaVersion.getSchemaText(), schemaMetadataInfo + .getId()); if (schemaVersionInfo == null) { schemaVersionInfo = createSchemaVersion(schemaMetadata, schemaMetadataInfo.getId(), @@ -161,7 +177,7 @@ public SchemaIdVersion addSchemaVersion(String schemaName, } public SchemaVersionInfo getLatestEnabledSchemaVersionInfo(String schemaName) throws SchemaNotFoundException { - return getLatestSchemaVersionInfo(schemaName, SchemaVersionLifecycleStates.ENABLED.id()); + return getLatestSchemaVersionInfo(schemaName, SchemaVersionLifecycleStates.ENABLED.getId()); } public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaName) throws SchemaNotFoundException { @@ -215,7 +231,7 @@ private SchemaVersionInfo createSchemaVersion(SchemaMetadata schemaMetadata, schemaVersionStorable.setDescription(schemaVersion.getDescription()); schemaVersionStorable.setTimestamp(System.currentTimeMillis()); - schemaVersionStorable.setState(DEFAULT_VERSION_STATE.id()); + schemaVersionStorable.setState(DEFAULT_VERSION_STATE.getId()); // take a lock for a schema with same name. SlotSynchronizer.Lock slotLock = slotSynchronizer.lockSlot(schemaName); @@ -227,7 +243,7 @@ private SchemaVersionInfo createSchemaVersion(SchemaMetadata schemaMetadata, Byte initialState = schemaVersion.getInitialState(); if (schemaMetadata.isEvolve()) { // if the given version is added with enabled state then only check for compatibility - if (SchemaVersionLifecycleStates.ENABLED.id().equals(initialState)) { + if (SchemaVersionLifecycleStates.ENABLED.getId().equals(initialState)) { CompatibilityResult compatibilityResult = checkCompatibility(schemaName, schemaVersion.getSchemaText()); if (!compatibilityResult.isCompatible()) { String errMsg = String.format("Given schema is not compatible with latest schema versions. \n" + @@ -280,18 +296,19 @@ private SchemaVersionInfo createSchemaVersion(SchemaMetadata schemaMetadata, private void updateSchemaVersionState(Long schemaVersionId, Byte initialState) throws SchemaNotFoundException { try { - SchemaVersionLifecycleContext schemaVersionLifecycleContext = new SchemaVersionLifecycleContext(schemaVersionId, 1, createSchemaVersionService(), schemaLifeCycleStatesRegistry); - schemaVersionLifecycleContext.setState(getSchemaVersionLifeCycleState(initialState)); + SchemaVersionLifecycleContext schemaVersionLifecycleContext = + new SchemaVersionLifecycleContext(schemaVersionId, + 1, + createSchemaVersionService(), + schemaVersionLifecycleStateMachine, + customSchemaStateExecutor); + schemaVersionLifecycleContext.setState(schemaVersionLifecycleStateMachine.getStates().get(initialState)); schemaVersionLifecycleContext.updateSchemaVersionState(); } catch (SchemaLifecycleException e) { throw new RuntimeException(e); } } - private InbuiltSchemaVersionLifecycleState getSchemaVersionLifeCycleState(Byte initialState) { - return (InbuiltSchemaVersionLifecycleState) schemaLifeCycleStatesRegistry.get(initialState); - } - private SchemaProvider getSchemaProvider(String type) { return schemaMetadataFetcher.getSchemaProvider(type); } @@ -304,7 +321,7 @@ public CompatibilityResult checkCompatibility(String schemaName, String toSchema switch (validationLevel) { case LATEST: SchemaVersionInfo latestSchemaVersionInfo = getLatestEnabledSchemaVersionInfo(schemaName); - if(latestSchemaVersionInfo != null) { + if (latestSchemaVersionInfo != null) { compatibilityResult = checkCompatibility(schemaMetadata.getType(), toSchema, latestSchemaVersionInfo.getSchemaText(), @@ -318,7 +335,7 @@ public CompatibilityResult checkCompatibility(String schemaName, String toSchema case ALL: Collection schemaVersionInfos = getAllVersions(schemaName); for (SchemaVersionInfo schemaVersionInfo : schemaVersionInfos) { - if (SchemaVersionLifecycleStates.ENABLED.id().equals(schemaVersionInfo.getStateId())) { + if (SchemaVersionLifecycleStates.ENABLED.getId().equals(schemaVersionInfo.getStateId())) { compatibilityResult = checkCompatibility(schemaMetadata.getType(), toSchema, schemaVersionInfo.getSchemaText(), @@ -440,9 +457,8 @@ public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws Schema } } - private ImmutablePair createSchemaVersionLifeCycleContextAndState( - Long schemaVersionId) - throws SchemaNotFoundException { + private ImmutablePair + createSchemaVersionLifeCycleContextAndState(Long schemaVersionId) throws SchemaNotFoundException { // get the current state from storage for the given versionID // we can use a query to get max value for the column for a given schema-version-id but StorageManager does not // have API to take custom queries. @@ -456,12 +472,14 @@ private ImmutablePair(context, schemaVersionLifecycleState); } @@ -506,7 +524,7 @@ private void storeSchemaVersionState(SchemaVersionLifecycleContext schemaVersion // store versions state, sequence SchemaVersionStateStorable stateStorable = new SchemaVersionStateStorable(); Long schemaVersionId = schemaVersionLifecycleContext.getSchemaVersionId(); - byte stateId = schemaVersionLifecycleContext.getState().id(); + byte stateId = schemaVersionLifecycleContext.getState().getId(); stateStorable.setSchemaVersionId(schemaVersionId); stateStorable.setSequence(schemaVersionLifecycleContext.getSequence() + 1); @@ -566,14 +584,31 @@ public void disableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundExce public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { ImmutablePair pair = createSchemaVersionLifeCycleContextAndState(schemaVersionId); - ((InbuiltSchemaVersionLifecycleState) pair.getRight()).startReview(pair.getLeft(), schemaReviewExecutor); + ((InbuiltSchemaVersionLifecycleState) pair.getRight()).startReview(pair.getLeft()); } - public void executeCustomState(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException { - ImmutablePair schemaLifeCycleContextAndState = createSchemaVersionLifeCycleContextAndState(schemaVersionId); + public void executeState(Long schemaVersionId, Byte targetState) + throws SchemaLifecycleException, SchemaNotFoundException { + ImmutablePair schemaLifeCycleContextAndState = + createSchemaVersionLifeCycleContextAndState(schemaVersionId); SchemaVersionLifecycleContext schemaVersionLifecycleContext = schemaLifeCycleContextAndState.getLeft(); - schemaVersionLifecycleContext.setState(schemaLifeCycleContextAndState.getRight()); - schemaReviewExecutor.execute(schemaVersionLifecycleContext); + SchemaVersionLifecycleState currentState = schemaLifeCycleContextAndState.getRight(); + + schemaVersionLifecycleContext.setState(currentState); + SchemaVersionLifecycleStateTransition transition = + new SchemaVersionLifecycleStateTransition(currentState.getId(), targetState); + SchemaVersionLifecycleStateAction action = schemaVersionLifecycleContext.getSchemaLifeCycleStatesMachine() + .getTransitions() + .get(transition); + try { + action.execute(schemaVersionLifecycleContext); + } catch (SchemaLifecycleException e) { + Throwable cause = e.getCause(); + if (cause != null && cause instanceof SchemaNotFoundException) { + throw (SchemaNotFoundException) cause; + } + throw e; + } } private SchemaVersionInfo retrieveSchemaVersionInfo(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException { diff --git a/schema-registry/rest-service/src/main/java/com/hortonworks/registries/schemaregistry/webservice/SchemaRegistryResource.java b/schema-registry/rest-service/src/main/java/com/hortonworks/registries/schemaregistry/webservice/SchemaRegistryResource.java index e4ae6477b..f1204c4df 100644 --- a/schema-registry/rest-service/src/main/java/com/hortonworks/registries/schemaregistry/webservice/SchemaRegistryResource.java +++ b/schema-registry/rest-service/src/main/java/com/hortonworks/registries/schemaregistry/webservice/SchemaRegistryResource.java @@ -39,7 +39,7 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import com.hortonworks.registries.schemaregistry.errors.UnsupportedSchemaTypeException; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; -import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleState; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; import com.hortonworks.registries.storage.search.OrderBy; import com.hortonworks.registries.storage.search.WhereClause; import io.swagger.annotations.Api; @@ -52,12 +52,12 @@ import org.slf4j.LoggerFactory; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; -import javax.ws.rs.DELETE; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; @@ -609,14 +609,14 @@ public Response getSchemaVersionById(@ApiParam(value = "version identifier of th } @GET - @Path("/schemas/versionsById/states") + @Path("/schemas/versions/statemachine") @ApiOperation(value = "Get schema version life cycle states", response = SchemaVersionInfo.class, tags = OPERATION_GROUP_SCHEMA) @Timed public Response getSchemaVersionLifeCycleStates() { Response response; try { - List states = schemaRegistry.getSchemaVersionLifecycleStates(); + SchemaVersionLifecycleStateMachineInfo states = schemaRegistry.getSchemaVersionLifecycleStateMachineInfo(); response = WSUtils.respondEntity(states, Response.Status.OK); } catch (Exception ex) { LOG.error("Encountered error while getting schema version lifecycle states", ex); @@ -627,7 +627,7 @@ public Response getSchemaVersionLifeCycleStates() { } @POST - @Path("/schemas/versionsById/{id}/state/enable") + @Path("/schemas/versions/{id}/state/enable") @ApiOperation(value = "Enables version of the schema identified by the given versionid", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed @@ -655,7 +655,7 @@ public Response enableSchema(@ApiParam(value = "version identifier of the schema } @POST - @Path("/schemas/versionsById/{id}/state/disable") + @Path("/schemas/versions/{id}/state/disable") @ApiOperation(value = "Disables version of the schema identified by the given version id", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed @@ -680,7 +680,7 @@ public Response disableSchema(@ApiParam(value = "version identifier of the schem } @POST - @Path("/schemas/versionsById/{id}/state/archive") + @Path("/schemas/versions/{id}/state/archive") @ApiOperation(value = "Disables version of the schema identified by the given version id", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed @@ -706,7 +706,7 @@ public Response archiveSchema(@ApiParam(value = "version identifier of the schem @POST - @Path("/schemas/versionsById/{id}/state/delete") + @Path("/schemas/versions/{id}/state/delete") @ApiOperation(value = "Disables version of the schema identified by the given version id", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed @@ -731,7 +731,7 @@ public Response deleteSchema(@ApiParam(value = "version identifier of the schema } @POST - @Path("/schemas/versionsById/{id}/state/startReview") + @Path("/schemas/versions/{id}/state/startReview") @ApiOperation(value = "Disables version of the schema identified by the given version id", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed @@ -755,24 +755,28 @@ public Response startReviewSchema(@ApiParam(value = "version identifier of the s return response; } - @POST - @Path("/schemas/versionsById/{id}/state/custom") - @ApiOperation(value = "Runs the custom state execution for schema version identified by the given version id", + @Path("/schemas/versions/{id}/state/{stateId}") + @ApiOperation(value = "Runs the state execution for schema version identified by the given version id and executes action associated with target state id", response = Boolean.class, tags = OPERATION_GROUP_SCHEMA) @Timed - public Response executeCustomState(@ApiParam(value = "version identifier of the schema", required = true) @PathParam("id") Long versionId) { + public Response executeState(@ApiParam(value = "version identifier of the schema", required = true) @PathParam("id") Long versionId, + @ApiParam(value = "", required = true) @PathParam("stateId") Byte stateId) { Response response; try { - schemaRegistry.executeCustomState(versionId); + schemaRegistry.transitionState(versionId, stateId); response = WSUtils.respondEntity(true, Response.Status.OK); } catch (SchemaNotFoundException e) { LOG.info("No schema version is found with schema version id : [{}]", versionId); response = WSUtils.respond(Response.Status.NOT_FOUND, CatalogResponse.ResponseMessage.ENTITY_NOT_FOUND, versionId.toString()); } catch(SchemaLifecycleException e) { LOG.error("Encountered error while disabling schema version with id [{}]", versionId, e); - response = WSUtils.respond(Response.Status.BAD_REQUEST, CatalogResponse.ResponseMessage.BAD_REQUEST, e.getMessage()); + CatalogResponse.ResponseMessage badRequestResponse = + e.getCause() != null && e.getCause() instanceof IncompatibleSchemaException + ? CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA + : CatalogResponse.ResponseMessage.BAD_REQUEST; + response = WSUtils.respond(Response.Status.BAD_REQUEST, badRequestResponse, e.getMessage()); } catch (Exception ex) { LOG.error("Encountered error while getting schema version with id [{}]", versionId, ex); response = WSUtils.respond(Response.Status.INTERNAL_SERVER_ERROR, CatalogResponse.ResponseMessage.EXCEPTION, ex.getMessage()); diff --git a/schema-registry/rest-service/src/test/java/com/hortonworks/registries/schemaregistry/avro/AvroSchemaRegistryClientTest.java b/schema-registry/rest-service/src/test/java/com/hortonworks/registries/schemaregistry/avro/AvroSchemaRegistryClientTest.java index 1b0cada48..180ef4d99 100644 --- a/schema-registry/rest-service/src/test/java/com/hortonworks/registries/schemaregistry/avro/AvroSchemaRegistryClientTest.java +++ b/schema-registry/rest-service/src/test/java/com/hortonworks/registries/schemaregistry/avro/AvroSchemaRegistryClientTest.java @@ -15,6 +15,7 @@ **/ package com.hortonworks.registries.schemaregistry.avro; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.hortonworks.registries.common.catalog.CatalogResponse; import com.hortonworks.registries.common.test.IntegrationTest; @@ -41,6 +42,8 @@ import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotSerializer; import com.hortonworks.registries.schemaregistry.serdes.avro.SerDesProtocolHandlerRegistry; import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateTransition; import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStates; import org.apache.avro.util.Utf8; import org.apache.commons.io.IOUtils; @@ -59,8 +62,12 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -422,22 +429,22 @@ public void doTestSchemaVersionEnableState(String schemaName) throws Exception { SCHEMA_REGISTRY_CLIENT.disableSchemaVersion(schemaVersionId_2); SchemaVersionInfo schemaVersionInfo = SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2); System.out.println("disable :: schemaVersionInfo.getStateId() = " + schemaVersionInfo.getStateId()); - Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.id(), schemaVersionInfo.getStateId()); + Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.getId(), schemaVersionInfo.getStateId()); SCHEMA_REGISTRY_CLIENT.enableSchemaVersion(schemaVersionId_2); schemaVersionInfo = SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2); System.out.println("enable :: schemaVersionInfo.getStateId() = " + schemaVersionInfo.getStateId()); - Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.id(), schemaVersionInfo.getStateId()); + Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.getId(), schemaVersionInfo.getStateId()); SCHEMA_REGISTRY_CLIENT.disableSchemaVersion(schemaVersionId_2); schemaVersionInfo = SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2); System.out.println("disable :: schemaVersionInfo.getStateId() = " + schemaVersionInfo.getStateId()); - Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.id(), schemaVersionInfo.getStateId()); + Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.getId(), schemaVersionInfo.getStateId()); SCHEMA_REGISTRY_CLIENT.archiveSchemaVersion(schemaVersionId_2); schemaVersionInfo = SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2); System.out.println("archive :: schemaVersionInfo.getStateId() = " + schemaVersionInfo.getStateId()); - Assert.assertEquals(SchemaVersionLifecycleStates.ARCHIVED.id(), schemaVersionInfo.getStateId()); + Assert.assertEquals(SchemaVersionLifecycleStates.ARCHIVED.getId(), schemaVersionInfo.getStateId()); } @Test @@ -450,7 +457,8 @@ public void testSchemaVersionLifeCycleStatesWithValidationAsAll() throws Excepti doTestSchemaVersionLifeCycleStates(SchemaValidationLevel.ALL); } - private void doTestSchemaVersionLifeCycleStates(SchemaValidationLevel validationLevel) throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, IOException, SchemaLifecycleException { + private void doTestSchemaVersionLifeCycleStates(SchemaValidationLevel validationLevel) + throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, IOException, SchemaLifecycleException { SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(TEST_NAME_RULE.getMethodName() + "-schema") .type(AvroSchemaProvider.TYPE) .schemaGroup("group") @@ -472,7 +480,7 @@ private void doTestSchemaVersionLifeCycleStates(SchemaValidationLevel validation // disable version 2 SCHEMA_REGISTRY_CLIENT.disableSchemaVersion(schemaIdVersion_2.getSchemaVersionId()); - Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.id(), + Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.getId(), SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2).getStateId()); // add version 3 @@ -482,32 +490,99 @@ private void doTestSchemaVersionLifeCycleStates(SchemaValidationLevel validation "Third version of the schema, removes name field")); // enable version 2 SCHEMA_REGISTRY_CLIENT.enableSchemaVersion(schemaIdVersion_2.getSchemaVersionId()); - Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.id(), + Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.getId(), SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2).getStateId()); SchemaIdVersion schemaIdVersion_4 = SCHEMA_REGISTRY_CLIENT.addSchemaVersion(schemaName, new SchemaVersion(AvroSchemaRegistryClientUtil.getSchema("/schema-4.avsc"), "Forth version of the schema, adds back name field, but different type", - SchemaVersionLifecycleStates.INITIATED.id())); + SchemaVersionLifecycleStates.INITIATED.getId())); // enable version 4 try { SCHEMA_REGISTRY_CLIENT.enableSchemaVersion(schemaIdVersion_4.getSchemaVersionId()); - Assert.fail("Enabling "+schemaIdVersion_4+" should have failed with incompatible schema error"); + Assert.fail("Enabling " + schemaIdVersion_4 + " should have failed with incompatible schema error"); } catch (IncompatibleSchemaException e) { } - Assert.assertEquals(SchemaVersionLifecycleStates.INITIATED.id(), + Assert.assertEquals(SchemaVersionLifecycleStates.INITIATED.getId(), SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_4).getStateId()); // disable version 3 SCHEMA_REGISTRY_CLIENT.disableSchemaVersion(schemaIdVersion_3.getSchemaVersionId()); - Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.id(), + Assert.assertEquals(SchemaVersionLifecycleStates.DISABLED.getId(), SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_3).getStateId()); // enable version 3 SCHEMA_REGISTRY_CLIENT.enableSchemaVersion(schemaIdVersion_3.getSchemaVersionId()); - Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.id(), + Assert.assertEquals(SchemaVersionLifecycleStates.ENABLED.getId(), SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_3).getStateId()); } + + @Test + public void testSchemaVersionLifeCycleStateMachineConfig() throws Exception { + SchemaVersionLifecycleStateMachineInfo stateMachineInfo = + SCHEMA_REGISTRY_CLIENT.getSchemaVersionLifecycleStateMachineInfo(); + ObjectMapper objectMapper = new ObjectMapper(); + String stateMachineAsStr = objectMapper.writeValueAsString(stateMachineInfo); + SchemaVersionLifecycleStateMachineInfo readStateMachineInfo = + objectMapper.readValue(stateMachineAsStr, SchemaVersionLifecycleStateMachineInfo.class); + + Assert.assertEquals(readStateMachineInfo, stateMachineInfo); + + // check for duplicate state/transitions + checkDuplicateEntries(stateMachineInfo.getStates()); + checkDuplicateEntries(stateMachineInfo.getTransitions()); + } + + private void checkDuplicateEntries(Collection states) { + HashSet statesSet = new HashSet<>(); + for (T state : states) { + if (!statesSet.add(state)) { + Assert.fail("stateMachineInfo contains duplicate state: " + state); + } + } + } + + @Test + public void testSchemaVersionStatesThroughIds() throws Exception { + SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(TEST_NAME_RULE.getMethodName() + "-schema") + .type(AvroSchemaProvider.TYPE) + .schemaGroup("group") + .compatibility(SchemaCompatibility.BOTH) + .build(); + String schemaName = schemaMetadata.getName(); + + // build nextTransitions from state machine + SchemaVersionLifecycleStateMachineInfo stateMachine = SCHEMA_REGISTRY_CLIENT.getSchemaVersionLifecycleStateMachineInfo(); + Map> nextTransitionsForStateIds = new HashMap<>(); + for (SchemaVersionLifecycleStateTransition transition : stateMachine.getTransitions()) { + List nextTransitions = nextTransitionsForStateIds.computeIfAbsent(transition + .getSourceStateId(), + aByte -> new ArrayList<>()); + nextTransitions.add(transition); + } + + Long id = SCHEMA_REGISTRY_CLIENT.registerSchemaMetadata(schemaMetadata); + + SchemaIdVersion schemaIdVersion_1 = + SCHEMA_REGISTRY_CLIENT.addSchemaVersion(schemaName, + new SchemaVersion(AvroSchemaRegistryClientUtil.getSchema("/schema-1.avsc"), + "Initial version of the schema")); + SchemaIdVersion schemaIdVersion_2 = + SCHEMA_REGISTRY_CLIENT.addSchemaVersion(schemaName, + new SchemaVersion(AvroSchemaRegistryClientUtil.getSchema("/schema-2.avsc"), + "Second version of the schema")); + + // disable version 2 + SchemaVersionInfo schemaVersionInfo = SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2); + Byte stateId = schemaVersionInfo.getStateId(); + List nextTransitions = nextTransitionsForStateIds.get(stateId); + + Byte targetStateId = nextTransitions.get(0).getTargetStateId(); + SCHEMA_REGISTRY_CLIENT.transitionState(schemaVersionInfo.getId(), targetStateId); + + Assert.assertEquals(targetStateId, SCHEMA_REGISTRY_CLIENT.getSchemaVersionInfo(schemaIdVersion_2).getStateId()); + } + } diff --git a/schema-registry/serdes/src/test/java/com/hortonworks/registries/schemaregistry/client/MockSchemaRegistryClient.java b/schema-registry/serdes/src/test/java/com/hortonworks/registries/schemaregistry/client/MockSchemaRegistryClient.java index 84a820478..3c41fcba3 100644 --- a/schema-registry/serdes/src/test/java/com/hortonworks/registries/schemaregistry/client/MockSchemaRegistryClient.java +++ b/schema-registry/serdes/src/test/java/com/hortonworks/registries/schemaregistry/client/MockSchemaRegistryClient.java @@ -34,6 +34,8 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import com.hortonworks.registries.schemaregistry.errors.UnsupportedSchemaTypeException; import com.hortonworks.registries.schemaregistry.serde.SerDesException; +import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException; +import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo; import com.hortonworks.registries.storage.StorageManager; import com.hortonworks.registries.storage.impl.memory.InMemoryStorageManager; @@ -207,6 +209,17 @@ public Collection getSerDes(String schemaName) { return null; } + @Override + public void transitionState(Long schemaVersionId, + Byte targetStateId) throws SchemaNotFoundException, SchemaLifecycleException { + schemaRegistry.transitionState(schemaVersionId, targetStateId); + } + + @Override + public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() { + return schemaRegistry.getSchemaVersionLifecycleStateMachineInfo(); + } + public T createSerializerInstance(SerDesInfo serDesInfo) { return null; }