Skip to content

Commit

Permalink
Added new DB Store
Browse files Browse the repository at this point in the history
  • Loading branch information
dutoitc committed Sep 29, 2017
1 parent 9fbffb6 commit 5fb2a3e
Show file tree
Hide file tree
Showing 11 changed files with 789 additions and 102 deletions.
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.193</version>
</dependency>
</dependencies>

<repositories>
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/ch/mno/copper/CopperDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.rmi.RemoteException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -99,7 +101,7 @@ private void runIteration() {

// Processors
LocalDateTime queryTime = LocalDateTime.now(); // Keep time, so that next run will have data between query time assignation and valueStore read time
Collection<String> changedValues = valuesStore.queryValues(lastQueryTime, LocalDateTime.MAX);
Collection<String> changedValues = valuesStore.queryValues(lastQueryTime.toInstant(ZoneOffset.UTC), Instant.MAX);
lastQueryTime = queryTime;
processors.forEach(p -> {
Collection<String> keys = p.findKnownKeys(changedValues);
Expand Down
270 changes: 270 additions & 0 deletions src/main/java/ch/mno/copper/data/DbHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package ch.mno.copper.data;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Helper to read-write data to a local H2 Database. The DB is automatically created if not existent.
* Time is from-inclusive, to-exclusive like String.substring.
* Values once stored will never end but could change over time.
* Only one value is allowed at a given instant.
* Insertion could only be done after already inserted values (no insertion in the past).
*
* Created by dutoitc on 25.05.2016.
*/
public class DbHelper {

private static Logger LOG = LoggerFactory.getLogger(DbHelper.class);
private static final String DBURL="jdbc:h2:./copperdb";
private static final String DBUSER="";
private static final String DBPASS="";
public static final Instant INSTANT_MAX = Instant.parse("3000-12-31T00:00:00.00Z");

static {
try {
Class.forName("org.h2.Driver");
} catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot load H2: " + e.getMessage(), e);
}
createDatabaseIfNeeded();
}

private static void createDatabaseIfNeeded() {
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
Statement stmt = con.createStatement())
{
// Create table ?
ResultSet rs = stmt.executeQuery("select count(*) as nb from information_schema.tables where table_name = 'VALUESTORE'");
rs.next();
if (rs.getInt("nb")==0) {
LOG.info("Database not found. Creating table VALUESTORE...");
stmt.execute("CREATE TABLE valuestore (" +
" idvaluestore int(11) NOT NULL," +
" key text NOT NULL," +
" value text NOT NULL," +
" datefrom timestamp NOT NULL," +
" dateto timestamp NOT NULL," +
" primary key (idvaluestore))");
LOG.info("Creating sequence SEQ_VALUESTORE_ID");
stmt.execute("create sequence SEQ_VALUESTORE_ID start with 1");
}
} catch (SQLException e2) {
throw new RuntimeException("An error occured while initializing DB: " + e2.getMessage(), e2);
}
}


/** Delete all DB data */
public static void clearAllData() {
String sql = "delete from valuestore";
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS))
{
PreparedStatement ps = con.prepareStatement(sql);
int nbRows = ps.executeUpdate();
LOG.info("Deleted " + nbRows + " lines");
} catch (SQLException e) {
e.printStackTrace();
}
}

private static long nextSequence() throws SQLException {
String sqlNextSequence = "select nextval('SEQ_VALUESTORE_ID')";

try (Connection con2 = DriverManager.getConnection(DBURL, DBUSER, DBPASS))
{
// Find next sequence number
ResultSet rs = con2.prepareCall(sqlNextSequence).executeQuery();
if (!rs.next()) throw new RuntimeException("Sequence error");
return rs.getLong(1);
}
}

/** Insert a value at given instant. Actuve value will be finished at the same instant */
public static void insert(String key, String value, Instant instant) throws SQLException {
String sqlInsert = "INSERT INTO valuestore ( idvaluestore, key, value, datefrom, dateto) VALUES (?,?,?,?,?)";
String sqlUpdatePrevious = "update valuestore set dateto=? where idvaluestore=?";

try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement(sqlInsert))
{
StoreValue previousValue = readLatest(key);

long id = nextSequence();
stmt.setLong(1, id);
stmt.setString(2, key);
stmt.setString(3, value);
stmt.setTimestamp(4, Timestamp.from(instant));
stmt.setTimestamp(5, Timestamp.from(INSTANT_MAX));
int rowInserted = stmt.executeUpdate();
if (rowInserted!=1) {
throw new RuntimeException("DB error: inserted " + rowInserted + " values.");
}

// Stop previous
if (previousValue!=null) {
if (previousValue.getTimestampFrom().isAfter(instant)) {
throw new RuntimeException("Cannot insert value in the past for key " + key + ", old.start="+previousValue.getTimestampFrom()+", new.start="+instant);
}

try (PreparedStatement stmt2 = con.prepareStatement(sqlUpdatePrevious))
{
stmt2.setTimestamp(1, Timestamp.from(instant));
stmt2.setLong(2, previousValue.id);
stmt2.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException("Cannot update previous value: " + e.getMessage(), e);
}
}
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}
}

/** Read the 'key' value at given instant */
public static StoreValue read(String key, Instant timestamp) throws SQLException {
String sql = "SELECT idvaluestore, key, value, datefrom, dateto FROM valuestore where key=? and datefrom<=? and dateto>? order by datefrom";
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement(sql))
{
stmt.setString(1,key);
stmt.setTimestamp(2,Timestamp.from(timestamp));
stmt.setTimestamp(3,Timestamp.from(timestamp));
ResultSet rs = stmt.executeQuery();

List<StoreValue> values = new ArrayList<>();
while(rs.next()) {
values.add(mapStoreValue(rs));
}

if (values.size()==0) {
return null;
}
if (values.size()==1) {
return values.get(0);
}
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}


