Compare commits

...

3 Commits

Author SHA1 Message Date
UncleCode
f54db649c5 chore(deps): add httpx extras 2025-06-08 10:06:13 +02:00
UncleCode
451b0d6c9a Set memory_wait_timeout default to 10 minutes (#1193) 2025-06-08 13:53:09 +08:00
UncleCode
8b215e17af Add use_stemming option to BM25ContentFilter (#1192) 2025-06-08 12:57:37 +08:00
10 changed files with 70 additions and 24 deletions

View File

@@ -126,6 +126,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
check_interval: float = 1.0, check_interval: float = 1.0,
max_session_permit: int = 20, max_session_permit: int = 20,
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
memory_wait_timeout: Optional[float] = 600.0,
rate_limiter: Optional[RateLimiter] = None, rate_limiter: Optional[RateLimiter] = None,
monitor: Optional[CrawlerMonitor] = None, monitor: Optional[CrawlerMonitor] = None,
): ):
@@ -136,10 +137,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.check_interval = check_interval self.check_interval = check_interval
self.max_session_permit = max_session_permit self.max_session_permit = max_session_permit
self.fairness_timeout = fairness_timeout self.fairness_timeout = fairness_timeout
self.memory_wait_timeout = memory_wait_timeout
self.result_queue = asyncio.Queue() self.result_queue = asyncio.Queue()
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
self.current_memory_percent = 0.0 # Track current memory usage self.current_memory_percent = 0.0 # Track current memory usage
self._high_memory_start_time: Optional[float] = None
async def _memory_monitor_task(self): async def _memory_monitor_task(self):
"""Background task to continuously monitor memory usage and update state""" """Background task to continuously monitor memory usage and update state"""
@@ -147,16 +150,33 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
self.current_memory_percent = psutil.virtual_memory().percent self.current_memory_percent = psutil.virtual_memory().percent
# Enter memory pressure mode if we cross the threshold # Enter memory pressure mode if we cross the threshold
if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent: if self.current_memory_percent >= self.memory_threshold_percent:
self.memory_pressure_mode = True if not self.memory_pressure_mode:
if self.monitor: self.memory_pressure_mode = True
self.monitor.update_memory_status("PRESSURE") self._high_memory_start_time = time.time()
if self.monitor:
self.monitor.update_memory_status("PRESSURE")
else:
if self._high_memory_start_time is None:
self._high_memory_start_time = time.time()
if (
self.memory_wait_timeout is not None
and self._high_memory_start_time is not None
and time.time() - self._high_memory_start_time >= self.memory_wait_timeout
):
raise MemoryError(
"Memory usage exceeded threshold for"
f" {self.memory_wait_timeout} seconds"
)
# Exit memory pressure mode if we go below recovery threshold # Exit memory pressure mode if we go below recovery threshold
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent: elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
self.memory_pressure_mode = False self.memory_pressure_mode = False
self._high_memory_start_time = None
if self.monitor: if self.monitor:
self.monitor.update_memory_status("NORMAL") self.monitor.update_memory_status("NORMAL")
elif self.current_memory_percent < self.memory_threshold_percent:
self._high_memory_start_time = None
# In critical mode, we might need to take more drastic action # In critical mode, we might need to take more drastic action
if self.current_memory_percent >= self.critical_threshold_percent: if self.current_memory_percent >= self.critical_threshold_percent:
@@ -321,6 +341,13 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
# Process until both queues are empty # Process until both queues are empty
while not self.task_queue.empty() or active_tasks: while not self.task_queue.empty() or active_tasks:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks # If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try: try:
@@ -467,6 +494,12 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
total_urls = len(urls) total_urls = len(urls)
while completed_count < total_urls: while completed_count < total_urls:
if memory_monitor.done():
exc = memory_monitor.exception()
if exc:
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks # If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit: if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try: try:

View File

