Commit 6e64d073 authored by Gijs Hendriksen

Improve bulk index, implement querying

parent f5572c0e
import duckdb
import json
import os
import numpy as np
import pandas as pd
from collections import defaultdict
-from nltk import corpus
+from nltk import corpus, word_tokenize
from typing import List
import time
class Index:
@@ -52,8 +55,8 @@ class Index:
    def get_terms(self, body: str):
        terms = defaultdict(int)
-       for term in body.lower().split():
-           if term not in self.stopwords:
+       for term in word_tokenize(body.lower()):
+           if term not in self.stopwords and term.isalpha():
                terms[term] += 1
        return terms
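
As a standalone illustration of this change (editorial, not part of the commit): word_tokenize splits punctuation into separate tokens, which isalpha() then filters out, so the new version drops tokens the old split()-based version would have kept. A minimal sketch, assuming self.stopwords holds NLTK's English stopword list and the NLTK 'punkt' and 'stopwords' data are downloaded:

# Sketch of the new get_terms behaviour (requires nltk.download('punkt')
# and nltk.download('stopwords') beforehand).
from collections import defaultdict
from nltk import corpus, word_tokenize

stopwords = set(corpus.stopwords.words('english'))

def get_terms(body):
    terms = defaultdict(int)
    for term in word_tokenize(body.lower()):
        if term not in stopwords and term.isalpha():
            terms[term] += 1
    return terms

print(dict(get_terms('The quick brown fox jumps over the lazy dog!')))
# {'quick': 1, 'brown': 1, 'fox': 1, 'jumps': 1, 'lazy': 1, 'dog': 1}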
@@ -66,6 +69,10 @@ class Index:
        self.cursor.execute(f"INSERT INTO docs VALUES ({doc_id}, '{doc_name}', {doc_length})")

+       term_freqs = {}
+       start = time.time()

        for term, frequency in terms.items():
            term_id = self.cursor.execute(f"SELECT termid FROM dict WHERE term = '{term}'").fetchone()
@@ -76,7 +83,16 @@ class Index:
            term_id = term_id[0]
            self.cursor.execute(f"UPDATE dict SET df = df + 1 WHERE termid = {term_id}")
-           self.cursor.execute(f"INSERT INTO terms VALUES ({term_id}, {doc_id}, {frequency})")
+           term_freqs[term_id] = frequency
+
+       end = time.time()
+       print(f'{end - start}s for {len(terms)} terms')
+
+       term_values = ', '.join([f'{term_id, doc_id, frequency}' for term_id, frequency in term_freqs.items()])
+       self.cursor.execute(f"INSERT INTO terms VALUES {term_values}")
+       print(f'{time.time() - end}s for inserting')
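
Two editorial notes on the batched insert above: the f-string f'{term_id, doc_id, frequency}' formats a Python tuple, so each entry renders as (1, 2, 3), which happens to be valid SQL row syntax; and because values are interpolated directly into the statement, a parameterized variant may be safer. A minimal sketch, assuming self.cursor is the same DuckDB cursor used above:

# Hypothetical parameterized version of the batched insert; DuckDB's
# Python API accepts '?' placeholders with executemany.
rows = [(term_id, doc_id, frequency) for term_id, frequency in term_freqs.items()]
self.cursor.executemany('INSERT INTO terms VALUES (?, ?, ?)', rows)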
    def bulk_index(self, filename: str):
        try:
@@ -86,8 +102,56 @@ class Index:
            print('[!] Invalid input file!')
            return

-       for document in data:
-           self.index(document)
+       dict_table = self.cursor.execute('SELECT * FROM dict').fetchdf()
+       term_table = self.cursor.execute('SELECT * FROM terms').fetchdf()
+       doc_table = self.cursor.execute('SELECT * FROM docs').fetchdf()
+
+       docs = []
+       docid_start = 1 if doc_table.empty else doc_table['docid'].max() + 1
+
+       start = time.time()
+       for i, document in enumerate(data):
+           docid = docid_start + i
+           doc_terms = self.get_terms(document['body'])
+
+           # Add unseen terms to the dictionary, then bump the document
+           # frequency of every term occurring in this document.
+           new_dict = pd.DataFrame([{
+               'term': term,
+               'df': 0,
+           } for term in doc_terms])
+           dict_table = (pd.concat([dict_table, new_dict], ignore_index=True)
+                         .drop_duplicates('term'))
+           dict_table.loc[dict_table['term'].isin(doc_terms), 'df'] += 1
+
+           # Build the (termid, docid, count) postings for this document,
+           # mapping each term to its in-document frequency.
+           new_terms = dict_table.loc[dict_table['term'].isin(doc_terms)].copy()
+           new_terms['termid'] = new_terms.index
+           new_terms['docid'] = np.repeat(docid, len(doc_terms))
+           new_terms = new_terms.replace({'term': doc_terms}).rename(columns={'term': 'count'}).drop(columns='df')
+           term_table = pd.concat([term_table, new_terms], ignore_index=True)
+
+           docs.append({
+               'docid': docid,
+               'name': document['name'],
+               'length': len(doc_terms),
+           })
+           print(i)
+
+       new_docs = pd.DataFrame(docs, columns=['docid', 'name', 'length'])
+       doc_table = new_docs if doc_table.empty else pd.concat([doc_table, new_docs], ignore_index=True)
+       print(f'Done in {time.time() - start:.4f} seconds!')
+
+       # for i, document in enumerate(data):
+       #     self.index(document)
+       #     print(i)
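
Note that the hunk shown builds dict_table, term_table and doc_table in pandas but never writes them back to the database; presumably that happens in a part of the diff that is collapsed here. For illustration only, one way to persist the frames, assuming self.cursor is a DuckDB connection object (register is a connection-level method) and the selected columns match the table schemas:

# Hypothetical persistence step: DuckDB can query registered DataFrames
# directly, so the rebuilt frames can replace the table contents.
dict_table['termid'] = dict_table.index  # termids as assigned in the loop above
self.cursor.register('dict_df', dict_table)
self.cursor.register('term_df', term_table)
self.cursor.register('doc_df', doc_table)
self.cursor.execute('DELETE FROM dict')
self.cursor.execute('INSERT INTO dict SELECT termid, term, df FROM dict_df')
self.cursor.execute('DELETE FROM terms')
self.cursor.execute('INSERT INTO terms SELECT termid, docid, "count" FROM term_df')
self.cursor.execute('DELETE FROM docs')
self.cursor.execute('INSERT INTO docs SELECT docid, name, length FROM doc_df')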
    def search(self, query):
        self.cursor.execute(query)
        return self.cursor.fetchdf()

    def clear(self):
        self.cursor.execute("DELETE FROM terms")
-from argparse import ArgumentParser
+import argparse
from index import Index
+from search import Search


def bulk_index(index: Index, args: argparse.Namespace):
    filename = args.data
    index.bulk_index(filename)
    print(index)


def query_index(index: Index, args: argparse.Namespace):
    query_terms = args.terms
-   # TODO use query terms to query index
+   search = Search(index)
+   search.search(query_terms)


def dump_index(index: Index, args: argparse.Namespace):
    print(index)


def clear_index(index: Index, args: argparse.Namespace):
    index.clear()
@@ -23,20 +30,23 @@ def main():
    parser = argparse.ArgumentParser(prog='old_duck',
                                     description='OldDuck - A Python implementation of OldDog, using DuckDB')

-   subparsers = parser.add_subparsers()
-   parser.add_argument('database', help='The database file to use')
+   subparsers = parser.add_subparsers(dest='command')
+   subparsers.required = True

    parser_index = subparsers.add_parser('index')
+   parser_index.add_argument('database', help='The database file to index the files to')
    parser_index.add_argument('data', help='The file to read and index documents from')
    parser_index.set_defaults(func=bulk_index)

+   parser_query = subparsers.add_parser('query')
+   parser_query.add_argument('database', help='The database file to query')
+   parser_query.add_argument('terms', help='The query terms', nargs='*')
+   parser_query.set_defaults(func=query_index)

    parser_dump = subparsers.add_parser('dump')
    parser_dump.set_defaults(func=dump_index)

    parser_clear = subparsers.add_parser('clear')
+   parser_clear.add_argument('database', help='The database file to clear')
    parser_clear.set_defaults(func=clear_index)

    args = parser.parse_args()
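
The remainder of main() is collapsed in this diff. A plausible dispatch, assuming Index is constructed from the database path (note that the dump subcommand as shown defines no database argument, so the collapsed part presumably handles that case):

    # Hypothetical dispatch: open the index and run the chosen subcommand.
    index = Index(args.database)
    args.func(index, args)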
def bm25(terms, disjunctive=False):
    term_list = ', '.join([f"'{term}'" for term in terms])
    constraint = '' if disjunctive else 'HAVING COUNT(distinct termid) = (SELECT COUNT(*) FROM termids)'

    return f"""
    WITH termids AS (SELECT termid FROM dict WHERE term IN ({term_list})),
    qterms AS (SELECT termid, docid, count FROM terms
               WHERE termid IN (SELECT * FROM termids)),
    subscores AS (SELECT docs.docid, length, term_tf.termid, tf, df,
                  (log(((SELECT COUNT(*) FROM docs WHERE length > 0) - df + 0.5) / (df + 0.5))
                   * (tf * (1.2 + 1)
                      / (tf + 1.2 * (1 - 0.75 + 0.75 * (length / (SELECT AVG(length) FROM docs WHERE length > 0)))))) AS subscore
                  FROM (SELECT termid, docid, count AS tf FROM qterms) AS term_tf
                  JOIN (SELECT docid FROM qterms
                        GROUP BY docid {constraint}) AS cdocs
                       ON term_tf.docid = cdocs.docid
                  JOIN docs ON term_tf.docid = docs.docid
                  JOIN dict ON term_tf.termid = dict.termid)
    SELECT scores.docid, score
    FROM (SELECT docid, sum(subscore) AS score
          FROM subscores GROUP BY docid) AS scores
    JOIN docs ON scores.docid = docs.docid
    ORDER BY score DESC;
    """
def tfidf(terms, disjunctive=False):
    # TODO: TF-IDF scoring is not implemented yet; returns an empty query.
    return ''
from index import Index
import query
import re


class Search:
    index: Index

    def __init__(self, index: Index):
        self.index = index

    def search(self, terms, method='bm25'):
        if method == 'bm25':
            sql_query = query.bm25(terms)
        elif method == 'tfidf':
            sql_query = query.tfidf(terms)
        else:
            raise NotImplementedError(f'Search method "{method}" was not implemented')

        # Debug output: the raw query, its whitespace-collapsed form, and
        # finally the result frame returned by the index.
        print(sql_query)
        print(re.sub(r'\s+', ' ', sql_query))
        print(self.index.search(re.sub(r'\s+', ' ', sql_query.strip())))
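
Hypothetical end-to-end usage, assuming Index is constructed from a database path as the CLI suggests (the filename and query terms here are illustrative):

index = Index('docs.db')                # hypothetical database file
Search(index).search(['quick', 'fox'])  # prints the ranked (docid, score) frame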