Skip to content

Commit

Permalink
enhance(datastore): allow dumping when updating tables
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchuan committed Sep 26, 2023
1 parent 3977573 commit 71f0ede
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class DataStore implements OrderedRollingUpdateStatusListener {
private final StorageAccessService storageAccessService;

private final Map<String, SoftReference<MemoryTable>> tables = new ConcurrentHashMap<>();
private final Map<MemoryTable, String> dirtyTables = new ConcurrentHashMap<>();
private final Set<MemoryTable> dirtyTables = new HashSet<>();
private final String snapshotRootPath;
private final ParquetConfig parquetConfig;

Expand All @@ -92,6 +92,7 @@ public DataStore(StorageAccessService storageAccessService,
@Value("${sw.datastore.data-root-path:}") String dataRootPath,
@Value("${sw.datastore.dump-interval:1h}") String dumpInterval,
@Value("${sw.datastore.min-no-update-period:4h}") String minNoUpdatePeriod,
@Value("${sw.datastore.min-wal-id-gap:1000}") int minWalIdGap,
@Value("${sw.datastore.parquet.compression-codec:SNAPPY}") String compressionCodec,
@Value("${sw.datastore.parquet.row-group-size:128MB}") String rowGroupSize,
@Value("${sw.datastore.parquet.page-size:1MB}") String pageSize,
Expand All @@ -111,7 +112,8 @@ public DataStore(StorageAccessService storageAccessService,
this.parquetConfig.setPageSize((int) DataSize.parse(pageSize).toBytes());
this.parquetConfig.setPageRowCountLimit(pageRowCountLimit);
this.dumpThread = new DumpThread(DurationStyle.detectAndParse(dumpInterval).toMillis(),
DurationStyle.detectAndParse(minNoUpdatePeriod).toMillis());
DurationStyle.detectAndParse(minNoUpdatePeriod).toMillis(),
minWalIdGap);
}

public DataStore start() {
Expand All @@ -138,7 +140,9 @@ public DataStore start() {
//noinspection ConstantConditions
table.updateFromWal(entry);
if (table.getFirstWalLogId() >= 0) {
this.dirtyTables.put(table, "");
synchronized (this.dirtyTables) {
this.dirtyTables.add(table);
}
}
}
log.info("Finished load wal log...");
Expand Down Expand Up @@ -200,7 +204,9 @@ public String update(String tableName,
table.lock();
try {
var ts = table.update(schema, records);
this.dirtyTables.put(table, "");
synchronized (this.dirtyTables) {
this.dirtyTables.add(table);
}
return Long.toString(ts);
} finally {
this.updateHandle.poll();
Expand Down Expand Up @@ -497,7 +503,9 @@ record = new HashMap<>();
* @return true if there is any dirty table.
*/
public boolean hasDirtyTables() {
    // dirtyTables is a plain HashSet shared between update threads and the
    // dump thread, so every access (even a read) must hold its monitor.
    synchronized (this.dirtyTables) {
        return !this.dirtyTables.isEmpty();
    }
}

/**
Expand Down Expand Up @@ -586,21 +594,25 @@ private Map<String, String> getColumnAliases(TableSchema schema, Map<String, Str
}
}

private boolean saveOneTable(long minNoUpdatePeriodMillis) throws IOException {
for (var table : this.dirtyTables.keySet()) {
table.lock();
try {
var now = System.currentTimeMillis();
if ((now - table.getLastUpdateTime()) >= minNoUpdatePeriodMillis) {
table.save();
this.dirtyTables.remove(table);
return true;
private void saveTables(long minNoUpdatePeriodMillis, int minWalIdGap) throws IOException {
var now = System.currentTimeMillis();
var maxWalLogId = this.walManager.getMaxEntryId();
List<MemoryTable> tables;
synchronized (this.dirtyTables) {
tables = new ArrayList<>(this.dirtyTables);
}
for (var table : tables) {
if ((now - table.getLastUpdateTime()) >= minNoUpdatePeriodMillis
|| maxWalLogId - table.getFirstWalLogId() >= minWalIdGap) {
log.info("dumping {}", table.getTableName());
table.save();
synchronized (this.dirtyTables) {
if (table.getFirstWalLogId() < 0) {
this.dirtyTables.remove(table);
}
}
} finally {
table.unlock();
}
}
return false;
}

private void clearWalLogFiles() {
Expand All @@ -610,13 +622,9 @@ private void clearWalLogFiles() {
if (table == null) {
continue;
}
table.lock();
try {
if (table.getFirstWalLogId() >= 0 && table.getFirstWalLogId() < minWalLogIdToRetain) {
minWalLogIdToRetain = table.getFirstWalLogId();
}
} finally {
table.unlock();
var walLogId = table.getFirstWalLogId();
if (walLogId >= 0 && walLogId < minWalLogIdToRetain) {
minWalLogIdToRetain = walLogId;
}
}
try {
Expand Down Expand Up @@ -659,21 +667,19 @@ private class DumpThread extends Thread {

private final long dumpIntervalMillis;
private final long minNoUpdatePeriodMillis;
private final int minWalIdGap;
private transient boolean terminated;

/**
 * @param dumpIntervalMillis      sleep interval between dump passes
 * @param minNoUpdatePeriodMillis idle period after which a table is dumped
 * @param minWalIdGap             WAL id gap that forces a dump of a hot table
 */
public DumpThread(long dumpIntervalMillis, long minNoUpdatePeriodMillis, int minWalIdGap) {
    this.dumpIntervalMillis = dumpIntervalMillis;
    this.minNoUpdatePeriodMillis = minNoUpdatePeriodMillis;
    this.minWalIdGap = minWalIdGap;
}

public void run() {
while (!this.terminated) {
try {
while (saveOneTable(minNoUpdatePeriodMillis)) {
if (this.terminated) {
return;
}
}
saveTables(this.minNoUpdatePeriodMillis, this.minWalIdGap);
clearWalLogFiles();
} catch (Throwable t) {
log.error("failed to save table", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

public interface MemoryTable {

String getTableName();

TableSchema getSchema();

void updateFromWal(Wal.WalEntry entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
Expand All @@ -81,6 +82,7 @@ public class MemoryTableImpl implements MemoryTable {

static final Pattern ATTRIBUTE_NAME_PATTERN = Pattern.compile("^[\\p{Alnum}_]+$");

@Getter
private final String tableName;

private final WalManager walManager;
Expand All @@ -95,7 +97,7 @@ public class MemoryTableImpl implements MemoryTable {
private final Map<String, ColumnStatistics> statisticsMap = new HashMap<>();

@Getter
private long firstWalLogId = -1;
private transient long firstWalLogId = -1;

private long lastWalLogId = -1;

Expand Down Expand Up @@ -191,55 +193,104 @@ var record = reader.read();
@Override
public void save() throws IOException {
String metadata;
try {
metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder()
.setLastWalLogId(this.lastWalLogId)
.setLastUpdateTime(this.lastUpdateTime)
.setLastRevision(this.lastRevision)
.build());
} catch (InvalidProtocolBufferException e) {
throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e);
}
var columnSchema = new HashMap<String, ColumnSchema>();
int index = 0;
for (var entry : new TreeMap<>(this.statisticsMap).entrySet()) {
columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++));
}
var revisionColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++);
revisionColumnSchema.setType(ColumnType.INT64);
var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index);
deletedFlagColumnSchema.setType(ColumnType.BOOL);
columnSchema.put(REVISION_COLUMN_NAME, revisionColumnSchema);
columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema);

this.lock();
var lastRevision = this.lastRevision;
var firstWalLogId = this.firstWalLogId;
this.firstWalLogId = -1;
try {
try {
try {
metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder()
.setLastWalLogId(this.lastWalLogId)
.setLastUpdateTime(this.lastUpdateTime)
.setLastRevision(this.lastRevision)
.build());
} catch (InvalidProtocolBufferException e) {
throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e);
}
int index = 0;
for (var entry : new TreeMap<>(this.statisticsMap).entrySet()) {
columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++));
}
var timestampColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++);
timestampColumnSchema.setType(ColumnType.INT64);
var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index);
deletedFlagColumnSchema.setType(ColumnType.BOOL);
columnSchema.put(REVISION_COLUMN_NAME, timestampColumnSchema);
columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema);
} finally {
this.unlock();
}
SwWriter.writeWithBuilder(
new SwParquetWriterBuilder(this.storageAccessService,
columnSchema,
this.schema.toJsonString(),
metadata,
this.dataPathPrefix + this.dataPathSuffixFormat.format(new Date()),
this.parquetConfig),
this.recordMap.entrySet().stream()
.map(entry -> {
var list = new ArrayList<Map<String, BaseValue>>();
for (var record : entry.getValue()) {
var recordMap = new HashMap<String, BaseValue>();
if (record.getValues() != null) {
recordMap.putAll(record.getValues());
new Iterator<>() {
private final LinkedList<Map<String, BaseValue>> candidates = new LinkedList<>();

private BaseValue lastKey;

{
getNext();
}

@Override
public boolean hasNext() {
return !this.candidates.isEmpty();
}

@Override
public Map<String, BaseValue> next() {
var ret = candidates.poll();
if (candidates.isEmpty()) {
this.getNext();
}
return ret;
}

private void getNext() {
MemoryTableImpl.this.lock();
try {
NavigableMap<BaseValue, List<MemoryRecord>> target;
if (this.lastKey == null) {
target = MemoryTableImpl.this.recordMap;
} else {
target = MemoryTableImpl.this.recordMap.tailMap(this.lastKey, false);
}
int count = 0;
for (var entry : target.entrySet()) {
this.lastKey = entry.getKey();
for (var record : entry.getValue()) {
if (record.getRevision() > lastRevision) {
break;
}
var recordMap = new HashMap<String, BaseValue>();
if (record.getValues() != null) {
recordMap.putAll(record.getValues());
}
recordMap.put(MemoryTableImpl.this.schema.getKeyColumn(), entry.getKey());
recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision()));
recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted()));
this.candidates.add(recordMap);
}
if (++count == 1000) {
break;
}
recordMap.put(this.schema.getKeyColumn(), entry.getKey());
recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision()));
recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted()));
list.add(recordMap);
}
return list;
}).flatMap(Collection::stream).iterator());
} finally {
MemoryTableImpl.this.unlock();
}
}
});
} catch (Throwable e) {
this.firstWalLogId = firstWalLogId;
log.error("fail to save table:{}, error:{}", this.tableName, e.getMessage(), e);
throw e;
}
this.firstWalLogId = -1;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions server/controller/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ sw:
data-root-path: ${SW_DATASTORE_DATA_ROOT_PATH:}
dump-interval: ${SW_DATASTORE_DUMP_INTERVAL:1h}
min-no-update-period: ${SW_DATASTORE_MIN_NO_UPDATE_PERIOD:4h}
min-wal-id-gap: ${SW_DATASTORE_MIN_WAL_ID_GAP:1000}
parquet:
compression-codec: ${SW_DATASTORE_PARQUET_COMPRESSION_CODEC:SNAPPY}
row-group-size: ${SW_DATASTORE_PARQUET_ROW_GROUP_SIZE:128MB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void setUp() {
"",
"1h",
"1d",
1000,
"SNAPPY",
"1MB",
"1KB",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ private static class DataStoreParams {
@Default
String minNoUpdatePeriod = "1d";
@Default
int minWalIdGap = 10;
@Default
String compressionCodec = "SNAPPY";
@Default
String rowGroupSize = "1MB";
Expand Down Expand Up @@ -111,6 +113,7 @@ private void createDateStore(DataStoreParams params) {
params.dataRootPath,
params.dumpInterval,
params.minNoUpdatePeriod,
params.minWalIdGap,
params.compressionCodec,
params.rowGroupSize,
params.pageSize,
Expand Down Expand Up @@ -1019,7 +1022,7 @@ public void execute() throws Exception {
dataStore.terminate();
System.out.printf("%s terminated\n", this.dateFormat.format(new Date()));
createDateStore(DataStoreParams.builder()
.dumpInterval("1s")
.dumpInterval("10ms")
.minNoUpdatePeriod("1ms")
.build());
System.out.printf("%s restarted\n", this.dateFormat.format(new Date()));
Expand Down

0 comments on commit 71f0ede

Please sign in to comment.