Skip to content

Commit

Permalink
Merge pull request #133 from awakeyoyoyo/main
Browse files Browse the repository at this point in the history
perf[orm]: persist finish, reset modifiedTime & writeToDbTime
  • Loading branch information
jaysunxiao authored Oct 21, 2024
2 parents dc1a196 + 6e76170 commit 3aaab2d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
3 changes: 3 additions & 0 deletions orm/src/main/java/com/zfoo/orm/accessor/IAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package com.zfoo.orm.accessor;

import com.zfoo.orm.cache.persister.PNode;
import com.zfoo.orm.model.IEntity;
import org.springframework.lang.Nullable;

Expand All @@ -33,6 +34,8 @@ public interface IAccessor {

<PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdate(List<E> entities);

<PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdateNode(List<PNode<PK,E>> entities);

<PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(E entity);

<PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(PK pk, Class<E> entityClazz);
Expand Down
33 changes: 33 additions & 0 deletions orm/src/main/java/com/zfoo/orm/accessor/MongodbAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.zfoo.orm.OrmContext;
import com.zfoo.orm.cache.persister.PNode;
import com.zfoo.orm.model.IEntity;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.scheduler.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -102,6 +104,37 @@ public <PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdate(List<
}
}

@Override
public <PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdateNode(List<PNode<PK,E>> nodes) {
if (CollectionUtils.isEmpty(nodes)) {
return;
}

try {
@SuppressWarnings("unchecked")
var entityClazz = (Class<E>) nodes.get(0).getClass();
var collection = OrmContext.getOrmManager().getCollection(entityClazz);
List<E> entities = nodes.stream().map(PNode::getEntity).toList();
var batchList = entities.stream()
.map(it -> new ReplaceOneModel<E>(Filters.eq("_id", it.id()), it))
.toList();

var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));

//设置修改时间
long currentTime = TimeUtils.currentTimeMillis();
nodes.forEach(k->k.resetTime(currentTime));

if (result.getMatchedCount() != entities.size()) {
// 在数据库的批量更新操作中需要更新的数量和最终更新的数量不相同
logger.warn("database:[{}] update size:[{}] not equal with matched size:[{}](some entity of id not exist in database)"
, entityClazz.getSimpleName(), entities.size(), result.getMatchedCount());
}
} catch (Throwable t) {
logger.error("batchUpdate unknown exception", t);
}
}

@Override
public <PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(E entity) {
@SuppressWarnings("unchecked")
Expand Down
29 changes: 15 additions & 14 deletions orm/src/main/java/com/zfoo/orm/cache/EntityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void accept(List<LazyCache.Cache<PK, PNode<PK, E>>> removes, LazyCache.Re
var updateList = removes.stream()
.map(it -> it.v)
.filter(it -> !noNeedUpdate(it))
.map(it -> it.getEntity())
.toList();
EventBus.asyncExecute(clazz.hashCode(), () -> doPersist(updateList));
}
Expand Down Expand Up @@ -228,8 +227,7 @@ public void persist(PK pk) {
if (noNeedUpdate(pnode)) {
return;
}
pnode.resetTime(TimeUtils.currentTimeMillis());
doPersist(List.of(pnode.getEntity()));
doPersist(List.of(pnode));
}

private boolean noNeedUpdate(PNode<PK, E> pnode) {
Expand All @@ -247,7 +245,7 @@ public void persistAll() {
} else {
var currentTime = TimeUtils.currentTimeMillis();
// key为threadId
var updateMap = new HashMap<Long, List<E>>();
var updateMap = new HashMap<Long, List<PNode<PK, E>>>();
var initSize = caches.size() >> 2;
caches.forEach(new BiConsumer<PK, PNode<PK, E>>() {
@Override
Expand All @@ -257,7 +255,7 @@ public void accept(PK pk, PNode<PK, E> pnode) {
}
pnode.resetTime(currentTime);
var updateList = updateMap.computeIfAbsent(pnode.getThreadId(), it -> new ArrayList<>(initSize));
updateList.add(pnode.getEntity());
updateList.add(pnode);
}
});
var count = 0;
Expand All @@ -278,21 +276,21 @@ public void accept(PK pk, PNode<PK, E> pnode) {
@Override
public void persistAllBlock() {
var currentTime = TimeUtils.currentTimeMillis();
var updateList = new ArrayList<E>(caches.size());
var updateList = new ArrayList<PNode<PK, E>>(caches.size());
caches.forEach(new BiConsumer<PK, PNode<PK, E>>() {
@Override
public void accept(PK pk, PNode<PK, E> pnode) {
if (noNeedUpdate(pnode)) {
return;
}
pnode.resetTime(currentTime);
updateList.add(pnode.getEntity());
updateList.add(pnode);
}
});
doPersist(updateList);
}

private void doPersist(List<E> updateList) {
private void doPersist(List<PNode<PK,E>> updateList) {
// 执行更新
if (updateList.isEmpty()) {
return;
Expand All @@ -305,28 +303,28 @@ private void doPersist(List<E> updateList) {
}
}

private void doPersistNoVersion(List<E> updateList) {
private void doPersistNoVersion(List<PNode<PK,E>> updateList) {
var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size());
var maxPageSize = page.totalPage();
for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
page.setPage(currentPage);
var currentUpdateList = page.currentPageList(updateList);
OrmContext.getAccessor().batchUpdate(currentUpdateList);
OrmContext.getAccessor().batchUpdateNode(currentUpdateList);
}
}

private void doPersistWithVersion(List<E> updateList) {
private void doPersistWithVersion(List<PNode<PK,E>> updateList) {
var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size());
var maxPageSize = page.totalPage();
var versionFiledName = wrapper.versionFieldName();

for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
page.setPage(currentPage);
var currentUpdateList = page.currentPageList(updateList);
List<E> entities = currentUpdateList.stream().map(PNode::getEntity).toList();
try {
var collection = OrmContext.getOrmManager().getCollection(clazz).withWriteConcern(WriteConcern.ACKNOWLEDGED);

var batchList = currentUpdateList.stream()
var batchList = entities.stream()
.map(it -> {
var version = wrapper.gvs(it);
wrapper.svs(it, version + 1);
Expand All @@ -336,6 +334,9 @@ private void doPersistWithVersion(List<E> updateList) {
.toList();

var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));

long currentTime = TimeUtils.currentTimeMillis();
currentUpdateList.forEach(node->node.resetTime(currentTime));
if (result.getMatchedCount() == batchList.size()) {
continue;
}
Expand All @@ -347,7 +348,7 @@ private void doPersistWithVersion(List<E> updateList) {
} catch (Throwable t) {
logger.error("doPersistWithVersion(): [{}] batch update unknown error and try ", clazz.getSimpleName(), t);
}
persistAndCompareVersion(currentUpdateList);
persistAndCompareVersion(entities);
}
}

Expand Down

0 comments on commit 3aaab2d

Please sign in to comment.