2/5/2020 - (AMQP) The broadcast exchange had both datastores AND aggregators bound to it - I went ahead and modified the AMQP exchange setup. There are now two exchanges per endpoint type: stitch.aggregator.direct
and stitch.aggregator.broadcast
.
-
Aggregator metastores need to keep track of the Endpoint records tha the datastores are sending in.
-
Maybe some way to track resource availability by the state of the host datastore?
-
Also need to think about eventually implementing the ability to have resource replicas.
-
Aggregators should probably allow you to create resource schemas.
-
Track resource metadata cardinality across a schema.
Resource ID - The Id of the resource Epoch - A sequence number that identifies all copies that have the latest updates for a resource. The larger the number, the most up-to-date the copy of the resource keeping an older one from becoming a master. Mtime - A timestamp from the last time an update was made. Master- The datastore that contains the resources master (The contianer with the largest epoch number) ActiveServers - A list of available datastores that house the resource. InactiveServers - The id of datastores that host the container but are in maintenance mode. UnusedServers - The id of datastores from which no "heartbeat" has been received for quite some time. LogicalSizeMB - The logical size on disk of the resource
# Starting the DataStore services
java -cp "target/*:target/libs/*" stitch.stitch.datastore.DataStoreMain
# Starting the Aggregator service
java -cp "target/*:target/libs/*" stitch.aggregator.AggregatorMain
# List available datastores
./stitchcli list datastores
# List available resources
./stitchcli list resources
Discovering datastores that are managed by an aggregator
public class ExampleConfigDiscovery {
public static void main(String[] args) throws Exception {
Map<String, String> filters = new HashMap<>();
filters.put("type", ConfigItemType.DATASTORE.toString());
filters.put("aggregator", config.getConfigId());
for(ConfigItem dataStoreConfig : configStore.getConfigItemsByAttributes(filters)){
dataStoreClients.put(dataStoreConfig.getConfigId(), new DataStoreClient(dataStoreConfig.getConfigId()));
}
}
}
import stitch.aggregator.AggregatorServer;
import stitch.util.configuration.item.ConfigItem;
public class ExampleAggregatorServer implements MetaStoreCallable {
public ExampleAggregatorServer(ConfigItem config) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
}
}
{
"id": "95a6495db3ef46c5aa98b428127c2cd4",
"name": "amqp",
"type": "aggregator",
"class": "com.foo.bar.ExampleAggregatorServer"
}
package com.foo.bar;
import stitch.datastore.DataStoreServer;
import stitch.util.configuration.item.ConfigItem;
import java.lang.reflect.InvocationTargetException;
public class ExampleDataStore extends DataStoreServer {
public ExampleDataStore(ConfigItem configItem) throws IllegalAccessException, InstantiationException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException {
super(configItem);
}
}
{
"id": "95a6495db3ef46c5aa98b428127c2cd4",
"name": "amqp",
"type": "datastore",
"class": "com.foo.bar.ExampleDataStore"
}
package stitch.transport.amqp;
import stitch.transport.Transport;
import stitch.transport.TransportCallableClient;
import stitch.util.configuration.item.ConfigItem;
public class HttpClient extends RpcCallableAbstract implements RpcCallableClient {
public HttpClient(ConfigItem config) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
super(config);
}
}
package stitch.transport.amqp;
import stitch.transport.Transport;
import stitch.transport.TransportCallableServer;
import stitch.util.configuration.item.ConfigItem;
public class HttpServer extends RpcCallableAbstract implements RpcCallableServer {
public HttpServer(ConfigItem config) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
super(config);
}
}
{
"id": "95a6495db3ef46c5aa98b428127c2cd4",
"name": "amqp",
"type": "transport",
"class": "stitch.transport.amqp",
"client_class": Astitch.rpc.transport.amqp.AmqpClientch.rpc.transport.amqp.AMQPServer"
}