PY / Cognition / lib_bejson_cognition.py
System
PY
Family
Cognition
API Density
12
Public API Surface
def bejson_cognition_compact_logsdef bejson_cognition_cosine_similaritydef bejson_cognition_init_indexdef bejson_cognition_init_matrixdef bejson_cognition_integrate_patchesdef bejson_cognition_log_turndef bejson_cognition_querydef bejson_cognition_safe_writedef bejson_cognition_scan_indexdef bejson_cognition_sleepdef bejson_cognition_upsertdef bejson_cognition_wake
Full Source Implementation
FILE // lib_bejson_cognition.py
"""
Library: lib_bejson_cognition.py
Family: AI
Jurisdiction: ["BEJSON_LIBRARIES", "PY"]
Status: OFFICIAL
Author: Elton Boehnen
Version: 2.0 OFFICIAL
MFDB Version: 1.31
Format_Creator: Elton Boehnen
Date: 2026-05-18
Description: Manager for semantic and episodic memory structures in BEJSON.
"""
import json
import os
import sys
import time
import uuid
import logging
import hashlib
import shutil
import stat
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
# ===========================================================================
# SIBLING PATH RESOLUTION (Accessing the Locked Core)
# ===========================================================================
CURRENT_SIBLING = os.path.dirname(os.path.abspath(__file__))
PARENT_LIB_DIR = os.path.dirname(CURRENT_SIBLING)
CORE_SIBLING = os.path.join(PARENT_LIB_DIR, "Core")
if CORE_SIBLING not in sys.path:
sys.path.append(CORE_SIBLING)
try:
from lib_bejson_core import bejson_core_atomic_write, bejson_core_load_file, bejson_core_acquire_lock, bejson_core_release_lock
from lib_mfdb_core import mfdb_core_resolve_path
from lib_bejson_validator import bejson_validator_validate_string
# AI Sibling for Vectorization
AI_SIBLING = os.path.join(PARENT_LIB_DIR, "AI")
if AI_SIBLING not in sys.path: sys.path.append(AI_SIBLING)
from lib_bejson_genai import GenAIClient
except ImportError:
logging.critical("[FATAL] Core Sibling is unreachable. Matrix offline.")
raise RuntimeError("[270] Core Sibling unreachable — cannot initialize Cognition library.")
# ===========================================================================
# COGNITION ERROR CODES (270-289) & SCHEMAS
# ===========================================================================
E_COGNITION_INVALID_MATRIX = 270
E_COGNITION_AGENT_NOT_FOUND = 271
E_COGNITION_INDEX_MISSING = 272
E_COGNITION_PATCH_FAILED = 273
E_COGNITION_SCHEMA_VIOLATION = 274
E_COGNITION_LOCK_TIMEOUT = 275
BEJSON_COGNITION_SCHEMA = [
{"name": "Record_Type_Parent", "type": "string"},
{"name": "id", "type": "string", "Record_Type_Parent": "AgentState"},
{"name": "timestamp", "type": "number", "Record_Type_Parent": "AgentState"},
{"name": "last_checkpoint", "type": "string", "Record_Type_Parent": "AgentState"},
{"name": "core_directives", "type": "object", "Record_Type_Parent": "AgentState"},
{"name": "summary_blob", "type": "string", "Record_Type_Parent": "AgentState"},
{"name": "stack_id", "type": "string", "Record_Type_Parent": "ExecutionStack"},
{"name": "agent_id_fk", "type": "string", "Record_Type_Parent": "ExecutionStack"},
{"name": "stack_timestamp", "type": "number", "Record_Type_Parent": "ExecutionStack"},
{"name": "task_queue", "type": "array", "Record_Type_Parent": "ExecutionStack"},
{"name": "pending_context", "type": "object", "Record_Type_Parent": "ExecutionStack"},
{"name": "log_id", "type": "string", "Record_Type_Parent": "EpisodicLog"},
{"name": "log_timestamp", "type": "number", "Record_Type_Parent": "EpisodicLog"},
{"name": "user_input", "type": "string", "Record_Type_Parent": "EpisodicLog"},
{"name": "agent_response", "type": "string", "Record_Type_Parent": "EpisodicLog"},
{"name": "payloads_used", "type": "array", "Record_Type_Parent": "EpisodicLog"},
{"name": "patch_id", "type": "string", "Record_Type_Parent": "MetaPatch"},
{"name": "patch_timestamp", "type": "number", "Record_Type_Parent": "MetaPatch"},
{"name": "target_layer", "type": "string", "Record_Type_Parent": "MetaPatch"},
{"name": "patch_instruction", "type": "object", "Record_Type_Parent": "MetaPatch"},
{"name": "status", "type": "string", "Record_Type_Parent": "MetaPatch"}
]
# ===========================================================================
# SAFE ATOMIC WRITER (Mutex Backoff)
# ===========================================================================
def bejson_cognition_safe_write(filepath: str, data: dict, max_retries: int = 5) -> bool:
resolved_path = mfdb_core_resolve_path(filepath)
attempt = 0
base_sleep = 0.2
while attempt < max_retries:
if bejson_core_acquire_lock(resolved_path, timeout=2):
try:
bejson_core_atomic_write(resolved_path, data)
return True
finally:
bejson_core_release_lock(resolved_path)
attempt += 1
sleep_time = base_sleep * (2 ** attempt) + (time.time() % 0.1)
logging.warning(f"[COGNITION] Lock contention. Retrying in {sleep_time:.2f}s...")
time.sleep(sleep_time)
raise RuntimeError(f"[{E_COGNITION_LOCK_TIMEOUT}] FATAL: Lock timeout.")
def bejson_cognition_cosine_similarity(vec1: list, vec2: list) -> float:
"""Calculate cosine similarity between two vectors."""
if not vec1 or not vec2 or len(vec1) != len(vec2): return 0.0
dot_product = sum(a * b for a, b in zip(vec1, vec2))
norm_a = sum(a * a for a in vec1) ** 0.5
norm_b = sum(b * b for b in vec2) ** 0.5
if norm_a == 0 or norm_b == 0: return 0.0
return dot_product / (norm_a * norm_b)
# ===========================================================================
# DATABASE ENGINE FUNCTIONS
# ===========================================================================
def bejson_cognition_init_matrix(db_path: str) -> dict:
resolved_path = mfdb_core_resolve_path(db_path)
if os.path.exists(resolved_path): doc = bejson_core_load_file(resolved_path)
else: doc = None
if doc and doc.get("Format_Version") == "104db": return doc
return {
"Format": "BEJSON", "Format_Version": "104db", "Format_Creator": "Elton Boehnen",
"Records_Type": ["AgentState", "ExecutionStack", "EpisodicLog", "MetaPatch"],
"Fields": BEJSON_COGNITION_SCHEMA, "Values": []
}
def bejson_cognition_init_index(index_path: str) -> dict:
resolved_index = mfdb_core_resolve_path(index_path)
if os.path.exists(resolved_index):
doc = bejson_core_load_file(resolved_index)
else:
doc = None
if doc: return doc
return {
"Format": "BEJSON",
"Format_Version": "104a",
"Format_Creator": "Elton Boehnen",
"Records_Type": ["ContextIndex"],
"Fields": [{"name": "placeholder", "type": "string"}],
"Values": [["placeholder"]],
"triggers": {}
}
def bejson_cognition_query(doc: dict, record_type: str, filters: dict = None) -> List[dict]:
results = []
if "Fields" not in doc or "Values" not in doc: return results
field_indices = {f["name"]: i for i, f in enumerate(doc["Fields"])}
for row in doc.get("Values", []):
if row[0] == record_type:
record = {}
for f in doc["Fields"]:
if f.get("Record_Type_Parent") in [record_type, None]:
val = row[field_indices[f["name"]]]
if val is not None: record[f["name"]] = val
match = True
if filters:
for k, v in filters.items():
if record.get(k) != v: match = False; break
if match: results.append(record)
return results
def bejson_cognition_upsert(doc: dict, record_type: str, record_id: str, **kwargs) -> dict:
field_indices = {f["name"]: i for i, f in enumerate(doc["Fields"])}
# Identify the ID field name for this record type
id_field = "id"
if record_type == "ExecutionStack": id_field = "stack_id"
elif record_type == "EpisodicLog": id_field = "log_id"
elif record_type == "MetaPatch": id_field = "patch_id"
# Identify the timestamp field name for this record type
ts_field = "timestamp"
if record_type == "ExecutionStack": ts_field = "stack_timestamp"
elif record_type == "EpisodicLog": ts_field = "log_timestamp"
elif record_type == "MetaPatch": ts_field = "patch_timestamp"
# Search for existing record
target_idx = -1
for i, r in enumerate(doc["Values"]):
if r[0] == record_type and r[field_indices[id_field]] == record_id:
target_idx = i
break
row_data = [None] * len(doc["Fields"])
row_data[0] = record_type
row_data[field_indices[id_field]] = record_id
row_data[field_indices[ts_field]] = time.time()
if target_idx != -1:
row_data = list(doc["Values"][target_idx])
row_data[field_indices[ts_field]] = time.time()
for key, val in kwargs.items():
if key in field_indices:
row_data[field_indices[key]] = val
if target_idx != -1: doc["Values"][target_idx] = row_data
else: doc["Values"].append(row_data)
return doc
# ===========================================================================
# STATE MANAGEMENT & AMNESIA PATTERN
# ===========================================================================
def bejson_cognition_wake(db_doc: dict, agent_id: str, genesis_directives: dict = None) -> dict:
state = bejson_cognition_query(db_doc, "AgentState", {"id": agent_id})
stack = bejson_cognition_query(db_doc, "ExecutionStack", {"agent_id_fk": agent_id})
if not state:
return {"id": agent_id, "core_directives": genesis_directives or {"persona": "Default"}, "summary_blob": "Init.", "task_queue": [], "pending_context": {}, "active_buffer": []}
return {"id": agent_id, "core_directives": state[0].get("core_directives", {}), "summary_blob": state[0].get("summary_blob", ""), "task_queue": stack[0].get("task_queue", []) if stack else [], "pending_context": stack[0].get("pending_context", {}) if stack else {}, "active_buffer": []}
def bejson_cognition_sleep(db_path: str, db_doc: dict, agent_state: dict) -> None:
agent_id = agent_state["id"]
if agent_state.get("active_buffer"):
agent_state["summary_blob"] = f"Summary updated {time.ctime()}: Processed {len(agent_state['active_buffer'])} ops."
db_doc = bejson_cognition_upsert(db_doc, "AgentState", agent_id, core_directives=agent_state.get("core_directives"), summary_blob=agent_state.get("summary_blob"), last_checkpoint=datetime.now(timezone.utc).isoformat())
db_doc = bejson_cognition_upsert(db_doc, "ExecutionStack", f"STK-{agent_id}", agent_id_fk=agent_id, task_queue=agent_state.get("task_queue"), pending_context=agent_state.get("pending_context"))
agent_state["active_buffer"] = []
bejson_cognition_safe_write(db_path, db_doc)
def bejson_cognition_log_turn(db_doc: dict, user_input: str, agent_response: str, payloads_used: list) -> dict:
return bejson_cognition_upsert(db_doc, "EpisodicLog", f"LOG-{uuid.uuid4().hex[:8]}", user_input=user_input, agent_response=agent_response, payloads_used=payloads_used)
# ===========================================================================
# COMPACTION & INDEXING
# ===========================================================================
def bejson_cognition_compact_logs(db_doc: dict, archive_path: str, max_logs: int = 100) -> dict:
log_rows = [(i, r) for i, r in enumerate(db_doc.get("Values", [])) if r[0] == "EpisodicLog"]
if len(log_rows) <= max_logs: return db_doc
# Sort by log_timestamp field (resolved by name for schema safety)
log_ts_idx = next((i for i, f in enumerate(db_doc["Fields"]) if f["name"] == "log_timestamp"), 2)
log_rows.sort(key=lambda x: x[1][log_ts_idx] or 0)
to_archive = len(log_rows) - max_logs
archive_rows = [x[1] for x in log_rows[:to_archive]]
remove_indices = {x[0] for x in log_rows[:to_archive]}
resolved_archive = mfdb_core_resolve_path(archive_path)
try: archive_doc = bejson_core_load_file(resolved_archive)
except: archive_doc = None
if not archive_doc or archive_doc.get("Format_Version") != "104db":
archive_doc = {"Format": "BEJSON", "Format_Version": "104db", "Format_Creator": "Elton Boehnen", "Records_Type": ["EpisodicLog"], "Fields": db_doc.get("Fields", []), "Values": []}
archive_doc["Values"].extend(archive_rows)
bejson_cognition_safe_write(resolved_archive, archive_doc)
db_doc["Values"] = [r for i, r in enumerate(db_doc["Values"]) if i not in remove_indices]
return db_doc
def bejson_cognition_scan_index(index_doc: dict, text: str, payloads_dir: str, threshold: float = 0.75) -> Tuple[List[str], List[str]]:
loaded_payloads, payload_names = [], []
# --- PHASE 1: Keyword Scanning (Legacy Fallback) ---
for keyword, payload_file in index_doc.get("triggers", {}).items():
if keyword in text.lower():
payload_data = bejson_core_load_file(mfdb_core_resolve_path(os.path.join(payloads_dir, payload_file)))
if payload_data:
loaded_payloads.append(json.dumps(payload_data))
payload_names.append(payload_file)
# --- PHASE 2: Semantic Vector Scanning ---
if index_doc.get("Values"):
try:
client = GenAIClient()
query_vector = client.embed_content(text)
if query_vector:
field_map = {f["name"]: i for i, f in enumerate(index_doc["Fields"])}
for row in index_doc["Values"]:
trigger_vector = row[field_map["embedding"]]
similarity = bejson_cognition_cosine_similarity(query_vector, trigger_vector)
if similarity >= threshold:
payload_file = row[field_map["payload_file"]]
if payload_file not in payload_names:
payload_data = bejson_core_load_file(mfdb_core_resolve_path(os.path.join(payloads_dir, payload_file)))
if payload_data:
loaded_payloads.append(json.dumps(payload_data))
payload_names.append(payload_file)
logging.info(f"[COGNITION] Semantic match: '{row[field_map['trigger_text']]}' (Score: {similarity:.4f})")
except Exception as e:
logging.error(f"[COGNITION] Semantic scan failed: {e}")
return loaded_payloads, payload_names
# ===========================================================================
# SELF-PATCHING (Meta-Cognitive Loop)
# ===========================================================================
def bejson_cognition_integrate_patches(db_path: str, db_doc: dict, index_path: str, index_doc: dict) -> None:
pending_patches = bejson_cognition_query(db_doc, "MetaPatch", {"status": "pending"})
if not pending_patches:
bejson_cognition_safe_write(db_path, bejson_cognition_compact_logs(db_doc, "{SC_ROOT}/resources/Archives/episodic_archive.104db.bejson"))
return
resolved_index = mfdb_core_resolve_path(index_path)
if os.path.exists(resolved_index): shutil.copy2(resolved_index, f"{resolved_index}.bak")
for patch in pending_patches:
instr = patch.get("patch_instruction", {})
# --- LAYER: Context Index ---
if patch["target_layer"] == "context_index" and instr.get("action") == "APPEND":
if "triggers" not in index_doc: index_doc["triggers"] = {}
index_doc["triggers"][instr["target_key"]] = instr["target_value"]
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="applied")
# --- LAYER: Tooling (Generative Tool Forging) ---
elif patch["target_layer"] == "Tooling" and instr.get("action") == "FORGE_TOOL":
# SANDBOX CONTAINMENT CHECK
sandbox_state_file = mfdb_core_resolve_path("{INTERNAL_STORAGE}/Admin/data/policy/sandbox_state.json")
if os.path.exists(sandbox_state_file):
with open(sandbox_state_file, "r") as sf:
state = json.load(sf)
if state.get("sandbox_enabled"):
logging.warning(f"[CONTAINMENT] Sandbox active. Refusing to forge tool.")
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="blocked_by_sandbox")
continue
try:
metadata = instr.get("tool_metadata", {})
filename = metadata.get("filename")
code = instr.get("code")
if not filename or not code:
logging.error(f"[COGNITION] Patch {patch['patch_id']} missing filename or code.")
continue
# 1. Forge the physical tool
# Using portable path resolution
tools_dir = mfdb_core_resolve_path("{INTERNAL_STORAGE}/Admin/tools")
tool_path = os.path.join(tools_dir, filename)
with open(tool_path, "w") as f:
f.write(code)
# 2. Apply executable permissions
st = os.stat(tool_path)
os.chmod(tool_path, st.st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
# 3. Register in CLITool Registry
registry_path = mfdb_core_resolve_path("{INTERNAL_STORAGE}/Admin/init/registry/mfdb_layers/data/clitool.bejson")
registry_doc = bejson_core_load_file(registry_path)
if registry_doc and "Values" in registry_doc:
# [name, identifier, path_or_url, version, description, is_active, owner_email, website, clitool_guid, timestamp, session]
new_record = [
metadata.get("name", "Generated Tool"),
metadata.get("identifier", uuid.uuid4().hex),
tool_path,
metadata.get("version", "1.0.0"),
metadata.get("description", "Autonomously forged tool."),
True,
"boehnenelton2024@gmail.com",
"",
f"guid-forge-{uuid.uuid4().hex[:8]}",
datetime.now(timezone.utc).isoformat(),
"forge-session"
]
registry_doc["Values"].append(new_record)
bejson_core_atomic_write(registry_path, registry_doc)
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="applied")
logging.info(f"[COGNITION] Tool '{filename}' forged and registered successfully.")
except Exception as e:
logging.error(f"[COGNITION] Failed to forge tool in patch {patch['patch_id']}: {e}")
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="failed")
# --- LAYER: Orchestration (The Hive Mind) ---
elif patch["target_layer"] == "Orchestration" and instr.get("action") == "SPAWN_AGENT":
try:
agent_id = instr.get("agent_id")
persona = instr.get("persona", "Worker")
initial_task = instr.get("initial_task")
if not agent_id or not initial_task:
logging.error(f"[COGNITION] SPAWN_AGENT patch {patch['patch_id']} missing agent_id or task.")
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="failed")
continue
# 1. Initialize AgentState
db_doc = bejson_cognition_upsert(
db_doc, "AgentState", agent_id,
core_directives={"persona": persona, "status": "active"},
summary_blob="Initialized by Orchestrator.",
last_checkpoint=datetime.now(timezone.utc).isoformat()
)
# 2. Initialize ExecutionStack with task
db_doc = bejson_cognition_upsert(
db_doc, "ExecutionStack", f"STK-{agent_id}",
agent_id_fk=agent_id,
task_queue=[{"task_id": uuid.uuid4().hex[:8], "description": initial_task, "status": "pending", "result": None}],
pending_context={}
)
logging.info(f"[COGNITION] Hive Mind: Spawned sub-agent {agent_id}.")
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="applied")
except Exception as e:
logging.error(f"[COGNITION] Failed to spawn agent in patch {patch['patch_id']}: {e}")
db_doc = bejson_cognition_upsert(db_doc, "MetaPatch", patch["patch_id"], status="failed")
try: bejson_validator_validate_string(json.dumps(index_doc))
except Exception as e: logging.error(f"[FATAL] Schema corrupted. {e}"); return
bejson_cognition_safe_write(index_path, index_doc)
bejson_cognition_safe_write(db_path, bejson_cognition_compact_logs(db_doc, "{SC_ROOT}/resources/Archives/episodic_archive.104db.bejson"))
built from BEJSON HTML3 Libraries 2.0