Correction of a data-processing/database homework
The subject of the homework is here.
The goal of the homework is to play with larger and larger datasets, to get a feel for why scaling is difficult, why memory management matters, and where databases shine. This post provides some corrections and best practices, for those who want to improve their Python skills for handling large datasets.
Timings for the medium dataset (and sometimes for the full dataset) are provided in the text.
All the timings and experiments were done on my server, which is a cheap 12-year-old laptop with 8GB of RAM and a slow Intel(R) Core(TM) i5.
Exploring the dataset
Every dataset is broken in one way or another. Cleaning it is often tedious, boring and time consuming. Unfortunately, this one is no exception.
The dataset is gzip compressed. For very large datasets, disk availability can become an issue when uncompressing. It is possible to read the file without uncompressing it using the standard gzip module: simply replace open by gzip.open after importing gzip. Reading the file will be slower, but it will not consume extra disk space.
Reading the file and putting the data into shape should be done with a helper function. Embracing the iterator way is the most efficient way to avoid relying too much on memory.
When manipulating files in Python, never use strings to represent file paths. Python is equipped with the pathlib module, which in particular handles file paths on both Windows and Linux. Similarly, never parse a CSV file manually; always use the standard csv module for that (or Pandas). Manual parsing is error prone: CSV is not a complex file format, but there are some subtleties that people tackled for you a long time ago.
import gzip
import csv
from pathlib import Path
from typing import Iterator, Tuple
import json
csv.field_size_limit(10**8)  # because some fields are big somehow?


def parse_json(iterator: Iterator) -> Iterator:
    """ Take an iterator, catch JSONDecodeError and skip them """
    line_nb = 0
    while True:
        try:
            n = next(iterator)
            yield (*n[:-2], json.loads(n[-1]))
            line_nb += 1
        except json.JSONDecodeError:  # this will catch ONLY errors emitted by the json module.
            print(f"Error on line: {line_nb}")
        except StopIteration:
            return
It is classic to get annoyed by exceptions when dealing with iterators. Iterators can always be transformed so that the error handling becomes part of them; the parse_json function illustrates that.
def generate_rows(filepath: Path) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    opener = open
    if filepath.suffix == ".gz":
        opener = gzip.open
    with opener(filepath, 'rt') as f:  # 'r' for read mode and 't' for text mode; gzip.open defaults to binary mode.
        yield from parse_json(csv.reader(f))
The usage is as follows:
= Path("bibendum_extractions.very_short.csv.gz")
very_short = generate_rows(very_short)
it print(next(it))
To benchmark an iterator, you should always exhaust it. For that, simply iterate over it, doing nothing with the results. On my computer, the following snippet:
= Path("bibendum_extractions.medium.csv.gz")
medium = generate_rows(medium)
it for _ in it:
# doing nothing ...
takes around 3 min on the compressed version and 2 min 30 s on the uncompressed one. This is the time it takes Python to just read and decode the file, and that's it.
In comparison, Pandas reads the file a bit faster (1 min 30 s) but takes up to 12 GB of RAM. I could run it on a beefy computer, but not on my laptop for instance. The pure Python version, in comparison, uses barely any noticeable RAM; it would work on any computer, even a Raspberry Pi.
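For reference, a one-shot Pandas read would look like the sketch below. This is only illustrative: the column names and the absence of a header row are assumptions on my part, and the JSON column is kept as a raw string.

```python
import pandas as pd

# Illustrative sketch only: column names and the absence of a header row are assumptions.
df = pd.read_csv(
    "bibendum_extractions.medium.csv.gz",  # compression is inferred from the extension
    header=None,
    names=["id", "type", "year", "title", "authors"],
)
print(len(df), df.memory_usage(deep=True).sum() // 10**6, "MB")
```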
The full (compressed) dataset can be processed with the very same code and takes 38 min.
It is possible to read it with Pandas (some of the students did), but it requires processing the file in chunks, as an iterator of DataFrames of bounded size, which drastically increases the overall complexity of the code without any clear gain in performance (a sketch is given below). It is best to keep a good balance between code complexity and code efficiency!
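A chunked Pandas read could look like the following; the column names and the chunk size are assumptions, and the per-chunk body is a placeholder.

```python
import pandas as pd

# Hypothetical chunked variant: read the file as an iterator of DataFrames of bounded size.
chunks = pd.read_csv(
    "bibendum_extractions.medium.csv.gz",
    header=None,
    names=["id", "type", "year", "title", "authors"],  # assumed column names
    chunksize=100_000,  # rows per DataFrame
)
nb_rows = 0
for chunk in chunks:
    nb_rows += len(chunk)  # replace with the real per-chunk processing
print(nb_rows)
```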
It is important to keep in mind the order of magnitude of the processing time for non-Python software, as Python is slow. For that, bash is useful: most command-line tools are written in C and are simple enough to add no overhead.
For instance, the following command just reads the file and does nothing with it.
zcat bibendum_extractions.medium.csv.gz > /dev/null
It took 45 s (a 4x speed-up). It also does not decode the JSON part, which is time consuming, so it is not a big surprise that it is much faster. The following is the equivalent for the uncompressed version:
cat bibendum_extractions.medium.csv > /dev/null
It took around 10 s (a 16x speed-up). Altogether, this gives us the following information:
- Processing the compressed file is indeed costly
- Dealing with JSON probably accounts for most of the computation time.
Pythonic querying
Here, the goal was to play with iterators and avoid storing too much data in RAM, so we simply filter the iterator.
def extract_author_paper(first_name, family_name, path: Path) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    """
    Return an iterator over the rows of a given author.
    """
    it = generate_rows(path)

    def check_author(authors_list) -> bool:
        """ Helper function to check if the author is in the list """
        return any(aut["first_name"] == first_name and aut["last_name"] == family_name for aut in authors_list)

    return filter(lambda e: check_author(e[3]), it)
The following execution takes around 3 min 30 s, so the filter accounts for around 16% of the total processing cost.
list(extract_author_paper("Charles", "Paperman", medium))
The iterators over authors
Before moving on to the function get_all_authors, we can design a small helper function that again produces an iterator of authors, seen as pairs (first_name, last_name). This is not hard to design:
def authors_iterator(path: Path) -> Iterator[Tuple[str, str]]:
    it = generate_rows(path)
    yield from ((author["first_name"], author["last_name"]) for row in it for author in row[3])
Again, we can time this function to check how costly it is. Remark that in terms of memory, it is still \(O(1)\) with a very small constant.
authors = authors_iterator(medium)
for author in authors:
    ...
And again, it takes 3 min 30 s (it seems that stacking iterators adds ~10% of computing overhead). But now we want to filter out the authors that appear more than once, and it is no longer possible to use a memory-less program: we need to remember all the authors seen so far.
The straightforward implementation absolutely needs to use a set and not a list for performance. A list would be quadratic, which would take forever, probably several months for the full dataset.
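A quick way to convince yourself of the quadratic behaviour: the toy benchmark below (sizes are arbitrary, timings are machine-dependent) compares a membership test on a list and on a set. Each list lookup scans all elements, so repeating it for every author makes the whole deduplication quadratic.

```python
import timeit

items = [(str(i), str(i)) for i in range(100_000)]
as_list = list(items)
as_set = set(items)
missing = ("not", "there")  # worst case: forces a full scan of the list

# Membership in a list is a linear scan; in a set it is a hash lookup.
print(timeit.timeit(lambda: missing in as_list, number=100))  # seconds
print(timeit.timeit(lambda: missing in as_set, number=100))   # orders of magnitude faster
```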
I added at the end of the computation a print about the size of the set in RAM.
import sys


def get_all_authors(path: Path) -> Iterator[Tuple[str, str]]:
    authors = set()
    for author in authors_iterator(path):
        if author not in authors:
            yield author
            authors.add(author)
    print(f"Size of the datastructure: {sys.getsizeof(authors)//10**6}MB")
This run is longer (4 min), so at least 25% of the computation is taken just by the set operations. More importantly, the memory footprint becomes the issue: 500 MB. While it is not so limiting here, it becomes one for the full dataset: the 8 GB of RAM of my computer are not enough to run this function on the large dataset.
When this happens, time is less of an issue than RAM: we can always wait longer, but it is often harder to add RAM to a computer. You can of course pay for a computer with more RAM (or rent one in the cloud). It is also possible to trade RAM for time. The simplest strategy for that is to perform multiple passes over the input, depending on the amount of RAM available.
We can slightly adapt the previous function:
def get_all_authors(path: Path, nb_pass=7) -> Iterator[Tuple[str, str]]:
    for i in range(nb_pass):
        authors = set()
        for author in authors_iterator(path):
            if hash(author) % nb_pass == i and author not in authors:
                yield author
                authors.add(author)
This solution is elegant as it doesn't require much programming, and it is easy to see the trade-off: increasing nb_pass divides the size of the required data structure accordingly (while making the program slower by a factor of exactly nb_pass).
Another strategy is to perform the computation on disk, relying on efficient external programs. Something like this would work on a Linux machine:
import os
import tempfile


def get_all_authors_external_sort(path) -> Iterator[Tuple[str, str]]:
    intmp, infilename = tempfile.mkstemp()
    outtmp, outfilename = tempfile.mkstemp()
    with open(intmp, "a") as f:
        for author in authors_iterator(path):
            f.write("\t".join(author) + "\n")  # one author per line
    os.system(f"sort -u --parallel 3 {infilename} > {outfilename}")
    os.unlink(infilename)  # deleting the input file
    with open(outtmp) as f:
        yield from map(lambda e: tuple(e.rstrip("\n").split("\t")), f)
    os.unlink(outfilename)
Here the sort program is optimized to sort large files with multiprocessing, taking advantage of the available memory without exhausting it. It becomes more interesting in situations where multiple passes make the process way too slow. Here, processing the medium database takes 6 min, so it would pay off in situations where we need at least two passes over the input.
This is already very much how databases behave, with very efficient on-disk sorting. Finally, the most reasonable strategy is to simply put the data in a SQLite database and let it deal with all the technicalities.
Probabilistic data structures
So far, we didn't want any loss, but in many applications it is ok to have, say, 1% or 0.1% errors. That means that some authors could potentially be wrongfully dropped and disappear. While doing so could be completely wrong for a production system, when performing data analysis such an error would have no impact on the final aggregated results.
Let's say, for instance, that we want to count the number of authors approximately. For that we can rely on a probabilistic data structure: a Bloom filter.
A Bloom filter should be thought of as a set (a hash table) with some imprecision, but much more concise in memory: we trade memory against certainty. One major distinction with a set is that a Bloom filter only stores some sort of concise hash signature, not the values themselves.
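To make the idea concrete, here is a minimal, illustrative Bloom filter. This is not how the rbloom package used below is implemented, just the principle: a few bit positions are derived from a hash of the item, add sets them, and a membership test checks that they are all set, which allows false positives but never false negatives.

```python
import hashlib


class TinyBloom:
    """Minimal illustrative Bloom filter (for exposition, not production)."""

    def __init__(self, nb_bits: int, nb_hashes: int):
        assert nb_hashes <= 8  # this sketch derives at most 8 positions from one digest
        self.nb_bits = nb_bits
        self.nb_hashes = nb_hashes
        self.bits = bytearray(nb_bits // 8 + 1)  # the only thing stored: a bit array

    def _positions(self, item):
        # Derive nb_hashes bit positions from a single hash of the item.
        digest = hashlib.sha256(repr(item).encode()).digest()
        for i in range(self.nb_hashes):
            yield int.from_bytes(digest[4 * i:4 * i + 4], "little") % self.nb_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        # True means "probably seen", False means "certainly never seen".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


bf = TinyBloom(nb_bits=8 * 10**6, nb_hashes=7)
bf.add(("Charles", "Paperman"))
print(("Charles", "Paperman") in bf)  # True
print(("Ada", "Lovelace") in bf)      # almost surely False
```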
I use the package rbloom here.
The usage is rather straightforward and is exactly equivalent to the first variant, except for the set initialisation.
from rbloom import Bloom


def get_all_authors_bloom(path: Path, error_rate=0.01) -> Iterator[Tuple[str, str]]:
    authors = Bloom(50*10**6, error_rate)
    # 50 million elements max with an error rate of 0.1% per default.
    # This allocates 2GB of RAM. We can decrease it
    # by increasing the error rate.
    for author in authors_iterator(path):
        if author not in authors:
            yield author
            authors.add(author)
This code takes 4 min, which shows a small overhead in time while not using much memory. It also works on the full dataset (the overall timing being 50 min). The error rate is relative to the maximum size of the data structure, and the overall size of the data structure depends on both parameters.
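The memory figures reported below are consistent with the textbook sizing formula for Bloom filters, \(m = -n \ln(p) / (\ln 2)^2\) bits for \(n\) expected items and error rate \(p\). A small sketch, assuming \(n = 50\) million as in the code above:

```python
from math import log


def bloom_size_mb(nb_items: int, error_rate: float) -> float:
    """Textbook estimate of the size of a Bloom filter, in MB."""
    nb_bits = -nb_items * log(error_rate) / log(2) ** 2
    return nb_bits / 8 / 10**6


for p in (0.25, 0.10, 0.05, 0.01, 0.001):
    print(f"{p:>6}: {bloom_size_mb(50 * 10**6, p):.0f} MB")
```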
To illustrate the effect of the error rate, I ran the experiment several times with different values on the medium-size dataset. The execution time didn't vary and closely matched the one obtained with the set data structure.
Error rate | Memory usage | Missing authors |
---|---|---|
Baseline | 500 MB | 0 |
25% | 18 MB | 165488 (1%) |
10% | 30 MB | 17052 (0.1%) |
5% | 39 MB | 2479 (0.01%) |
1% | 60 MB | 33 (0.0001%) |
0.1% | 90 MB | 0 |
Using appropriate tooling
It is possible to use SQLite (or PostgreSQL) to perform all those operations efficiently without worrying about RAM. Databases are designed for that.
The key part is to ingest the data into the system.
Pythonless version, the SQLite way
It is possible to ingest the CSV directly into a table with a small command-line interaction with SQLite.
sqlite3 bibendum.db "create table bibendum (id TEXT, type TEXT, year TEXT, title INT, authors JSON);"
sqlite3 bibendum.db ".mode csv" ".import bibendum_extractions.medium.csv bibendum"
This took 1 min 30 s, which should be put in perspective with the 3 min (2x slower) needed just to iterate through the CSV in Python and parse the JSON part.
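If you prefer to stay in Python, a sketch of an equivalent ingestion through the sqlite3 module could look like the following. This is my own variant, not the one used above; it assumes every row has exactly five fields.

```python
import csv
import gzip
import sqlite3

csv.field_size_limit(10**8)  # same large-field issue as before


def ingest_csv(csv_path: str, db_path: str = "bibendum.db") -> None:
    """Sketch: load the raw CSV rows into the bibendum table from Python."""
    db = sqlite3.connect(db_path)
    db.execute("CREATE TABLE IF NOT EXISTS bibendum "
               "(id TEXT, type TEXT, year TEXT, title INT, authors JSON)")
    with gzip.open(csv_path, "rt") as f:
        # executemany consumes the csv.reader iterator row by row,
        # so the whole file is never held in memory.
        db.executemany("INSERT INTO bibendum VALUES (?, ?, ?, ?, ?)", csv.reader(f))
    db.commit()
    db.close()


ingest_csv("bibendum_extractions.medium.csv.gz")
```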
It is then possible to produce the list of all authors (with repetitions) through the JSON_EACH table-valued function.
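As a sketch, from Python (the query mirrors the ones used below, without DISTINCT, so repetitions are kept):

```python
import sqlite3

db = sqlite3.connect("bibendum.db")
# One output row per author occurrence: duplicates are not removed.
rows = db.execute("""
    SELECT value ->> 'first_name', value ->> 'last_name'
    FROM bibendum, JSON_EACH(authors)
    WHERE JSON_VALID(authors)
""")
for first_name, last_name in rows:
    ...
db.close()
```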
We can then implement extract_author_paper with SQLite as follows:
import sqlite3


def extract_author_paper_sqlite(first_name, family_name) -> Iterator[Tuple[str, str, int, str, Tuple[dict]]]:
    """
    Return an iterator over the rows of a given author.
    """
    db = sqlite3.connect("bibendum.db")
    yield from db.execute("""
        SELECT bibendum.*
        FROM bibendum, JSON_EACH(authors)
        WHERE
            JSON_VALID(authors) AND
            value ->> 'first_name' = ? AND
            value ->> 'last_name' = ?
    """, (first_name, family_name))
    db.close()
This takes 1 min to perform, about three times faster than our Python version, but most importantly it scales to the full dataset without trouble and without worrying about RAM at all.
Similarly, we can implement the iterator over authors without duplicates:
import sqlite3


def get_all_authors_sqlite() -> Iterator[Tuple[str, str]]:
    """
    Return an iterator over all distinct authors.
    """
    db = sqlite3.connect("bibendum.db")
    yield from db.execute("""
        SELECT DISTINCT
            value ->> 'first_name',
            value ->> 'last_name'
        FROM bibendum, JSON_EACH(authors)
        WHERE
            JSON_VALID(authors)
    """)
    db.close()
It is however slower than the pure Python version (5 min 30 s vs 3 min for the medium dataset). Once the CSV is in the table, we can normalize the data with the following queries:
CREATE TABLE papers AS SELECT id, year, title, type FROM bibendum;

CREATE TABLE authors (
    id TEXT,
    first_name TEXT,
    last_name TEXT
);

INSERT INTO authors
SELECT DISTINCT
    value ->> 'id' as id,
    value ->> 'first_name',
    value ->> 'last_name'
FROM bibendum, JSON_EACH(authors)
WHERE
    JSON_VALID(authors);

CREATE TABLE paper_to_author AS
SELECT
    bibendum.id as paper_id,
    value ->> 'id' as author_id
FROM bibendum, JSON_EACH(authors)
WHERE
    JSON_VALID(authors);
Altogether, ingestion and normalisation of the medium dataset takes 10 min. To scale to the full dataset, we first need to decompress the file. This was annoying for me, as I have very limited disk space on my machine: I cannot store the uncompressed dataset twice, so having both a copy of the CSV file and a DB storing it is out of the question. To avoid this, I used a named pipe.
mkfifo bibendum_uncompress.csv
zcat bibendum_extractions.csv.gz > bibendum_uncompress.csv &
sqlite3 bibendum.full.db "create table bibendum (id TEXT, type TEXT, year TEXT, title INT, authors JSON);"
sqlite3 bibendum.full.db ".mode csv" ".import bibendum_uncompress.csv bibendum"
This ingestion into the database takes 28 minutes, which is faster than simply iterating ONCE over the dataset in pure Python style (38 min). The ingestion and normalisation of the full dataset takes 112 minutes. The final db file (after removing the bibendum table) requires **…**
- The & at the end of the zcat line makes the decompression run as a background subprocess.
- The named pipe behaves a bit like an iterator: each time we read from it, we allow the writer to push more data into it.
- It is rather low level and efficient!
Querying with SQLite
Now that we have a normalized database, we can query it efficiently. We will first write the queries and execute them without indexes, then with them.
First, fetching the id of an author:
SELECT * FROM authors WHERE first_name = 'Charles' AND last_name = 'Paperman';
The query takes 9 s (without any index). It is still just a sequential scan, but over rows that are smaller and without decoding a JSON document.
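We can check that SQLite indeed performs a full scan with EXPLAIN QUERY PLAN (a quick sketch from Python; the exact wording of the output varies between SQLite versions):

```python
import sqlite3

db = sqlite3.connect("bibendum.db")
plan = db.execute("""
    EXPLAIN QUERY PLAN
    SELECT * FROM authors
    WHERE first_name = 'Charles' AND last_name = 'Paperman'
""").fetchall()
print(plan)  # expected: a single 'SCAN authors' step while no index exists
db.close()
```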
Fetching the papers of an author is harder, as it requires performing two joins:
SELECT papers.*
FROM papers
INNER JOIN paper_to_author ON papers.id = paper_id
INNER JOIN authors ON authors.id = author_id
WHERE
    first_name = 'Charles' AND last_name = 'Paperman';
A query like this will simply take forever (I tried; after a whole night, it was still running). Creating the indexes took only a few minutes, and the query is then executed in a few milliseconds.
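The indexes themselves are not listed in this correction; a plausible set, covering the name lookup and the two join columns (the index names and column choices are mine), would be:

```python
import sqlite3

db = sqlite3.connect("bibendum.db")
# Hypothetical indexes: one for the name lookup, one per join column.
db.execute("CREATE INDEX IF NOT EXISTS idx_authors_name ON authors(last_name, first_name)")
db.execute("CREATE INDEX IF NOT EXISTS idx_paper_to_author_author ON paper_to_author(author_id)")
db.execute("CREATE INDEX IF NOT EXISTS idx_papers_id ON papers(id)")
db.commit()
db.close()
```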
What could have been done, but I had no time
- Using PostgreSQL
- Using PostgreSQL with more advanced schema construction (automatic partitioning of the authors table, advanced indexing)
- Using DuckDB, the state-of-the-art analytical database
- Static indexing in pure Python
Compiled on: Wed 08 Jan 2025 11:51:58 CET