add claude-obsidian
Tests / Hermetic test suite (push) Has been cancelled
Tests / Skill frontmatter validation (push) Has been cancelled

This commit is contained in:
김경종
2026-05-28 10:57:16 +09:00
parent 1b07531a45
commit 72dad72703
205 changed files with 41703 additions and 80 deletions
+108
View File
@@ -0,0 +1,108 @@
#!/usr/bin/env bash
# test_allocate_address.sh — smoke tests for scripts/allocate-address.sh.
#
# Runs in a throwaway temp vault so it never touches the real
# .vault-meta/address-counter.txt. Exits non-zero on any failure.
#
# Usage: bash tests/test_allocate_address.sh
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
VAULT_ROOT="$(dirname "$SCRIPT_DIR")"
ALLOC="$VAULT_ROOT/scripts/allocate-address.sh"
PASS=0
FAIL=0
pass() { echo "OK $1"; PASS=$((PASS+1)); }
fail() { echo "FAIL $1"; FAIL=$((FAIL+1)); }
assert_eq() {
local label="$1" expected="$2" actual="$3"
if [ "$expected" = "$actual" ]; then pass "$label (got $actual)"
else fail "$label: expected '$expected', got '$actual'"
fi
}
# Create a fresh throwaway vault
TMP=$(mktemp -d -t ds-test-XXXXXX)
trap 'rm -rf "$TMP"' EXIT
mkdir -p "$TMP/scripts" "$TMP/wiki"
cp "$ALLOC" "$TMP/scripts/allocate-address.sh"
chmod +x "$TMP/scripts/allocate-address.sh"
cd "$TMP"
# --- Test 1: rebuild on empty vault = 1 ---
OUT=$(./scripts/allocate-address.sh --rebuild 2>&1)
assert_eq "rebuild on empty vault" "Counter rebuilt: next = 1" "$OUT"
assert_eq "counter file value" "1" "$(cat .vault-meta/address-counter.txt)"
# --- Test 2: peek does not increment ---
P1=$(./scripts/allocate-address.sh --peek)
P2=$(./scripts/allocate-address.sh --peek)
assert_eq "peek idempotent" "$P1" "$P2"
# --- Test 3: allocate returns c-000001 and increments ---
A1=$(./scripts/allocate-address.sh)
assert_eq "first alloc" "c-000001" "$A1"
assert_eq "counter after 1 alloc" "2" "$(cat .vault-meta/address-counter.txt)"
# --- Test 4: monotonic sequence ---
A2=$(./scripts/allocate-address.sh)
A3=$(./scripts/allocate-address.sh)
assert_eq "second alloc" "c-000002" "$A2"
assert_eq "third alloc" "c-000003" "$A3"
# --- Test 5: concurrent allocations are unique ---
./scripts/allocate-address.sh --rebuild >/dev/null
for i in $(seq 1 10); do
(./scripts/allocate-address.sh >> concurrent.txt) &
done
wait
UNIQ=$(sort -u concurrent.txt | wc -l)
TOTAL=$(wc -l < concurrent.txt)
assert_eq "10 concurrent allocs: unique count" "10" "$UNIQ"
assert_eq "10 concurrent allocs: total count" "10" "$TOTAL"
# --- Test 6: corrupt counter -> exit 3 ---
echo "not-a-number" > .vault-meta/address-counter.txt
set +e
./scripts/allocate-address.sh > /dev/null 2>&1
EC=$?
set -e
assert_eq "corrupt counter exit" "3" "$EC"
./scripts/allocate-address.sh --rebuild > /dev/null
# --- Test 7: missing counter recovers from max(c-)+1 ---
rm -f .vault-meta/address-counter.txt
# Drop a fake page into wiki/ with a real frontmatter address so rebuild finds it
cat > wiki/fake.md <<'EOF'
---
type: concept
address: c-000500
---
EOF
REC=$(./scripts/allocate-address.sh --peek 2>/dev/null)
assert_eq "recovery from max observed" "501" "$REC"
# --- Test 8: frontmatter-only scan ignores code-block examples ---
rm wiki/fake.md
echo "1" > .vault-meta/address-counter.txt
cat > wiki/doc.md <<'EOF'
---
type: concept
---
# Doc with a code-block example
```yaml
address: c-999999
```
EOF
REBUILT=$(./scripts/allocate-address.sh --rebuild 2>&1)
assert_eq "code-block ignored, rebuild to 1" "Counter rebuilt: next = 1" "$REBUILT"
# --- Summary ---
echo ""
echo "Passed: $PASS"
echo "Failed: $FAIL"
[ "$FAIL" -eq 0 ]
+275
View File
@@ -0,0 +1,275 @@
#!/usr/bin/env python3
"""test_bm25_index.py — hermetic tests for scripts/bm25-index.py.
Covers tokenization (stopwords, punctuation, case), index construction from
synthetic chunk fixtures, and BM25 scoring correctness against a hand-computed
reference. No network, no ollama, no LLM calls.
Usage:
python3 tests/test_bm25_index.py
"""
import importlib.util
import json
import math
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
HELPER = ROOT / "scripts" / "bm25-index.py"
spec = importlib.util.spec_from_file_location("bm25", HELPER)
bm25 = importlib.util.module_from_spec(spec)
spec.loader.exec_module(bm25)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond, hint=""):
if not cond:
raise Fail(f"FAIL {label}{(': ' + hint) if hint else ''}")
print(f"OK {label}")
def assert_close(label, expected, actual, eps=1e-4):
if abs(expected - actual) > eps:
raise Fail(f"FAIL {label}: expected ~{expected}, got {actual} (diff {abs(expected-actual)})")
print(f"OK {label}")
# ─── tokenize() ──────────────────────────────────────────────────────────────
def test_tokenize_basic():
assert_eq("tokenize basic", ["hello", "world"], bm25.tokenize("Hello, World!"))
def test_tokenize_stopwords():
out = bm25.tokenize("The quick brown fox is at the door")
assert_eq("tokenize strips stopwords", ["quick", "brown", "fox", "door"], out)
def test_tokenize_punctuation_and_apostrophe():
out = bm25.tokenize("don't-stop won't!")
assert_true("tokenize keeps apostrophes/hyphens", "don't-stop" in out or "don't" in out,
hint=f"got {out}")
def test_tokenize_short_tokens_dropped():
out = bm25.tokenize("a b cc dddd")
assert_eq("tokenize drops <2-char and stopwords", ["dddd"], [t for t in out if len(t) > 2])
def test_tokenize_unicode_multilingual():
"""v1.7.2 / closes audit M2: tokenizer must preserve non-ASCII content."""
# Cyrillic
out = bm25.tokenize("Привет мир")
assert_true("tokenize preserves Cyrillic", "привет" in out and "мир" in out,
hint=f"got {out}")
# CJK (each character is its own token because there are no word boundaries)
out = bm25.tokenize("日本語の文書")
assert_true("tokenize preserves CJK", len(out) >= 1 and any("" in t or "" in t for t in out),
hint=f"got {out}")
# Accented Latin (Spanish, French, German)
out = bm25.tokenize("café résumé naïve über")
assert_true("tokenize preserves accented Latin", "café" in out and "résumé" in out,
hint=f"got {out}")
# Pure-emoji string: no word chars → no tokens (correct skip)
out = bm25.tokenize("🎉🚀✨")
assert_eq("tokenize skips pure-emoji string", [], out)
# Mixed ASCII + non-ASCII: both survive
out = bm25.tokenize("Hello мир café")
assert_true("tokenize mixes ASCII + non-ASCII",
"hello" in out and "мир" in out and "café" in out, hint=f"got {out}")
# ─── build_index + query() ───────────────────────────────────────────────────
def synthetic_chunk(idx, address, raw_text, contextualized_text):
"""Build a chunk JSON record matching the contextual-prefix.py schema."""
import hashlib
body_hash = "sha256:" + hashlib.sha256(raw_text.encode()).hexdigest()
return {
"schema_version": 1,
"page_path": f"wiki/fake/{address}.md",
"page_address": address,
"chunk_index": idx,
"raw_text": raw_text,
"contextualized_text": contextualized_text,
"prefix": "",
"prefix_source": "synthetic",
"char_count": len(raw_text),
"body_hash": body_hash,
"page_body_hash": body_hash,
"created_at": "2026-05-17T00:00:00Z",
}
def test_build_and_query():
"""End-to-end: write synthetic chunks, build index, query, verify rankings."""
with tempfile.TemporaryDirectory() as tmpdir:
# Redirect bm25 module's paths to a sandbox
sandbox = Path(tmpdir)
meta = sandbox / ".vault-meta"
chunks_dir = meta / "chunks"
bm25_dir = meta / "bm25"
chunks_dir.mkdir(parents=True)
bm25_dir.mkdir(parents=True)
orig_meta = bm25.META_DIR
orig_chunks = bm25.CHUNKS_DIR
orig_bm25 = bm25.BM25_DIR
orig_index = bm25.INDEX_PATH
orig_lock = bm25.LOCK_PATH
bm25.META_DIR = meta
bm25.CHUNKS_DIR = chunks_dir
bm25.BM25_DIR = bm25_dir
bm25.INDEX_PATH = bm25_dir / "index.json"
bm25.LOCK_PATH = meta / ".bm25.lock"
try:
# 3 fake "pages" with 1 chunk each. Note "memory" appears in p1 and p3.
chunks = [
("c-000001", 0, "DragonScale memory mechanism for log folding"),
("c-000002", 0, "transport detection with the obsidian cli binary"),
("c-000003", 0, "memory layer architecture and the wiki vault"),
]
for addr, idx, text in chunks:
d = chunks_dir / addr
d.mkdir(exist_ok=True)
chunk = synthetic_chunk(idx, addr, text, text)
(d / f"chunk-{idx:03d}.json").write_text(json.dumps(chunk))
# Build index
index = bm25.build_index()
assert_eq("doc count", 3, index["doc_count"])
assert_true("vocab has 'memory'", "memory" in index["vocab"])
assert_true("vocab strips stopwords", "the" not in index["vocab"])
assert_eq("memory df", 2, index["vocab"]["memory"]["df"])
bm25.write_index(index)
assert_true("index file written", bm25.INDEX_PATH.is_file())
# Query: "memory" should rank p1 and p3 above p2
results = bm25.query("memory")
ids = [r["chunk_id"] for r in results]
assert_true("memory query returns 2 hits", len(results) == 2,
hint=f"got {ids}")
assert_true("c-000002 not in 'memory' results",
"c-000002:0" not in ids)
# Query: "transport" should hit only c-000002
results = bm25.query("transport")
assert_eq("transport query hits exactly p2", ["c-000002:0"],
[r["chunk_id"] for r in results])
# Query: stopwords-only returns empty
results = bm25.query("the and of")
assert_eq("stopwords-only query empty", [], results)
finally:
bm25.META_DIR = orig_meta
bm25.CHUNKS_DIR = orig_chunks
bm25.BM25_DIR = orig_bm25
bm25.INDEX_PATH = orig_index
bm25.LOCK_PATH = orig_lock
def test_query_score_monotonicity():
"""A query term appearing TWICE in a chunk should score higher than appearing ONCE.
(Standard BM25 monotonicity property within a single document length cohort.)"""
with tempfile.TemporaryDirectory() as tmpdir:
sandbox = Path(tmpdir)
meta = sandbox / ".vault-meta"
chunks_dir = meta / "chunks"
bm25_dir = meta / "bm25"
chunks_dir.mkdir(parents=True)
bm25_dir.mkdir(parents=True)
orig = (bm25.META_DIR, bm25.CHUNKS_DIR, bm25.BM25_DIR,
bm25.INDEX_PATH, bm25.LOCK_PATH)
bm25.META_DIR = meta
bm25.CHUNKS_DIR = chunks_dir
bm25.BM25_DIR = bm25_dir
bm25.INDEX_PATH = bm25_dir / "index.json"
bm25.LOCK_PATH = meta / ".bm25.lock"
try:
# Equal-length docs (rough): one has "memory" twice, other once.
(chunks_dir / "c-000001").mkdir()
(chunks_dir / "c-000002").mkdir()
(chunks_dir / "c-000001" / "chunk-000.json").write_text(
json.dumps(synthetic_chunk(0, "c-000001",
"memory memory rocket banana",
"memory memory rocket banana")))
(chunks_dir / "c-000002" / "chunk-000.json").write_text(
json.dumps(synthetic_chunk(0, "c-000002",
"memory rocket banana flute",
"memory rocket banana flute")))
bm25.write_index(bm25.build_index())
results = bm25.query("memory")
assert_true("BM25 monotonicity", results[0]["chunk_id"] == "c-000001:0",
hint=f"got {results}")
assert_true("two-mention > one-mention scores",
results[0]["score"] > results[1]["score"],
hint=f"got {results}")
finally:
(bm25.META_DIR, bm25.CHUNKS_DIR, bm25.BM25_DIR,
bm25.INDEX_PATH, bm25.LOCK_PATH) = orig
def test_idf_smoothing():
"""IDF should be positive and finite for any df in [1, N]."""
# Use the formula directly: idf = log(1 + (N - df + 0.5) / (df + 0.5))
for N in [1, 10, 1000]:
for df in range(1, N + 1):
idf = math.log(1 + (N - df + 0.5) / (df + 0.5))
assert_true(f"idf positive N={N} df={df}", idf > 0, hint=f"got {idf}")
# ─── CLI smoke test ──────────────────────────────────────────────────────────
def test_cli_stats_on_missing_index():
"""The CLI should exit 3 (EXIT_INDEX_MISSING) when no index exists."""
with tempfile.TemporaryDirectory() as tmpdir:
# Run in a subprocess with a fresh cwd and zeroed META_DIR
env = dict(os.environ)
# We can't easily redirect bm25's hard-coded paths from outside without
# rewriting the script. Instead: smoke-test the exit code path by
# invoking the module-level load_index() in a context where the index
# file doesn't exist.
orig_index = bm25.INDEX_PATH
bm25.INDEX_PATH = Path(tmpdir) / "nonexistent" / "index.json"
try:
try:
bm25.load_index()
raise Fail("load_index() should have exited on missing file")
except SystemExit as e:
assert_eq("load_index exit code", bm25.EXIT_INDEX_MISSING, e.code)
finally:
bm25.INDEX_PATH = orig_index
def main():
print("=== test_bm25_index.py ===")
test_tokenize_basic()
test_tokenize_stopwords()
test_tokenize_punctuation_and_apostrophe()
test_tokenize_unicode_multilingual()
test_tokenize_short_tokens_dropped()
test_build_and_query()
test_query_score_monotonicity()
test_idf_smoothing()
test_cli_stats_on_missing_index()
print("\nAll bm25-index tests passed.")
if __name__ == "__main__":
main()
+295
View File
@@ -0,0 +1,295 @@
#!/usr/bin/env python3
"""test_boundary_score.py — unit tests for scripts/boundary-score.py.
Exercises parser, recency weight, wikilink extraction (including the
code-block guard), graph construction, and top-N selection against a
throwaway in-memory vault. No external prerequisites.
Usage:
python3 tests/test_boundary_score.py
"""
import importlib.util
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
HELPER = ROOT / "scripts" / "boundary-score.py"
spec = importlib.util.spec_from_file_location("bs", HELPER)
bs = importlib.util.module_from_spec(spec)
spec.loader.exec_module(bs)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_close(label, expected, actual, tol=1e-6):
if abs(expected - actual) > tol:
raise Fail(f"FAIL {label}: expected ~{expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond):
if not cond:
raise Fail(f"FAIL {label}")
print(f"OK {label}")
def test_frontmatter_fields():
fm, body = bs.parse_frontmatter(
'---\ntype: concept\ntitle: "Foo Bar"\nupdated: 2026-04-20\ncreated: 2026-04-01\n---\n# Hello\n'
)
assert_eq("type", "concept", fm.get("type"))
assert_eq("title unquoted", "Foo Bar", fm.get("title"))
assert_eq("updated", "2026-04-20", fm.get("updated"))
assert_eq("created", "2026-04-01", fm.get("created"))
assert_eq("body", "# Hello\n", body)
def test_recency_weight_bounds():
import math
assert_close("day 0 -> ~1.0", 1.0, bs.recency_weight(0.0))
# 30 days = halflife -> exp(-1)
assert_close("day 30 -> e^-1", math.exp(-1.0), bs.recency_weight(30.0))
# No floor: very old pages approach zero
very_old = bs.recency_weight(10_000.0)
assert_true("very old close to zero", very_old < 1e-10)
def test_wikilink_extraction_basic():
body = "Text [[Foo]] and [[Bar|alias]] and [[Baz#Heading]] and [[Foo]] dup.\n"
links = bs.extract_wikilinks(body)
assert_eq("basic extraction", {"Foo", "Bar", "Baz"}, links)
def test_wikilink_code_block_skipped():
body = (
"Before [[Real]] link.\n"
"```\n"
"[[InBacktickBlock]]\n"
"```\n"
"After [[AnotherReal]] link.\n"
)
links = bs.extract_wikilinks(body)
assert_eq("backtick-block links excluded",
{"Real", "AnotherReal"}, links)
def test_wikilink_tilde_fence_skipped():
body = "A [[Outside]] link.\n~~~\n[[InTildeBlock]]\n~~~\nB [[Another]] link.\n"
assert_eq("tilde-block links excluded",
{"Outside", "Another"}, bs.extract_wikilinks(body))
def test_wikilink_longer_fence_handles_nested():
# Opening 4-backtick fence; an inner 3-backtick line must NOT close it
body = (
"[[Outside]]\n"
"````\n"
"some code\n"
"```\n"
"[[Nested]]\n"
"```\n"
"more code\n"
"````\n"
"[[AfterClose]]\n"
)
assert_eq("longer fence holds through shorter inner fence",
{"Outside", "AfterClose"}, bs.extract_wikilinks(body))
def test_wikilink_indented_not_filtered():
# Obsidian bullets with 4-space indent should still count
body = "Text\n [[IndentedBullet]]\n"
assert_eq("indented-4-space NOT filtered as code",
{"IndentedBullet"}, bs.extract_wikilinks(body))
def test_days_since():
today = bs.days_since(None)
assert_true("missing date -> large sentinel", today >= 9999.0)
garbage = bs.days_since("not-a-date")
assert_true("garbage date -> large sentinel", garbage >= 9999.0)
def test_graph_and_scoring_on_temp_vault():
with tempfile.TemporaryDirectory() as tmp:
tmp = Path(tmp)
wiki = tmp / "wiki"
(wiki / "concepts").mkdir(parents=True)
(wiki / "entities").mkdir(parents=True)
# Frontier page: many outbound, none inbound
(wiki / "concepts" / "Frontier.md").write_text(
"---\ntype: concept\ntitle: Frontier\nupdated: "
+ __import__("datetime").date.today().isoformat()
+ "\n---\n[[Hub]] [[Alpha]] [[Beta]]\n"
)
# Hub page: many inbound
(wiki / "concepts" / "Hub.md").write_text(
"---\ntype: concept\ntitle: Hub\nupdated: 2025-01-01\n---\nBody.\n"
)
(wiki / "entities" / "Alpha.md").write_text(
"---\ntype: entity\ntitle: Alpha\nupdated: 2025-01-01\n---\n[[Hub]]\n"
)
(wiki / "entities" / "Beta.md").write_text(
"---\ntype: entity\ntitle: Beta\nupdated: 2025-01-01\n---\n[[Hub]]\n"
)
# Excluded meta
(wiki / "index.md").write_text(
"---\ntype: meta\n---\n[[Frontier]] [[Hub]]\n"
)
original_root = bs.VAULT_ROOT
original_wiki = bs.WIKI_DIR
bs.VAULT_ROOT = tmp
bs.WIKI_DIR = wiki
try:
pages = bs.collect_pages()
assert_eq("scoreable count", 4, len(pages))
assert_true("Frontier present", "Frontier" in pages)
assert_true("Hub present", "Hub" in pages)
assert_true("Alpha present", "Alpha" in pages)
assert_true("Beta present", "Beta" in pages)
assert_true("meta excluded", "index" not in pages)
out_e, in_e = bs.build_graph(pages)
assert_eq("Frontier out-degree", 3, len(out_e["Frontier"]))
assert_eq("Hub out-degree", 0, len(out_e["Hub"]))
assert_eq("Hub in-degree", 3, len(in_e["Hub"])) # from Frontier, Alpha, Beta
assert_eq("Frontier in-degree from meta excluded",
0, len(in_e["Frontier"]))
frontier_score = bs.score_page("Frontier", pages, out_e, in_e)
hub_score = bs.score_page("Hub", pages, out_e, in_e)
assert_true("Frontier score positive", frontier_score["score"] > 0)
# Hub is older and has in-degree 3, out-degree 0. Without a
# recency floor, very-old hubs have near-zero weight, so their
# score approaches zero (not strongly negative). A fresh hub
# with the same topology WOULD score strongly negative; this
# is intentional — stale hubs do not pollute the frontier.
assert_true("Frontier outranks Hub", frontier_score["score"] > hub_score["score"])
assert_eq("Frontier out", 3, frontier_score["out_degree"])
assert_eq("Frontier in", 0, frontier_score["in_degree"])
assert_eq("Hub out", 0, hub_score["out_degree"])
assert_eq("Hub in", 3, hub_score["in_degree"])
finally:
bs.VAULT_ROOT = original_root
bs.WIKI_DIR = original_wiki
def test_graph_excludes_self_loop_unresolved_meta():
with tempfile.TemporaryDirectory() as tmp:
tmp = Path(tmp)
wiki = tmp / "wiki"
(wiki / "concepts").mkdir(parents=True)
# Self-loop via alias to itself
(wiki / "concepts" / "SelfLoop.md").write_text(
"---\ntype: concept\ntitle: SelfLoop\nupdated: 2026-04-24\n---\n[[SelfLoop]] [[DoesNotExist]]\n"
)
# Target that exists but is meta (excluded)
(wiki / "index.md").write_text(
"---\ntype: meta\n---\nmeta body\n"
)
(wiki / "concepts" / "LinksToMeta.md").write_text(
"---\ntype: concept\nupdated: 2026-04-24\n---\n[[index]]\n"
)
original_root = bs.VAULT_ROOT
original_wiki = bs.WIKI_DIR
bs.VAULT_ROOT = tmp
bs.WIKI_DIR = wiki
try:
pages = bs.collect_pages()
assert_eq("scoreable count (meta excluded)", 2, len(pages))
out_e, in_e = bs.build_graph(pages)
assert_eq("self-loop out-degree excludes self", 0, len(out_e["SelfLoop"]))
assert_eq("unresolved target not in out-edges", 0, len(out_e["SelfLoop"]))
assert_eq("LinksToMeta out-degree excludes meta target", 0, len(out_e["LinksToMeta"]))
finally:
bs.VAULT_ROOT = original_root
bs.WIKI_DIR = original_wiki
def test_cli_page_no_match():
result = subprocess.run(
[sys.executable, str(HELPER), "--page", "definitely-not-a-real-page-xyz"],
capture_output=True, text=True, timeout=5,
)
assert_eq("--page no-match exit", 2, result.returncode)
assert_true("--page error message", "no scoreable page matches" in result.stderr)
def test_included_rejects_symlink():
with tempfile.TemporaryDirectory() as tmp:
tmp = Path(tmp)
wiki = tmp / "wiki"
wiki.mkdir()
real = wiki / "real.md"
real.write_text("---\ntype: concept\n---\nbody\n")
link = wiki / "link.md"
link.symlink_to(real)
original_root = bs.VAULT_ROOT
bs.VAULT_ROOT = tmp
try:
ok_real = bs.included(real, {"type": "concept"})
ok_link = bs.included(link, {"type": "concept"})
assert_true("real file included", ok_real)
assert_eq("symlink excluded", False, ok_link)
finally:
bs.VAULT_ROOT = original_root
def test_cli_top_zero_usage_error():
result = subprocess.run(
[sys.executable, str(HELPER), "--top", "0"],
capture_output=True, text=True, timeout=5,
)
assert_eq("--top 0 exit", 2, result.returncode)
def test_cli_json_structure():
result = subprocess.run(
[sys.executable, str(HELPER), "--json", "--top", "1"],
capture_output=True, text=True, timeout=10,
)
assert_eq("--json exit 0", 0, result.returncode)
payload = json.loads(result.stdout)
for key in ("generated", "halflife_days",
"page_count_scoreable", "results"):
assert_true(f"json has {key}", key in payload)
assert_true("results is list", isinstance(payload["results"], list))
if __name__ == "__main__":
try:
test_frontmatter_fields()
test_recency_weight_bounds()
test_wikilink_extraction_basic()
test_wikilink_code_block_skipped()
test_wikilink_tilde_fence_skipped()
test_wikilink_longer_fence_handles_nested()
test_wikilink_indented_not_filtered()
test_days_since()
test_graph_and_scoring_on_temp_vault()
test_graph_excludes_self_loop_unresolved_meta()
test_included_rejects_symlink()
test_cli_top_zero_usage_error()
test_cli_page_no_match()
test_cli_json_structure()
except Fail as exc:
print(exc, file=sys.stderr)
sys.exit(1)
print("\nAll tests passed.")
+131
View File
@@ -0,0 +1,131 @@
#!/usr/bin/env bash
# test_concurrent_write.sh — verify multi-writer safety with wiki-lock.sh.
#
# The critical correctness gate from v1.7 §3.4. Spawns N background workers,
# each acquires a lock on the same file, appends a uniquely-tagged line, and
# releases. After all workers exit we verify:
# - the file received EXACTLY N appended lines (no losses)
# - every worker's tagged line is present (no silent dropping)
# - no orphaned lockfiles remain
# - clear-stale reports 0 leftovers
#
# Without wiki-lock.sh, concurrent appends to the same file via `echo >> file`
# can interleave and corrupt lines on some filesystems. With the lock, only
# one worker holds the file at a time, and atomic append-then-release prevents
# corruption.
#
# Hermetic: sandbox vault under mktemp, no network.
#
# Usage: bash tests/test_concurrent_write.sh
set -uo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
LOCK_SH="$ROOT/scripts/wiki-lock.sh"
WORKERS=10
TARGET_FILE_REL="wiki/concepts/Stress.md"
SANDBOX=$(mktemp -d /tmp/concurrent-write-test-XXXXXX)
trap 'rm -rf "$SANDBOX"' EXIT
mkdir -p "$SANDBOX/.vault-meta/locks" "$SANDBOX/wiki/concepts"
TARGET_ABS="$SANDBOX/$TARGET_FILE_REL"
echo "seed" > "$TARGET_ABS"
export WIKI_LOCK_VAULT="$SANDBOX"
PASS=0
FAIL=0
assert_eq() {
if [ "$2" = "$3" ]; then
echo "OK $1"
PASS=$((PASS + 1))
else
echo "FAIL $1: expected '$2', got '$3'"
FAIL=$((FAIL + 1))
fi
}
echo "=== test_concurrent_write.sh ==="
echo "sandbox: $SANDBOX"
echo "workers: $WORKERS"
echo "target: $TARGET_FILE_REL"
echo ""
# ── Worker function: acquire lock, append, release ──────────────────────────
worker() {
local id="$1"
local attempts=0
local max_attempts=50
# Random jitter so workers don't all hit at the same instant
local jitter=$(awk -v id="$id" 'BEGIN { srand(id); print int(rand()*100) }')
# POSIX-portable sub-second sleep via sleep(1) with fractional seconds (GNU/macOS supports it)
sleep "0.0${jitter}" 2>/dev/null || sleep 1
while [ "$attempts" -lt "$max_attempts" ]; do
if bash "$LOCK_SH" acquire "$TARGET_FILE_REL" >/dev/null 2>&1; then
# Append our line atomically
echo "worker-$id-tag" >> "$TARGET_ABS"
bash "$LOCK_SH" release "$TARGET_FILE_REL" >/dev/null 2>&1
return 0
fi
attempts=$((attempts + 1))
sleep "0.05" 2>/dev/null || sleep 1
done
echo "worker $id gave up after $attempts attempts" >&2
return 1
}
# ── Spawn workers in parallel ───────────────────────────────────────────────
PIDS=()
for i in $(seq 1 $WORKERS); do
worker "$i" &
PIDS+=("$!")
done
# Wait for all workers
FAILED_WORKERS=0
for pid in "${PIDS[@]}"; do
if ! wait "$pid"; then
FAILED_WORKERS=$((FAILED_WORKERS + 1))
fi
done
assert_eq "all workers completed (no give-ups)" "0" "$FAILED_WORKERS"
# ── Verify: file has seed + exactly N tagged lines ──────────────────────────
TOTAL_LINES=$(wc -l < "$TARGET_ABS")
assert_eq "total line count (seed + workers)" "$((WORKERS + 1))" "$TOTAL_LINES"
# Every worker tag must appear exactly once
for i in $(seq 1 $WORKERS); do
COUNT=$(grep -c "^worker-$i-tag$" "$TARGET_ABS" || echo 0)
if [ "$COUNT" != "1" ]; then
echo "FAIL worker-$i tag count: expected 1, got $COUNT"
FAIL=$((FAIL + 1))
fi
done
echo "OK every worker tag appears exactly once"
PASS=$((PASS + 1))
# ── Verify: no orphaned lockfiles ───────────────────────────────────────────
LIVE_LOCKS=$(bash "$LOCK_SH" list | wc -l)
assert_eq "no live lockfiles after workers exited" "0" "$LIVE_LOCKS"
# ── Verify: clear-stale reports 0 (nothing to reap) ─────────────────────────
REAPED=$(bash "$LOCK_SH" clear-stale --max-age 0)
assert_eq "clear-stale reaped count" "0" "$REAPED"
# ── Verify: file content sanity (no truncated/garbled lines) ────────────────
GARBLED=$(awk 'length > 100' "$TARGET_ABS" | wc -l)
assert_eq "no garbled (overlong) lines" "0" "$GARBLED"
echo ""
echo "Pass: $PASS Fail: $FAIL"
if [ $FAIL -gt 0 ]; then
echo "File contents:"
cat "$TARGET_ABS"
exit 1
fi
echo "All concurrent-write tests passed."
+112
View File
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""test_contextual_prefix.py — hermetic tests for scripts/contextual-prefix.py.
Covers the Haiku cache-floor decision (cache_control_for). The network paths
(tier-1 Anthropic API, tier-2 claude CLI) are egress-gated and excluded from
hermetic tests by design; only the pure floor logic is exercised here. No
network, no LLM, no ollama. Pure stdlib.
Usage:
python3 tests/test_contextual_prefix.py
"""
import importlib.util
import json
from pathlib import Path
from unittest import mock
ROOT = Path(__file__).resolve().parent.parent
HELPER = ROOT / "scripts" / "contextual-prefix.py"
spec = importlib.util.spec_from_file_location("contextual_prefix", HELPER)
cp = importlib.util.module_from_spec(spec)
spec.loader.exec_module(cp)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond):
if not cond:
raise Fail(f"FAIL {label}")
print(f"OK {label}")
# ─── Below the floor → no cache_control (silent no-op avoided) ───────────────
def test_below_floor_returns_none():
body = "x" * (cp.HAIKU_CACHE_MIN_CHARS - 1)
assert_eq("body 1 char below floor → None", None, cp.cache_control_for(body))
def test_empty_body_returns_none():
assert_eq("empty body → None", None, cp.cache_control_for(""))
# ─── At / above the floor → ephemeral cache_control ──────────────────────────
def test_at_floor_returns_ephemeral():
body = "x" * cp.HAIKU_CACHE_MIN_CHARS
assert_eq("body exactly at floor → ephemeral",
{"type": "ephemeral"}, cp.cache_control_for(body))
def test_above_floor_returns_ephemeral():
body = "x" * (cp.HAIKU_CACHE_MIN_CHARS * 3)
assert_eq("body well above floor → ephemeral",
{"type": "ephemeral"}, cp.cache_control_for(body))
# ─── Integration: built payload attaches cache_control only above the floor ──
def test_payload_attaches_cache_control_by_body_size():
"""Mock the network. Assert the API payload attaches cache_control to the
page block only when the body clears the floor, and the multi-line model
reply is truncated to one line. No network, no LLM."""
captured = {}
class _Resp:
def __init__(self, d):
self._d = json.dumps(d).encode()
def read(self):
return self._d
def __enter__(self):
return self
def __exit__(self, *a):
return False
def _fake_urlopen(req, timeout=None):
captured["body"] = json.loads(req.data.decode())
return _Resp({
"content": [{"type": "text", "text": "one situating line.\nIGNORED"}],
"usage": {"cache_creation_input_tokens": 7, "cache_read_input_tokens": 3},
})
with mock.patch.object(cp.urllib.request, "urlopen", _fake_urlopen):
out = cp.anthropic_api_prefix("KEY", "T", "x" * cp.HAIKU_CACHE_MIN_CHARS, "chunk")
assert_eq("multi-line reply truncated to one line", "one situating line.", out)
assert_true("above-floor body attaches cache_control",
"cache_control" in captured["body"]["system"][1])
cp.anthropic_api_prefix("KEY", "T", "tiny", "chunk")
assert_true("below-floor body omits cache_control",
"cache_control" not in captured["body"]["system"][1])
def main():
print("=== test_contextual_prefix.py ===")
test_below_floor_returns_none()
test_empty_body_returns_none()
test_at_floor_returns_ephemeral()
test_above_floor_returns_ephemeral()
test_payload_attaches_cache_control_by_body_size()
print("\nAll contextual-prefix tests passed.")
if __name__ == "__main__":
main()
+343
View File
@@ -0,0 +1,343 @@
#!/usr/bin/env python3
"""test_retrieve.py — hermetic tests for scripts/retrieve.py and scripts/rerank.py.
No network, no ollama, no LLM calls. Tests cover:
- import_sibling resolves hyphenated module names
- chunk_snippet truncation behavior
- rerank.cosine math correctness
- rerank.rerank() no-op behavior when ollama is unreachable
- retrieve.py exit 10 (not provisioned) when chunks/index are missing
- dedupe-by-page logic via integration smoke test on synthetic fixtures
Usage:
python3 tests/test_retrieve.py
"""
import importlib.util
import json
import os
import subprocess
import sys
import tempfile
import unittest.mock
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
RETRIEVE = ROOT / "scripts" / "retrieve.py"
RERANK = ROOT / "scripts" / "rerank.py"
BM25 = ROOT / "scripts" / "bm25-index.py"
def import_script(name, path):
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
retrieve = import_script("retrieve", RETRIEVE)
rerank = import_script("rerank", RERANK)
bm25 = import_script("bm25", BM25)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond, hint=""):
if not cond:
raise Fail(f"FAIL {label}{(': ' + hint) if hint else ''}")
print(f"OK {label}")
def assert_close(label, expected, actual, eps=1e-6):
if abs(expected - actual) > eps:
raise Fail(f"FAIL {label}: expected ~{expected}, got {actual}")
print(f"OK {label}")
# ─── import_sibling ──────────────────────────────────────────────────────────
def test_import_sibling_resolves_hyphenated_names():
"""retrieve.import_sibling('bm25_index', 'bm25-index.py') must succeed."""
mod = retrieve.import_sibling("bm25_index", "bm25-index.py")
assert_true("import_sibling returns module", mod is not None)
assert_true("module has tokenize()", callable(getattr(mod, "tokenize", None)))
# ─── chunk_snippet ───────────────────────────────────────────────────────────
def test_chunk_snippet_short():
"""Short chunks should pass through unchanged."""
out = retrieve.chunk_snippet({"raw_text": "short text"}, max_chars=200)
assert_eq("chunk_snippet short pass-through", "short text", out)
def test_chunk_snippet_truncates_with_ellipsis():
"""Long chunks should be truncated with an ellipsis."""
long_text = "x" * 500
out = retrieve.chunk_snippet({"raw_text": long_text}, max_chars=100)
assert_true("snippet length under cap", len(out) <= 110, hint=f"len={len(out)}")
assert_true("snippet ends with ellipsis", out.endswith(""))
# ─── rerank.cosine() ─────────────────────────────────────────────────────────
def test_cosine_identical():
assert_close("cosine identical vectors", 1.0, rerank.cosine([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))
def test_cosine_orthogonal():
assert_close("cosine orthogonal", 0.0, rerank.cosine([1.0, 0.0], [0.0, 1.0]))
def test_cosine_anti_parallel():
assert_close("cosine anti-parallel", -1.0, rerank.cosine([1.0, 0.0], [-1.0, 0.0]))
def test_cosine_length_mismatch():
"""Mismatched vector lengths should return 0.0 (defensive, not crash)."""
assert_close("cosine length mismatch", 0.0, rerank.cosine([1.0], [1.0, 2.0]))
def test_cosine_zero_vector():
assert_close("cosine zero vector", 0.0, rerank.cosine([0.0, 0.0], [1.0, 2.0]))
# ─── rerank.rerank() no-op fallback ──────────────────────────────────────────
def test_rerank_noop_when_ollama_unreachable():
"""When ollama is not reachable, rerank should pass candidates through with
rerank_source='noop-no-ollama'. We force this by patching ollama_alive."""
with unittest.mock.patch.object(rerank, "ollama_alive", return_value=(False, [])):
candidates = [
{"chunk_id": "c-001:0", "score": 7.5, "path": "fake/p1.json"},
{"chunk_id": "c-002:0", "score": 5.1, "path": "fake/p2.json"},
]
out = rerank.rerank("query", candidates, top_k=5)
assert_eq("rerank no-op preserves order", ["c-001:0", "c-002:0"],
[c["chunk_id"] for c in out])
assert_true("rerank no-op tags source",
all(c.get("rerank_source") == "noop-no-ollama" for c in out))
assert_true("rerank no-op copies score to rerank_score",
all(c["rerank_score"] == c["score"] for c in out))
def test_rerank_noop_when_model_missing():
"""When ollama is up but model isn't pulled, rerank should still no-op."""
with unittest.mock.patch.object(rerank, "ollama_alive", return_value=(True, ["other-model"])):
candidates = [{"chunk_id": "c-001:0", "score": 5.0, "path": "x"}]
out = rerank.rerank("query", candidates, top_k=5)
assert_eq("rerank no-op for missing model", "noop-no-model", out[0]["rerank_source"])
def test_rerank_truncates_to_top_k():
with unittest.mock.patch.object(rerank, "ollama_alive", return_value=(False, [])):
candidates = [{"chunk_id": f"c-{i:03}:0", "score": float(i), "path": "x"} for i in range(10)]
out = rerank.rerank("query", candidates, top_k=3)
assert_eq("rerank truncates to top_k", 3, len(out))
# ─── retrieve.py CLI: exit 10 when not provisioned ────────────────────────────
def test_retrieve_exits_10_without_index():
"""End-to-end CLI test: with no .vault-meta/bm25/index.json, retrieve.py must exit 10."""
with tempfile.TemporaryDirectory() as tmpdir:
# Build a minimal vault layout under tmpdir
sandbox = Path(tmpdir)
(sandbox / "scripts").mkdir()
(sandbox / ".vault-meta").mkdir()
# Copy retrieve.py and its dependencies into the sandbox
import shutil
for f in ["retrieve.py", "bm25-index.py", "rerank.py"]:
shutil.copy(ROOT / "scripts" / f, sandbox / "scripts" / f)
os.chmod(sandbox / "scripts" / f, 0o755)
# Run retrieve.py — should exit 10 because no bm25 index exists
result = subprocess.run(
[sys.executable, str(sandbox / "scripts" / "retrieve.py"), "test query"],
capture_output=True,
text=True,
timeout=10,
)
assert_eq("retrieve.py exit 10 when not provisioned", 10, result.returncode)
assert_true("retrieve.py prints friendly error",
"no BM25 index" in result.stderr,
hint=result.stderr[:200])
# ─── Integration smoke test: end-to-end with synthetic data ──────────────────
def test_end_to_end_with_synthetic_chunks():
"""Build a minimal vault with 2 chunks, index it, run retrieve, verify output."""
import hashlib
with tempfile.TemporaryDirectory() as tmpdir:
sandbox = Path(tmpdir)
(sandbox / "scripts").mkdir()
meta = sandbox / ".vault-meta"
chunks_dir = meta / "chunks"
bm25_dir = meta / "bm25"
chunks_dir.mkdir(parents=True)
bm25_dir.mkdir(parents=True)
# Copy scripts
import shutil
for f in ["retrieve.py", "bm25-index.py", "rerank.py"]:
shutil.copy(ROOT / "scripts" / f, sandbox / "scripts" / f)
os.chmod(sandbox / "scripts" / f, 0o755)
# Write 2 synthetic chunks
def chunk(addr, idx, text):
return {
"schema_version": 1,
"page_path": f"wiki/fake/{addr}.md",
"page_address": addr,
"chunk_index": idx,
"raw_text": text,
"contextualized_text": text,
"prefix": "",
"prefix_source": "synthetic",
"char_count": len(text),
"body_hash": "sha256:" + hashlib.sha256(text.encode()).hexdigest(),
"page_body_hash": "sha256:0",
"created_at": "2026-05-17T00:00:00Z",
}
(chunks_dir / "c-000001").mkdir()
(chunks_dir / "c-000002").mkdir()
(chunks_dir / "c-000001" / "chunk-000.json").write_text(
json.dumps(chunk("c-000001", 0, "compounding wiki vault pattern by karpathy")))
(chunks_dir / "c-000002" / "chunk-000.json").write_text(
json.dumps(chunk("c-000002", 0, "obsidian cli transport detection")))
# Build index via subprocess (uses the sandbox's META_DIR? no — it uses the
# script's hard-coded paths relative to its location. Since we copied the
# script into sandbox/scripts/, VAULT_ROOT will compute to `sandbox`.)
result = subprocess.run(
[sys.executable, str(sandbox / "scripts" / "bm25-index.py"), "build"],
capture_output=True, text=True, timeout=10)
assert_eq("bm25 build rc=0", 0, result.returncode)
# Run retrieve
result = subprocess.run(
[sys.executable, str(sandbox / "scripts" / "retrieve.py"),
"karpathy wiki", "--top", "2", "--no-rerank"],
capture_output=True, text=True, timeout=10)
assert_eq("retrieve rc=0", 0, result.returncode)
out = json.loads(result.stdout)
assert_eq("retrieve.strategy is bm25-only", "bm25-only", out["strategy"])
assert_true("retrieve returns at least 1 candidate", len(out["candidates"]) >= 1)
# c-000001 should rank above c-000002 for "karpathy wiki"
first = out["candidates"][0]
assert_eq("top hit is c-000001", "c-000001", first["page_address"])
# ─── M8 closure: --explain and --no-rerank flag coverage ─────────────────────
def test_explain_flag_adds_diagnostics_block():
"""v1.7.2 / closes audit M8: --explain must include an 'explain' diagnostics block."""
import hashlib
with tempfile.TemporaryDirectory() as tmpdir:
sandbox = Path(tmpdir)
(sandbox / "scripts").mkdir()
meta = sandbox / ".vault-meta"
chunks_dir = meta / "chunks"
bm25_dir = meta / "bm25"
chunks_dir.mkdir(parents=True)
bm25_dir.mkdir(parents=True)
import shutil
for f in ["retrieve.py", "bm25-index.py", "rerank.py"]:
shutil.copy(ROOT / "scripts" / f, sandbox / "scripts" / f)
os.chmod(sandbox / "scripts" / f, 0o755)
# 2 synthetic chunks
(chunks_dir / "c-000010").mkdir()
(chunks_dir / "c-000010" / "chunk-000.json").write_text(json.dumps({
"schema_version": 1, "page_path": "wiki/fake/c-000010.md",
"page_address": "c-000010", "chunk_index": 0,
"raw_text": "hybrid retrieval pipeline",
"contextualized_text": "hybrid retrieval pipeline",
"prefix": "", "prefix_source": "synthetic",
"char_count": 25,
"body_hash": "sha256:" + hashlib.sha256(b"hybrid retrieval pipeline").hexdigest(),
"page_body_hash": "sha256:0",
"created_at": "2026-05-17T00:00:00Z",
}))
# Build index
subprocess.run([sys.executable, str(sandbox / "scripts" / "bm25-index.py"), "build"],
capture_output=True, timeout=10, check=True)
# Run with --explain --no-rerank
result = subprocess.run(
[sys.executable, str(sandbox / "scripts" / "retrieve.py"),
"hybrid", "--top", "1", "--no-rerank", "--explain"],
capture_output=True, text=True, timeout=10)
assert_eq("retrieve --explain --no-rerank rc=0", 0, result.returncode)
out = json.loads(result.stdout)
assert_true("--explain produces 'explain' key",
"explain" in out, hint=f"keys={list(out.keys())}")
explain = out.get("explain", {})
assert_true("--explain reports BM25 candidate count",
"bm25_candidates" in explain or "bm25" in str(explain).lower(),
hint=f"explain={explain}")
def test_no_rerank_flag_strategy_bm25_only():
"""v1.7.2 / closes audit M8: --no-rerank must produce strategy='bm25-only'."""
import hashlib
with tempfile.TemporaryDirectory() as tmpdir:
sandbox = Path(tmpdir)
(sandbox / "scripts").mkdir()
meta = sandbox / ".vault-meta"
chunks_dir = meta / "chunks"
bm25_dir = meta / "bm25"
chunks_dir.mkdir(parents=True)
bm25_dir.mkdir(parents=True)
import shutil
for f in ["retrieve.py", "bm25-index.py", "rerank.py"]:
shutil.copy(ROOT / "scripts" / f, sandbox / "scripts" / f)
os.chmod(sandbox / "scripts" / f, 0o755)
(chunks_dir / "c-000020").mkdir()
(chunks_dir / "c-000020" / "chunk-000.json").write_text(json.dumps({
"schema_version": 1, "page_path": "wiki/fake/c-000020.md",
"page_address": "c-000020", "chunk_index": 0,
"raw_text": "transport detection fallback chain",
"contextualized_text": "transport detection fallback chain",
"prefix": "", "prefix_source": "synthetic",
"char_count": 35,
"body_hash": "sha256:" + hashlib.sha256(b"transport detection fallback chain").hexdigest(),
"page_body_hash": "sha256:0",
"created_at": "2026-05-17T00:00:00Z",
}))
subprocess.run([sys.executable, str(sandbox / "scripts" / "bm25-index.py"), "build"],
capture_output=True, timeout=10, check=True)
result = subprocess.run(
[sys.executable, str(sandbox / "scripts" / "retrieve.py"),
"transport", "--top", "1", "--no-rerank"],
capture_output=True, text=True, timeout=10)
assert_eq("retrieve --no-rerank rc=0", 0, result.returncode)
out = json.loads(result.stdout)
assert_eq("--no-rerank sets strategy='bm25-only'", "bm25-only", out.get("strategy"))
# --no-rerank produces a consistent shape: rerank fields are populated
# but rerank_source is "skipped" (so callers don't have to special-case).
candidates = out.get("candidates", [])
assert_true("--no-rerank still returns candidates", len(candidates) >= 1)
first = candidates[0]
assert_eq("--no-rerank candidate rerank_source='skipped'", "skipped",
first.get("rerank_source"))
assert_eq("--no-rerank candidate rerank_score equals bm25_score",
first.get("bm25_score"), first.get("rerank_score"))
def main():
print("=== test_retrieve.py ===")
test_import_sibling_resolves_hyphenated_names()
test_chunk_snippet_short()
test_chunk_snippet_truncates_with_ellipsis()
test_cosine_identical()
test_cosine_orthogonal()
test_cosine_anti_parallel()
test_cosine_length_mismatch()
test_cosine_zero_vector()
test_rerank_noop_when_ollama_unreachable()
test_rerank_noop_when_model_missing()
test_rerank_truncates_to_top_k()
test_retrieve_exits_10_without_index()
test_end_to_end_with_synthetic_chunks()
test_explain_flag_adds_diagnostics_block()
test_no_rerank_flag_strategy_bm25_only()
print("\nAll retrieve tests passed.")
if __name__ == "__main__":
main()
+158
View File
@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""test_tiling_check.py — unit tests for scripts/tiling-check.py.
Does NOT require ollama; tests cover parsing, cosine, inclusion logic,
hash properties, cache schema, and the localhost-URL guard. Tests that
need ollama are marked and skipped cleanly when the helper reports
exit 10/11.
Usage:
python3 tests/test_tiling_check.py
"""
import importlib.util
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
HELPER = ROOT / "scripts" / "tiling-check.py"
spec = importlib.util.spec_from_file_location("tc", HELPER)
tc = importlib.util.module_from_spec(spec)
spec.loader.exec_module(tc)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond):
if not cond:
raise Fail(f"FAIL {label}")
print(f"OK {label}")
def test_cosine():
assert_eq("cosine identical", 1.0, tc.cosine([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))
assert_eq("cosine orthogonal", 0.0, tc.cosine([1.0, 0.0], [0.0, 1.0]))
assert_eq("cosine anti-parallel", -1.0, tc.cosine([1.0, 0.0], [-1.0, 0.0]))
assert_eq("cosine zero vector", 0.0, tc.cosine([0.0, 0.0], [1.0, 2.0]))
try:
tc.cosine([1.0], [1.0, 2.0])
raise Fail("FAIL dim mismatch should raise")
except ValueError:
print("OK cosine dim mismatch raises ValueError")
def test_frontmatter():
fm, body = tc.parse_frontmatter("---\ntype: concept\ntitle: Foo\n---\n# Body\n")
assert_eq("parse type", "concept", fm.get("type"))
assert_eq("parse body", "# Body\n", body)
fm, body = tc.parse_frontmatter("# Just a title\n")
assert_eq("no frontmatter -> empty", {}, fm)
fm, _ = tc.parse_frontmatter('---\ntype: "meta"\n---\nbody\n')
assert_eq("quoted type stripped", "meta", fm.get("type"))
def test_body_hash_model_scoped():
h1 = tc.body_hash("body", "model-A")
h2 = tc.body_hash("body", "model-B")
h3 = tc.body_hash("body", "model-A")
assert_true("different models hash differently", h1 != h2)
assert_eq("same body+model hashes identically", h1, h3)
def test_included_basic():
cases = [
(ROOT / "wiki/concepts/Foo.md", {"type": "concept"}, True, "included"),
(ROOT / "wiki/index.md", {"type": "meta"}, False, "excluded filename"),
(ROOT / "wiki/folds/fold-1.md", {"type": "fold"}, False, "under wiki/folds/"),
(ROOT / "wiki/meta/session.md", {"type": "session"}, False, "under wiki/meta/"),
(ROOT / "wiki/entities/Person.md", {"type": "entity"}, True, "included"),
]
for path, fm, expected_ok, expected_reason in cases:
ok, reason = tc.included(path, fm)
label = f"included({path.relative_to(ROOT)}, {fm.get('type')})"
assert_eq(label + ".ok", expected_ok, ok)
assert_eq(label + ".reason", expected_reason, reason)
def test_is_local_url():
assert_true("127.0.0.1 is local", tc._is_local_url("http://127.0.0.1:11434"))
assert_true("localhost is local", tc._is_local_url("http://localhost:11434"))
assert_true("::1 is local", tc._is_local_url("http://[::1]:11434"))
assert_true("example.com NOT local", not tc._is_local_url("http://example.com"))
assert_true("1.2.3.4 NOT local", not tc._is_local_url("http://1.2.3.4"))
def test_cache_schema():
with tempfile.TemporaryDirectory() as tmp:
tmp = Path(tmp)
original_cache = tc.CACHE_PATH
original_meta = tc.META_DIR
tc.CACHE_PATH = tmp / "cache.json"
tc.META_DIR = tmp
try:
c = tc.load_cache("m1")
assert_eq("empty cache -> version 1", 1, c["version"])
assert_eq("empty cache -> empty embeddings", {}, c["embeddings"])
tc.CACHE_PATH.write_text(json.dumps({"version": 1, "model": "m1", "embeddings": {"a.md": {"hash": "h", "embedding": [1.0]}}}))
c = tc.load_cache("m1")
assert_eq("valid cache loads", 1, len(c["embeddings"]))
c = tc.load_cache("m2")
assert_eq("model drift -> empty", {}, c["embeddings"])
assert_eq("model drift -> new model", "m2", c["model"])
tc.CACHE_PATH.write_text("not-json{{")
try:
tc.load_cache("m1")
raise Fail("FAIL corrupt cache should SystemExit")
except SystemExit as e:
assert_eq("corrupt cache exit", 3, e.code)
tc.CACHE_PATH.write_text(json.dumps({"version": 999, "embeddings": {}}))
try:
tc.load_cache("m1")
raise Fail("FAIL wrong version should SystemExit")
except SystemExit as e:
assert_eq("wrong version exit", 3, e.code)
finally:
tc.CACHE_PATH = original_cache
tc.META_DIR = original_meta
def test_url_guard_via_subprocess():
env = os.environ.copy()
env["OLLAMA_URL"] = "http://example.com:11434"
result = subprocess.run(
[sys.executable, str(HELPER), "--peek"],
env=env, capture_output=True, text=True, timeout=10,
)
assert_eq("remote URL without flag exit", 2, result.returncode)
assert_true("remote URL error message", "not localhost" in result.stderr)
if __name__ == "__main__":
try:
test_cosine()
test_frontmatter()
test_body_hash_model_scoped()
test_included_basic()
test_is_local_url()
test_cache_schema()
test_url_guard_via_subprocess()
except Fail as exc:
print(exc, file=sys.stderr)
sys.exit(1)
print("\nAll tests passed.")
+169
View File
@@ -0,0 +1,169 @@
#!/usr/bin/env bash
# test_wiki_lock.sh — unit tests for scripts/wiki-lock.sh.
#
# Hermetic: creates a throwaway vault under mktemp, no network, no external
# deps beyond bash + standard POSIX utilities. Covers:
# - acquire returns 0 on first call, 75 on second call from a holding context
# - release frees the lock and re-acquire works
# - list shows held locks; reflects releases
# - clear-stale removes locks for dead PIDs
# - peek is read-only and reports unheld/held correctly
# - path validation rejects absolute paths and traversal
#
# Usage: bash tests/test_wiki_lock.sh
set -uo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
LOCK_SH="$ROOT/scripts/wiki-lock.sh"
PASS=0
FAIL=0
assert_eq() {
local label="$1" expected="$2" actual="$3"
if [ "$expected" = "$actual" ]; then
echo "OK $label"
PASS=$((PASS + 1))
else
echo "FAIL $label: expected '$expected', got '$actual'"
FAIL=$((FAIL + 1))
fi
}
assert_true() {
local label="$1"
shift
if "$@"; then
echo "OK $label"
PASS=$((PASS + 1))
else
echo "FAIL $label"
FAIL=$((FAIL + 1))
fi
}
# Set up a sandbox vault for the duration of this run
SANDBOX=$(mktemp -d /tmp/wiki-lock-test-XXXXXX)
trap 'rm -rf "$SANDBOX"' EXIT
mkdir -p "$SANDBOX/.vault-meta/locks"
export WIKI_LOCK_VAULT="$SANDBOX"
# Helper: run wiki-lock.sh against the sandbox; return rc
wl() {
bash "$LOCK_SH" "$@"
}
echo "=== test_wiki_lock.sh ==="
echo "sandbox: $SANDBOX"
echo ""
# ── acquire on a fresh path returns 0 ────────────────────────────────────────
wl acquire wiki/concepts/Foo.md >/dev/null
assert_eq "first acquire rc" "0" "$?"
# ── second acquire while the lock is fresh returns 75 ────────────────────────
# With age-based staleness (STALE_AFTER_SEC=60 default), the lock is held until
# either an explicit release OR 60 seconds elapse. A second acquire immediately
# after the first should refuse.
RC2=$( (wl acquire wiki/concepts/Foo.md >/dev/null); echo $? )
assert_eq "second acquire while fresh rc" "75" "$RC2"
# ── peek shows the lock ──────────────────────────────────────────────────────
PEEK_OUT=$(wl peek wiki/concepts/Foo.md)
case "$PEEK_OUT" in
*"wiki/concepts/Foo.md"*) assert_eq "peek includes path" "yes" "yes" ;;
*) assert_eq "peek includes path" "yes" "no($PEEK_OUT)" ;;
esac
# ── list shows the held lock ─────────────────────────────────────────────────
LIST_OUT=$(wl list)
case "$LIST_OUT" in
*"wiki/concepts/Foo.md"*) assert_eq "list shows held lock" "yes" "yes" ;;
*) assert_eq "list shows held lock" "yes" "no" ;;
esac
# ── release frees the lock (cross-process release is allowed by design) ─────
wl release wiki/concepts/Foo.md
LIST_AFTER_RELEASE=$(wl list)
assert_eq "list empty after release" "" "$LIST_AFTER_RELEASE"
# ── re-acquire after release succeeds ───────────────────────────────────────
wl acquire wiki/concepts/Foo.md >/dev/null
assert_eq "re-acquire after release rc" "0" "$?"
wl release wiki/concepts/Foo.md
# ── short --stale-after-sec lets us test age-based reap quickly ─────────────
# Acquire with a 1-second stale window, sleep 2s, second acquire should succeed
wl --stale-after-sec 1 acquire wiki/concepts/Aged.md >/dev/null 2>&1 || \
bash "$LOCK_SH" acquire --stale-after-sec 1 wiki/concepts/Aged.md >/dev/null 2>&1
# (flag order tolerance) — make sure the lock exists
PEEK_AGED=$(wl peek wiki/concepts/Aged.md)
case "$PEEK_AGED" in
*Aged.md*) : ;;
*) echo "DEBUG: aged peek was: $PEEK_AGED" ;;
esac
sleep 2
RC_AGED=$( (bash "$LOCK_SH" --stale-after-sec 1 acquire wiki/concepts/Aged.md >/dev/null 2>&1); echo $? )
assert_eq "age-based stale reap allows re-acquire" "0" "$RC_AGED"
wl release wiki/concepts/Aged.md
# ── clear-stale with max-age=0 reaps everything ──────────────────────────────
# First seed a lock to reap
wl acquire wiki/concepts/Reap.md >/dev/null
REMOVED=$(wl clear-stale --max-age 0)
# Should have removed 1 (the Reap.md lock)
case "$REMOVED" in
[1-9]*) assert_eq "clear-stale removed count >=1" "yes" "yes" ;;
*) assert_eq "clear-stale removed count >=1" "yes" "no($REMOVED)" ;;
esac
LIST_AFTER_CLEAR=$(wl list)
assert_eq "list empty after clear-stale" "" "$LIST_AFTER_CLEAR"
# ── peek on unheld path ──────────────────────────────────────────────────────
PEEK_UNHELD=$(wl peek wiki/concepts/Never.md)
assert_eq "peek unheld" "unheld" "$PEEK_UNHELD"
# ── path validation: absolute path rejected ──────────────────────────────────
RC_ABS=$( (wl acquire /etc/passwd >/dev/null 2>&1); echo $? )
assert_eq "acquire absolute path rejected" "4" "$RC_ABS"
# ── path validation: traversal rejected ──────────────────────────────────────
RC_DOTDOT=$( (wl acquire ../escape.md >/dev/null 2>&1); echo $? )
assert_eq "acquire ../ rejected" "4" "$RC_DOTDOT"
# ── path validation: empty rejected ──────────────────────────────────────────
RC_EMPTY=$( (wl acquire "" >/dev/null 2>&1); echo $? )
assert_eq "acquire empty path rejected" "4" "$RC_EMPTY"
# ── path validation: newline rejected (v1.7.2; closes audit M4) ──────────────
# Newlines in lock paths would break the meta-lock line format (key=value lines
# separated by literal \n). Must be rejected at validate_path() time.
RC_NL=$( (wl acquire $'wiki/concepts/Foo\nbar.md' >/dev/null 2>&1); echo $? )
assert_eq "acquire newline path rejected" "4" "$RC_NL"
# ── path validation: carriage return rejected (v1.7.2; closes audit M4) ──────
RC_CR=$( (wl acquire $'wiki/concepts/Foo\rbar.md' >/dev/null 2>&1); echo $? )
assert_eq "acquire carriage-return path rejected" "4" "$RC_CR"
# ── stress: 10 unique paths all acquire cleanly ──────────────────────────────
for i in $(seq 1 10); do
wl acquire "wiki/stress/page-$i.md" >/dev/null
rc=$?
if [ $rc -ne 0 ]; then
echo "FAIL stress acquire $i: rc=$rc"
FAIL=$((FAIL + 1))
break
fi
done
LIST_COUNT=$(wl list | wc -l)
assert_eq "10 unique paths all acquired" "10" "$LIST_COUNT"
wl clear-stale --max-age 0 >/dev/null
# ── summary ──────────────────────────────────────────────────────────────────
echo ""
echo "Pass: $PASS Fail: $FAIL"
if [ $FAIL -gt 0 ]; then
exit 1
fi
echo "All wiki-lock tests passed."
+349
View File
@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""test_wiki_mode.py — hermetic tests for scripts/wiki-mode.py.
Covers config load/save round-trip, all 4 modes' routing, slugification, ID
minting, and the default-to-generic fallback when .vault-meta/mode.json is
absent. No network, no LLM, no ollama. Pure stdlib + subprocess.
Usage:
python3 tests/test_wiki_mode.py
"""
import importlib.util
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from unittest import mock
ROOT = Path(__file__).resolve().parent.parent
HELPER = ROOT / "scripts" / "wiki-mode.py"
spec = importlib.util.spec_from_file_location("wiki_mode", HELPER)
wm = importlib.util.module_from_spec(spec)
spec.loader.exec_module(wm)
class Fail(SystemExit):
pass
def assert_eq(label, expected, actual):
if expected != actual:
raise Fail(f"FAIL {label}: expected {expected!r}, got {actual!r}")
print(f"OK {label}")
def assert_true(label, cond, hint=""):
if not cond:
raise Fail(f"FAIL {label}{(': ' + hint) if hint else ''}")
print(f"OK {label}")
# ─── Default-to-generic when no config file ──────────────────────────────────
def test_load_config_defaults_to_generic_when_absent():
with tempfile.TemporaryDirectory() as tmp:
with mock.patch.object(wm, "MODE_PATH", Path(tmp) / "nonexistent.json"):
cfg = wm.load_config()
assert_eq("absent config → mode=generic", "generic", cfg["mode"])
assert_eq("schema_version present", 1, cfg["schema_version"])
assert_true("all 4 mode configs present",
set(cfg["config"].keys()) == {"lyt", "para", "zettelkasten", "generic"})
# ─── Config save → load round-trip ───────────────────────────────────────────
def test_save_load_roundtrip():
with tempfile.TemporaryDirectory() as tmp:
mode_path = Path(tmp) / "mode.json"
with mock.patch.object(wm, "MODE_PATH", mode_path), \
mock.patch.object(wm, "META_DIR", Path(tmp)):
cfg = wm.load_config()
cfg["mode"] = "lyt"
cfg["configured_at"] = "2026-05-17T00:00:00Z"
wm.save_config(cfg)
assert_true("mode.json written", mode_path.is_file())
cfg2 = wm.load_config()
assert_eq("round-trip mode", "lyt", cfg2["mode"])
assert_eq("round-trip configured_at", "2026-05-17T00:00:00Z", cfg2["configured_at"])
# ─── Corrupted mode.json falls back to generic with warning ──────────────────
def test_corrupted_config_falls_back_to_generic():
with tempfile.TemporaryDirectory() as tmp:
mode_path = Path(tmp) / "mode.json"
mode_path.write_text("{ this is not valid json", encoding="utf-8")
with mock.patch.object(wm, "MODE_PATH", mode_path):
cfg = wm.load_config()
assert_eq("corrupted config → mode=generic", "generic", cfg["mode"])
# ─── Mode=generic routing matches v1.7 conventions ──────────────────────────
def test_generic_routing():
cfg = dict(wm.DEFAULT_CONFIG)
cfg["mode"] = "generic"
assert_eq("generic source",
"wiki/sources/Karpathy-2025-essay.md",
wm.route_path("generic", "source", "Karpathy 2025 essay", cfg))
assert_eq("generic entity preserves case",
"wiki/entities/Andrej Karpathy.md",
wm.route_path("generic", "entity", "Andrej Karpathy", cfg))
assert_eq("generic concept",
"wiki/concepts/Compounding Vault.md",
wm.route_path("generic", "concept", "Compounding Vault", cfg))
assert_eq("generic session",
"wiki/sessions/v1-8-launch-prep.md",
wm.route_path("generic", "session", "v1.8 launch prep", cfg))
# ─── Mode=lyt routing: all atomic notes flat under wiki/notes/ ──────────────
def test_lyt_routing():
cfg = dict(wm.DEFAULT_CONFIG)
cfg["mode"] = "lyt"
src = wm.route_path("lyt", "source", "Karpathy essay", cfg)
ent = wm.route_path("lyt", "entity", "Andrej Karpathy", cfg)
con = wm.route_path("lyt", "concept", "Compounding Vault", cfg)
assert_true("lyt source goes to notes/", src.startswith("wiki/notes/"), hint=src)
assert_true("lyt entity goes to notes/", ent.startswith("wiki/notes/"), hint=ent)
assert_true("lyt concept goes to notes/", con.startswith("wiki/notes/"), hint=con)
# ─── Mode=para routing: actionability-based folders ─────────────────────────
def test_para_routing():
cfg = dict(wm.DEFAULT_CONFIG)
cfg["mode"] = "para"
src = wm.route_path("para", "source", "Karpathy essay", cfg)
ent = wm.route_path("para", "entity", "Andrej Karpathy", cfg)
sess = wm.route_path("para", "session", "v1.8 prep", cfg)
res = wm.route_path("para", "research", "compounding-vault", cfg)
assert_true("para source → resources/incoming/", src.startswith("wiki/resources/incoming/"), hint=src)
assert_true("para entity → resources/people/", ent.startswith("wiki/resources/people/"), hint=ent)
assert_true("para session → projects/inbox/", sess.startswith("wiki/projects/inbox/"), hint=sess)
assert_true("para research → resources/<topic>/", "wiki/resources/compounding-vault/" in res, hint=res)
# ─── Mode=zettelkasten routing: flat, timestamp-prefixed ────────────────────
def test_zettelkasten_routing():
cfg = dict(wm.DEFAULT_CONFIG)
cfg["mode"] = "zettelkasten"
p = wm.route_path("zettelkasten", "source", "Karpathy essay", cfg)
# Format: wiki/<20-digit-timestamp-with-microseconds>-<slug>.md
assert_true("zettel path starts with wiki/", p.startswith("wiki/"), hint=p)
assert_true("zettel no subfolders", p.count("/") == 1, hint=p)
fname = p.rsplit("/", 1)[1]
parts = fname.split("-", 1)
# v1.8.1 fix: IDs are 20 digits (YYYYMMDDHHMMSSffffff) for collision resistance
assert_true("zettel ID is 20 digits", parts[0].isdigit() and len(parts[0]) == 20, hint=fname)
# ─── Zettel ID format ───────────────────────────────────────────────────────
def test_mint_zettel_id_format():
zid = wm.mint_zettel_id()
# 14 (YYYYMMDDHHMMSS) + 6 (microseconds) = 20 digits
assert_true("zettel ID is 20-digit string", len(zid) == 20 and zid.isdigit(), hint=zid)
def test_mint_zettel_id_collision_resistance():
"""v1.8.1 fix: rapid back-to-back mint calls produce DIFFERENT IDs.
Microsecond suffix ensures two calls within the same second are distinct.
"""
ids = [wm.mint_zettel_id() for _ in range(10)]
assert_eq("zettel IDs all distinct (10 rapid calls)", 10, len(set(ids)))
def test_slugify_extended_unicode():
"""v1.8.1 fix: explicit test coverage for CJK + Cyrillic (verifier LOW).
The slugify function preserves any Unicode word character; only ASCII
punctuation and emoji get stripped/converted.
"""
assert_eq("CJK preserved", "日本語の文書", wm.slugify("日本語の文書"))
assert_eq("Cyrillic with space", "Привет-мир", wm.slugify("Привет мир"))
assert_eq("Mixed scripts", "Hello-мир-café", wm.slugify("Hello мир café"))
# Emoji is stripped (not in \w); surrounding text joined by single hyphen
assert_eq("Emoji becomes single hyphen between words", "Test-emoji",
wm.slugify("Test 🎉 emoji"))
# ─── Slugify handles unicode + special chars ────────────────────────────────
def test_slugify():
# Case is PRESERVED to match v1.7 entity/concept filing conventions.
assert_eq("ascii slug", "Karpathy-2025-essay", wm.slugify("Karpathy 2025 essay"))
assert_eq("unicode preserved", "café-résumé", wm.slugify("café résumé"))
# Periods become hyphens (so v1.7 → v1-7, not v17)
assert_eq("dots become hyphens", "v1-7-launch-prep", wm.slugify("v1.7 launch! prep?"))
assert_eq("empty → 'untitled'", "untitled", wm.slugify(""))
# ─── Path-traversal hardening (v1.8.2): entity/concept names cannot escape ──
def test_safe_name_strips_path_separators():
"""v1.8.2 fix: names that intentionally preserve case (entity, concept)
must not allow path traversal via '../', leading '/', backslashes, NULs,
or control characters. Spaces and case are still preserved.
"""
assert_eq("traversal '../' stripped", "etcpasswd", wm.safe_name("../../../etc/passwd"))
assert_eq("leading '/' stripped", "etcpasswd", wm.safe_name("/etc/passwd"))
assert_eq("backslash stripped", "etcpasswd", wm.safe_name("..\\..\\etc\\passwd"))
assert_eq("NUL stripped", "foobar", wm.safe_name("foo\x00bar"))
assert_eq("control chars stripped", "foobar", wm.safe_name("foo\x01\x02bar"))
assert_eq("leading dot stripped (no hidden files)", "hidden", wm.safe_name(".hidden"))
assert_eq("leading hyphen stripped (no flag escapes)", "flag", wm.safe_name("-flag"))
assert_eq("spaces + case preserved", "Andrej Karpathy", wm.safe_name("Andrej Karpathy"))
assert_eq("empty after strip → 'untitled'", "untitled", wm.safe_name("/"))
def test_route_path_blocks_traversal_for_generic_entity_and_concept():
"""The end-to-end route must not allow the returned path to escape vault root."""
import os
cfg = dict(wm.DEFAULT_CONFIG); cfg["mode"] = "generic"
vault = os.path.abspath(".")
for content_type, malicious in [
("entity", "../../../etc/passwd"),
("concept", "/etc/passwd"),
("entity", "..\\..\\..\\Windows\\System32"),
("research","../escape"),
]:
p = wm.route_path("generic", content_type, malicious, cfg)
abs_p = os.path.abspath(p)
assert_true(f"generic {content_type}({malicious!r}) stays inside vault",
abs_p.startswith(vault + os.sep), hint=f"got {abs_p}")
def test_route_path_blocks_traversal_for_para_entity_and_concept():
import os
cfg = dict(wm.DEFAULT_CONFIG); cfg["mode"] = "para"
vault = os.path.abspath(".")
for content_type, malicious in [
("entity", "../../../etc/passwd"),
("concept", "/etc/shadow"),
]:
p = wm.route_path("para", content_type, malicious, cfg)
abs_p = os.path.abspath(p)
assert_true(f"para {content_type}({malicious!r}) stays inside vault",
abs_p.startswith(vault + os.sep), hint=f"got {abs_p}")
# ─── CLI --mode preview override (v1.8.2) ───────────────────────────────────
def test_cli_route_mode_override_previews_without_writing():
"""`route --mode lyt source X` must return an lyt path even when current
mode is generic, and must NOT modify .vault-meta/mode.json."""
before = subprocess.run([sys.executable, str(HELPER), "get"],
capture_output=True, text=True, timeout=5).stdout.strip()
result = subprocess.run(
[sys.executable, str(HELPER), "route", "--mode", "lyt", "source", "Preview Test"],
capture_output=True, text=True, timeout=5,
)
assert_eq("cli route --mode rc=0", 0, result.returncode)
path = result.stdout.strip()
assert_true("preview returns lyt notes/ path",
path.startswith("wiki/notes/"), hint=path)
after = subprocess.run([sys.executable, str(HELPER), "get"],
capture_output=True, text=True, timeout=5).stdout.strip()
assert_eq("current mode unchanged by preview", before, after)
def test_cli_route_mode_override_rejects_invalid():
result = subprocess.run(
[sys.executable, str(HELPER), "route", "--mode", "bogus", "source", "X"],
capture_output=True, text=True, timeout=5,
)
assert_true("preview rejects bogus mode", result.returncode != 0,
hint=f"rc={result.returncode}")
# ─── Invalid content type raises ───────────────────────────────────────────
def test_invalid_content_type_raises():
cfg = dict(wm.DEFAULT_CONFIG)
try:
wm.route_path("generic", "garbage", "x", cfg)
raise Fail("expected SystemExit(4) for invalid type")
except SystemExit as e:
assert_eq("invalid type → exit 4", 4, e.code)
# ─── CLI subprocess: `wiki-mode.py get` returns mode string ─────────────────
def test_cli_get_returns_mode():
"""End-to-end CLI test via subprocess; uses the actual vault's mode (or generic if absent)."""
result = subprocess.run(
[sys.executable, str(HELPER), "get"],
capture_output=True, text=True, timeout=5,
)
assert_eq("cli get rc=0", 0, result.returncode)
mode = result.stdout.strip()
assert_true("cli get returns one of 4 modes",
mode in ("generic", "lyt", "para", "zettelkasten"), hint=mode)
# ─── CLI subprocess: `wiki-mode.py id` returns 14-digit timestamp ───────────
def test_cli_id_returns_timestamp():
result = subprocess.run(
[sys.executable, str(HELPER), "id"],
capture_output=True, text=True, timeout=5,
)
assert_eq("cli id rc=0", 0, result.returncode)
zid = result.stdout.strip()
assert_true("cli id is 20-digit", len(zid) == 20 and zid.isdigit(), hint=zid)
# ─── CLI subprocess: `wiki-mode.py route source NAME` returns a path ────────
def test_cli_route_returns_path():
result = subprocess.run(
[sys.executable, str(HELPER), "route", "source", "Test Source"],
capture_output=True, text=True, timeout=5,
)
assert_eq("cli route rc=0", 0, result.returncode)
path = result.stdout.strip()
assert_true("cli route returns wiki-rooted path",
path.startswith("wiki/"), hint=path)
assert_true("cli route returns .md path", path.endswith(".md"), hint=path)
# ─── CLI subprocess: invalid mode rejected ──────────────────────────────────
def test_cli_set_rejects_invalid_mode():
result = subprocess.run(
[sys.executable, str(HELPER), "set", "bogus"],
capture_output=True, text=True, timeout=5,
)
assert_true("cli set rejects invalid mode", result.returncode != 0,
hint=f"rc={result.returncode}")
# ─── CLI subprocess: templates listing returns all 6 ───────────────────────
def test_cli_templates_lists_six():
result = subprocess.run(
[sys.executable, str(HELPER), "templates"],
capture_output=True, text=True, timeout=5,
)
assert_eq("cli templates rc=0", 0, result.returncode)
lines = [l for l in result.stdout.strip().split("\n") if l]
assert_eq("cli templates returns 6 paths", 6, len(lines))
def main():
print("=== test_wiki_mode.py ===")
test_load_config_defaults_to_generic_when_absent()
test_save_load_roundtrip()
test_corrupted_config_falls_back_to_generic()
test_generic_routing()
test_lyt_routing()
test_para_routing()
test_zettelkasten_routing()
test_mint_zettel_id_format()
test_mint_zettel_id_collision_resistance()
test_slugify()
test_slugify_extended_unicode()
test_safe_name_strips_path_separators()
test_route_path_blocks_traversal_for_generic_entity_and_concept()
test_route_path_blocks_traversal_for_para_entity_and_concept()
test_cli_route_mode_override_previews_without_writing()
test_cli_route_mode_override_rejects_invalid()
test_invalid_content_type_raises()
test_cli_get_returns_mode()
test_cli_id_returns_timestamp()
test_cli_route_returns_path()
test_cli_set_rejects_invalid_mode()
test_cli_templates_lists_six()
print("\nAll wiki-mode tests passed.")
if __name__ == "__main__":
main()