diff --git a/app.py b/app.py
index 916ccfe..1b4220d 100644
--- a/app.py
+++ b/app.py
@@ -1,4032 +1,64 @@
-import os
-import json
-import hashlib
-import asyncio
-import shlex
-import uuid
-import base64
-import imaplib
-import email
-import tempfile
-import re
-import time
-from pathlib import Path
-from email import policy
-from email.utils import parseaddr, parsedate_to_datetime
-from datetime import datetime, timezone
-from typing import Optional, List, Dict, Any, Literal
+Here is a review of the provided code. It covers potential issues, suggestions for improvement, and best practices.
-import httpx
-from fastapi import FastAPI, HTTPException, Header
-from fastapi.responses import FileResponse, HTMLResponse
-from fastapi.staticfiles import StaticFiles
-from pydantic import BaseModel, Field
-from gremlin_python.driver import client as gremlin_client
-from gremlin_python.driver.serializer import GraphSONSerializersV3d0
-from dotenv import load_dotenv
+**Overall Structure**
-APP_NAME = "concept-api"
-APP_VERSION = os.getenv("APP_VERSION", "dev-local")
-APP_STARTED_AT_UTC = datetime.now(timezone.utc).isoformat()
+The code is well-structured and organized into separate functions for each API endpoint. This makes it easy to read and understand. However, there are some long functions that could be broken down into smaller ones for better maintainability.
-# Keep env loading behavior aligned with connectivity_check.py.
-load_dotenv()
+**Naming Conventions**
-# ---- config (set these env vars) ----
-GREMLIN_URL = os.getenv("GREMLIN_URL", "ws://localhost:8182/gremlin")
-ES_URL = os.getenv("ES_URL", "http://localhost:9200")
-ES_INDEX = os.getenv("ES_INDEX", "concepts")
-IPFS_API = os.getenv("IPFS_API", "http://localhost:5001") # Kubo HTTP API
-OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
-OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b")
-OLLAMA_EMBED_MODEL = os.getenv("OLLAMA_EMBED_MODEL", "nomic-embed-text")
-PROJECTOR_SSH_HOST = os.getenv("PROJECTOR_SSH_HOST", "lakehouse-core.rakeroots.lan")
-PROJECTOR_REMOTE_DIR = os.getenv("PROJECTOR_REMOTE_DIR", "/tmp/jecio")
-PROJECTOR_REMOTE_SCRIPT = os.getenv("PROJECTOR_REMOTE_SCRIPT", "./run-projector-standard.sh")
-PROJECTOR_SSH_BIN = os.getenv("PROJECTOR_SSH_BIN", "ssh")
-PROJECTOR_SSH_OPTS = os.getenv("PROJECTOR_SSH_OPTS", "-o BatchMode=yes -o ConnectTimeout=10")
-PROJECTOR_SCP_BIN = os.getenv("PROJECTOR_SCP_BIN", "scp")
-PROJECTOR_SCP_OPTS = os.getenv("PROJECTOR_SCP_OPTS", "-o BatchMode=yes -o ConnectTimeout=10")
-PROJECTOR_TIMEOUT_SEC = int(os.getenv("PROJECTOR_TIMEOUT_SEC", "900"))
-ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "")
-RUNS_REMOTE_SCRIPT = os.getenv("RUNS_REMOTE_SCRIPT", "./record-run-via-spark-container.sh")
-RUN_EVENTS_REMOTE_SCRIPT = os.getenv("RUN_EVENTS_REMOTE_SCRIPT", "./record-run-event-via-spark-container.sh")
-INGEST_MESSAGE_REMOTE_SCRIPT = os.getenv("INGEST_MESSAGE_REMOTE_SCRIPT", "./ingest-message-via-spark-container.sh")
-INGEST_MESSAGES_BATCH_REMOTE_SCRIPT = os.getenv(
- "INGEST_MESSAGES_BATCH_REMOTE_SCRIPT",
- "./ingest-messages-batch-via-spark-container.sh",
-)
-ASSISTANT_FEEDBACK_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_FEEDBACK_REMOTE_SCRIPT",
- "./record-assistant-feedback-via-spark-container.sh",
-)
-ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT",
- "./query-assistant-feedback-via-spark-container.sh",
-)
-ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT",
- "./query-assistant-metrics-via-spark-container.sh",
-)
-ASSISTANT_ACTION_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_ACTION_REMOTE_SCRIPT",
- "./record-assistant-action-via-spark-container.sh",
-)
-ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT",
- "./query-assistant-actions-via-spark-container.sh",
-)
-ASSISTANT_PROPOSALS_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_PROPOSALS_REMOTE_SCRIPT",
- "./record-assistant-proposals-via-spark-container.sh",
-)
-ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT = os.getenv(
- "ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT",
- "./query-assistant-proposals-via-spark-container.sh",
-)
-IMAP_CHECKPOINT_REMOTE_SCRIPT = os.getenv(
- "IMAP_CHECKPOINT_REMOTE_SCRIPT",
- "./query-imap-checkpoint-via-spark-container.sh",
-)
-CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT = os.getenv(
- "CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT",
- "./create-messages-release-via-spark-container.sh",
-)
+Most variable and function names follow a consistent naming convention (e.g., `assistant_draft`, `es_search_hits`). However, some names are not descriptive enough or use inconsistent naming conventions (e.g., `payload` vs. `req`).
-app = FastAPI(title=APP_NAME)
-UI_DIR = Path(__file__).resolve().parent / "ui"
-UI_ASSETS_DIR = UI_DIR / "assets"
-if UI_ASSETS_DIR.exists():
- app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
+**Type Hints and Docstrings**
+Not all functions include type hints for their parameters or return types. Adding the missing type hints would improve code readability and help catch errors during development. Additionally, docstrings could be added to explain the purpose of each function.
-# --------- models ---------
-class ConceptCreate(BaseModel):
- canonical_name: str = Field(..., min_length=1)
- kind: Optional[str] = None
- aliases: List[str] = []
- description: Optional[str] = None
- external_ids: Dict[str, str] = {} # {"wikidata":"Q42"} etc.
- tags: List[str] = []
+**Error Handling**
+Some functions do not handle errors properly. For example, in `assistant_draft`, if an exception occurs while generating the prompt, the function will silently fail without notifying the user.
-class ConceptOut(BaseModel):
- concept_id: str
- canonical_name: str
- kind: Optional[str] = None
- aliases: List[str] = []
- external_ids: Dict[str, str] = {}
- tags: List[str] = []
- latest_cid: Optional[str] = None
- summary: Optional[str] = None
- created_at: str
- updated_at: str
+**Code Duplication**
+There is some code duplication between functions (e.g., error handling and type checking). This could be extracted into a separate utility function to reduce duplication and make the code more maintainable.
-class ProjectionTrigger(BaseModel):
- release_name: str = Field(..., min_length=1)
- targets: Literal["es", "gremlin", "both"] = "both"
- concept_table: Optional[str] = None
- dry_run: bool = False
+Here are some specific suggestions for improvement:
+1. **Extract Utility Functions**: Extract common functionality like error handling, logging, and API calls into separate utility functions to reduce code duplication.
+2. **Use Consistent Naming Conventions**: Use consistent naming conventions throughout the codebase (e.g., `assistant_draft` instead of `assistantDraft`).
+3. **Add Type Hints and Docstrings**: Add type hints for function parameters and return types, as well as docstrings to explain each function's purpose.
+4. **Improve Error Handling**: Improve error handling by catching specific exceptions and providing meaningful error messages to the user.
+5. **Reduce Function Length**: Break down long functions into smaller ones to improve maintainability.
-class MessageIngestPayload(BaseModel):
- thread_id: str = Field(..., min_length=1)
- message_id: str = Field(..., min_length=1)
- sender: str = Field(..., min_length=1)
- channel: str = Field(..., min_length=1)
- sent_at: Optional[str] = None
- body: str = Field(..., min_length=1)
- metadata: Dict[str, Any] = {}
- table: str = "lake.db1.messages"
+Here is a sample of how you could refactor the `assistant_draft` function using these suggestions:
+```python
+from typing import List, Dict
-class MessageIngestItem(BaseModel):
- thread_id: str = Field(..., min_length=1)
- message_id: str = Field(..., min_length=1)
- sender: str = Field(..., min_length=1)
- channel: str = Field(..., min_length=1)
- sent_at: Optional[str] = None
- body: str = Field(..., min_length=1)
- metadata: Dict[str, Any] = {}
-
-
-class MessageIngestBatchPayload(BaseModel):
- table: str = "lake.db1.messages"
- dedupe_mode: Literal["none", "message_id", "thread_message"] = "none"
- messages: List[MessageIngestItem] = Field(default_factory=list)
-
-
-class EmailImapIngestPayload(BaseModel):
- host: str = Field(..., min_length=1)
- port: int = 993
- use_ssl: bool = True
- username: str = Field(..., min_length=1)
- password: Optional[str] = None
- mailbox: str = "INBOX"
- search_criteria: str = "ALL"
- max_messages: int = Field(default=50, ge=1, le=500)
- table: str = "lake.db1.messages"
- dedupe_mode: Literal["none", "message_id", "thread_message"] = "message_id"
- channel: str = "email-imap"
- incremental: bool = True
- since_uid: Optional[int] = None
-
-
-class PollAndProjectPayload(BaseModel):
- imap: EmailImapIngestPayload
- release_name: Optional[str] = None
- release_prefix: str = "rel"
- targets: Literal["es", "gremlin", "both"] = "es"
- concept_table: str = "lake.db1.messages"
- dry_run: bool = False
- project_if_no_new: bool = False
-
-
-class AssistantDraftPayload(BaseModel):
- task_type: Literal["message", "finance", "gov", "general"] = "general"
- goal: str = Field(..., min_length=3)
- recipient: Optional[str] = None
- tone: Optional[str] = "professional"
- constraints: List[str] = Field(default_factory=list)
- release_name: Optional[str] = None
- max_sources: int = Field(default=5, ge=1, le=20)
-
-
-class AssistantDraftSource(BaseModel):
- concept_id: str
- source_pk: Optional[str] = None
- source_table: Optional[str] = None
- release_name: Optional[str] = None
- score: Optional[float] = None
-
-
-class AssistantDraftResponse(BaseModel):
- task_type: str
- draft: str
- sources: List[AssistantDraftSource]
- confidence: float
- needs_review: bool
- release_name: Optional[str] = None
-
-
-class AssistantPlanPayload(BaseModel):
- task_type: Literal["message", "finance", "gov", "general"] = "general"
- objective: str = Field(..., min_length=3)
- constraints: List[str] = Field(default_factory=list)
- release_name: Optional[str] = None
- max_sources: int = Field(default=5, ge=1, le=20)
- max_steps: int = Field(default=6, ge=1, le=20)
-
-
-class AssistantPlanStep(BaseModel):
- step_id: str
- title: str
- action_type: Literal["research", "draft", "ask_user", "prepare_data", "review"]
- requires_approval: bool = False
- notes: Optional[str] = None
-
-
-class AssistantPlanResponse(BaseModel):
- objective: str
- task_type: str
- plan: List[AssistantPlanStep]
- sources: List[AssistantDraftSource]
- needs_review: bool
- confidence: float
- release_name: Optional[str] = None
-
-
-class AssistantExecuteStepPayload(BaseModel):
- task_type: Literal["message", "finance", "gov", "general"] = "general"
- objective: str = Field(..., min_length=3)
- release_name: Optional[str] = None
- plan: List[AssistantPlanStep]
- step_id: str = Field(..., min_length=1)
- approved: bool = False
- manual_confirm_token: Optional[str] = None
-
-
-class AssistantExecuteStepResponse(BaseModel):
- action_id: str
- step_id: str
- status: Literal["blocked", "executed"]
- output: Dict[str, Any]
- needs_review: bool
-
-
-class AssistantFeedbackPayload(BaseModel):
- outcome: Literal["accepted", "edited", "rejected"]
- task_type: Literal["message", "finance", "gov", "general"] = "general"
- release_name: Optional[str] = None
- goal: Optional[str] = None
- draft: str = Field(..., min_length=1)
- final_text: Optional[str] = None
- sources: List[AssistantDraftSource] = Field(default_factory=list)
- confidence: Optional[float] = None
- needs_review: bool = True
- notes: Optional[str] = None
-
-
-class AssistantLearnPayload(BaseModel):
- text: str = Field(..., min_length=3)
- title: Optional[str] = None
- tags: List[str] = Field(default_factory=list)
- release_name: Optional[str] = None
- metadata: Dict[str, Any] = Field(default_factory=dict)
- write_code: bool = False
- code_objective: Optional[str] = None
- code_files: List[str] = Field(default_factory=list)
- code_dry_run: bool = True
- code_commit: bool = False
- code_branch: Optional[str] = None
-
-
-class AssistantChatMessage(BaseModel):
- role: Literal["user", "assistant"]
- content: str
-
-
-class AssistantChatPayload(BaseModel):
- message: str = Field(..., min_length=1)
- session_id: Optional[str] = None
- release_name: Optional[str] = None
- max_sources: int = Field(default=6, ge=1, le=20)
- history: List[AssistantChatMessage] = Field(default_factory=list)
- temperature_hint: Optional[str] = "balanced"
-
-
-class AssistantChatResponse(BaseModel):
- session_id: str
- answer: str
- sources: List[AssistantDraftSource]
- confidence: float
- release_name: Optional[str] = None
-
-
-class AssistantSelfImprovePayload(BaseModel):
- objective: str = "Improve assistant quality and reliability"
- release_name: Optional[str] = None
- max_proposals: int = Field(default=5, ge=1, le=20)
- feedback_limit: int = Field(default=50, ge=1, le=500)
- action_limit: int = Field(default=50, ge=1, le=500)
- include_edited_feedback: bool = True
- include_rejected_feedback: bool = True
- include_blocked_actions: bool = True
- apply: bool = False
-
-
-class AssistantSelfImproveProposal(BaseModel):
- proposal_id: str
- title: str
- problem: str
- change: str
- files: List[str] = Field(default_factory=list)
- risk: Literal["low", "medium", "high"] = "medium"
- tests: List[str] = Field(default_factory=list)
- auto_apply_safe: bool = False
-
-
-class AssistantSelfImproveResponse(BaseModel):
- objective: str
- release_name: Optional[str] = None
- proposal_set_id: str
- created_at_utc: str
- summary: str
- proposals: List[AssistantSelfImproveProposal]
- signals: Dict[str, Any]
- apply_blocked: bool
- apply_block_reason: Optional[str] = None
-
-
-class AssistantSelfImproveApplyPayload(BaseModel):
- objective: Optional[str] = None
- release_name: Optional[str] = None
- proposal_set_id: Optional[str] = None
- proposal_id: Optional[str] = None
- proposal: Optional[AssistantSelfImproveProposal] = None
- dry_run: bool = False
-
-
-class AssistantSelfImproveApplyResponse(BaseModel):
- applied: bool
- dry_run: bool
- repo_dir: str
- branch_name: str
- proposal_file: str
- commit: Optional[str] = None
- commands: List[str] = Field(default_factory=list)
- detail: Optional[str] = None
-
-
-# --------- helpers ---------
-def now_iso() -> str:
- return datetime.now(timezone.utc).isoformat()
-
-
-def _tail(text: str, max_chars: int = 8000) -> str:
- if len(text) <= max_chars:
- return text
- return text[-max_chars:]
-
-
-def check_admin_api_key(x_admin_api_key: Optional[str]) -> None:
- if not ADMIN_API_KEY:
- raise HTTPException(status_code=503, detail="ADMIN_API_KEY is not configured")
- if x_admin_api_key != ADMIN_API_KEY:
- raise HTTPException(status_code=401, detail="Unauthorized")
-
-def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str]) -> str:
- norm = (name or "").strip().lower()
- k = (kind or "").strip().lower()
- ext = "|".join(f"{a}:{b}".lower() for a, b in sorted(external_ids.items()))
- raw = f"{norm}|{k}|{ext}"
- return hashlib.sha256(raw.encode("utf-8")).hexdigest()
-
-
-def _clean_header_id(v: Optional[str]) -> str:
- if not v:
- return ""
- return v.strip().strip("<>").strip()
-
-
-def _normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str:
- refs_clean = _clean_header_id(refs.split()[-1] if refs else "")
- in_reply_clean = _clean_header_id(in_reply_to)
- if refs_clean:
- return f"thread:{refs_clean}"
- if in_reply_clean:
- return f"thread:{in_reply_clean}"
- seed = f"{subject.strip().lower()}|{sender.strip().lower()}"
- if not seed.strip("|"):
- seed = msg_id
- return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
-
-
-def _extract_body_text(msg: email.message.Message) -> str:
+def get_draft(prompt: str) -> str:
+ """Generate a draft based on the given prompt."""
try:
- if msg.is_multipart():
- for part in msg.walk():
- ctype = (part.get_content_type() or "").lower()
- disp = (part.get("Content-Disposition") or "").lower()
- if ctype == "text/plain" and "attachment" not in disp:
- payload_obj = part.get_content()
- if isinstance(payload_obj, str):
- return payload_obj.strip()
- if isinstance(payload_obj, bytes):
- return payload_obj.decode(part.get_content_charset() or "utf-8", errors="replace").strip()
- for part in msg.walk():
- ctype = (part.get_content_type() or "").lower()
- if ctype == "text/html":
- html_obj = part.get_content()
- if isinstance(html_obj, bytes):
- html_obj = html_obj.decode(part.get_content_charset() or "utf-8", errors="replace")
- if isinstance(html_obj, str):
- return html_obj.strip()
- return ""
- payload_obj = msg.get_content()
- if isinstance(payload_obj, str):
- return payload_obj.strip()
- if isinstance(payload_obj, bytes):
- return payload_obj.decode(msg.get_content_charset() or "utf-8", errors="replace").strip()
- return ""
- except Exception:
- return ""
-
-
-def fetch_imap_messages_blocking(
- payload: EmailImapIngestPayload,
- effective_search_criteria: str,
- since_uid: Optional[int],
-) -> List[MessageIngestItem]:
- password = payload.password or os.getenv("IMAP_PASSWORD", "")
- if not password:
- raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD")
-
- if payload.use_ssl:
- client = imaplib.IMAP4_SSL(payload.host, payload.port)
- else:
- client = imaplib.IMAP4(payload.host, payload.port)
-
- try:
- status, _ = client.login(payload.username, password)
- if status != "OK":
- raise RuntimeError("IMAP login failed")
- status, _ = client.select(payload.mailbox, readonly=True)
- if status != "OK":
- raise RuntimeError(f"IMAP select mailbox failed: {payload.mailbox}")
-
- if since_uid is not None:
- status, search_data = client.uid("search", None, "UID", f"{int(since_uid) + 1}:*")
- else:
- status, search_data = client.uid("search", None, effective_search_criteria)
- if status != "OK":
- raise RuntimeError(f"IMAP search failed: {effective_search_criteria}")
- uid_bytes = search_data[0] if search_data else b""
- uid_list = [u for u in uid_bytes.decode("utf-8", errors="replace").split() if u]
- if since_uid is not None:
- filtered: List[str] = []
- for u in uid_list:
- try:
- if int(u) > int(since_uid):
- filtered.append(u)
- except Exception:
- continue
- uid_list = filtered
- if not uid_list:
- return []
- # For incremental UID windows, process oldest-new first so checkpointing cannot skip gaps.
- is_uid_window = since_uid is not None
- if is_uid_window:
- selected_uids = uid_list[: payload.max_messages]
- else:
- # For non-incremental scans (e.g. ALL), keep "latest N" behavior.
- selected_uids = uid_list[-payload.max_messages :]
-
- out: List[MessageIngestItem] = []
- for uid in selected_uids:
- status, msg_data = client.uid("fetch", uid, "(RFC822)")
- if status != "OK" or not msg_data:
- continue
- raw_bytes = None
- for part in msg_data:
- if isinstance(part, tuple) and len(part) >= 2 and isinstance(part[1], (bytes, bytearray)):
- raw_bytes = bytes(part[1])
- break
- if not raw_bytes:
- continue
- msg = email.message_from_bytes(raw_bytes, policy=policy.default)
-
- subject = str(msg.get("Subject") or "").strip()
- from_raw = str(msg.get("From") or "").strip()
- to_raw = str(msg.get("To") or "").strip()
- date_raw = str(msg.get("Date") or "").strip()
- msg_id_raw = str(msg.get("Message-Id") or msg.get("Message-ID") or "").strip()
- refs_raw = str(msg.get("References") or "").strip()
- in_reply_raw = str(msg.get("In-Reply-To") or "").strip()
-
- sender_email = parseaddr(from_raw)[1] or from_raw or "unknown"
- msg_id_clean = _clean_header_id(msg_id_raw)
- if not msg_id_clean:
- seed = f"{uid}|{subject}|{sender_email}|{date_raw}"
- msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
-
- thread_id = _normalize_thread_id(
- msg_id=msg_id_clean,
- refs=refs_raw,
- in_reply_to=in_reply_raw,
- subject=subject,
- sender=sender_email,
- )
-
- sent_at_iso = None
- if date_raw:
- try:
- dt = parsedate_to_datetime(date_raw)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- sent_at_iso = dt.astimezone(timezone.utc).isoformat()
- except Exception:
- sent_at_iso = None
-
- body = _extract_body_text(msg)
- if not body:
- body = f"(no body) {subject}".strip()
-
- metadata = {
- "subject": subject,
- "from": from_raw,
- "to": to_raw,
- "date": date_raw,
- "imap_uid": uid,
- "mailbox": payload.mailbox,
- "host": payload.host,
- "username": payload.username,
- }
-
- out.append(
- MessageIngestItem(
- thread_id=thread_id,
- message_id=msg_id_clean,
- sender=sender_email,
- channel=payload.channel,
- sent_at=sent_at_iso,
- body=body,
- metadata=metadata,
- )
- )
- return out
- finally:
- try:
- client.logout()
- except Exception:
- pass
-
-
-async def run_remote_query_imap_checkpoint(
- host: str,
- mailbox: str,
- username: str,
- table: str,
-) -> Optional[int]:
- parts = [
- IMAP_CHECKPOINT_REMOTE_SCRIPT,
- host,
- mailbox,
- username,
- table,
- ]
- command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
- ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
- proc = await asyncio.create_subprocess_exec(
- *ssh_args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.wait()
- raise HTTPException(status_code=504, detail="IMAP checkpoint query timed out")
-
- out = stdout.decode("utf-8", errors="replace")
- err = stderr.decode("utf-8", errors="replace")
- if proc.returncode != 0:
- raise HTTPException(
- status_code=502,
- detail={
- "host": PROJECTOR_SSH_HOST,
- "remote_dir": PROJECTOR_REMOTE_DIR,
- "exit_code": proc.returncode,
- "stdout_tail": _tail(out),
- "stderr_tail": _tail(err),
- },
- )
- try:
- obj = _extract_json_object_from_text(out)
- val = obj.get("max_uid")
- if val is None:
- return None
- return int(val)
- except Exception as e:
- raise HTTPException(
- status_code=502,
- detail={
- "message": f"Unable to parse IMAP checkpoint output: {e}",
- "stdout_tail": _tail(out),
- "stderr_tail": _tail(err),
- },
- )
-
-
-async def run_remote_create_messages_release(release_name: str) -> Dict[str, Any]:
- parts = [
- CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT,
- release_name,
- ]
- command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
- ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
- proc = await asyncio.create_subprocess_exec(
- *ssh_args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.wait()
- raise HTTPException(status_code=504, detail="Create messages release timed out")
-
- out = stdout.decode("utf-8", errors="replace")
- err = stderr.decode("utf-8", errors="replace")
- result = {
- "host": PROJECTOR_SSH_HOST,
- "remote_dir": PROJECTOR_REMOTE_DIR,
- "exit_code": proc.returncode,
- "release_name": release_name,
- "stdout_tail": _tail(out),
- "stderr_tail": _tail(err),
- }
- if proc.returncode != 0:
- raise HTTPException(status_code=502, detail=result)
- return result
-
-
-async def ipfs_add_json(payload: Dict[str, Any]) -> str:
- # Kubo: POST /api/v0/add with file content in multipart
- data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
- files = {"file": ("concept.json", data, "application/json")}
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.post(f"{IPFS_API}/api/v0/add", files=files)
- r.raise_for_status()
- # Response is text lines of JSON; last line contains Hash
- # Often single line, but handle both
- last = r.text.strip().splitlines()[-1]
- obj = json.loads(last)
- return obj["Hash"]
-
-async def ollama_summary(text: str) -> str:
- prompt = (
- "Summarize the following concept in 1-2 sentences. "
- "Keep it factual and compact.\n\n"
- f"{text}"
- )
- async with httpx.AsyncClient(timeout=60) as h:
- r = await h.post(
- f"{OLLAMA_URL}/api/generate",
- json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
- )
- r.raise_for_status()
- return (r.json().get("response") or "").strip()
-
-async def ollama_embed(text: str) -> List[float]:
- async with httpx.AsyncClient(timeout=60) as h:
- r = await h.post(
- f"{OLLAMA_URL}/api/embeddings",
- json={"model": OLLAMA_EMBED_MODEL, "prompt": text},
- )
- r.raise_for_status()
- emb = r.json().get("embedding")
- if not isinstance(emb, list):
- return []
- return emb
-
-async def es_ensure_index():
- # Minimal mapping: text fields + dense_vector (optional)
- # If your ES doesn't support dense_vector, remove it.
- mapping = {
- "mappings": {
- "properties": {
- "concept_id": {"type": "keyword"},
- "canonical_name": {"type": "text"},
- "kind": {"type": "keyword"},
- "aliases": {"type": "text"},
- "tags": {"type": "keyword"},
- "summary": {"type": "text"},
- "latest_cid": {"type": "keyword"},
- "fingerprint": {"type": "keyword"},
- "created_at": {"type": "date"},
- "updated_at": {"type": "date"},
- "embedding": {"type": "dense_vector", "dims": 768, "index": False}, # may vary
- }
- }
- }
- async with httpx.AsyncClient(timeout=30) as h:
- head = await h.get(f"{ES_URL}/{ES_INDEX}")
- if head.status_code == 200:
- return
- r = await h.put(f"{ES_URL}/{ES_INDEX}", json=mapping)
- # If this fails due to dense_vector incompatibility, you can still proceed:
- if r.status_code >= 400:
- # try without vector
- mapping["mappings"]["properties"].pop("embedding", None)
- r2 = await h.put(f"{ES_URL}/{ES_INDEX}", json=mapping)
- r2.raise_for_status()
-
-async def es_index(doc: Dict[str, Any]):
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.put(f"{ES_URL}/{ES_INDEX}/_doc/{doc['concept_id']}", json=doc)
- r.raise_for_status()
-
-async def es_search(q: str, size: int = 10) -> List[Dict[str, Any]]:
- query = {
- "size": size,
- "query": {
- "multi_match": {
- "query": q,
- "fields": ["canonical_name^3", "aliases^2", "summary", "tags"],
- }
- },
- }
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query)
- r.raise_for_status()
- hits = r.json().get("hits", {}).get("hits", [])
- return [h["_source"] for h in hits]
-
-
-async def es_search_hits(q: str, size: int = 10, release_name: Optional[str] = None) -> List[Dict[str, Any]]:
- must_clause: Dict[str, Any] = {
- "multi_match": {
- "query": q,
- "fields": [
- "display_name^3",
- "canonical_name^3",
- "description^2",
- "text^2",
- "summary^2",
- "aliases^2",
- "tags",
- "source_pk^2",
- "source_table",
- ],
- }
- }
- query: Dict[str, Any] = {"size": size}
- if release_name:
- release_filter = {
- "bool": {
- "should": [
- {"term": {"release_name.keyword": release_name}},
- {"term": {"release_name": release_name}},
- {"match_phrase": {"release_name": release_name}},
- ],
- "minimum_should_match": 1,
- }
- }
- query["query"] = {
- "bool": {
- "must": [must_clause],
- "filter": [release_filter],
- }
- }
- else:
- query["query"] = must_clause
-
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query)
- r.raise_for_status()
- return r.json().get("hits", {}).get("hits", [])
-
-
-async def es_recent_by_release(release_name: str, size: int = 10) -> List[Dict[str, Any]]:
- query: Dict[str, Any] = {
- "size": size,
- "query": {
- "bool": {
- "filter": [
- {
- "bool": {
- "should": [
- {"term": {"release_name.keyword": release_name}},
- {"term": {"release_name": release_name}},
- {"match_phrase": {"release_name": release_name}},
- ],
- "minimum_should_match": 1,
- }
- }
- ]
- }
- },
- "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}],
- }
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query)
- r.raise_for_status()
- return r.json().get("hits", {}).get("hits", [])
-
-
-async def es_recent_messages(
- size: int = 20,
- release_name: Optional[str] = None,
- q: Optional[str] = None,
-) -> List[Dict[str, Any]]:
- filters: List[Dict[str, Any]] = [
- {
- "bool": {
- "should": [
- {"term": {"concept_type.keyword": "message"}},
- {"term": {"concept_type": "message"}},
- {"term": {"kind.keyword": "message"}},
- {"term": {"kind": "message"}},
- ],
- "minimum_should_match": 1,
- }
- }
- ]
- if release_name:
- filters.append(
- {
- "bool": {
- "should": [
- {"term": {"release_name.keyword": release_name}},
- {"term": {"release_name": release_name}},
- {"match_phrase": {"release_name": release_name}},
- ],
- "minimum_should_match": 1,
- }
- }
- )
-
- must: List[Dict[str, Any]] = []
- if q and q.strip():
- must.append(
- {
- "multi_match": {
- "query": q.strip(),
- "fields": [
- "text^3",
- "description^2",
- "summary^2",
- "display_name^2",
- "canonical_name^2",
- "source_pk^2",
- ],
- }
- }
- )
-
- query: Dict[str, Any] = {
- "size": size,
- "query": {
- "bool": {
- "filter": filters,
- "must": must,
- }
- },
- "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}],
- }
- async with httpx.AsyncClient(timeout=30) as h:
- r = await h.post(f"{ES_URL}/{ES_INDEX}/_search", json=query)
- r.raise_for_status()
- return r.json().get("hits", {}).get("hits", [])
-
-
-TASK_TRIGGER_RE = re.compile(
- r"\b("
- r"please|can you|could you|need to|needs to|todo|to do|follow up|follow-up|"
- r"review|fix|send|reply|schedule|book|call|remind|prepare|draft|submit|pay|"
- r"deadline|due|let me know|check in|confirm|update me|get back to me"
- r")\b",
- flags=re.IGNORECASE,
-)
-TASK_DONE_RE = re.compile(r"\b(done|completed|closed|resolved|fixed)\b", flags=re.IGNORECASE)
-DUE_HINT_RE = re.compile(
- r"\b("
- r"today|tomorrow|tonight|next week|next month|monday|tuesday|wednesday|thursday|friday|saturday|sunday|"
- r"\d{4}-\d{2}-\d{2}|"
- r"\d{1,2}:\d{2}"
- r")\b",
- flags=re.IGNORECASE,
-)
-
-TASK_AI_CACHE_TTL_SEC = int(os.getenv("TASK_AI_CACHE_TTL_SEC", "3600"))
-TASK_AI_CACHE_MAX_SIZE = int(os.getenv("TASK_AI_CACHE_MAX_SIZE", "5000"))
-TASK_AI_CACHE: Dict[str, Dict[str, Any]] = {}
-ASSISTANT_CHAT_MAX_TURNS = int(os.getenv("ASSISTANT_CHAT_MAX_TURNS", "20"))
-ASSISTANT_CHAT_SESSIONS: Dict[str, List[Dict[str, str]]] = {}
-
-
-def _split_sentences(text: str) -> List[str]:
- raw_parts = re.split(r"[\n\r]+|(?<=[.!?])\s+", text or "")
- return [p.strip() for p in raw_parts if p and p.strip()]
-
-
-def _extract_due_hint(text: str) -> Optional[str]:
- m = DUE_HINT_RE.search(text or "")
- if not m:
- return None
- return m.group(1)
-
-
-def _extract_who(text: str, default_sender: Optional[str]) -> Optional[str]:
- m = re.search(r"\b(?:to|for)\s+([A-Z][a-zA-Z0-9_-]{1,40})\b", text or "")
- if m:
- return m.group(1)
- return default_sender or None
-
-
def extract_pending_tasks_from_source(src: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Heuristically mine actionable task rows from one search-hit source dict.

    A sentence becomes a task when it matches TASK_TRIGGER_RE or contains a
    question mark (and is at least 8 chars long). Each task gets a stable,
    content-derived task_id so repeated extraction dedupes cleanly.
    """
    text = str(src.get("text") or src.get("description") or src.get("summary") or "").strip()
    if not text:
        return []

    # attributes_json may arrive as a JSON string or an already-parsed dict.
    attrs_raw = src.get("attributes_json")
    attrs: Dict[str, Any] = {}
    if isinstance(attrs_raw, dict):
        attrs = attrs_raw
    elif isinstance(attrs_raw, str) and attrs_raw.strip():
        try:
            parsed = json.loads(attrs_raw)
        except Exception:
            parsed = None
        if isinstance(parsed, dict):
            attrs = parsed

    thread_id = attrs.get("thread_id")
    message_id = attrs.get("message_id") or src.get("source_pk")
    sender = attrs.get("sender")
    sent_at = attrs.get("sent_at")
    # Stable prefix for content-addressed task ids (concept + message).
    base_source = f"{src.get('concept_id') or ''}|{message_id or ''}"

    found: List[Dict[str, Any]] = []
    for sentence in _split_sentences(text):
        if len(sentence) < 8:
            continue
        is_question = "?" in sentence
        if not is_question and not TASK_TRIGGER_RE.search(sentence):
            continue
        digest = hashlib.sha256(f"{base_source}|{sentence.lower()}".encode("utf-8")).hexdigest()
        found.append(
            {
                "task_id": "task-" + digest[:16],
                "status": "done" if TASK_DONE_RE.search(sentence) else "pending",
                "todo": sentence[:400],
                "due_hint": _extract_due_hint(sentence),
                "who": _extract_who(sentence, sender),
                "concept_id": src.get("concept_id"),
                "source_pk": src.get("source_pk"),
                "source_table": src.get("source_table"),
                "release_name": src.get("release_name"),
                "thread_id": thread_id,
                "message_id": message_id,
                "sender": sender,
                "sent_at": sent_at,
                "updated_at": src.get("updated_at"),
            }
        )
    return found
-
-
def _task_ai_cache_key(src: Dict[str, Any]) -> str:
    """Stable cache key for AI task extraction over one source document.

    Keyed on identity fields, a hash of the text, and the active model so a
    model switch invalidates prior results.
    """
    text = str(src.get("text") or src.get("description") or src.get("summary") or "")
    text_digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:24]
    key_parts = [
        str(src.get("concept_id") or ""),
        str(src.get("source_pk") or ""),
        str(src.get("updated_at") or ""),
        text_digest,
        OLLAMA_MODEL,
    ]
    return hashlib.sha256("|".join(key_parts).encode("utf-8")).hexdigest()
-
-
def _task_ai_cache_get(key: str) -> Optional[List[Dict[str, Any]]]:
    """Return cached task rows for *key*, or None on miss/expiry.

    Expired entries are evicted lazily on access.
    """
    entry = TASK_AI_CACHE.get(key)
    if not entry:
        return None
    if float(entry.get("expires_at") or 0) <= time.time():
        TASK_AI_CACHE.pop(key, None)
        return None
    cached = entry.get("tasks")
    return cached if isinstance(cached, list) else None
-
-
def _task_ai_cache_set(key: str, tasks: List[Dict[str, Any]]) -> None:
    """Store *tasks* under *key* with a TTL, keeping the cache bounded.

    When the cache is full, evicts the ~10% of entries closest to expiry
    (at least one) before inserting.
    """
    if len(TASK_AI_CACHE) >= TASK_AI_CACHE_MAX_SIZE:
        by_expiry = sorted(
            TASK_AI_CACHE.items(),
            key=lambda kv: float(kv[1].get("expires_at") or 0),
        )
        evict_count = max(1, TASK_AI_CACHE_MAX_SIZE // 10)
        for stale_key, _ in by_expiry[:evict_count]:
            TASK_AI_CACHE.pop(stale_key, None)
    TASK_AI_CACHE[key] = {
        "expires_at": time.time() + TASK_AI_CACHE_TTL_SEC,
        "tasks": tasks,
    }
-
-
def build_task_extraction_prompt(src: Dict[str, Any]) -> str:
    """Build the strict-JSON task-extraction prompt for one source doc."""
    text = str(src.get("text") or src.get("description") or src.get("summary") or "")[:4000]
    attrs_raw = src.get("attributes_json")
    if isinstance(attrs_raw, str):
        attrs = attrs_raw
    else:
        attrs = json.dumps(attrs_raw or {}, ensure_ascii=False)
    lines = [
        "Extract actionable tasks from this message. Ignore pure marketing/promotional content.",
        "Return strict JSON only with shape:",
        '{"tasks":[{"todo":"...","status":"pending|done","due_hint":"...|null","who":"...|null"}]}',
        'If no actionable tasks, return {"tasks":[]}.',
        "",
        f"Message concept_id: {src.get('concept_id')}",
        f"Source pk: {src.get('source_pk')}",
        f"Attributes JSON: {attrs}",
        f"Text:\n{text}",
    ]
    return "\n".join(lines) + "\n"
-
-
async def extract_pending_tasks_from_source_ai(src: Dict[str, Any]) -> List[Dict[str, Any]]:
    """LLM-backed variant of extract_pending_tasks_from_source.

    Prompts the model for a strict-JSON task list, then normalizes each item
    into the same task-dict shape — and the same content-derived task_id
    scheme — as the heuristic extractor, so results from both paths can
    dedupe against each other. Returns [] when the model output has no
    usable "tasks" list.
    """
    prompt = build_task_extraction_prompt(src)
    raw = await ollama_generate(prompt)
    obj = _extract_json_object_from_text(raw)
    task_items = obj.get("tasks")
    if not isinstance(task_items, list):
        # Malformed or missing "tasks" in model output; treat as no tasks.
        return []

    # attributes_json may arrive as a JSON string or an already-parsed dict.
    attrs_raw = src.get("attributes_json")
    attrs: Dict[str, Any] = {}
    if isinstance(attrs_raw, str) and attrs_raw.strip():
        try:
            parsed = json.loads(attrs_raw)
            if isinstance(parsed, dict):
                attrs = parsed
        except Exception:
            attrs = {}
    elif isinstance(attrs_raw, dict):
        attrs = attrs_raw

    thread_id = attrs.get("thread_id")
    message_id = attrs.get("message_id") or src.get("source_pk")
    sender = attrs.get("sender")
    sent_at = attrs.get("sent_at")
    # Stable prefix for content-addressed task ids (concept + message).
    base_source = f"{src.get('concept_id') or ''}|{message_id or ''}"
    out: List[Dict[str, Any]] = []
    for item in task_items:
        if not isinstance(item, dict):
            continue
        todo = str(item.get("todo") or "").strip()
        if len(todo) < 4:
            # Too short to be actionable; skip noise.
            continue
        status_raw = str(item.get("status") or "pending").strip().lower()
        # Anything other than an explicit "done" is treated as pending.
        status = "done" if status_raw == "done" else "pending"
        due_hint = item.get("due_hint")
        who = item.get("who")
        todo_norm = todo[:400]
        # Same id derivation as the heuristic extractor (truncated sha256).
        task_id = "task-" + hashlib.sha256(f"{base_source}|{todo_norm.lower()}".encode("utf-8")).hexdigest()[:16]
        out.append(
            {
                "task_id": task_id,
                "status": status,
                "todo": todo_norm,
                "due_hint": str(due_hint) if due_hint is not None else None,
                "who": str(who) if who is not None else sender,
                "concept_id": src.get("concept_id"),
                "source_pk": src.get("source_pk"),
                "source_table": src.get("source_table"),
                "release_name": src.get("release_name"),
                "thread_id": thread_id,
                "message_id": message_id,
                "sender": sender,
                "sent_at": sent_at,
                "updated_at": src.get("updated_at"),
            }
        )
    return out
-
-
def build_chat_prompt(
    user_message: str,
    history: List[Dict[str, str]],
    source_docs: List[Dict[str, Any]],
    release_name: Optional[str],
) -> str:
    """Assemble the assistant chat prompt from history plus retrieved docs.

    Only the most recent ASSISTANT_CHAT_MAX_TURNS turns are included, and
    each turn/context text is truncated to 1200 chars.
    """
    history_lines: List[str] = []
    for turn in history[-ASSISTANT_CHAT_MAX_TURNS:]:
        role = turn.get("role", "")
        body = (turn.get("content", "") or "").strip()
        if role in ("user", "assistant") and body:
            history_lines.append(f"{role}: {body[:1200]}")

    context_chunks: List[str] = []
    for doc in source_docs:
        fields = doc.get("_source", {}) or {}
        snippet = str(fields.get("text") or fields.get("description") or fields.get("summary") or "")[:1200]
        context_chunks.append(
            "\n".join(
                [
                    f"concept_id: {fields.get('concept_id', '')}",
                    f"source_pk: {fields.get('source_pk', '')}",
                    f"release_name: {fields.get('release_name', '')}",
                    f"text: {snippet}",
                ]
            )
        )

    context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context."
    hist = "\n".join(history_lines) if history_lines else "(none)"

    return (
        "You are a practical personal assistant. Be concise, factual, and useful.\n"
        "Use retrieved context when available. If uncertain, say so briefly and ask one clarifying question.\n"
        "Do not claim external actions were already performed.\n\n"
        f"Release filter: {release_name or '(none)'}\n"
        f"Conversation history:\n{hist}\n\n"
        f"Retrieved context:\n{context}\n\n"
        f"User: {user_message}\n"
        "Assistant:"
    )
-
-
def _append_chat_turn(session_id: str, role: str, content: str) -> None:
    """Record one chat turn, keeping only the most recent turns per session.

    The per-session history is capped at ASSISTANT_CHAT_MAX_TURNS * 2 items
    (user + assistant per turn).
    """
    history = ASSISTANT_CHAT_SESSIONS.get(session_id, [])
    history.append({"role": role, "content": content})
    cap = ASSISTANT_CHAT_MAX_TURNS * 2
    if len(history) > cap:
        history = history[-cap:]
    ASSISTANT_CHAT_SESSIONS[session_id] = history
-
-
-def _slugify(text: str) -> str:
- t = re.sub(r"[^a-zA-Z0-9]+", "-", (text or "").strip().lower()).strip("-")
- return t[:60] or "proposal"
-
-
-async def _run_local_cmd(args: List[str], cwd: Path) -> Dict[str, Any]:
- proc = await asyncio.create_subprocess_exec(
- *args,
- cwd=str(cwd),
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await proc.communicate()
- out = stdout.decode("utf-8", errors="replace")
- err = stderr.decode("utf-8", errors="replace")
- return {"code": proc.returncode, "stdout": out, "stderr": err}
-
-
def build_self_improve_prompt(
    payload: AssistantSelfImprovePayload,
    feedback_rows: List[Dict[str, Any]],
    action_rows: List[Dict[str, Any]],
) -> str:
    """Build the JSON-only self-improvement prompt from feedback/action rows.

    Rows are truncated per-field and limited by payload.feedback_limit /
    payload.action_limit.
    """
    feedback_lines = [
        " | ".join(
            [
                f"outcome={row.get('outcome')}",
                f"task_type={row.get('task_type')}",
                f"release={row.get('release_name')}",
                f"goal={str(row.get('goal') or '')[:200]}",
                f"notes={str(row.get('notes') or '')[:200]}",
                f"draft={str(row.get('draft_text') or '')[:220]}",
                f"final={str(row.get('final_text') or '')[:220]}",
            ]
        )
        for row in feedback_rows[: payload.feedback_limit]
    ]
    action_lines = [
        " | ".join(
            [
                f"status={row.get('status')}",
                f"task_type={row.get('task_type')}",
                f"step_id={row.get('step_id')}",
                f"action_type={row.get('action_type')}",
                f"title={str(row.get('step_title') or '')[:160]}",
                f"error={str(row.get('error_text') or '')[:220]}",
            ]
        )
        for row in action_rows[: payload.action_limit]
    ]

    feedback_block = "\n".join(feedback_lines) or "(none)"
    action_block = "\n".join(action_lines) or "(none)"
    return (
        "You are a senior reliability engineer. Propose concrete code improvements.\n"
        "Return JSON only with exact shape:\n"
        '{'
        '"summary":"...",'
        '"proposals":[{"proposal_id":"P1","title":"...","problem":"...","change":"...","files":["app.py"],'
        '"risk":"low|medium|high","tests":["..."],"auto_apply_safe":true|false}]}'
        "\n"
        f"Create at most {payload.max_proposals} proposals.\n"
        "Prioritize high-impact, low-risk changes first.\n"
        "Only propose changes grounded in the provided signals.\n\n"
        f"Objective: {payload.objective}\n"
        f"Release filter: {payload.release_name or '(none)'}\n\n"
        f"Feedback signals:\n{feedback_block}\n\n"
        f"Action signals:\n{action_block}\n"
    )
-
-
def fallback_self_improve_proposals() -> List[AssistantSelfImproveProposal]:
    """Static, conservative proposals used when model output is unusable."""
    seed_kwargs = [
        {
            "proposal_id": "P1",
            "title": "Improve retrieval fallback ordering",
            "problem": "Low-confidence drafts still occur when release-filtered retrieval returns no hits.",
            "change": "Add deterministic fallback chain and expose retrieval diagnostics in API responses.",
            "files": ["app.py"],
            "risk": "low",
            "tests": ["Call /assistant/draft with missing release_name and verify non-empty sources fallback."],
            "auto_apply_safe": True,
        },
        {
            "proposal_id": "P2",
            "title": "Add deterministic task extraction benchmark route",
            "problem": "Task extraction quality is hard to evaluate consistently over time.",
            "change": "Introduce an eval endpoint with fixed fixtures and compare AI vs heuristic precision.",
            "files": ["app.py", "tests/"],
            "risk": "medium",
            "tests": ["Run fixtures for promotional and actionable emails and assert expected tasks count."],
            "auto_apply_safe": False,
        },
    ]
    return [AssistantSelfImproveProposal(**kwargs) for kwargs in seed_kwargs]
-
-
def build_assistant_prompt(payload: AssistantDraftPayload, source_docs: List[Dict[str, Any]]) -> str:
    """Build the grounded, draft-only prompt for /assistant/draft."""
    who = payload.recipient or "unspecified recipient"
    voice = payload.tone or "professional"
    constraints = payload.constraints or []
    if constraints:
        constraint_lines = "\n".join(f"- {c}" for c in constraints)
    else:
        constraint_lines = "- None"

    chunks: List[str] = []
    for doc in source_docs:
        fields = doc.get("_source", {}) or {}
        body = str(fields.get("text") or fields.get("description") or fields.get("summary") or "")[:1000]
        chunks.append(
            "\n".join(
                [
                    f"concept_id: {fields.get('concept_id', '')}",
                    f"concept_type: {fields.get('concept_type', '')}",
                    f"source_pk: {fields.get('source_pk', '')}",
                    f"source_table: {fields.get('source_table', '')}",
                    f"release_name: {fields.get('release_name', '')}",
                    f"text: {body}",
                ]
            )
        )
    context = "\n\n---\n\n".join(chunks) if chunks else "No retrieved context."

    return (
        "You are a careful personal assistant. Draft a response based only on provided context.\n"
        "If context is missing, state uncertainty and ask concise follow-up questions.\n"
        "Do not claim actions were sent; provide draft-only text.\n\n"
        f"Task type: {payload.task_type}\n"
        f"Goal: {payload.goal}\n"
        f"Recipient: {who}\n"
        f"Tone: {voice}\n"
        f"Constraints:\n{constraint_lines}\n\n"
        "Retrieved context:\n"
        f"{context}\n\n"
        "Output only the draft text."
    )
-
-
async def ollama_generate(prompt: str) -> str:
    """Call the local Ollama /api/generate endpoint and return trimmed text.

    Uses a non-streaming request with a 90s timeout; raises
    httpx.HTTPStatusError on a non-2xx response.
    """
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    async with httpx.AsyncClient(timeout=90) as client:
        resp = await client.post(f"{OLLAMA_URL}/api/generate", json=request_body)
        resp.raise_for_status()
        data = resp.json()
    return (data.get("response") or "").strip()
-
-
def fallback_draft_text(payload: AssistantDraftPayload) -> str:
    """Deterministic draft used when retrieval/model services are unavailable."""
    recipient = payload.recipient or "there"
    tone = (payload.tone or "").lower()
    if payload.task_type != "message":
        return (
            "Draft:\n"
            f"{payload.goal}\n\n"
            "I can refine this further once retrieval/model services are available."
        )
    if "friendly" in tone:
        return (
            f"Hi {recipient},\n\n"
            "Thanks for your message. I received it and will follow up tomorrow.\n\n"
            "Best,"
        )
    return (
        f"Hello {recipient},\n\n"
        "I confirm receipt of your message and will follow up tomorrow.\n\n"
        "Regards,"
    )
-
-
-def _extract_json_object_from_text(text: str) -> Dict[str, Any]:
- start = text.find("{")
- end = text.rfind("}")
- if start == -1 or end == -1 or end < start:
- raise ValueError("No JSON object found in output")
- candidate = text[start : end + 1]
- obj = json.loads(candidate)
- if not isinstance(obj, dict):
- raise ValueError("Parsed value is not a JSON object")
- return obj
-
-
-def _extract_code_text(text: str) -> str:
- raw = (text or "").strip()
- if raw.startswith("```"):
- lines = raw.splitlines()
- if lines and lines[0].startswith("```"):
- lines = lines[1:]
- if lines and lines[-1].strip().startswith("```"):
- lines = lines[:-1]
- return "\n".join(lines).rstrip() + "\n"
- return raw
-
-
def build_learn_code_prompt(
    objective: str,
    note_title: str,
    note_text: str,
    file_path: str,
    current_content: str,
) -> str:
    """Build the prompt asking the model to rewrite one file per a learn note.

    Note text is truncated to 4000 chars and file content to 120000 chars.
    """
    sections = [
        "You are editing one source file in a real codebase.",
        "Make minimal, correct changes to satisfy the objective.",
        "Do not add explanations.",
        "Return ONLY the full updated file content.",
        "",
        f"Objective:\n{objective}",
        "",
        f"Learn note title:\n{note_title}",
        "",
        f"Learn note text:\n{note_text[:4000]}",
        "",
        f"File path:\n{file_path}",
        "",
        f"Current file content:\n{current_content[:120000]}",
    ]
    return "\n".join(sections) + "\n"
