index.py 14 KB
Newer Older
Gijs Hendriksen's avatar
Gijs Hendriksen committed
1
import duckdb
2
import json
Gijs Hendriksen's avatar
Gijs Hendriksen committed
3
import os
4
import math
Gijs Hendriksen's avatar
Gijs Hendriksen committed
5
import pymonetdb
6 7
import numpy as np
import pandas as pd
Gijs Hendriksen's avatar
Gijs Hendriksen committed
8
from abc import ABC, abstractmethod
9
from collections import defaultdict
10
from nltk import corpus, word_tokenize
Gijs Hendriksen's avatar
Gijs Hendriksen committed
11 12
from pymonetdb.sql import cursors as monet_cursors
from typing import Union
13
import time
Gijs Hendriksen's avatar
Gijs Hendriksen committed
14 15


Gijs Hendriksen's avatar
Gijs Hendriksen committed
16 17
class Index(ABC):
    """Abstract inverted index stored in a relational database.

    Concrete subclasses open the database in ``init_db`` and implement
    indexing, searching and clearing; this base class provides tokenization
    (``get_terms``), a backend factory (``get_index``) and a simple
    per-document ``bulk_index`` fallback.
    """

    # Assigned by init_db() in the concrete subclasses.
    cursor: Union[duckdb.Cursor, monet_cursors.Cursor]

    def __init__(self, db):
        """Load the English stopword list, then open/initialise database *db*."""
        # A set: get_terms performs one membership test per token.
        self.stopwords = set(corpus.stopwords.words('english'))
        self.init_db(db)

    def get_terms(self, body: str):
        """Return a mapping term -> number of occurrences in *body*.

        The text is lowercased and tokenized with NLTK's ``word_tokenize``.
        Stopwords and tokens that are not purely alphabetic are dropped, and
        every remaining term is truncated to its first 32 characters (terms
        sharing that prefix are counted together).
        """
        terms = defaultdict(int)

        for term in word_tokenize(body.lower()):
            if term not in self.stopwords and term.isalpha():
                terms[term[:32]] += 1

        return terms

    @staticmethod
    def get_index(engine: str, db: str):
        """Create the index backend named *engine* ('duckdb' or 'monetdb') on *db*.

        Raises:
            NotImplementedError: if *engine* is not a supported backend.
        """
        if engine == 'duckdb':
            return DuckDBIndex(db)
        elif engine == 'monetdb':
            return MonetDBIndex(db)
        raise NotImplementedError(f'Engine "{engine}" not implemented!')

    @abstractmethod
    def init_db(self, db: str):
        """Open the database *db* (creating the schema if needed) and set ``self.cursor``."""
        pass

    @abstractmethod
    def index(self, document: dict):
        """Add a single document (a dict with at least 'name' and 'body') to the index."""
        pass

    @abstractmethod
    def bulk_index(self, filename: str):
        """Index every document in the JSON array stored in *filename*.

        This default implementation calls :meth:`index` once per document and
        prints a progress counter. It is abstract, so subclasses must override
        it, but they may delegate here with ``super().bulk_index(filename)``.
        An unparsable file is reported and ignored.
        """
        try:
            with open(filename, encoding='utf-8') as _file:
                data = json.load(_file)
        except json.JSONDecodeError:
            print('[!] Invalid input file!')
            return

        # Digits needed to right-align the "i/total" progress counter.
        width = len(str(len(data)))

        for i, document in enumerate(data):
            self.index(document)
            print(f'{i + 1:>{width}d}/{len(data)}', end='\r')

    @abstractmethod
    def search(self, query):
        """Execute *query* against the index and return its results."""
        pass

    @abstractmethod
    def clear(self):
        """Remove all indexed data."""
        pass
Gijs Hendriksen's avatar
Gijs Hendriksen committed
70

Gijs Hendriksen's avatar
Gijs Hendriksen committed
71 72

