
Is that possible to add more data after process.map? #99

Open
BobDLA opened this issue Jan 6, 2023 · 2 comments
Labels
enhancement New feature or request


BobDLA commented Jan 6, 2023

For example, in this demo code:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.process.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.process.filter(slow_gt3, stage, workers=2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]

Is it possible to append more data after "pl.process.map(slow_add1, data, workers=3, maxsize=4)" and have it flow to the next stage? Is there demo code?

Thanks

BobDLA added the enhancement label on Jan 6, 2023
zzl221000 commented

Use queues to achieve this. @BobDLA

from multiprocessing import freeze_support
import multiprocessing as mp
import threading
import time
from random import random

import pypeln as pl

q = mp.Queue()
CANCEL_FLAG = 'end'  # sentinel that tells the consumer to stop


def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1


def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3


def myqueue():
    # generator that pulls items off the multiprocessing queue until the
    # sentinel arrives; q.get raises queue.Empty if nothing shows up within 2 s
    while True:
        item = q.get(True, 2)
        if item == CANCEL_FLAG:
            return
        if item is not None:
            yield item


def publish():
    # producer running in a background thread; append more data here at any time
    for i in range(10):
        q.put(i)
    q.put(CANCEL_FLAG)


if __name__ == '__main__':
    freeze_support()
    data = myqueue()

    stage = pl.process.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.process.filter(slow_gt3, stage, workers=2)
    threading.Thread(target=publish, daemon=True).start()

    for d in stage:
        print(d)
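
One caveat with the 2-second timeout on q.get: if the producer ever falls more than two seconds behind, the generator raises queue.Empty and the pipeline stops. A minimal variant of myqueue (my addition, not part of the original answer) that simply keeps waiting instead:

import queue  # stdlib module; mp.Queue.get raises queue.Empty on timeout

def myqueue():
    # q and CANCEL_FLAG as defined above
    while True:
        try:
            item = q.get(True, 2)
        except queue.Empty:
            continue  # nothing arrived within 2 seconds, keep waiting
        if item == CANCEL_FLAG:
            return
        yield item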


miranda1000 commented Sep 30, 2023

The OP asked about adding data between two stages:

Is it possible to append more data after "pl.process.map(slow_add1, data, workers=3, maxsize=4)"

But since @zzl221000's reply shows how to add data at the start of the queue, I'll add my solution to this problem, just in case someone needs it:

def __init_queue(self):
    async def tts_queue():
        # block until enqueue() signals that a new item is available
        while True:
            if self._processing_input.empty():
                async with self._queued_element_notifier:
                    await self._queued_element_notifier.wait()

            yield self._processing_input.get(block=True, timeout=2)

    queue = pl.task.from_iterable(tts_queue())                  \
                | ...

    async def start_queue():
        while True:
            # just iterate
            async for _ in queue.__aiter__():
                pass

    asyncio.get_event_loop().create_task(start_queue())

You'll need self._queued_element_notifier = asyncio.Condition() and self._processing_input = mp.Queue(). Then, to insert data:

async def enqueue(self, data):
    self._processing_input.put(data)

    # notify the tts_queue loop
    async with self._queued_element_notifier:
        self._queued_element_notifier.notify()
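
Coming back to the OP's question of injecting data after the map stage and before the filter stage: pypeln also has a concat function that merges several stages into one stream, which may be enough on its own. A minimal sketch, assuming pl.process.concat accepts a list of process stages (extra_data and its values are made up for illustration):

import time
from random import random

import pypeln as pl

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

if __name__ == '__main__':
    data = range(10)
    extra_data = range(10, 15)  # hypothetical second source, added "after" map

    mapped = pl.process.map(slow_add1, data, workers=3, maxsize=4)
    extra = pl.process.map(slow_add1, extra_data, workers=1)

    # merge both streams, then feed the combined stream to the filter stage
    merged = pl.process.concat([mapped, extra])
    stage = pl.process.filter(slow_gt3, merged, workers=2)

    print(list(stage))

Here the extra elements also pass through slow_add1, but extra could be any stage or iterable whose elements should enter the pipeline between map and filter.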
