Skip to content

Commit

Permalink
Fix index update with writeMany(singleStatement: true)
Browse files Browse the repository at this point in the history
  • Loading branch information
tp committed Dec 2, 2024
1 parent e7a76b1 commit 36c2187
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 22 deletions.
74 changes: 60 additions & 14 deletions lib/src/index_entity_store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -247,26 +247,42 @@ class IndexedEntityStore<T, K> {
}) {
final keys = <K>{};

final sw = Stopwatch()..start();

if (singleStatement) {
if (entities.isEmpty) {
return;
}

_database.execute(
[
'REPLACE INTO `entity` (`type`, `key`, `value`) '
' VALUES (?1, ?, ?)',
// Add additional entry values for each further parameter
', (?1, ?, ?)' * (entities.length - 1),
].join(' '),
[
_entityKey,
for (final e in entities) ...[
_connector.getPrimaryKey(e),
_connector.serialize(e),
try {
_database.execute('BEGIN');

_database.execute(
[
'REPLACE INTO `entity` (`type`, `key`, `value`) '
' VALUES (?1, ?, ?)',
// Add additional entry values for each further parameter
', (?1, ?, ?)' * (entities.length - 1),
].join(' '),
[
_entityKey,
for (final e in entities) ...[
_connector.getPrimaryKey(e),
_connector.serialize(e),
],
],
],
);
);

_updateIndexInternalSingleStatement(entities);

_database.execute('COMMIT');
} catch (e) {
_database.execute('ROLLBACK');

rethrow;
}

keys.addAll(entities.map(_connector.getPrimaryKey));
} else {
// transaction variant

Expand All @@ -292,6 +308,9 @@ class IndexedEntityStore<T, K> {
}
}

print(
'$singleStatement ${(sw.elapsedMicroseconds / 1000).toStringAsFixed(2)}ms');

_handleUpdate(keys);
}

Expand All @@ -315,6 +334,33 @@ class IndexedEntityStore<T, K> {
}
}

void _updateIndexInternalSingleStatement(Iterable<T> entities) {
if (_indexColumns._indexColumns.values.isEmpty) {
return;
}

_database.execute(
[
'INSERT INTO `index` (`type`, `entity`, `field`, `value`, `referenced_type`, `unique`) '
' VALUES (?1, ?, ?, ?, ?, ?)',
// Add additional entry values for each further entity
', (?1, ?, ?, ?, ?, ?)' *
(entities.length * _indexColumns._indexColumns.values.length - 1),
].join(' '),
[
_entityKey,
for (final indexColumn in _indexColumns._indexColumns.values)
for (final e in entities) ...[
_connector.getPrimaryKey(e),
indexColumn._field,
indexColumn._getIndexValue(e),
indexColumn._referencedEntity,
indexColumn._unique,
],
],
);
}

/// Delete the specified entries
void delete({
final T? entity,
Expand Down
121 changes: 113 additions & 8 deletions test/indexed_entity_store_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ void main() {

test(
'Reactive queries, check against duplicate updates',
() async {
() {
final path =
'/tmp/index_entity_store_test_${FlutterTimeline.now}.sqlite3';

Expand Down Expand Up @@ -1197,13 +1197,10 @@ void main() {

final valueStore = db.entityStore(valueWrappingConnector);

valueStore.writeMany(
[
for (var i = 0; i < 10; i++)
_ValueWrapper(FlutterTimeline.now + i, '$i'),
],
singleStatement: false,
);
valueStore.writeMany([
for (var i = 0; i < 10; i++)
_ValueWrapper(FlutterTimeline.now + i, '$i'),
]);

final allEntities = valueStore.query();
final entityValue2 = valueStore.single(
Expand Down Expand Up @@ -1238,6 +1235,81 @@ void main() {
expect(valueStore.subscriptionCount, 0);
},
);

for (final singleStatement in [true, false]) {
test(
'writeMany(singleStatement: $singleStatement)',
() {
final path =
'/tmp/index_entity_store_test_${FlutterTimeline.now}.sqlite3';

final db = IndexedEntityDabase.open(path);

final valueWrappingConnector =
IndexedEntityConnector<_IntValueWrapper, int, String>(
entityKey: 'value_wrapper',
getPrimaryKey: (f) => f.key,
getIndices: (index) {
index((e) => e.value % 2 == 0, as: 'even');
index((e) => e.batch, as: 'batch');
},
serialize: (f) => jsonEncode(f.toJSON()),
deserialize: (s) => _IntValueWrapper.fromJSON(
jsonDecode(s) as Map<String, dynamic>,
),
);

final store = db.entityStore(valueWrappingConnector);

final allEntities = store.query();
final evenEntities = store.query(
where: (cols) => cols['even'].equals(true),
);
final batch1Entities = store.query(
where: (cols) => cols['batch'].equals(1),
);
final batch2Entities = store.query(
where: (cols) => cols['batch'].equals(2),
);

// writeMany
{
final entities = [
for (var i = 0; i < 1000; i++) _IntValueWrapper(i, i, 1),
];

store.writeMany(entities, singleStatement: singleStatement);
}

expect(allEntities.value, hasLength(1000));
expect(evenEntities.value, hasLength(500));
expect(batch1Entities.value, hasLength(1000));
expect(batch2Entities.value, isEmpty);

// writeMany again (in-place updates, index update with new batch ID)
{
final entities = [
for (var i = 0; i < 1000; i++) _IntValueWrapper(i, i, 2),
];

store.writeMany(entities, singleStatement: singleStatement);
}

expect(allEntities.value, hasLength(1000));
expect(evenEntities.value, hasLength(500));
expect(evenEntities.value.first.batch, 2); // value got updated
expect(batch1Entities.value, isEmpty);
expect(batch2Entities.value, hasLength(1000));

allEntities.dispose();
evenEntities.dispose();
batch1Entities.dispose();
batch2Entities.dispose();

db.dispose();
},
);
}
}

class _FooEntity {
Expand Down Expand Up @@ -1470,3 +1542,36 @@ class _ValueWrapper {
return '_ValueWrapper($key, $value)';
}
}

class _IntValueWrapper {
_IntValueWrapper(
this.key,
this.value,
this.batch,
);

final int key;
final int value;
final int batch;

Map<String, dynamic> toJSON() {
return {
'key': key,
'value': value,
'batch': batch,
};
}

static _IntValueWrapper fromJSON(Map<String, dynamic> json) {
return _IntValueWrapper(
json['key'],
json['value'],
json['batch'],
);
}

@override
String toString() {
return '_IntValueWrapper($key, $value, $batch)';
}
}

0 comments on commit 36c2187

Please sign in to comment.