How to make a data source

Dale Wahl edited this page Mar 27, 2024 · 31 revisions

Data sources

Overview

4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the former.

Data sources are a collection of workers, processors and interface elements that extend 4CAT to allow scraping, processing and/or retrieving data for a given platform (such as Instagram, Reddit or Telegram). 4CAT has APIs that can do most of the scaffolding around this for you, so a data source can be quite lightweight and focus mostly on retrieving the actual data, while 4CAT's back-end takes care of the scheduling, determining where the output should go, et cetera.

Data sources are defined as an arbitrarily-named folder in the datasources folder in the 4CAT root. It is recommended to use the data source ID (see below) as the data source folder name. However, since Python files included in the folder will be imported as modules by 4CAT, folder names must be valid Python module names. Concretely this means (among other things) that data source folder names cannot start with a number (hence the fourchan data source).

WARNING: Data sources can, in multiple ways, define arbitrary code that will be run by either the 4CAT server or client-side browsers. Be careful when running a data source supplied by someone else.

A data source will at least contain the following:

  • An __init__.py containing data source metadata and initialisation code
  • A search worker, which can collect data according to provided parameters and format it as a CSV or NDJSON file that 4CAT can work with.

It may contain additional components:

  • Any processors that are specific to datasets created by this data source
  • Views for the web app that allow more advanced behaviour of the web tool interface
  • Database or Sphinx index definitions

The instructions below describe how to format and create these components (work in progress!)

Initialisation code

The data source root should contain a file __init__.py which in turn defines the following:

DATASOURCE = "datasource-identifier"

This constant defines the data source ID. This is used by 4CAT internally to figure out what data source a dataset belongs to and so on.

def init_datasource(database, logger, queue, name):
    pass

This function is called when 4CAT starts, if the data source is enabled, and should set up anything the data source needs to function (e.g. queueing any recurring workers). A default implementation of this function can be used instead (and when defining your own, it is advised to still call it as part of your own implementation):

from common.lib.helpers import init_datasource

Search workers

The search worker is run when a dataset is created by someone, and collects the data for that dataset (i.e. the posts from the platform matching the given dataset parameters), writing it to the dataset result file. It is contained in an arbitrarily named Python file in the data source root (we recommend search_[datasource].py). The file should define a class that extends backend.lib.search.Search. This class should define the following attributes and methods:

Attributes

  • str type: Identifier used by the scheduler to know what code to run for jobs for this data source. Should be [datasource-id]-search, datasource-id being equal to the ID defined in __init__.py.
  • str extension: Optional. The extension (format) of the output data file. If omitted, csv is assumed; the other format currently supported is ndjson. Using any other extension will result in a NotImplementedError being raised.
  • int max_workers: Optional, default 1. The number of search workers that may run in parallel for this data source. Usually, you want to keep this at 1, unless you are confident your server can handle multiple parallel workers of this type.
  • dict options: Optional, default empty. Defines parameters that can be configured when querying this data source. These can be defined via a dictionary here, or via the get_options() method: see Input fields for data sources and processors for more information.
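Taken together, the attributes of a minimal search worker might look like the sketch below. The base class is stubbed so the snippet is self-contained; in a real data source you would import it with `from backend.lib.search import Search` instead, and "example" is a hypothetical data source ID.

```python
# A sketch of the minimal class attributes for a search worker.
class Search:  # stand-in for 4CAT's Search base class
    pass

class SearchExample(Search):
    type = "example-search"  # "[datasource-id]-search", matching DATASOURCE
    extension = "ndjson"     # "csv" (the default) or "ndjson"
    max_workers = 1          # keep at 1 unless parallel workers are safe
```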

Methods

  • validate_query(dict query, Request request, User user) -> dict: Called statically by the web tool whenever a new dataset is created by someone. query contains the form fields as set in the web interface; this method should return a sanitised version of that query, containing only fields and values relevant to this search worker. On invalid input, a common.lib.exceptions.QueryParametersException should be raised which will prompt the one creating the dataset to change their input and resubmit. You can also raise a common.lib.exceptions.QueryNeedsFurtherInputException(config) where config is a definition of further form fields that need to be completed, which will be shown in the interface while asking the user to submit again. The form fields can be defined in the same format as the 'normal' search parameter options (see get_options()).
  • get_items(self, dict query) -> generator: Yields items matching the query parameters. These are the 'search results' that will comprise the dataset.
  • import_from_file(self, str path) -> generator: Similar to get_items(), but takes a file path as parameter and yields items from that path as items to save in the dataset. Support for this is currently limited but it will serve as the basis for a generic 'import' feature for 4CAT in the future. A generic version of this method is part of the abstract class but it will usually require overriding to fit the nuances of the data source.
  • after_search(self, list items) -> list: Optional. If defined, this will be called after all posts have been retrieved with the methods listed above and, if applicable, after sampling. This method should yield items, like get_items(). You can use it to e.g. perform additional item filtering or processing, should your data source require it.
  • get_options(cls, parent_dataset, user) -> dict: Optional. If defined, this will be called to determine the options displayed in the 4CAT web interface when querying the data source, analogous to the options class property (this method overrides that property, if present). See Input fields for data sources and processors for more information.
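As a sketch of the validation step, the standalone function below sanitises a hypothetical query with a single text field. The exception class is stubbed so the snippet is self-contained; a real worker would import it from common.lib.exceptions and define the method statically on the search class.

```python
# Sketch of a validate_query implementation for a hypothetical data
# source whose only parameter is a free-text search query.
class QueryParametersException(Exception):  # stand-in for 4CAT's exception
    pass

def validate_query(query, request=None, user=None):
    # Reject empty input; 4CAT shows the message and asks the user to resubmit
    if not query.get("query", "").strip():
        raise QueryParametersException("Please provide a search query.")
    # Return only the fields this worker actually uses, sanitised
    return {"query": query["query"].strip()}
```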

You can also descend from SearchWithScope, which has built-in support for a couple of more advanced modes of querying data. This is particularly useful if data is available locally and doesn't require round-trips to a remote server. Specifically, SearchWithScope has functionality to do "full-thread" querying (i.e. retrieving all posts in any thread that contains a given number of matching posts). To this end, it requires definition of the following additional methods:

  • get_search_mode(self, dict query) -> str: Return simple or complex. If simple, get_items_simple() is used to retrieve posts. Else, get_items_complex() is used. This can be used to define 'fast lane' search methods for simpler queries if shortcuts can be taken. Of course, you can also always make it return either of the options if that is not relevant to your data source.
  • get_items_simple(self, dict query) -> generator: Get posts via the 'simple' path.
  • get_items_complex(self, dict query) -> generator: Get posts via the 'complex' path.
  • fetch_posts(self, list post_ids) -> list: Should be used by get_items_*() to retrieve the actual item data. Takes a list of post_ids (as determined by the get_items_* method) and retrieves data for those post IDs, e.g. via an API or a local database.
  • fetch_threads(self, list thread_ids) -> list: Retrieves all posts for the given thread_ids.
  • get_thread_lengths(self, list thread_ids, int min_length) -> dict: Should return a dictionary with thread IDs as keys and amount of posts per thread as values, for all threads with at least min_length posts.
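For local data, get_thread_lengths() amounts to a grouped count with a minimum-size filter. The sketch below is self-contained and uses an in-memory dictionary (POSTS, a hypothetical example) as a stand-in for the database or index a real SearchWithScope subclass would query.

```python
from collections import Counter

# Hypothetical local store mapping post IDs to thread IDs; in a real
# data source this would be a database or Sphinx index query
POSTS = {"p1": "t1", "p2": "t1", "p3": "t2", "p4": "t1"}

def get_thread_lengths(thread_ids, min_length):
    # Count posts per requested thread, keep threads with >= min_length posts
    counts = Counter(t for t in POSTS.values() if t in thread_ids)
    return {tid: n for tid, n in counts.items() if n >= min_length}
```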

Additionally, because search workers are (after a fashion) architecturally equivalent to processors, they have access to all the attributes a processor has access to, e.g. dataset. See the page for processors for more information on these. In particular, the map_item method can be useful to define for data sources that return complex (e.g. multi-dimensional) data.

Web Tool Interface

People can use 4CAT to create new datasets with your data source. To this end, the data source should define an interface through which dataset parameters may be set via the options property or get_options() method (see above). Data sources can additionally contain a folder webtool with the following files:

  • views.py: Optional. This can define additional views for the 4CAT Flask app. Any function defined in this file will be available as a view via /api/datasource-call/[datasource-id]/[function name]/. Functions should have the signature function(request, user, **kwargs): request and user are objects supplied by Flask, **kwargs is all HTTP GET parameters as keyword arguments. The function should return an object (remember that in Python everything is an object), which will be serialised as JSON as the view output.
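As a sketch, a views.py for a data source with ID wikipedia could expose a view at /api/datasource-call/wikipedia/status/ like this (the function name and returned fields are illustrative, not part of 4CAT):

```python
# Hypothetical view function; for a data source with ID "wikipedia"
# this would be reachable at /api/datasource-call/wikipedia/status/?q=...
def status(request, user, **kwargs):
    # kwargs holds the HTTP GET parameters; the returned object is
    # serialised to JSON by 4CAT before being sent to the client
    return {"datasource": "wikipedia", "parameters": kwargs}
```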

Walkthrough Datasource Example

This example walks through step by step creation of a new datasource. You can try a working version and see the actual files for this example in this 4CAT branch.

Let’s create a simple datasource that searches Wikipedia articles and collects the results. According to the Wikimedia API documentation, we need to create an account and generate an API key (a personal API key is fine).

File structure

  1. We will create a new folder in 4CAT’s datasources folder called “wikipedia”.
  2. Inside this folder, we create a file called __init__.py and one called search_wikipedia.py.
datasources/
    wikipedia/
        __init__.py
        search_wikipedia.py

Step 1: The __init__.py file

A simple __init__.py file is all we need for 4CAT to recognize our new datasource.

"""
Initialize Wikipedia data source
"""

# An init_datasource function is expected to be available to initialize this
# data source. A default function that does this is available from the
# backend helpers library.
from common.lib.helpers import init_datasource

# Internal identifier for this data source
DATASOURCE = "wikipedia" # this will be used internally and must match the Search class `type`
NAME = "Wikipedia" # this will be displayed via the UI

Step 2: A new Search class in the file search_wikipedia.py

This file will contain a new class derived from our generic Search class. Some basic information is needed:

  • type acts as the job identifier in our database; search workers must use the same DATASOURCE as the __init__.py and end in -search, otherwise they will not be identified correctly
  • title will display in the frontend UI for users to distinguish this datasource
  • extension defaults to “csv”, but this API returns JSON objects so we will store them as “ndjson” (which stores one JSON object per line in a file)
  • is_local is either True or False and denotes whether the data is already stored locally (e.g., in a database); here it is False, as the data is retrieved from an API
  • is_static is either True or False and denotes whether the data is static, i.e. no longer updated (this only applies to local datasources)
  • references is not required, but, if provided, can give context to other users as to how the data is collected and what it represents
"""
Wikipedia article search
"""
from backend.lib.search import Search

class SearchWikipedia(Search):
    """
    Get Wikipedia articles via Wikimedia's API
    """
    type = "wikipedia-search"  # job ID
    title = "Wikipedia Search"
    extension = "ndjson"
    is_local = False    # Whether this datasource is locally scraped
    is_static = False   # Whether this datasource is still updated

    references = [
        "[Wikimedia API](https://api.wikimedia.org/wiki/Searching_for_Wikipedia_articles_using_Python)",
    ]

Step 3: Add methods to SearchWikipedia class

Next we will add the required methods to our class. Only three methods are required: get_options, validate_query, and get_items, but a fourth method, map_item, is needed since we are handling JSON data.

Method 1: get_options

First, get_options defines the parameters a user inputs in 4CAT’s frontend UI to create a new dataset. We will simply ask for the API key and a query, but there are many more possible option types which you can read about here.

Add this import to the top of your search worker file:

from common.lib.user_input import UserInput

And this method to the SearchWikipedia class:

    @classmethod
    def get_options(cls, parent_dataset=None, user=None):
        """
        Options for the user to provide in order to run the search

        :param parent_dataset:  Should always be None
        :param user:  User to provide options for
        :return dict:  Data source options
        """

        options = {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": "Search Wikipedia articles and retrieve their metadata. You will need a Wikimedia API key, which can be obtained by creating a user account and requesting a personal API key. They provide [instructions here.](https://api.wikimedia.org/wiki/Getting_started_with_Wikimedia_APIs)"
            },
            "api_key": {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True, # Ensures 4CAT knows to keep this value secret and not store it in the 4CAT database
                "cache": True, # Allows the value to be cached by the user's browser to use again
                "help": "API Key" # “help” text is displayed for the user to view in the UI
            },
            "query": {
                "type": UserInput.OPTION_TEXT,
                "help": "Search query"
            },
        }
        
        return options

Method 2: validate_query

Next the validate_query method is required. It takes the option inputs provided by the user and can be used to ensure they meet certain requirements. It then provides the dictionary of options to the get_items method. For our example, we will just ensure they are not blank.

Add the query exception to the top of our search worker file:

from common.lib.exceptions import QueryParametersException

And add the method to the SearchWikipedia class:

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate the options input needed to query the Wikipedia data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        # Please provide an API key
        if not query.get("api_key", None):
            raise QueryParametersException("Please provide an API key.")

        # Please provide some query
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # Great, query is fine to return
        return query

Method 3: get_items

Next, we will do the meat of the work in the get_items method. This takes the parameters provided and collects the results from the API, yielding them one by one (or all at once, if desired).

Add a couple more imports:

from datetime import datetime, timezone
import requests
from common.lib.exceptions import ProcessorInterruptedException

And this method to our SearchWikipedia class:

    def get_items(self, query):
        """
        Use the Wikimedia API to collect articles

        :param query:
        :return:
        """
        api_key = query.get("api_key") # corresponds to the "api_key" option in `get_options`
        if api_key is None:
            # Because API keys are not stored per the `sensitive` parameter in `get_options`, if this dataset was interrupted, it cannot be resumed
            self.dataset.update_status(
                "Wikipedia query failed or was interrupted; please create new query in order to provide your API key again.",
                is_final=True)
            return [] # No items to return

        # Get the query
        language_code = 'en' # Wikipedia needs a language code; we could add an option for this
        search_query = query.get("query") # corresponds to the "query" option in `get_options`
        number_of_results = 10 # Another great option to add
        headers = {
            'Authorization': api_key,
            'User-Agent': '4CAT (4cat.nl)' # Be nice to open source resources and let them know who is using their API
        }

        base_url = 'https://api.wikimedia.org/core/v1/wikipedia/'
        endpoint = '/search/page'
        url = base_url + language_code + endpoint
        parameters = {'q': search_query, 'limit': number_of_results}

        self.dataset.update_status("Querying Wikimedia API for {}".format(search_query)) # update_status adds a message to the dataset's status log and to the UI
        response = requests.get(url, headers=headers, params=parameters)
        collection_time = datetime.now(tz=timezone.utc).timestamp() # Add a collection timestamp for metadata

        if response.status_code != 200:
            self.dataset.update_status(
                "Wikimedia API query failed with status code {} and reason {}.".format(response.status_code, response.reason),
                is_final=True)
            return []

        pages = response.json()['pages']
        total_results = len(pages)
        # Get the data
        for i, article_result in enumerate(pages):
            if self.interrupted:
                # In this example, it is not necessary to check for interruptions, but if we were following up with more
                # queries or collecting the actual articles, we would need to check 4CAT for interruptions
                raise ProcessorInterruptedException("Interrupted while fetching articles from the Wikimedia API")

            # It is a good practice to add some metadata to items such as collection time
            article_result['4CAT_metadata'] = {"collected_at": collection_time, "query": search_query, 'language_code': language_code}

            yield article_result
            self.dataset.update_progress((i + 1) / total_results) # update_progress updates the progress bar in the UI

Method 4: map_item (Only for NDJSON datasets)

Finally, we need to map the resultant JSON for 4CAT to understand which fields are relevant to the user. In a CSV file, 4CAT will use each column heading as the field names, but with JSONs this is not always possible. Our Wikimedia articles are formatted like so:

{'id': 26903,
 'key': 'Solar_System',
 'title': 'Solar System',
 'excerpt': 'The <span class="searchmatch">Solar</span> <span class="searchmatch">System</span> is the gravitationally bound <span class="searchmatch">system</span> of the Sun and the objects that orbit it. The largest of these objects are the eight planets, which',
 'matched_title': None,
 'description': 'The Sun and objects orbiting it',
 'thumbnail': {'mimetype': 'image/jpeg',
  'width': 60,
  'height': 34,
  'duration': None,
  'url': '//upload.wikimedia.org/wikipedia/commons/thumb/1/19/Solar_System_true_color.jpg/60px-Solar_System_true_color.jpg'}}

We will use map_item to format as desired. The original NDJSON file will be saved, but analyses that require users to select columns or need specific fields will use this map_item method.

Additional imports:

from common.lib.item_mapping import MappedItem

And the map_item method to add to SearchWikipedia:

    @staticmethod
    def map_item(item):
        """
        Map a nested Wikipedia object to a flat dictionary

        :param item:  Wikipedia object as originally returned by the Wikimedia API
        :return dict:  Dictionary in the format expected by 4CAT
        """
        # We return a MappedItem object as that can allow us some flexibility to handle issues in the data (such as
        # missing fields) and notify users if needed
        fourcat_metadata = item.get('4CAT_metadata', {})
        collected_at = datetime.fromtimestamp(fourcat_metadata.get('collected_at'))
        language = fourcat_metadata.get('language_code')

        display_title = item['title']
        article_url = 'https://' + language + '.wikipedia.org/wiki/' + item['key']

        return MappedItem({
            # Some fields are required by 4CAT
            "id": item["id"],
            "timestamp": collected_at.strftime("%Y-%m-%d %H:%M:%S"),
            "body": item.get("excerpt", ""), # The "body" field is often used by default in 4CAT as the main text of the item
            "author": "", # We don't have an author for Wikipedia articles, but it is a required field
            "thread_id": "", # We don't have a thread ID for Wikipedia articles, but it is a required field

            # Additional data
            "link": article_url,
            "subject": display_title, # "subject" is a commonly used field in 4CAT
            "description": item.get("description", ""),
            "image_url": "https:" + item.get("thumbnail", {}).get("url", "") if item.get("thumbnail") else "", # "image_url" is a commonly used field in 4CAT and should be a valid link for 4CAT to download later

            # Metadata if desired; can be useful if we collected multiple queries and/or languages
            "language_code": language,
            "query": fourcat_metadata.get('query', ""),
            "unix_timestamp": int(collected_at.timestamp()),
        })

Step 4: Activate your new datasource

There we have it! Now you can restart 4CAT and activate your datasource via the Control Panel:

  • Control Panel -> Settings -> Data sources
  • Find your new datasource to enable and save!
  • Go to Create Dataset and try it out.

If using Docker, you will need to copy the files to both the 4cat_backend and 4cat_frontend (e.g., docker cp datasources/wikipedia 4cat_backend:/usr/src/app/datasources/) and restart both so that they register the new datasource files (e.g., docker restart 4cat_backend).

Troubleshooting tips

  • Test your datasource using your Python environment by importing the search worker file
    • e.g., import datasources.wikipedia.search_wikipedia for our example
    • If using Docker to develop, you can run tests using the 4CAT python environment in the containers via docker exec -it 4cat_backend python.
  • 500 error message when trying to view the datasource:
    • Check your 4CAT frontend logs to see the error message. Most likely it has to do with map_item not properly handling the results.