Skip to content

Commit

Permalink
fix(core): handle exceptions occurring on failed index write (#1428)
Browse files Browse the repository at this point in the history
  • Loading branch information
alambare authored Dec 18, 2024
1 parent d4fffce commit 076a0f8
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 9 deletions.
23 changes: 14 additions & 9 deletions eodag/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pkg_resources import resource_filename
from whoosh import analysis, fields
from whoosh.fields import Schema
from whoosh.index import create_in, exists_in, open_dir
from whoosh.index import exists_in, open_dir
from whoosh.qparser import QueryParser

from eodag.api.product.metadata_mapping import (
Expand Down Expand Up @@ -61,7 +61,7 @@
from eodag.plugins.search.qssearch import PostJsonSearch
from eodag.types import model_fields_to_annotated
from eodag.types.queryables import CommonQueryables, QueryablesDict
from eodag.types.whoosh import EODAGQueryParser
from eodag.types.whoosh import EODAGQueryParser, create_in
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
Expand Down Expand Up @@ -306,13 +306,18 @@ def build_index(self) -> None:
product_type, **{"md5": self.product_types_config_md5}
)
# add to index
ix_writer.add_document(
**{
k: v
for k, v in versioned_product_type.items()
if k in product_types_schema.names()
}
)
try:
ix_writer.add_document(
**{
k: v
for k, v in versioned_product_type.items()
if k in product_types_schema.names()
}
)
except TypeError as e:
logger.error(
f"Cannot write product type {product_type['ID']} into index. e={e} product_type={product_type}"
)
ix_writer.commit()

def set_preferred_provider(self, provider: str) -> None:
Expand Down
126 changes: 126 additions & 0 deletions eodag/types/whoosh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
from typing import List

from whoosh.fields import Schema
from whoosh.index import _DEF_INDEX_NAME, FileIndex
from whoosh.matching import NullMatcher
from whoosh.qparser import OrGroup, QueryParser, plugins
from whoosh.query.positional import Phrase
from whoosh.query.qcore import QueryError
from whoosh.util.text import utf8encode
from whoosh.writing import SegmentWriter


class RobustPhrase(Phrase):
Expand Down Expand Up @@ -77,3 +80,126 @@ def __init__(
phraseclass=RobustPhrase,
group=OrGroup,
)


class CleanSegmentWriter(SegmentWriter):
    """Segment writer whose ``add_document`` undoes per-document state on failure.

    When an exception is raised while the fields of a document are being
    processed, the per-document writer's document count and "in document"
    flag are reset before the exception is re-raised -- presumably so that a
    caller which absorbs the exception can keep using the same writer.
    cf: https://github.com/whoosh-community/whoosh/pull/543
    """

    def add_document(self, **fields):
        """Add one document to the segment being written.

        :param fields: field values keyed by field name. Keys starting with
            ``_`` are not treated as field names; ``_stored_<fieldname>``
            supplies a custom value for the stored field/column.
        :raises Exception: any exception raised while processing the fields
            is re-raised after the per-document writer has been reset.
        """
        self._check_state()
        doc_writer = self.perdocwriter
        schema = self.schema
        docnum = self.docnum
        post = self.pool.add

        docboost = self._doc_boost(fields)
        fieldnames = sorted(key for key in fields if not key.startswith("_"))
        self._check_fields(schema, fieldnames)

        doc_writer.start_doc(docnum)

        try:
            for fieldname in fieldnames:
                value = fields.get(fieldname)
                if value is None:
                    # Nothing to index or store for this field
                    continue
                field = schema[fieldname]

                length = 0
                if field.indexed:
                    # TODO: Method for adding progressive field values, ie
                    # setting start_pos/start_char?
                    boost = self._field_boost(fields, fieldname, docboost)
                    # The field yields (tbytes, freq, weight, vbytes) tuples
                    postings = field.index(value)
                    # The length is only accumulated for scorable fields
                    scorable = field.scorable
                    # Feed every term into the posting pool
                    for tbytes, freq, weight, vbytes in postings:
                        boosted = weight * boost
                        if scorable:
                            length += freq
                        post((fieldname, tbytes, docnum, boosted, vbytes))

                    if field.separate_spelling():
                        spellfield = field.spelling_fieldname(fieldname)
                        # NOTE: ``vbytes`` here is whatever the posting loop
                        # above bound last; it is not recomputed per word.
                        for word in field.spellable_words(value):
                            post((spellfield, utf8encode(word)[0], 0, 1, vbytes))

                vector_format = field.vector
                if vector_format:
                    analyzer = field.analyzer
                    # Ask the vector format for the posting values
                    raw_items = vector_format.word_values(
                        value, analyzer, mode="index"
                    )
                    # Drop the unused frequency element from each tuple
                    vector_items = sorted(
                        [(text, wt, vb) for text, _, wt, vb in raw_items]
                    )
                    doc_writer.add_vector_items(fieldname, field, vector_items)

                # A "_stored_<fieldname>" key overrides the stored/column value
                customval = fields.get("_stored_%s" % fieldname, value)

                # Hand the stored value (if the field is stored) and the
                # length of this field to the per-document writer
                if field.stored:
                    stored_value = customval
                else:
                    stored_value = None
                doc_writer.add_field(fieldname, field, stored_value, length)

                column = field.column_type
                if column and customval is not None:
                    column_value = field.to_column_value(customval)
                    doc_writer.add_column_value(fieldname, column, column_value)
        except Exception as ex:
            # Cancel the document that was started above, then propagate
            doc_writer._doccount -= 1
            doc_writer._indoc = False
            raise ex
        else:
            doc_writer.finish_doc()
            self._added = True
            self.docnum += 1


class CleanFileIndex(FileIndex):
    """File index whose single-process writer is a ``CleanSegmentWriter``."""

    def writer(self, procs=1, **kwargs):
        """Return a writer for this index.

        :param procs: when greater than 1, a ``whoosh.multiproc.MpWriter``
            is returned instead of a ``CleanSegmentWriter``.
        :param kwargs: extra keyword arguments forwarded to the writer.
        :returns: the writer object
        """
        if procs > 1:
            # Imported only when a multi-process writer is requested
            from whoosh.multiproc import MpWriter

            return MpWriter(self, procs=procs, **kwargs)

        return CleanSegmentWriter(self, **kwargs)


def create_in(dirname, schema, indexname=None):
    """
    Create an index in a directory through :class:`CleanFileIndex`.
    Convenience function that builds the ``FileStorage`` object for the
    directory and creates the index in it.
    :param dirname: the path string of the directory in which to create the
        index.
    :param schema: a :class:`whoosh.fields.Schema` object describing the
        index's fields.
    :param indexname: the name of the index to create; you only need to specify
        this if you are creating multiple indexes within the same storage
        object. Falls back to ``_DEF_INDEX_NAME`` when empty or not given.
    :returns: the index returned by ``CleanFileIndex.create``
    """

    from whoosh.filedb.filestore import FileStorage

    name = indexname or _DEF_INDEX_NAME
    return CleanFileIndex.create(FileStorage(dirname), schema, name)

0 comments on commit 076a0f8

Please sign in to comment.