Plugin architecture improvement and ideas. #1225

dalf opened this issue Mar 4, 2018 · 1 comment

@dalf (Contributor) commented Mar 4, 2018

Right now, all plugins are synchronous (see SearchWithPlugins); the flow is sketched just after this list:

  • first, pre_search is called
  • if pre_search allows it, the normal search is done (without plugins)
  • then post_search is called
  • then on_result is called for each result
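
For reference, a minimal sketch of that synchronous flow, assuming the hook names above (this mirrors the current SearchWithPlugins.search with elided bodies; the exact details in search.py may differ):

class SearchWithPlugins(Search):
    ...
    def search(self):
        # pre_search can veto the whole search
        if plugins.call(self.ordered_plugin_list, 'pre_search', self.request, self):
            super(SearchWithPlugins, self).search()

        plugins.call(self.ordered_plugin_list, 'post_search', self.request, self)

        # on_result runs synchronously, once per collected result
        for result in self.result_container.get_ordered_results():
            plugins.call(self.ordered_plugin_list, 'on_result', self.request, self, result)

        return self.result_container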

The on_result call can take time, and may not even modify the results (the current implementation of the HTTPS Everywhere plugin, for example).

There are two types of plugins for now.

A first type modifies the results:

  • OA DOI rewrite (modifies the results)
  • Tracker remover (modifies the results)
  • HTTPS rewrite (modifies the results)
  • self IP (modifies the answers, not the results)

A second type modifies the UX/UI using JavaScript / CSS:

  • vim
  • infinite scroll
  • search on category select
  • open results in a new tab (actually dead code?)

We can imagine a third type: slow, but it doesn't modify the results or the UI/UX. The plugin is just notified of the results.

Improve the response time

The on_result functions inside the plugins are called after all results have been collected by the engines. See the call in search.py, at this line:

plugins.call(self.ordered_plugin_list, 'on_result', self.request, self, result)

One way to improve the response time is to call "on_result" as soon as one engine returns some results. To do that, ResultContainer must call on_result: no global wait at the end of the process.

class SearchWithPlugins(Search):
    ...
    def processResult(self, result):
        plugins.call(self.ordered_plugin_list, 'on_result', self.request, self, result)
    ...

class ResultContainer(object):

    def __init__(self, process_result):
        ...
        self.process_result = process_result

    def extend(self, engine_name, results):
        ...
        # if there is no duplicate found, append the result
        else:
            result['positions'] = [position]
            self.process_result(result)
            with RLock():
                self._merged_results.append(result)

    # should this be thread safe?
    # it is not: the same URL may be processed multiple times
    # (that should not be a problem)
    def processUrl(self, url):
        if url not in self.processedUrl:
            # self.ordered_plugin_list, self.request and self.search must be initialized
            self.processedUrl[url] = plugins.call(
                self.ordered_plugin_list, 'on_result', self.request, self.search, url)
        return self.processedUrl[url]

I'm not very happy with this solution because ResultContainer gets references to everything.
One solution: define processUrl(url) inside search.py, and ResultContainer gets a reference to that function, as in the sketch below.
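
A minimal sketch of that variant (the constructor arguments follow the attributes used above; processUrl and the processedUrl cache are the hypothetical names from the previous snippet):

# search.py
class SearchWithPlugins(Search):

    def __init__(self, search_query, ordered_plugin_list, request):
        super(SearchWithPlugins, self).__init__(search_query)
        self.ordered_plugin_list = ordered_plugin_list
        self.request = request
        self.processedUrl = {}
        # ResultContainer only receives a plain callable
        self.result_container = ResultContainer(self.processUrl)

    def processUrl(self, url):
        if url not in self.processedUrl:
            self.processedUrl[url] = plugins.call(
                self.ordered_plugin_list, 'on_result', self.request, self, url)
        return self.processedUrl[url]


# results.py
class ResultContainer(object):

    def __init__(self, process_url):
        # no reference to the request, the search or the plugin list
        self.process_url = process_url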

Implement a third type of plugin: asynchronous notification

Use case: plugins which won't modify the results but which are slow. Examples: store the results, compute some statistics on the results, etc.
Note: most probably, these use cases break the stateless nature of searx.

As @asciimoo has suggested in #1224, it can be done using the current plugin architecture:

import threading

name = 'Save results'
description = 'Save results'
default_on = True


def save_results(results):
    # TBD..
    print(results)


def post_search(request, search):
    results = search.result_container.get_ordered_results()
    # do the slow work on a separate thread, outside the request cycle
    threading.Thread(target=save_results, args=(results,)).start()
    return True

The drawbacks:

  • it creates one thread per plugin and per request (which increases the global response time)
  • the plugin is not guaranteed to see the fully processed URL (other plugins may modify the URL after that call)

If this is a problem, a solution: we can imagine one Queue and one thread processing the results. The thread gets results from the Queue and calls the on_notify_result function of the plugins (on_notify_result can't modify the results, but it can be slow).

The drawback of that solution: if one plugin hangs indefinitely, the asynchronous processing stops.
The fix for that: one Queue/thread per plugin.

To sum up

search.py:

class SearchWithPlugins(Search):

    def __init__(self, search_query):
        # init vars
        super(SearchWithPlugins, self).__init__(search_query)
        # inject the per-result callback into the container
        self.result_container = ResultContainer(self.processResult)

    def processResult(self, result):
        plugins.call(self.ordered_plugin_list, 'on_result', self.request, self, result)

    def search(self):
        if plugins.call(self.ordered_plugin_list, 'pre_search', self.request, self):
            super(SearchWithPlugins, self).search()

        plugins.call(self.ordered_plugin_list, 'post_search', self.request, self)

        # ResultContainer will call processResult, which calls on_result for each plugin.
        results = self.result_container.get_ordered_results()

        for result in results:
            plugins.async_on_result(self.ordered_plugin_list, self.request, self, result)

        return self.result_container

results.py:

class ResultContainer(object):

    def __init__(self, process_result):
        ...
        self.process_result = process_result

    def extend(self, engine_name, results):
        ...
        # if there is no duplicate found, append the result
        else:
            result['positions'] = [position]
            self.process_result(result)
            with RLock():
                self._merged_results.append(result)

plugins.py:

# a plain thread-safe Queue is enough here: producer and consumer
# are threads inside the same process
from queue import Queue
from threading import Thread


def worker(queue):
    while True:
        f, args, kwargs = queue.get()
        try:
            f(*args, **kwargs)
        except Exception as e:
            # todo : log
            print(e)


class QueueThread(object):

    def __init__(self, name):
        # bounded queue: a stuck plugin applies backpressure
        # instead of growing without limit
        self.queue = Queue(100)
        self.thread = Thread(name='plugin-' + name, target=worker, args=(self.queue,))
        self.thread.daemon = True
        self.thread.start()

    def call(self, f, *args, **kwargs):
        self.queue.put((f, args, kwargs))


class PluginStore(object):

    def __init__(self):
        self.plugins = []
        self.pools = {}

    def start_async(self):
        # one Queue/thread per plugin that declares on_async_result
        for plugin in self.plugins:
            if hasattr(plugin, 'on_async_result'):
                self.pools[plugin] = QueueThread(plugin.name)

    def async_on_result(self, ordered_plugin_list, *args, **kwargs):
        for plugin in ordered_plugin_list:
            if plugin in self.pools:
                f = getattr(plugin, 'on_async_result')
                self.pools[plugin].call(f, *args, **kwargs)
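
For illustration, a hypothetical plugin of this third type could then be as small as this (the on_async_result name and its (request, search, result) signature come from the sketches above; they are not part of the current plugin API):

name = 'Save results'
description = 'Save each result asynchronously'
default_on = True


def on_async_result(request, search, result):
    # runs on the dedicated 'plugin-Save results' thread, never on
    # the request thread; it may be slow but must not modify result
    print(result.get('url'))

plugins.start_async() would have to be called once at startup, after all plugins are registered, so that the per-plugin Queue/thread exists before the first request.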
@dalf dalf added the core label Mar 4, 2018
@return42 (Contributor) commented
FWIW: "plugins" should not be a part of the searx source code, see PR #1938 decoupling plugin development.