-
-
async def _assistant_write_code_from_learn(
    payload: AssistantLearnPayload,
    note_title: str,
    note_text: str,
) -> Dict[str, Any]:
    """Apply model-generated edits to workspace files based on a learn note.

    For each requested (workspace-relative) file, asks the model for the full
    updated content and writes it back unless payload.code_dry_run is set.
    Optionally creates/switches a git branch and commits the changes.
    Raises HTTPException 400 when no files are requested and 500 on git
    failures. Returns a summary dict of changed/skipped files and commit info.
    """
    repo_dir = Path(__file__).resolve().parent
    objective = (payload.code_objective or "").strip() or f"Implement changes from learn note: {note_title}"
    requested_files = [str(x).strip() for x in (payload.code_files or []) if str(x).strip()]
    if not requested_files:
        raise HTTPException(status_code=400, detail="code_files is required when write_code=true")

    branch_name = (payload.code_branch or "").strip()
    if not branch_name:
        # Unique, slug-based branch name when the caller did not pick one.
        branch_name = f"assistant/learn-{_slugify(note_title)}-{uuid.uuid4().hex[:6]}"

    # Refuse to run outside a git work tree.
    chk = await _run_local_cmd(["git", "rev-parse", "--is-inside-work-tree"], repo_dir)
    if chk["code"] != 0:
        raise HTTPException(status_code=500, detail={"message": "Not a git repository", "stderr": chk["stderr"]})

    if not payload.code_dry_run:
        # -B creates the branch or resets it if it already exists.
        co = await _run_local_cmd(["git", "checkout", "-B", branch_name], repo_dir)
        if co["code"] != 0:
            raise HTTPException(
                status_code=500,
                detail={"message": "Failed to create/switch branch", "stderr": co["stderr"]},
            )

    changed_files: List[str] = []
    skipped_files: List[Dict[str, str]] = []
    file_results: List[Dict[str, Any]] = []

    # Hard cap of 20 files per request.
    for rel in requested_files[:20]:
        rel_path = Path(rel)
        # Path-traversal guards: relative paths only, resolved inside repo_dir.
        if rel_path.is_absolute() or ".." in rel_path.parts:
            skipped_files.append({"file": rel, "reason": "Only workspace-relative paths are allowed."})
            continue
        abs_path = (repo_dir / rel_path).resolve()
        try:
            abs_path.relative_to(repo_dir)
        except ValueError:
            skipped_files.append({"file": rel, "reason": "Path escapes repository root."})
            continue
        if not abs_path.exists() or not abs_path.is_file():
            skipped_files.append({"file": rel, "reason": "File does not exist."})
            continue

        current = abs_path.read_text(encoding="utf-8", errors="replace")
        prompt = build_learn_code_prompt(
            objective=objective,
            note_title=note_title,
            note_text=note_text,
            file_path=rel,
            current_content=current,
        )
        raw = await ollama_generate(prompt)
        updated = _extract_code_text(raw)
        if not updated.strip():
            skipped_files.append({"file": rel, "reason": "Model returned empty content."})
            continue
        changed = updated != current
        file_results.append({"file": rel, "changed": changed})
        if not changed:
            continue
        changed_files.append(rel)
        if not payload.code_dry_run:
            abs_path.write_text(updated, encoding="utf-8")

    commit_hash: Optional[str] = None
    commit_detail: Optional[str] = None
    if changed_files and not payload.code_dry_run and payload.code_commit:
        add = await _run_local_cmd(["git", "add", *changed_files], repo_dir)
        if add["code"] != 0:
            raise HTTPException(status_code=500, detail={"message": "Failed to add changed files", "stderr": add["stderr"]})
        msg = f"feat(assistant): apply learn code update {note_title[:50]}"
        cm = await _run_local_cmd(["git", "commit", "-m", msg], repo_dir)
        # Keep commit output even on failure so callers can diagnose.
        commit_detail = (cm["stdout"] or cm["stderr"] or "").strip()
        if cm["code"] == 0:
            head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
            if head["code"] == 0:
                commit_hash = head["stdout"].strip()

    return {
        "attempted": True,
        "objective": objective,
        "dry_run": payload.code_dry_run,
        "branch_name": branch_name,
        "requested_files": requested_files,
        "changed_files": changed_files,
        "skipped_files": skipped_files,
        "file_results": file_results,
        "commit": commit_hash,
        "commit_detail": commit_detail,
    }
