Skip to content

Commit

Permalink
extract SDK > move state storing to wrapped adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
DC2-DanielKrueger committed Apr 30, 2024
1 parent edd32e8 commit a569f86
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.common.base.Preconditions;
import com.hivemq.api.model.ApiConstants;
import com.hivemq.api.model.status.Status;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapter;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterState;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.protocols.ProtocolAdapterWrapper;

Expand All @@ -38,7 +38,7 @@ public class AdapterStatusModelConversionUtils {
protocolAdapterWrapper.getErrorMessage());
}

public static @NotNull Status.CONNECTION_STATUS convertConnectionStatus(@NotNull final ProtocolAdapter.ConnectionStatus connectionStatus){
public static @NotNull Status.CONNECTION_STATUS convertConnectionStatus(@NotNull final ProtocolAdapterState.ConnectionStatus connectionStatus){
Preconditions.checkNotNull(connectionStatus);
switch (connectionStatus){
case DISCONNECTED:
Expand All @@ -55,7 +55,7 @@ public class AdapterStatusModelConversionUtils {
}
}

public static @NotNull Status.RUNTIME_STATUS convertRuntimeStatus(@NotNull final ProtocolAdapter.RuntimeStatus runtimeStatus){
public static @NotNull Status.RUNTIME_STATUS convertRuntimeStatus(@NotNull final ProtocolAdapterState.RuntimeStatus runtimeStatus){
Preconditions.checkNotNull(runtimeStatus);
switch (runtimeStatus){
case STARTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public static ProtocolAdapter convertInstalledAdapterType(


final String rawVersion = info.getVersion();
if (rawVersion == null) {
System.err.println("FUCK");
}
final String version = rawVersion.replace("${edge-version}", versionProvider.getVersion());

return new ProtocolAdapter(info.getProtocolId(),
Expand All @@ -101,8 +104,7 @@ public static ProtocolAdapter convertInstalledAdapterType(
* @return The instance to be sent across the API
*/
public static ProtocolAdapter convertModuleAdapterType(
final @NotNull Module module,
final @NotNull ConfigurationService configurationService) {
final @NotNull Module module, final @NotNull ConfigurationService configurationService) {

Preconditions.checkNotNull(module);
Preconditions.checkNotNull(configurationService);
Expand Down Expand Up @@ -130,8 +132,7 @@ public static ProtocolAdapter convertModuleAdapterType(
}

public static String applyAbsoluteServerAddressInDeveloperMode(
@NotNull String logoUrl,
final @NotNull ConfigurationService configurationService) {
@NotNull String logoUrl, final @NotNull ConfigurationService configurationService) {
Preconditions.checkNotNull(logoUrl);
Preconditions.checkNotNull(configurationService);
if (logoUrl != null && Boolean.getBoolean(HiveMQEdgeConstants.DEVELOPMENT_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ public class VersionProvider {

@Inject
public VersionProvider() {
version = ManifestUtils.getValueFromManifest(HiveMQEdgeMain.class, "HiveMQ-Version");
final String versionFromManifest = ManifestUtils.getValueFromManifest(HiveMQEdgeMain.class, "HiveMQ-Version");
if(versionFromManifest!=null) {
version = versionFromManifest;
} else {
version = "Development Snapshot";
}
}

public synchronized @NotNull String getVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.base.Preconditions;
import com.hivemq.edge.model.TypeIdentifierImpl;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapter;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterState;
import com.hivemq.edge.modules.api.events.EventService;
import com.hivemq.edge.modules.api.events.EventUtils;
Expand All @@ -16,24 +15,24 @@

public class ProtocolAdapterStateImpl implements ProtocolAdapterState {
private final @NotNull EventService eventService;
protected @NotNull AtomicReference<ProtocolAdapter.RuntimeStatus> runtimeStatus =
new AtomicReference<>(ProtocolAdapter.RuntimeStatus.STOPPED);
protected @NotNull AtomicReference<ProtocolAdapter.ConnectionStatus> connectionStatus =
new AtomicReference<>(ProtocolAdapter.ConnectionStatus.DISCONNECTED);
protected @NotNull AtomicReference<RuntimeStatus> runtimeStatus =
new AtomicReference<>(RuntimeStatus.STOPPED);
protected @NotNull AtomicReference<ConnectionStatus> connectionStatus =
new AtomicReference<>(ConnectionStatus.DISCONNECTED);
protected @Nullable String errorMessage;

public ProtocolAdapterStateImpl(final @NotNull EventService eventService) {
this.eventService = eventService;
}

@Override
public boolean setConnectionStatus(@NotNull final ProtocolAdapter.ConnectionStatus connectionStatus) {
public boolean setConnectionStatus(@NotNull final ConnectionStatus connectionStatus) {
Preconditions.checkNotNull(connectionStatus);
return this.connectionStatus.getAndSet(connectionStatus) != connectionStatus;
}

@Override
public @NotNull ProtocolAdapter.ConnectionStatus getConnectionStatus() {
public @NotNull ConnectionStatus getConnectionStatus() {
return connectionStatus.get();
}

Expand All @@ -47,7 +46,7 @@ public void setErrorConnectionStatus(
final @NotNull String protocolId,
@Nullable final Throwable t,
@Nullable final String errorMessage) {
boolean changed = setConnectionStatus(ProtocolAdapter.ConnectionStatus.ERROR);
boolean changed = setConnectionStatus(ConnectionStatus.ERROR);
reportErrorMessage(adapterId, protocolId, t, errorMessage, changed);
}

Expand All @@ -74,12 +73,12 @@ public void reportErrorMessage(
}

@Override
public void setRuntimeStatus(@NotNull final ProtocolAdapter.RuntimeStatus runtimeStatus) {
public void setRuntimeStatus(@NotNull final RuntimeStatus runtimeStatus) {
this.runtimeStatus.set(runtimeStatus);
}

@Override
public @NotNull ProtocolAdapter.RuntimeStatus getRuntimeStatus() {
public @NotNull RuntimeStatus getRuntimeStatus() {
return this.runtimeStatus.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.ConnectionStatus.STATELESS;
import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.RuntimeStatus.STARTED;
import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.RuntimeStatus.STOPPED;

public class SimulationProtocolAdapter implements PollingPerSubscriptionProtocolAdapter {

private final @NotNull ProtocolAdapterInformation adapterInformation;
Expand All @@ -45,7 +49,8 @@ public SimulationProtocolAdapter(
this.adapterInformation = adapterInformation;
this.adapterConfig = protocolAdapterInput.getConfig();
this.metricRegistry = protocolAdapterInput.getMetricRegistry();
protocolAdapterState = protocolAdapterInput.getProtocolAdapterState();
this.protocolAdapterState = protocolAdapterInput.getProtocolAdapterState();
this.protocolAdapterState.setConnectionStatus(STATELESS);
}

@Override
Expand All @@ -56,13 +61,13 @@ public SimulationProtocolAdapter(
@Override
public @NotNull CompletableFuture<ProtocolAdapterStartOutput> start(
@NotNull final ProtocolAdapterStartInput input, @NotNull final ProtocolAdapterStartOutput output) {
protocolAdapterState.setRuntimeStatus(RuntimeStatus.STARTED);
protocolAdapterState.setRuntimeStatus(STARTED);
return CompletableFuture.completedFuture(output);
}

@Override
public @NotNull CompletableFuture<Void> stop() {
protocolAdapterState.setRuntimeStatus(RuntimeStatus.STOPPED);
protocolAdapterState.setRuntimeStatus(STOPPED);
return CompletableFuture.completedFuture(null);
}

Expand All @@ -71,16 +76,6 @@ public SimulationProtocolAdapter(
return adapterInformation;
}

@Override
public @NotNull ConnectionStatus getConnectionStatus() {
return ConnectionStatus.STATELESS;
}

@Override
public @NotNull RuntimeStatus getRuntimeStatus() {
return protocolAdapterState.getRuntimeStatus();
}

@Override
public @Nullable String getErrorMessage() {
//TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public CompletableFuture<Void> stop(final @NotNull String protocolAdapterId) {
}
CompletableFuture<ProtocolAdapterStartOutput> startFuture;
final ProtocolAdapterStartOutputImpl output = new ProtocolAdapterStartOutputImpl();
if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapter.RuntimeStatus.STARTED) {
if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) {
startFuture = CompletableFuture.completedFuture(output);
} else {
startFuture =
Expand Down Expand Up @@ -279,7 +279,7 @@ public CompletableFuture<Void> stop(final @NotNull String protocolAdapterId) {
});
}

public CompletableFuture<Void> stop(final @NotNull ProtocolAdapter protocolAdapter) {
public CompletableFuture<Void> stop(final @NotNull ProtocolAdapterWrapper protocolAdapter) {
Preconditions.checkNotNull(protocolAdapter);
if (log.isInfoEnabled()) {
log.info("Stopping protocol-adapter \"{}\".", protocolAdapter.getId());
Expand All @@ -291,7 +291,7 @@ public CompletableFuture<Void> stop(final @NotNull ProtocolAdapter protocolAdapt
.forEach(protocolAdapterPollingService::stopPolling);
}

if (protocolAdapter.getRuntimeStatus() == ProtocolAdapter.RuntimeStatus.STOPPED) {
if (protocolAdapter.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STOPPED) {
stopFuture = CompletableFuture.completedFuture(null);
} else {
stopFuture = protocolAdapter.stop();
Expand Down Expand Up @@ -425,19 +425,21 @@ public Optional<ProtocolAdapterInformation> getAdapterTypeById(final String type
metricRegistry);


final ProtocolAdapterStateImpl protocolAdapterState =
new ProtocolAdapterStateImpl(moduleServices.eventService());
final ProtocolAdapter protocolAdapter =
protocolAdapterFactory.createAdapter(protocolAdapterFactory.getInformation(),
new ProtocolAdapterInputImpl(configObject,
metricRegistry,
version,
new ProtocolAdapterStateImpl(moduleServices.eventService()),
version, protocolAdapterState,
moduleServices,
protocolAdapterMetricsHelper));


ProtocolAdapterWrapper wrapper = new ProtocolAdapterWrapper(protocolAdapter,
protocolAdapterFactory,
protocolAdapterFactory.getInformation(),
protocolAdapterState,
configObject);
protocolAdapters.put(wrapper.getId(), wrapper);
return wrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.hivemq.edge.modules.api.adapters.ProtocolAdapter;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterFactory;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterInformation;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterState;
import com.hivemq.edge.modules.config.CustomConfig;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
Expand All @@ -31,17 +32,20 @@ public class ProtocolAdapterWrapper implements ProtocolAdapter{
private final @NotNull ProtocolAdapter adapter;
private final @NotNull ProtocolAdapterFactory<?> adapterFactory;
private final @NotNull ProtocolAdapterInformation adapterInformation;
private final @NotNull ProtocolAdapterState protocolAdapterState;
private final @NotNull CustomConfig configObject;
protected @Nullable Long lastStartAttemptTime;

public ProtocolAdapterWrapper(
final @NotNull ProtocolAdapter adapter,
final @NotNull ProtocolAdapterFactory<?> adapterFactory,
final @NotNull ProtocolAdapterInformation adapterInformation,
final @NotNull ProtocolAdapterState protocolAdapterState,
final @NotNull CustomConfig configObject) {
this.adapter = adapter;
this.adapterFactory = adapterFactory;
this.adapterInformation = adapterInformation;
this.protocolAdapterState = protocolAdapterState;
this.configObject = configObject;
}

Expand All @@ -61,14 +65,12 @@ public ProtocolAdapterWrapper(
return adapter.getProtocolAdapterInformation();
}

@Override
public @NotNull ConnectionStatus getConnectionStatus() {
return adapter.getConnectionStatus();
public @NotNull ProtocolAdapterState.ConnectionStatus getConnectionStatus() {
return protocolAdapterState.getConnectionStatus();
}

@Override
public @NotNull RuntimeStatus getRuntimeStatus() {
return adapter.getRuntimeStatus();
public @NotNull ProtocolAdapterState.RuntimeStatus getRuntimeStatus() {
return protocolAdapterState.getRuntimeStatus();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.hivemq.edge.modules.adapters.model.ProtocolAdapterStartInput;
import com.hivemq.edge.modules.adapters.model.ProtocolAdapterStartOutput;
import com.hivemq.edge.modules.api.adapters.ModuleServices;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapter;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterInformation;
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterState;
import com.hivemq.edge.modules.api.events.model.Event;
Expand Down Expand Up @@ -54,6 +53,8 @@
import java.util.concurrent.CompletableFuture;

import static com.hivemq.edge.HiveMQEdgeConstants.CLIENT_AGENT_PROPERTY_VALUE;
import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.ConnectionStatus.ERROR;
import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.ConnectionStatus.STATELESS;

/**
* @author HiveMQ Adapter Generator
Expand Down Expand Up @@ -95,7 +96,7 @@ public HttpProtocolAdapter(
public @NotNull CompletableFuture<ProtocolAdapterStartOutput> start(
@NotNull final ProtocolAdapterStartInput input, @NotNull final ProtocolAdapterStartOutput output) {
try {
protocolAdapterState.setConnectionStatus(ProtocolAdapter.ConnectionStatus.STATELESS);
protocolAdapterState.setConnectionStatus(STATELESS);
if (httpClient == null) {
synchronized (lock) {
if (httpClient == null) {
Expand All @@ -120,16 +121,6 @@ public HttpProtocolAdapter(
return adapterInformation;
}

@Override
public @NotNull ConnectionStatus getConnectionStatus() {
return protocolAdapterState.getConnectionStatus();
}

@Override
public @NotNull RuntimeStatus getRuntimeStatus() {
return protocolAdapterState.getRuntimeStatus();
}

@Override
public @Nullable String getErrorMessage() {
return null;
Expand Down Expand Up @@ -157,8 +148,8 @@ public HttpProtocolAdapter(
boolean publishData = isSuccessStatusCode(data.getHttpStatusCode()) ||
!adapterConfig.isHttpPublishSuccessStatusCodeOnly();
protocolAdapterState.setConnectionStatus(isSuccessStatusCode(data.getHttpStatusCode()) ?
ProtocolAdapter.ConnectionStatus.STATELESS :
ProtocolAdapter.ConnectionStatus.ERROR);
STATELESS :
ERROR);
if (publishData) {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.hivemq.edge.modules.api.adapters.ProtocolAdapterState.ConnectionStatus.CONNECTED;

public class ModbusProtocolAdapter implements PollingPerSubscriptionProtocolAdapter {
private static final Logger log = LoggerFactory.getLogger(ModbusProtocolAdapter.class);
private final @NotNull Object lock = new Object();
Expand Down Expand Up @@ -96,7 +98,7 @@ public ModbusProtocolAdapter(
if (modbusClient != null) {
if (!modbusClient.isConnected()) {
modbusClient.connect()
.thenRun(() -> protocolAdapterState.setConnectionStatus(ConnectionStatus.CONNECTED))
.thenRun(() -> protocolAdapterState.setConnectionStatus(CONNECTED))
.get();
}
return CompletableFuture.supplyAsync(() -> readRegisters(adapterSubscription))
Expand Down Expand Up @@ -160,16 +162,6 @@ public ModbusProtocolAdapter(
return adapterInformation;
}

@Override
public @NotNull ConnectionStatus getConnectionStatus() {
return protocolAdapterState.getConnectionStatus();
}

@Override
public @NotNull RuntimeStatus getRuntimeStatus() {
return protocolAdapterState.getRuntimeStatus();
}

@Override
public @Nullable String getErrorMessage() {
//TODO
Expand Down
Loading

0 comments on commit a569f86

Please sign in to comment.