diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java index cbdf2d1777..2c699f01fd 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java @@ -68,7 +68,7 @@ public class DataStore implements OrderedRollingUpdateStatusListener { private final StorageAccessService storageAccessService; private final Map> tables = new ConcurrentHashMap<>(); - private final Map dirtyTables = new ConcurrentHashMap<>(); + private final Set dirtyTables = new HashSet<>(); private final String snapshotRootPath; private final ParquetConfig parquetConfig; @@ -92,6 +92,7 @@ public DataStore(StorageAccessService storageAccessService, @Value("${sw.datastore.data-root-path:}") String dataRootPath, @Value("${sw.datastore.dump-interval:1h}") String dumpInterval, @Value("${sw.datastore.min-no-update-period:4h}") String minNoUpdatePeriod, + @Value("${sw.datastore.min-wal-id-gap:1000}") int minWalIdGap, @Value("${sw.datastore.parquet.compression-codec:SNAPPY}") String compressionCodec, @Value("${sw.datastore.parquet.row-group-size:128MB}") String rowGroupSize, @Value("${sw.datastore.parquet.page-size:1MB}") String pageSize, @@ -111,7 +112,8 @@ public DataStore(StorageAccessService storageAccessService, this.parquetConfig.setPageSize((int) DataSize.parse(pageSize).toBytes()); this.parquetConfig.setPageRowCountLimit(pageRowCountLimit); this.dumpThread = new DumpThread(DurationStyle.detectAndParse(dumpInterval).toMillis(), - DurationStyle.detectAndParse(minNoUpdatePeriod).toMillis()); + DurationStyle.detectAndParse(minNoUpdatePeriod).toMillis(), + minWalIdGap); } public DataStore start() { @@ -138,7 +140,9 @@ public DataStore start() { //noinspection ConstantConditions table.updateFromWal(entry); if (table.getFirstWalLogId() >= 0) { - this.dirtyTables.put(table, ""); + synchronized (this.dirtyTables) { + this.dirtyTables.add(table); + } } } log.info("Finished load wal log..."); @@ -200,7 +204,9 @@ public String update(String tableName, table.lock(); try { var ts = table.update(schema, records); - this.dirtyTables.put(table, ""); + synchronized (this.dirtyTables) { + this.dirtyTables.add(table); + } return Long.toString(ts); } finally { this.updateHandle.poll(); @@ -497,7 +503,9 @@ record = new HashMap<>(); * @return true if there is any dirty table. */ public boolean hasDirtyTables() { - return !this.dirtyTables.isEmpty(); + synchronized (this.dirtyTables) { + return !this.dirtyTables.isEmpty(); + } } /** @@ -586,21 +594,25 @@ private Map getColumnAliases(TableSchema schema, Map= minNoUpdatePeriodMillis) { - table.save(); - this.dirtyTables.remove(table); - return true; + private void saveTables(long minNoUpdatePeriodMillis, int minWalIdGap) throws IOException { + var now = System.currentTimeMillis(); + var maxWalLogId = this.walManager.getMaxEntryId(); + List tables; + synchronized (this.dirtyTables) { + tables = new ArrayList<>(this.dirtyTables); + } + for (var table : tables) { + if ((now - table.getLastUpdateTime()) >= minNoUpdatePeriodMillis + || maxWalLogId - table.getFirstWalLogId() >= minWalIdGap) { + log.info("dumping {}", table.getTableName()); + table.save(); + synchronized (this.dirtyTables) { + if (table.getFirstWalLogId() < 0) { + this.dirtyTables.remove(table); + } } - } finally { - table.unlock(); } } - return false; } private void clearWalLogFiles() { @@ -659,21 +671,19 @@ private class DumpThread extends Thread { private final long dumpIntervalMillis; private final long minNoUpdatePeriodMillis; - private transient boolean terminated; + private final int minWalIdGap; + private volatile boolean terminated; - public DumpThread(long dumpIntervalMillis, long minNoUpdatePeriodMillis) { + public DumpThread(long dumpIntervalMillis, long minNoUpdatePeriodMillis, int minWalIdGap) { this.dumpIntervalMillis = dumpIntervalMillis; this.minNoUpdatePeriodMillis = minNoUpdatePeriodMillis; + this.minWalIdGap = minWalIdGap; } public void run() { while (!this.terminated) { try { - while (saveOneTable(minNoUpdatePeriodMillis)) { - if (this.terminated) { - return; - } - } + saveTables(this.minNoUpdatePeriodMillis, this.minWalIdGap); clearWalLogFiles(); } catch (Throwable t) { log.error("failed to save table", t); diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java index 4d2e063560..bb730cc7ac 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java @@ -23,6 +23,8 @@ public interface MemoryTable { + String getTableName(); + TableSchema getSchema(); void updateFromWal(Wal.WalEntry entry); diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java index a68be725ff..af0e6416d7 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java @@ -49,15 +49,16 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Objects; import java.util.Set; import java.util.TimeZone; @@ -81,6 +82,7 @@ public class MemoryTableImpl implements MemoryTable { static final Pattern ATTRIBUTE_NAME_PATTERN = Pattern.compile("^[\\p{Alnum}_]+$"); + @Getter private final String tableName; private final WalManager walManager; @@ -95,7 +97,7 @@ public class MemoryTableImpl implements MemoryTable { private final Map statisticsMap = new HashMap<>(); @Getter - private long firstWalLogId = -1; + private volatile long firstWalLogId = -1; private long lastWalLogId = -1; @@ -103,7 +105,7 @@ public class MemoryTableImpl implements MemoryTable { private long lastUpdateTime = 0; @Getter - private transient long lastRevision = 0; + private volatile long lastRevision = 0; @Setter private boolean useTimestampAsRevision = false; // unittest only @@ -191,55 +193,115 @@ var record = reader.read(); @Override public void save() throws IOException { String metadata; - try { - metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder() - .setLastWalLogId(this.lastWalLogId) - .setLastUpdateTime(this.lastUpdateTime) - .setLastRevision(this.lastRevision) - .build()); - } catch (InvalidProtocolBufferException e) { - throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e); - } var columnSchema = new HashMap(); - int index = 0; - for (var entry : new TreeMap<>(this.statisticsMap).entrySet()) { - columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++)); - } - var revisionColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++); - revisionColumnSchema.setType(ColumnType.INT64); - var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index); - deletedFlagColumnSchema.setType(ColumnType.BOOL); - columnSchema.put(REVISION_COLUMN_NAME, revisionColumnSchema); - columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema); - + this.lock(); + var lastRevision = this.lastRevision; + var firstWalLogId = this.firstWalLogId; + this.firstWalLogId = -1; try { + try { + try { + metadata = JsonFormat.printer().print(TableMeta.MetaData.newBuilder() + .setLastWalLogId(this.lastWalLogId) + .setLastUpdateTime(this.lastUpdateTime) + .setLastRevision(this.lastRevision) + .build()); + } catch (InvalidProtocolBufferException e) { + throw new SwProcessException(ErrorType.DATASTORE, "failed to print table meta", e); + } + int index = 0; + for (var entry : new TreeMap<>(this.statisticsMap).entrySet()) { + columnSchema.put(entry.getKey(), entry.getValue().createSchema(entry.getKey(), index++)); + } + var timestampColumnSchema = new ColumnSchema(REVISION_COLUMN_NAME, index++); + timestampColumnSchema.setType(ColumnType.INT64); + var deletedFlagColumnSchema = new ColumnSchema(DELETED_FLAG_COLUMN_NAME, index); + deletedFlagColumnSchema.setType(ColumnType.BOOL); + columnSchema.put(REVISION_COLUMN_NAME, timestampColumnSchema); + columnSchema.put(DELETED_FLAG_COLUMN_NAME, deletedFlagColumnSchema); + } finally { + this.unlock(); + } + var currentSnapshots = this.storageAccessService.list(this.dataPathPrefix).collect(Collectors.toList()); + var path = this.dataPathPrefix + this.dataPathSuffixFormat.format(new Date()); SwWriter.writeWithBuilder( new SwParquetWriterBuilder(this.storageAccessService, columnSchema, this.schema.toJsonString(), metadata, - this.dataPathPrefix + this.dataPathSuffixFormat.format(new Date()), + path, this.parquetConfig), - this.recordMap.entrySet().stream() - .map(entry -> { - var list = new ArrayList>(); - for (var record : entry.getValue()) { - var recordMap = new HashMap(); - if (record.getValues() != null) { - recordMap.putAll(record.getValues()); + new Iterator<>() { + private final LinkedList> candidates = new LinkedList<>(); + + private BaseValue lastKey; + + { + getNext(); + } + + @Override + public boolean hasNext() { + return !this.candidates.isEmpty(); + } + + @Override + public Map next() { + var ret = candidates.poll(); + if (candidates.isEmpty()) { + this.getNext(); + } + return ret; + } + + private void getNext() { + MemoryTableImpl.this.lock(); + try { + NavigableMap> target; + if (this.lastKey == null) { + target = MemoryTableImpl.this.recordMap; + } else { + target = MemoryTableImpl.this.recordMap.tailMap(this.lastKey, false); + } + int count = 0; + for (var entry : target.entrySet()) { + this.lastKey = entry.getKey(); + for (var record : entry.getValue()) { + if (record.getRevision() > lastRevision) { + break; + } + var recordMap = new HashMap(); + if (record.getValues() != null) { + recordMap.putAll(record.getValues()); + } + recordMap.put(MemoryTableImpl.this.schema.getKeyColumn(), entry.getKey()); + recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision())); + recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted())); + this.candidates.add(recordMap); + } + if (++count == 1000) { + break; } - recordMap.put(this.schema.getKeyColumn(), entry.getKey()); - recordMap.put(REVISION_COLUMN_NAME, new Int64Value(record.getRevision())); - recordMap.put(DELETED_FLAG_COLUMN_NAME, BaseValue.valueOf(record.isDeleted())); - list.add(recordMap); } - return list; - }).flatMap(Collection::stream).iterator()); + } finally { + MemoryTableImpl.this.unlock(); + } + } + }); + for (var snapshot : currentSnapshots) { + try { + if (!snapshot.equals(path)) { + this.storageAccessService.delete(snapshot); + } + } catch (IOException e) { + log.warn("fail to delete {}", snapshot, e); + } + } } catch (Throwable e) { + this.firstWalLogId = firstWalLogId; log.error("fail to save table:{}, error:{}", this.tableName, e.getMessage(), e); throw e; } - this.firstWalLogId = -1; } @Override diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalManager.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalManager.java index d069bbc719..ab64f383ec 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalManager.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalManager.java @@ -49,7 +49,7 @@ public class WalManager extends Thread { private final LinkedList entriesToWrite = new LinkedList<>(); private final int walMaxFileSize; private final int walMaxFileSizeNoHeader; - private transient boolean terminated; + private volatile boolean terminated; private long maxEntryId; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalRemoteFileManager.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalRemoteFileManager.java index 579402ee30..71c502075b 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalRemoteFileManager.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/wal/WalRemoteFileManager.java @@ -42,7 +42,7 @@ @Slf4j public class WalRemoteFileManager extends Thread { - private transient boolean terminated; + private volatile boolean terminated; private final StorageAccessService storageAccessService; private final int ossMaxAttempts; diff --git a/server/controller/src/main/resources/application.yaml b/server/controller/src/main/resources/application.yaml index 07581df2c9..41afac5583 100644 --- a/server/controller/src/main/resources/application.yaml +++ b/server/controller/src/main/resources/application.yaml @@ -92,6 +92,7 @@ sw: data-root-path: ${SW_DATASTORE_DATA_ROOT_PATH:} dump-interval: ${SW_DATASTORE_DUMP_INTERVAL:1h} min-no-update-period: ${SW_DATASTORE_MIN_NO_UPDATE_PERIOD:4h} + min-wal-id-gap: ${SW_DATASTORE_MIN_WAL_ID_GAP:1000} parquet: compression-codec: ${SW_DATASTORE_PARQUET_COMPRESSION_CODEC:SNAPPY} row-group-size: ${SW_DATASTORE_PARQUET_ROW_GROUP_SIZE:128MB} diff --git a/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java index 679876318e..567dad7f8f 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java @@ -98,6 +98,7 @@ public void setUp() { "", "1h", "1d", + 1000, "SNAPPY", "1MB", "1KB", diff --git a/server/controller/src/test/java/ai/starwhale/mlops/datastore/DataStoreTest.java b/server/controller/src/test/java/ai/starwhale/mlops/datastore/DataStoreTest.java index 3e16402496..8d65ec6f47 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/datastore/DataStoreTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/datastore/DataStoreTest.java @@ -66,7 +66,7 @@ public class DataStoreTest { private FileSystem fs; - private transient DataStore dataStore; + private volatile DataStore dataStore; private StorageAccessService storageAccessService; @@ -84,6 +84,8 @@ private static class DataStoreParams { @Default String minNoUpdatePeriod = "1d"; @Default + int minWalIdGap = 10; + @Default String compressionCodec = "SNAPPY"; @Default String rowGroupSize = "1MB"; @@ -111,6 +113,7 @@ private void createDateStore(DataStoreParams params) { params.dataRootPath, params.dumpInterval, params.minNoUpdatePeriod, + params.minWalIdGap, params.compressionCodec, params.rowGroupSize, params.pageSize, @@ -1019,7 +1022,7 @@ public void execute() throws Exception { dataStore.terminate(); System.out.printf("%s terminated\n", this.dateFormat.format(new Date())); createDateStore(DataStoreParams.builder() - .dumpInterval("1s") + .dumpInterval("10ms") .minNoUpdatePeriod("1ms") .build()); System.out.printf("%s restarted\n", this.dateFormat.format(new Date())); diff --git a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java index 88465871c7..cc3df6c698 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java @@ -834,6 +834,19 @@ public void testUpdateFromWal() throws IOException { }))) .collect(Collectors.toList()))); } + + @Test + public void testSave() throws IOException { + this.memoryTable.update( + new TableSchemaDesc("k", List.of( + ColumnSchemaDesc.builder().name("k").type("STRING").build(), + ColumnSchemaDesc.builder().name("a").type("INT32").build())), + List.of(Map.of("k", "0", "a", "a"))); + for (int i = 0; i < 100; ++i) { + this.memoryTable.save(); + assertThat(storageAccessService.list("test").count(), is(1L)); + } + } } @Nested