From 1fc7afbc11c8ded857645c17a75740ffa23cace0 Mon Sep 17 00:00:00 2001
From: Gijs Hendriksen
Date: Fri, 6 Dec 2019 15:24:13 +0100
Subject: [PATCH] Add MonetDB implementation

---
 index.py       | 266 ++++++++++++++++++++++++++++++++++++++++++-------
 main.py        |  23 ++++-
 poetry.lock    |  24 ++++-
 pyproject.toml |   1 +
 query.py       |   2 +-
 search.py      |   4 +-
 6 files changed, 276 insertions(+), 44 deletions(-)

diff --git a/index.py b/index.py
index 62cdb57..fe8ddfd 100644
--- a/index.py
+++ b/index.py
@@ -2,29 +2,74 @@ import duckdb
 import json
 import os
 import math
+import pymonetdb
 import numpy as np
 import pandas as pd
+from abc import ABC, abstractmethod
 from collections import defaultdict
 from nltk import corpus, word_tokenize
-from typing import List
+from pymonetdb.sql import cursors as monet_cursors
+from typing import Union
 import time
 
 
-class Index:
-    cursor: duckdb.Cursor
-    stopwords: List[str]
+class Index(ABC):
+    cursor: Union[duckdb.Cursor, monet_cursors.Cursor]
 
-    def __init__(self, db: str, stopwords: List[str] = None):
-        db_exists = os.path.exists(db)
-        self.cursor = duckdb.connect(db).cursor()
+    def __init__(self, db):
+        self.stopwords = corpus.stopwords.words('english')
+        self.init_db(db)
 
-        self.stopwords = stopwords or corpus.stopwords.words('english')
+    def get_terms(self, body: str):
+        terms = defaultdict(int)
 
-        if db_exists:
-            self.reset_auto_increment()
-        else:
-            self.init_db()
+        for term in word_tokenize(body.lower()):
+            if term not in self.stopwords and term.isalpha():
+                terms[term[:32]] += 1
+
+        return terms
+
+    @staticmethod
+    def get_index(engine: str, db: str):
+        if engine == 'duckdb':
+            return DuckDBIndex(db)
+        elif engine == 'monetdb':
+            return MonetDBIndex(db)
+        raise NotImplementedError(f'Engine "{engine}" not implemented!')
+
+    @abstractmethod
+    def init_db(self, db: str):
+        pass
+
+    @abstractmethod
+    def index(self, document: dict):
+        pass
+
+    @abstractmethod
+    def bulk_index(self, filename: str):
+        try:
+            with open(filename) as _file:
+                data = json.load(_file)
+        except json.JSONDecodeError:
+            print('[!] Invalid input file!')
+            return
+
+        for i, document in enumerate(data):
+            self.index(document)
+
+            amount_of_digits = math.floor(math.log10(len(data))) + 1
+            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
+
+    @abstractmethod
+    def search(self, query):
+        pass
+
+    @abstractmethod
+    def clear(self):
+        pass
+
 
+class DuckDBIndex(Index):
     def reset_auto_increment(self):
         max_termid = self.cursor.execute('SELECT MAX(termid) FROM terms').fetchone()[0]
         max_termid = 1 if max_termid is None else max_termid + 1
@@ -37,35 +82,33 @@ class Index:
         self.cursor.execute(f'CREATE SEQUENCE term_ids START WITH {max_termid}')
         self.cursor.execute(f'CREATE SEQUENCE doc_ids START WITH {max_docid}')
 
-    def init_db(self):
-        self.cursor.execute(f'CREATE SEQUENCE term_ids')
-        self.cursor.execute(f'CREATE SEQUENCE doc_ids')
-        self.cursor.execute('CREATE TABLE dict('
-                            'termid INTEGER NOT NULL,'
-                            'term VARCHAR NOT NULL,'
-                            'df INTEGER NOT NULL)')
-        self.cursor.execute('CREATE TABLE docs('
-                            'docid INTEGER NOT NULL,'
-                            'name VARCHAR NOT NULL,'
-                            'length INTEGER NOT NULL)')
-        self.cursor.execute('CREATE TABLE terms('
-                            'termid INTEGER NOT NULL,'
-                            'docid INTEGER NOT NULL,'
-                            'count INTEGER NOT NULL)')
-
-    def get_terms(self, body: str):
-        terms = defaultdict(int)
+    def init_db(self, db):
+        db_exists = os.path.exists(db)
 
-        for term in word_tokenize(body.lower()):
-            if term not in self.stopwords and term.isalpha():
-                terms[term] += 1
+        self.cursor = duckdb.connect(db).cursor()
 
