Merge pull request #131 from IDEA-Research/optimize/import_performance
Optimize/import performance
imhuwq authored Feb 26, 2024
2 parents 6c61561 + c46cda0 commit 131cd5e
Showing 15 changed files with 304 additions and 308 deletions.
11 changes: 1 addition & 10 deletions deepdataspace/globals.py
@@ -41,17 +41,8 @@
_mongo_user = urllib.parse.quote_plus(MONGODB_USER)
_mongo_pass = urllib.parse.quote_plus(MONGODB_PASS)
_mongo_url = f"mongodb://{_mongo_user}:{_mongo_pass}@{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DBNAME}"
_mongo_client = MongoClient(_mongo_url, authMechanism="SCRAM-SHA-256")
_mongo_client = MongoClient(_mongo_url, authMechanism="SCRAM-SHA-256", maxPoolSize=None)
MongoDB = _mongo_client[MONGODB_DBNAME]

# init redis client
Redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DBNAME, password=REDIS_PASS)

# init sentry client
# TODO: sentry is not necessary for dds tool, remove it as soon as possible
if SENTRY_DSN is not None:
sample_rate = 0.1 if ENV == RunningEnv.Prod else 1.0
sentry_sdk.init(dsn=SENTRY_DSN,
traces_sample_rate=sample_rate,
environment=ENV, )
sentry_sdk.set_tag("os.user", get_os_username())
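
The MongoDB client above now passes maxPoolSize=None. A minimal sketch of what that option changes, using a hypothetical connection URL rather than this repo's config: PyMongo caps each client at 100 pooled connections by default, and None lifts the cap, which helps when many importer threads share the one client.

from pymongo import MongoClient

MONGO_URL = "mongodb://user:pass@localhost:27017/dds"  # hypothetical URL, not the repo's settings

default_client = MongoClient(MONGO_URL)                      # connection pool capped at 100 sockets
uncapped_client = MongoClient(MONGO_URL, maxPoolSize=None)   # no pool size limit
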
277 changes: 233 additions & 44 deletions deepdataspace/io/importer.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions deepdataspace/model/_base.py
@@ -5,6 +5,7 @@
"""

import abc
import logging
import time
from threading import Lock
from typing import ClassVar
@@ -13,6 +14,7 @@
from typing import Tuple

from pydantic import BaseModel as _Base
from pymongo import WriteConcern
from pymongo.collection import Collection
from pymongo.operations import UpdateOne
from pymongo.typings import _DocumentType
@@ -24,6 +26,8 @@
_batch_save_queue = {} # a dict of batch save queue for every collection, {'collection_name': batch_save_queue, }
_batch_update_queue = {} # a dict of batch update queue for every collection, {'collection_name': batch_update_queue, }

logger = logging.getLogger("model.base")


def current_ts():
"""
@@ -53,20 +57,13 @@ def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:

raise NotImplementedError

def post_init(self):
"""
Post init hook for initializing a model object.
"""
pass

@classmethod
def from_dict(cls, data: dict):
"""
Convert a python dict to a model object.
"""

obj = cls.parse_obj(data)
obj.post_init()
return obj

def to_dict(self, include: list = None, exclude: list = None):
@@ -232,6 +229,8 @@ def batch_update(cls, filters: dict, set_data: dict = None, unset_data: dict = N
co = cls.get_collection()
if co is None:
return None
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)

op = UpdateOne(filters, {"$set": set_data, "$unset": unset_data})

@@ -257,6 +256,8 @@ def finish_batch_update(cls):
op_lock = cls._get_batch_op_lock()
with op_lock:
co = cls.get_collection()
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)
queue = _batch_update_queue.setdefault(cls_id, [])
if queue:
co.bulk_write(queue)
@@ -275,7 +276,6 @@ def save(self, refresh=False):
If refresh is True, the object will be re-fetched from mongodb after saving.
"""

self.post_init()
co = self.get_collection()
if co is None:
return None
@@ -293,7 +293,6 @@
new_self = co.find_one({"_id": _id})
new_self.pop("_id", None)
self.__dict__.update(new_self)
self.post_init()
return self

def batch_save(self, batch_size: int = 20, set_on_insert: Dict = None):
Expand All @@ -303,13 +302,12 @@ def batch_save(self, batch_size: int = 20, set_on_insert: Dict = None):
:param batch_size: the batch size. We will only write to mongodb when the batch is full.
:param set_on_insert: the fields only need to be set when we are inserting a new object.
"""

self.post_init()

cls = self.__class__
co = cls.get_collection()
if co is None:
return None
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)

_id = self.__dict__.get("id", None)
if _id is None:
@@ -348,6 +346,8 @@ def finish_batch_save(cls):
op_lock = _batch_lock[cls_id]
with op_lock:
co = cls.get_collection()
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)
queue = _batch_save_queue.setdefault(cls_id, [])
if queue:
co.bulk_write(queue)
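
The batch hooks above now route their bulk writes through an unacknowledged write concern. A rough, self-contained sketch of that pattern with a hypothetical collection and data (not the repo's models): WriteConcern(w=0) tells the server not to acknowledge each write, trading per-write error reporting for import throughput; queued UpdateOne operations are flushed once the queue reaches the batch size, with a final flush for the remainder.

from pymongo import MongoClient, UpdateOne, WriteConcern

client = MongoClient("mongodb://localhost:27017")  # hypothetical deployment
images = client["dds"]["images"].with_options(write_concern=WriteConcern(w=0))  # unacknowledged writes

queue = []
for idx in range(1000):
    queue.append(UpdateOne({"id": idx}, {"$set": {"idx": idx}}, upsert=True))
    if len(queue) >= 200:                        # flush a full batch
        images.bulk_write(queue, ordered=False)
        queue.clear()
if queue:                                        # flush whatever is left, as finish_batch_save does
    images.bulk_write(queue, ordered=False)
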
70 changes: 20 additions & 50 deletions deepdataspace/model/dataset.py
@@ -27,6 +27,7 @@
from deepdataspace.model.image import ImageModel
from deepdataspace.model.label import Label
from deepdataspace.utils.file import create_file_url
from deepdataspace.utils.function import count_block_time
from deepdataspace.utils.string import get_str_md5

logger = logging.getLogger("io.model.dataset")
@@ -103,7 +104,7 @@ def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
group_name: str = None

_batch_queue: Dict[int, ImageModel] = {}
_batch_size: int = 100
_batch_size: int = 200

@classmethod
def create_dataset(cls,
@@ -139,6 +140,7 @@ def create_dataset(cls,
dataset.path = path or dataset.path
dataset.files = files or dataset.files
dataset.name = name
dataset.num_images = Image(dataset.id).count_num({})
dataset.save()
return dataset
else:
@@ -148,42 +150,11 @@
dataset = cls(name=name, id=id_, type=type, path=path,
files=files, status=DatasetStatus.Ready,
description=description, description_func=description_func)
dataset.post_init()
dataset.num_images = Image(dataset.id).count_num({})
dataset.save()
return dataset

@classmethod
def get_importing_dataset(cls,
name: str,
id_: str = None,
type: str = None,
path: str = None,
files: dict = None,
) -> "DataSet":
"""
This is the same as create_dataset.
But if the dataset is new, it's status will be set to "waiting" instead of "ready".
"""

if id_:
dataset = DataSet.find_one({"id": id_})
if dataset is not None:
dataset.type = type or dataset.type
dataset.path = path or dataset.path
dataset.files = files or dataset.files
dataset.name = name
dataset.save()
return dataset
else:
id_ = uuid.uuid4().hex

files = files or {}
dataset = cls(name=name, id=id_, type=type, path=path, files=files, status=DatasetStatus.Waiting)
dataset.post_init()
dataset.save()
return dataset

def _add_cover(self, force_update: bool = False):
def add_cover(self, force_update: bool = False):
has_cover = bool(self.cover_url)
if has_cover and not force_update:
return
@@ -257,17 +228,16 @@ def add_image(self,
image.flag = flag or image.flag
image.flag_ts = flag_ts or image.flag_ts
image.metadata = metadata or image.metadata
image.post_init()
image._dataset = self # this saves a db query

image.save()
self.num_images = Model.count_num({})
self._add_cover()
self.add_cover()

# save whitelist to redis
whitelist_dirs = set()
self._add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self._add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
if whitelist_dirs:
Redis.sadd(RedisKey.DatasetImageDirs, *whitelist_dirs)

@@ -326,7 +296,7 @@ def batch_add_image(self,
return image

@staticmethod
def _add_local_file_url_to_whitelist(url: str, whitelist: set):
def add_local_file_url_to_whitelist(url: str, whitelist: set):
if not url or not url.startswith("/files/local_files"):
return

@@ -377,7 +347,7 @@ def _batch_save_image_batch(self):
object_types.add(AnnotationType.Segmentation)
if obj.alpha and AnnotationType.Matting not in object_types:
object_types.add(AnnotationType.Matting)
self._add_local_file_url_to_whitelist(obj.alpha, whitelist_dirs)
self.add_local_file_url_to_whitelist(obj.alpha, whitelist_dirs)
if obj.points and AnnotationType.KeyPoints not in object_types:
object_types.add(AnnotationType.KeyPoints)

@@ -387,8 +357,8 @@ def _batch_save_image_batch(self):
image.batch_save(batch_size=self._batch_size, set_on_insert={"idx": image.idx})
idx += 1

self._add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self._add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)

# finish batch saves
IModel.finish_batch_save()
@@ -406,9 +376,9 @@ def _batch_save_image_batch(self):

self._batch_queue.clear()

def batch_save_image(self, enforce: bool = False):
def batch_save_image(self):
batch_is_full = len(self._batch_queue) >= self._batch_size
if batch_is_full or enforce:
if batch_is_full:
self._batch_save_image_batch()
return True
return False
@@ -419,7 +389,7 @@ def finish_batch_add_image(self):
This saves all images in the buffer queue to database.
"""
self._batch_save_image_batch()
self._add_cover()
self.add_cover()

def eval_description(self):
"""
@@ -449,16 +419,16 @@ def cascade_delete(dataset: "DataSet"):
return

dataset_id = dataset.id
print(f"dataset [{dataset_id}] is found, deleting...")
logger.info(f"dataset [{dataset_id}] is found, deleting...")

print(f"dataset [{dataset_id}] is found, deleting categories...")
logger.info(f"dataset [{dataset_id}] is found, deleting categories...")
Category.delete_many({"dataset_id": dataset_id})

print(f"dataset [{dataset_id}] is found, deleting labels...")
logger.info(f"dataset [{dataset_id}] is found, deleting labels...")
Label.delete_many({"dataset_id": dataset_id})

print(f"dataset [{dataset_id}] is found, deleting images...")
logger.info(f"dataset [{dataset_id}] is found, deleting images...")
Image(dataset_id).get_collection().drop()

DataSet.delete_many({"id": dataset_id})
print(f"dataset [{dataset_id}] is deleted.")
logger.info(f"dataset [{dataset_id}] is deleted.")
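
Taken together, the dataset changes keep the import path batched: batch_add_image queues ImageModel objects, batch_save_image flushes them once the queue reaches _batch_size (raised from 100 to 200 here), and finish_batch_add_image writes the remainder and sets the cover. A hypothetical usage sketch, with assumed argument names since the full signatures are not shown in this diff:

from deepdataspace.model.dataset import DataSet

dataset = DataSet.create_dataset(name="demo")            # assumed minimal call
for uri in ["/data/demo/0001.jpg", "/data/demo/0002.jpg"]:
    dataset.batch_add_image(url=uri)                     # queued; flushed every _batch_size images
dataset.finish_batch_add_image()                         # flush the tail and pick a cover image
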

