import os import json import hashlib import asyncio import shlex import uuid import base64 import imaplib import email import tempfile import re import time from pathlib import Path from email import policy from email.utils import parseaddr, parsedate_to_datetime from datetime import datetime, timezone from typing import Optional, List, Dict, Any, Literal import httpx from fastapi import FastAPI, HTTPException, Header from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from gremlin_python.driver import client as gremlin_client from gremlin_python.driver.serializer import GraphSONSerializersV3d0 from dotenv import load_dotenv APP_NAME = "concept-api" APP_VERSION = os.getenv("APP_VERSION", "dev-local") APP_STARTED_AT_UTC = datetime.now(timezone.utc).isoformat() # Keep env loading behavior aligned with connectivity_check.py. load_dotenv() # ---- config (set these env vars) ---- GREMLIN_URL = os.getenv("GREMLIN_URL", "ws://localhost:8182/gremlin") ES_URL = os.getenv("ES_URL", "http://localhost:9200") ES_INDEX = os.getenv("ES_INDEX", "concepts") IPFS_API = os.getenv("IPFS_API", "http://localhost:5001") # Kubo HTTP API OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b") OLLAMA_EMBED_MODEL = os.getenv("OLLAMA_EMBED_MODEL", "nomic-embed-text") PROJECTOR_SSH_HOST = os.getenv("PROJECTOR_SSH_HOST", "lakehouse-core.rakeroots.lan") PROJECTOR_REMOTE_DIR = os.getenv("PROJECTOR_REMOTE_DIR", "/tmp/jecio") PROJECTOR_REMOTE_SCRIPT = os.getenv("PROJECTOR_REMOTE_SCRIPT", "./run-projector-standard.sh") PROJECTOR_SSH_BIN = os.getenv("PROJECTOR_SSH_BIN", "ssh") PROJECTOR_SSH_OPTS = os.getenv("PROJECTOR_SSH_OPTS", "-o BatchMode=yes -o ConnectTimeout=10") PROJECTOR_SCP_BIN = os.getenv("PROJECTOR_SCP_BIN", "scp") PROJECTOR_SCP_OPTS = os.getenv("PROJECTOR_SCP_OPTS", "-o BatchMode=yes -o ConnectTimeout=10") PROJECTOR_TIMEOUT_SEC = int(os.getenv("PROJECTOR_TIMEOUT_SEC", "900")) ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "") RUNS_REMOTE_SCRIPT = os.getenv("RUNS_REMOTE_SCRIPT", "./record-run-via-spark-container.sh") RUN_EVENTS_REMOTE_SCRIPT = os.getenv("RUN_EVENTS_REMOTE_SCRIPT", "./record-run-event-via-spark-container.sh") INGEST_MESSAGE_REMOTE_SCRIPT = os.getenv("INGEST_MESSAGE_REMOTE_SCRIPT", "./ingest-message-via-spark-container.sh") INGEST_MESSAGES_BATCH_REMOTE_SCRIPT = os.getenv( "INGEST_MESSAGES_BATCH_REMOTE_SCRIPT", "./ingest-messages-batch-via-spark-container.sh", ) ASSISTANT_FEEDBACK_REMOTE_SCRIPT = os.getenv( "ASSISTANT_FEEDBACK_REMOTE_SCRIPT", "./record-assistant-feedback-via-spark-container.sh", ) ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT = os.getenv( "ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT", "./query-assistant-feedback-via-spark-container.sh", ) ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT = os.getenv( "ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT", "./query-assistant-metrics-via-spark-container.sh", ) ASSISTANT_ACTION_REMOTE_SCRIPT = os.getenv( "ASSISTANT_ACTION_REMOTE_SCRIPT", "./record-assistant-action-via-spark-container.sh", ) ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT = os.getenv( "ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT", "./query-assistant-actions-via-spark-container.sh", ) IMAP_CHECKPOINT_REMOTE_SCRIPT = os.getenv( "IMAP_CHECKPOINT_REMOTE_SCRIPT", "./query-imap-checkpoint-via-spark-container.sh", ) CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT = os.getenv( "CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT", "./create-messages-release-via-spark-container.sh", ) app = FastAPI(title=APP_NAME) UI_DIR = Path(__file__).resolve().parent / "ui" UI_ASSETS_DIR = UI_DIR / "assets" if UI_ASSETS_DIR.exists(): app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets") # --------- models --------- class ConceptCreate(BaseModel): canonical_name: str = Field(..., min_length=1) kind: Optional[str] = None aliases: List[str] = [] description: Optional[str] = None external_ids: Dict[str, str] = {} # {"wikidata":"Q42"} etc. tags: List[str] = [] class ConceptOut(BaseModel): concept_id: str canonical_name: str kind: Optional[str] = None aliases: List[str] = [] external_ids: Dict[str, str] = {} tags: List[str] = [] latest_cid: Optional[str] = None summary: Optional[str] = None created_at: str updated_at: str class ProjectionTrigger(BaseModel): release_name: str = Field(..., min_length=1) targets: Literal["es", "gremlin", "both"] = "both" concept_table: Optional[str] = None dry_run: bool = False class MessageIngestPayload(BaseModel): thread_id: str = Field(..., min_length=1) message_id: str = Field(..., min_length=1) sender: str = Field(..., min_length=1) channel: str = Field(..., min_length=1) sent_at: Optional[str] = None body: str = Field(..., min_length=1) metadata: Dict[str, Any] = {} table: str = "lake.db1.messages" class MessageIngestItem(BaseModel): thread_id: str = Field(..., min_length=1) message_id: str = Field(..., min_length=1) sender: str = Field(..., min_length=1) channel: str = Field(..., min_length=1) sent_at: Optional[str] = None body: str = Field(..., min_length=1) metadata: Dict[str, Any] = {} class MessageIngestBatchPayload(BaseModel): table: str = "lake.db1.messages" dedupe_mode: Literal["none", "message_id", "thread_message"] = "none" messages: List[MessageIngestItem] = Field(default_factory=list) class EmailImapIngestPayload(BaseModel): host: str = Field(..., min_length=1) port: int = 993 use_ssl: bool = True username: str = Field(..., min_length=1) password: Optional[str] = None mailbox: str = "INBOX" search_criteria: str = "ALL" max_messages: int = Field(default=50, ge=1, le=500) table: str = "lake.db1.messages" dedupe_mode: Literal["none", "message_id", "thread_message"] = "message_id" channel: str = "email-imap" incremental: bool = True since_uid: Optional[int] = None class PollAndProjectPayload(BaseModel): imap: EmailImapIngestPayload release_name: Optional[str] = None release_prefix: str = "rel" targets: Literal["es", "gremlin", "both"] = "es" concept_table: str = "lake.db1.messages" dry_run: bool = False project_if_no_new: bool = False class AssistantDraftPayload(BaseModel): task_type: Literal["message", "finance", "gov", "general"] = "general" goal: str = Field(..., min_length=3) recipient: Optional[str] = None tone: Optional[str] = "professional" constraints: List[str] = Field(default_factory=list) release_name: Optional[str] = None max_sources: int = Field(default=5, ge=1, le=20) class AssistantDraftSource(BaseModel): concept_id: str source_pk: Optional[str] = None source_table: Optional[str] = None release_name: Optional[str] = None score: Optional[float] = None class AssistantDraftResponse(BaseModel): task_type: str draft: str sources: List[AssistantDraftSource] confidence: float needs_review: bool release_name: Optional[str] = None class AssistantPlanPayload(BaseModel): task_type: Literal["message", "finance", "gov", "general"] = "general" objective: str = Field(..., min_length=3) constraints: List[str] = Field(default_factory=list) release_name: Optional[str] = None max_sources: int = Field(default=5, ge=1, le=20) max_steps: int = Field(default=6, ge=1, le=20) class AssistantPlanStep(BaseModel): step_id: str title: str action_type: Literal["research", "draft", "ask_user", "prepare_data", "review"] requires_approval: bool = False notes: Optional[str] = None class AssistantPlanResponse(BaseModel): objective: str task_type: str plan: List[AssistantPlanStep] sources: List[AssistantDraftSource] needs_review: bool confidence: float release_name: Optional[str] = None class AssistantExecuteStepPayload(BaseModel): task_type: Literal["message", "finance", "gov", "general"] = "general" objective: str = Field(..., min_length=3) release_name: Optional[str] = None plan: List[AssistantPlanStep] step_id: str = Field(..., min_length=1) approved: bool = False manual_confirm_token: Optional[str] = None class AssistantExecuteStepResponse(BaseModel): action_id: str step_id: str status: Literal["blocked", "executed"] output: Dict[str, Any] needs_review: bool class AssistantFeedbackPayload(BaseModel): outcome: Literal["accepted", "edited", "rejected"] task_type: Literal["message", "finance", "gov", "general"] = "general" release_name: Optional[str] = None goal: Optional[str] = None draft: str = Field(..., min_length=1) final_text: Optional[str] = None sources: List[AssistantDraftSource] = Field(default_factory=list) confidence: Optional[float] = None needs_review: bool = True notes: Optional[str] = None class AssistantLearnPayload(BaseModel): text: str = Field(..., min_length=3) title: Optional[str] = None tags: List[str] = Field(default_factory=list) release_name: Optional[str] = None metadata: Dict[str, Any] = Field(default_factory=dict) class AssistantChatMessage(BaseModel): role: Literal["user", "assistant"] content: str class AssistantChatPayload(BaseModel): message: str = Field(..., min_length=1) session_id: Optional[str] = None release_name: Optional[str] = None max_sources: int = Field(default=6, ge=1, le=20) history: List[AssistantChatMessage] = Field(default_factory=list) temperature_hint: Optional[str] = "balanced" class AssistantChatResponse(BaseModel): session_id: str answer: str sources: List[AssistantDraftSource] confidence: float release_name: Optional[str] = None # --------- helpers --------- def now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _tail(text: str, max_chars: int = 8000) -> str: if len(text) <= max_chars: return text return text[-max_chars:] def check_admin_api_key(x_admin_api_key: Optional[str]) -> None: if not ADMIN_API_KEY: raise HTTPException(status_code=503, detail="ADMIN_API_KEY is not configured") if x_admin_api_key != ADMIN_API_KEY: raise HTTPException(status_code=401, detail="Unauthorized") def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str]) -> str: norm = (name or "").strip().lower() k = (kind or "").strip().lower() ext = "|".join(f"{a}:{b}".lower() for a, b in sorted(external_ids.items())) raw = f"{norm}|{k}|{ext}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _clean_header_id(v: Optional[str]) -> str: if not v: return "" return v.strip().strip("<>").strip() def _normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str: refs_clean = _clean_header_id(refs.split()[-1] if refs else "") in_reply_clean = _clean_header_id(in_reply_to) if refs_clean: return f"thread:{refs_clean}" if in_reply_clean: return f"thread:{in_reply_clean}" seed = f"{subject.strip().lower()}|{sender.strip().lower()}" if not seed.strip("|"): seed = msg_id return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] def _extract_body_text(msg: email.message.Message) -> str: try: if msg.is_multipart(): for part in msg.walk(): ctype = (part.get_content_type() or "").lower() disp = (part.get("Content-Disposition") or "").lower() if ctype == "text/plain" and "attachment" not in disp: payload_obj = part.get_content() if isinstance(payload_obj, str): return payload_obj.strip() if isinstance(payload_obj, bytes): return payload_obj.decode(part.get_content_charset() or "utf-8", errors="replace").strip() for part in msg.walk(): ctype = (part.get_content_type() or "").lower() if ctype == "text/html": html_obj = part.get_content() if isinstance(html_obj, bytes): html_obj = html_obj.decode(part.get_content_charset() or "utf-8", errors="replace") if isinstance(html_obj, str): return html_obj.strip() return "" payload_obj = msg.get_content() if isinstance(payload_obj, str): return payload_obj.strip() if isinstance(payload_obj, bytes): return payload_obj.decode(msg.get_content_charset() or "utf-8", errors="replace").strip() return "" except Exception: return "" def fetch_imap_messages_blocking( payload: EmailImapIngestPayload, effective_search_criteria: str, since_uid: Optional[int], ) -> List[MessageIngestItem]: password = payload.password or os.getenv("IMAP_PASSWORD", "") if not password: raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD") if payload.use_ssl: client = imaplib.IMAP4_SSL(payload.host, payload.port) else: client = imaplib.IMAP4(payload.host, payload.port) try: status, _ = client.login(payload.username, password) if status != "OK": raise RuntimeError("IMAP login failed") status, _ = client.select(payload.mailbox, readonly=True) if status != "OK": raise RuntimeError(f"IMAP select mailbox failed: {payload.mailbox}") if since_uid is not None: status, search_data = client.uid("search", None, "UID", f"{int(since_uid) + 1}:*") else: status, search_data = client.uid("search", None, effective_search_criteria) if status != "OK": raise RuntimeError(f"IMAP search failed: {effective_search_criteria}") uid_bytes = search_data[0] if search_data else b"" uid_list = [u for u in uid_bytes.decode("utf-8", errors="replace").split() if u] if since_uid is not None: filtered: List[str] = [] for u in uid_list: try: if int(u) > int(since_uid): filtered.append(u) except Exception: continue uid_list = filtered if not uid_list: return [] # For incremental UID windows, process oldest-new first so checkpointing cannot skip gaps. is_uid_window = since_uid is not None if is_uid_window: selected_uids = uid_list[: payload.max_messages] else: # For non-incremental scans (e.g. ALL), keep "latest N" behavior. selected_uids = uid_list[-payload.max_messages :] out: List[MessageIngestItem] = [] for uid in selected_uids: status, msg_data = client.uid("fetch", uid, "(RFC822)") if status != "OK" or not msg_data: continue raw_bytes = None for part in msg_data: if isinstance(part, tuple) and len(part) >= 2 and isinstance(part[1], (bytes, bytearray)): raw_bytes = bytes(part[1]) break if not raw_bytes: continue msg = email.message_from_bytes(raw_bytes, policy=policy.default) subject = str(msg.get("Subject") or "").strip() from_raw = str(msg.get("From") or "").strip() to_raw = str(msg.get("To") or "").strip() date_raw = str(msg.get("Date") or "").strip() msg_id_raw = str(msg.get("Message-Id") or msg.get("Message-ID") or "").strip() refs_raw = str(msg.get("References") or "").strip() in_reply_raw = str(msg.get("In-Reply-To") or "").strip() sender_email = parseaddr(from_raw)[1] or from_raw or "unknown" msg_id_clean = _clean_header_id(msg_id_raw) if not msg_id_clean: seed = f"{uid}|{subject}|{sender_email}|{date_raw}" msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] thread_id = _normalize_thread_id( msg_id=msg_id_clean, refs=refs_raw, in_reply_to=in_reply_raw, subject=subject, sender=sender_email, ) sent_at_iso = None if date_raw: try: dt = parsedate_to_datetime(date_raw) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) sent_at_iso = dt.astimezone(timezone.utc).isoformat() except Exception: sent_at_iso = None body = _extract_body_text(msg) if not body: body = f"(no body) {subject}".strip() metadata = { "subject": subject, "from": from_raw, "to": to_raw, "date": date_raw, "imap_uid": uid, "mailbox": payload.mailbox, "host": payload.host, "username": payload.username, } out.append( MessageIngestItem( thread_id=thread_id, message_id=msg_id_clean, sender=sender_email, channel=payload.channel, sent_at=sent_at_iso, body=body, metadata=metadata, ) ) return out finally: try: client.logout() except Exception: pass async def run_remote_query_imap_checkpoint( host: str, mailbox: str, username: str, table: str, ) -> Optional[int]: parts = [ IMAP_CHECKPOINT_REMOTE_SCRIPT, host, mailbox, username, table, ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="IMAP checkpoint query timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) try: obj = _extract_json_object_from_text(out) val = obj.get("max_uid") if val is None: return None return int(val) except Exception as e: raise HTTPException( status_code=502, detail={ "message": f"Unable to parse IMAP checkpoint output: {e}", "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) async def run_remote_create_messages_release(release_name: str) -> Dict[str, Any]: parts = [ CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT, release_name, ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Create messages release timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "release_name": release_name, "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result async def ipfs_add_json(payload: Dict[str, Any]) -> str: # Kubo: POST /api/v0/add with file content in multipart data = json.dumps(payload, ensure_ascii=False).encode("utf-8") files = {"file": ("concept.json", data, "application/json")} async with httpx.AsyncClient(timeout=30) as h: r = await h.post(f"{IPFS_API}/api/v0/add", files=files) r.raise_for_status() # Response is text lines of JSON; last line contains Hash # Often single line, but handle both last = r.text.strip().splitlines()[-1] obj = json.loads(last) return obj["Hash"] async def ollama_summary(text: str) -> str: prompt = ( "Summarize the following concept in 1-2 sentences. " "Keep it factual and compact.\n\n" f"{text}" ) async with httpx.AsyncClient(timeout=60) as h: r = await h.post( f"{OLLAMA_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, ) r.raise_for_status() return (r.json().get("response") or "").strip() async def ollama_embed(text: str) -> List[float]: async with httpx.AsyncClient(timeout=60) as h: r = await h.post( f"{OLLAMA_URL}/api/embeddings", json={"model": OLLAMA_EMBED_MODEL, "prompt": text}, ) r.raise_for_status() emb = r.json().get("embedding") if not isinstance(emb, list): return [] return emb async def es_ensure_index(): # Minimal mapping: text fields + dense_vector (optional) # If your ES doesn't support dense_vector, remove it. mapping = { "mappings": { "properties": { "concept_id": {"type": "keyword"}, "canonical_name": {"type": "text"}, "kind": {"type": "keyword"}, "aliases": {"type": "text"}, "tags": {"type": "keyword"}, "summary": {"type": "text"}, "latest_cid": {"type": "keyword"}, "fingerprint": {"type": "keyword"}, "created_at": {"type": "date"}, "updated_at": {"type": "date"}, "embedding": {"type": "dense_vector", "dims": 768, "index": False}, # may vary } } } async with httpx.AsyncClient(timeout=30) as h: head = await h.get(f"{ES_URL}/{ES_INDEX}") if head.status_code == 200: return r = await h.put(f"{ES_URL}/{ES_INDEX}", json=mapping) # If this fails due to dense_vector incompatibility, you can still proceed: if r.status_code >= 400: # try without vector mapping["mappings"]["properties"].pop("embedding", None) r2 = await h.put(f"{ES_URL}/{ES_INDEX}", json=mapping) r2.raise_for_status() async def es_index(doc: Dict[str, Any]): async with httpx.AsyncClient(timeout=30) as h: r = await h.put(f"{ES_URL}/{ES_INDEX}/_doc/{doc['concept_id']}", json=doc) r.raise_for_status() async def es_search(q: str, size: int = 10) -> List[Dict[str, Any]]: query = { "size": size, "query": { "multi_match": { "query": q, "fields": ["canonical_name^3", "aliases^2", "summary", "tags"], } }, } async with httpx.AsyncClient(timeout=30) as h: r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query) r.raise_for_status() hits = r.json().get("hits", {}).get("hits", []) return [h["_source"] for h in hits] async def es_search_hits(q: str, size: int = 10, release_name: Optional[str] = None) -> List[Dict[str, Any]]: must_clause: Dict[str, Any] = { "multi_match": { "query": q, "fields": [ "display_name^3", "canonical_name^3", "description^2", "text^2", "summary^2", "aliases^2", "tags", "source_pk^2", "source_table", ], } } query: Dict[str, Any] = {"size": size} if release_name: release_filter = { "bool": { "should": [ {"term": {"release_name.keyword": release_name}}, {"term": {"release_name": release_name}}, {"match_phrase": {"release_name": release_name}}, ], "minimum_should_match": 1, } } query["query"] = { "bool": { "must": [must_clause], "filter": [release_filter], } } else: query["query"] = must_clause async with httpx.AsyncClient(timeout=30) as h: r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query) r.raise_for_status() return r.json().get("hits", {}).get("hits", []) async def es_recent_by_release(release_name: str, size: int = 10) -> List[Dict[str, Any]]: query: Dict[str, Any] = { "size": size, "query": { "bool": { "filter": [ { "bool": { "should": [ {"term": {"release_name.keyword": release_name}}, {"term": {"release_name": release_name}}, {"match_phrase": {"release_name": release_name}}, ], "minimum_should_match": 1, } } ] } }, "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}], } async with httpx.AsyncClient(timeout=30) as h: r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query) r.raise_for_status() return r.json().get("hits", {}).get("hits", []) async def es_recent_messages( size: int = 20, release_name: Optional[str] = None, q: Optional[str] = None, ) -> List[Dict[str, Any]]: filters: List[Dict[str, Any]] = [ { "bool": { "should": [ {"term": {"concept_type.keyword": "message"}}, {"term": {"concept_type": "message"}}, {"term": {"kind.keyword": "message"}}, {"term": {"kind": "message"}}, ], "minimum_should_match": 1, } } ] if release_name: filters.append( { "bool": { "should": [ {"term": {"release_name.keyword": release_name}}, {"term": {"release_name": release_name}}, {"match_phrase": {"release_name": release_name}}, ], "minimum_should_match": 1, } } ) must: List[Dict[str, Any]] = [] if q and q.strip(): must.append( { "multi_match": { "query": q.strip(), "fields": [ "text^3", "description^2", "summary^2", "display_name^2", "canonical_name^2", "source_pk^2", ], } } ) query: Dict[str, Any] = { "size": size, "query": { "bool": { "filter": filters, "must": must, } }, "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}], } async with httpx.AsyncClient(timeout=30) as h: r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query) r.raise_for_status() return r.json().get("hits", {}).get("hits", []) TASK_TRIGGER_RE = re.compile( r"\b(" r"please|can you|could you|need to|needs to|todo|to do|follow up|follow-up|" r"review|fix|send|reply|schedule|book|call|remind|prepare|draft|submit|pay|" r"deadline|due|let me know|check in|confirm|update me|get back to me" r")\b", flags=re.IGNORECASE, ) TASK_DONE_RE = re.compile(r"\b(done|completed|closed|resolved|fixed)\b", flags=re.IGNORECASE) DUE_HINT_RE = re.compile( r"\b(" r"today|tomorrow|tonight|next week|next month|monday|tuesday|wednesday|thursday|friday|saturday|sunday|" r"\d{4}-\d{2}-\d{2}|" r"\d{1,2}:\d{2}" r")\b", flags=re.IGNORECASE, ) TASK_AI_CACHE_TTL_SEC = int(os.getenv("TASK_AI_CACHE_TTL_SEC", "3600")) TASK_AI_CACHE_MAX_SIZE = int(os.getenv("TASK_AI_CACHE_MAX_SIZE", "5000")) TASK_AI_CACHE: Dict[str, Dict[str, Any]] = {} ASSISTANT_CHAT_MAX_TURNS = int(os.getenv("ASSISTANT_CHAT_MAX_TURNS", "20")) ASSISTANT_CHAT_SESSIONS: Dict[str, List[Dict[str, str]]] = {} def _split_sentences(text: str) -> List[str]: raw_parts = re.split(r"[\n\r]+|(?<=[.!?])\s+", text or "") return [p.strip() for p in raw_parts if p and p.strip()] def _extract_due_hint(text: str) -> Optional[str]: m = DUE_HINT_RE.search(text or "") if not m: return None return m.group(1) def _extract_who(text: str, default_sender: Optional[str]) -> Optional[str]: m = re.search(r"\b(?:to|for)\s+([A-Z][a-zA-Z0-9_-]{1,40})\b", text or "") if m: return m.group(1) return default_sender or None def extract_pending_tasks_from_source(src: Dict[str, Any]) -> List[Dict[str, Any]]: text = str(src.get("text") or src.get("description") or src.get("summary") or "").strip() if not text: return [] attrs_raw = src.get("attributes_json") attrs: Dict[str, Any] = {} if isinstance(attrs_raw, str) and attrs_raw.strip(): try: parsed = json.loads(attrs_raw) if isinstance(parsed, dict): attrs = parsed except Exception: attrs = {} elif isinstance(attrs_raw, dict): attrs = attrs_raw thread_id = attrs.get("thread_id") message_id = attrs.get("message_id") or src.get("source_pk") sender = attrs.get("sender") sent_at = attrs.get("sent_at") base_source = f"{src.get('concept_id') or ''}|{message_id or ''}" tasks: List[Dict[str, Any]] = [] for sentence in _split_sentences(text): if len(sentence) < 8: continue looks_like_question = "?" in sentence if not TASK_TRIGGER_RE.search(sentence) and not looks_like_question: continue status = "done" if TASK_DONE_RE.search(sentence) else "pending" due_hint = _extract_due_hint(sentence) who = _extract_who(sentence, sender) task_id = "task-" + hashlib.sha256(f"{base_source}|{sentence.lower()}".encode("utf-8")).hexdigest()[:16] tasks.append( { "task_id": task_id, "status": status, "todo": sentence[:400], "due_hint": due_hint, "who": who, "concept_id": src.get("concept_id"), "source_pk": src.get("source_pk"), "source_table": src.get("source_table"), "release_name": src.get("release_name"), "thread_id": thread_id, "message_id": message_id, "sender": sender, "sent_at": sent_at, "updated_at": src.get("updated_at"), } ) return tasks def _task_ai_cache_key(src: Dict[str, Any]) -> str: text = str(src.get("text") or src.get("description") or src.get("summary") or "") text_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()[:24] base = "|".join( [ str(src.get("concept_id") or ""), str(src.get("source_pk") or ""), str(src.get("updated_at") or ""), text_hash, OLLAMA_MODEL, ] ) return hashlib.sha256(base.encode("utf-8")).hexdigest() def _task_ai_cache_get(key: str) -> Optional[List[Dict[str, Any]]]: obj = TASK_AI_CACHE.get(key) if not obj: return None expires_at = float(obj.get("expires_at") or 0) if expires_at <= time.time(): TASK_AI_CACHE.pop(key, None) return None tasks = obj.get("tasks") if not isinstance(tasks, list): return None return tasks def _task_ai_cache_set(key: str, tasks: List[Dict[str, Any]]) -> None: # Keep cache bounded in a simple way by evicting oldest-expiry items first. if len(TASK_AI_CACHE) >= TASK_AI_CACHE_MAX_SIZE: keys_sorted = sorted(TASK_AI_CACHE.items(), key=lambda kv: float(kv[1].get("expires_at") or 0)) trim_count = max(1, TASK_AI_CACHE_MAX_SIZE // 10) for k, _ in keys_sorted[:trim_count]: TASK_AI_CACHE.pop(k, None) TASK_AI_CACHE[key] = { "expires_at": time.time() + TASK_AI_CACHE_TTL_SEC, "tasks": tasks, } def build_task_extraction_prompt(src: Dict[str, Any]) -> str: text = str(src.get("text") or src.get("description") or src.get("summary") or "")[:4000] attrs_raw = src.get("attributes_json") attrs = attrs_raw if isinstance(attrs_raw, str) else json.dumps(attrs_raw or {}, ensure_ascii=False) return ( "Extract actionable tasks from this message. Ignore pure marketing/promotional content.\n" "Return strict JSON only with shape:\n" '{"tasks":[{"todo":"...","status":"pending|done","due_hint":"...|null","who":"...|null"}]}\n' "If no actionable tasks, return {\"tasks\":[]}.\n\n" f"Message concept_id: {src.get('concept_id')}\n" f"Source pk: {src.get('source_pk')}\n" f"Attributes JSON: {attrs}\n" f"Text:\n{text}\n" ) async def extract_pending_tasks_from_source_ai(src: Dict[str, Any]) -> List[Dict[str, Any]]: prompt = build_task_extraction_prompt(src) raw = await ollama_generate(prompt) obj = _extract_json_object_from_text(raw) task_items = obj.get("tasks") if not isinstance(task_items, list): return [] attrs_raw = src.get("attributes_json") attrs: Dict[str, Any] = {} if isinstance(attrs_raw, str) and attrs_raw.strip(): try: parsed = json.loads(attrs_raw) if isinstance(parsed, dict): attrs = parsed except Exception: attrs = {} elif isinstance(attrs_raw, dict): attrs = attrs_raw thread_id = attrs.get("thread_id") message_id = attrs.get("message_id") or src.get("source_pk") sender = attrs.get("sender") sent_at = attrs.get("sent_at") base_source = f"{src.get('concept_id') or ''}|{message_id or ''}" out: List[Dict[str, Any]] = [] for item in task_items: if not isinstance(item, dict): continue todo = str(item.get("todo") or "").strip() if len(todo) < 4: continue status_raw = str(item.get("status") or "pending").strip().lower() status = "done" if status_raw == "done" else "pending" due_hint = item.get("due_hint") who = item.get("who") todo_norm = todo[:400] task_id = "task-" + hashlib.sha256(f"{base_source}|{todo_norm.lower()}".encode("utf-8")).hexdigest()[:16] out.append( { "task_id": task_id, "status": status, "todo": todo_norm, "due_hint": str(due_hint) if due_hint is not None else None, "who": str(who) if who is not None else sender, "concept_id": src.get("concept_id"), "source_pk": src.get("source_pk"), "source_table": src.get("source_table"), "release_name": src.get("release_name"), "thread_id": thread_id, "message_id": message_id, "sender": sender, "sent_at": sent_at, "updated_at": src.get("updated_at"), } ) return out def build_chat_prompt( user_message: str, history: List[Dict[str, str]], source_docs: List[Dict[str, Any]], release_name: Optional[str], ) -> str: history_lines: List[str] = [] for t in history[-ASSISTANT_CHAT_MAX_TURNS:]: role = t.get("role", "") content = (t.get("content", "") or "").strip() if role in ("user", "assistant") and content: history_lines.append(f"{role}: {content[:1200]}") context_chunks = [] for d in source_docs: src = d.get("_source", {}) or {} context_chunks.append( "\n".join( [ f"concept_id: {src.get('concept_id', '')}", f"source_pk: {src.get('source_pk', '')}", f"release_name: {src.get('release_name', '')}", f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:1200]}", ] ) ) context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context." hist = "\n".join(history_lines) if history_lines else "(none)" return ( "You are a practical personal assistant. Be concise, factual, and useful.\n" "Use retrieved context when available. If uncertain, say so briefly and ask one clarifying question.\n" "Do not claim external actions were already performed.\n\n" f"Release filter: {release_name or '(none)'}\n" f"Conversation history:\n{hist}\n\n" f"Retrieved context:\n{context}\n\n" f"User: {user_message}\n" "Assistant:" ) def _append_chat_turn(session_id: str, role: str, content: str) -> None: turns = ASSISTANT_CHAT_SESSIONS.get(session_id, []) turns.append({"role": role, "content": content}) max_items = ASSISTANT_CHAT_MAX_TURNS * 2 if len(turns) > max_items: turns = turns[-max_items:] ASSISTANT_CHAT_SESSIONS[session_id] = turns def build_assistant_prompt(payload: AssistantDraftPayload, source_docs: List[Dict[str, Any]]) -> str: recipient = payload.recipient or "unspecified recipient" tone = payload.tone or "professional" constraints = payload.constraints or [] constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None" context_chunks = [] for d in source_docs: src = d.get("_source", {}) or {} context_chunks.append( "\n".join( [ f"concept_id: {src.get('concept_id', '')}", f"concept_type: {src.get('concept_type', '')}", f"source_pk: {src.get('source_pk', '')}", f"source_table: {src.get('source_table', '')}", f"release_name: {src.get('release_name', '')}", f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:1000]}", ] ) ) context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context." return ( "You are a careful personal assistant. Draft a response based only on provided context.\n" "If context is missing, state uncertainty and ask concise follow-up questions.\n" "Do not claim actions were sent; provide draft-only text.\n\n" f"Task type: {payload.task_type}\n" f"Goal: {payload.goal}\n" f"Recipient: {recipient}\n" f"Tone: {tone}\n" f"Constraints:\n{constraint_lines}\n\n" "Retrieved context:\n" f"{context}\n\n" "Output only the draft text." ) async def ollama_generate(prompt: str) -> str: async with httpx.AsyncClient(timeout=90) as h: r = await h.post( f"{OLLAMA_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, ) r.raise_for_status() return (r.json().get("response") or "").strip() def fallback_draft_text(payload: AssistantDraftPayload) -> str: recipient = payload.recipient or "there" tone = (payload.tone or "").lower() if payload.task_type == "message": if "friendly" in tone: return ( f"Hi {recipient},\n\n" "Thanks for your message. I received it and will follow up tomorrow.\n\n" "Best," ) return ( f"Hello {recipient},\n\n" "I confirm receipt of your message and will follow up tomorrow.\n\n" "Regards," ) return ( "Draft:\n" f"{payload.goal}\n\n" "I can refine this further once retrieval/model services are available." ) def _extract_json_object_from_text(text: str) -> Dict[str, Any]: start = text.find("{") end = text.rfind("}") if start == -1 or end == -1 or end < start: raise ValueError("No JSON object found in output") candidate = text[start : end + 1] obj = json.loads(candidate) if not isinstance(obj, dict): raise ValueError("Parsed value is not a JSON object") return obj def build_assistant_plan_prompt(payload: AssistantPlanPayload, source_docs: List[Dict[str, Any]]) -> str: constraints = payload.constraints or [] constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None" context_chunks = [] for d in source_docs: src = d.get("_source", {}) or {} context_chunks.append( "\n".join( [ f"concept_id: {src.get('concept_id', '')}", f"source_pk: {src.get('source_pk', '')}", f"source_table: {src.get('source_table', '')}", f"release_name: {src.get('release_name', '')}", f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:600]}", ] ) ) context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context." return ( "You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n" "Return valid JSON ONLY with this exact shape:\n" '{' '"plan": [' '{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}' "]" "}\n" f"Use at most {payload.max_steps} steps.\n" "Prefer safe read-only and draft actions first.\n\n" f"Task type: {payload.task_type}\n" f"Objective: {payload.objective}\n" f"Constraints:\n{constraint_lines}\n\n" "Retrieved context:\n" f"{context}\n" ) def fallback_plan(payload: AssistantPlanPayload) -> List[AssistantPlanStep]: return [ AssistantPlanStep( step_id="S1", title="Gather relevant facts and constraints", action_type="research", requires_approval=False, notes="Review messages/concepts and identify required context.", ), AssistantPlanStep( step_id="S2", title="Draft a response or action proposal", action_type="draft", requires_approval=False, notes="Produce a concise draft aligned with objective and constraints.", ), AssistantPlanStep( step_id="S3", title="Request user confirmation before any external action", action_type="ask_user", requires_approval=True, notes="Do not send or execute changes until approved.", ), ][: payload.max_steps] def find_plan_step(plan: List[AssistantPlanStep], step_id: str) -> Optional[AssistantPlanStep]: for s in plan: if s.step_id == step_id: return s return None def is_high_risk_step(step: AssistantPlanStep) -> bool: text = f"{step.title} {step.notes or ''}".lower() high_risk_terms = [ "send", "submit", "pay", "payment", "transfer", "wire", "sign", "file", "delete", "close account", "change account", ] return any(t in text for t in high_risk_terms) def enforce_step_policy(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Optional[str]: # Plan-declared approval gate. if step.requires_approval and not payload.approved: return "Step requires approval but approved=false." # Extra hard gate for risky external actions. if is_high_risk_step(step): if not payload.approved: return "High-risk step requires approved=true." if not (payload.manual_confirm_token and payload.manual_confirm_token.strip()): return "High-risk step requires manual_confirm_token." return None async def execute_plan_step(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Dict[str, Any]: if step.action_type == "draft": prompt = ( "Draft concise text for this approved planning step.\n" f"Task type: {payload.task_type}\n" f"Objective: {payload.objective}\n" f"Step: {step.title}\n" f"Notes: {step.notes or ''}\n" "Output only final draft text." ) try: text = await ollama_generate(prompt) if not text.strip(): text = f"Draft for step '{step.title}'." except Exception: text = f"Draft for step '{step.title}'." return {"draft": text} if step.action_type == "research": return {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."} if step.action_type == "prepare_data": return {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]} if step.action_type == "review": return {"note": "Review step requires human review before external action."} if step.action_type == "ask_user": return {"question": "Please confirm whether to proceed with the next high-impact action."} return {"note": "Step recognized but no executor implemented."} def _gremlin_submit_blocking(q: str, bindings: Dict[str, Any]) -> Any: # Create/close client per call so transport loop is owned by this worker thread. c = gremlin_client.Client( GREMLIN_URL, "g", message_serializer=GraphSONSerializersV3d0(), ) try: return c.submit(q, bindings).all().result() finally: c.close() async def gremlin_submit(q: str, bindings: Dict[str, Any]) -> Any: return await asyncio.to_thread(_gremlin_submit_blocking, q, bindings) async def run_remote_projector(payload: ProjectionTrigger) -> Dict[str, Any]: parts = [ PROJECTOR_REMOTE_SCRIPT, "--release-name", payload.release_name, "--targets", payload.targets, ] if payload.concept_table: parts.extend(["--concept-table", payload.concept_table]) if payload.dry_run: parts.append("--dry-run") command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Projector execution timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") spark_read_done = "[STEP] spark_read_done" in out projection_done = "[STEP] projection_done" in out result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "spark_read_done": spark_read_done, "projection_done": projection_done, "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result def _b64(s: str) -> str: return base64.b64encode(s.encode("utf-8")).decode("ascii") async def run_remote_ingest_message(payload: MessageIngestPayload) -> Dict[str, Any]: sent_at = payload.sent_at or "" parts = [ INGEST_MESSAGE_REMOTE_SCRIPT, payload.table, payload.thread_id, payload.message_id, payload.sender, payload.channel, sent_at, _b64(payload.body), _b64(json.dumps(payload.metadata, ensure_ascii=False)), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Message ingest execution timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result async def run_remote_ingest_messages_batch(payload: MessageIngestBatchPayload) -> Dict[str, Any]: rows = [] for m in payload.messages: rows.append( { "thread_id": m.thread_id, "message_id": m.message_id, "sender": m.sender, "channel": m.channel, "sent_at": m.sent_at, "body": m.body, "metadata": m.metadata, } ) if not rows: return { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": 0, "rows": 0, "stdout_tail": "[INFO] No rows to ingest", "stderr_tail": "", } local_tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") remote_tmp = f"{PROJECTOR_REMOTE_DIR}/.ingest_messages_{uuid.uuid4().hex}.json" try: json.dump(rows, local_tmp, ensure_ascii=False) local_tmp.flush() local_tmp.close() scp_target = f"{PROJECTOR_SSH_HOST}:{remote_tmp}" scp_args = [PROJECTOR_SCP_BIN, *shlex.split(PROJECTOR_SCP_OPTS), local_tmp.name, scp_target] scp_proc = await asyncio.create_subprocess_exec( *scp_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: scp_stdout, scp_stderr = await asyncio.wait_for(scp_proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: scp_proc.kill() await scp_proc.wait() raise HTTPException(status_code=504, detail="Batch payload upload timed out") if scp_proc.returncode != 0: raise HTTPException( status_code=502, detail={ "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": scp_proc.returncode, "stdout_tail": _tail(scp_stdout.decode("utf-8", errors="replace")), "stderr_tail": _tail(scp_stderr.decode("utf-8", errors="replace")), }, ) payload_arg = f"@{remote_tmp}" parts = [ INGEST_MESSAGES_BATCH_REMOTE_SCRIPT, payload.table, payload.dedupe_mode, payload_arg, ] batch_cmd = " ".join(shlex.quote(p) for p in parts) command = ( f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && " f"({batch_cmd}); rc=$?; rm -f {shlex.quote(remote_tmp)}; exit $rc" ) ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Batch message ingest execution timed out") finally: try: os.unlink(local_tmp.name) except Exception: pass out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "rows": len(rows), "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result async def run_remote_assistant_feedback( feedback_id: str, payload: AssistantFeedbackPayload, ) -> Dict[str, Any]: confidence = payload.confidence if payload.confidence is not None else 0.0 parts = [ ASSISTANT_FEEDBACK_REMOTE_SCRIPT, feedback_id, now_iso(), payload.outcome, payload.task_type, payload.release_name or "", f"{confidence}", "true" if payload.needs_review else "false", _b64(payload.goal or ""), _b64(payload.draft), _b64(payload.final_text or ""), _b64(json.dumps([s.model_dump() for s in payload.sources], ensure_ascii=False)), _b64(payload.notes or ""), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Assistant feedback execution timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result def _extract_json_array_from_text(text: str) -> List[Dict[str, Any]]: start = text.find("[") end = text.rfind("]") if start == -1 or end == -1 or end < start: raise ValueError("No JSON array found in output") candidate = text[start : end + 1] obj = json.loads(candidate) if not isinstance(obj, list): raise ValueError("Parsed value is not a JSON array") out: List[Dict[str, Any]] = [] for item in obj: if isinstance(item, dict): out.append(item) return out async def run_remote_query_assistant_feedback( outcome: Optional[str], task_type: Optional[str], release_name: Optional[str], limit: int, ) -> Dict[str, Any]: parts = [ ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT, outcome or "", task_type or "", release_name or "", str(limit), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Assistant feedback query timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) try: rows = _extract_json_array_from_text(out) except Exception as e: raise HTTPException( status_code=502, detail={ "message": f"Unable to parse feedback query output: {e}", "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) return { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "rows": rows, } async def run_remote_query_assistant_metrics( task_type: Optional[str], release_name: Optional[str], outcome: Optional[str], group_by: str, limit: int, ) -> Dict[str, Any]: parts = [ ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT, task_type or "", release_name or "", outcome or "", group_by, str(limit), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Assistant metrics query timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) try: rows = _extract_json_array_from_text(out) except Exception as e: raise HTTPException( status_code=502, detail={ "message": f"Unable to parse metrics query output: {e}", "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) return { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "rows": rows, } async def run_remote_assistant_action( action_id: str, payload: AssistantExecuteStepPayload, step: AssistantPlanStep, status: str, output_json: Dict[str, Any], error_text: Optional[str], ) -> Dict[str, Any]: parts = [ ASSISTANT_ACTION_REMOTE_SCRIPT, action_id, now_iso(), payload.task_type, payload.release_name or "", _b64(payload.objective), step.step_id, _b64(step.title), step.action_type, "true" if step.requires_approval else "false", "true" if payload.approved else "false", status, _b64(json.dumps(output_json, ensure_ascii=False)), _b64(error_text or ""), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Assistant action logging timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") result = { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), } if proc.returncode != 0: raise HTTPException(status_code=502, detail=result) return result async def run_remote_query_assistant_actions( status: Optional[str], task_type: Optional[str], release_name: Optional[str], step_id: Optional[str], action_type: Optional[str], limit: int, ) -> Dict[str, Any]: parts = [ ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT, status or "", task_type or "", release_name or "", step_id or "", action_type or "", str(limit), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(status_code=504, detail="Assistant actions query timed out") out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "exit_code": proc.returncode, "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) try: rows = _extract_json_array_from_text(out) except Exception as e: raise HTTPException( status_code=502, detail={ "message": f"Unable to parse actions query output: {e}", "stdout_tail": _tail(out), "stderr_tail": _tail(err), }, ) return { "host": PROJECTOR_SSH_HOST, "remote_dir": PROJECTOR_REMOTE_DIR, "rows": rows, } async def run_remote_record_run( run_id: str, run_type: str, status: str, started_at_utc: str, finished_at_utc: str, actor: str, input_json: Dict[str, Any], output_json: Optional[Dict[str, Any]], error_text: Optional[str], ) -> None: parts = [ RUNS_REMOTE_SCRIPT, run_id, run_type, status, started_at_utc, finished_at_utc, actor, _b64(json.dumps(input_json, ensure_ascii=False)), _b64(json.dumps(output_json, ensure_ascii=False) if output_json is not None else ""), _b64(error_text or ""), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "message": "Failed to record run in Iceberg", "host": PROJECTOR_SSH_HOST, "exit_code": proc.returncode, "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), }, ) async def run_remote_record_event( run_id: str, event_type: str, detail_json: Dict[str, Any], ) -> None: parts = [ RUN_EVENTS_REMOTE_SCRIPT, run_id, event_type, now_iso(), _b64(json.dumps(detail_json, ensure_ascii=False)), ] command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] proc = await asyncio.create_subprocess_exec( *ssh_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) if proc.returncode != 0: raise HTTPException( status_code=502, detail={ "message": "Failed to record run event in Iceberg", "host": PROJECTOR_SSH_HOST, "exit_code": proc.returncode, "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), }, ) async def record_event_best_effort(run_id: str, event_type: str, detail_json: Dict[str, Any]) -> None: try: await run_remote_record_event(run_id, event_type, detail_json) except Exception as e: # Event tracing must never break the primary projection flow. print(f"[WARN] run event logging failed: run_id={run_id} event={event_type} error={e}") # --------- routes --------- @app.on_event("startup") async def startup(): await es_ensure_index() @app.get("/meta") async def meta(): return { "app_name": APP_NAME, "version": APP_VERSION, "started_at_utc": APP_STARTED_AT_UTC, "features": { "assistant_chat": True, "assistant_learn": True, "assistant_tasks_ai": True, "assistant_ui": True, }, } @app.get("/ui", include_in_schema=False) async def assistant_ui(): index_html = UI_DIR / "index.html" if not index_html.exists(): raise HTTPException(status_code=404, detail="UI not found") html = index_html.read_text(encoding="utf-8") css_path = UI_ASSETS_DIR / "styles.css" js_path = UI_ASSETS_DIR / "app.js" css = css_path.read_text(encoding="utf-8") if css_path.exists() else "" js = js_path.read_text(encoding="utf-8") if js_path.exists() else "" html = html.replace( '', f"", ) html = html.replace( '', f"", ) return HTMLResponse(content=html) @app.post("/concepts", response_model=ConceptOut) async def create_concept(payload: ConceptCreate): created = now_iso() updated = created fingerprint = make_fingerprint(payload.canonical_name, payload.kind, payload.external_ids) # Store long content in IPFS (version 1) content_doc = { "canonical_name": payload.canonical_name, "kind": payload.kind, "aliases": payload.aliases, "external_ids": payload.external_ids, "tags": payload.tags, "description": payload.description, "created_at": created, } latest_cid = await ipfs_add_json(content_doc) # LLM extras (optional) summary = None embedding: List[float] = [] base_text = payload.description or payload.canonical_name if base_text.strip(): try: summary = await ollama_summary(base_text) except Exception: summary = None try: embedding = await ollama_embed(base_text) except Exception: embedding = [] # Create vertex in JanusGraph (Concept) # Uses a stable concept_id as your canonical handle concept_id = (await gremlin_submit( """ import java.util.UUID def id = UUID.randomUUID().toString() g.addV('Concept') .property('concept_id', id) .property('canonical_name', canonical_name) .property('kind', kind) .property('aliases', aliases_json) .property('external_ids', external_ids_json) .property('tags', tags_json) .property('fingerprint', fingerprint) .property('latest_cid', latest_cid) .property('summary', summary) .property('created_at', created_at) .property('updated_at', updated_at) .values('concept_id') """, { "canonical_name": payload.canonical_name, "kind": payload.kind or "", "aliases_json": json.dumps(payload.aliases, ensure_ascii=False), "external_ids_json": json.dumps(payload.external_ids, ensure_ascii=False), "tags_json": json.dumps(payload.tags, ensure_ascii=False), "fingerprint": fingerprint, "latest_cid": latest_cid, "summary": summary or "", "created_at": created, "updated_at": updated, }, ))[0] out = { "concept_id": concept_id, "canonical_name": payload.canonical_name, "kind": payload.kind, "aliases": payload.aliases, "external_ids": payload.external_ids, "tags": payload.tags, "latest_cid": latest_cid, "summary": summary, "created_at": created, "updated_at": updated, } # Index in Elasticsearch doc = dict(out) doc["fingerprint"] = fingerprint if embedding: doc["embedding"] = embedding await es_index(doc) return out @app.get("/concepts/{concept_id}", response_model=ConceptOut) async def get_concept(concept_id: str): rows = await gremlin_submit( """ g.V().hasLabel('Concept').has('concept_id', concept_id) .project('concept_id','canonical_name','kind','aliases','external_ids','tags','latest_cid','summary','created_at','updated_at') .by(values('concept_id')) .by(values('canonical_name')) .by(values('kind')) .by(values('aliases')) .by(values('external_ids')) .by(values('tags')) .by(values('latest_cid')) .by(values('summary')) .by(values('created_at')) .by(values('updated_at')) """, {"concept_id": concept_id}, ) if not rows: raise HTTPException(status_code=404, detail="Concept not found") r = rows[0] external_ids = {} try: external_ids = json.loads(r.get("external_ids") or "{}") except Exception: external_ids = {} aliases = [] try: aliases = json.loads(r.get("aliases") or "[]") if not isinstance(aliases, list): aliases = [] except Exception: aliases = [] tags = [] try: tags = json.loads(r.get("tags") or "[]") if not isinstance(tags, list): tags = [] except Exception: tags = [] return { "concept_id": r.get("concept_id"), "canonical_name": r.get("canonical_name"), "kind": (r.get("kind") or None), "aliases": aliases, "external_ids": external_ids, "tags": tags, "latest_cid": (r.get("latest_cid") or None), "summary": (r.get("summary") or None), "created_at": r.get("created_at"), "updated_at": r.get("updated_at"), } @app.get("/search") async def search(q: str, size: int = 10): results = await es_search(q, size=size) return {"q": q, "results": results} @app.get("/assistant/inbox") async def assistant_inbox( release_name: Optional[str] = None, q: Optional[str] = None, limit: int = 20, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) bounded_limit = max(1, min(limit, 200)) hits = await es_recent_messages( size=bounded_limit, release_name=release_name, q=q, ) rows: List[Dict[str, Any]] = [] for h in hits: src = h.get("_source", {}) or {} rows.append( { "concept_id": src.get("concept_id"), "source_pk": src.get("source_pk"), "source_table": src.get("source_table"), "release_name": src.get("release_name"), "concept_type": src.get("concept_type") or src.get("kind"), "display_name": src.get("display_name") or src.get("canonical_name"), "text": src.get("text"), "summary": src.get("summary"), "description": src.get("description"), "updated_at": src.get("updated_at"), "score": float(h.get("_score")) if h.get("_score") is not None else None, } ) return { "count": len(rows), "filters": { "release_name": release_name, "q": q, "limit": bounded_limit, }, "rows": rows, } @app.get("/assistant/tasks") async def assistant_tasks( release_name: Optional[str] = None, q: Optional[str] = None, only_pending: bool = True, use_ai: bool = True, limit: int = 50, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) bounded_limit = max(1, min(limit, 500)) # Pull more messages than final task limit because each message can yield 0..N tasks. hits = await es_recent_messages( size=min(1000, max(100, bounded_limit * 4)), release_name=release_name, q=q, ) rows: List[Dict[str, Any]] = [] seen_task_ids: set[str] = set() ai_cache_hits = 0 ai_calls = 0 for h in hits: src = h.get("_source", {}) or {} extracted: List[Dict[str, Any]] = [] if use_ai: cache_key = _task_ai_cache_key(src) cached = _task_ai_cache_get(cache_key) if cached is not None: ai_cache_hits += 1 extracted = cached else: ai_calls += 1 try: extracted = await extract_pending_tasks_from_source_ai(src) except Exception as e: print(f"[WARN] assistant_tasks ai extraction failed: {e}") extracted = [] _task_ai_cache_set(cache_key, extracted) if not extracted: extracted = extract_pending_tasks_from_source(src) for t in extracted: if only_pending and t.get("status") != "pending": continue task_id = str(t.get("task_id") or "") if not task_id or task_id in seen_task_ids: continue seen_task_ids.add(task_id) rows.append(t) if len(rows) >= bounded_limit: break if len(rows) >= bounded_limit: break return { "count": len(rows), "filters": { "release_name": release_name, "q": q, "only_pending": only_pending, "use_ai": use_ai, "limit": bounded_limit, "ai_cache_ttl_sec": TASK_AI_CACHE_TTL_SEC, }, "stats": { "messages_scanned": len(hits), "ai_cache_hits": ai_cache_hits, "ai_calls": ai_calls, "ai_cache_size": len(TASK_AI_CACHE), }, "rows": rows, } @app.post("/assistant/learn") async def assistant_learn(payload: AssistantLearnPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) now = now_iso() note_id = "note-" + uuid.uuid4().hex[:16] title = (payload.title or "").strip() if not title: title = payload.text.strip().splitlines()[0][:80] summary = payload.text.strip()[:280] doc = { "concept_id": f"note:{note_id}", "concept_type": "note", "display_name": title, "description": summary, "text": payload.text, "source_table": "assistant.learn", "source_pk": note_id, "release_name": payload.release_name or "", "ref_hash": "", "attributes_json": json.dumps(payload.metadata or {}, ensure_ascii=False, sort_keys=True), "canonical_name": title, "kind": "note", "aliases": [], "external_ids": {}, "tags": payload.tags or [], "latest_cid": None, "summary": summary, "created_at": now, "updated_at": now, "fingerprint": make_fingerprint(title, "note", {}), } await es_index(doc) return { "stored": True, "concept_id": doc["concept_id"], "release_name": payload.release_name, "title": title, "tags": payload.tags, } @app.post("/assistant/chat", response_model=AssistantChatResponse) async def assistant_chat(payload: AssistantChatPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) session_id = (payload.session_id or str(uuid.uuid4())).strip() history: List[Dict[str, str]] = [] if payload.history: history = [{"role": h.role, "content": h.content} for h in payload.history if h.content.strip()] else: history = ASSISTANT_CHAT_SESSIONS.get(session_id, []) hits: List[Dict[str, Any]] = [] try: hits = await es_search_hits( q=payload.message, size=payload.max_sources, release_name=payload.release_name, ) except Exception as e: print(f"[WARN] assistant_chat retrieval failed: {e}") hits = [] if not hits and payload.release_name: try: hits = await es_recent_by_release(payload.release_name, size=payload.max_sources) except Exception as e: print(f"[WARN] assistant_chat release fallback failed: {e}") if not hits: try: hits = await es_recent_messages(size=payload.max_sources, release_name=payload.release_name, q=None) except Exception as e: print(f"[WARN] assistant_chat inbox fallback failed: {e}") prompt = build_chat_prompt( user_message=payload.message, history=history, source_docs=hits, release_name=payload.release_name, ) try: answer = await ollama_generate(prompt) if not answer.strip(): answer = "I don't have enough context to answer confidently. Can you share one more detail?" except Exception as e: print(f"[WARN] assistant_chat generation failed: {e}") answer = "I could not generate a response right now. Please retry." sources: List[AssistantDraftSource] = [] for h in hits: src = h.get("_source", {}) or {} sources.append( AssistantDraftSource( concept_id=str(src.get("concept_id") or ""), source_pk=src.get("source_pk"), source_table=src.get("source_table"), release_name=src.get("release_name"), score=float(h.get("_score")) if h.get("_score") is not None else None, ) ) source_count = len([s for s in sources if s.concept_id]) confidence = 0.35 if source_count >= 5: confidence = 0.85 elif source_count >= 3: confidence = 0.75 elif source_count >= 1: confidence = 0.6 if len(answer.strip()) < 30: confidence = min(confidence, 0.45) _append_chat_turn(session_id, "user", payload.message) _append_chat_turn(session_id, "assistant", answer) return AssistantChatResponse( session_id=session_id, answer=answer, sources=[s for s in sources if s.concept_id], confidence=confidence, release_name=payload.release_name, ) @app.post("/assistant/draft", response_model=AssistantDraftResponse) async def assistant_draft(payload: AssistantDraftPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) retrieval_query = " ".join( [ payload.goal, payload.recipient or "", payload.task_type, " ".join(payload.constraints), ] ).strip() try: hits = await es_search_hits( q=retrieval_query, size=payload.max_sources, release_name=payload.release_name, ) except Exception as e: print(f"[WARN] assistant_draft retrieval failed: {e}") hits = [] if not hits and payload.release_name: try: hits = await es_recent_by_release(payload.release_name, size=payload.max_sources) except Exception as e: print(f"[WARN] assistant_draft release fallback retrieval failed: {e}") if not hits and payload.release_name: try: hits = await es_recent_messages( size=payload.max_sources, release_name=payload.release_name, q=None, ) except Exception as e: print(f"[WARN] assistant_draft inbox fallback retrieval failed: {e}") if not hits: try: hits = await es_recent_messages( size=payload.max_sources, release_name=None, q=None, ) except Exception as e: print(f"[WARN] assistant_draft global fallback retrieval failed: {e}") prompt = build_assistant_prompt(payload, hits) used_fallback = False try: draft = await ollama_generate(prompt) if not draft.strip(): used_fallback = True draft = fallback_draft_text(payload) except Exception as e: print(f"[WARN] assistant_draft generation failed: {e}") used_fallback = True draft = fallback_draft_text(payload) sources: List[AssistantDraftSource] = [] for h in hits: src = h.get("_source", {}) or {} sources.append( AssistantDraftSource( concept_id=str(src.get("concept_id") or ""), source_pk=src.get("source_pk"), source_table=src.get("source_table"), release_name=src.get("release_name"), score=float(h.get("_score")) if h.get("_score") is not None else None, ) ) source_count = len([s for s in sources if s.concept_id]) if source_count >= 5: confidence = 0.85 elif source_count >= 3: confidence = 0.75 elif source_count >= 1: confidence = 0.6 else: confidence = 0.35 if len(draft.strip()) < 40: confidence = min(confidence, 0.45) if used_fallback: confidence = min(confidence, 0.4) source_ids = [s.concept_id for s in sources if s.concept_id] citation_required = confidence < 0.75 or used_fallback if citation_required and source_ids: already_cited = any(cid in draft for cid in source_ids) if not already_cited: cited = ", ".join(source_ids[:3]) draft = f"{draft.rstrip()}\n\nSources: {cited}" return AssistantDraftResponse( task_type=payload.task_type, draft=draft, sources=[s for s in sources if s.concept_id], confidence=confidence, needs_review=True, release_name=payload.release_name, ) @app.post("/assistant/plan", response_model=AssistantPlanResponse) async def assistant_plan(payload: AssistantPlanPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) retrieval_query = " ".join([payload.objective, payload.task_type, " ".join(payload.constraints)]).strip() try: hits = await es_search_hits( q=retrieval_query, size=payload.max_sources, release_name=payload.release_name, ) except Exception as e: print(f"[WARN] assistant_plan retrieval failed: {e}") hits = [] if not hits and payload.release_name: try: hits = await es_recent_by_release(payload.release_name, size=payload.max_sources) except Exception as e: print(f"[WARN] assistant_plan release fallback retrieval failed: {e}") prompt = build_assistant_plan_prompt(payload, hits) used_fallback = False plan_steps: List[AssistantPlanStep] = [] try: raw = await ollama_generate(prompt) parsed = _extract_json_object_from_text(raw) raw_steps = parsed.get("plan") if isinstance(raw_steps, list): allowed_action_types = {"research", "draft", "ask_user", "prepare_data", "review"} for i, s in enumerate(raw_steps[: payload.max_steps]): if not isinstance(s, dict): continue step_id = str(s.get("step_id") or f"S{i+1}").strip() or f"S{i+1}" title = str(s.get("title") or "").strip() if not title: continue at = str(s.get("action_type") or "research").strip() action_type = at if at in allowed_action_types else "research" requires_approval = bool(s.get("requires_approval", False)) notes = str(s.get("notes")).strip() if s.get("notes") is not None else None plan_steps.append( AssistantPlanStep( step_id=step_id, title=title, action_type=action_type, # type: ignore[arg-type] requires_approval=requires_approval, notes=notes, ) ) except Exception as e: print(f"[WARN] assistant_plan generation failed: {e}") used_fallback = True if not plan_steps: used_fallback = True plan_steps = fallback_plan(payload) sources: List[AssistantDraftSource] = [] for h in hits: src = h.get("_source", {}) or {} sources.append( AssistantDraftSource( concept_id=str(src.get("concept_id") or ""), source_pk=src.get("source_pk"), source_table=src.get("source_table"), release_name=src.get("release_name"), score=float(h.get("_score")) if h.get("_score") is not None else None, ) ) source_count = len([s for s in sources if s.concept_id]) if source_count >= 5: confidence = 0.85 elif source_count >= 3: confidence = 0.75 elif source_count >= 1: confidence = 0.6 else: confidence = 0.35 if used_fallback: confidence = min(confidence, 0.45) return AssistantPlanResponse( objective=payload.objective, task_type=payload.task_type, plan=plan_steps, sources=[s for s in sources if s.concept_id], needs_review=True, confidence=confidence, release_name=payload.release_name, ) @app.post("/assistant/execute-step", response_model=AssistantExecuteStepResponse) async def assistant_execute_step(payload: AssistantExecuteStepPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) action_id = str(uuid.uuid4()) started_at_utc = now_iso() run_input = payload.model_dump() await record_event_best_effort(action_id, "started", {"step_id": payload.step_id, "approved": payload.approved}) step = find_plan_step(payload.plan, payload.step_id) if step is None: error_text = f"Step '{payload.step_id}' not found in plan." await run_remote_assistant_action( action_id=action_id, payload=payload, step=AssistantPlanStep( step_id=payload.step_id, title="missing-step", action_type="review", requires_approval=True, notes=error_text, ), status="blocked", output_json={"reason": error_text}, error_text=error_text, ) await run_remote_record_run( run_id=action_id, run_type="assistant_execute_step", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json={"reason": error_text}, error_text=error_text, ) raise HTTPException(status_code=400, detail=error_text) policy_error = enforce_step_policy(payload, step) if policy_error: out = {"policy_blocked": True, "reason": policy_error} await record_event_best_effort(action_id, "policy_blocked", {"step_id": step.step_id, "reason": policy_error}) await run_remote_assistant_action( action_id=action_id, payload=payload, step=step, status="blocked", output_json=out, error_text=policy_error, ) await run_remote_record_run( run_id=action_id, run_type="assistant_execute_step", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=out, error_text=policy_error, ) return AssistantExecuteStepResponse( action_id=action_id, step_id=step.step_id, status="blocked", output=out, needs_review=True, ) try: output = await execute_plan_step(payload, step) await record_event_best_effort(action_id, "executed", {"step_id": step.step_id, "action_type": step.action_type}) await run_remote_assistant_action( action_id=action_id, payload=payload, step=step, status="executed", output_json=output, error_text=None, ) await run_remote_record_run( run_id=action_id, run_type="assistant_execute_step", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=output, error_text=None, ) await record_event_best_effort(action_id, "recorded", {"status": "succeeded"}) return AssistantExecuteStepResponse( action_id=action_id, step_id=step.step_id, status="executed", output=output, needs_review=True, ) except Exception as e: err_text = str(e) await run_remote_assistant_action( action_id=action_id, payload=payload, step=step, status="blocked", output_json={"error": err_text}, error_text=err_text, ) await run_remote_record_run( run_id=action_id, run_type="assistant_execute_step", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(action_id, "recorded", {"status": "failed"}) raise HTTPException(status_code=500, detail=err_text) @app.post("/assistant/feedback") async def assistant_feedback(payload: AssistantFeedbackPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) run_id = str(uuid.uuid4()) run_input = payload.model_dump() started_at_utc = now_iso() await record_event_best_effort( run_id, "started", {"input": {"outcome": payload.outcome, "task_type": payload.task_type, "release_name": payload.release_name}}, ) try: result = await run_remote_assistant_feedback(run_id, payload) await record_event_best_effort( run_id, "feedback_recorded", {"outcome": payload.outcome, "task_type": payload.task_type}, ) await run_remote_record_run( run_id=run_id, run_type="assistant_feedback", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="assistant_feedback", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="assistant_feedback", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise @app.get("/assistant/feedback") async def assistant_feedback_list( outcome: Optional[Literal["accepted", "edited", "rejected"]] = None, task_type: Optional[Literal["message", "finance", "gov", "general"]] = None, release_name: Optional[str] = None, limit: int = 50, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) bounded_limit = max(1, min(limit, 500)) result = await run_remote_query_assistant_feedback( outcome=outcome, task_type=task_type, release_name=release_name, limit=bounded_limit, ) rows = result["rows"] return { "count": len(rows), "filters": { "outcome": outcome, "task_type": task_type, "release_name": release_name, "limit": bounded_limit, }, "rows": rows, } @app.get("/assistant/metrics") async def assistant_metrics( task_type: Optional[Literal["message", "finance", "gov", "general"]] = None, release_name: Optional[str] = None, outcome: Optional[Literal["accepted", "edited", "rejected"]] = None, group_by: Literal["task_type", "release_name", "both"] = "both", limit: int = 100, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) bounded_limit = max(1, min(limit, 1000)) result = await run_remote_query_assistant_metrics( task_type=task_type, release_name=release_name, outcome=outcome, group_by=group_by, limit=bounded_limit, ) rows = result["rows"] return { "count": len(rows), "filters": { "task_type": task_type, "release_name": release_name, "outcome": outcome, "group_by": group_by, "limit": bounded_limit, }, "rows": rows, } @app.get("/assistant/actions") async def assistant_actions( status: Optional[Literal["blocked", "executed"]] = None, task_type: Optional[Literal["message", "finance", "gov", "general"]] = None, release_name: Optional[str] = None, step_id: Optional[str] = None, action_type: Optional[Literal["research", "draft", "ask_user", "prepare_data", "review"]] = None, limit: int = 50, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) bounded_limit = max(1, min(limit, 500)) result = await run_remote_query_assistant_actions( status=status, task_type=task_type, release_name=release_name, step_id=step_id, action_type=action_type, limit=bounded_limit, ) rows = result["rows"] return { "count": len(rows), "filters": { "status": status, "task_type": task_type, "release_name": release_name, "step_id": step_id, "action_type": action_type, "limit": bounded_limit, }, "rows": rows, } @app.post("/admin/project-release") async def project_release(payload: ProjectionTrigger, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) run_id = str(uuid.uuid4()) run_input = payload.model_dump() started_at_utc = now_iso() await record_event_best_effort(run_id, "started", {"input": run_input}) try: result = await run_remote_projector(payload) if result.get("spark_read_done"): await record_event_best_effort(run_id, "spark_read_done", {"release_name": payload.release_name}) if result.get("projection_done"): await record_event_best_effort( run_id, "projection_done", {"targets": payload.targets, "dry_run": payload.dry_run}, ) await run_remote_record_run( run_id=run_id, run_type="projection", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="projection", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="projection", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise @app.post("/admin/ingest-email-imap") async def ingest_email_imap(payload: EmailImapIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) run_id = str(uuid.uuid4()) started_at_utc = now_iso() run_input = payload.model_dump() if "password" in run_input: run_input["password"] = "***" effective_since_uid = payload.since_uid if effective_since_uid is None and payload.incremental: effective_since_uid = await run_remote_query_imap_checkpoint( host=payload.host, mailbox=payload.mailbox, username=payload.username, table=payload.table, ) if effective_since_uid is not None: effective_search_criteria = f"UID {int(effective_since_uid) + 1}:*" else: effective_search_criteria = payload.search_criteria await record_event_best_effort( run_id, "started", { "input": { "host": payload.host, "mailbox": payload.mailbox, "search_criteria": effective_search_criteria, "since_uid": effective_since_uid, "max_messages": payload.max_messages, "table": payload.table, "dedupe_mode": payload.dedupe_mode, } }, ) try: items = await asyncio.to_thread( fetch_imap_messages_blocking, payload, effective_search_criteria, effective_since_uid, ) max_uid_fetched: Optional[int] = None for m in items: uid_raw = m.metadata.get("imap_uid") try: uid_int = int(uid_raw) except Exception: continue if max_uid_fetched is None or uid_int > max_uid_fetched: max_uid_fetched = uid_int batch_payload = MessageIngestBatchPayload( table=payload.table, dedupe_mode=payload.dedupe_mode, messages=items, ) ingest_result = await run_remote_ingest_messages_batch(batch_payload) result = { "incremental": payload.incremental, "since_uid": effective_since_uid, "search_criteria_used": effective_search_criteria, "max_uid_fetched": max_uid_fetched, "fetched_messages": len(items), "ingested_rows_requested": len(items), "ingest_result": ingest_result, } await record_event_best_effort( run_id, "ingest_done", {"fetched_messages": len(items), "table": payload.table, "max_uid_fetched": max_uid_fetched}, ) await run_remote_record_run( run_id=run_id, run_type="ingest_email_imap", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="ingest_email_imap", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="ingest_email_imap", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise @app.post("/admin/poll-and-project") async def poll_and_project(payload: PollAndProjectPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) if payload.imap.table != "lake.db1.messages": raise HTTPException( status_code=400, detail="poll-and-project currently supports only table lake.db1.messages", ) run_id = str(uuid.uuid4()) started_at_utc = now_iso() run_input = payload.model_dump() if run_input.get("imap", {}).get("password"): run_input["imap"]["password"] = "***" await record_event_best_effort( run_id, "started", { "input": { "imap": { "host": payload.imap.host, "mailbox": payload.imap.mailbox, "incremental": payload.imap.incremental, "since_uid": payload.imap.since_uid, "max_messages": payload.imap.max_messages, "table": payload.imap.table, }, "targets": payload.targets, "dry_run": payload.dry_run, "project_if_no_new": payload.project_if_no_new, } }, ) try: ingest = await ingest_email_imap(payload.imap, x_admin_api_key) ingest_result = ingest["result"] fetched_messages = int(ingest_result.get("fetched_messages", 0)) await record_event_best_effort( run_id, "ingest_done", { "ingest_run_id": ingest.get("run_id"), "fetched_messages": fetched_messages, "max_uid_fetched": ingest_result.get("max_uid_fetched"), }, ) if fetched_messages <= 0 and not payload.project_if_no_new: result = { "ingest_run_id": ingest.get("run_id"), "ingest_result": ingest_result, "skipped": True, "reason": "No new messages", } await run_remote_record_run( run_id=run_id, run_type="poll_and_project", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} if payload.release_name: release_name = payload.release_name else: ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S") release_name = f"{payload.release_prefix}_{ts}_messages-auto" create_result = await run_remote_create_messages_release(release_name) await record_event_best_effort( run_id, "release_created", {"release_name": release_name}, ) projection = ProjectionTrigger( release_name=release_name, targets=payload.targets, concept_table=payload.concept_table, dry_run=payload.dry_run, ) projection_result = await run_remote_projector(projection) await record_event_best_effort( run_id, "projection_done", {"release_name": release_name, "targets": payload.targets}, ) result = { "ingest_run_id": ingest.get("run_id"), "ingest_result": ingest_result, "release": create_result, "projection": projection_result, "release_name": release_name, "skipped": False, } await run_remote_record_run( run_id=run_id, run_type="poll_and_project", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="poll_and_project", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="poll_and_project", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise @app.post("/admin/ingest-message") async def ingest_message(payload: MessageIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)): check_admin_api_key(x_admin_api_key) run_id = str(uuid.uuid4()) run_input = payload.model_dump() started_at_utc = now_iso() await record_event_best_effort( run_id, "started", {"input": {"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id}}, ) try: result = await run_remote_ingest_message(payload) await record_event_best_effort( run_id, "ingest_done", {"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id}, ) await run_remote_record_run( run_id=run_id, run_type="ingest_message", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="ingest_message", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="ingest_message", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise @app.post("/admin/ingest-messages-batch") async def ingest_messages_batch( payload: MessageIngestBatchPayload, x_admin_api_key: Optional[str] = Header(default=None), ): check_admin_api_key(x_admin_api_key) if not payload.messages: raise HTTPException(status_code=400, detail="messages must contain at least 1 item") run_id = str(uuid.uuid4()) run_input = payload.model_dump() started_at_utc = now_iso() await record_event_best_effort( run_id, "started", {"input": {"table": payload.table, "rows": len(payload.messages)}}, ) try: result = await run_remote_ingest_messages_batch(payload) await record_event_best_effort( run_id, "ingest_done", {"table": payload.table, "rows": len(payload.messages)}, ) await run_remote_record_run( run_id=run_id, run_type="ingest_messages_batch", status="succeeded", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=result, error_text=None, ) await record_event_best_effort(run_id, "recorded", {"status": "succeeded"}) return {"run_id": run_id, "result": result} except HTTPException as e: err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False) await run_remote_record_run( run_id=run_id, run_type="ingest_messages_batch", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=err_text, ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise except Exception as e: await run_remote_record_run( run_id=run_id, run_type="ingest_messages_batch", status="failed", started_at_utc=started_at_utc, finished_at_utc=now_iso(), actor="admin-api", input_json=run_input, output_json=None, error_text=str(e), ) await record_event_best_effort(run_id, "recorded", {"status": "failed"}) raise