-        return terms
+        if not db_exists:
+            self.cursor.execute(f'CREATE SEQUENCE term_ids')
+            self.cursor.execute(f'CREATE SEQUENCE doc_ids')
+            self.cursor.execute('CREATE TABLE dict('
+                                'termid INTEGER NOT NULL,'
+                                'term VARCHAR NOT NULL,'
+                                'df INTEGER NOT NULL)')
+            self.cursor.execute('CREATE TABLE docs('
+                                'docid INTEGER NOT NULL,'
+                                'name VARCHAR NOT NULL,'
+                                'length INTEGER NOT NULL)')
+            self.cursor.execute('CREATE TABLE terms('
+                                'termid INTEGER NOT NULL,'
+                                'docid INTEGER NOT NULL,'
+                                'count INTEGER NOT NULL)')
+        else:
+            self.reset_auto_increment()
 
     def index(self, document: dict):
         terms = self.get_terms(document['body'])
         doc_name = document['name']
-        doc_length = len(terms)
+        doc_length = sum(terms.values())
 
         doc_id = self.cursor.execute("SELECT nextval('doc_ids')").fetchone()[0]
         self.cursor.execute(f"INSERT INTO docs VALUES ({doc_id}, '{doc_name}', {doc_length})")
@@ -144,7 +187,7 @@ class Index:
             })
 
             amount_of_digits = math.floor(math.log10(len(data))) + 1
-            print(f'{i:>{amount_of_digits}d}/{len(data)}', end='\r')
+            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
 
         new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
         doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
@@ -190,3 +233,154 @@ class Index:
             '-' * 20,
             str(doc_rows),
         ])
