Skip to content

Commit

Permalink
Add prune (#52)
Browse files Browse the repository at this point in the history
Add function to prune the dag
  • Loading branch information
0x26res authored Jan 23, 2024
1 parent 344ff69 commit 18009e9
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 1 deletion.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

<!-- insertion marker -->
## [v0.5.0](https://github.com/tradewelltech/beavers/releases/tag/v0.5.0) - 2024-01-23

<small>[Compare with v0.4.0](https://github.com/tradewelltech/beavers/compare/v0.4.0...v0.5.0)</small>

### Added

- Add python 12 support (#53) ([344ff69](https://github.com/tradewelltech/beavers/commit/344ff69309d81780d9d08effc2fdfe3b1f8d9b22) by 0x26res).
- Add prune ([4e5b06f](https://github.com/tradewelltech/beavers/commit/4e5b06f073c2e210f4cca8d67f096698c52c3fa9) by aandres).
- Add kafka json to arrow support (#50) ([120c116](https://github.com/tradewelltech/beavers/commit/120c116d13ab46604d54088bb07d851ff5d3fd00) by 0x26res).


## [v0.4.0](https://github.com/tradewelltech/beavers/releases/tag/v0.4.0) - 2023-11-26

<small>[Compare with v0.3.1](https://github.com/tradewelltech/beavers/compare/v0.3.1...v0.4.0)</small>
Expand Down
32 changes: 32 additions & 0 deletions beavers/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,38 @@ def silence(self, node: Node[T]) -> Node[T]:
)
)

def prune(self) -> list[Node]:
"""Remove any parts of the dag that are not connected to a sink."""
to_remove = []
for node in self._nodes[::-1]:
if (
not isinstance(node._function, _SinkFunction)
and node is not self._now_node
and node is not self._silent_now_node
):
observers = [
observer
for observer in node._observers
if observer not in to_remove
]
if len(observers) == 0:
to_remove.append(node)
else:
node._observers.clear()
node._observers.extend(observers)

if to_remove:
self._nodes = [node for node in self._nodes if node not in to_remove]
self._sources = {
name: node
for name, node in self._sources.items()
if node not in to_remove
}
self._timer_manager_nodes = [
node for node in self._timer_manager_nodes if node not in to_remove
]
return to_remove

def get_sources(self) -> dict[str, Node]:
"""Return the source `Node`s."""
return self._sources
Expand Down
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ git-changelog -io CHANGELOG.md

For new release, first prepare the change log, push and merge it.
```shell
git-changelog -bio CHANGELOG.md
git-changelog --bump=auto -io CHANGELOG.md
```

Then tag and push:
Expand Down
50 changes: 50 additions & 0 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,3 +684,53 @@ def test_mutate_inputs():
assert modifier.get_value() == [1]
assert passthrough.get_value() == [] # Notified but got the factory list
assert passthrough.get_cycle_id() != dag.get_cycle_id() # considered not updated


def test_prune_simple():
dag = Dag()
source = dag.source_stream([], name="source")
node = dag.stream(lambda x: x, []).map(source)
assert node in dag._nodes
assert source in dag._nodes
assert dag.get_sources() == {"source": source}
dag.prune()
assert node not in dag._nodes
assert source not in dag._nodes
assert dag.get_sources() == {}
dag.execute()


def test_prune_nothing():
dag = Dag()
source = dag.source_stream([], name="source")
node = dag.stream(lambda x, _: x, []).map(source, dag.now())
dag.sink("sink", node)

assert node in dag._nodes
assert source in dag._nodes
assert dag.get_sources() == {"source": source}
assert dag.prune() == []


def test_prune_sinks():
dag = Dag()
source_a = dag.source_stream([], name="source_a")
source_b = dag.source_stream([], name="source_b")
node_a = dag.stream(lambda x: x, []).map(source_a)
node_b = dag.stream(lambda x: x, []).map(source_b)
sink_b = dag.sink("sink_b", node_b)

dag.prune()
assert source_a not in dag._nodes
assert node_a not in dag._nodes
assert source_b in dag._nodes
assert node_b in dag._nodes
assert sink_b in dag._nodes
assert dag.get_sources() == {"source_b": source_b}
dag.execute()

source_a.set_stream(["a", "b"])
source_b.set_stream(["a", "b"])
dag.execute()
assert node_a.get_value() == [] # Not updated
assert node_b.get_value() == ["a", "b"]

0 comments on commit 18009e9

Please sign in to comment.