Correction of a data-processing/database homework

The subject of the Homework is here

The goal of the homework is to play with larger and larger datasets, to get a feel for how hard it is to scale, why memory management matters, and where databases shine. This page provides corrections and some best practices, for those who want to improve their Python skills for handling large datasets.

Timings for the medium dataset (and sometimes for the full dataset) are provided in the text.

All the timings and experiments were done on my server, which is a cheap 12-year-old laptop with 8GB of RAM and a slow Intel(R) Core(TM) i5.

Exploring the dataset

Every dataset is broken in one way or another. Cleaning it is often tedious, boring and time-consuming. Unfortunately, this one is no exception.

The dataset is gzip-compressed. For very large datasets, disk space can become an issue when uncompressing them. It is possible to read the file without uncompressing it on disk, using Python's standard gzip module: simply replace open by gzip.open after importing gzip.

Reading the file will be slower, but it will not consume extra disk space.

Reading the file and putting the data into shape should be done with a helper function. Embracing iterators is the most efficient way to avoid relying too much on memory.

When manipulating files in Python, never use strings to represent file paths. Python is equipped with the pathlib module, which in particular handles file paths on both Windows and Linux.

Similarly, never parse a CSV file manually: it is error-prone. Always use the standard csv module for that (or Pandas). CSV is not a complex file format, but it nevertheless has some subtleties that other people tackled for you a long time ago.
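To make these subtleties concrete, here is a minimal sketch on a made-up row (not taken from the dataset) whose last field is a JSON document, so it contains commas and doubled quotes. A naive split on commas breaks that field apart, while the csv module returns it intact.

```python
import csv
import io
import json

# A made-up CSV line: the last field is a JSON document, so it contains
# commas and quotes. CSV escapes a quote inside a quoted field by doubling it.
raw = 'p1,article,2020,"[{""first_name"": ""Ada"", ""last_name"": ""Lovelace""}]"\n'

# Naive parsing: splitting on commas cuts the JSON field into pieces.
naive_fields = raw.rstrip("\n").split(",")

# The csv module understands quoting and returns the field intact.
csv_fields = next(csv.reader(io.StringIO(raw)))
authors = json.loads(csv_fields[-1])

print(len(naive_fields), "pieces with a naive split")
print(len(csv_fields), "fields with the csv module")
print(authors[0]["last_name"])
```

The naive version produces one piece too many and none of the pieces is valid JSON any more, which is exactly the kind of bug that only shows up on the few rows with unusual content.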

import gzip
import csv
from pathlib import Path
from typing import Iterator, Tuple
import json


csv.field_size_limit(10**8) # some fields are larger than the default limit of the csv module

def parse_json(iterator: Iterator) -> Iterator:
    """ Take an iterator of rows, decode the JSON of the last field and skip the rows where it fails """
    for line_nb, n in enumerate(iterator):  # line_nb counts every row, including the skipped ones
        try:
            authors = json.loads(n[-1])
        except json.JSONDecodeError: # this will catch ONLY the errors emitted by the json module.
            print(f"Error on line: {line_nb}")
            continue
        yield (*n[:-2], authors)

It is common to get annoyed by exceptions when dealing with iterators. An iterator can always be wrapped so that the error handling becomes part of it. The parse_json function illustrates that.
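The same idea can be written once and for all. The following is a minimal sketch of a generic wrapper; the name skip_errors and its parameters are hypothetical and are not used elsewhere in this correction.

```python
import json
from typing import Callable, Iterable, Iterator, Tuple, Type

def skip_errors(items: Iterable, func: Callable,
                exceptions: Tuple[Type[Exception], ...]) -> Iterator:
    """Apply func to each item, skipping the items on which it raises."""
    for position, item in enumerate(items):
        try:
            result = func(item)
        except exceptions as error:
            # Only the listed exception types are swallowed; anything else propagates.
            print(f"Skipping item {position}: {error}")
            continue
        yield result

# Made-up input: the second string is not valid JSON and gets skipped.
decoded = list(skip_errors(['{"a": 1}', "not json", "[2, 3]"],
                           json.loads, (json.JSONDecodeError,)))
print(decoded)
```

Keeping the try block around the call only (and not around the yield) ensures that an exception raised by the consumer of the iterator is never swallowed by mistake.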

def generate_rows(filepath: Path) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    opener = open
    if filepath.suffix == ".gz":
        opener = gzip.open
    
    with opener(filepath, 'rt') as f: # 'r' for read mode and 't' for text mode (gzip.open defaults to binary mode).
        yield from parse_json(csv.reader(f))

The usage is as follows:

very_short = Path("bibendum_extractions.very_short.csv.gz")
it = generate_rows(very_short)
print(next(it))

To benchmark an iterator, you should always exhaust it. For that, simply iterate over it, doing nothing with the results.

On my computer, I timed the following statement:

medium = Path("bibendum_extractions.medium.csv.gz")
it = generate_rows(medium)
for _ in it:
    ... # doing nothing

It takes around 3min on the compressed version and 2min30 on the uncompressed version. This is the time for Python to read and decode the file, and that's it.
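Such timings can be taken with a small helper. This is a minimal sketch: the name time_exhaust is hypothetical, and a generator of squares stands in for generate_rows(medium) so that the example runs without the dataset.

```python
import time
from collections import deque
from typing import Iterable

def time_exhaust(iterable: Iterable) -> float:
    """Consume the iterable entirely and return the elapsed time in seconds."""
    start = time.perf_counter()
    deque(iterable, maxlen=0)  # consumes every element, stores none of them
    return time.perf_counter() - start

numbers = (i * i for i in range(100_000))  # stand-in for generate_rows(medium)
elapsed = time_exhaust(numbers)
print(f"{elapsed:.3f}s")
```

A deque with maxlen=0 is just a compact way to write the empty for loop shown above; either form gives the same measurement.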

In comparison, Pandas reads the file a bit faster (1min30) but takes up to 12GB of RAM. I could run it on a beefy computer, but not on my laptop for instance. The pure Python version, on the other hand, uses barely any RAM: it would work on any computer, even a Raspberry Pi.

The full (compressed) dataset can be processed with the very same code, and takes 38min.

It is possible to read it with pandas (some of the students did), but it requires processing it in chunks, through an iterator of dataframes of bounded size. This drastically increases the overall complexity of the code without any clear gain in performance. It is best to keep a good balance between code complexity and code efficiency!

It is important to keep in mind the order of magnitude of the processing time for non-Python software, as Python is slow. For that, bash is useful: most command-line tools are written in C and are simple enough to add almost no overhead.

For instance, the following command just reads the file and does nothing with it.

    zcat bibendum_extractions.medium.csv.gz > /dev/null 

It took 45s (a 4x speed-up). It also does not decode the JSON part, which is time-consuming, so it is no big surprise that it is much faster. The following is the equivalent for the uncompressed version:

    cat  bibendum_extractions.medium.csv > /dev/null

It took around 10s (a 16x speed-up). Altogether, it gives us the following information:

Pythonic querying

Here, the goal was to play with iterators and avoid storing too much data in RAM, so we simply filter the iterator.

def extract_author_paper(first_name, family_name, path: Path) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    """
        Return an iterator over the rows of a given author. 
    """ 
    it = generate_rows(path)
    def check_author(authors_list) -> bool:
        """ Helper function to check if authors is in the list """
        return any(aut["first_name"] == first_name and aut["last_name"] == family_name for aut in authors_list)
    return filter(lambda e: check_author(e[3]), it)

The following execution takes around 3min30s, so the filter accounts for around 16% of the total cost of the processing.

list(extract_author_paper("Charles", "Paperman", medium))

The iterators over authors

Before moving on to the function get_all_authors, we can design a small helper function that again produces an iterator of authors seen as a pair (first_name, last_name).

This is not hard to design:

def authors_iterator(path: Path) -> Iterator[Tuple[str, str]]:
    it = generate_rows(path)
    yield from ((author["first_name"], author["last_name"]) for row in it for author in row[3])
    

Again, we can time this function to check how costly it is. Note that in terms of memory, it is still in \(O(1)\) with a very small constant.

authors = authors_iterator(medium)
for author in authors:
    ...

And again, it takes 3min30. (It seems that stacking iterators adds ~10% of computing overhead.) But now we want to filter out the authors that appear more than once, and it is no longer possible to use a memory-less program: we need to remember all the authors seen so far.

The straightforward implementation absolutely needs to use a set and not a list, for performance. With a list, each membership test is a linear scan, so the whole computation would be quadratic and take forever, probably several months for the full dataset.

At the end of the computation, I added a print of the size of the set in RAM.

import sys

def get_all_authors(path: Path) -> Iterator[Tuple[str, str]]:
    authors = set()
    for author in authors_iterator(path):
        if author not in authors:
            yield author 
            authors.add(author)
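    # Note: sys.getsizeof only counts the set's own hash table, not the tuples and strings it refers to.
    print(f"Size of the datastructure: {sys.getsizeof(authors)//10**6}MB")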

This time the run is longer (4min), so at least 25% of the computation is spent on the set operations alone. More importantly, the memory footprint becomes the issue: 500MB. While it is not so limiting here, it becomes an issue for the full dataset: the 8GB of RAM of my computer are not enough to run this function on it.
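A caveat on the measurement: sys.getsizeof is shallow, so on a set it reports the size of the hash table only. The following minimal sketch (the helper approximate_deep_size is hypothetical, and the names are made up) also counts the tuples and strings the set refers to, which gives a rough idea of the real footprint.

```python
import sys

def approximate_deep_size(authors: set) -> int:
    """Size in bytes of a set of (first_name, last_name) tuples, contents included.

    Rough estimate: a string shared by several tuples is counted several times.
    """
    total = sys.getsizeof(authors)  # the hash table itself
    for author in authors:
        total += sys.getsizeof(author)  # the tuple
        total += sum(sys.getsizeof(name) for name in author)  # its two strings
    return total

sample = {(f"first{i}", f"last{i}") for i in range(10_000)}  # made-up names
shallow = sys.getsizeof(sample)
deep = approximate_deep_size(sample)
print(f"shallow: {shallow} bytes, with contents: {deep} bytes")
```

The figure printed by get_all_authors is therefore best read as a lower bound on the memory actually used.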

When this happens, time is less of an issue than RAM: we can always wait longer, but it is often harder to add RAM to a computer. You can of course always pay for a computer with more RAM (or rent one in the cloud). It is also possible to trade RAM for time. The simplest strategy is to perform multiple passes over the input, the number of passes depending on the amount of RAM available.

We can slightly adapt the previous function:

def get_all_authors(path: Path, nb_pass=7) -> Iterator[Tuple[str, str]]:
    for i in range(nb_pass):
        authors = set()
        for author in authors_iterator(path):
            if hash(author)%nb_pass == i and author not in authors:
                yield author 
                authors.add(author)

This solution is elegant: it doesn’t require a lot of programming, and the trade-off is easy to see. Increasing nb_pass divides the size of the required data structure by roughly nb_pass, while making the program slower by a factor of nb_pass.
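The trade-off can be checked on a toy input. In this minimal sketch, distinct_multi_pass is a hypothetical variant of the function above that takes any re-iterable source and records the size reached by the set at the end of each pass; the data is made up.

```python
from typing import Callable, Iterator, List

def distinct_multi_pass(make_iterator: Callable[[], Iterator], nb_pass: int,
                        peak_sizes: List[int]) -> Iterator:
    """Yield each distinct item once, re-reading the input nb_pass times."""
    for i in range(nb_pass):
        seen = set()
        for item in make_iterator():
            # Each pass only handles the items whose hash falls in bucket i.
            if hash(item) % nb_pass == i and item not in seen:
                seen.add(item)
                yield item
        peak_sizes.append(len(seen))  # size of the set at the end of the pass

data = [("Ada", "Lovelace"), ("Alan", "Turing"), ("Ada", "Lovelace"),
        ("Grace", "Hopper"), ("Alan", "Turing")]
peaks: List[int] = []
result = list(distinct_multi_pass(lambda: iter(data), nb_pass=3, peak_sizes=peaks))
print(sorted(result), peaks)
```

Every distinct author lands in exactly one bucket, so the sizes recorded in peaks add up to the number of distinct authors. How evenly they are spread depends on the hash values, which Python randomizes per process for strings, so the split changes from run to run while the result stays the same.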

Another strategy would be to perform the computation on disk, relying on efficient external programs. Something like this would work on a Linux machine:

import os
import tempfile

def get_all_authors_external_sort(path) -> Iterator[Tuple[str, str]]:
    intmp, infilename = tempfile.mkstemp()
    outtmp, outfilename = tempfile.mkstemp()
    with open(intmp, "a") as f:
        for author in authors_iterator(path):
            f.write("\t".join(author) + "\n") # one author per line, otherwise sort sees a single huge line
    os.system(f"sort -u --parallel 3 {infilename} > {outfilename}")
    os.unlink(infilename) # deleting the file
    with open(outtmp) as f:
        # strip the end of line before splitting, so that last_name does not end with "\n"
        yield from map(lambda e: tuple(e.rstrip("\n").split("\t")), f)
    os.unlink(outfilename)

Here the sort program is optimized to sort large files in parallel, taking advantage of the available memory without exhausting it. It becomes the more efficient option in situations where multiple passes would make the process way too slow. Here, processing the medium dataset takes 6min, so it becomes interesting as soon as we would need at least two passes over the input.
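To give an idea of what an on-disk sort does, here is a minimal pure-Python sketch of the general technique: sort bounded chunks in memory, write each one to a temporary file, then merge the sorted files while dropping duplicates. It is an illustration only, not a description of how the sort program is implemented; the function names are hypothetical, and it assumes that no line contains a newline character.

```python
import heapq
import itertools
import tempfile
from typing import Iterable, Iterator, List, TextIO

def _write_run(lines: List[str]) -> TextIO:
    """Sort one chunk in memory and write it to a temporary file (a sorted 'run')."""
    run = tempfile.TemporaryFile("w+", encoding="utf-8")
    run.writelines(line + "\n" for line in sorted(set(lines)))
    run.seek(0)
    return run

def _read_run(run: TextIO) -> Iterator[str]:
    """Read a run back lazily, one line at a time."""
    for line in run:
        yield line.rstrip("\n")

def distinct_external(lines: Iterable[str], chunk_size: int = 100_000) -> Iterator[str]:
    """Yield the distinct lines in sorted order, keeping about chunk_size lines in RAM."""
    runs: List[TextIO] = []
    iterator = iter(lines)
    try:
        while True:
            chunk = list(itertools.islice(iterator, chunk_size))
            if not chunk:
                break
            runs.append(_write_run(chunk))
        # heapq.merge reads the sorted runs lazily; equal lines come out
        # next to each other, so groupby is enough to drop duplicates.
        merged = heapq.merge(*(_read_run(run) for run in runs))
        for line, _group in itertools.groupby(merged):
            yield line
    finally:
        for run in runs:
            run.close()

names = ["Turing\tAlan", "Lovelace\tAda", "Turing\tAlan", "Hopper\tGrace", "Lovelace\tAda"]
distinct = list(distinct_external(names, chunk_size=2))
print(distinct)
```

One limit of this sketch is that it keeps one open file per chunk, so chunk_size has to be large enough for the number of runs to stay below the open-file limit of the operating system.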

This is already very much how databases behave, with very efficient on-disk sorting. Finally, the most reasonable strategy is simply to put the data in a SQLite database and let it deal with all the technicalities.

Probabilistic datastructure

So far, we didn’t want any loss, but in many applications it is OK to have, say, 1% or 0.1% of errors. This means that some authors could wrongfully get dropped and disappear. While this could be completely wrong for a production system, when performing data analysis such an error would have no noticeable impact on the final aggregated results.

Let's say for instance that we want to approximately count the number of authors.

For that we could rely on a probabilistic data structure: a Bloom filter.

A Bloom filter should be thought of as a set (hash table) with some imprecision, but much more concise in memory. We trade certainty for memory. One major distinction with a set is that a Bloom filter only stores some sort of concise hash signature and not the values themselves.
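To see why no value needs to be stored, here is a toy Bloom filter in pure Python. It is a minimal sketch for illustration, not the way rbloom is implemented: the class ToyBloom, its parameters and the salted SHA-256 hashing are all choices made for this example.

```python
import hashlib
from typing import Iterator

class ToyBloom:
    """A minimal Bloom filter: a bit array plus several hash functions."""

    def __init__(self, nb_bits: int, nb_hashes: int):
        self.nb_bits = nb_bits
        self.nb_hashes = nb_hashes
        self.bits = bytearray((nb_bits + 7) // 8)  # all bits start at 0

    def _positions(self, item: str) -> Iterator[int]:
        # One bit position per hash function, derived from a salted SHA-256 digest.
        for salt in range(self.nb_hashes):
            digest = hashlib.sha256(f"{salt}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.nb_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        # True if all the bits of the item are set: certain for added items,
        # possible by accident for items that were never added (false positive).
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

bloom = ToyBloom(nb_bits=10_000, nb_hashes=7)
names = [f"author{i}" for i in range(500)]  # made-up values
for name in names:
    bloom.add(name)

no_false_negative = all(name in bloom for name in names)
false_positives = sum(f"other{i}" in bloom for i in range(500))
print(no_false_negative, false_positives)
```

An added item is always found, but an item that was never added may be reported as present when other items happened to set all of its bits. In the deduplication above, such a false positive is exactly an author that gets wrongfully dropped.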

I use the package rbloom here.

The usage is rather straightforward and is exactly equivalent to the first variant, except for the initialisation of the set.

from rbloom import Bloom

def get_all_authors_bloom(path: Path, error_rate=0.01) -> Iterator[Tuple[str, str]]:
    authors = Bloom(50*10**6, error_rate) 
    # 50 million elements max, with an error rate of 1% by default.
    # With these parameters the filter takes about 60MB of RAM
    # (see the table below). We can decrease it by increasing the error rate.
 
    for author in authors_iterator(path):
        if author not in authors:
            yield author 
            authors.add(author)

This code takes 4min, which shows a small overhead in time while not using much memory. It also works on the full dataset (the overall timing being 50min). The error rate is defined with respect to the maximum size of the data structure. The overall size of the data structure depends on both parameters.

To illustrate the effect of the error rate, I ran the experiment several times with different values on the medium-size dataset. The execution time didn’t vary and closely matches the one with the set data structure.

Error rate    Memory usage    Missing authors
Baseline      500 MB          0
25%           18 MB           165488 (1%)
10%           30 MB           17052 (0.1%)
5%            39 MB           2479 (0.01%)
1%            60 MB           33 (0.0001%)
0.1%          90 MB           0
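The memory column can be compared with the textbook sizing of a Bloom filter: for n expected elements and a false positive rate p, an optimally sized filter uses about -n ln(p) / (ln 2)^2 bits. The following minimal sketch applies this formula with the capacity of 50 million elements used above. That rbloom sizes its bit array this way is an assumption, not something checked against its source; the helper bloom_size_mb is hypothetical.

```python
import math

def bloom_size_mb(capacity: int, error_rate: float) -> float:
    """Size in MB (10**6 bytes) of an optimally sized Bloom filter (textbook formula)."""
    nb_bits = -capacity * math.log(error_rate) / math.log(2) ** 2
    return nb_bits / 8 / 10**6

rates = (0.25, 0.10, 0.05, 0.01, 0.001)
sizes = {rate: round(bloom_size_mb(50 * 10**6, rate)) for rate in rates}
print(sizes)
```

The formula also shows why the memory grows slowly when the error rate drops: dividing the error rate by 10 adds a constant number of bits per element (about 4.8) instead of multiplying the size.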

Using appropriate tooling

It is possible to use SQLite (or PostgreSQL) to perform all those operations efficiently without worrying about RAM. Databases are designed for that.

The key part is to ingest the data into the system.

Pythonless version, the SQLite way

It is possible to ingest the CSV directly into a table with a small command-line interaction with SQLite.

sqlite3 bibendum.db "create table bibendum (id TEXT, type TEXT, year INT, title TEXT, authors JSON);"
sqlite3 bibendum.db ".mode csv" ".import bibendum_extractions.medium.csv bibendum"

This script took 1min30s, which should be put in perspective with the 3min (x2) needed just to iterate through the CSV in Python and parse the JSON part.

It is then possible to produce the list of all authors (with repetitions) through the JSON_EACH function.
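JSON_EACH is a table-valued function: joined with a table, it produces one row per element of the JSON array, with the element available in the column value. Here is a minimal sketch on an in-memory database with two made-up papers. It uses json_extract rather than the ->> operator, since that operator only exists in SQLite 3.38 and later; it assumes the JSON functions are available in the SQLite library used by Python, which is the case in most builds.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE bibendum (id TEXT, type TEXT, year INT, title TEXT, authors JSON)")
db.executemany("INSERT INTO bibendum VALUES (?, ?, ?, ?, ?)", [
    ("p1", "article", 2020, "A title",
     '[{"first_name": "Ada", "last_name": "Lovelace"},'
     ' {"first_name": "Alan", "last_name": "Turing"}]'),
    ("p2", "article", 2021, "Another title",
     '[{"first_name": "Ada", "last_name": "Lovelace"}]'),
])

# One output row per (paper, author) pair; key is the position in the JSON array.
# bibendum.id must be qualified because json_each also exposes a column named id.
rows = db.execute("""
    SELECT bibendum.id,
           json_extract(value, '$.first_name'),
           json_extract(value, '$.last_name')
    FROM bibendum, json_each(bibendum.authors)
    ORDER BY bibendum.id, key
""").fetchall()
db.close()
print(rows)
```

Ada Lovelace appears twice in the result, once per paper: this is the list of authors with repetitions.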

We can then implement extract_author_paper with SQLite as follows:

import sqlite3
def extract_author_paper_sqlite(first_name, family_name) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    """
        Return an iterator over the rows of a given author. 
    """ 
    db = sqlite3.connect("bibendum.db")
    yield from db.execute("""

SELECT bibendum.*
FROM bibendum, JSON_EACH(authors)
WHERE 
    JSON_VALID(authors) AND
    value ->> 'first_name' = ? AND
    value ->> 'last_name' = ?
""", (first_name, family_name))
    db.close()

This script takes 1min to run, three times faster than our Python version. Most importantly, it scales to the full dataset without trouble and without having to think about RAM at all.

Similarly, we can implement the iterator over authors without duplicates:

import sqlite3
def get_all_authors_sqlite() -> Iterator[Tuple[str, str]]:
    """
        Return an iterator over the distinct (first_name, last_name) pairs of authors. 
    """ 
    db = sqlite3.connect("bibendum.db")
    yield from db.execute("""

SELECT DISTINCT 
    value ->> 'first_name',
    value ->> 'last_name'
FROM bibendum, JSON_EACH(authors)
WHERE 
    JSON_VALID(authors)
""")
    db.close()

It is however slower than the pure Python version (5m30 vs 3min for the medium dataset). Once the CSV is in the table, we can normalize the data with the following queries:

CREATE TABLE papers AS SELECT id, year, title, type FROM bibendum;

CREATE TABLE authors (
    id TEXT, 
    first_name TEXT,
    last_name TEXT
);

INSERT INTO authors
    SELECT DISTINCT
        value ->> 'id' as id,
        value ->> 'first_name',
        value ->> 'last_name'
FROM bibendum, JSON_EACH(authors)
WHERE
    JSON_VALID(authors);

CREATE TABLE paper_to_author AS
    SELECT
        bibendum.id as paper_id,
        value ->> 'id' as author_id
    FROM bibendum, JSON_EACH(authors)
    WHERE
        JSON_VALID(authors);

Altogether, ingestion and normalisation take 10min for the medium dataset. To scale to the full dataset, we first need to decompress the file. This was annoying for me, as I have very limited disk space on my machine: I cannot, for instance, store the uncompressed dataset twice, so having both a copy of the CSV file and a DB storing it is out of the question. To avoid this, I used a named pipe.

mkfifo bibendum_uncompress.csv
zcat bibendum_extractions.csv.gz > bibendum_uncompress.csv & 
sqlite3 bibendum.full.db "create table bibendum (id TEXT, type TEXT, year INT, title TEXT, authors JSON);"
sqlite3 bibendum.full.db ".mode csv" ".import bibendum_uncompress.csv  bibendum"

This ingestion into the database takes 28 minutes, which is faster than simply iterating ONCE over the dataset in pure Python style (38 minutes). The ingestion and normalisation of the full dataset take 112 minutes. The final db file (after removing the bibendum requires **

Querying with SQLite

Now that we have a normalized database, we can query it efficiently. We are first going to write the queries, then execute them without and then with an index.

First, fetching the id of an author.

SELECT * FROM authors WHERE first_name = 'Charles' AND last_name = 'Paperman';

The query takes 9s (without any index). It is just a sequential scan, but over rows that are smaller and without decoding a JSON document.

Fetching the papers of an author is harder, as it requires two joins:

SELECT papers.* 
FROM 
    papers INNER JOIN paper_to_author ON papers.id = paper_id 
    INNER JOIN authors ON authors.id = author_id
WHERE
    first_name = 'Charles' AND last_name = 'Paperman';

Written like this, the query will simply take forever (I tried, but after a night it was still running).
Creating the index took only a few minutes, and the query is then executed in a few ms.
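The exact index definitions are not given here, so the following is only a plausible sketch: one index per column used in a filter or a join, created on a tiny in-memory copy of the normalized schema. The index names and the sample rows are made up.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE papers (id TEXT, year INT, title TEXT, type TEXT);
    CREATE TABLE authors (id TEXT, first_name TEXT, last_name TEXT);
    CREATE TABLE paper_to_author (paper_id TEXT, author_id TEXT);

    INSERT INTO papers VALUES ('p1', 2020, 'A title', 'article'),
                              ('p2', 2021, 'Another title', 'article');
    INSERT INTO authors VALUES ('a1', 'Ada', 'Lovelace'), ('a2', 'Alan', 'Turing');
    INSERT INTO paper_to_author VALUES ('p1', 'a1'), ('p1', 'a2'), ('p2', 'a1');

    -- Hypothetical indexes: find the author by name, then the links, then the papers
    CREATE INDEX idx_authors_name ON authors (last_name, first_name);
    CREATE INDEX idx_link_author ON paper_to_author (author_id);
    CREATE INDEX idx_papers_id ON papers (id);
""")

query = """
    SELECT papers.id, papers.title
    FROM papers
        INNER JOIN paper_to_author ON papers.id = paper_id
        INNER JOIN authors ON authors.id = author_id
    WHERE first_name = ? AND last_name = ?
    ORDER BY papers.id
"""
titles = db.execute(query, ("Ada", "Lovelace")).fetchall()

# The last column of EXPLAIN QUERY PLAN describes how each table is accessed.
plan = [row[-1] for row in db.execute("EXPLAIN QUERY PLAN " + query, ("Ada", "Lovelace"))]
db.close()

print(titles)
for step in plan:
    print(step)
```

Printing the query plan is the quickest way to check whether a table is still scanned sequentially or looked up through an index; on the real database it is worth doing before launching a query that may run all night.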

What could have been done, but I had no time


Compiled on: Thu. 15 Feb. 2024 21:01:22 CET