+
+
+class MonetDBIndex(Index):
+    def init_db(self, db: str):
+        self.cursor = pymonetdb.connect(username='monetdb', password='monetdb',
+                                        hostname='localhost', database=db).cursor()
+
+        self.cursor.execute('CREATE TABLE IF NOT EXISTS dict('
+                            'termid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
+                            'term VARCHAR(32) NOT NULL UNIQUE,'
+                            'df INTEGER NOT NULL)')
+        self.cursor.execute('CREATE TABLE IF NOT EXISTS docs('
+                            'docid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
+                            'name VARCHAR(32) NOT NULL,'
+                            'length INTEGER NOT NULL)')
+        self.cursor.execute('CREATE TABLE IF NOT EXISTS terms('
+                            'termid INTEGER NOT NULL,'
+                            'docid INTEGER NOT NULL,'
+                            'count INTEGER NOT NULL)')
+
+    def index(self, document: dict):
+        terms = self.get_terms(document['body'])
+        doc_name = document['name'][:32]
+        doc_length = sum(terms.values())
+
+        self.cursor.execute(f"INSERT INTO docs (name, length) VALUES ('{doc_name}', {doc_length})")
+        doc_id = self.cursor.lastrowid
+
+        for term, frequency in terms.items():
+            rows = self.cursor.execute(f"SELECT termid FROM dict WHERE term = '{term}'")
+            if rows > 0:
+                term_id, = self.cursor.fetchone()
+                self.cursor.execute(f"UPDATE dict SET df = df + 1 WHERE termid = {term_id}")
+            else:
+                self.cursor.execute(f"INSERT INTO dict (term, df) VALUES ('{term}', 1)")
+                term_id = self.cursor.lastrowid
+
+            self.cursor.execute(f"INSERT INTO terms VALUES ({term_id}, {doc_id}, {frequency})")
+        self.cursor.execute('COMMIT')
+
+    def bulk_index(self, filename: str):
+        try:
+            with open(filename) as _file:
+                data = json.load(_file)
+        except json.JSONDecodeError:
+            print('[!] Invalid input file!')
+            return
+
+        self.cursor.execute('SELECT * FROM dict')
+        dict_table = pd.DataFrame(self.cursor.fetchall(), columns=['termid', 'term', 'df'])
+
+        self.cursor.execute('SELECT * FROM terms')
+        term_table = pd.DataFrame(self.cursor.fetchall(), columns=['termid', 'docid', 'count'])
+
+        self.cursor.execute('SELECT * FROM docs')
+        doc_table = pd.DataFrame(self.cursor.fetchall(), columns=['docid', 'name', 'length'])
+
+        if not dict_table.empty:
+            dict_table.set_index('termid', inplace=True)
+
+        docs = []
+        docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1
+
+        start = time.time()
+
+        for i, document in enumerate(data):
+            docid = docid_start + i
+
+            doc_terms = self.get_terms(document['body'])
+
+            new_dict = pd.DataFrame([{
+                'term': term,
+                'df': 0,
+            } for term in doc_terms])
+
+            dict_table = (pd.concat([dict_table, new_dict], ignore_index=True)
+                          .drop_duplicates('term'))
+            dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
+
+            new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
+            new_terms['termid'] = new_terms.index
+            new_terms['docid'] = np.repeat(docid, len(doc_terms))
+            new_terms = (new_terms.replace({'term': doc_terms})
+                         .rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
+
+            term_table = pd.concat([term_table, new_terms], ignore_index=True)
+
+            docs.append({
+                'docid': docid,
+                'name': document['name'][:32],
+                'length': sum(doc_terms.values()),
+            })
+
+            amount_of_digits = math.floor(math.log10(len(data))) + 1
+            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
+
+        new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
+        doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
+        dict_table['termid'] = dict_table.index
+        dict_table = dict_table[['termid', 'term', 'df']]
+
+        dict_table.to_csv('dict.csv', header=False, index=False)
+        doc_table.to_csv('docs.csv', header=False, index=False)
+        term_table.to_csv('terms.csv', header=False, index=False)
+
+        for table in ('dict', 'docs', 'terms'):
+            filename = os.path.abspath(f'{table}.csv')
+
+            self.cursor.execute(f'DELETE FROM {table}')
+            self.cursor.execute(f"COPY INTO {table} FROM '{filename}'")
+
+            os.remove(f'{table}.csv')
+
+        self.cursor.execute('COMMIT')
+
+        current = time.time()
+        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')
+
+    def search(self, query):
+        self.cursor.execute(query)
+        return self.cursor.fetchmany(10)
+
+    def clear(self):
+        self.cursor.execute('TRUNCATE terms RESTART IDENTITY')
+        self.cursor.execute('TRUNCATE docs RESTART IDENTITY')
+        self.cursor.execute('TRUNCATE dict RESTART IDENTITY')
+        self.cursor.execute('COMMIT')
+
+    def __str__(self):
+        table_mapping = {
+            'dict': ['termid', 'term', 'df'],
+            'docs': ['docid', 'name', 'length'],
+            'terms': ['termid', 'docid', 'count'],
+        }
+
+        rows = []
+
+        for table, col_names in table_mapping.items():
+            rows.append([table])
+            rows.append(['-' * (11 * len(col_names) - 1)])
+            rows.append(col_names)
+            rows.append(['-' * 10 for _ in range(len(col_names))])
+
+            amount = self.cursor.execute(f'SELECT * FROM {table}')
+            rows.extend(self.cursor.fetchmany(10))
+
+            if amount > 10:
+                rows.append(['...' for _ in range(len(col_names))])
+            rows.append('')
+
+        return '\n'.join([' '.join([f'{value:>10}' for value in row]) for row in rows])
diff --git a/main.py b/main.py
index 37497cf..c6d2509 100755
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
 import argparse
+import time
 
 from index import Index
 from search import Search
 
@@ -11,11 +12,23 @@ def bulk_index(index: Index, args: argparse.Namespace):
 
 def query_index(index: Index, args: argparse.Namespace):
     query_terms = args.terms
+    iterations = args.iterations
 
     search = Search(index)
-    search.search(query_terms)
 
-    # TODO use query terms to query index
+    if iterations:
+        times = []
+
+        for _ in range(iterations):
+            start = time.time()
+            search.search(query_terms)
+            times.append(time.time() - start)
+
+        avg_time = sum(times) / len(times)
+        print(f'Average query time over {iterations} iterations: {avg_time:.3f}s')
+    else:
+        result = search.search(query_terms)
+        print(result)
 
 
 def dump_index(index: Index, args: argparse.Namespace):
@@ -31,6 +44,8 @@ def main():
         description='OldDuck - A Python implementation of OldDog, using DuckDB')
 
     parser.add_argument('database', help='The database file to use')
+    parser.add_argument('-e', '--engine', help='The database engine to use',
+                        choices=('duckdb', 'monetdb'), default='duckdb')
 
     subparsers = parser.add_subparsers(dest='command')
     subparsers.required = True
@@ -40,6 +55,8 @@ def main():
     parser_index.set_defaults(func=bulk_index)
 
     parser_query = subparsers.add_parser('query')
