Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit for multi master changes #89

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.appform.dropwizard.sharding.utils.ShardCalculator;
import io.dropwizard.Configuration;
import io.dropwizard.ConfiguredBundle;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.PooledDataSourceFactory;
import io.dropwizard.hibernate.AbstractDAO;
import io.dropwizard.hibernate.HibernateBundle;
Expand All @@ -61,13 +62,16 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import lombok.var;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.SessionFactory;
import org.reflections.Reflections;

import javax.persistence.Entity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -87,8 +91,9 @@ public abstract class DBShardingBundleBase<T extends Configuration> implements C
private static final String DEFAULT_SHARDS = "2";

private List<HibernateBundle<T>> shardBundles = Lists.newArrayList();
private Map<Integer, Integer> shardMapping = new HashMap<>();
@Getter
private List<SessionFactory> sessionFactories;
private Map<Integer, List<SessionFactory>> sessionFactories;
@Getter
private ShardManager shardManager;
@Getter
Expand Down Expand Up @@ -150,7 +155,7 @@ private void init(final ImmutableList<Class<?>> inEntities) {
shardInfoProvider,
blacklistingStore,
shardManager);
IntStream.range(0, numShards).forEach(
IntStream.range(0, shardCount()).forEach(
shard -> shardBundles.add(new HibernateBundle<T>(inEntities, new SessionFactoryFactory()) {
@Override
protected String name() {
Expand All @@ -159,7 +164,7 @@ protected String name() {

@Override
public PooledDataSourceFactory getDataSourceFactory(T t) {
return getConfig(t).getShards().get(shard);
return getMultiMasterConfig(t).get(shard);
}
}));
}
Expand All @@ -171,7 +176,11 @@ public void run(T configuration, Environment environment) {
throw new RuntimeException(
"Shard count provided through environment does not match the size of the shard configuration list");
}
sessionFactories = shardBundles.stream().map(HibernateBundle::getSessionFactory).collect(Collectors.toList());
sessionFactories = IntStream.range(0, shardBundles.size())
.boxed()
.collect(Collectors.groupingBy(shardMapping::get, LinkedHashMap::new,
Collectors.mapping(i -> shardBundles.get(i).getSessionFactory(),
Collectors.toList())));
this.shardingOptions = getShardingOptions(configuration);
environment.admin().addTask(new BlacklistShardTask(shardManager));
environment.admin().addTask(new UnblacklistShardTask(shardManager));
Expand Down Expand Up @@ -234,6 +243,31 @@ public Map<Integer, Boolean> healthStatus() {

protected abstract ShardedHibernateFactory getConfig(T config);

protected List<DataSourceFactory> getMultiMasterConfig(T config){
final List<DataSourceFactory> dataSourceFactories = getConfig(config).getShards();
final List<DataSourceFactory> factoryArrayList = new ArrayList<>();
for (int i = 0, j=0; i < dataSourceFactories.size(); i++) {
final DataSourceFactory originalFactory = dataSourceFactories.get(i);
final String[] urlValues = getUrlValues(originalFactory.getUrl());
for (String urlValue : urlValues) {
urlValue = urlValue.trim();
final DataSourceFactory newFactory = copyDataSourceFactory(originalFactory);
newFactory.setUrl(urlValue);
factoryArrayList.add(newFactory);
shardMapping.put(j++, i);
}
}
return factoryArrayList;
}

protected boolean enableMultiMaster() {
return false;
}

protected int shardCount() {
return numShards;
}

protected Supplier<MetricConfig> getMetricConfig(T config) {
return () -> getConfig(config).getMetricConfig();
}
Expand Down Expand Up @@ -405,4 +439,25 @@ private void setupObservers(final T config,
});
}

private String[] getUrlValues(final String url){
if(enableMultiMaster()){
return url.split(",");
}
return new String[] { url };
}

private DataSourceFactory copyDataSourceFactory(DataSourceFactory originalFactory) {
DataSourceFactory copiedFactory = new DataSourceFactory(); // Use your custom class if needed

// Use Apache Commons BeanUtils to copy fields
try {
BeanUtils.copyProperties(copiedFactory, originalFactory);
} catch (Exception e) {
throw new RuntimeException(
"Error copying DataSourceFactory");
}

return copiedFactory;
}

}
9 changes: 9 additions & 0 deletions src/main/java/io/appform/dropwizard/sharding/DaoHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.appform.dropwizard.sharding;

