Commit a77c9d7: add tests

hsheth2 committed Jun 18, 2024
1 parent d44be98
Showing 6 changed files with 247 additions and 144 deletions.
metadata-ingestion/src/datahub/ingestion/graph/client.py (4 changes: 1 addition & 3 deletions)
@@ -221,9 +221,7 @@ def _make_rest_sink_config(self) -> "DatahubRestSinkConfig":
         # TODO: We should refactor out the multithreading functionality of the sink
         # into a separate class that can be used by both the sink and the graph client
         # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
-        return DatahubRestSinkConfig(
-            **self.config.dict(), mode=RestSinkMode.ASYNC_BATCH
-        )
+        return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC)

     @contextlib.contextmanager
     def make_rest_sink(
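For context, make_rest_sink is a context manager wrapping the rest sink that _make_rest_sink_config configures. A minimal usage sketch, assuming a reachable DataHub server and the generic Sink API; the urn and aspect below are illustrative, not part of this commit:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import StatusClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:corpuser:example",  # illustrative urn
    aspect=StatusClass(removed=False),
)

# make_rest_sink() builds its sink config via _make_rest_sink_config(), so
# writes go through the async rest sink; pending work is flushed on exit.
with graph.make_rest_sink() as sink:
    sink.write_record_async(RecordEnvelope(mcp, metadata={}), NoopWriteCallback())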
@@ -31,7 +31,7 @@
     MetadataChangeEvent,
     MetadataChangeProposal,
 )
-from datahub.utilities.advanced_thread_executor import (
+from datahub.utilities.partition_executor import (
     BatchPartitionExecutor,
     PartitionExecutor,
 )
@@ -211,12 +211,14 @@ def __init__(
         self._clearinghouse_started = False

         self._pending_count = BoundedSemaphore(max_pending)
-        self._pending = queue.Queue(maxsize=max_pending)
+        self._pending: "queue.Queue[Optional[_BatchPartitionWorkItem]]" = queue.Queue(
+            maxsize=max_pending
+        )

         # If this is true, that means shutdown() has been called and self._pending is empty.
         self._queue_empty_for_shutdown = False

-    def _clearinghouse_worker(self) -> None:
+    def _clearinghouse_worker(self) -> None:  # noqa: C901
         # This worker will pull items off the queue, and submit them into the executor
         # in batches. Only this worker will submit process commands to the executor thread pool.
@@ -249,33 +251,44 @@ def _find_ready_items() -> List[_BatchPartitionWorkItem]:
                     keys_in_flight.remove(key)
                 keys_no_longer_in_flight.clear()

-                # Then, update the pending key completion.
+                # Then, update the pending key completion and build the ready list.
                 pending = pending_key_completion.copy()
                 pending_key_completion.clear()

                 ready = []
                 for item in pending:
-                    if item.key not in keys_in_flight:
-                        keys_in_flight.add(item.key)
+                    if (
+                        len(ready) < self.max_per_batch
+                        and item.key not in keys_in_flight
+                    ):
                         ready.append(item)
                     else:
                         pending_key_completion.append(item)

                 return ready

         def _build_batch() -> List[_BatchPartitionWorkItem]:
             next_batch = _find_ready_items()

-            while not self._queue_empty_for_shutdown and not (
-                len(next_batch) >= self.max_per_batch
-                or (
+            while (
+                not self._queue_empty_for_shutdown
+                and len(next_batch) < self.max_per_batch
+            ):
+                blocking = True
+                if (
                     next_batch
                     and _now() - last_submit_time > self.min_process_interval
                     and workers_available > 0
-                )
-            ):
+                ):
+                    # If we're past the submit deadline, pull from the queue
+                    # in a non-blocking way, and submit the batch once the queue
+                    # is empty.
+                    blocking = False

                 try:
                     next_item: Optional[_BatchPartitionWorkItem] = self._pending.get(
-                        block=True, timeout=self.min_process_interval.total_seconds()
+                        block=blocking,
+                        timeout=self.min_process_interval.total_seconds(),
                     )
                     if next_item is None:
                         self._queue_empty_for_shutdown = True
@@ -287,7 +300,8 @@ def _build_batch() -> List[_BatchPartitionWorkItem]:
                     else:
                         next_batch.append(next_item)
                 except queue.Empty:
-                    pass
+                    if not blocking:
+                        break

             return next_batch
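The reworked loop implements deadline-based batching: block on the queue while the batch is still filling, and once min_process_interval has elapsed with items in hand (and workers free), switch to non-blocking gets so the batch is submitted as soon as the queue runs dry. A simplified standalone sketch of that control flow, using illustrative names and omitting the workers_available bookkeeping:

import queue
import time
from typing import List, Optional

def build_batch(
    pending: "queue.Queue[Optional[str]]",
    max_per_batch: int = 100,
    min_process_interval: float = 0.5,
) -> List[str]:
    batch: List[str] = []
    deadline = time.monotonic() + min_process_interval
    while len(batch) < max_per_batch:
        # Block while the batch is empty or the deadline hasn't passed yet;
        # afterwards, drain the queue without blocking.
        blocking = not (batch and time.monotonic() > deadline)
        try:
            item = pending.get(block=blocking, timeout=min_process_interval)
        except queue.Empty:
            if not blocking:
                break  # queue momentarily empty: ship the partial batch
            continue  # still waiting for the first items
        if item is None:
            break  # shutdown sentinel
        batch.append(item)
    return batch

q: "queue.Queue[Optional[str]]" = queue.Queue()
for i in range(5):
    q.put(f"item-{i}")
q.put(None)
print(build_batch(q, max_per_batch=3))  # ['item-0', 'item-1', 'item-2']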

@@ -357,6 +371,14 @@ def submit(
         self._pending.put(_BatchPartitionWorkItem(key, args, done_callback))

     def shutdown(self) -> None:
+        if not self._clearinghouse_started:
+            # This is required to make shutdown() idempotent, which is important
+            # when it's called explicitly and then also by a context manager.
+            logger.debug("Shutting down: clearinghouse not started")
+            return
+
+        logger.debug(f"Shutting down {self.__class__.__name__}")
+
         # Send the shutdown signal.
         self._pending.put(None)

@@ -368,10 +390,10 @@ def shutdown(self) -> None:
         # We must wait for the clearinghouse worker to exit before calling shutdown
         # on the thread pool. Without this, the clearinghouse worker might fail to
         # enqueue pending tasks into the pool.
-        while not self._clearinghouse_started:
+        while self._clearinghouse_started:
             time.sleep(_PARTITION_EXECUTOR_FLUSH_SLEEP_INTERVAL)

-        self._executor.shutdown(wait=True)
+        self._executor.shutdown(wait=False)

     def close(self) -> None:
         self.shutdown()
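With the guard above, shutdown() is now idempotent, so it is safe to call it explicitly and then again via a context manager. A lifecycle sketch; the constructor arguments here are assumptions inferred from this diff, not a verified API reference:

from datetime import timedelta

from datahub.utilities.partition_executor import BatchPartitionExecutor

def process_batch(batch):
    # Assumed to be called with each batch of submitted work items.
    print(f"processing {len(batch)} items")

with BatchPartitionExecutor(
    max_workers=2,
    max_pending=10,
    process_batch=process_batch,
    max_per_batch=5,
    min_process_interval=timedelta(seconds=1),
) as executor:
    for i in range(20):
        # Items that share a key are kept out of concurrently running batches.
        executor.submit(f"key-{i % 3}", f"item-{i}")
# Exiting the context calls close() -> shutdown(), which enqueues the None
# sentinel and then waits for the clearinghouse worker to drain the queue.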

This file was deleted.

@@ -0,0 +1,59 @@
import time

from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor
from datahub.utilities.perf_timer import PerfTimer


def test_backpressure_aware_executor_simple():
    def task(i):
        return i

    assert {
        res.result()
        for res in BackpressureAwareExecutor.map(
            task, ((i,) for i in range(10)), max_workers=2
        )
    } == set(range(10))


def test_backpressure_aware_executor_advanced():
    task_duration = 0.5
    started = set()
    executed = set()

    def task(x, y):
        assert x + 1 == y
        started.add(x)
        time.sleep(task_duration)
        executed.add(x)
        return x

    args_list = [(i, i + 1) for i in range(10)]

    with PerfTimer() as timer:
        results = BackpressureAwareExecutor.map(
            task, args_list, max_workers=2, max_pending=4
        )
        assert timer.elapsed_seconds() < task_duration

        # No tasks should have completed yet.
        assert len(executed) == 0

        # Consume the first result.
        first_result = next(results)
        assert 0 <= first_result.result() < 4
        assert timer.elapsed_seconds() > task_duration

        # By now, the first four tasks should have started.
        time.sleep(task_duration)
        assert {0, 1, 2, 3}.issubset(started)
        assert 2 <= len(executed) <= 4

        # Finally, consume the rest of the results.
        assert {r.result() for r in results} == {
            i for i in range(10) if i != first_result.result()
        }

        # Validate that the entire process took about 5-10x the task duration.
        # That's because we have 2 workers and 10 tasks.
        assert 5 * task_duration < timer.elapsed_seconds() < 10 * task_duration
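As these tests illustrate, BackpressureAwareExecutor.map takes an iterable of argument tuples, bounds the number of pending tasks via max_pending, and yields futures as tasks complete, so results can arrive out of submission order. A minimal consumption sketch along the lines of the simple test:

from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor

def double(x):
    return 2 * x

# map() yields concurrent.futures.Future objects as tasks finish; calling
# .result() unwraps each value (or re-raises the task's exception).
results = BackpressureAwareExecutor.map(
    double, ((i,) for i in range(100)), max_workers=4, max_pending=10
)
print(sorted(f.result() for f in results))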