-
Notifications
You must be signed in to change notification settings - Fork 144
/
rss_source_pipe.py
210 lines (178 loc) · 9 KB
/
rss_source_pipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
import mrc
from pydantic import BaseModel
from pydantic import Field
from pydantic import ValidationError
from pydantic import validator
from morpheus.modules.general.monitor import MonitorLoaderFactory
from morpheus.modules.input.rss_source import RSSSourceLoaderFactory
from morpheus.modules.preprocess.deserialize import DeserializeLoaderFactory
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module
from .schema_transform import SchemaTransformLoaderFactory
from .vdb_resource_tagging_module import VDBResourceTaggingLoaderFactory
from .web_scraper_module import WebScraperLoaderFactory
# Module-level logger named after this module; used below to report configuration validation failures.
logger = logging.getLogger(__name__)
class RSSSourcePipeSchema(BaseModel):
    # Validated configuration for the `rss_source_pipe` module. Unknown keys are rejected (see `Config`).
    # Kept as `#` comments rather than a class docstring so the generated model schema is left unchanged.

    # Number of RSS feed items to process in each batch.
    batch_size: int = 32
    # Directory used for caching RSS feed data.
    cache_dir: str = "./.cache/http"
    # Cooldown interval, in seconds, between fetches.
    cooldown_interval_sec: int = 600
    # Whether feed data caching is enabled.
    enable_cache: bool = False
    # Whether the monitors inserted into the pipeline report progress.
    enable_monitor: bool = True
    # One or more RSS feed URLs; a bare string is wrapped into a one-element list by the validator below.
    feed_input: List[str] = Field(default_factory=list)
    # Interval, in seconds, for fetching new feed items.
    interval_sec: int = 600
    # Batch size handed to the deserialize module at the end of the pipeline.
    output_batch_size: int = 2048
    # Timeout, in seconds, for RSS feed requests.
    request_timeout_sec: float = 2.0
    # Whether the source keeps running continuously.
    run_indefinitely: bool = True
    # Number of records to process before stopping (0 for indefinite).
    stop_after_rec: int = 0
    # When True, strip HTML & XML markup from feed content.
    strip_markup: bool = True
    # Required (no default): resource name forwarded to the VDB resource tagging module.
    vdb_resource_name: str
    # Free-form configuration forwarded to the web scraper module; not validated here.
    web_scraper_config: Optional[Dict[Any, Any]] = None

    @validator('feed_input', pre=True)
    def validate_feed_input(cls, to_validate):  # pylint: disable=no-self-argument
        # `pre=True` makes this run on the raw input, before the `List[str]` field validation.
        # Anything that is neither a string nor a list is rejected outright.
        if not isinstance(to_validate, (str, list)):
            raise ValueError('feed_input must be a string or a list of strings')

        # Lists pass through untouched; a single string becomes a one-element list.
        return to_validate if isinstance(to_validate, list) else [to_validate]

    class Config:
        # Fail validation on configuration keys that are not declared above.
        extra = "forbid"