+    parser_query.add_argument('-i', '--iterations', help='Number of iterations',
+                              type=int, default=0)
     parser_query.add_argument('terms', help='The query terms', nargs='*')
     parser_query.set_defaults(func=query_index)
 
@@ -51,7 +68,7 @@ def main():
 
     args = parser.parse_args()
 
-    index = Index(args.database)
+    index = Index.get_index(args.engine, args.database)
 
     args.func(index, args)
 
diff --git a/poetry.lock b/poetry.lock
index 265e425..dd2a000 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -42,6 +42,18 @@ numpy = ">=1.13.3"
 python-dateutil = ">=2.6.1"
 pytz = ">=2017.2"
 
+[[package]]
+category = "main"
+description = "Native MonetDB client Python API"
+name = "pymonetdb"
+optional = false
+python-versions = "*"
+version = "1.2.1"
+
+[package.dependencies]
+six = ">=1.12.0"
+typing = "*"
+
 [[package]]
 category = "main"
 description = "Extensions to the standard Python datetime module"
@@ -69,8 +81,16 @@ optional = false
 python-versions = ">=2.6, !=3.0.*, !=3.1.*"
 version = "1.13.0"
 
+[[package]]
+category = "main"
+description = "Type Hints for Python"
+name = "typing"
+optional = false
+python-versions = "*"
+version = "3.7.4.1"
+
 [metadata]
-content-hash = "7ef45e999a2464d5a8aeb70a370dbd0e2806b146e945e1ee679294a69d61c430"
+content-hash = "5c45c379f3047a08f8f0ba0e5943dfd5b17510016c4125ff449b1225521ed14a"
 python-versions = "^3.6"
 
 [metadata.hashes]