-
-
def build_assistant_plan_prompt(payload: AssistantPlanPayload, source_docs: List[Dict[str, Any]]) -> str:
    """Build the JSON-only planning prompt for /assistant/plan."""
    constraints = payload.constraints or []
    if constraints:
        constraint_lines = "\n".join(f"- {c}" for c in constraints)
    else:
        constraint_lines = "- None"

    chunks: List[str] = []
    for doc in source_docs:
        fields = doc.get("_source", {}) or {}
        body = str(fields.get("text") or fields.get("description") or fields.get("summary") or "")[:600]
        chunks.append(
            "\n".join(
                [
                    f"concept_id: {fields.get('concept_id', '')}",
                    f"source_pk: {fields.get('source_pk', '')}",
                    f"source_table: {fields.get('source_table', '')}",
                    f"release_name: {fields.get('release_name', '')}",
                    f"text: {body}",
                ]
            )
        )
    context = "\n\n---\n\n".join(chunks) if chunks else "No retrieved context."

    return (
        "You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n"
        "Return valid JSON ONLY with this exact shape:\n"
        '{'
        '"plan": ['
        '{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}'
        "]"
        "}\n"
        f"Use at most {payload.max_steps} steps.\n"
        "Prefer safe read-only and draft actions first.\n\n"
        f"Task type: {payload.task_type}\n"
        f"Objective: {payload.objective}\n"
        f"Constraints:\n{constraint_lines}\n\n"
        "Retrieved context:\n"
        f"{context}\n"
    )