# Loader factory for this module, created with the same two identifiers ("rss_source_pipe",
# "morpheus_examples_llm") that `register_module` is given below, plus the configuration schema class.
# NOTE(review): presumably the factory uses the schema to validate configs passed to `get_instance` —
# confirm against `ModuleLoaderFactory`.
RSSSourcePipeLoaderFactory = ModuleLoaderFactory("rss_source_pipe", "morpheus_examples_llm", RSSSourcePipeSchema)
@register_module("rss_source_pipe", "morpheus_examples_llm")
def _rss_source_pipe(builder: mrc.Builder):
    """
    Creates a pipeline for processing RSS feeds.

    This function sets up a pipeline that takes RSS feed data, scrapes web content based on the feed,
    selects/renames the resulting columns, passes the records through the deserialize module and tags them
    with a vector database resource name. Monitor modules are placed after the RSS source, the web scraper,
    the schema transform and the VDB resource tagging stages.

    Parameters
    ----------
    builder : mrc.Builder
        The Morpheus builder to which the pipeline modules will be added.

    Raises
    ------
    pydantic.ValidationError
        If the `rss_config` section of the module configuration does not satisfy `RSSSourcePipeSchema`
        (for example an unknown key, or a missing `vdb_resource_name`). The error is logged and re-raised.

    Notes
    -----
    The module configuration is read from the `rss_config` key and can include the following parameters
    (unknown keys are rejected):

    - **rss_config**: Configuration for this module.
        - **batch_size**: Number of RSS feed items to process in each batch (default 32).
        - **cache_dir**: Directory for caching RSS feed data (default "./.cache/http").
        - **cooldown_interval_sec**: Cooldown interval in seconds between fetches (default 600).
        - **enable_cache**: Boolean to enable caching of feed data (default False).
        - **enable_monitor**: Boolean to enable monitoring for this module (default True).
        - **feed_input**: RSS feed URL, or list of RSS feed URLs, to process (default empty list).
        - **interval_sec**: Interval in seconds for fetching new feed items (default 600).
        - **output_batch_size**: Batch size passed to the deserialize module (default 2048).
        - **request_timeout_sec**: Timeout in seconds for RSS feed requests (default 2.0).
        - **run_indefinitely**: Boolean to indicate continuous running (default True).
        - **stop_after_rec**: Number of records to process before stopping (0 for indefinite, default 0).
        - **strip_markup**: When True, strip HTML & XML markup from feed content (default True).
        - **vdb_resource_name**: Required. Resource name passed to the VDB resource tagging module.
        - **web_scraper_config**: Configuration forwarded as-is to the web scraper module (default None).
            - **chunk_overlap**: Overlap size for chunks in web scraping.
            - **chunk_size**: Size of content chunks for processing.
            - **enable_cache**: Boolean to enable caching of scraped data.

    The pipeline connects these modules in the following order:
    RSS Source -> Monitor -> Web Scraper -> Monitor -> Schema Transform -> Monitor -> Deserialize ->
    VDB Resource Tagging -> Monitor. The output of the last monitor is registered as the module's "output".
    """
    # Load and validate the module configuration from the builder
    module_config = builder.get_current_module_config()
    rss_config = module_config.get("rss_config", {})

    try:
        validated_config = RSSSourcePipeSchema(**rss_config)
    except ValidationError as e:
        # Report the full location path of each error (e.g. "feed_input.0") instead of only its first element,
        # and tolerate an empty location so that formatting can never mask the original validation error.
        error_messages = '; '.join("{}: {}".format('.'.join(str(part) for part in error['loc']) or '__root__',
                                                   error['msg']) for error in e.errors())
        logger.error("Invalid RSS source configuration: %s", error_messages)
        raise

    # Monitors are always inserted; this flag only controls whether they are silenced.
    silence_monitors = not validated_config.enable_monitor

    rss_source_config = {
        "feed_input": validated_config.feed_input,
        "run_indefinitely": validated_config.run_indefinitely,
        "batch_size": validated_config.batch_size,
        "enable_cache": validated_config.enable_cache,
        "cache_dir": validated_config.cache_dir,
        "cooldown_interval_sec": validated_config.cooldown_interval_sec,
        "request_timeout_sec": validated_config.request_timeout_sec,
        "interval_sec": validated_config.interval_sec,
        "stop_after_rec": validated_config.stop_after_rec,
        "strip_markup": validated_config.strip_markup,
    }
    rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config})

    web_scraper_loader = WebScraperLoaderFactory.get_instance(
        "web_scraper", {
            "web_scraper_config": validated_config.web_scraper_config,
        })

    # Keep `summary` and `title` as-is; expose `page_content` as `content` and `link` as `source`.
    transform_config = {
        "schema_transform_config": {
            "summary": {
                "dtype": "str", "op_type": "select"
            },
            "title": {
                "dtype": "str", "op_type": "select"
            },
            "content": {
                "from": "page_content", "dtype": "str", "op_type": "rename"
            },
            "source": {
                "from": "link", "dtype": "str", "op_type": "rename"
            }
        }
    }
    schema_transform_loader = SchemaTransformLoaderFactory.get_instance("schema_transform", transform_config)

    deserialize_loader = DeserializeLoaderFactory.get_instance(
        "deserialize", {
            "batch_size": validated_config.output_batch_size, "message_type": "ControlMessage"
        })

    vdb_resource_tagging_loader = VDBResourceTaggingLoaderFactory.get_instance(
        "vdb_resource_tagging", {"vdb_resource_name": validated_config.vdb_resource_name})

    # One monitor per observed stage. The module names ("monitor_m1", "monitor_0", ...) and descriptions are
    # kept exactly as they were; only the local variable names now say which stage each monitor follows.
    rss_monitor_loader = MonitorLoaderFactory.get_instance(
        "monitor_m1", {
            "description": "RSSSourcePipe RSS Source", "silence_monitors": silence_monitors
        })
    scraper_monitor_loader = MonitorLoaderFactory.get_instance(
        "monitor_0", {
            "description": "RSSSourcePipe Web Scraper", "silence_monitors": silence_monitors
        })
    transform_monitor_loader = MonitorLoaderFactory.get_instance(
        "monitor_1", {
            "description": "RSSSourcePipe Transform", "silence_monitors": silence_monitors
        })
    output_monitor_loader = MonitorLoaderFactory.get_instance(
        "monitor_2", {
            "description": "RSSSourcePipe Deserialize", "silence_monitors": silence_monitors
        })

    # Load modules (in pipeline order); loaders and loaded modules are kept in separate variables
    rss_source_module = rss_source_loader.load(builder=builder)
    rss_monitor_module = rss_monitor_loader.load(builder=builder)
    web_scraper_module = web_scraper_loader.load(builder=builder)
    scraper_monitor_module = scraper_monitor_loader.load(builder=builder)
    transform_module = schema_transform_loader.load(builder=builder)
    transform_monitor_module = transform_monitor_loader.load(builder=builder)
    deserialize_module = deserialize_loader.load(builder=builder)
    vdb_resource_tagging_module = vdb_resource_tagging_loader.load(builder=builder)
    output_monitor_module = output_monitor_loader.load(builder=builder)

    # Connect the modules:
    # RSS source -> Web scraper -> Schema transform -> Deserialize -> VDB resource tagging (with monitors between)
    builder.make_edge(rss_source_module.output_port("output"), rss_monitor_module.input_port("input"))
    builder.make_edge(rss_monitor_module.output_port("output"), web_scraper_module.input_port("input"))
    builder.make_edge(web_scraper_module.output_port("output"), scraper_monitor_module.input_port("input"))
    builder.make_edge(scraper_monitor_module.output_port("output"), transform_module.input_port("input"))
    builder.make_edge(transform_module.output_port("output"), transform_monitor_module.input_port("input"))
    builder.make_edge(transform_monitor_module.output_port("output"), deserialize_module.input_port("input"))
    builder.make_edge(deserialize_module.output_port("output"), vdb_resource_tagging_module.input_port("input"))
    builder.make_edge(vdb_resource_tagging_module.output_port("output"), output_monitor_module.input_port("input"))

    # Register the output of the final monitor as this module's output
    builder.register_module_output("output", output_monitor_module.output_port("output"))