import duckdb
import json
import math
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Union

import numpy as np
import pandas as pd
import pymonetdb
from nltk import corpus, word_tokenize
from pymonetdb.sql import cursors as monet_cursors


class Index(ABC):
    """Abstract inverted index backed by a SQL engine.

    Concrete backends (DuckDB, MonetDB) share one schema:
        dict(termid, term, df)      -- vocabulary with document frequencies
        docs(docid, name, length)   -- indexed documents and their term counts
        terms(termid, docid, count) -- postings: term frequency per document
    """

    cursor: Union[duckdb.Cursor, monet_cursors.Cursor]

    def __init__(self, db):
        # English stopwords are filtered out of every tokenized body.
        self.stopwords = corpus.stopwords.words('english')
        self.init_db(db)

    def get_terms(self, body: str):
        """Tokenize *body* and return a {term: frequency} mapping.

        Tokens are lowercased; stopwords and non-alphabetic tokens are
        dropped; terms are truncated to 32 characters.
        """
        terms = defaultdict(int)
        for term in word_tokenize(body.lower()):
            if term not in self.stopwords and term.isalpha():
                terms[term[:32]] += 1
        return terms

    @staticmethod
    def get_index(engine: str, db: str):
        """Factory: return the concrete index for *engine* ('duckdb'/'monetdb')."""
        if engine == 'duckdb':
            return DuckDBIndex(db)
        elif engine == 'monetdb':
            return MonetDBIndex(db)
        raise NotImplementedError(f'Engine "{engine}" not implemented!')

    @staticmethod
    def _load_documents(filename: str):
        """Load a JSON list of documents, or None if the file is invalid JSON."""
        try:
            with open(filename) as _file:
                return json.load(_file)
        except json.JSONDecodeError:
            print('[!] Invalid input file!')
            return None

    def _merge_documents(self, data, dict_table, term_table, doc_table):
        """Fold *data* ({'name', 'body'} dicts) into the three in-memory
        tables and return the updated (dict, terms, docs) DataFrames.

        Shared by both backends' bulk_index implementations.

        NOTE(review): termids are reassigned from 0-based row positions after
        the concat, so postings already stored in `terms` keep their old ids —
        confirm bulk indexing is only ever run against an empty index.
        """
        if not dict_table.empty:
            dict_table.set_index('termid', inplace=True)
        docs = []
        docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1
        # Progress width is loop-invariant; compute once (caller guarantees
        # data is non-empty, so log10 is safe).
        amount_of_digits = math.floor(math.log10(len(data))) + 1
        for i, document in enumerate(data):
            docid = docid_start + i
            doc_terms = self.get_terms(document['body'])
            new_dict = pd.DataFrame([{
                'term': term,
                'df': 0,
            } for term in doc_terms])
            # Append unseen terms; drop_duplicates keeps the first (existing) row.
            dict_table = (pd.concat([dict_table, new_dict], ignore_index=True)
                          .drop_duplicates('term'))
            dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
            new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
            new_terms['termid'] = new_terms.index
            new_terms['docid'] = np.repeat(docid, len(doc_terms))
            # Replace each term string with its in-document frequency, then
            # keep only the postings columns.
            new_terms = (new_terms.replace({'term': doc_terms})
                         .rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
            term_table = pd.concat([term_table, new_terms], ignore_index=True)
            docs.append({
                'docid': docid,
                'name': document['name'],
                'length': sum(doc_terms.values()),
            })
            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
        new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
        doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs],
                                                               ignore_index=True)
        dict_table['termid'] = dict_table.index
        return dict_table[['termid', 'term', 'df']], term_table, doc_table

    @abstractmethod
    def init_db(self, db: str):
        """Connect to *db* and create the schema if needed."""
        pass

    @abstractmethod
    def index(self, document: dict):
        """Index a single {'name': ..., 'body': ...} document."""
        pass

    @abstractmethod
    def bulk_index(self, filename: str):
        """Default bulk loader: index the JSON file's documents one by one."""
        data = self._load_documents(filename)
        if not data:  # invalid file, or empty list (log10(0) would raise)
            return
        amount_of_digits = math.floor(math.log10(len(data))) + 1
        for i, document in enumerate(data):
            self.index(document)
            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')

    @abstractmethod
    def search(self, query):
        """Execute *query* and return backend-native results."""
        pass

    @abstractmethod
    def clear(self):
        """Remove all indexed data."""
        pass


