Commit 1fc7afbc authored by Gijs Hendriksen's avatar Gijs Hendriksen

Add MonetDB implementation

parent 7e890b22
......@@ -2,29 +2,74 @@ import duckdb
import json
import os
import math
import pymonetdb
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from collections import defaultdict
from nltk import corpus, word_tokenize
from typing import List
from pymonetdb.sql import cursors as monet_cursors
from typing import Union
import time
class Index:
cursor: duckdb.Cursor
stopwords: List[str]
class Index(ABC):
cursor: Union[duckdb.Cursor, monet_cursors.Cursor]
def __init__(self, db: str, stopwords: List[str] = None):
db_exists = os.path.exists(db)
self.cursor = duckdb.connect(db).cursor()
def __init__(self, db):
self.stopwords = corpus.stopwords.words('english')
self.init_db(db)
self.stopwords = stopwords or corpus.stopwords.words('english')
def get_terms(self, body: str):
terms = defaultdict(int)
if db_exists:
self.reset_auto_increment()
else:
self.init_db()
for term in word_tokenize(body.lower()):
if term not in self.stopwords and term.isalpha():
terms[term[:32]] += 1
return terms
@staticmethod
def get_index(engine: str, db: str):
if engine == 'duckdb':
return DuckDBIndex(db)
elif engine == 'monetdb':
return MonetDBIndex(db)
raise NotImplementedError(f'Engine "{engine}" not implemented!')
@abstractmethod
def init_db(self, db: str):
pass
@abstractmethod
def index(self, document: dict):
pass
@abstractmethod
def bulk_index(self, filename: str):
try:
with open(filename) as _file:
data = json.load(_file)
except json.JSONDecodeError:
print('[!] Invalid input file!')
return
for i, document in enumerate(data):
self.index(document)
amount_of_digits = math.floor(math.log10(len(data))) + 1
print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
@abstractmethod
def search(self, query):
pass
@abstractmethod
def clear(self):
pass
class DuckDBIndex(Index):
def reset_auto_increment(self):
max_termid = self.cursor.execute('SELECT MAX(termid) FROM terms').fetchone()[0]
max_termid = 1 if max_termid is None else max_termid + 1
......@@ -37,35 +82,33 @@ class Index:
self.cursor.execute(f'CREATE SEQUENCE term_ids START WITH {max_termid}')
self.cursor.execute(f'CREATE SEQUENCE doc_ids START WITH {max_docid}')
def init_db(self):
self.cursor.execute(f'CREATE SEQUENCE term_ids')
self.cursor.execute(f'CREATE SEQUENCE doc_ids')
self.cursor.execute('CREATE TABLE dict('
'termid INTEGER NOT NULL,'
'term VARCHAR NOT NULL,'
'df INTEGER NOT NULL)')
self.cursor.execute('CREATE TABLE docs('
'docid INTEGER NOT NULL,'
'name VARCHAR NOT NULL,'
'length INTEGER NOT NULL)')
self.cursor.execute('CREATE TABLE terms('
'termid INTEGER NOT NULL,'
'docid INTEGER NOT NULL,'
'count INTEGER NOT NULL)')
def get_terms(self, body: str):
terms = defaultdict(int)
def init_db(self, db):
db_exists = os.path.exists(db)
for term in word_tokenize(body.lower()):
if term not in self.stopwords and term.isalpha():
terms[term] += 1
self.cursor = duckdb.connect(db).cursor()
return terms
if not db_exists:
self.cursor.execute(f'CREATE SEQUENCE term_ids')
self.cursor.execute(f'CREATE SEQUENCE doc_ids')
self.cursor.execute('CREATE TABLE dict('
'termid INTEGER NOT NULL,'
'term VARCHAR NOT NULL,'
'df INTEGER NOT NULL)')
self.cursor.execute('CREATE TABLE docs('
'docid INTEGER NOT NULL,'
'name VARCHAR NOT NULL,'
'length INTEGER NOT NULL)')
self.cursor.execute('CREATE TABLE terms('
'termid INTEGER NOT NULL,'
'docid INTEGER NOT NULL,'
'count INTEGER NOT NULL)')
else:
self.reset_auto_increment()
def index(self, document: dict):
terms = self.get_terms(document['body'])
doc_name = document['name']
doc_length = len(terms)
doc_length = sum(terms.values())
doc_id = self.cursor.execute("SELECT nextval('doc_ids')").fetchone()[0]
self.cursor.execute(f"INSERT INTO docs VALUES ({doc_id}, '{doc_name}', {doc_length})")
......@@ -144,7 +187,7 @@ class Index:
})
amount_of_digits = math.floor(math.log10(len(data))) + 1
print(f'{i:>{amount_of_digits}d}/{len(data)}', end='\r')
print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')
new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
......@@ -190,3 +233,154 @@ class Index:
'-' * 20,
str(doc_rows),
])
class MonetDBIndex(Index):
    """Index implementation backed by a MonetDB server, accessed via pymonetdb.

    Schema mirrors the DuckDB implementation: `dict` (term -> termid, df),
    `docs` (docid, name, length) and `terms` (termid, docid, count), with
    MonetDB AUTO_INCREMENT primary keys on `dict` and `docs`.
    """

    def init_db(self, db: str):
        """Connect to the MonetDB database `db` and create the schema if missing.

        Connection parameters are hard-coded to the default local MonetDB
        credentials (monetdb/monetdb on localhost).
        """
        self.cursor = pymonetdb.connect(username='monetdb', password='monetdb',
                                        hostname='localhost', database=db).cursor()

        self.cursor.execute('CREATE TABLE IF NOT EXISTS dict('
                            'termid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
                            'term VARCHAR(32) NOT NULL UNIQUE,'
                            'df INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS docs('
                            'docid INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,'
                            'name VARCHAR(32) NOT NULL,'
                            'length INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS terms('
                            'termid INTEGER NOT NULL,'
                            'docid INTEGER NOT NULL,'
                            'count INTEGER NOT NULL)')

    def index(self, document: dict):
        """Index a single document (a dict with 'name' and 'body' keys).

        Inserts the document row, then upserts each term into `dict`
        (incrementing df for known terms) and records per-document term
        frequencies in `terms`. Commits once at the end.
        """
        terms = self.get_terms(document['body'])

        # Truncate to the VARCHAR(32) column width and escape single quotes so
        # arbitrary document names cannot break (or inject into) the statement.
        doc_name = document['name'][:32].replace("'", "''")
        # Document length is the total token count (sum of frequencies),
        # matching the DuckDB implementation.
        doc_length = sum(terms.values())

        self.cursor.execute(f"INSERT INTO docs (name, length) VALUES ('{doc_name}', {doc_length})")
        doc_id = self.cursor.lastrowid

        for term, frequency in terms.items():
            # Terms are filtered to purely alphabetic tokens by get_terms,
            # so direct interpolation cannot contain quotes.
            rows = self.cursor.execute(f"SELECT termid FROM dict WHERE term = '{term}'")
            if rows > 0:
                term_id, = self.cursor.fetchone()
                self.cursor.execute(f"UPDATE dict SET df = df + 1 WHERE termid = {term_id}")
            else:
                self.cursor.execute(f"INSERT INTO dict (term, df) VALUES ('{term}', 1)")
                term_id = self.cursor.lastrowid

            self.cursor.execute(f"INSERT INTO terms VALUES ({term_id}, {doc_id}, {frequency})")

        self.cursor.execute('COMMIT')

    def bulk_index(self, filename: str):
        """Bulk-index a JSON file containing a list of documents.

        The existing tables are pulled into pandas DataFrames, all documents
        are merged in memory, and the result is written back via CSV files
        and MonetDB's server-side COPY INTO (which replaces the old rows).
        """
        try:
            with open(filename) as _file:
                data = json.load(_file)
        except json.JSONDecodeError:
            print('[!] Invalid input file!')
            return

        self.cursor.execute('SELECT * FROM dict')
        dict_table = pd.DataFrame(self.cursor.fetchall(), columns=['termid', 'term', 'df'])
        self.cursor.execute('SELECT * FROM terms')
        term_table = pd.DataFrame(self.cursor.fetchall(), columns=['termid', 'docid', 'count'])
        self.cursor.execute('SELECT * FROM docs')
        doc_table = pd.DataFrame(self.cursor.fetchall(), columns=['docid', 'name', 'length'])

        if not dict_table.empty:
            dict_table.set_index('termid', inplace=True)

        docs = []
        docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1

        start = time.time()
        for i, document in enumerate(data):
            docid = docid_start + i
            doc_terms = self.get_terms(document['body'])

            new_dict = pd.DataFrame([{
                'term': term,
                'df': 0,
            } for term in doc_terms])
            # NOTE(review): concat with ignore_index renumbers termids from 0,
            # discarding the ids loaded from the database — confirm nothing
            # outside these three tables references the old termids.
            dict_table = (pd.concat([dict_table, new_dict], ignore_index=True)
                          .drop_duplicates('term'))
            dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1

            new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
            new_terms['termid'] = new_terms.index
            new_terms['docid'] = np.repeat(docid, len(doc_terms))
            # Map each term to its frequency and keep only the terms columns.
            new_terms = (new_terms.replace({'term': doc_terms})
                         .rename(columns={'term': 'count'})[['termid', 'docid', 'count']])
            term_table = pd.concat([term_table, new_terms], ignore_index=True)

            docs.append({
                'docid': docid,
                'name': document['name'],
                # Total token count, consistent with index() above.
                'length': sum(doc_terms.values()),
            })

            amount_of_digits = math.floor(math.log10(len(data))) + 1
            print(f'{i + 1:>{amount_of_digits}d}/{len(data)}', end='\r')

        new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
        doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)

        dict_table['termid'] = dict_table.index
        dict_table = dict_table[['termid', 'term', 'df']]

        dict_table.to_csv('dict.csv', header=False, index=False)
        doc_table.to_csv('docs.csv', header=False, index=False)
        term_table.to_csv('terms.csv', header=False, index=False)

        for table in ('dict', 'docs', 'terms'):
            # COPY INTO is executed server-side, so it needs an absolute path.
            # (Also avoid shadowing the `filename` parameter.)
            csv_path = os.path.abspath(f'{table}.csv')
            self.cursor.execute(f'DELETE FROM {table}')
            self.cursor.execute(f"COPY INTO {table} FROM '{csv_path}'")
            os.remove(f'{table}.csv')

        self.cursor.execute('COMMIT')

        current = time.time()
        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')

    def search(self, query):
        """Execute a raw SQL query and return at most the first 10 rows."""
        self.cursor.execute(query)
        return self.cursor.fetchmany(10)

    def clear(self):
        """Empty all three tables and reset their AUTO_INCREMENT sequences."""
        self.cursor.execute('TRUNCATE terms RESTART IDENTITY')
        self.cursor.execute('TRUNCATE docs RESTART IDENTITY')
        self.cursor.execute('TRUNCATE dict RESTART IDENTITY')
        self.cursor.execute('COMMIT')

    def __str__(self):
        """Render up to 10 rows of each table as an aligned text dump."""
        table_mapping = {
            'dict': ['termid', 'term', 'df'],
            'docs': ['docid', 'name', 'length'],
            'terms': ['termid', 'docid', 'count'],
        }

        rows = []
        for table, col_names in table_mapping.items():
            rows.append([table])
            rows.append(['-' * (11 * len(col_names) - 1)])
            rows.append(col_names)
            rows.append(['-' * 10 for _ in range(len(col_names))])

            # pymonetdb's execute returns the affected/selected row count.
            amount = self.cursor.execute(f'SELECT * FROM {table}')
            rows.extend(self.cursor.fetchmany(10))

            if amount > 10:
                rows.append(['...' for _ in range(len(col_names))])

            rows.append('')

        return '\n'.join([' '.join([f'{value:>10}' for value in row]) for row in rows])