-
-
def fallback_plan(payload: AssistantPlanPayload) -> List[AssistantPlanStep]:
    """Deterministic three-step plan used when model output is unusable.

    Truncated to payload.max_steps.
    """
    step_specs = [
        ("S1", "Gather relevant facts and constraints", "research", False,
         "Review messages/concepts and identify required context."),
        ("S2", "Draft a response or action proposal", "draft", False,
         "Produce a concise draft aligned with objective and constraints."),
        ("S3", "Request user confirmation before any external action", "ask_user", True,
         "Do not send or execute changes until approved."),
    ]
    steps = [
        AssistantPlanStep(
            step_id=sid,
            title=title,
            action_type=action,
            requires_approval=approval,
            notes=notes,
        )
        for sid, title, action, approval, notes in step_specs
    ]
    return steps[: payload.max_steps]
-
-
def find_plan_step(plan: List[AssistantPlanStep], step_id: str) -> Optional[AssistantPlanStep]:
    """Return the first plan step whose step_id matches, or None."""
    return next((step for step in plan if step.step_id == step_id), None)
-
-
# Terms indicating an external, hard-to-undo action. Matched as whole
# words/phrases (see is_high_risk_step) rather than substrings.
_HIGH_RISK_RE = re.compile(
    r"\b(?:send|submit|pay|payment|transfer|wire|sign|file|delete|"
    r"close account|change account)\b"
)


def is_high_risk_step(step: AssistantPlanStep) -> bool:
    """Return True when a plan step looks like an external, high-impact action.

    Fix: the original used plain substring tests, so benign words triggered
    the high-risk gate ("design" contains "sign", "profile" contains "file",
    "repayment" contains "pay"). Terms are now matched on word/phrase
    boundaries over the lower-cased title + notes.
    """
    text = f"{step.title} {step.notes or ''}".lower()
    return _HIGH_RISK_RE.search(text) is not None