public interface DaoHolder<T> {
T get(int shard, final String key);

T get(int shard);

int size();
}
51 changes: 51 additions & 0 deletions src/main/java/io/appform/dropwizard/sharding/MapDaoHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.appform.dropwizard.sharding;

import io.appform.dropwizard.sharding.sharding.impl.ConsistentHashBucketIdExtractor;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapDaoHolder<T> implements DaoHolder<T> {

private final Map<Integer, HolderValue> hashMap;

public MapDaoHolder(final Map<Integer, List<T>> hashMap) {
this.hashMap = hashMap.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new HolderValue(entry.getValue())));
}

@Override
public T get(final int shard, final String key) {
return hashMap.get(shard).get(key);
}

@Override
public T get(final int shard) {
return hashMap.get(shard).get();
}

@Override
public int size() {
return hashMap.size();
}

class HolderValue {
private final List<T> elements;
private final ConsistentHashBucketIdExtractor<String> bucketIdExtractor;

public HolderValue(final List<T> elements) {
this.elements = elements;
this.bucketIdExtractor = new ConsistentHashBucketIdExtractor<>(elements.size());
}

public T get(String key){
return elements.get(bucketIdExtractor.bucketId(key));
}

public T get(){
return elements.get(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hibernate.SessionFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

Expand All @@ -44,7 +45,7 @@ public class CacheableLookupDao<T> extends LookupDao<T> {

private LookupCache<T> cache;

public CacheableLookupDao(List<SessionFactory> sessionFactories,
public CacheableLookupDao(Map<Integer,List<SessionFactory>> sessionFactories,
Class<T> entityClass,
ShardCalculator<String> shardCalculator,
LookupCache<T> cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.hibernate.criterion.DetachedCriteria;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -34,7 +35,7 @@ public class CacheableRelationalDao<T> extends RelationalDao<T> {

private RelationalCache<T> cache;

public CacheableRelationalDao(List<SessionFactory> sessionFactories, Class<T> entityClass,
public CacheableRelationalDao(Map<Integer,List<SessionFactory>> sessionFactories, Class<T> entityClass,
ShardCalculator<String> shardCalculator,
RelationalCache<T> cache,
ShardInfoProvider shardInfoProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public interface Mutator<T> {
enum Mode {READ, INSERT}

private final int shardId;
private final String parent;
private final SessionFactory sessionFactory;
private final List<Function<T, Void>> operations = Lists.newArrayList();
private Supplier<T> getter;
Expand All @@ -36,12 +37,14 @@ enum Mode {READ, INSERT}

public LockedContext(
int shardId,
String parent,
SessionFactory sessionFactory,
Supplier<T> getter,
Class<T> entityClass,
ShardInfoProvider shardInfoProvider,
TransactionObserver observer) {
this.shardId = shardId;
this.parent = parent;
this.sessionFactory = sessionFactory;
this.getter = getter;
this.observer = observer;
Expand All @@ -51,13 +54,15 @@ public LockedContext(

public LockedContext(
int shardId,
String parent,
SessionFactory sessionFactory,
Function<T, T> saver,
T entity,
Class<T> entityClass,
ShardInfoProvider shardInfoProvider,
TransactionObserver observer) {
this.shardId = shardId;
this.parent = parent;
this.sessionFactory = sessionFactory;
this.saver = saver;
this.entity = entity;
Expand Down
Loading