-
I'm losing my mind here. Here is my newly created NATS stream with consumer:
Here is my app serve.py: from faststream import FastStream
from faststream.nats.annotations import ContextRepo
from faststream.nats import NatsBroker
from stream.routers import router
import importlib
# Automatically import files with handlers (across all modules)
# importlib.import_module("stream.handlers")
from stream import handlers # noqa: F401
broker = NatsBroker("nats://localhost:4222")
broker.include_router(router)
app = FastStream(broker)
@app.on_startup
async def setup_broker(context: ContextRepo):
    # Startup hook: create the JetStream KV buckets and expose them to
    # handlers through FastStream's DI context.
    # NOTE(review): FastStream normally connects the broker itself during
    # startup — confirm this explicit early connect is still required.
    await broker.connect()
    # KV bucket for newly discovered products, one-week TTL
    # (assumes ttl is in seconds — verify against nats-py create_key_value).
    new_products_kv = await broker.stream.create_key_value(
        bucket="new_products",
        description="Bucket for newly discovered products",
        ttl=3600 * 24 * 7,
    )
    # KV bucket tracking per-feed import state, same one-week TTL.
    feeds_kv = await broker.stream.create_key_value(
        bucket="feeds",
        description="Bucket used to store feed update state",
        ttl=3600 * 24 * 7,
    )
    # Register both buckets globally so handlers can inject them via
    # Context("new_products_kv") / Context("feeds_kv").
    context.set_global("new_products_kv", new_products_kv)
    context.set_global("feeds_kv", feeds_kv) and here are my handlers.py (I've commented out a lot of them just to leave simple failing use-case) import asyncio
import random
import pandas as pd
import datetime
import hashlib
import aiohttp
from faststream.nats.annotations import NatsBroker
from slugify import slugify
from nats.js.kv import KeyValue as KV
from typing_extensions import Annotated
from faststream.nats import JStream, PullSub
from faststream import Logger, Context
from pydantic import ValidationError
from datetime import datetime
from typing import List
from . import FEEDS_COUNT, APP_NAME, PRODUCTS_PER_FEED_MIN, PRODUCTS_PER_FEED_MAX
from .routers import router
from .types import FeedProduct, ProductImage, FeedState, Offer, Product, Feed
from .utils import (
increment_feed_import_state_counter,
get_image_extension_from_bytes,
download_image,
save_product_images,
discover_product_attributes,
discover_product_category,
)
# DI aliases resolving the KV buckets registered in serve.py's startup hook.
FeedsKeyValue = Annotated[KV, Context("feeds_kv")]
NewProductsKeyValue = Annotated[KV, Context("new_products_kv")]

# Bind to the existing JetStream stream; declare=False means the stream is
# created elsewhere (e.g. by the ops setup shown at the top of the thread).
stream = JStream(
    name="product_feeds",
    subjects=["product_feeds", "product_feeds.>"],
    declare=False,
)

# NOTE(review): module-level I/O — the CSV is read at import time, so
# importing this module fails if ./data/images.csv is missing.
images_df = pd.read_csv("./data/images.csv")

# Pull-based consumption: fetch up to 10 messages per request, 2s timeout.
pull_sub = PullSub(batch_size=10, timeout=2)
@router.subscriber(
    "product_feeds.feeds_update_requested",
    stream=stream,
    # BUG FIX: every subscriber needs its own durable consumer name.
    # Sharing one durable (APP_NAME) across subscribers makes NATS treat
    # them as a single consumer and deliver messages to the wrong handler.
    durable=f"{APP_NAME}-update-feeds",
    max_workers=1,
    pull_sub=pull_sub,
)
async def update_feeds(msg: str, broker: NatsBroker, logger: Logger):
    """Fan out one `one_feed_update_requested` message per known feed.

    Args:
        msg: Trigger payload; its content is unused.
        broker: Broker used to publish the per-feed messages.
        logger: Request-scoped logger.
    """
    for feed_id in range(1, FEEDS_COUNT + 1):
        logger.info(f"Updating feed; feed_id={feed_id}")
        await broker.publish(
            message=feed_id,
            stream=stream.name,
            subject="product_feeds.one_feed_update_requested",
        )
@router.subscriber(
"product_feeds.one_feed_update_requested",
stream=stream,
durable=APP_NAME,
max_workers=1,
pull_sub=pull_sub,
)
async def update_feed(
feed_id: int, feeds_kv: FeedsKeyValue, broker: NatsBroker, logger: Logger
):
logger.info(f"Downloading feed; feed_id={feed_id}")
# Imagine downloading the feed from a remote server
await asyncio.sleep(10)
# Clear any previous state
await feeds_kv.delete(f"feed_import_{feed_id}")
# TODO: Load feed configuration from a database to determine items in the XML feed
items_count = 0
for item_id in range(
1, random.randint(PRODUCTS_PER_FEED_MIN, PRODUCTS_PER_FEED_MAX) + 1
):
logger.info(f"Processing item; item_id={item_id} feed_id={feed_id}")
raw_item = {
"id": item_id,
"title": f"Product {item_id}",
"price": random.randint(-100, 500),
"description": "This is a product",
"images": images_df.sample(3)["image_link"].to_list(),
"gender": random.choice(["male", "female"]),
"age_group": random.choice(["adult", "children"]),
"merchant_id": feed_id,
"size": None,
}
await broker.publish(
{"item": raw_item, "feed_id": feed_id},
stream=stream.name,
subject="product_feeds.raw_product_received",
)
items_count += 1
# Update the feed state
feed_state = FeedState(
items_count=items_count,
items_approved=0,
items_rejected=0,
started_at=datetime.datetime.now(),
)
await feeds_kv.put(f"feed_import_{feed_id}", feed_state.json().encode())
logger.info(f"Scheduled {items_count} products within the feed; feed_id={feed_id}") and routers.py: from faststream.nats import NatsRouter
router = NatsRouter() and finally types.py from datetime import datetime
from typing import Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, PositiveInt, HttpUrl, FilePath, FileUrl
class FeedState(BaseModel):
items_count: PositiveInt
items_approved: int
items_rejected: int
started_at: datetime
completed_at: Optional[datetime]
images: Optional[List[str]] = None
def increment_counter(self, counter: str):
if hasattr(self, counter):
setattr(self, counter, getattr(self, counter) + 1)
else:
raise ValueError(f"Invalid counter: {counter}")
def is_completed(self) -> bool:
completed = self.items_approved + self.items_rejected == self.items_count
if completed and not self.completed_at:
self.completed_at = datetime.now()
return completed Looks totally fine. But strange things happens when I publish my initial message:
App output:
Apparently it tries to call @Lancetnik you are my only hope here :) |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 5 replies
-
@pySilver first of all, please fix the imports: you have 2 imports of `datetime`; code written against the first one breaks because it is overridden by the second one. import datetime
from datetime import datetime Then we can dig into the real exception |
Beta Was this translation helpful? Give feedback.
-
Well, I found the problem: you are using the same durable name for all subscribers. NATS thinks they are the same subscriber and delivers messages to a random one of them. Just use a different durable name for each subscriber. |
Beta Was this translation helpful? Give feedback.
-
@Lancetnik I should have checked your message before I set up a demo project: https://github.com/pySilver/faststream-issue So it turns out it is not a FastStream issue at all. I'm new to event sourcing, I thought |
Beta Was this translation helpful? Give feedback.
Well, I found the problem: you are using the same durable name for all subscribers. NATS thinks they are the same subscriber and delivers messages to a random one of them. Just use a different durable name for each subscriber.