throw new RuntimeException("Too much value for key="+key+", instant="+timestamp.getEpochSecond());
}

/** Read all values for a given key active between from, to. (could have been inserted before and finish after) */
public static List<StoreValue> read(String key, Instant timestampFrom, Instant timestampTo) throws SQLException {
String sql = "SELECT idvaluestore, key, value, datefrom, dateto FROM valuestore where key=? and ((datefrom<? and dateto>?) or (datefrom>=? and datefrom<?) or (dateto>? and dateto<=?)) order by datefrom";
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement(sql))
{
stmt.setString(1,key);
stmt.setTimestamp(2,Timestamp.from(timestampFrom));
stmt.setTimestamp(3,Timestamp.from(timestampTo));
stmt.setTimestamp(4,Timestamp.from(timestampFrom));
stmt.setTimestamp(5,Timestamp.from(timestampTo));
stmt.setTimestamp(6,Timestamp.from(timestampFrom));
stmt.setTimestamp(7,Timestamp.from(timestampTo));
ResultSet rs = stmt.executeQuery();

List<StoreValue> values = new ArrayList<>();
while(rs.next()) {
values.add(mapStoreValue(rs));
}
return values;
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}
}

/** Read the latest value of a key) */
public static StoreValue readLatest(String key) throws SQLException {
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement("SELECT idvaluestore, key, value, datefrom, dateto FROM valuestore where key=? and dateto=?"))
{
stmt.setString(1,key);
stmt.setTimestamp(2, Timestamp.from(INSTANT_MAX));
ResultSet rs = stmt.executeQuery();
if (!rs.next()) return null;
return mapStoreValue(rs);
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}
}

/** Read all latest values */
public static List<StoreValue> readLatest() throws SQLException {
List<StoreValue> values = new ArrayList<>();
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement("SELECT idvaluestore, key, value, datefrom, dateto FROM valuestore where dateto=?"))
{
stmt.setTimestamp(1, Timestamp.from(INSTANT_MAX));
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
values.add(mapStoreValue(rs));
}
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}
return values;
}

/** Read keys updated between from(inclusive) ant to(exclusive) */
public static Collection<String> readUpdatedKeys(Instant from, Instant to) {
List<String> values = new ArrayList<>();
try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
PreparedStatement stmt = con.prepareStatement("SELECT distinct key FROM valuestore where datefrom>=? and datefrom<?"))
{
stmt.setTimestamp(1, Timestamp.from(from));
stmt.setTimestamp(2, Timestamp.from(to));
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
values.add(rs.getString("key"));
}
} catch (SQLException e) {
throw new RuntimeException("An error occured while saving values", e);
}
return values;
}

private static StoreValue mapStoreValue(ResultSet rs) throws SQLException {
long idValueStore = rs.getLong("idValueStore");
String dbKey = rs.getString("key");
String value = rs.getString("value");
Instant from = rs.getTimestamp("datefrom").toInstant();
Instant to = rs.getTimestamp("dateto").toInstant();
return new StoreValue(idValueStore, dbKey, value, from, to);
}


// public static void dumpForTests() {
// try (Connection con = DriverManager.getConnection(DBURL, DBUSER, DBPASS);
// PreparedStatement stmt = con.prepareStatement("SELECT idvaluestore, key, value, datefrom, dateto FROM valuestore order by key, datefrom"))
// {
// ResultSet rs = stmt.executeQuery();
// System.out.println("-----------------------------------");
// System.out.println("dumpForTests");
// while (rs.next()) {
// long idValueStore = rs.getLong("idValueStore");
// String dbKey = rs.getString("key");
// String value = rs.getString("value");
// Instant from = rs.getTimestamp("datefrom").toInstant();
// Instant to = rs.getTimestamp("dateto").toInstant();
// System.out.println(idValueStore+";"+dbKey+";"+value+";"+from+";"+to);
// }
// } catch (SQLException e) {
// throw new RuntimeException("An error occured while saving values", e);
// }
// }

}
91 changes: 91 additions & 0 deletions src/main/java/ch/mno/copper/data/DbValuesStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ch.mno.copper.data;

import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by dutoitc on 19.09.2017.
*/
public class DbValuesStore implements ValuesStore {

private static DbValuesStore instance;


/** Singleton factory */
public static DbValuesStore getInstance() {
if (instance==null) {
synchronized (DbValuesStore.class) {
if (instance==null) {
instance = new DbValuesStore();
}
}
}
return instance;
}


@Override
public void put(String key, String value) {
try {
DbHelper.insert(key, value, Instant.now());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
public String getValue(String key) {
StoreValue storeValue = null;
try {
storeValue = DbHelper.readLatest(key);
} catch (SQLException e) {
throw new RuntimeException("Cannot read value " + key + ": " + e.getMessage());
}
if (storeValue==null) {
return ""; // no data
}
return storeValue.getValue();
}

@Override
public Map<String, StoreValue> getValues() {
try {
List<StoreValue> values = DbHelper.readLatest();
Map<String, StoreValue> map = new HashMap<>(values.size()*4/3+1);
values.forEach(v->map.put(v.getKey(), v));
return map;
} catch (SQLException e) {
throw new RuntimeException("Cannot read values: " + e.getMessage(), e);
}
}

@Override
public Collection<String> queryValues(Instant from, Instant to) {
return DbHelper.readUpdatedKeys(from, to);
}

@Override
public List<List<String>> queryValues(Instant from, Instant to, String columns) {
return null;
}

@Override
public void load() throws IOException {

}

@Override
public void save() throws IOException {

}

@Override
public Map<String, String> getValuesMapString() {
return null;
}
}
Loading

0 comments on commit 5fb2a3e

Please sign in to comment.