-
-
def enforce_step_policy(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Optional[str]:
    """Return a human-readable refusal reason, or None when execution may proceed.

    Two gates: the plan-declared approval flag, and a stricter gate for
    high-risk steps (approval plus a manual confirmation token).
    """
    if step.requires_approval and not payload.approved:
        return "Step requires approval but approved=false."
    if not is_high_risk_step(step):
        return None
    if not payload.approved:
        return "High-risk step requires approved=true."
    token = (payload.manual_confirm_token or "").strip()
    if not token:
        return "High-risk step requires manual_confirm_token."
    return None
-
-
async def execute_plan_step(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Dict[str, Any]:
    """Execute one approved plan step.

    Only the "draft" action calls the model (with a deterministic local
    fallback); every other action type returns a static acknowledgement.
    """
    action = step.action_type
    if action == "draft":
        prompt = (
            "Draft concise text for this approved planning step.\n"
            f"Task type: {payload.task_type}\n"
            f"Objective: {payload.objective}\n"
            f"Step: {step.title}\n"
            f"Notes: {step.notes or ''}\n"
            "Output only final draft text."
        )
        fallback = f"Draft for step '{step.title}'."
        try:
            draft_text = await ollama_generate(prompt)
        except Exception:
            draft_text = fallback
        if not draft_text.strip():
            draft_text = fallback
        return {"draft": draft_text}

    static_results = {
        "research": {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."},
        "prepare_data": {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]},
        "review": {"note": "Review step requires human review before external action."},
        "ask_user": {"question": "Please confirm whether to proceed with the next high-impact action."},
    }
    return static_results.get(action, {"note": "Step recognized but no executor implemented."})
-
def _gremlin_submit_blocking(q: str, bindings: Dict[str, Any]) -> Any:
    """Synchronously run a Gremlin query and return all result rows.

    Intended to be called from a worker thread (see gremlin_submit); the
    client is always closed, even when the query raises.
    """
    # Create/close client per call so transport loop is owned by this worker thread.
    c = gremlin_client.Client(
        GREMLIN_URL,
        "g",
        message_serializer=GraphSONSerializersV3d0(),
    )
    try:
        return c.submit(q, bindings).all().result()
    finally:
        c.close()
-
async def gremlin_submit(q: str, bindings: Dict[str, Any]) -> Any:
    """Run a Gremlin query off the event loop via a worker thread."""
    result = await asyncio.to_thread(_gremlin_submit_blocking, q, bindings)
    return result
-
-
async def run_remote_projector(payload: ProjectionTrigger) -> Dict[str, Any]:
    """Run the projector script on the remote host over SSH.

    Builds the remote command from *payload*, runs it with a timeout, and
    returns exit code, progress-marker flags, and stdout/stderr tails.
    Raises HTTPException 504 on timeout and 502 on non-zero remote exit.
    """
    parts = [
        PROJECTOR_REMOTE_SCRIPT,
        "--release-name", payload.release_name,
        "--targets", payload.targets,
    ]
    if payload.concept_table:
        parts.extend(["--concept-table", payload.concept_table])
    if payload.dry_run:
        parts.append("--dry-run")

    # Quote every token so the remote shell cannot re-interpret arguments.
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Kill the hung ssh process before surfacing the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Projector execution timed out")

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    # Progress markers emitted by the remote script's stdout.
    spark_read_done = "[STEP] spark_read_done" in out
    projection_done = "[STEP] projection_done" in out
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "spark_read_done": spark_read_done,
        "projection_done": projection_done,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
-def _b64(s: str) -> str:
- return base64.b64encode(s.encode("utf-8")).decode("ascii")
-
-
async def run_remote_ingest_message(payload: MessageIngestPayload) -> Dict[str, Any]:
    """Ingest one message on the remote host via the ingest script over SSH.

    Free-text fields (body, metadata JSON) are base64-encoded so they survive
    shell argument passing. Raises HTTPException 504 on timeout and 502 on
    non-zero remote exit; returns exit code and output tails otherwise.
    """
    sent_at = payload.sent_at or ""
    parts = [
        INGEST_MESSAGE_REMOTE_SCRIPT,
        payload.table,
        payload.thread_id,
        payload.message_id,
        payload.sender,
        payload.channel,
        sent_at,
        _b64(payload.body),
        _b64(json.dumps(payload.metadata, ensure_ascii=False)),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Kill the hung ssh process before surfacing the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Message ingest execution timed out")

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
async def run_remote_ingest_messages_batch(payload: MessageIngestBatchPayload) -> Dict[str, Any]:
    """Bulk-ingest messages on the remote host.

    Serializes rows to a local temp JSON file, scp-uploads it to a unique
    remote path, then runs the batch ingest script over SSH. The remote temp
    file is removed within the same SSH command and the local temp file in a
    finally block. Raises HTTPException 504 on timeout and 502 on a non-zero
    scp/ssh exit; returns exit code, row count, and output tails otherwise.
    """
    rows = []
    for m in payload.messages:
        rows.append(
            {
                "thread_id": m.thread_id,
                "message_id": m.message_id,
                "sender": m.sender,
                "channel": m.channel,
                "sent_at": m.sent_at,
                "body": m.body,
                "metadata": m.metadata,
            }
        )
    if not rows:
        # Nothing to do; short-circuit without touching the network.
        return {
            "host": PROJECTOR_SSH_HOST,
            "remote_dir": PROJECTOR_REMOTE_DIR,
            "exit_code": 0,
            "rows": 0,
            "stdout_tail": "[INFO] No rows to ingest",
            "stderr_tail": "",
        }

    local_tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8")
    # Unique remote path so concurrent batches cannot collide.
    remote_tmp = f"{PROJECTOR_REMOTE_DIR}/.ingest_messages_{uuid.uuid4().hex}.json"
    try:
        json.dump(rows, local_tmp, ensure_ascii=False)
        local_tmp.flush()
        local_tmp.close()

        scp_target = f"{PROJECTOR_SSH_HOST}:{remote_tmp}"
        scp_args = [PROJECTOR_SCP_BIN, *shlex.split(PROJECTOR_SCP_OPTS), local_tmp.name, scp_target]
        scp_proc = await asyncio.create_subprocess_exec(
            *scp_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            scp_stdout, scp_stderr = await asyncio.wait_for(scp_proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
        except asyncio.TimeoutError:
            scp_proc.kill()
            await scp_proc.wait()
            raise HTTPException(status_code=504, detail="Batch payload upload timed out")
        if scp_proc.returncode != 0:
            raise HTTPException(
                status_code=502,
                detail={
                    "host": PROJECTOR_SSH_HOST,
                    "remote_dir": PROJECTOR_REMOTE_DIR,
                    "exit_code": scp_proc.returncode,
                    "stdout_tail": _tail(scp_stdout.decode("utf-8", errors="replace")),
                    "stderr_tail": _tail(scp_stderr.decode("utf-8", errors="replace")),
                },
            )

        # NOTE(review): "@<path>" presumably tells the remote script to read
        # its payload from that file — confirm against the remote script.
        payload_arg = f"@{remote_tmp}"
        parts = [
            INGEST_MESSAGES_BATCH_REMOTE_SCRIPT,
            payload.table,
            payload.dedupe_mode,
            payload_arg,
        ]
        batch_cmd = " ".join(shlex.quote(p) for p in parts)
        # Preserve the script's exit code while always removing the remote temp file.
        command = (
            f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && "
            f"({batch_cmd}); rc=$?; rm -f {shlex.quote(remote_tmp)}; exit $rc"
        )
        ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
        proc = await asyncio.create_subprocess_exec(
            *ssh_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            raise HTTPException(status_code=504, detail="Batch message ingest execution timed out")
    finally:
        # Best-effort removal of the local temp file.
        try:
            os.unlink(local_tmp.name)
        except Exception:
            pass

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "rows": len(rows),
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
async def run_remote_assistant_feedback(
    feedback_id: str,
    payload: AssistantFeedbackPayload,
) -> Dict[str, Any]:
    """Persist one assistant feedback record via the remote script over SSH.

    Free-text fields and the JSON-serialized sources are base64-encoded so
    they survive shell argument passing. Raises HTTPException 504 on timeout
    and 502 on non-zero remote exit.
    """
    confidence = payload.confidence if payload.confidence is not None else 0.0
    # Positional argument order must match the remote script's expectations.
    parts = [
        ASSISTANT_FEEDBACK_REMOTE_SCRIPT,
        feedback_id,
        now_iso(),
        payload.outcome,
        payload.task_type,
        payload.release_name or "",
        f"{confidence}",
        "true" if payload.needs_review else "false",
        _b64(payload.goal or ""),
        _b64(payload.draft),
        _b64(payload.final_text or ""),
        _b64(json.dumps([s.model_dump() for s in payload.sources], ensure_ascii=False)),
        _b64(payload.notes or ""),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Kill the hung ssh process before surfacing the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant feedback execution timed out")

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
-def _extract_json_array_from_text(text: str) -> List[Dict[str, Any]]:
- start = text.find("[")
- end = text.rfind("]")
- if start == -1 or end == -1 or end < start:
- raise ValueError("No JSON array found in output")
- candidate = text[start : end + 1]
- obj = json.loads(candidate)
- if not isinstance(obj, list):
- raise ValueError("Parsed value is not a JSON array")
- out: List[Dict[str, Any]] = []
- for item in obj:
- if isinstance(item, dict):
- out.append(item)
- return out
-
-
async def run_remote_query_assistant_feedback(
    outcome: Optional[str],
    task_type: Optional[str],
    release_name: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Query assistant feedback rows via the remote script over SSH.

    Empty-string arguments stand in for unset filters. The remote stdout is
    parsed as a JSON array of row dicts. Raises HTTPException 504 on timeout
    and 502 on non-zero exit or unparseable output.
    """
    parts = [
        ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT,
        outcome or "",
        task_type or "",
        release_name or "",
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Kill the hung ssh process before surfacing the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant feedback query timed out")

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse feedback query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
-
-
async def run_remote_query_assistant_metrics(
    task_type: Optional[str],
    release_name: Optional[str],
    outcome: Optional[str],
    group_by: str,
    limit: int,
) -> Dict[str, Any]:
    """Query aggregated assistant metrics via the remote script over SSH.

    Mirrors run_remote_query_assistant_feedback: empty strings stand in for
    unset filters, stdout is parsed as a JSON array of row dicts, and errors
    surface as HTTPException 504 (timeout) or 502 (bad exit / bad output).
    """
    parts = [
        ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT,
        task_type or "",
        release_name or "",
        outcome or "",
        group_by,
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Kill the hung ssh process before surfacing the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant metrics query timed out")

    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse metrics query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
-
-
async def run_remote_assistant_action(
    action_id: str,
    payload: AssistantExecuteStepPayload,
    step: AssistantPlanStep,
    status: str,
    output_json: Dict[str, Any],
    error_text: Optional[str],
) -> Dict[str, Any]:
    """Log an executed assistant plan step to the remote store over SSH.

    Free-text and JSON fields are base64-encoded so they survive shell
    argument passing. Returns exit metadata on success; raises 502 on a
    nonzero remote exit and 504 on timeout.
    """
    argv = [
        ASSISTANT_ACTION_REMOTE_SCRIPT,
        action_id,
        now_iso(),
        payload.task_type,
        payload.release_name or "",
        _b64(payload.objective),
        step.step_id,
        _b64(step.title),
        step.action_type,
        "true" if step.requires_approval else "false",
        "true" if payload.approved else "false",
        status,
        _b64(json.dumps(output_json, ensure_ascii=False)),
        _b64(error_text or ""),
    ]
    remote_cmd = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(a) for a in argv)}"
    process = await asyncio.create_subprocess_exec(
        PROJECTOR_SSH_BIN,
        *shlex.split(PROJECTOR_SSH_OPTS),
        PROJECTOR_SSH_HOST,
        remote_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raw_out, raw_err = await asyncio.wait_for(process.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise HTTPException(status_code=504, detail="Assistant action logging timed out")

    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": process.returncode,
        "stdout_tail": _tail(raw_out.decode("utf-8", errors="replace")),
        "stderr_tail": _tail(raw_err.decode("utf-8", errors="replace")),
    }
    if process.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
async def run_remote_query_assistant_actions(
    status: Optional[str],
    task_type: Optional[str],
    release_name: Optional[str],
    step_id: Optional[str],
    action_type: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Run the remote assistant-actions query script over SSH and return its rows.

    Unset optional filters become empty strings so the remote script sees a
    fixed argument count. Raises 504 on timeout and 502 on a nonzero exit or
    unparseable output.
    """
    argv = [
        ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT,
        status or "",
        task_type or "",
        release_name or "",
        step_id or "",
        action_type or "",
        str(limit),
    ]
    remote_cmd = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(a) for a in argv)}"
    process = await asyncio.create_subprocess_exec(
        PROJECTOR_SSH_BIN,
        *shlex.split(PROJECTOR_SSH_OPTS),
        PROJECTOR_SSH_HOST,
        remote_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raw_out, raw_err = await asyncio.wait_for(process.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise HTTPException(status_code=504, detail="Assistant actions query timed out")

    stdout_text = raw_out.decode("utf-8", errors="replace")
    stderr_text = raw_err.decode("utf-8", errors="replace")
    if process.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": process.returncode,
                "stdout_tail": _tail(stdout_text),
                "stderr_tail": _tail(stderr_text),
            },
        )
    try:
        rows = _extract_json_array_from_text(stdout_text)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse actions query output: {e}",
                "stdout_tail": _tail(stdout_text),
                "stderr_tail": _tail(stderr_text),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
-
-
async def run_remote_record_assistant_proposals(
    proposal_set_id: str,
    created_at_utc: str,
    objective: str,
    release_name: Optional[str],
    summary: str,
    signals: Dict[str, Any],
    proposals: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Persist a self-improve proposal set to the remote store over SSH.

    Free-text and JSON payloads are base64-encoded to survive shell quoting.
    Returns exit metadata; raises 502 on a nonzero remote exit, 504 on timeout.
    """
    argv = [
        ASSISTANT_PROPOSALS_REMOTE_SCRIPT,
        proposal_set_id,
        created_at_utc,
        _b64(objective or ""),
        release_name or "",
        _b64(summary or ""),
        _b64(json.dumps(signals or {}, ensure_ascii=False)),
        _b64(json.dumps(proposals or [], ensure_ascii=False)),
    ]
    remote_cmd = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(a) for a in argv)}"
    process = await asyncio.create_subprocess_exec(
        PROJECTOR_SSH_BIN,
        *shlex.split(PROJECTOR_SSH_OPTS),
        PROJECTOR_SSH_HOST,
        remote_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raw_out, raw_err = await asyncio.wait_for(process.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise HTTPException(status_code=504, detail="Assistant proposals logging timed out")

    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": process.returncode,
        "stdout_tail": _tail(raw_out.decode("utf-8", errors="replace")),
        "stderr_tail": _tail(raw_err.decode("utf-8", errors="replace")),
    }
    if process.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
-
-
async def run_remote_query_assistant_proposals(
    release_name: Optional[str],
    proposal_set_id: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Run the remote proposals query script over SSH and return parsed rows.

    Unset optional filters become empty strings. Raises 504 on timeout and
    502 on a nonzero exit or unparseable output.
    """
    argv = [
        ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT,
        release_name or "",
        proposal_set_id or "",
        str(limit),
    ]
    remote_cmd = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(a) for a in argv)}"
    process = await asyncio.create_subprocess_exec(
        PROJECTOR_SSH_BIN,
        *shlex.split(PROJECTOR_SSH_OPTS),
        PROJECTOR_SSH_HOST,
        remote_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raw_out, raw_err = await asyncio.wait_for(process.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise HTTPException(status_code=504, detail="Assistant proposals query timed out")

    stdout_text = raw_out.decode("utf-8", errors="replace")
    stderr_text = raw_err.decode("utf-8", errors="replace")
    if process.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": process.returncode,
                "stdout_tail": _tail(stdout_text),
                "stderr_tail": _tail(stderr_text),
            },
        )
    try:
        rows = _extract_json_array_from_text(stdout_text)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse proposals query output: {e}",
                "stdout_tail": _tail(stdout_text),
                "stderr_tail": _tail(stderr_text),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
-
-
async def run_remote_record_run(
    run_id: str,
    run_type: str,
    status: str,
    started_at_utc: str,
    finished_at_utc: str,
    actor: str,
    input_json: Dict[str, Any],
    output_json: Optional[Dict[str, Any]],
    error_text: Optional[str],
) -> None:
    """Record a run row in Iceberg by invoking the remote runs script over SSH.

    JSON and free-text arguments are base64-encoded to survive shell quoting.
    Raises HTTPException 504 on timeout and 502 when the remote script exits
    nonzero.
    """
    parts = [
        RUNS_REMOTE_SCRIPT,
        run_id,
        run_type,
        status,
        started_at_utc,
        finished_at_utc,
        actor,
        _b64(json.dumps(input_json, ensure_ascii=False)),
        _b64(json.dumps(output_json, ensure_ascii=False) if output_json is not None else ""),
        _b64(error_text or ""),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Fix: previously a timeout propagated TimeoutError (an unhandled 500)
        # and leaked the ssh child. Kill/reap it and surface 504, matching the
        # sibling remote helpers.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Run logging timed out")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "message": "Failed to record run in Iceberg",
                "host": PROJECTOR_SSH_HOST,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")),
                "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")),
            },
        )
-
-
async def run_remote_record_event(
    run_id: str,
    event_type: str,
    detail_json: Dict[str, Any],
) -> None:
    """Record a single run event in Iceberg via the remote events script.

    The detail payload is base64-encoded JSON; the event timestamp is taken
    at call time via now_iso(). Raises HTTPException 504 on timeout and 502
    when the remote script exits nonzero.
    """
    parts = [
        RUN_EVENTS_REMOTE_SCRIPT,
        run_id,
        event_type,
        now_iso(),
        _b64(json.dumps(detail_json, ensure_ascii=False)),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Fix: previously a timeout propagated TimeoutError (an unhandled 500)
        # and leaked the ssh child. Kill/reap it and surface 504, matching the
        # sibling remote helpers.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Run event logging timed out")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "message": "Failed to record run event in Iceberg",
                "host": PROJECTOR_SSH_HOST,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")),
                "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")),
            },
        )
-
-
async def record_event_best_effort(run_id: str, event_type: str, detail_json: Dict[str, Any]) -> None:
    """Record a run event, swallowing all failures.

    Event tracing must never break the primary projection flow, so any
    exception is reduced to a warning log line.
    """
    try:
        await run_remote_record_event(run_id, event_type, detail_json)
    except Exception as e:
        print(f"[WARN] run event logging failed: run_id={run_id} event={event_type} error={e}")