@@ -78,6 +98,8 @@ duckdb = ["2272cc1f8a6496b1e1c38dac09d5de054bb5ec668a8767166f7b0254f7d61275", "2
 nltk = ["a08bdb4b8a1c13de16743068d9eb61c8c71c2e5d642e8e08205c528035843f82", "bed45551259aa2101381bbdd5df37d44ca2669c5c3dad72439fa459b29137d94"]
["a08bdb4b8a1c13de16743068d9eb61c8c71c2e5d642e8e08205c528035843f82", "bed45551259aa2101381bbdd5df37d44ca2669c5c3dad72439fa459b29137d94"] numpy = ["0b0dd8f47fb177d00fa6ef2d58783c4f41ad3126b139c91dd2f7c4b3fdf5e9a5", "25ffe71f96878e1da7e014467e19e7db90ae7d4e12affbc73101bcf61785214e", "26efd7f7d755e6ca966a5c0ac5a930a87dbbaab1c51716ac26a38f42ecc9bc4b", "28b1180c758abf34a5c3fea76fcee66a87def1656724c42bb14a6f9717a5bdf7", "2e418f0a59473dac424f888dd57e85f77502a593b207809211c76e5396ae4f5c", "30c84e3a62cfcb9e3066f25226e131451312a044f1fe2040e69ce792cb7de418", "4650d94bb9c947151737ee022b934b7d9a845a7c76e476f3e460f09a0c8c6f39", "4dd830a11e8724c9c9379feed1d1be43113f8bcce55f47ea7186d3946769ce26", "4f2a2b279efde194877aff1f76cf61c68e840db242a5c7169f1ff0fd59a2b1e2", "62d22566b3e3428dfc9ec972014c38ed9a4db4f8969c78f5414012ccd80a149e", "669795516d62f38845c7033679c648903200980d68935baaa17ac5c7ae03ae0c", "75fcd60d682db3e1f8fbe2b8b0c6761937ad56d01c1dc73edf4ef2748d5b6bc4", "9395b0a41e8b7e9a284e3be7060db9d14ad80273841c952c83a5afc241d2bd98", "9e37c35fc4e9410093b04a77d11a34c64bf658565e30df7cbe882056088a91c1", "a0678793096205a4d784bd99f32803ba8100f639cf3b932dc63b21621390ea7e", "b46554ad4dafb2927f88de5a1d207398c5385edbb5c84d30b3ef187c4a3894d8", "c867eeccd934920a800f65c6068acdd6b87e80d45cd8c8beefff783b23cdc462", "dd0667f5be56fb1b570154c2c0516a528e02d50da121bbbb2cbb0b6f87f59bc2", "de2b1c20494bdf47f0160bd88ed05f5e48ae5dc336b8de7cfade71abcc95c0b9", "f1df7b2b7740dd777571c732f98adb5aad5450aee32772f1b39249c8a50386f6", "ffca69e29079f7880c5392bf675eb8b4146479d976ae1924d01cd92b04cccbcc"] pandas = ["00dff3a8e337f5ed7ad295d98a31821d3d0fe7792da82d78d7fd79b89c03ea9d", "22361b1597c8c2ffd697aa9bf85423afa9e1fcfa6b1ea821054a244d5f24d75e", "255920e63850dc512ce356233081098554d641ba99c3767dde9e9f35630f994b", "26382aab9c119735908d94d2c5c08020a4a0a82969b7e5eefb92f902b3b30ad7", "33970f4cacdd9a0ddb8f21e151bfb9f178afb7c36eb7c25b9094c02876f385c2", "4545467a637e0e1393f7d05d61dace89689ad6d6f66f267f86fff737b702cce9", "52da74df8a9c9a103af0a72c9d5fdc8e0183a90884278db7f386b5692a2220a4", "61741f5aeb252f39c3031d11405305b6d10ce663c53bc3112705d7ad66c013d0", "6a3ac2c87e4e32a969921d1428525f09462770c349147aa8e9ab95f88c71ec71", "7458c48e3d15b8aaa7d575be60e1e4dd70348efcd9376656b72fecd55c59a4c3", "78bf638993219311377ce9836b3dc05f627a666d0dbc8cec37c0ff3c9ada673b", "8153705d6545fd9eb6dd2bc79301bff08825d2e2f716d5dced48daafc2d0b81f", "975c461accd14e89d71772e89108a050fa824c0b87a67d34cedf245f6681fc17", "9962957a27bfb70ab64103d0a7b42fa59c642fb4ed4cb75d0227b7bb9228535d", "adc3d3a3f9e59a38d923e90e20c4922fc62d1e5a03d083440468c6d8f3f1ae0a", "bbe3eb765a0b1e578833d243e2814b60c825b7fdbf4cdfe8e8aae8a08ed56ecf", "df8864824b1fe488cf778c3650ee59c3a0d8f42e53707de167ba6b4f7d35f133", "e45055c30a608076e31a9fcd780a956ed3b1fa20db61561b8d88b79259f526f7", "ee50c2142cdcf41995655d499a157d0a812fce55c97d9aad13bc1eef837ed36c"] +pymonetdb = ["0e27358ce7c7c9c7a7753dce96a7aef5fd81ade023baf289bd19ee3bfaa56856", "e44cdcbd3e7de6e1e550a92aedd740a44fb9007090cf8003f5764416f87f371b"] python-dateutil = ["73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c", "75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"] pytz = ["1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", "b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"] six = ["1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", "30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66"] +typing = ["91dfe6f3f706ee8cc32d38edbbf304e9b7583fb37108fef38229617f8b3eba23", 
"c8cabb5ab8945cd2f54917be357d134db9cc1eb039e59d1606dc1e60cb1d9d36", "f38d83c5a7a7086543a0f649564d661859c5146a85775ab90c0d2f93ffaa9714"] diff --git a/pyproject.toml b/pyproject.toml index 6e4004f..a69309d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ authors = ["Your Name "] python = "^3.6" duckdb = "^0.1.1" nltk = "^3.4" +pymonetdb = "^1.2" [tool.poetry.dev-dependencies] diff --git a/query.py b/query.py index 0715325..86903a9 100644 --- a/query.py +++ b/query.py @@ -8,7 +8,7 @@ def bm25(terms, disjunctive=False): qterms AS (SELECT termid, docid, count FROM terms WHERE termid IN (SELECT * FROM termids)), subscores AS (SELECT docs.docid, length, term_tf.termid, - tf, df, (log(((SELECT COUNT(*) FROM docs WHERE length > 0)-df+0.5)/(df+0.5))*((tf*(1.2+1)/ + tf, df, (ln(((SELECT COUNT(*) FROM docs WHERE length > 0)-df+0.5)/(df+0.5))*((tf*(1.2+1)/ (tf+1.2*(1-0.75+0.75*(length/(SELECT AVG(length) FROM docs WHERE length > 0))))))) AS subscore FROM (SELECT termid, docid, count AS tf FROM qterms) AS term_tf JOIN (SELECT docid FROM qterms diff --git a/search.py b/search.py index 828e720..a2c349d 100644 --- a/search.py +++ b/search.py @@ -1,6 +1,5 @@ from index import Index import query -import re class Search: @@ -17,5 +16,4 @@ class Search: else: raise NotImplementedError(f'Search method "{method}" was not implemented') - result = self.index.search(sql_query) - print(result) + return self.index.search(sql_query) -- GitLab