class DuckDBIndex(Index):
    """Inverted index stored in a DuckDB database file.

    Tables (created by :meth:`init_db`):
      * ``dict(termid, term, df)``: one row per distinct term; ``df`` is the
        number of documents that contain it.
      * ``docs(docid, name, length)``: one row per document; ``length`` is the
        number of indexed tokens.
      * ``terms(termid, docid, count)``: postings, i.e. the frequency of a term
        in a document.

    New ids are drawn from the sequences ``term_ids`` and ``doc_ids``.
    """

    def reset_auto_increment(self):
        """Recreate the id sequences so they start just after the highest stored ids.

        The sequences are dropped and recreated ``START WITH`` MAX(id) + 1, or
        1 when the corresponding table is empty.
        """
        # `dict` is the authoritative owner of term ids; `terms` may lag behind it
        # (e.g. if an index() call failed before writing its postings).
        max_termid = self.cursor.execute('SELECT MAX(termid) FROM dict').fetchone()[0]
        next_termid = 1 if max_termid is None else max_termid + 1

        max_docid = self.cursor.execute('SELECT MAX(docid) FROM docs').fetchone()[0]
        next_docid = 1 if max_docid is None else max_docid + 1

        self.cursor.execute('DROP SEQUENCE term_ids')
        self.cursor.execute('DROP SEQUENCE doc_ids')
        self.cursor.execute(f'CREATE SEQUENCE term_ids START WITH {next_termid}')
        self.cursor.execute(f'CREATE SEQUENCE doc_ids START WITH {next_docid}')

    def init_db(self, db):
        """Open (or create) the DuckDB file *db*.

        A new file gets the two sequences and three tables. For an existing
        file, the sequences are realigned with the ids already stored.
        """
        db_exists = os.path.exists(db)

        self.cursor = duckdb.connect(db).cursor()

        if not db_exists:
            self.cursor.execute('CREATE SEQUENCE term_ids')
            self.cursor.execute('CREATE SEQUENCE doc_ids')
            self.cursor.execute('CREATE TABLE dict('
                                'termid INTEGER NOT NULL,'
                                'term VARCHAR NOT NULL,'
                                'df INTEGER NOT NULL)')
            self.cursor.execute('CREATE TABLE docs('
                                'docid INTEGER NOT NULL,'
                                'name VARCHAR NOT NULL,'
                                'length INTEGER NOT NULL)')
            self.cursor.execute('CREATE TABLE terms('
                                'termid INTEGER NOT NULL,'
                                'docid INTEGER NOT NULL,'
                                'count INTEGER NOT NULL)')
        else:
            self.reset_auto_increment()

    def index(self, document: dict):
        """Add one document (keys 'name' and 'body') to the index.

        New terms get an id from ``term_ids``; known terms have their
        ``df`` incremented. Values are passed as query parameters, so names
        containing quotes are stored verbatim.
        """
        terms = self.get_terms(document['body'])
        doc_name = document['name']
        doc_length = sum(terms.values())

        doc_id = self.cursor.execute("SELECT nextval('doc_ids')").fetchone()[0]
        self.cursor.execute('INSERT INTO docs VALUES (?, ?, ?)',
                            [doc_id, doc_name, doc_length])

        postings = []

        for term, frequency in terms.items():
            row = self.cursor.execute('SELECT termid FROM dict WHERE term = ?',
                                      [term]).fetchone()

            if row is None:
                term_id = self.cursor.execute("SELECT nextval('term_ids')").fetchone()[0]
                self.cursor.execute('INSERT INTO dict VALUES (?, ?, 1)', [term_id, term])
            else:
                term_id = row[0]
                self.cursor.execute('UPDATE dict SET df = df + 1 WHERE termid = ?',
                                    [term_id])

            postings.append((term_id, doc_id, frequency))

        # A document can be left without terms (e.g. only stopwords); an empty
        # VALUES list would be a syntax error.
        if postings:
            self.cursor.executemany('INSERT INTO terms VALUES (?, ?, ?)', postings)

    def bulk_index(self, filename: str):
        """Index every document in the JSON array stored in *filename* in one batch.

        The dictionary is built in memory and the new rows are loaded with
        ``COPY`` from temporary CSV files in the working directory. ``dict``
        is rewritten because its ``df`` values change. ``docs`` and ``terms``
        only gain rows. Loading happens in a single transaction, and the id
        sequences are realigned afterwards so :meth:`index` does not reuse
        ids. An unparsable file is reported and ignored.
        """
        try:
            with open(filename, encoding='utf-8') as _file:
                data = json.load(_file)
        except json.JSONDecodeError:
            print('[!] Invalid input file!')
            return

        # term -> [termid, df]. Ids already in the database are preserved, so
        # postings stored earlier keep pointing at the right dictionary entry.
        rows = self.cursor.execute('SELECT termid, term, df FROM dict').fetchall()
        entries = {term: [termid, df] for termid, term, df in rows}
        next_termid = max((termid for termid, _ in entries.values()), default=0) + 1

        max_docid = self.cursor.execute('SELECT MAX(docid) FROM docs').fetchone()[0]
        docid_start = 1 if max_docid is None else max_docid + 1

        new_docs = []
        new_terms = []
        width = len(str(len(data)))  # digits for the progress counter

        start = time.time()

        for i, document in enumerate(data):
            docid = docid_start + i
            doc_terms = self.get_terms(document['body'])

            for term, count in doc_terms.items():
                entry = entries.get(term)
                if entry is None:
                    # Ids are handed out monotonically and never renumbered.
                    entry = entries[term] = [next_termid, 0]
                    next_termid += 1
                entry[1] += 1
                new_terms.append((entry[0], docid, count))

            new_docs.append((docid, document['name'], sum(doc_terms.values())))
            print(f'{i + 1:>{width}d}/{len(data)}', end='\r')

        frames = {
            'dict': pd.DataFrame([(termid, term, df) for term, (termid, df) in entries.items()],
                                 columns=['termid', 'term', 'df']),
            'docs': pd.DataFrame(new_docs, columns=['docid', 'name', 'length']),
            'terms': pd.DataFrame(new_terms, columns=['termid', 'docid', 'count']),
        }

        try:
            for table, frame in frames.items():
                frame.to_csv(f'{table}.csv', header=False, index=False)

            self.cursor.execute('BEGIN TRANSACTION')
            try:
                self.cursor.execute('DELETE FROM dict')
                for table, frame in frames.items():
                    if not frame.empty:
                        self.cursor.execute(f"COPY {table} FROM '{table}.csv'")
                self.cursor.execute('COMMIT')
            except Exception:
                # Do not leave `dict` emptied by a half-finished load.
                self.cursor.execute('ROLLBACK')
                raise
        finally:
            for table in frames:
                if os.path.exists(f'{table}.csv'):
                    os.remove(f'{table}.csv')

        self.reset_auto_increment()

        current = time.time()
        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')

    def search(self, query):
        """Execute the SQL *query* and return the full result as a pandas DataFrame."""
        self.cursor.execute(query)
        return self.cursor.fetchdf()

    def clear(self):
        """Delete all rows from the three tables and restart the id sequences at 1."""
        self.cursor.execute("DELETE FROM terms")
        self.cursor.execute("DELETE FROM docs")
        self.cursor.execute("DELETE FROM dict")
        # Like MonetDBIndex.clear (TRUNCATE ... RESTART IDENTITY): ids start over.
        self.reset_auto_increment()

    def __str__(self):
        """Render the contents of all three tables, each under a titled separator."""
        dict_rows = self.cursor.execute('SELECT * FROM dict').fetchdf()
        term_rows = self.cursor.execute('SELECT * FROM terms').fetchdf()
        doc_rows = self.cursor.execute('SELECT * FROM docs').fetchdf()

        return '\n'.join([
            'dict',
            '-' * 20,
            str(dict_rows),
            '',
            'terms',
            '-' * 20,
            str(term_rows),
            '',
            'docs',
            '-' * 20,
            str(doc_rows),
        ])