@@ -1073,7 +1073,8 @@ def crawl_cmd(url: str, browser_config: str, crawler_config: str, filter_config:
crawler_cfg.markdown_generator = DefaultMarkdownGenerator( crawler_cfg.markdown_generator = DefaultMarkdownGenerator(
content_filter = BM25ContentFilter( content_filter = BM25ContentFilter(
user_query=filter_conf.get("query"), user_query=filter_conf.get("query"),
bm25_threshold=filter_conf.get("threshold", 1.0) bm25_threshold=filter_conf.get("threshold", 1.0),
use_stemming=filter_conf.get("use_stemming", True),
) )
) )
elif filter_conf["type"] == "pruning": elif filter_conf["type"] == "pruning":

View File

@@ -405,6 +405,7 @@ class BM25ContentFilter(RelevantContentFilter):
user_query: str = None, user_query: str = None,
bm25_threshold: float = 1.0, bm25_threshold: float = 1.0,
language: str = "english", language: str = "english",
use_stemming: bool = True,
): ):
""" """
Initializes the BM25ContentFilter class, if not provided, falls back to page metadata. Initializes the BM25ContentFilter class, if not provided, falls back to page metadata.
@@ -416,9 +417,11 @@ class BM25ContentFilter(RelevantContentFilter):
user_query (str): User query for filtering (optional). user_query (str): User query for filtering (optional).
bm25_threshold (float): BM25 threshold for filtering (default: 1.0). bm25_threshold (float): BM25 threshold for filtering (default: 1.0).
language (str): Language for stemming (default: 'english'). language (str): Language for stemming (default: 'english').
use_stemming (bool): Whether to apply stemming (default: True).
""" """
super().__init__(user_query=user_query) super().__init__(user_query=user_query)
self.bm25_threshold = bm25_threshold self.bm25_threshold = bm25_threshold
self.use_stemming = use_stemming
self.priority_tags = { self.priority_tags = {
"h1": 5.0, "h1": 5.0,
"h2": 4.0, "h2": 4.0,
@@ -432,7 +435,7 @@ class BM25ContentFilter(RelevantContentFilter):
"pre": 1.5, "pre": 1.5,
"th": 1.5, # Table headers "th": 1.5, # Table headers
} }
self.stemmer = stemmer(language) self.stemmer = stemmer(language) if use_stemming else None
def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]: def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]:
""" """
@@ -479,13 +482,19 @@ class BM25ContentFilter(RelevantContentFilter):
# for _, chunk, _, _ in candidates] # for _, chunk, _, _ in candidates]
# tokenized_query = [ps.stem(word) for word in query.lower().split()] # tokenized_query = [ps.stem(word) for word in query.lower().split()]
tokenized_corpus = [ if self.use_stemming:
[self.stemmer.stemWord(word) for word in chunk.lower().split()] tokenized_corpus = [
for _, chunk, _, _ in candidates [self.stemmer.stemWord(word) for word in chunk.lower().split()]
] for _, chunk, _, _ in candidates
tokenized_query = [ ]
self.stemmer.stemWord(word) for word in query.lower().split() tokenized_query = [
] self.stemmer.stemWord(word) for word in query.lower().split()
]
else:
tokenized_corpus = [
chunk.lower().split() for _, chunk, _, _ in candidates
]
tokenized_query = query.lower().split()
# tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())] # tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())]
# for _, chunk, _, _ in candidates] # for _, chunk, _, _ in candidates]

View File

@@ -6705,7 +6705,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`) 3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `300.0`) 4.**`memory_wait_timeout`** (`float`, default: `600.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`) 5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -14,3 +14,4 @@ anyio==4.9.0
PyJWT==2.10.1 PyJWT==2.10.1
mcp>=1.6.0 mcp>=1.6.0
websockets>=15.0.1 websockets>=15.0.1
httpx[http2]>=0.27.2

View File

@@ -172,7 +172,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`) 3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `300.0`) 4.**`memory_wait_timeout`** (`float`, default: `600.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`) 5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -6732,7 +6732,7 @@ dispatcher = MemoryAdaptiveDispatcher(
3.**`max_session_permit`** (`int`, default: `10`) 3.**`max_session_permit`** (`int`, default: `10`)
The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency. The maximum number of concurrent crawling tasks allowed. This ensures resource limits are respected while maintaining concurrency.
4.**`memory_wait_timeout`** (`float`, default: `300.0`) 4.**`memory_wait_timeout`** (`float`, default: `600.0`)
Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised. Optional timeout (in seconds). If memory usage exceeds `memory_threshold_percent` for longer than this duration, a `MemoryError` is raised.
5.**`rate_limiter`** (`RateLimiter`, default: `None`) 5.**`rate_limiter`** (`RateLimiter`, default: `None`)

View File

@@ -200,7 +200,7 @@ config = CrawlerRunConfig(markdown_generator=md_generator)
- **`user_query`**: The term you want to focus on. BM25 tries to keep only content blocks relevant to that query. - **`user_query`**: The term you want to focus on. BM25 tries to keep only content blocks relevant to that query.
- **`bm25_threshold`**: Raise it to keep fewer blocks; lower it to keep more. - **`bm25_threshold`**: Raise it to keep fewer blocks; lower it to keep more.
- **`use_stemming`**: If `True`, variations of words match (e.g., “learn,” “learning,” “learnt”). - **`use_stemming`** *(default `True`)*: If enabled, variations of words match (e.g., “learn,” “learning,” “learnt”).
**No query provided?** BM25 tries to glean a context from page metadata, or you can simply treat it as a scorched-earth approach that discards text with low generic score. Realistically, you want to supply a query for best results. **No query provided?** BM25 tries to glean a context from page metadata, or you can simply treat it as a scorched-earth approach that discards text with low generic score. Realistically, you want to supply a query for best results.

View File

@@ -36,6 +36,7 @@ dependencies = [
"rich>=13.9.4", "rich>=13.9.4",
"cssselect>=1.2.0", "cssselect>=1.2.0",
"httpx>=0.27.2", "httpx>=0.27.2",
"httpx[http2]>=0.27.2",
"fake-useragent>=2.0.3", "fake-useragent>=2.0.3",
"click>=8.1.7", "click>=8.1.7",
"pyperclip>=1.8.2", "pyperclip>=1.8.2",

View File

@@ -23,3 +23,4 @@ rich>=13.9.4
cssselect>=1.2.0 cssselect>=1.2.0
chardet>=5.2.0 chardet>=5.2.0
brotli>=1.1.0 brotli>=1.1.0
httpx[http2]>=0.27.2