Skip to content

Commit

Permalink
Dynamic Buffer Size now handles slow commits better, regularly random…
Browse files Browse the repository at this point in the history
…ly checks small buffer sizes to see if this improves throughput, especially good for relations
  • Loading branch information
jkminder committed Sep 27, 2022
1 parent 0c96935 commit bea6e6d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 25 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ The library is built specifically for converting data into a [neo4j](https://neo

Note: The [py2neo](https://py2neo.org/2021.1/index.html) library does not support parallel relations of the same type (same source, same target and same type). If your graph requires such parallel relations please checkout the provided [py2neo extensions](https://rel2graph.jkminder.ch/py2neo_extensions.html).
## Installation
If you have setup a private ssh key for your github, copy-paste the command below to install the latest version ([v0.7.0][latest_tag]):
If you have setup a private ssh key for your github, copy-paste the command below to install the latest version ([v0.7.1][latest_tag]):
```
pip install git+ssh://[email protected]/sg-dev/[email protected].0
pip install git+ssh://[email protected]/sg-dev/[email protected].1
```

If you don't have ssh set up, download the latest wheel [here][latest_wheel] and install the wheel with:
Expand Down Expand Up @@ -88,7 +88,7 @@ converter()
# Known issues
If you encounter a bug or an unexplainable behavior, please check the [known issues](https://github.com/sg-dev/rel2graph/labels/bug) list. If your issue is not found, submit a new one.

[latest_version]: v0.7.0
[latest_tag]: https://github.com/sg-dev/rel2graph/releases/tag/v0.7.0
[latest_wheel]: https://github.com/sg-dev/rel2graph/releases/download/v0.7.0/rel2graph-0.7.0-py3-none-any.whl
[latest_version]: v0.7.1
[latest_tag]: https://github.com/sg-dev/rel2graph/releases/tag/v0.7.1
[latest_wheel]: https://github.com/sg-dev/rel2graph/releases/download/v0.7.1/rel2graph-0.7.1-py3-none-any.whl
[wiki]: https://rel2graph.jkminder.ch/index.html
65 changes: 46 additions & 19 deletions rel2graph/core/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .schema_compiler import compile_schema
import threading
import time
import random

from rel2graph.core import graph_elements

Expand Down Expand Up @@ -111,7 +112,6 @@ def __init__(self, config: WorkerConfig, update_interval: int = 60) -> None:

self._config = config
self._update_interval = update_interval
self._step_size = 10
self._stop_event = threading.Event()

def terminate(self) -> None:
Expand All @@ -133,27 +133,56 @@ def take_measurement(self):

def run(self) -> None:
"""Dynamically adapts the buffer size based on the performance of the conversion."""
bound = lambda x: min(max(10, x), 1000)
while(True):
initial = True
def bound(x):
x = round(x/10)*10 # round to nearest 10
res = 1 if x < 10 else min(x, 1000)
return res
def get_adaptive_step_size(current_buffer_size, initial):
# If its the initial step, we first try large steps
# The step size is inversely proportional to the buffer size and bounded by 1 and 1000
# To escape local minima with small bs it is randomly set to 10 for probability 0.2
if initial:
return 90
base = 10
if current_buffer_size > 100:
base = 20
elif current_buffer_size > 200:
base = 50
elif current_buffer_size > 500:
base = 100

# for 20% of the time we set the step size to a random number to escape local minima
# if the buffer is very small, we set the step size to 100 to test large buffers
# if the buffer is large we set the step size such that the buffer size is 10
return current_buffer_size - 10 if random.random() > 0.2 and current_buffer_size > 30 else base

while(not self._stop_event.is_set()):
higher_avg, lower_avg = 99999, 99999 # init with high values
current_size = self._config.buffer_size
step_size = get_adaptive_step_size(self._config.buffer_size, initial)
initial = False
try:
current_avg = self.take_measurement()
self._config.buffer_size = bound(self._config.buffer_size + self._step_size)
higher_avg = self.take_measurement()
self._config.buffer_size = bound(self._config.buffer_size - 2*self._step_size)
lower_avg = self.take_measurement()
self._config.buffer_size = bound(self._config.buffer_size + self._step_size) # back to original value
if current_size < 1000:
self._config.buffer_size = bound(current_size + step_size)
higher_avg = self.take_measurement()
if current_size > 1:
self._config.buffer_size = bound(current_size - step_size)
lower_avg = self.take_measurement()
self._config.buffer_size = current_size # back to original value
except Exception:
break # if the thread is terminated
time_list = [current_avg, higher_avg, lower_avg]
imin = time_list.index(min(time_list))
if imin == 1:
self._config.buffer_size = bound(self._config.buffer_size + self._step_size)
logger.debug(f"Buffer size increased to {self._config.buffer_size}")
self._config.buffer_size = bound(self._config.buffer_size + step_size)
logger.debug(f"Buffer size increased from {current_size} to {self._config.buffer_size}")
elif imin == 2:
self._config.buffer_size = bound(self._config.buffer_size - self._step_size)
logger.debug(f"Buffer size decreased to {self._config.buffer_size}")

self._config.buffer_size = bound(self._config.buffer_size - step_size)
logger.debug(f"Buffer size decreased from {current_size} to {self._config.buffer_size}")
self._stop_event.wait(10) # run 10s in current configuration
class Worker(threading.Thread):
"""The Worker does the main conversion. It is build to be parallelised."""

Expand Down Expand Up @@ -361,7 +390,7 @@ def run(self) -> None:
worker.start()

if self._use_dynamic_buffer:
self._dynamic_buffer_monitor = DynamicBufferMonitor(self._config, update_interval=10)
self._dynamic_buffer_monitor = DynamicBufferMonitor(self._config, update_interval=30)
self._dynamic_buffer_monitor.start()

# main loop
Expand Down Expand Up @@ -513,7 +542,7 @@ def _is_valid_instance(self) -> None:
def _setup_worker_pool(self, type, pb = None) -> None:
if self._worker_pool is None:
instantiated_iterator = iter(self.iterator)
config = WorkerConfig(instantiated_iterator, self._factories, self._node_mask, self._relation_mask, type, self._graph, 1 if self._serialize else 100, pb)
config = WorkerConfig(instantiated_iterator, self._factories, self._node_mask, self._relation_mask, type, self._graph, 1 if self._serialize else 100, pb)
self._worker_pool = WorkerPool(self._num_workers, config, use_dynamic_buffer=not self._serialize)
else:
logger.info("Continuing previous work...")
Expand Down Expand Up @@ -575,7 +604,6 @@ def __call__(self, progress_bar: "tdqm.tqdm" = None, skip_nodes = False, skip_re
if not self._processed_relations and not skip_relations:
# Create relations
logger.info("Starting creation of relations.")

self._setup_worker_pool(WorkType.RELATION, pb)

try:
Expand All @@ -592,5 +620,4 @@ def __call__(self, progress_bar: "tdqm.tqdm" = None, skip_nodes = False, skip_re
self._processed_relations = True
else:
logger.info("Skipping creation of relations.")

logger.info(f"Processed in total {self._n_nodes} nodes and {self._n_relations} relations (this run took {int(time.time()-start)}s)")
logger.info(f"Processed in total {self._n_nodes} nodes and {self._n_relations} relations (this run took {int(time.time()-start)}s)")
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
setup(
name = "rel2graph",
packages = find_packages(),
version = "0.7.0",
version = "0.7.1",
description = "Library for converting relational data into graph data (neo4j)",
author = "Julian Minder",
author_email = "[email protected]",
Expand Down

0 comments on commit bea6e6d

Please sign in to comment.