Gijs Hendriksen committed
236 237 238 239 240 241 242 243


class MonetDBIndex(Index):
    """Inverted index stored in a MonetDB database on localhost.

    It uses the same three tables as the DuckDB backend: ``dict``, ``docs``
    and ``terms``. The id columns are plain INTEGERs with no auto-increment,
    so ids are assigned here as MAX(id) + 1. Writes are committed explicitly
    with ``COMMIT``.
    """

    def init_db(self, db: str):
        """Connect to MonetDB database *db* and create the tables if they do not exist."""
        self.cursor = pymonetdb.connect(username='monetdb', password='monetdb',
                                        hostname='localhost', database=db).cursor()

        self.cursor.execute('CREATE TABLE IF NOT EXISTS dict('
                            'termid INTEGER NOT NULL,'
                            'term VARCHAR(64) NOT NULL,'
                            'df INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS docs('
                            'docid INTEGER NOT NULL,'
                            'name VARCHAR(64) NOT NULL,'
                            'length INTEGER NOT NULL)')
        self.cursor.execute('CREATE TABLE IF NOT EXISTS terms('
                            'termid INTEGER NOT NULL,'
                            'docid INTEGER NOT NULL,'
                            'count INTEGER NOT NULL)')
        # Persist the schema now instead of relying on a later write's COMMIT.
        self.cursor.execute('COMMIT')

    def _next_id(self, table: str, column: str) -> int:
        """Return MAX(*column*) + 1 over *table*, or 1 when the table is empty."""
        self.cursor.execute(f'SELECT MAX({column}) FROM {table}')
        current, = self.cursor.fetchone()
        return 1 if current is None else current + 1

    def index(self, document: dict):
        """Add one document (keys 'name' and 'body') to the index and commit.

        The document name is truncated to 32 characters. Values are passed as
        query parameters, so names containing quotes are stored verbatim.
        """
        terms = self.get_terms(document['body'])
        doc_name = document['name'][:32]
        doc_length = sum(terms.values())

        # The id columns have no default, so the ids must be supplied explicitly.
        doc_id = self._next_id('docs', 'docid')
        self.cursor.execute('INSERT INTO docs VALUES (%s, %s, %s)',
                            (doc_id, doc_name, doc_length))

        next_term_id = self._next_id('dict', 'termid')

        for term, frequency in terms.items():
            self.cursor.execute('SELECT termid FROM dict WHERE term = %s', (term,))
            row = self.cursor.fetchone()

            if row is not None:
                term_id = row[0]
                self.cursor.execute('UPDATE dict SET df = df + 1 WHERE termid = %s',
                                    (term_id,))
            else:
                term_id = next_term_id
                next_term_id += 1
                self.cursor.execute('INSERT INTO dict VALUES (%s, %s, 1)', (term_id, term))

            self.cursor.execute('INSERT INTO terms VALUES (%s, %s, %s)',
                                (term_id, doc_id, frequency))
        self.cursor.execute('COMMIT')

    def bulk_index(self, filename: str):
        """Index every document in the JSON array stored in *filename* in one batch.

        The dictionary is built in memory. New rows are written to CSV files
        under ``.monetdb/`` and loaded with ``COPY INTO`` from ``/app/``.
        NOTE(review): this presumes ``.monetdb`` is mounted at ``/app`` on the
        MonetDB server; confirm. ``dict`` is rewritten because its ``df``
        values change, while ``docs`` and ``terms`` only gain rows. Everything
        is committed once at the end and rolled back on failure. Document
        names are truncated to 32 characters, as in :meth:`index`. An
        unparsable file is reported and ignored.
        """
        try:
            with open(filename, encoding='utf-8') as _file:
                data = json.load(_file)
        except json.JSONDecodeError:
            print('[!] Invalid input file!')
            return

        # term -> [termid, df]. Ids already in the database are preserved, so
        # postings stored earlier keep pointing at the right dictionary entry.
        self.cursor.execute('SELECT termid, term, df FROM dict')
        entries = {term: [termid, df] for termid, term, df in self.cursor.fetchall()}
        next_termid = self._next_id('dict', 'termid')
        docid_start = self._next_id('docs', 'docid')

        new_docs = []
        new_terms = []
        width = len(str(len(data)))  # digits for the progress counter

        start = time.time()

        for i, document in enumerate(data):
            docid = docid_start + i
            doc_terms = self.get_terms(document['body'])

            for term, count in doc_terms.items():
                entry = entries.get(term)
                if entry is None:
                    # Ids are handed out monotonically and never renumbered.
                    entry = entries[term] = [next_termid, 0]
                    next_termid += 1
                entry[1] += 1
                new_terms.append((entry[0], docid, count))

            new_docs.append((docid, document['name'][:32], sum(doc_terms.values())))
            print(f'{i + 1:>{width}d}/{len(data)}', end='\r')

        frames = {
            'dict': pd.DataFrame([(termid, term, df) for term, (termid, df) in entries.items()],
                                 columns=['termid', 'term', 'df']),
            'docs': pd.DataFrame(new_docs, columns=['docid', 'name', 'length']),
            'terms': pd.DataFrame(new_terms, columns=['termid', 'docid', 'count']),
        }

        try:
            for table, frame in frames.items():
                frame.to_csv(f'.monetdb/{table}.csv', header=False, index=False)

            try:
                self.cursor.execute('DELETE FROM dict')
                for table, frame in frames.items():
                    if not frame.empty:
                        self.cursor.execute(
                            f"COPY INTO {table} FROM '/app/{table}.csv' USING DELIMITERS ','")
                self.cursor.execute('COMMIT')
            except Exception:
                # Do not leave `dict` emptied by a half-finished load.
                self.cursor.execute('ROLLBACK')
                raise
        finally:
            for table in frames:
                if os.path.exists(f'.monetdb/{table}.csv'):
                    os.remove(f'.monetdb/{table}.csv')

        current = time.time()
        print(f'Indexed {len(data)} documents in {current - start:.2f} seconds!')

    def search(self, query):
        """Execute the SQL *query* and return at most its first 10 result rows."""
        self.cursor.execute(query)
        return self.cursor.fetchmany(10)

    def clear(self):
        """Empty all three tables, restarting their identities, and commit."""
        self.cursor.execute('TRUNCATE terms RESTART IDENTITY')
        self.cursor.execute('TRUNCATE docs RESTART IDENTITY')
        self.cursor.execute('TRUNCATE dict RESTART IDENTITY')
        self.cursor.execute('COMMIT')

    def __str__(self):
        """Render up to 10 rows of each table as right-aligned 10-character columns.

        An extra row of '...' marks a table with more than 10 rows.
        """
        table_mapping = {
            'dict': ['termid', 'term', 'df'],
            'docs': ['docid', 'name', 'length'],
            'terms': ['termid', 'docid', 'count'],
        }

        rows = []

        for table, col_names in table_mapping.items():
            rows.append([table])
            rows.append(['-' * (11 * len(col_names) - 1)])
            rows.append(col_names)
            rows.append(['-' * 10 for _ in range(len(col_names))])

            # The return value of execute() is used as the table's row count.
            amount = self.cursor.execute(f'SELECT * FROM {table}')
            rows.extend(self.cursor.fetchmany(10))

            if amount > 10:
                rows.append(['...' for _ in range(len(col_names))])
            rows.append('')

        return '\n'.join([' '.join([f'{value:>10}' for value in row]) for row in rows])