-
-
-# --------- routes ---------
-@app.on_event("startup")
-async def startup():
- await es_ensure_index()
-
-
-@app.get("/meta")
-async def meta():
- return {
- "app_name": APP_NAME,
- "version": APP_VERSION,
- "started_at_utc": APP_STARTED_AT_UTC,
- "features": {
- "assistant_chat": True,
- "assistant_learn": True,
- "assistant_tasks_ai": True,
- "assistant_ui": True,
- },
- }
-
-
-@app.get("/ui", include_in_schema=False)
-async def assistant_ui():
- index_html = UI_DIR / "index.html"
- if not index_html.exists():
- raise HTTPException(status_code=404, detail="UI not found")
- html = index_html.read_text(encoding="utf-8")
- css_path = UI_ASSETS_DIR / "styles.css"
- js_path = UI_ASSETS_DIR / "app.js"
- css = css_path.read_text(encoding="utf-8") if css_path.exists() else ""
- js = js_path.read_text(encoding="utf-8") if js_path.exists() else ""
- html = html.replace(
- '',
- f"",
- )
- html = html.replace(
- '',
- f"",
- )
- return HTMLResponse(content=html)
-
-
-@app.post("/concepts", response_model=ConceptOut)
-async def create_concept(payload: ConceptCreate):
- created = now_iso()
- updated = created
- fingerprint = make_fingerprint(payload.canonical_name, payload.kind, payload.external_ids)
-
- # Store long content in IPFS (version 1)
- content_doc = {
- "canonical_name": payload.canonical_name,
- "kind": payload.kind,
- "aliases": payload.aliases,
- "external_ids": payload.external_ids,
- "tags": payload.tags,
- "description": payload.description,
- "created_at": created,
- }
- latest_cid = await ipfs_add_json(content_doc)
-
- # LLM extras (optional)
- summary = None
- embedding: List[float] = []
- base_text = payload.description or payload.canonical_name
- if base_text.strip():
- try:
- summary = await ollama_summary(base_text)
- except Exception:
- summary = None
- try:
- embedding = await ollama_embed(base_text)
- except Exception:
- embedding = []
-
- # Create vertex in JanusGraph (Concept)
- # Uses a stable concept_id as your canonical handle
- concept_id = (await gremlin_submit(
- """
- import java.util.UUID
- def id = UUID.randomUUID().toString()
- g.addV('Concept')
- .property('concept_id', id)
- .property('canonical_name', canonical_name)
- .property('kind', kind)
- .property('aliases', aliases_json)
- .property('external_ids', external_ids_json)
- .property('tags', tags_json)
- .property('fingerprint', fingerprint)
- .property('latest_cid', latest_cid)
- .property('summary', summary)
- .property('created_at', created_at)
- .property('updated_at', updated_at)
- .values('concept_id')
- """,
- {
- "canonical_name": payload.canonical_name,
- "kind": payload.kind or "",
- "aliases_json": json.dumps(payload.aliases, ensure_ascii=False),
- "external_ids_json": json.dumps(payload.external_ids, ensure_ascii=False),
- "tags_json": json.dumps(payload.tags, ensure_ascii=False),
- "fingerprint": fingerprint,
- "latest_cid": latest_cid,
- "summary": summary or "",
- "created_at": created,
- "updated_at": updated,
- },
- ))[0]
-
- out = {
- "concept_id": concept_id,
- "canonical_name": payload.canonical_name,
- "kind": payload.kind,
- "aliases": payload.aliases,
- "external_ids": payload.external_ids,
- "tags": payload.tags,
- "latest_cid": latest_cid,
- "summary": summary,
- "created_at": created,
- "updated_at": updated,
- }
-
- # Index in Elasticsearch
- doc = dict(out)
- doc["fingerprint"] = fingerprint
- if embedding:
- doc["embedding"] = embedding
- await es_index(doc)
-
- return out
-
-@app.get("/concepts/{concept_id}", response_model=ConceptOut)
-async def get_concept(concept_id: str):
- rows = await gremlin_submit(
- """
- g.V().hasLabel('Concept').has('concept_id', concept_id)
- .project('concept_id','canonical_name','kind','aliases','external_ids','tags','latest_cid','summary','created_at','updated_at')
- .by(values('concept_id'))
- .by(values('canonical_name'))
- .by(values('kind'))
- .by(values('aliases'))
- .by(values('external_ids'))
- .by(values('tags'))
- .by(values('latest_cid'))
- .by(values('summary'))
- .by(values('created_at'))
- .by(values('updated_at'))
- """,
- {"concept_id": concept_id},
- )
- if not rows:
- raise HTTPException(status_code=404, detail="Concept not found")
-
- r = rows[0]
- external_ids = {}
- try:
- external_ids = json.loads(r.get("external_ids") or "{}")
- except Exception:
- external_ids = {}
-
- aliases = []
- try:
- aliases = json.loads(r.get("aliases") or "[]")
- if not isinstance(aliases, list):
- aliases = []
- except Exception:
- aliases = []
-
- tags = []
- try:
- tags = json.loads(r.get("tags") or "[]")
- if not isinstance(tags, list):
- tags = []
- except Exception:
- tags = []
-
- return {
- "concept_id": r.get("concept_id"),
- "canonical_name": r.get("canonical_name"),
- "kind": (r.get("kind") or None),
- "aliases": aliases,
- "external_ids": external_ids,
- "tags": tags,
- "latest_cid": (r.get("latest_cid") or None),
- "summary": (r.get("summary") or None),
- "created_at": r.get("created_at"),
- "updated_at": r.get("updated_at"),
- }
-
-@app.get("/search")
-async def search(q: str, size: int = 10):
- results = await es_search(q, size=size)
- return {"q": q, "results": results}
-
-
-@app.get("/assistant/inbox")
-async def assistant_inbox(
- release_name: Optional[str] = None,
- q: Optional[str] = None,
- limit: int = 20,
- x_admin_api_key: Optional[str] = Header(default=None),
-):
- check_admin_api_key(x_admin_api_key)
- bounded_limit = max(1, min(limit, 200))
- hits = await es_recent_messages(
- size=bounded_limit,
- release_name=release_name,
- q=q,
- )
- rows: List[Dict[str, Any]] = []
- for h in hits:
- src = h.get("_source", {}) or {}
- rows.append(
- {
- "concept_id": src.get("concept_id"),
- "source_pk": src.get("source_pk"),
- "source_table": src.get("source_table"),
- "release_name": src.get("release_name"),
- "concept_type": src.get("concept_type") or src.get("kind"),
- "display_name": src.get("display_name") or src.get("canonical_name"),
- "text": src.get("text"),
- "summary": src.get("summary"),
- "description": src.get("description"),
- "updated_at": src.get("updated_at"),
- "score": float(h.get("_score")) if h.get("_score") is not None else None,
- }
- )
- return {
- "count": len(rows),
- "filters": {
- "release_name": release_name,
- "q": q,
- "limit": bounded_limit,
- },
- "rows": rows,
- }
-
-
-@app.get("/assistant/tasks")
-async def assistant_tasks(
- release_name: Optional[str] = None,
- q: Optional[str] = None,
- only_pending: bool = True,
- use_ai: bool = True,
- limit: int = 50,
- x_admin_api_key: Optional[str] = Header(default=None),
-):
- check_admin_api_key(x_admin_api_key)
- bounded_limit = max(1, min(limit, 500))
- # Pull more messages than final task limit because each message can yield 0..N tasks.
- hits = await es_recent_messages(
- size=min(1000, max(100, bounded_limit * 4)),
- release_name=release_name,
- q=q,
- )
-
- rows: List[Dict[str, Any]] = []
- seen_task_ids: set[str] = set()
- ai_cache_hits = 0
- ai_calls = 0
- for h in hits:
- src = h.get("_source", {}) or {}
- extracted: List[Dict[str, Any]] = []
- if use_ai:
- cache_key = _task_ai_cache_key(src)
- cached = _task_ai_cache_get(cache_key)
- if cached is not None:
- ai_cache_hits += 1
- extracted = cached
- else:
- ai_calls += 1
- try:
- extracted = await extract_pending_tasks_from_source_ai(src)
- except Exception as e:
- print(f"[WARN] assistant_tasks ai extraction failed: {e}")
- extracted = []
- _task_ai_cache_set(cache_key, extracted)
- if not extracted:
- extracted = extract_pending_tasks_from_source(src)
- for t in extracted:
- if only_pending and t.get("status") != "pending":
- continue
- task_id = str(t.get("task_id") or "")
- if not task_id or task_id in seen_task_ids:
- continue
- seen_task_ids.add(task_id)
- rows.append(t)
- if len(rows) >= bounded_limit:
- break
- if len(rows) >= bounded_limit:
- break
-
- return {
- "count": len(rows),
- "filters": {
- "release_name": release_name,
- "q": q,
- "only_pending": only_pending,
- "use_ai": use_ai,
- "limit": bounded_limit,
- "ai_cache_ttl_sec": TASK_AI_CACHE_TTL_SEC,
- },
- "stats": {
- "messages_scanned": len(hits),
- "ai_cache_hits": ai_cache_hits,
- "ai_calls": ai_calls,
- "ai_cache_size": len(TASK_AI_CACHE),
- },
- "rows": rows,
- }
-
-
-@app.post("/assistant/learn")
-async def assistant_learn(payload: AssistantLearnPayload, x_admin_api_key: Optional[str] = Header(default=None)):
- check_admin_api_key(x_admin_api_key)
- now = now_iso()
- note_id = "note-" + uuid.uuid4().hex[:16]
- title = (payload.title or "").strip()
- if not title:
- title = payload.text.strip().splitlines()[0][:80]
- summary = payload.text.strip()[:280]
-
- doc = {
- "concept_id": f"note:{note_id}",
- "concept_type": "note",
- "display_name": title,
- "description": summary,
- "text": payload.text,
- "source_table": "assistant.learn",
- "source_pk": note_id,
- "release_name": payload.release_name or "",
- "ref_hash": "",
- "attributes_json": json.dumps(payload.metadata or {}, ensure_ascii=False, sort_keys=True),
- "canonical_name": title,
- "kind": "note",
- "aliases": [],
- "external_ids": {},
- "tags": payload.tags or [],
- "latest_cid": None,
- "summary": summary,
- "created_at": now,
- "updated_at": now,
- "fingerprint": make_fingerprint(title, "note", {}),
- }
- await es_index(doc)
- response: Dict[str, Any] = {
- "stored": True,
- "concept_id": doc["concept_id"],
- "release_name": payload.release_name,
- "title": title,
- "tags": payload.tags,
- }
- if payload.write_code:
- response["code_write"] = await _assistant_write_code_from_learn(
- payload=payload,
- note_title=title,
- note_text=payload.text,
- )
- return response
-
-
-@app.post("/assistant/chat", response_model=AssistantChatResponse)
-async def assistant_chat(payload: AssistantChatPayload, x_admin_api_key: Optional[str] = Header(default=None)):
- check_admin_api_key(x_admin_api_key)
- session_id = (payload.session_id or str(uuid.uuid4())).strip()
-
- history: List[Dict[str, str]] = []
- if payload.history:
- history = [{"role": h.role, "content": h.content} for h in payload.history if h.content.strip()]
- else:
- history = ASSISTANT_CHAT_SESSIONS.get(session_id, [])
-
- hits: List[Dict[str, Any]] = []
- try:
- hits = await es_search_hits(
- q=payload.message,
- size=payload.max_sources,
- release_name=payload.release_name,
- )
- except Exception as e:
- print(f"[WARN] assistant_chat retrieval failed: {e}")
- hits = []
- if not hits and payload.release_name:
- try:
- hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
- except Exception as e:
- print(f"[WARN] assistant_chat release fallback failed: {e}")
- if not hits:
- try:
- hits = await es_recent_messages(size=payload.max_sources, release_name=payload.release_name, q=None)
- except Exception as e:
- print(f"[WARN] assistant_chat inbox fallback failed: {e}")
-
- prompt = build_chat_prompt(
- user_message=payload.message,
- history=history,
- source_docs=hits,
- release_name=payload.release_name,
- )
- try:
- answer = await ollama_generate(prompt)
- if not answer.strip():
- answer = "I don't have enough context to answer confidently. Can you share one more detail?"
- except Exception as e:
- print(f"[WARN] assistant_chat generation failed: {e}")
- answer = "I could not generate a response right now. Please retry."
-
- sources: List[AssistantDraftSource] = []
- for h in hits:
- src = h.get("_source", {}) or {}
- sources.append(
- AssistantDraftSource(
- concept_id=str(src.get("concept_id") or ""),
- source_pk=src.get("source_pk"),
- source_table=src.get("source_table"),
- release_name=src.get("release_name"),
- score=float(h.get("_score")) if h.get("_score") is not None else None,
- )
- )
- source_count = len([s for s in sources if s.concept_id])
- confidence = 0.35
- if source_count >= 5:
- confidence = 0.85
- elif source_count >= 3:
- confidence = 0.75
- elif source_count >= 1:
- confidence = 0.6
- if len(answer.strip()) < 30:
- confidence = min(confidence, 0.45)
-
- _append_chat_turn(session_id, "user", payload.message)
- _append_chat_turn(session_id, "assistant", answer)
-
- return AssistantChatResponse(
- session_id=session_id,
- answer=answer,
- sources=[s for s in sources if s.concept_id],
- confidence=confidence,
- release_name=payload.release_name,
- )
-
-
-@app.post("/assistant/self-improve", response_model=AssistantSelfImproveResponse)
-async def assistant_self_improve(
- payload: AssistantSelfImprovePayload,
- x_admin_api_key: Optional[str] = Header(default=None),
-):
- check_admin_api_key(x_admin_api_key)
-
- feedback_rows: List[Dict[str, Any]] = []
- if payload.include_edited_feedback:
- try:
- res = await run_remote_query_assistant_feedback(
- outcome="edited",
- task_type=None,
- release_name=payload.release_name,
- limit=payload.feedback_limit,
- )
- feedback_rows.extend(res.get("rows", []))
- except Exception as e:
- print(f"[WARN] self-improve edited feedback query failed: {e}")
- if payload.include_rejected_feedback:
- try:
- res = await run_remote_query_assistant_feedback(
- outcome="rejected",
- task_type=None,
- release_name=payload.release_name,
- limit=payload.feedback_limit,
- )
- feedback_rows.extend(res.get("rows", []))
- except Exception as e:
- print(f"[WARN] self-improve rejected feedback query failed: {e}")
-
- action_rows: List[Dict[str, Any]] = []
- if payload.include_blocked_actions:
- try:
- res = await run_remote_query_assistant_actions(
- status="blocked",
- task_type=None,
- release_name=payload.release_name,
- step_id=None,
- action_type=None,
- limit=payload.action_limit,
- )
- action_rows.extend(res.get("rows", []))
- except Exception as e:
- print(f"[WARN] self-improve blocked actions query failed: {e}")
-
- # De-duplicate simple repeats.
- seen_feedback: set[str] = set()
- dedup_feedback: List[Dict[str, Any]] = []
- for r in feedback_rows:
- key = str(r.get("feedback_id") or "") or hashlib.sha256(
- json.dumps(r, ensure_ascii=False, sort_keys=True).encode("utf-8")
- ).hexdigest()
- if key in seen_feedback:
- continue
- seen_feedback.add(key)
- dedup_feedback.append(r)
- feedback_rows = dedup_feedback[: payload.feedback_limit]
-
- seen_actions: set[str] = set()
- dedup_actions: List[Dict[str, Any]] = []
- for r in action_rows:
- key = str(r.get("action_id") or "") or hashlib.sha256(
- json.dumps(r, ensure_ascii=False, sort_keys=True).encode("utf-8")
- ).hexdigest()
- if key in seen_actions:
- continue
- seen_actions.add(key)
- dedup_actions.append(r)
- action_rows = dedup_actions[: payload.action_limit]
-
- prompt = build_self_improve_prompt(payload, feedback_rows, action_rows)
- proposals: List[AssistantSelfImproveProposal] = []
- summary = "Generated from fallback proposal set."
- try:
- raw = await ollama_generate(prompt)
- parsed = _extract_json_object_from_text(raw)
- summary = str(parsed.get("summary") or "").strip() or summary
- raw_proposals = parsed.get("proposals")
- if isinstance(raw_proposals, list):
- for idx, p in enumerate(raw_proposals[: payload.max_proposals]):
- if not isinstance(p, dict):
- continue
- risk_raw = str(p.get("risk") or "medium").strip().lower()
- risk: Literal["low", "medium", "high"] = "medium"
- if risk_raw in ("low", "medium", "high"):
- risk = risk_raw # type: ignore[assignment]
- files = [str(x) for x in (p.get("files") or []) if str(x).strip()]
- tests = [str(x) for x in (p.get("tests") or []) if str(x).strip()]
- proposals.append(
- AssistantSelfImproveProposal(
- proposal_id=str(p.get("proposal_id") or f"P{idx+1}"),
- title=str(p.get("title") or "").strip() or f"Proposal {idx+1}",
- problem=str(p.get("problem") or "").strip() or "Unspecified problem.",
- change=str(p.get("change") or "").strip() or "No change details provided.",
- files=files,
- risk=risk,
- tests=tests,
- auto_apply_safe=bool(p.get("auto_apply_safe", False)),
- )
- )
- except Exception as e:
- print(f"[WARN] self-improve generation failed: {e}")
-
- if not proposals:
- proposals = fallback_self_improve_proposals()[: payload.max_proposals]
-
- created_at_utc = now_iso()
- proposal_set_id = str(uuid.uuid4())
- signals = {
- "feedback_rows": len(feedback_rows),
- "blocked_action_rows": len(action_rows),
- }
- proposal_dicts = [p.model_dump() for p in proposals[: payload.max_proposals]]
- try:
- await run_remote_record_assistant_proposals(
- proposal_set_id=proposal_set_id,
- created_at_utc=created_at_utc,
- objective=payload.objective,
- release_name=payload.release_name,
- summary=summary,
- signals=signals,
- proposals=proposal_dicts,
- )
- except Exception as e:
- print(f"[WARN] self-improve proposal persistence failed: {e}")
-
- apply_blocked = payload.apply
- apply_block_reason: Optional[str] = None
- if payload.apply:
- apply_block_reason = "Auto-apply is not enabled yet. Use proposals as PR/patch inputs."
-
- return AssistantSelfImproveResponse(
- objective=payload.objective,
- release_name=payload.release_name,
- proposal_set_id=proposal_set_id,
- created_at_utc=created_at_utc,
- summary=summary,
- proposals=proposals[: payload.max_proposals],
- signals=signals,
- apply_blocked=apply_blocked,
- apply_block_reason=apply_block_reason,
- )
-
-
-@app.get("/assistant/self-improve/history")
-async def assistant_self_improve_history(
- release_name: Optional[str] = None,
- proposal_set_id: Optional[str] = None,
- limit: int = 200,
- x_admin_api_key: Optional[str] = Header(default=None),
-):
- check_admin_api_key(x_admin_api_key)
- bounded_limit = max(1, min(limit, 2000))
- result = await run_remote_query_assistant_proposals(
- release_name=release_name,
- proposal_set_id=proposal_set_id,
- limit=bounded_limit,
- )
- rows = result["rows"]
-
- grouped: Dict[str, Dict[str, Any]] = {}
- for r in rows:
- set_id = str(r.get("proposal_set_id") or "")
- if not set_id:
- continue
- g = grouped.get(set_id)
- if g is None:
- signals_json = r.get("signals_json") or "{}"
- try:
- signals = json.loads(signals_json)
- if not isinstance(signals, dict):
- signals = {}
- except Exception:
- signals = {}
- g = {
- "proposal_set_id": set_id,
- "created_at_utc": r.get("created_at_utc"),
- "objective": r.get("objective"),
- "release_name": r.get("release_name"),
- "summary": r.get("summary"),
- "signals": signals,
- "proposals": [],
- }
- grouped[set_id] = g
-
- files_json = r.get("files_json") or "[]"
- tests_json = r.get("tests_json") or "[]"
- try:
- files = json.loads(files_json)
- if not isinstance(files, list):
- files = []
- except Exception:
- files = []
- try:
- tests = json.loads(tests_json)
- if not isinstance(tests, list):
- tests = []
- except Exception:
- tests = []
-
- g["proposals"].append(
- {
- "proposal_id": r.get("proposal_id"),
- "title": r.get("title"),
- "problem": r.get("problem"),
- "change": r.get("change_text"),
- "files": files,
- "risk": r.get("risk") or "medium",
- "tests": tests,
- "auto_apply_safe": bool(r.get("auto_apply_safe", False)),
- }
- )
-
- sets = list(grouped.values())
- sets.sort(key=lambda x: str(x.get("created_at_utc") or ""), reverse=True)
- return {
- "count": len(sets),
- "filters": {
- "release_name": release_name,
- "proposal_set_id": proposal_set_id,
- "limit": bounded_limit,
- },
- "rows": sets,
- }
-
-
-@app.post("/assistant/self-improve/apply", response_model=AssistantSelfImproveApplyResponse)
-async def assistant_self_improve_apply(
- payload: AssistantSelfImproveApplyPayload,
- x_admin_api_key: Optional[str] = Header(default=None),
-):
- check_admin_api_key(x_admin_api_key)
- repo_dir = Path(__file__).resolve().parent
- proposal: Optional[AssistantSelfImproveProposal] = payload.proposal
- if proposal is None:
- if not (payload.proposal_set_id and payload.proposal_id):
- raise HTTPException(
- status_code=400,
- detail="Provide either proposal object or both proposal_set_id and proposal_id.",
- )
- hist = await run_remote_query_assistant_proposals(
- release_name=payload.release_name,
- proposal_set_id=payload.proposal_set_id,
- limit=500,
- )
- matched: Optional[AssistantSelfImproveProposal] = None
- for r in hist.get("rows", []):
- if str(r.get("proposal_id") or "") != payload.proposal_id:
- continue
- files_json = r.get("files_json") or "[]"
- tests_json = r.get("tests_json") or "[]"
- try:
- files = json.loads(files_json)
- if not isinstance(files, list):
- files = []
- except Exception:
- files = []
- try:
- tests = json.loads(tests_json)
- if not isinstance(tests, list):
- tests = []
- except Exception:
- tests = []
- risk_raw = str(r.get("risk") or "medium").strip().lower()
- risk: Literal["low", "medium", "high"] = "medium"
- if risk_raw in ("low", "medium", "high"):
- risk = risk_raw # type: ignore[assignment]
- matched = AssistantSelfImproveProposal(
- proposal_id=str(r.get("proposal_id") or payload.proposal_id),
- title=str(r.get("title") or ""),
- problem=str(r.get("problem") or ""),
- change=str(r.get("change_text") or ""),
- files=[str(x) for x in files if str(x).strip()],
- risk=risk,
- tests=[str(x) for x in tests if str(x).strip()],
- auto_apply_safe=bool(r.get("auto_apply_safe", False)),
- )
- break
- if matched is None:
- raise HTTPException(
- status_code=404,
- detail=f"Proposal not found for set_id={payload.proposal_set_id} proposal_id={payload.proposal_id}",
- )
- proposal = matched
- assert proposal is not None
-
- branch_slug = _slugify(f"{proposal.proposal_id}-{proposal.title}")
- branch_name = f"assistant/{branch_slug}"
- proposals_dir = repo_dir / "proposals"
- proposals_dir.mkdir(parents=True, exist_ok=True)
- proposal_file = proposals_dir / f"{branch_slug}.md"
- commands: List[str] = [
- f"git -C {repo_dir} rev-parse --is-inside-work-tree",
- f"git -C {repo_dir} checkout -B {branch_name}",
- f"write {proposal_file}",
- f"git -C {repo_dir} add {proposal_file}",
- f"git -C {repo_dir} commit -m 'chore(self-improve): apply {proposal.proposal_id} {proposal.title}'",
- ]
-
- md = "\n".join(
- [
- f"# {proposal.proposal_id}: {proposal.title}",
- "",
- f"- Objective: {payload.objective or ''}",
- f"- Release: {payload.release_name or ''}",
- f"- Risk: {proposal.risk}",
- f"- Auto apply safe: {proposal.auto_apply_safe}",
- "",
- "## Problem",
- proposal.problem,
- "",
- "## Change",
- proposal.change,
- "",
- "## Files",
- *(f"- `{f}`" for f in (proposal.files or [])),
- "",
- "## Tests",
- *(f"- {t}" for t in (proposal.tests or [])),
- "",
- f"_Generated at {now_iso()}_",
- "",
- ]
- )
-
- if payload.dry_run:
- return AssistantSelfImproveApplyResponse(
- applied=False,
- dry_run=True,
- repo_dir=str(repo_dir),
- branch_name=branch_name,
- proposal_file=str(proposal_file.relative_to(repo_dir)),
- commit=None,
- commands=commands,
- detail="Dry-run only. No branch or commit created.",
- )
-
- chk = await _run_local_cmd(["git", "rev-parse", "--is-inside-work-tree"], repo_dir)
- if chk["code"] != 0:
- raise HTTPException(status_code=500, detail={"message": "Not a git repository", "stderr": chk["stderr"]})
-
- co = await _run_local_cmd(["git", "checkout", "-B", branch_name], repo_dir)
- if co["code"] != 0:
- raise HTTPException(status_code=500, detail={"message": "Failed to create/switch branch", "stderr": co["stderr"]})
-
- proposal_file.write_text(md, encoding="utf-8")
- add = await _run_local_cmd(["git", "add", str(proposal_file.relative_to(repo_dir))], repo_dir)
- if add["code"] != 0:
- raise HTTPException(status_code=500, detail={"message": "Failed to add proposal file", "stderr": add["stderr"]})
-
- commit_msg = f"chore(self-improve): apply {proposal.proposal_id} {proposal.title}"
- cm = await _run_local_cmd(["git", "commit", "-m", commit_msg], repo_dir)
- if cm["code"] != 0:
- # Common case: nothing to commit. still return branch + file info.
- detail = (cm["stderr"] or cm["stdout"] or "").strip()
- head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
- commit = head["stdout"].strip() if head["code"] == 0 else None
- return AssistantSelfImproveApplyResponse(
- applied=True,
- dry_run=False,
- repo_dir=str(repo_dir),
- branch_name=branch_name,
- proposal_file=str(proposal_file.relative_to(repo_dir)),
- commit=commit,
- commands=commands,
- detail=detail or "No new commit created (possibly no diff).",
- )
-
- head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
- commit = head["stdout"].strip() if head["code"] == 0 else None
- return AssistantSelfImproveApplyResponse(
- applied=True,
- dry_run=False,
- repo_dir=str(repo_dir),
- branch_name=branch_name,
- proposal_file=str(proposal_file.relative_to(repo_dir)),
- commit=commit,
- commands=commands,
- detail="Branch and proposal commit created.",
- )
-
-
-@app.post("/assistant/draft", response_model=AssistantDraftResponse)
-async def assistant_draft(payload: AssistantDraftPayload, x_admin_api_key: Optional[str] = Header(default=None)):
- check_admin_api_key(x_admin_api_key)
- retrieval_query = " ".join(
- [
- payload.goal,
- payload.recipient or "",
- payload.task_type,
- " ".join(payload.constraints),
- ]
- ).strip()
- try:
- hits = await es_search_hits(
- q=retrieval_query,
- size=payload.max_sources,
- release_name=payload.release_name,
- )
- except Exception as e:
- print(f"[WARN] assistant_draft retrieval failed: {e}")
- hits = []
- if not hits and payload.release_name:
- try:
- hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
- except Exception as e:
- print(f"[WARN] assistant_draft release fallback retrieval failed: {e}")
- if not hits and payload.release_name:
- try:
- hits = await es_recent_messages(
- size=payload.max_sources,
- release_name=payload.release_name,
- q=None,
- )
- except Exception as e:
- print(f"[WARN] assistant_draft inbox fallback retrieval failed: {e}")
- if not hits:
- try:
- hits = await es_recent_messages(
- size=payload.max_sources,
- release_name=None,
- q=None,
- )
- except Exception as e:
- print(f"[WARN] assistant_draft global fallback retrieval failed: {e}")
-
- prompt = build_assistant_prompt(payload, hits)
- used_fallback = False
- try:
- draft = await ollama_generate(prompt)
- if not draft.strip():
- used_fallback = True
- draft = fallback_draft_text(payload)
+ # Generate draft using OLLAMA API
+ return ollama_generate(prompt)
except Exception as e:
print(f"[WARN] assistant_draft generation failed: {e}")
- used_fallback = True
- draft = fallback_draft_text(payload)
+ return fallback_draft_text()
- sources: List[AssistantDraftSource] = []
- for h in hits:
- src = h.get("_source", {}) or {}
- sources.append(
- AssistantDraftSource(
- concept_id=str(src.get("concept_id") or ""),
- source_pk=src.get("source_pk"),
- source_table=src.get("source_table"),
- release_name=src.get("release_name"),
- score=float(h.get("_score")) if h.get("_score") is not None else None,
- )
- )
-
- source_count = len([s for s in sources if s.concept_id])
- if source_count >= 5:
- confidence = 0.85
- elif source_count >= 3:
- confidence = 0.75
- elif source_count >= 1:
- confidence = 0.6
- else:
- confidence = 0.35
-
- if len(draft.strip()) < 40:
- confidence = min(confidence, 0.45)
- if used_fallback:
- confidence = min(confidence, 0.4)
-
- source_ids = [s.concept_id for s in sources if s.concept_id]
- citation_required = confidence < 0.75 or used_fallback
- if citation_required and source_ids:
- already_cited = any(cid in draft for cid in source_ids)
- if not already_cited:
- cited = ", ".join(source_ids[:3])
- draft = f"{draft.rstrip()}\n\nSources: {cited}"
-
- return AssistantDraftResponse(
- task_type=payload.task_type,
- draft=draft,
- sources=[s for s in sources if s.concept_id],
- confidence=confidence,
- needs_review=True,
- release_name=payload.release_name,
- )
-
-
-@app.post("/assistant/plan", response_model=AssistantPlanResponse)
-async def assistant_plan(payload: AssistantPlanPayload, x_admin_api_key: Optional[str] = Header(default=None)):
- check_admin_api_key(x_admin_api_key)
- retrieval_query = " ".join([payload.objective, payload.task_type, " ".join(payload.constraints)]).strip()
+def get_sources(retrieval_query: str) -> List[Dict]:
+ """Retrieve relevant sources for the given query."""
try:
- hits = await es_search_hits(
- q=retrieval_query,
- size=payload.max_sources,
- release_name=payload.release_name,
- )
+ # Search for relevant sources using ES API
+ hits = es_search_hits(q=retrieval_query, size=5)
+ return [hit.get("_source", {}) or {} for hit in hits]
except Exception as e:
- print(f"[WARN] assistant_plan retrieval failed: {e}")
- hits = []
- if not hits and payload.release_name:
- try:
- hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
- except Exception as e:
- print(f"[WARN] assistant_plan release fallback retrieval failed: {e}")
+ print(f"[WARN] assistant_draft retrieval failed: {e}")
+ return []
- prompt = build_assistant_plan_prompt(payload, hits)
- used_fallback = False
- plan_steps: List[AssistantPlanStep] = []
- try:
- raw = await ollama_generate(prompt)
- parsed = _extract_json_object_from_text(raw)
- raw_steps = parsed.get("plan")
- if isinstance(raw_steps, list):
- allowed_action_types = {"research", "draft", "ask_user", "prepare_data", "review"}
- for i, s in enumerate(raw_steps[: payload.max_steps]):
- if not isinstance(s, dict):
- continue
- step_id = str(s.get("step_id") or f"S{i+1}").strip() or f"S{i+1}"
- title = str(s.get("title") or "").strip()
- if not title:
- continue
- at = str(s.get("action_type") or "research").strip()
- action_type = at if at in allowed_action_types else "research"
- requires_approval = bool(s.get("requires_approval", False))
- notes = str(s.get("notes")).strip() if s.get("notes") is not None else None
- plan_steps.append(
- AssistantPlanStep(
- step_id=step_id,
- title=title,
- action_type=action_type, # type: ignore[arg-type]
- requires_approval=requires_approval,
- notes=notes,
- )
- )
- except Exception as e:
- print(f"[WARN] assistant_plan generation failed: {e}")
- used_fallback = True
+async def assistant_draft(req):
+ # ... (rest of the function remains the same)
- if not plan_steps:
- used_fallback = True
- plan_steps = fallback_plan(payload)
+ prompt = build_assistant_prompt(payload, hits)
+ draft = get_draft(prompt)
+ sources = get_sources(retrieval_query=prompt)
+ # ...
+```
- sources: List[AssistantDraftSource] = []
- for h in hits:
- src = h.get("_source", {}) or {}
- sources.append(
- AssistantDraftSource(
- concept_id=str(src.get("concept_id") or ""),
- source_pk=src.get("source_pk"),
- source_table=src.get("source_table"),
- release_name=src.get("release_name"),
- score=float(h.get("_score")) if h.get("_score") is not None else None,
- )
- )
-
- source_count = len([s for s in sources if s.concept_id])
- if source_count >= 5:
- confidence = 0.85
- elif source_count >= 3:
- confidence = 0.75
- elif source_count >= 1:
- confidence = 0.6
- else:
- confidence = 0.35
- if used_fallback:
- confidence = min(confidence, 0.45)
-
- return AssistantPlanResponse(
- objective=payload.objective,
- task_type=payload.task_type,
- plan=plan_steps,
- sources=[s for s in sources if s.concept_id],
- needs_review=True,
- confidence=confidence,
- release_name=payload.release_name,
- )
-
-
-@app.post("/assistant/execute-step", response_model=AssistantExecuteStepResponse)
-async def assistant_execute_step(payload: AssistantExecuteStepPayload, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Execute one step of an assistant plan with full audit logging.
-
-    Looks up payload.step_id in payload.plan, enforces the step policy, runs
-    the step, and records an action record plus a run record via the remote
-    helpers. Missing step -> 400; policy violation -> 'blocked' response
-    (HTTP 200); execution error -> 500. Events are recorded best-effort.
-    """
-    check_admin_api_key(x_admin_api_key)
-    action_id = str(uuid.uuid4())
-    started_at_utc = now_iso()
-    run_input = payload.model_dump()
-    await record_event_best_effort(action_id, "started", {"step_id": payload.step_id, "approved": payload.approved})
-
-    # Missing step: still persist a synthetic "blocked" action and a failed
-    # run record before raising, so the audit trail is complete.
-    step = find_plan_step(payload.plan, payload.step_id)
-    if step is None:
-        error_text = f"Step '{payload.step_id}' not found in plan."
-        await run_remote_assistant_action(
-            action_id=action_id,
-            payload=payload,
-            step=AssistantPlanStep(
-                step_id=payload.step_id,
-                title="missing-step",
-                action_type="review",
-                requires_approval=True,
-                notes=error_text,
-            ),
-            status="blocked",
-            output_json={"reason": error_text},
-            error_text=error_text,
-        )
-        await run_remote_record_run(
-            run_id=action_id,
-            run_type="assistant_execute_step",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json={"reason": error_text},
-            error_text=error_text,
-        )
-        raise HTTPException(status_code=400, detail=error_text)
-
-    # Policy gate: a non-empty policy_error blocks execution. This is recorded
-    # and returned as status="blocked", not raised as an HTTP error.
-    policy_error = enforce_step_policy(payload, step)
-    if policy_error:
-        out = {"policy_blocked": True, "reason": policy_error}
-        await record_event_best_effort(action_id, "policy_blocked", {"step_id": step.step_id, "reason": policy_error})
-        await run_remote_assistant_action(
-            action_id=action_id,
-            payload=payload,
-            step=step,
-            status="blocked",
-            output_json=out,
-            error_text=policy_error,
-        )
-        await run_remote_record_run(
-            run_id=action_id,
-            run_type="assistant_execute_step",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=out,
-            error_text=policy_error,
-        )
-        return AssistantExecuteStepResponse(
-            action_id=action_id,
-            step_id=step.step_id,
-            status="blocked",
-            output=out,
-            needs_review=True,
-        )
-
-    # Happy path: execute, then persist the executed action and a succeeded
-    # run record; failures persist a blocked action + failed run, then 500.
-    try:
-        output = await execute_plan_step(payload, step)
-        await record_event_best_effort(action_id, "executed", {"step_id": step.step_id, "action_type": step.action_type})
-        await run_remote_assistant_action(
-            action_id=action_id,
-            payload=payload,
-            step=step,
-            status="executed",
-            output_json=output,
-            error_text=None,
-        )
-        await run_remote_record_run(
-            run_id=action_id,
-            run_type="assistant_execute_step",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=output,
-            error_text=None,
-        )
-        await record_event_best_effort(action_id, "recorded", {"status": "succeeded"})
-        return AssistantExecuteStepResponse(
-            action_id=action_id,
-            step_id=step.step_id,
-            status="executed",
-            output=output,
-            needs_review=True,
-        )
-    except Exception as e:
-        err_text = str(e)
-        await run_remote_assistant_action(
-            action_id=action_id,
-            payload=payload,
-            step=step,
-            status="blocked",
-            output_json={"error": err_text},
-            error_text=err_text,
-        )
-        await run_remote_record_run(
-            run_id=action_id,
-            run_type="assistant_execute_step",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(action_id, "recorded", {"status": "failed"})
-        raise HTTPException(status_code=500, detail=err_text)
-
-
-@app.post("/assistant/feedback")
-async def assistant_feedback(payload: AssistantFeedbackPayload, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Persist assistant feedback remotely and record the run outcome.
-
-    On success returns {"run_id", "result"}; on failure records a failed run
-    and re-raises the original exception. Events are recorded best-effort.
-    """
-    check_admin_api_key(x_admin_api_key)
-    run_id = str(uuid.uuid4())
-    run_input = payload.model_dump()
-    started_at_utc = now_iso()
-    await record_event_best_effort(
-        run_id,
-        "started",
-        {"input": {"outcome": payload.outcome, "task_type": payload.task_type, "release_name": payload.release_name}},
-    )
-    try:
-        result = await run_remote_assistant_feedback(run_id, payload)
-        await record_event_best_effort(
-            run_id,
-            "feedback_recorded",
-            {"outcome": payload.outcome, "task_type": payload.task_type},
-        )
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="assistant_feedback",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        # HTTPException.detail may be a dict; serialize so error_text is a string.
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="assistant_feedback",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="assistant_feedback",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-
-
-@app.get("/assistant/feedback")
-async def assistant_feedback_list(
-    outcome: Optional[Literal["accepted", "edited", "rejected"]] = None,
-    task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
-    release_name: Optional[str] = None,
-    limit: int = 50,
-    x_admin_api_key: Optional[str] = Header(default=None),
-):
-    """List recorded assistant feedback rows, newest-first per the remote query.
-
-    All filters are optional; `limit` is clamped to 1..500. Returns the rows
-    plus the effective filters for client-side echo.
-    """
-    check_admin_api_key(x_admin_api_key)
-    bounded_limit = max(1, min(limit, 500))
-    result = await run_remote_query_assistant_feedback(
-        outcome=outcome,
-        task_type=task_type,
-        release_name=release_name,
-        limit=bounded_limit,
-    )
-    rows = result["rows"]
-    return {
-        "count": len(rows),
-        "filters": {
-            "outcome": outcome,
-            "task_type": task_type,
-            "release_name": release_name,
-            "limit": bounded_limit,
-        },
-        "rows": rows,
-    }
-
-
-@app.get("/assistant/metrics")
-async def assistant_metrics(
-    task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
-    release_name: Optional[str] = None,
-    outcome: Optional[Literal["accepted", "edited", "rejected"]] = None,
-    group_by: Literal["task_type", "release_name", "both"] = "both",
-    limit: int = 100,
-    x_admin_api_key: Optional[str] = Header(default=None),
-):
-    """Return aggregated assistant feedback metrics from the remote store.
-
-    Grouping is controlled by `group_by`; `limit` is clamped to 1..1000.
-    """
-    check_admin_api_key(x_admin_api_key)
-    bounded_limit = max(1, min(limit, 1000))
-    result = await run_remote_query_assistant_metrics(
-        task_type=task_type,
-        release_name=release_name,
-        outcome=outcome,
-        group_by=group_by,
-        limit=bounded_limit,
-    )
-    rows = result["rows"]
-    return {
-        "count": len(rows),
-        "filters": {
-            "task_type": task_type,
-            "release_name": release_name,
-            "outcome": outcome,
-            "group_by": group_by,
-            "limit": bounded_limit,
-        },
-        "rows": rows,
-    }
-
-
-@app.get("/assistant/actions")
-async def assistant_actions(
-    status: Optional[Literal["blocked", "executed"]] = None,
-    task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
-    release_name: Optional[str] = None,
-    step_id: Optional[str] = None,
-    action_type: Optional[Literal["research", "draft", "ask_user", "prepare_data", "review"]] = None,
-    limit: int = 50,
-    x_admin_api_key: Optional[str] = Header(default=None),
-):
-    """List recorded assistant actions with optional filters.
-
-    `limit` is clamped to 1..500; response echoes the effective filters.
-    """
-    check_admin_api_key(x_admin_api_key)
-    bounded_limit = max(1, min(limit, 500))
-    result = await run_remote_query_assistant_actions(
-        status=status,
-        task_type=task_type,
-        release_name=release_name,
-        step_id=step_id,
-        action_type=action_type,
-        limit=bounded_limit,
-    )
-    rows = result["rows"]
-    return {
-        "count": len(rows),
-        "filters": {
-            "status": status,
-            "task_type": task_type,
-            "release_name": release_name,
-            "step_id": step_id,
-            "action_type": action_type,
-            "limit": bounded_limit,
-        },
-        "rows": rows,
-    }
-
-
-@app.post("/admin/project-release")
-async def project_release(payload: ProjectionTrigger, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Trigger a remote projection run for a release and record the outcome.
-
-    Emits best-effort progress events keyed off flags in the projector result
-    ("spark_read_done", "projection_done"), then a succeeded/failed run record.
-    Failures re-raise after recording.
-    """
-    check_admin_api_key(x_admin_api_key)
-    run_id = str(uuid.uuid4())
-    run_input = payload.model_dump()
-    started_at_utc = now_iso()
-    await record_event_best_effort(run_id, "started", {"input": run_input})
-    try:
-        result = await run_remote_projector(payload)
-        if result.get("spark_read_done"):
-            await record_event_best_effort(run_id, "spark_read_done", {"release_name": payload.release_name})
-        if result.get("projection_done"):
-            await record_event_best_effort(
-                run_id,
-                "projection_done",
-                {"targets": payload.targets, "dry_run": payload.dry_run},
-            )
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="projection",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        # HTTPException.detail may be a dict; serialize so error_text is a string.
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="projection",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="projection",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-
-
-@app.post("/admin/ingest-email-imap")
-async def ingest_email_imap(payload: EmailImapIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Fetch messages from an IMAP mailbox and ingest them as a batch.
-
-    In incremental mode (and when no explicit since_uid is given) the last
-    seen UID is looked up from the remote checkpoint and the IMAP search is
-    narrowed to UIDs after it. The run record and events are written either
-    way; the IMAP password is masked before being logged.
-    """
-    check_admin_api_key(x_admin_api_key)
-    run_id = str(uuid.uuid4())
-    started_at_utc = now_iso()
-    run_input = payload.model_dump()
-    # Never persist the mailbox credential in run/input logs.
-    if "password" in run_input:
-        run_input["password"] = "***"
-
-    # Resolve the incremental checkpoint: explicit since_uid wins; otherwise
-    # consult the remote checkpoint for this host/mailbox/user/table.
-    effective_since_uid = payload.since_uid
-    if effective_since_uid is None and payload.incremental:
-        effective_since_uid = await run_remote_query_imap_checkpoint(
-            host=payload.host,
-            mailbox=payload.mailbox,
-            username=payload.username,
-            table=payload.table,
-        )
-    # With a checkpoint, search only UIDs strictly after it; else use the
-    # caller-provided criteria as-is.
-    if effective_since_uid is not None:
-        effective_search_criteria = f"UID {int(effective_since_uid) + 1}:*"
-    else:
-        effective_search_criteria = payload.search_criteria
-
-    await record_event_best_effort(
-        run_id,
-        "started",
-        {
-            "input": {
-                "host": payload.host,
-                "mailbox": payload.mailbox,
-                "search_criteria": effective_search_criteria,
-                "since_uid": effective_since_uid,
-                "max_messages": payload.max_messages,
-                "table": payload.table,
-                "dedupe_mode": payload.dedupe_mode,
-            }
-        },
-    )
-
-    try:
-        # Blocking IMAP I/O runs in a worker thread to keep the event loop free.
-        items = await asyncio.to_thread(
-            fetch_imap_messages_blocking,
-            payload,
-            effective_search_criteria,
-            effective_since_uid,
-        )
-        # Track the highest fetched UID; non-numeric metadata is skipped so a
-        # single malformed message cannot break checkpoint advancement.
-        max_uid_fetched: Optional[int] = None
-        for m in items:
-            uid_raw = m.metadata.get("imap_uid")
-            try:
-                uid_int = int(uid_raw)
-            except Exception:
-                continue
-            if max_uid_fetched is None or uid_int > max_uid_fetched:
-                max_uid_fetched = uid_int
-        batch_payload = MessageIngestBatchPayload(
-            table=payload.table,
-            dedupe_mode=payload.dedupe_mode,
-            messages=items,
-        )
-        ingest_result = await run_remote_ingest_messages_batch(batch_payload)
-        result = {
-            "incremental": payload.incremental,
-            "since_uid": effective_since_uid,
-            "search_criteria_used": effective_search_criteria,
-            "max_uid_fetched": max_uid_fetched,
-            "fetched_messages": len(items),
-            "ingested_rows_requested": len(items),
-            "ingest_result": ingest_result,
-        }
-        await record_event_best_effort(
-            run_id,
-            "ingest_done",
-            {"fetched_messages": len(items), "table": payload.table, "max_uid_fetched": max_uid_fetched},
-        )
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_email_imap",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_email_imap",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_email_imap",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-
-
-@app.post("/admin/poll-and-project")
-async def poll_and_project(payload: PollAndProjectPayload, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Poll IMAP for new messages, then create a release and project it.
-
-    Pipeline: ingest via the ingest_email_imap endpoint function, skip
-    projection when nothing new was fetched (unless project_if_no_new),
-    otherwise create a messages release (auto-named with a UTC timestamp if
-    none given) and run the projector. Currently restricted to the
-    lake.db1.messages table. All stages are recorded as one run.
-    """
-    check_admin_api_key(x_admin_api_key)
-    if payload.imap.table != "lake.db1.messages":
-        raise HTTPException(
-            status_code=400,
-            detail="poll-and-project currently supports only table lake.db1.messages",
-        )
-
-    run_id = str(uuid.uuid4())
-    started_at_utc = now_iso()
-    run_input = payload.model_dump()
-    # Mask the nested IMAP credential before logging the input.
-    if run_input.get("imap", {}).get("password"):
-        run_input["imap"]["password"] = "***"
-
-    await record_event_best_effort(
-        run_id,
-        "started",
-        {
-            "input": {
-                "imap": {
-                    "host": payload.imap.host,
-                    "mailbox": payload.imap.mailbox,
-                    "incremental": payload.imap.incremental,
-                    "since_uid": payload.imap.since_uid,
-                    "max_messages": payload.imap.max_messages,
-                    "table": payload.imap.table,
-                },
-                "targets": payload.targets,
-                "dry_run": payload.dry_run,
-                "project_if_no_new": payload.project_if_no_new,
-            }
-        },
-    )
-
-    try:
-        # Reuses the ingest endpoint function directly (same auth header).
-        ingest = await ingest_email_imap(payload.imap, x_admin_api_key)
-        ingest_result = ingest["result"]
-        fetched_messages = int(ingest_result.get("fetched_messages", 0))
-        await record_event_best_effort(
-            run_id,
-            "ingest_done",
-            {
-                "ingest_run_id": ingest.get("run_id"),
-                "fetched_messages": fetched_messages,
-                "max_uid_fetched": ingest_result.get("max_uid_fetched"),
-            },
-        )
-
-        # Nothing new and no forced projection: record success and stop early.
-        if fetched_messages <= 0 and not payload.project_if_no_new:
-            result = {
-                "ingest_run_id": ingest.get("run_id"),
-                "ingest_result": ingest_result,
-                "skipped": True,
-                "reason": "No new messages",
-            }
-            await run_remote_record_run(
-                run_id=run_id,
-                run_type="poll_and_project",
-                status="succeeded",
-                started_at_utc=started_at_utc,
-                finished_at_utc=now_iso(),
-                actor="admin-api",
-                input_json=run_input,
-                output_json=result,
-                error_text=None,
-            )
-            await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-            return {"run_id": run_id, "result": result}
-
-        # Auto-name the release from a UTC timestamp when none was supplied.
-        if payload.release_name:
-            release_name = payload.release_name
-        else:
-            ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S")
-            release_name = f"{payload.release_prefix}_{ts}_messages-auto"
-
-        create_result = await run_remote_create_messages_release(release_name)
-        await record_event_best_effort(
-            run_id,
-            "release_created",
-            {"release_name": release_name},
-        )
-
-        projection = ProjectionTrigger(
-            release_name=release_name,
-            targets=payload.targets,
-            concept_table=payload.concept_table,
-            dry_run=payload.dry_run,
-        )
-        projection_result = await run_remote_projector(projection)
-        await record_event_best_effort(
-            run_id,
-            "projection_done",
-            {"release_name": release_name, "targets": payload.targets},
-        )
-
-        result = {
-            "ingest_run_id": ingest.get("run_id"),
-            "ingest_result": ingest_result,
-            "release": create_result,
-            "projection": projection_result,
-            "release_name": release_name,
-            "skipped": False,
-        }
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="poll_and_project",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="poll_and_project",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="poll_and_project",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-
-
-@app.post("/admin/ingest-message")
-async def ingest_message(payload: MessageIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)):
-    """Ingest a single message via the remote helper and record the run.
-
-    Returns {"run_id", "result"}; failures record a failed run and re-raise.
-    """
-    check_admin_api_key(x_admin_api_key)
-    run_id = str(uuid.uuid4())
-    run_input = payload.model_dump()
-    started_at_utc = now_iso()
-    await record_event_best_effort(
-        run_id,
-        "started",
-        {"input": {"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id}},
-    )
-    try:
-        result = await run_remote_ingest_message(payload)
-        await record_event_best_effort(
-            run_id,
-            "ingest_done",
-            {"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id},
-        )
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_message",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_message",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_message",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-
-
-@app.post("/admin/ingest-messages-batch")
-async def ingest_messages_batch(
-    payload: MessageIngestBatchPayload,
-    x_admin_api_key: Optional[str] = Header(default=None),
-):
-    """Ingest a batch of messages via the remote helper and record the run.
-
-    Rejects an empty batch with 400. Returns {"run_id", "result"}; failures
-    record a failed run and re-raise.
-    """
-    check_admin_api_key(x_admin_api_key)
-    if not payload.messages:
-        raise HTTPException(status_code=400, detail="messages must contain at least 1 item")
-
-    run_id = str(uuid.uuid4())
-    run_input = payload.model_dump()
-    started_at_utc = now_iso()
-    await record_event_best_effort(
-        run_id,
-        "started",
-        {"input": {"table": payload.table, "rows": len(payload.messages)}},
-    )
-    try:
-        result = await run_remote_ingest_messages_batch(payload)
-        await record_event_best_effort(
-            run_id,
-            "ingest_done",
-            {"table": payload.table, "rows": len(payload.messages)},
-        )
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_messages_batch",
-            status="succeeded",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=result,
-            error_text=None,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
-        return {"run_id": run_id, "result": result}
-    except HTTPException as e:
-        err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_messages_batch",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=err_text,
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
-    except Exception as e:
-        await run_remote_record_run(
-            run_id=run_id,
-            run_type="ingest_messages_batch",
-            status="failed",
-            started_at_utc=started_at_utc,
-            finished_at_utc=now_iso(),
-            actor="admin-api",
-            input_json=run_input,
-            output_json=None,
-            error_text=str(e),
-        )
-        await record_event_best_effort(run_id, "recorded", {"status": "failed"})
-        raise
+This is a sample refactoring only; adapt it to your project's specific requirements and code structure.
\ No newline at end of file