Skip to content

Commit

Permalink
fix(discovery): discovery synchronization for stale lost targets (#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores authored Nov 28, 2024
1 parent 25eaba2 commit c3ee63b
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 253 deletions.
42 changes: 41 additions & 1 deletion src/main/java/io/cryostat/StorageBuckets.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
*/
package io.cryostat;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.cryostat.util.HttpStatusCodeIdentifier;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
Expand All @@ -30,7 +41,17 @@ public class StorageBuckets {
@Inject S3Client storage;
@Inject Logger logger;

@ConfigProperty(name = "storage.buckets.creation-retry.period")
Duration creationRetryPeriod;

private final Set<String> buckets = ConcurrentHashMap.newKeySet();
private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

public void createIfNecessary(String bucket) {
buckets.add(bucket);
}

private boolean tryCreate(String bucket) {
boolean exists = false;
logger.debugv("Checking if storage bucket \"{0}\" exists ...", bucket);
try {
Expand All @@ -49,8 +70,27 @@ public void createIfNecessary(String bucket) {
storage.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
logger.debugv("Storage bucket \"{0}\" created", bucket);
} catch (Exception e) {
logger.error(e);
logger.warn(e);
return false;
}
}
return true;
}

void onStart(@Observes StartupEvent evt) {
worker.scheduleAtFixedRate(
() -> {
var it = buckets.iterator();
while (it.hasNext()) {
if (tryCreate(it.next())) it.remove();
}
},
0,
creationRetryPeriod.toMillis(),
TimeUnit.MILLISECONDS);
}

void onStop(@Observes ShutdownEvent evt) {
worker.shutdown();
}
}
71 changes: 47 additions & 24 deletions src/main/java/io/cryostat/discovery/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.fasterxml.jackson.annotation.JsonView;
import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.panache.common.Parameters;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -42,6 +43,8 @@
import jakarta.persistence.FetchType;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.NamedQueries;
import jakarta.persistence.NamedQuery;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.persistence.PostPersist;
Expand All @@ -56,6 +59,11 @@

@Entity
@EntityListeners(DiscoveryNode.Listener.class)
@NamedQueries({
@NamedQuery(
name = "DiscoveryNode.byTypeWithName",
query = "from DiscoveryNode where nodeType = :nodeType and name = :name")
})
public class DiscoveryNode extends PanacheEntity {

public static final String NODE_TYPE = "nodeType";
Expand Down Expand Up @@ -129,33 +137,48 @@ public static List<DiscoveryNode> findAllByNodeType(NodeType nodeType) {
}

public static DiscoveryNode environment(String name, NodeType nodeType) {
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
});
var kind = nodeType.getKind();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", name))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = kind;
node.labels = new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
node.persist();
return node;
}));
}

public static DiscoveryNode target(Target target, NodeType nodeType) {
return QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = target.connectUrl.toString();
node.nodeType = nodeType.getKind();
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
});
var kind = nodeType.getKind();
var connectUrl = target.connectUrl.toString();
return DiscoveryNode.<DiscoveryNode>find(
"#DiscoveryNode.byTypeWithName",
Parameters.with("nodeType", kind).and("name", connectUrl))
.firstResultOptional()
.orElseGet(
() ->
QuarkusTransaction.joiningExisting()
.call(
() -> {
DiscoveryNode node = new DiscoveryNode();
node.name = connectUrl;
node.nodeType = kind;
node.labels = new HashMap<>(target.labels);
node.children = null;
node.target = target;
node.persist();
return node;
}));
}

@Override
Expand Down
Loading

0 comments on commit c3ee63b

Please sign in to comment.