Refactoring the Pipeline Runtime #304

Merged 19 commits on Aug 7, 2024
20 changes: 20 additions & 0 deletions nodestream/cli/operations/run_pipeline.py
@@ -122,6 +122,7 @@ def create_progress_reporter(
callback=indicator.progress_callback,
on_start_callback=indicator.on_start,
on_finish_callback=indicator.on_finish,
on_fatal_error_callback=indicator.on_fatal_error,
)


@@ -139,8 +140,18 @@ def progress_callback(self, _, __):
def on_finish(self, context: PipelineContext):
pass

def on_fatal_error(self, exception: Exception):
self.command.line(
"<error>Encountered a fatal error while running pipeline</error>"
)
self.command.line(f"<error>{exception}</error>")


class SpinnerProgressIndicator(ProgressIndicator):
def __init__(self, command: NodestreamCommand, pipeline_name: str) -> None:
super().__init__(command, pipeline_name)
self.exception = None

def on_start(self):
self.progress = self.command.progress_indicator()
self.progress.start(f"Running pipeline: '{self.pipeline_name}'")
@@ -156,3 +167,12 @@ def on_finish(self, context: PipelineContext):
stats = ((k, str(v)) for k, v in context.stats.items())
table = self.command.table(STATS_TABLE_COLS, stats)
table.render()

if self.exception:
raise self.exception

def on_fatal_error(self, exception: Exception):
self.progress.set_message(
"<error>Encountered a fatal error while running pipeline</error>"
)
self.exception = exception
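
The pattern introduced here defers the failure rather than raising inside the callback: `on_fatal_error` records the exception and updates the spinner message, and `on_finish` re-raises it only after the stats table has rendered. A minimal sketch of that pattern in isolation (the `Reporter` class is hypothetical, standing in for the real progress indicator):

# Hypothetical stand-in demonstrating the deferred-exception pattern above.
class Reporter:
    def __init__(self) -> None:
        self.exception = None

    def on_fatal_error(self, exception: Exception) -> None:
        # Record the failure; don't raise mid-run so teardown still happens.
        self.exception = exception

    def on_finish(self) -> None:
        # The real code renders the stats table here first, then re-raises.
        if self.exception:
            raise self.exception


reporter = Reporter()
reporter.on_fatal_error(ValueError("boom"))
try:
    reporter.on_finish()
except ValueError as error:
    print(f"pipeline failed: {error}")  # prints: pipeline failed: boom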
6 changes: 5 additions & 1 deletion nodestream/databases/null.py
@@ -1,7 +1,6 @@
from typing import AsyncGenerator, Iterable

from ..model import IngestionHook, Node, RelationshipWithNodes, TimeToLiveConfiguration
from ..pipeline.pipeline import empty_async_generator
from ..schema.migrations import Migrator
from ..schema.migrations.operations import Operation
from .copy import TypeRetriever
@@ -13,6 +12,11 @@
)


async def empty_async_generator():
for i in []:
yield i # pragma: no cover


class NullMigrator(Migrator):
async def execute_operation(self, _: Operation):
pass
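
The relocated `empty_async_generator` yields nothing: the loop body never executes (hence the `pragma: no cover`), so iterating it completes immediately. A quick self-contained demonstration:

import asyncio


async def empty_async_generator():
    for i in []:
        yield i  # never reached; present only to make this an async generator


async def main() -> None:
    # The loop body never runs; iteration completes immediately.
    async for _ in empty_async_generator():
        raise AssertionError("unreachable")
    print("exhausted without yielding")


asyncio.run(main())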
163 changes: 163 additions & 0 deletions nodestream/pipeline/channel.py
@@ -0,0 +1,163 @@
import asyncio
from asyncio import Queue, wait_for
from typing import Optional, Tuple


# WARNING: DoneObject references should not exist beyond this module.
class DoneObject:
"""A `DoneObject` is a marker object that indicates that a step is done.

When a step is done processing records, it should emit a `DoneObject` to
indicate that it is finished. This allows downstream steps to know when a
step is done and they can stop processing records.

The `DoneObject` is a special object that is used to signal the end of a
stream of records. It is not a record itself and should not be processed as
such.
"""

pass


CHANNEL_TIMEOUT = 0.25


class Channel:
"""`Channel` is a communication channel between steps in a pipeline.

A `Channel` is used to pass records between steps in a pipeline. It is a
simple queue that can be used to pass records from one step to another. The
`Channel` is asynchronous and can be used to pass records between steps in
an asynchronous context.

A `Channel` has a fixed size and will block if the queue is full. This
allows the pipeline to control the flow of records between steps and
prevent one step from overwhelming another step with too many records.
"""

__slots__ = ("queue", "input_dropped")

def __init__(self, size: int) -> None:
self.queue = Queue(maxsize=size)
self.input_dropped = False

async def get(self):
"""Get an object from the channel.

        This method blocks until an object is available in the channel.

Returns:
object: The object that was retrieved from the channel.
"""
return await self.queue.get()

async def put(self, obj) -> bool:
"""Put an object in the channel.

        This method attempts to put an object in the channel. If the channel
        is full, it waits up to `CHANNEL_TIMEOUT` seconds for space before
        giving up.

        Returns:
            bool: True if the object was put in the channel, False if the
            attempt timed out because the channel was full.
"""
try:
await wait_for(self.queue.put(obj), timeout=CHANNEL_TIMEOUT)
return True
except (TimeoutError, asyncio.TimeoutError):
return False


class StepOutput:
"""`StepOutput` is an output channel for a step in a pipeline.

A `StepOutput` is used to pass records from a step to the next step in a
pipeline. It is a simple wrapper around a `Channel` that provides a more
convenient interface for putting records in the channel.
"""

__slots__ = ("channel",)

def __init__(self, channel: Channel) -> None:
self.channel = channel

async def done(self):
"""Mark the output channel as done.

This method is used to mark the output channel as done. It will put a
`DoneObject` in the channel to indicate that the step is finished
processing records. This allows downstream steps to know when the step
is done and they can stop processing records.
"""
await self.channel.put(DoneObject)

async def put(self, obj) -> bool:
"""Put an object in the output channel.

        This method blocks until the object is put in the channel. If the
        channel is full, it keeps retrying until space is available, unless
        the channel has been closed on the other end.

Returns:
bool: True if the object was successfully put in the channel, False
if the channel is closed on the other end and the object was not
put in the channel.
"""
successfully_put = False
while not successfully_put and not self.channel.input_dropped:
successfully_put = await self.channel.put(obj)

return successfully_put


class StepInput:
"""`StepInput` is an input channel for a step in a pipeline.

A `StepInput` is used to get records from the previous step in a pipeline.
It is a simple wrapper around a `Channel` that provides a more convenient
interface for getting records from the channel.

The `StepInput` is an asynchronous generator that can be used to get
records from the channel in an asynchronous context.
"""

__slots__ = ("channel",)

def __init__(self, channel: Channel) -> None:
self.channel = channel

async def get(self) -> Optional[object]:
"""Get an object from the input channel.

        This method blocks until an object is available in the channel. Once
        the channel is closed (i.e. a `DoneObject` is received), it returns
        None.

Returns:
object: The object that was retrieved from the channel.
"""
        if (obj := await self.channel.get()) is DoneObject:
            return None

        return obj

def done(self):
"""Mark the input channel as done.

This will close the channel.
"""
self.channel.input_dropped = True


def channel(size: int) -> Tuple[StepInput, StepOutput]:
"""Create a new input and output channel.

Args:
size: The size of the channel.
"""
channel = Channel(size)
    return StepInput(channel), StepOutput(channel)
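
End to end, a producer writes through a `StepOutput` and signals completion with `done()`, while a consumer reads through the paired `StepInput` until `get()` returns None (the `DoneObject` sentinel). A minimal usage sketch, assuming the module is importable as `nodestream.pipeline.channel`:

import asyncio

from nodestream.pipeline.channel import channel


async def producer(output) -> None:
    for record in ("a", "b", "c"):
        if not await output.put(record):
            break  # consumer dropped its input; stop producing
    await output.done()  # emits the DoneObject sentinel


async def consumer(step_input) -> None:
    while (record := await step_input.get()) is not None:
        print(f"got {record}")


async def main() -> None:
    step_input, step_output = channel(size=10)
    await asyncio.gather(producer(step_output), consumer(step_input))


asyncio.run(main())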
9 changes: 4 additions & 5 deletions nodestream/pipeline/extractors/extractor.py
@@ -7,13 +7,12 @@
class Extractor(Step):
"""Extractors represent the source of a set of records.

They are like any other step. However, they ignore the incoming record stream and instead produce their own
stream of records. For this reason they generally should only be set at the beginning of a pipeline.
They are like any other step. However, they ignore the incoming record
stream and instead produce their own stream of records. For this reason
they generally should only be set at the beginning of a pipeline.
"""

def handle_async_record_stream(
self, _: AsyncGenerator[Any, Any]
) -> AsyncGenerator[Any, Any]:
def emit_outstanding_records(self):
return self.extract_records()

@abstractmethod
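
Under the new interface a concrete extractor only implements `extract_records`; the runtime obtains the stream by calling `emit_outstanding_records`. A minimal subclass sketch (the `RangeExtractor` is hypothetical, assuming `Extractor` is importable from `nodestream.pipeline.extractors.extractor`):

from nodestream.pipeline.extractors.extractor import Extractor


class RangeExtractor(Extractor):
    """Hypothetical extractor emitting the integers 0..n-1 as records."""

    def __init__(self, n: int) -> None:
        self.n = n

    async def extract_records(self):
        for i in range(self.n):
            yield i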
13 changes: 5 additions & 8 deletions nodestream/pipeline/filters.py
@@ -10,16 +10,13 @@
class Filter(Step):
"""A `Filter` takes a given record and evaluates whether or not it should continue downstream.

`Filter` steps generally make up the middle of an ETL pipeline and are responsible
for ensuring only relevant records make it through.
`Filter` steps generally make up the middle of an ETL pipeline and are
responsible for ensuring only relevant records make it through.
"""

async def handle_async_record_stream(
self, record_stream: AsyncGenerator[Any, Any]
) -> AsyncGenerator[Any, Any]:
async for record in record_stream:
if record is Flush or not await self.filter_record(record):
yield record
async def process_record(self, record, _) -> AsyncGenerator[object, None]:
if record is Flush or not await self.filter_record(record):
yield record

@abstractmethod
async def filter_record(self, record: Any) -> bool:
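
With `process_record`, a filter forwards a record downstream when it is a `Flush` marker or when `filter_record` returns False; returning True means the record is filtered out. A minimal subclass sketch (the `DropMissingValues` class is hypothetical, assuming `Filter` is importable from `nodestream.pipeline.filters`):

from nodestream.pipeline.filters import Filter


class DropMissingValues(Filter):
    """Hypothetical filter discarding records whose 'value' key is None."""

    async def filter_record(self, record) -> bool:
        # True means "filter this record out of the stream".
        return record.get("value") is None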