pyimport | questions = [re.sub(r'|\|+', '', q, flags=re.IGNORECASE).strip() | Cell 5 | Search

The code imports necessary libraries and defines a schema to build a Whoosh index, which is then used to store and retrieve data. It allows for searching by ID, filename, and fuzzy search by questions, with the ability to add more strategies as needed.

Cell 4

import os
import json
import datetime
from whoosh.index import create_in
from whoosh.fields import Schema, TEXT, ID, DATETIME, KEYWORD
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.writing import AsyncWriter
from whoosh.query import Term

# Define schema for Whoosh index. Every field is stored so that a search hit
# can be turned back into a complete cell dictionary.
schema = Schema(
    markdown=TEXT(stored=True),
    language=TEXT(stored=True),
    mtime=DATETIME(stored=True),
    id=ID(stored=True),
    questions=TEXT(stored=True),
    filename=TEXT(stored=True),
    code=TEXT(stored=True),
)

# Open the index stored in ".cache", creating the directory and/or the index
# when either one is missing. Checking for the index itself (not only for the
# directory) avoids failing when ".cache" exists but holds no index yet.
from whoosh.index import exists_in, open_dir

os.makedirs(".cache", exist_ok=True)
if exists_in(".cache"):
    index = open_dir(".cache")
else:
    index = create_in(".cache", schema)

def store_in_whoosh(cells):
    """Store extracted cells in the Whoosh index.

    Args:
        cells: Iterable of cell dictionaries. Cells without a ``"code"`` key
            are skipped; every other cell must provide ``markdown``,
            ``language``, ``mtime``, ``id``, ``questions`` (an iterable of
            strings), ``filename`` and ``code``.

    Raises:
        Exception: Whatever is raised while adding a document (for example
            ``KeyError`` for a cell missing a required key). The pending
            batch is cancelled before the exception propagates.
    """
    writer = AsyncWriter(index)
    try:
        for cell in cells:
            # Cells of every language are indexed; only code-less cells are skipped.
            if 'code' not in cell:
                continue
            writer.add_document(
                markdown=cell["markdown"],
                language=cell["language"],
                mtime=cell["mtime"],
                id=cell["id"],
                # Stored newline-joined; interpret() splits on "\n" to rebuild the list.
                questions="\n".join(cell["questions"]),
                filename=cell["filename"],
                code=cell["code"],
            )
    except Exception:
        # Do not leave a half-built batch (and the writer behind it) dangling.
        writer.cancel()
        raise
    writer.commit()

# True until interpret() has run its one-time directory scan.
first = True


def interpret(question):
    """Look up indexed cells matching ``question``.

    On the first call the parent directory of this file is scanned so the
    index is populated. The lookup then tries, in order, an exact term match
    on ``id``, an exact term match on ``filename``, and a parsed query against
    the ``questions`` field, stopping at the first strategy that yields hits.

    Args:
        question: Cell id, filename, or free-text question.

    Returns:
        A list of dictionaries with the keys ``id``, ``filename``, ``code``,
        ``language``, ``markdown`` and ``questions`` (the stored text split
        on newlines). Empty when no strategy matches.
    """
    global first

    if first:
        scan_directory(os.path.join(os.path.dirname(__file__), '..'), 3)
        first = False

    # Built lazily so a later query is only constructed when the earlier
    # strategies found nothing.
    strategies = (
        lambda: Term("id", question),
        lambda: Term("filename", question),
        lambda: QueryParser("questions", index.schema).parse(question),
    )

    with index.searcher() as searcher:
        hits = None
        for build_query in strategies:
            hits = searcher.search(build_query())  # default result limit
            if hits:
                break

        matches = []
        for hit in hits:
            matches.append({
                "id": hit["id"],
                "filename": hit["filename"],
                "code": hit["code"],
                "language": hit["language"],
                "markdown": hit["markdown"],
                "questions": hit["questions"].split('\n'),
            })
        return matches

def scan_directory(directory, limit):
    """Recursively scan a directory for notebooks and index their cells.

    Every ``.ipynb`` file under ``directory`` is looked up in the index by its
    absolute path. When it is not indexed yet, or its indexed ``mtime`` is
    older than the file's modification time, the matching index entries are
    deleted and the notebook's cells are re-extracted. All collected cells are
    stored in one batch at the end.

    Args:
        directory: Root directory to walk.
        limit: Currently unused by this function; kept so existing callers
            keep working.
    """
    from Core import cache_cells

    all_cells = []
    # The parser does not depend on the file being examined, so build it once.
    parser = QueryParser("filename", schema=schema)

    for root, _, files in os.walk(directory):
        # Skip files sitting directly inside a dot-directory.
        # NOTE(review): basename() of a start directory ending in ".." is
        # "..", so files directly inside such a root are skipped too --
        # confirm this is intended.
        if os.path.basename(root).startswith("."):
            continue

        for file in files:
            if file.startswith(".") or not file.endswith(".ipynb"):
                continue

            notebook_path = os.path.join(root, file)
            absolute_path = os.path.abspath(notebook_path)
            query = parser.parse(absolute_path)

            # One searcher per notebook, closed on exit, serves both the
            # staleness check and the delete.
            with index.searcher() as searcher:
                results = searcher.search(query)
                is_stale = (
                    not results
                    or results[0]['mtime'] < datetime.datetime.fromtimestamp(
                        os.path.getmtime(absolute_path))
                )
                if is_stale:
                    print("replacing: ", notebook_path)
                    writer = AsyncWriter(index)
                    writer.delete_by_query(query, searcher)
                    writer.commit()

            if is_stale:
                all_cells.extend(cache_cells(notebook_path))

    store_in_whoosh(all_cells)
    print(f"Stored {len(all_cells)} cells in Whoosh index.")


# Exported as a name -> callable mapping rather than the usual list of names
# (presumably consumed by the project's notebook importer -- verify).
# Iterating a dict yields its keys, so `from module import *` still works.
__all__ = {
    "scan_directory": scan_directory,
    "interpret": interpret,
}

What the code could have been:

pip install whoosh

Code Breakdown

Importing Libraries and Defining Schema

The code begins by importing various libraries, including os, json, datetime, and several from the whoosh library for building an index. It then defines a schema for the index using Schema from whoosh.fields, which consists of the following stored fields: markdown, language, mtime, id, questions, filename, and code.

Index Creation and Modification

The code then checks if the index directory exists at ".cache". If it doesn't, it creates the directory and initializes the index with the defined schema. If the directory exists, it opens the existing index.

Storing Data in Whoosh Index

The store_in_whoosh function takes a list of cells as input and stores their extracted data in the Whoosh index. It uses an AsyncWriter to add documents to the index.

Interpreting Questions

The interpret function takes a question as input and attempts to find matching documents in the Whoosh index. It uses three strategies:

  1. Search by ID
  2. Search by filename
  3. Fuzzy search by questions

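It uses Term from whoosh.query to build the ID and filename queries and QueryParser from whoosh.qparser to build the questions query, and runs each one through a searcher obtained from the index object. A later strategy is only tried when the earlier ones return no results.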

Miscellaneous

The code sets a global variable first to True and then sets it to False after a directory scan. This flag makes the scan run only once, on the first call to interpret; later calls go straight to the search.

Note that the code has some TODOs and comments indicating that it might need to be adjusted to suit specific needs, such as handling Python code specifically or adjusting the limit for the search results.