Commit aa3f1c7f authored by Gijs Hendriksen

Clean up bulk index code and add necessary documentation

parent f5b6672a
@@ -9,7 +9,7 @@ from abc import ABC, abstractmethod
from collections import defaultdict
from nltk import corpus, word_tokenize
from pymonetdb.sql import cursors as monet_cursors
from typing import Union
from typing import Union, Dict
import time
@@ -20,7 +20,14 @@ class Index(ABC):
self.stopwords = corpus.stopwords.words('english')
self.init_db(db)
def get_terms(self, body: str):
def get_terms(self, body: str) -> Dict[str, int]:
"""
Preprocesses a document's body by tokenising it, removing stop words and discarding tokens that
contain characters other than letters
:param body: the contents of the document
:return: a mapping of terms in the document to their occurrence frequency
"""
terms = defaultdict(int)
for term in word_tokenize(body.lower()):
@@ -30,7 +37,14 @@ class Index(ABC):
return terms
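# Illustration (hypothetical input, not part of this commit): for a body such as
# "The cat sat on the mat, the cat purred!", get_terms would return something like
# {'cat': 2, 'sat': 1, 'mat': 1, 'purred': 1}, since stop words ('the', 'on') and
# punctuation tokens are filtered out.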
@staticmethod
def get_index(engine: str, db: str):
def get_index(engine: str, db: str) -> 'Index':
"""
Returns an index able to run on the specified database engine
:param engine: the desired database engine (duckdb or monetdb)
:param db: the name of the database that should be used
:return: an index capable of running on the specified database engine
"""
if engine == 'duckdb':
return DuckDBIndex(db)
elif engine == 'monetdb':
@@ -38,15 +52,108 @@ class Index(ABC):
raise NotImplementedError(f'Engine "{engine}" not implemented!')
@abstractmethod
def init_db(self, db: str):
def init_db(self, db: str) -> None:
"""
Performs the initial setup of the database, for instance by creating the necessary tables.
Should initialise `self.cursor` correctly.
:param db: the name of the database that should be used
"""
pass
@abstractmethod
def index(self, document: dict):
def index(self, document: dict) -> None:
"""
Indexes a document
:param document: a dictionary with the `name` and `body` of the document
"""
pass
@abstractmethod
def bulk_index(self, filename: str):
def get_existing_tables(self) -> Dict[str, pd.DataFrame]:
"""
Retrieves the full contents of the existing tables. Used for bulk indexing, where all the
heavy computation is done in Python with pandas rather than in the database via SQL, for faster
execution.
:return: a dictionary containing the `dict`, `terms` and `docs` tables as pandas DataFrames
"""
pass
@abstractmethod
def export_tables_to_database(self, tables: Dict[str, pd.DataFrame]) -> None:
"""
Exports the generated DataFrames to the database.
:param tables: a dictionary containing the `dict`, `terms` and `docs` tables as pandas DataFrames
"""
pass
def update_dict_table(self, tables: Dict[str, pd.DataFrame], doc_terms: Dict[str, int]):
"""
Updates the `dict` table in DataFrame form. Adds previously unseen terms to the
`dict` table with a unique id, and increments the document frequency of every term in the document by one.
:param tables: a dictionary containing the `dict`, `terms` and `docs` tables as pandas DataFrames
:param doc_terms: a mapping of terms in the document to their occurrence frequency
"""
# Generate a new dict row for each term, to make sure the term exists in the dict table
new_dict = pd.DataFrame([{
'term': term,
'df': 0,
} for term in doc_terms])
# Add the new rows to the existing dict table, generating new unique ids for each term
# Also, drop all added rows with terms that already existed
dict_table = (pd.concat([tables['dict'], new_dict], ignore_index=True, sort=False)
.drop_duplicates('term'))
# For each of the terms in the document, increase its document frequency
dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
tables['dict'] = dict_table
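# Minimal standalone sketch of the update above (hypothetical data, not part of this commit):
#
#     dict_table = pd.DataFrame({'term': ['cat'], 'df': [3]})
#     doc_terms = {'cat': 2, 'dog': 1}
#     new_dict = pd.DataFrame([{'term': term, 'df': 0} for term in doc_terms])
#     dict_table = (pd.concat([dict_table, new_dict], ignore_index=True, sort=False)
#                   .drop_duplicates('term'))
#     dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
#     # 'cat' ends up with df == 4, 'dog' is added as a new row with df == 1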
def update_term_table(self, tables: Dict[str, pd.DataFrame], doc_terms: Dict[str, int], docid: int):
"""
Updates the `terms` table in DataFrame form. For each term in the document, it adds a new row to
the `terms` table containing the term's id, the id of the current document and the number of times
the term occurs in the current document.
:param tables: a dictionary containing the `dict`, `terms` and `docs` tables as pandas DataFrames
:param doc_terms: a mapping of terms in the document to their occurrence frequency
:param docid: the id of the document under consideration
"""
# Copy the rows from the dict table to create the new entries for the terms table
new_terms = tables['dict'].loc[tables['dict']['term'].isin(doc_terms)].copy()
# Create columns containing the correct term and document ids
# The term ids can be extracted directly from the index of the copied rows, and the
# document ids are the same for each row
new_terms['termid'] = new_terms.index
new_terms['docid'] = np.repeat(docid, len(doc_terms))
# Replace the `term` column by a `count` column that contains the number of times
# the term occurs in the document, and make sure only the `termid`, `docid` and `count`
# columns remain
new_terms = (new_terms.replace({'term': doc_terms})
.rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
# Add the new term entries
tables['terms'] = pd.concat([tables['terms'], new_terms], ignore_index=True)
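# Illustration of the replace/rename step above (hypothetical data, not part of this commit):
# with dict rows {'term': ['cat', 'dog'], 'df': [4, 1]} at index [0, 2], doc_terms
# {'cat': 2, 'dog': 1} and docid 7, the resulting new_terms rows are
# (termid=0, docid=7, count=2) and (termid=2, docid=7, count=1): each `term` string is
# mapped to its in-document frequency and relabelled as `count`.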
def bulk_index(self, filename: str) -> None:
"""
Reads a JSON file for bulk indexing. Each item in the JSON array should be a dictionary containing
the document `name` and `body`. Instead of calling the `index` function repeatedly, this procedure does the
heavy lifting with pandas DataFrames, which speeds up indexing significantly, mostly because it avoids
executing SQL queries for every document.
:param filename: the name of the JSON file containing the documents to index
try:
with open(filename) as _file:
data = json.load(_file)
@@ -54,18 +161,64 @@ class Index(ABC):
print('[!] Invalid input file!')
return
print('Indexing...')
tables = self.get_existing_tables()
# Use the term id as the DataFrame index so that newly added terms
# automatically receive fresh unique ids
if not tables['dict'].empty:
tables['dict'].set_index('termid', inplace=True)
docs = []
# Generate unique document ids by starting one above the largest existing id and counting up
docid_start = 1 if tables['docs'].empty else tables['docs']['docid'].max() + 1
start = time.time()
for i, document in enumerate(data):
self.index(document)
docid = docid_start + i
doc_terms = self.get_terms(document['body'])
self.update_dict_table(tables, doc_terms)
self.update_term_table(tables, doc_terms, docid)
docs.append({
'docid': docid,
'name': document['name'],
'length': sum(doc_terms.values()),
})
amount_of_digits = math.floor(math.log10(len(data))) + 1
print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
tables['docs'] = new_docs if tables['docs'].empty else pd.concat([tables['docs'], new_docs], ignore_index=True)
tables['dict']['termid'] = tables['dict'].index
tables['dict'] = tables['dict'][['termid', 'term', 'df']]
self.export_tables_to_database(tables)
current = time.time()
print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')
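# Usage sketch (database and file names are hypothetical, not part of this commit):
#
#     index = Index.get_index('duckdb', 'example_db')
#     index.bulk_index('documents.json')  # JSON array of {"name": ..., "body": ...} objects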
@abstractmethod
def search(self, query):
def search(self, query) -> list:
"""
Performs an SQL query on the index. It is used in combination with the `search` and `query` modules, which
contain predefined queries such as BM25.
:param query: an SQL query to be performed on the index
:return: a list of query results
"""
pass
@abstractmethod
def clear(self):
def clear(self) -> None:
"""
Deletes all entries from the index.
"""
pass
@@ -111,103 +264,49 @@ class DuckDBIndex(Index):
doc_length = sum(terms.values())
doc_id = self.cursor.execute("SELECT nextval('doc_ids')").fetchone()[0]
# Insert document into `docs` table
self.cursor.execute(f"INSERT INTO docs VALUES ({doc_id}, '{doc_name}', {doc_length})")
term_freqs = {}
start = time.time()
for term, frequency in terms.items():
# Check if term is already present in the `dict` table
term_id = self.cursor.execute(f"SELECT termid FROM dict WHERE term = '{term}'").fetchone()
if term_id is None:
# If the term is not yet present, generate a new id and create an entry for the term
term_id = self.cursor.execute("SELECT nextval('term_ids')").fetchone()[0]
self.cursor.execute(f"INSERT INTO dict VALUES ({term_id}, '{term}', 1)")
else:
# If the term is already present, increase its document frequency by one
term_id = term_id[0]
self.cursor.execute(f"UPDATE dict SET df = df + 1 WHERE termid = {term_id}")
term_freqs[term_id] = frequency
end = time.time()
print(f'{end - start}s for {len(terms)} terms')
# Insert all combinations of term id and doc id into the `terms` table
term_values = ', '.join([f'{term_id, doc_id, frequency}' for term_id, frequency in term_freqs.items()])
self.cursor.execute(f"INSERT INTO terms VALUES {term_values}")
print(f'{time.time() - end}s for inserting')
def bulk_index(self, filename: str):
try:
with open(filename) as _file:
data = json.load(_file)
except json.JSONDecodeError:
print('[!] Invalid input file!')
return
print('Indexing...')
def get_existing_tables(self) -> Dict[str, pd.DataFrame]:
dict_table = self.cursor.execute('SELECT * FROM dict').fetchdf()
term_table = self.cursor.execute('SELECT * FROM terms').fetchdf()
doc_table = self.cursor.execute('SELECT * FROM docs').fetchdf()
if not dict_table.empty:
dict_table.set_index('termid', inplace=True)
docs = []
docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1
start = time.time()
for i, document in enumerate(data):
docid = docid_start + i
doc_terms = self.get_terms(document['body'])
new_dict = pd.DataFrame([{
'term': term,
'df': 0,
} for term in doc_terms])
dict_table = (pd.concat([dict_table, new_dict], ignore_index=True, sort=False)
.drop_duplicates('term'))
dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
new_terms['termid'] = new_terms.index
new_terms['docid'] = np.repeat(docid, len(doc_terms))
new_terms = (new_terms.replace({'term': doc_terms})
.rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
term_table = pd.concat([term_table, new_terms], ignore_index=True)
docs.append({
'docid': docid,
'name': document['name'],
'length': sum(doc_terms.values()),
})
amount_of_digits = math.floor(math.log10(len(data))) + 1
print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
dict_table['termid'] = dict_table.index
dict_table = dict_table[['termid', 'term', 'df']]
dict_table.to_csv('dict.csv', header=False, index=False)
doc_table.to_csv('docs.csv', header=False, index=False)
term_table.to_csv('terms.csv', header=False, index=False)
return {
'dict': dict_table,
'terms': term_table,
'docs': doc_table,
}
def export_tables_to_database(self, tables: Dict[str, pd.DataFrame]) -> None:
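# Dump each table to a temporary CSV file, clear the existing rows, bulk-load the CSV back in and remove the file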
for table in ('dict', 'docs', 'terms'):
tables[table].to_csv(f'{table}.csv', header=False, index=False)
self.cursor.execute(f'DELETE FROM {table}')
self.cursor.execute(f"COPY {table} FROM '{table}.csv'")
os.remove(f'{table}.csv')
current = time.time()
print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')
def search(self, query):
self.cursor.execute(query)
df = self.cursor.fetchdf()
@@ -279,16 +378,7 @@ class MonetDBIndex(Index):
self.cursor.execute(f"INSERT INTO terms VALUES ({term_id}, {doc_id}, {frequency})")
self.cursor.execute('COMMIT')
def bulk_index(self, filename: str):
try:
with open(filename) as _file:
data = json.load(_file)
except json.JSONDecodeError:
print('[!] Invalid input file!')
return
print('Indexing...')
def get_existing_tables(self) -> Dict[str, pd.DataFrame]:
self.cursor.execute('SELECT * FROM dict')
dict_table = pd.DataFrame(self.cursor.fetchall(), columns=['termid', 'term', 'df'])
@@ -298,55 +388,16 @@ class MonetDBIndex(Index):
self.cursor.execute('SELECT * FROM docs')
doc_table = pd.DataFrame(self.cursor.fetchall(), columns=['docid', 'name', 'length'])
if not dict_table.empty:
dict_table.set_index('termid', inplace=True)
docs = []
docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1
start = time.time()
for i, document in enumerate(data):
docid = docid_start + i
doc_terms = self.get_terms(document['body'])
new_dict = pd.DataFrame([{
'term': term,
'df': 0,
} for term in doc_terms])
dict_table = (pd.concat([dict_table, new_dict], ignore_index=True, sort=False)
.drop_duplicates('term'))
dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
new_terms['termid'] = new_terms.index
new_terms['docid'] = np.repeat(docid, len(doc_terms))
new_terms = (new_terms.replace({'term': doc_terms})
.rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
term_table = pd.concat([term_table, new_terms], ignore_index=True)
docs.append({
'docid': docid,
'name': document['name'],
'length': sum(doc_terms.values()),
})
amount_of_digits = math.floor(math.log10(len(data))) + 1
print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
dict_table['termid'] = dict_table.index
dict_table = dict_table[['termid', 'term', 'df']]
dict_table.to_csv('.monetdb/dict.csv', header=False, index=False)
doc_table.to_csv('.monetdb/docs.csv', header=False, index=False)
term_table.to_csv('.monetdb/terms.csv', header=False, index=False)
return {
'dict': dict_table,
'terms': term_table,
'docs': doc_table,
}
def export_tables_to_database(self, tables: Dict[str, pd.DataFrame]) -> None:
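# Dump each table to a CSV file in the .monetdb directory, clear the existing rows and bulk-load the CSV with COPY INTO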
for table in ('dict', 'docs', 'terms'):
tables[table].to_csv(f'.monetdb/{table}.csv', header=False, index=False)
self.cursor.execute(f'DELETE FROM {table}')
self.cursor.execute(f"COPY INTO {table} FROM '/app/{table}.csv' USING DELIMITERS ','")
@@ -354,9 +405,6 @@ class MonetDBIndex(Index):
self.cursor.execute('COMMIT')
current = time.time()
print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')
def search(self, query):
self.cursor.execute(query)
return self.cursor.fetchmany(10)
@@ -7,12 +7,19 @@ from search import Search
def bulk_index(args: argparse.Namespace):
"""
Runs the `index` command, which reads a JSON file of documents and indexes them
"""
index = Index.get_index(args.engine, args.database)
filename = args.input
index.bulk_index(filename)
def query_index(args: argparse.Namespace):
"""
Runs the `query` command, which queries an existing index using a specific set of query terms
"""
index = Index.get_index(args.engine, args.database)
query_terms = args.terms
@@ -23,6 +30,11 @@ def query_index(args: argparse.Namespace):
def benchmark(args: argparse.Namespace):
"""
Runs the `benchmark` command, which executes several predefined queries against both a DuckDB and a MonetDB index
to compare their query times.
"""
duckdb_index = DuckDBIndex(args.duckdb)
monetdb_index = MonetDBIndex(args.monetdb)
@@ -50,7 +62,6 @@ def benchmark(args: argparse.Namespace):
for i, index in enumerate(indices):
index.clear()
print('Indexing...')
index.bulk_index(filename)
search = Search(index)
@@ -88,11 +99,17 @@ def benchmark(args: argparse.Namespace):
def dump_index(args: argparse.Namespace):
"""
Runs the `dump` command, which prints the contents of an index
"""
index = Index.get_index(args.engine, args.database)
print(index)
def clear_index(args: argparse.Namespace):
"""
Runs the `clear` command, which removes the contents of an index
"""
index = Index.get_index(args.engine, args.database)
index.clear()