class DuckDBIndex(Index):
    """Inverted index stored in an embedded DuckDB database file."""

    def reset_auto_increment(self):
        """Recreate the id sequences so they continue past the stored maxima."""
        max_termid = self.cursor.execute('SELECT MAX(termid) FROM terms').fetchone()[0]
        max_termid = 1 if max_termid is None else max_termid + 1
        max_docid = self.cursor.execute('SELECT MAX(docid) FROM docs').fetchone()[0]
        max_docid = 1 if max_docid is None else max_docid + 1
        # Sequences are dropped and recreated rather than altered in place.
        self.cursor.execute('DROP SEQUENCE term_ids')
        self.cursor.execute('DROP SEQUENCE doc_ids')
        self.cursor.execute(f'CREATE SEQUENCE term_ids START WITH {max_termid}')
        self.cursor.execute(f'CREATE SEQUENCE doc_ids START WITH {max_docid}')

    def init_db(self, db):
        """Open (or create) the DuckDB file and ensure schema + sequences."""
        db_exists = os.path.exists(db)
        self.cursor = duckdb.connect(db).cursor()
        if not db_exists:
            self.cursor.execute('CREATE SEQUENCE term_ids')
            self.cursor.execute('CREATE SEQUENCE doc_ids')
            self.cursor.execute('CREATE TABLE dict('
                                'termid INTEGER NOT NULL,'
                                'term VARCHAR NOT NULL,'
                                'df INTEGER NOT NULL)')
            self.cursor.execute('CREATE TABLE docs('
                                'docid INTEGER NOT NULL,'
                                'name VARCHAR NOT NULL,'
                                'length INTEGER NOT NULL)')
            self.cursor.execute('CREATE TABLE terms('
                                'termid INTEGER NOT NULL,'
                                'docid INTEGER NOT NULL,'
                                'count INTEGER NOT NULL)')
        else:
            self.reset_auto_increment()

    def index(self, document: dict):
        """Index one document, updating dict/docs/terms row by row."""
        terms = self.get_terms(document['body'])
        doc_name = document['name']
        doc_length = sum(terms.values())
        doc_id = self.cursor.execute("SELECT nextval('doc_ids')").fetchone()[0]
        # Parameterized: an apostrophe in the name broke the old f-string SQL.
        self.cursor.execute('INSERT INTO docs VALUES (?, ?, ?)',
                            [doc_id, doc_name, doc_length])
        term_freqs = {}
        start = time.time()
        for term, frequency in terms.items():
            term_id = self.cursor.execute('SELECT termid FROM dict WHERE term = ?',
                                          [term]).fetchone()
            if term_id is None:
                term_id = self.cursor.execute("SELECT nextval('term_ids')").fetchone()[0]
                self.cursor.execute('INSERT INTO dict VALUES (?, ?, 1)',
                                    [term_id, term])
            else:
                term_id = term_id[0]
                self.cursor.execute('UPDATE dict SET df = df + 1 WHERE termid = ?',
                                    [term_id])
            term_freqs[term_id] = frequency
        end = time.time()
        print(f'{end - start}s for {len(terms)} terms')
        # Batch the postings into one INSERT; f'{tuple}' renders as '(a, b, c)'.
        # Guarded: an empty VALUES list was a syntax error.
        if term_freqs:
            term_values = ', '.join([f'{term_id, doc_id, frequency}'
                                     for term_id, frequency in term_freqs.items()])
            self.cursor.execute(f'INSERT INTO terms VALUES {term_values}')
        print(f'{time.time() - end}s for inserting')

    def bulk_index(self, filename: str):
        """Bulk-load a JSON document file via DataFrames, CSV and COPY."""
        data = self._load_documents(filename)
        if not data:
            return
        dict_table = self.cursor.execute('SELECT * FROM dict').fetchdf()
        term_table = self.cursor.execute('SELECT * FROM terms').fetchdf()
        doc_table = self.cursor.execute('SELECT * FROM docs').fetchdf()
        start = time.time()
        dict_table, term_table, doc_table = self._merge_documents(
            data, dict_table, term_table, doc_table)
        dict_table.to_csv('dict.csv', header=False, index=False)
        doc_table.to_csv('docs.csv', header=False, index=False)
        term_table.to_csv('terms.csv', header=False, index=False)
        for table in ('dict', 'docs', 'terms'):
            # Replace table contents wholesale from the freshly written CSV.
            self.cursor.execute(f'DELETE FROM {table}')
            self.cursor.execute(f"COPY {table} FROM '{table}.csv'")
            os.remove(f'{table}.csv')
        current = time.time()
        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')

    def search(self, query):
        """Execute *query* and return the full result as a DataFrame."""
        self.cursor.execute(query)
        return self.cursor.fetchdf()

    def clear(self):
        """Empty all three tables (sequences keep their current values)."""
        self.cursor.execute("DELETE FROM terms")
        self.cursor.execute("DELETE FROM docs")
        self.cursor.execute("DELETE FROM dict")

    def __str__(self):
        """Dump all three tables for debugging."""
        dict_rows = self.cursor.execute('SELECT * FROM dict').fetchdf()
        term_rows = self.cursor.execute('SELECT * FROM terms').fetchdf()
        doc_rows = self.cursor.execute('SELECT * FROM docs').fetchdf()
        return '\n'.join([
            'dict', '-' * 20, str(dict_rows), '',
            'terms', '-' * 20, str(term_rows), '',
            'docs', '-' * 20, str(doc_rows),
        ])