import argparse
import time
from index import Index
from search import Search
......@@ -11,11 +12,23 @@ def bulk_index(index: Index, args: argparse.Namespace):
def query_index(index: Index, args: argparse.Namespace):
    """Run the command-line query terms against the index.

    With -i/--iterations N (N > 0), execute the search N times and report
    only the average wall-clock time; otherwise execute it once and print
    the result. (Drops the stale unconditional search call left over from
    the previous revision, which executed the query an extra time.)
    """
    query_terms = args.terms
    iterations = args.iterations

    search = Search(index)

    if iterations:
        times = []
        for _ in range(iterations):
            start = time.time()
            search.search(query_terms)
            times.append(time.time() - start)

        avg_time = sum(times) / len(times)
        print(f'Average query time over {iterations} iterations: {avg_time:.3f}s')
    else:
        result = search.search(query_terms)
        print(result)
def dump_index(index: Index, args: argparse.Namespace):
......@@ -31,6 +44,8 @@ def main():
description='OldDuck - A Python implementation of OldDog, using DuckDB')
parser.add_argument('database', help='The database file to use')
parser.add_argument('-e', '--engine', help='The database engine to use',
choices=('duckdb', 'monetdb'), default='duckdb')
subparsers = parser.add_subparsers(dest='command')
subparsers.required = True
......@@ -40,6 +55,8 @@ def main():
parser_index.set_defaults(func=bulk_index)
parser_query = subparsers.add_parser('query')
parser_query.add_argument('-i', '--iterations', help='Number of iterations',
type=int, default=0)
parser_query.add_argument('terms', help='The query terms', nargs='*')
parser_query.set_defaults(func=query_index)
......@@ -51,7 +68,7 @@ def main():
args = parser.parse_args()
index = Index(args.database)
index = Index.get_index(args.engine, args.database)
args.func(index, args)
......
......@@ -42,6 +42,18 @@ numpy = ">=1.13.3"
python-dateutil = ">=2.6.1"
pytz = ">=2017.2"
[[package]]
category = "main"
description = "Native MonetDB client Python API"
name = "pymonetdb"
optional = false
python-versions = "*"
version = "1.2.1"
[package.dependencies]
six = ">=1.12.0"
typing = "*"
[[package]]
category = "main"
description = "Extensions to the standard Python datetime module"
......@@ -69,8 +81,16 @@ optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*"
version = "1.13.0"
[[package]]
category = "main"
description = "Type Hints for Python"
name = "typing"
optional = false
python-versions = "*"
version = "3.7.4.1"
[metadata]
content-hash = "7ef45e999a2464d5a8aeb70a370dbd0e2806b146e945e1ee679294a69d61c430"
content-hash = "5c45c379f3047a08f8f0ba0e5943dfd5b17510016c4125ff449b1225521ed14a"
python-versions = "^3.6"
[metadata.hashes]
......@@ -78,6 +98,8 @@ duckdb = ["2272cc1f8a6496b1e1c38dac09d5de054bb5ec668a8767166f7b0254f7d61275", "2
nltk = ["a08bdb4b8a1c13de16743068d9eb61c8c71c2e5d642e8e08205c528035843f82", "bed45551259aa2101381bbdd5df37d44ca2669c5c3dad72439fa459b29137d94"]
numpy = ["0b0dd8f47fb177d00fa6ef2d58783c4f41ad3126b139c91dd2f7c4b3fdf5e9a5", "25ffe71f96878e1da7e014467e19e7db90ae7d4e12affbc73101bcf61785214e", "26efd7f7d755e6ca966a5c0ac5a930a87dbbaab1c51716ac26a38f42ecc9bc4b", "28b1180c758abf34a5c3fea76fcee66a87def1656724c42bb14a6f9717a5bdf7", "2e418f0a59473dac424f888dd57e85f77502a593b207809211c76e5396ae4f5c", "30c84e3a62cfcb9e3066f25226e131451312a044f1fe2040e69ce792cb7de418", "4650d94bb9c947151737ee022b934b7d9a845a7c76e476f3e460f09a0c8c6f39", "4dd830a11e8724c9c9379feed1d1be43113f8bcce55f47ea7186d3946769ce26", "4f2a2b279efde194877aff1f76cf61c68e840db242a5c7169f1ff0fd59a2b1e2", "62d22566b3e3428dfc9ec972014c38ed9a4db4f8969c78f5414012ccd80a149e", "669795516d62f38845c7033679c648903200980d68935baaa17ac5c7ae03ae0c", "75fcd60d682db3e1f8fbe2b8b0c6761937ad56d01c1dc73edf4ef2748d5b6bc4", "9395b0a41e8b7e9a284e3be7060db9d14ad80273841c952c83a5afc241d2bd98", "9e37c35fc4e9410093b04a77d11a34c64bf658565e30df7cbe882056088a91c1", "a0678793096205a4d784bd99f32803ba8100f639cf3b932dc63b21621390ea7e", "b46554ad4dafb2927f88de5a1d207398c5385edbb5c84d30b3ef187c4a3894d8", "c867eeccd934920a800f65c6068acdd6b87e80d45cd8c8beefff783b23cdc462", "dd0667f5be56fb1b570154c2c0516a528e02d50da121bbbb2cbb0b6f87f59bc2", "de2b1c20494bdf47f0160bd88ed05f5e48ae5dc336b8de7cfade71abcc95c0b9", "f1df7b2b7740dd777571c732f98adb5aad5450aee32772f1b39249c8a50386f6", "ffca69e29079f7880c5392bf675eb8b4146479d976ae1924d01cd92b04cccbcc"]
pandas = ["00dff3a8e337f5ed7ad295d98a31821d3d0fe7792da82d78d7fd79b89c03ea9d", "22361b1597c8c2ffd697aa9bf85423afa9e1fcfa6b1ea821054a244d5f24d75e", "255920e63850dc512ce356233081098554d641ba99c3767dde9e9f35630f994b", "26382aab9c119735908d94d2c5c08020a4a0a82969b7e5eefb92f902b3b30ad7", "33970f4cacdd9a0ddb8f21e151bfb9f178afb7c36eb7c25b9094c02876f385c2", "4545467a637e0e1393f7d05d61dace89689ad6d6f66f267f86fff737b702cce9", "52da74df8a9c9a103af0a72c9d5fdc8e0183a90884278db7f386b5692a2220a4", "61741f5aeb252f39c3031d11405305b6d10ce663c53bc3112705d7ad66c013d0", "6a3ac2c87e4e32a969921d1428525f09462770c349147aa8e9ab95f88c71ec71", "7458c48e3d15b8aaa7d575be60e1e4dd70348efcd9376656b72fecd55c59a4c3", "78bf638993219311377ce9836b3dc05f627a666d0dbc8cec37c0ff3c9ada673b", "8153705d6545fd9eb6dd2bc79301bff08825d2e2f716d5dced48daafc2d0b81f", "975c461accd14e89d71772e89108a050fa824c0b87a67d34cedf245f6681fc17", "9962957a27bfb70ab64103d0a7b42fa59c642fb4ed4cb75d0227b7bb9228535d", "adc3d3a3f9e59a38d923e90e20c4922fc62d1e5a03d083440468c6d8f3f1ae0a", "bbe3eb765a0b1e578833d243e2814b60c825b7fdbf4cdfe8e8aae8a08ed56ecf", "df8864824b1fe488cf778c3650ee59c3a0d8f42e53707de167ba6b4f7d35f133", "e45055c30a608076e31a9fcd780a956ed3b1fa20db61561b8d88b79259f526f7", "ee50c2142cdcf41995655d499a157d0a812fce55c97d9aad13bc1eef837ed36c"]
pymonetdb = ["0e27358ce7c7c9c7a7753dce96a7aef5fd81ade023baf289bd19ee3bfaa56856", "e44cdcbd3e7de6e1e550a92aedd740a44fb9007090cf8003f5764416f87f371b"]
python-dateutil = ["73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c", "75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"]
pytz = ["1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", "b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"]
six = ["1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", "30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66"]
typing = ["91dfe6f3f706ee8cc32d38edbbf304e9b7583fb37108fef38229617f8b3eba23", "c8cabb5ab8945cd2f54917be357d134db9cc1eb039e59d1606dc1e60cb1d9d36", "f38d83c5a7a7086543a0f649564d661859c5146a85775ab90c0d2f93ffaa9714"]
......@@ -8,6 +8,7 @@ authors = ["Your Name <you@example.com>"]
python = "^3.6"
duckdb = "^0.1.1"
nltk = "^3.4"
pymonetdb = "^1.2"
[tool.poetry.dev-dependencies]
......
......@@ -8,7 +8,7 @@ def bm25(terms, disjunctive=False):
qterms AS (SELECT termid, docid, count FROM terms
WHERE termid IN (SELECT * FROM termids)),
subscores AS (SELECT docs.docid, length, term_tf.termid,
tf, df, (log(((SELECT COUNT(*) FROM docs WHERE length > 0)-df+0.5)/(df+0.5))*((tf*(1.2+1)/
tf, df, (ln(((SELECT COUNT(*) FROM docs WHERE length > 0)-df+0.5)/(df+0.5))*((tf*(1.2+1)/
(tf+1.2*(1-0.75+0.75*(length/(SELECT AVG(length) FROM docs WHERE length > 0))))))) AS subscore
FROM (SELECT termid, docid, count AS tf FROM qterms) AS term_tf
JOIN (SELECT docid FROM qterms
......
from index import Index
import query
import re
class Search:
......@@ -17,5 +16,4 @@ class Search:
else:
raise NotImplementedError(f'Search method "{method}" was not implemented')
result = self.index.search(sql_query)
print(result)
return self.index.search(sql_query)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment