Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fts sqlite3 fts5 #128

Merged
merged 3 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 180 additions & 1 deletion puren_tonbo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import re
import sqlite3 # TODO make optional?
import subprocess
import sys
import tempfile
Expand Down Expand Up @@ -931,6 +932,7 @@ def grep_string(search_text, regex_object, highlight_text_start=None, highlight_
"""Given input string "search_text" and compiled regex regex_object
search for regex matches and return list of tuples of line number and text for that line
for matches, prefix and postfix with highlight_text_start, highlight_text_stop
files_with_matches?? - only return filename, not line matches, stops after first line hit. Similar to grep -l, --files-with-matches
"""
def process_matches(match):
return highlight_text_start + match.group(0) + highlight_text_stop
Expand Down Expand Up @@ -1669,17 +1671,150 @@ def directory_contents(dirname, filename_filter=None):

##############################

# NIH FTS Abstraction.. focused on plain text notes

class FullTextSearch:
def __init__(self, index_location):
"""Potentially opens index
"""
self.index_location = index_location # unspecified type, could be; URL (auth handled by implementation), file, directory, ....
raise NotImplementedError()

def index_close(self):
raise NotImplementedError()

def index_delete(self):
raise NotImplementedError()

def create_index_start(self):
raise NotImplementedError()

def create_index_end(self):
raise NotImplementedError()

def add_to_index(self, filename, contents=None, contents_size=None, mtime=None):
"""Add to index self.index_location (from scratch, no update support API.... yet.)
"""
if contents and not contents_size:
contents_size = length(contents)
raise NotImplementedError()

def search(self, search_term, find_only_filename=False, files_with_matches=False, highlight_text_start=None, highlight_text_stop=None):
"""Search self.index_location for `search_term`
"""
raise NotImplementedError()

class FullTextSearchSqlite:
def __init__(self, index_location):
"""index_location - SQLite database, probably a pathname could be memory
SQLite FTS5 - https://www.sqlite.org/fts5.html - TODO FTS4/FTS3 fallback support?
"""
self.index_location = index_location # unspecified type, could be; URL (auth handled by implementation), file, directory, ....
con = sqlite3.connect(index_location)
#if index_location == ':memory:':
self.db = con
cur = con.cursor()
self.cursor = cur

cur.execute('pragma compile_options;')
available_pragmas = cur.fetchall()

if ('ENABLE_FTS5',) not in available_pragmas:
raise NotImplementedError('FTS5 missing %r' % (available_pragmas, ) )

def index_close(self):
self.db.close()

def index_delete(self):
cur = self.cursor
try:
cur.execute("""DROP TABLE note""")
except: # sqlite3.OperationalError: no such table: note
pass

def create_index_start(self):
cur = self.cursor
cur.execute('CREATE VIRTUAL TABLE note USING fts5(filename, contents)') # TODO size and date unindexed

def create_index_end(self):
self.db.commit()

def add_to_index(self, filename, contents=None): #, contents_size=None, mtime=None):
"""Add to index self.index_location (from scratch, no update support API.... yet.)
"""
contents_size=None # FIXME
if contents and not contents_size:
contents_size = len(contents)
cur = self.cursor
cur.execute("""INSERT INTO note (filename, contents) VALUES (?, ?)""", (filename, contents) )

def search(self, search_term, find_only_filename=False, files_with_matches=False, highlight_text_start=None, highlight_text_stop=None):
"""Search self.index_location for `search_term`
"""
# FIXME here and grep find_only_filename=False == files_with_matches? duplicate?
if find_only_filename:
raise NotImplementedError('find_only_filename')
if files_with_matches:
raise NotImplementedError('files_with_matches')
if highlight_text_start is None:
highlight_text_start = '**'
if highlight_text_stop is None:
highlight_text_stop = '**'
# TODO doc demos
context_distance = 3 # fts_search aesop king - only shows aesop
context_distance = 6 # fts_search aesop king - shows both
context_distance = 10

cur = self.cursor
'''
cur.execute("""SELECT
snippet(note, 0, '<b>', '</b>', '...', ?) as title,
snippet(note, 1, '<b>', '</b>', '...', ?) as body
FROM note(?)
ORDER BY rank""",
(context_distance, context_distance, search_term) )
cur.execute("""SELECT
snippet(note, 0, '<b>', '</b>', '...', 10) as title,
snippet(note, 1, '<b>', '</b>', '...', 10) as body
FROM note(?)
ORDER BY rank""",
(search_term, ) )
# works
cur.execute("""SELECT
snippet(note, 0, '<b>', '</b>', '...', ?) as title,
snippet(note, 1, '<b>', '</b>', '...', ?) as body
FROM note(?)
ORDER BY rank""",
(context_distance, context_distance, search_term, ) )
'''

cur.execute("""SELECT
filename,
snippet(note, 0, ?, ?, '...', ?) as title,
snippet(note, 1, ?, ?, '...', ?) as body
FROM note(?)
ORDER BY rank""",
(highlight_text_start, highlight_text_stop, context_distance, highlight_text_start, highlight_text_stop, context_distance, search_term, ) )

return cur.fetchall()

##############################


class FileSystemNotes(BaseNotes):
"""PyTombo notes on local file system, just like original Windows Tombo
"""

def __init__(self, note_root, note_encoding=None):
def __init__(self, note_root, note_encoding=None, fts_class=FullTextSearchSqlite): # FIXME default fts to None?
note_root = self.unicode_path(note_root) # either a file or a directory of files
self.note_root = os.path.abspath(note_root)
self.abs_ignore_path = os.path.join(self.note_root, '') ## add trailing slash.. unless this is a file
#self.note_encoding = note_encoding or 'utf8'
self.note_encoding = note_encoding or ('utf8', 'cp1252')
self.fts_class = fts_class # FIXME how do parameters get passed in?
self.fts_instance = None

def abspath2relative(self, input_path):
"""validate absolute native path, return relative path with with (leading) self.note_root removed.
Expand Down Expand Up @@ -1760,6 +1895,50 @@ def directory_contents(self, sub_dir=None):
sub_dir = self.note_root
return directory_contents(dirname=sub_dir)


def fts_search(self, s, highlight_text_start=None, highlight_text_stop=None): # FIXME API
fts_instance = self.fts_instance
return fts_instance.search(search_term=s, highlight_text_start=highlight_text_start, highlight_text_stop=highlight_text_stop) # or yield...

def fts_index(self, sub_dir=None, get_password_callback=None):
if sub_dir:
raise NotImplementedError('sub_dir')
search_path = self.note_root
recurse_notes_func = self.recurse_notes

"""
search_encrypted = False
#search_encrypted = True # TODO test
if search_encrypted:
if search_encrypted == 'only':
is_note_filename_filter = encrypted_filename_filter
else:
is_note_filename_filter = supported_filename_filter
"""
if get_password_callback:
is_note_filename_filter = supported_filename_filter
else:
is_note_filename_filter = plaintext_filename_filter


# FIXME store constucted
if self.fts_instance:
fts_instance = self.fts_instance
else:
fts_instance = self.fts_class(':memory:')
self.fts_instance = fts_instance
fts_instance.index_delete()
fts_instance.create_index_start()
for tmp_filename in recurse_notes_func(search_path, is_note_filename_filter):
filename = self.abspath2relative(tmp_filename)
print('index %r' % filename)
#contents = None
contents = self.note_contents(filename, get_pass=get_password_callback, dos_newlines=True)
fts_instance.add_to_index(filename, contents=contents)
fts_instance.create_index_end()


# TODO remove (or depreicate) search_term_is_a_regex and replace with search_type=(plain, regex, fts)
# FIXME Consider adding dictionary parameter for search options rather than new keywords each time?
# (self, search_term, search_term_is_a_regex=True, ignore_case=True, search_encrypted=False, get_password_callback=None, progess_callback=None, find_only_filename=None, index_name=None, note_encoding=None):
# (search_term, search_term_is_a_regex=True , ignore_case=True, search_encrypted=False, get_password_callback=None, progess_callback=None, find_only_filename=None, index_name=None, note_encoding=None):
Expand Down
41 changes: 41 additions & 0 deletions puren_tonbo/tools/ptig.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, paths_to_search=None, pt_config=None, grep_options=None): #
self.bookmarks = {}
self.cache = None
self.paths_to_search = []
self.paths_to_search_instances = []
#import pdb ; pdb.set_trace()
paths_to_search = paths_to_search or ['.']
for note_path in paths_to_search:
Expand Down Expand Up @@ -167,6 +168,46 @@ def do_exit(self, line=None):
do_bye = do_exit
do_EOF = do_exit

def do_fts_index(self, line=None):
if line:
print('Parameters not supported') # TODO handle and also cwd support
return
note_encoding = self.pt_config['codec']
password_func = self.grep_options.password or puren_tonbo.caching_console_password_prompt
password_func = None # no password #FIXME include encrypted option

self.paths_to_search_instances = []
for note_root in self.paths_to_search:
notes = puren_tonbo.FileSystemNotes(note_root, note_encoding)
# FIXME handle password from environment, e.g. env PT_PASSWORD=password (keyring)
# FIXME handle cancel from password prompt
notes.fts_index(get_password_callback=password_func)
self.paths_to_search_instances.append(notes)

def do_fts_search(self, line=None):
# this is temporary, ideally fts should be callable from the regular search interface - self.file_hits needs setting up
if not line:
print('Need a search term') # TODO show help (currently missing)?
return

if self.grep_options.use_color:
highlight_text_start, highlight_text_stop = ptgrep.color_linenum, ptgrep.color_reset
highlight_text_start, highlight_text_stop = ptgrep.color_searchhit, ptgrep.color_reset
else:
highlight_text_start, highlight_text_stop = None, None # revisit this

print('')
# TODO time and report, counts and elapsed time
self.file_hits = []
for notes in self.paths_to_search_instances:
for counter, hit in enumerate(notes.fts_search(line, highlight_text_start=highlight_text_start, highlight_text_stop=highlight_text_stop), start=1):
#print('hit %r' % (hit,) )
#print('%s:%s' % hit)
filename, filename_highlighted, note_text = hit
note_text = note_text.replace('\n', ' ')
print('fts[%d] %s:%s:%s' % (counter, filename, filename_highlighted, note_text))
# FIXME need raw filename
self.file_hits.append(filename) # not sure how this can work for multi dir search - nor for "results" directive as parent dir is lost

def do_ls(self, line=None):
if line:
Expand Down
Loading