class MonetDBIndex(Index):
    """Inverted index stored in a MonetDB server on localhost."""

    def init_db(self, db: str):
        """Connect with default credentials and ensure the schema exists."""
        self.cursor = pymonetdb.connect(username='monetdb',
                                        password='monetdb',
                                        hostname='localhost',
                                        database=db).cursor()
        # Constraints/auto-increment were deliberately dropped (kept as
        # comments) — presumably to speed up COPY INTO; see NOTE in index().
        self.cursor.execute('CREATE TABLE IF NOT EXISTS dict('
                            # 'termid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
                            'termid INTEGER NOT NULL,'
                            # 'term VARCHAR(32) NOT NULL UNIQUE,'
                            'term VARCHAR(64) NOT NULL,'
                            'df INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS docs('
                            # 'docid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
                            'docid INTEGER NOT NULL,'
                            'name VARCHAR(64) NOT NULL,'
                            'length INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS terms('
                            'termid INTEGER NOT NULL,'
                            'docid INTEGER NOT NULL,'
                            'count INTEGER NOT NULL)')

    def index(self, document: dict):
        """Index one document row by row.

        NOTE(review): relies on `lastrowid` and column-list INSERTs, i.e. on
        the AUTO_INCREMENT schema variant that is commented out in init_db —
        confirm which schema this method is actually run against.
        """
        terms = self.get_terms(document['body'])
        doc_name = document['name'][:32]
        doc_length = sum(terms.values())
        # Parameterized: an apostrophe in a name/term broke the old f-string SQL.
        self.cursor.execute('INSERT INTO docs (name, length) VALUES (%s, %s)',
                            (doc_name, doc_length))
        doc_id = self.cursor.lastrowid
        for term, frequency in terms.items():
            # pymonetdb's execute returns the affected/selected row count.
            rows = self.cursor.execute('SELECT termid FROM dict WHERE term = %s',
                                       (term,))
            if rows > 0:
                term_id, = self.cursor.fetchone()
                self.cursor.execute('UPDATE dict SET df = df + 1 WHERE termid = %s',
                                    (term_id,))
            else:
                self.cursor.execute('INSERT INTO dict (term, df) VALUES (%s, 1)',
                                    (term,))
                term_id = self.cursor.lastrowid
            self.cursor.execute('INSERT INTO terms VALUES (%s, %s, %s)',
                                (term_id, doc_id, frequency))
        self.cursor.execute('COMMIT')

    def bulk_index(self, filename: str):
        """Bulk-load a JSON document file via CSV files and COPY INTO."""
        data = self._load_documents(filename)
        if not data:
            return
        self.cursor.execute('SELECT * FROM dict')
        dict_table = pd.DataFrame(self.cursor.fetchall(),
                                  columns=['termid', 'term', 'df'])
        self.cursor.execute('SELECT * FROM terms')
        term_table = pd.DataFrame(self.cursor.fetchall(),
                                  columns=['termid', 'docid', 'count'])
        self.cursor.execute('SELECT * FROM docs')
        doc_table = pd.DataFrame(self.cursor.fetchall(),
                                 columns=['docid', 'name', 'length'])
        start = time.time()
        dict_table, term_table, doc_table = self._merge_documents(
            data, dict_table, term_table, doc_table)
        os.makedirs('.monetdb', exist_ok=True)  # exchange dir may not exist yet
        dict_table.to_csv('.monetdb/dict.csv', header=False, index=False)
        doc_table.to_csv('.monetdb/docs.csv', header=False, index=False)
        term_table.to_csv('.monetdb/terms.csv', header=False, index=False)
        for table in ('dict', 'docs', 'terms'):
            self.cursor.execute(f'DELETE FROM {table}')
            # Server-side path: .monetdb/ is presumably mounted at /app inside
            # the MonetDB container — confirm against the deployment setup.
            self.cursor.execute(f"COPY INTO {table} FROM '/app/{table}.csv' "
                                f"USING DELIMITERS ','")
            os.remove(f'.monetdb/{table}.csv')
        self.cursor.execute('COMMIT')
        current = time.time()
        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')

    def search(self, query):
        """Execute *query* and return at most the first 10 result rows."""
        self.cursor.execute(query)
        return self.cursor.fetchmany(10)

    def clear(self):
        """Empty all three tables and reset their identity counters."""
        self.cursor.execute('TRUNCATE terms RESTART IDENTITY')
        self.cursor.execute('TRUNCATE docs RESTART IDENTITY')
        self.cursor.execute('TRUNCATE dict RESTART IDENTITY')
        self.cursor.execute('COMMIT')

    def __str__(self):
        """Render the first 10 rows of each table in fixed-width columns."""
        table_mapping = {
            'dict': ['termid', 'term', 'df'],
            'docs': ['docid', 'name', 'length'],
            'terms': ['termid', 'docid', 'count'],
        }
        rows = []
        for table, col_names in table_mapping.items():
            rows.append([table])
            rows.append(['-' * (11 * len(col_names) - 1)])
            rows.append(col_names)
            rows.append(['-' * 10 for _ in range(len(col_names))])
            amount = self.cursor.execute(f'SELECT * FROM {table}')
            rows.extend(self.cursor.fetchmany(10))
            if amount > 10:  # signal truncated output
                rows.append(['...' for _ in range(len(col_names))])
            rows.append('')
        return '\n'.join([' '.join([f'{value:>10}' for value in row])
                          for row in rows])