# jecio/app.py

import os
import json
import hashlib
import asyncio
import shlex
import uuid
import base64
import imaplib
import email
import tempfile
import re
import time
from pathlib import Path
from email import policy
from email.utils import parseaddr, parsedate_to_datetime
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Literal
import httpx
from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from gremlin_python.driver import client as gremlin_client
from gremlin_python.driver.serializer import GraphSONSerializersV3d0
from dotenv import load_dotenv
# Application identity, used for the FastAPI title and (per APP_VERSION /
# APP_STARTED_AT_UTC) available for health/version reporting.
APP_NAME = "concept-api"
APP_VERSION = os.getenv("APP_VERSION", "dev-local")
APP_STARTED_AT_UTC = datetime.now(timezone.utc).isoformat()
# Keep env loading behavior aligned with connectivity_check.py.
load_dotenv()
# ---- config (set these env vars) ----
# Backing services: graph, search, content-addressed storage, and local LLM.
GREMLIN_URL = os.getenv("GREMLIN_URL", "ws://localhost:8182/gremlin")
ES_URL = os.getenv("ES_URL", "http://localhost:9200")
ES_INDEX = os.getenv("ES_INDEX", "concepts")
IPFS_API = os.getenv("IPFS_API", "http://localhost:5001") # Kubo HTTP API
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b")
OLLAMA_EMBED_MODEL = os.getenv("OLLAMA_EMBED_MODEL", "nomic-embed-text")
# Remote "projector" host reached over SSH/SCP; the *_REMOTE_SCRIPT paths
# below are executed inside PROJECTOR_REMOTE_DIR on that host.
PROJECTOR_SSH_HOST = os.getenv("PROJECTOR_SSH_HOST", "lakehouse-core.rakeroots.lan")
PROJECTOR_REMOTE_DIR = os.getenv("PROJECTOR_REMOTE_DIR", "/tmp/jecio")
PROJECTOR_REMOTE_SCRIPT = os.getenv("PROJECTOR_REMOTE_SCRIPT", "./run-projector-standard.sh")
PROJECTOR_SSH_BIN = os.getenv("PROJECTOR_SSH_BIN", "ssh")
PROJECTOR_SSH_OPTS = os.getenv("PROJECTOR_SSH_OPTS", "-o BatchMode=yes -o ConnectTimeout=10")
PROJECTOR_SCP_BIN = os.getenv("PROJECTOR_SCP_BIN", "scp")
PROJECTOR_SCP_OPTS = os.getenv("PROJECTOR_SCP_OPTS", "-o BatchMode=yes -o ConnectTimeout=10")
# Shared timeout (seconds) for remote SSH invocations.
PROJECTOR_TIMEOUT_SEC = int(os.getenv("PROJECTOR_TIMEOUT_SEC", "900"))
# Empty means admin endpoints respond 503 (see check_admin_api_key).
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "")
# Remote Spark-container wrapper scripts, one per operation.
RUNS_REMOTE_SCRIPT = os.getenv("RUNS_REMOTE_SCRIPT", "./record-run-via-spark-container.sh")
RUN_EVENTS_REMOTE_SCRIPT = os.getenv("RUN_EVENTS_REMOTE_SCRIPT", "./record-run-event-via-spark-container.sh")
INGEST_MESSAGE_REMOTE_SCRIPT = os.getenv("INGEST_MESSAGE_REMOTE_SCRIPT", "./ingest-message-via-spark-container.sh")
INGEST_MESSAGES_BATCH_REMOTE_SCRIPT = os.getenv(
    "INGEST_MESSAGES_BATCH_REMOTE_SCRIPT",
    "./ingest-messages-batch-via-spark-container.sh",
)
ASSISTANT_FEEDBACK_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_FEEDBACK_REMOTE_SCRIPT",
    "./record-assistant-feedback-via-spark-container.sh",
)
ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT",
    "./query-assistant-feedback-via-spark-container.sh",
)
ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT",
    "./query-assistant-metrics-via-spark-container.sh",
)
ASSISTANT_ACTION_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_ACTION_REMOTE_SCRIPT",
    "./record-assistant-action-via-spark-container.sh",
)
ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT",
    "./query-assistant-actions-via-spark-container.sh",
)
ASSISTANT_PROPOSALS_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_PROPOSALS_REMOTE_SCRIPT",
    "./record-assistant-proposals-via-spark-container.sh",
)
ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT = os.getenv(
    "ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT",
    "./query-assistant-proposals-via-spark-container.sh",
)
IMAP_CHECKPOINT_REMOTE_SCRIPT = os.getenv(
    "IMAP_CHECKPOINT_REMOTE_SCRIPT",
    "./query-imap-checkpoint-via-spark-container.sh",
)
CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT = os.getenv(
    "CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT",
    "./create-messages-release-via-spark-container.sh",
)
app = FastAPI(title=APP_NAME)
# Serve the bundled UI assets when present (optional deployment artifact).
UI_DIR = Path(__file__).resolve().parent / "ui"
UI_ASSETS_DIR = UI_DIR / "assets"
if UI_ASSETS_DIR.exists():
    app.mount("/ui/assets", StaticFiles(directory=str(UI_ASSETS_DIR)), name="ui-assets")
# --------- models ---------
class ConceptCreate(BaseModel):
    """Request body for creating a concept.

    Mutable defaults use Field(default_factory=...) for consistency with the
    other models in this module (e.g. AssistantDraftPayload).
    """

    canonical_name: str = Field(..., min_length=1)
    kind: Optional[str] = None
    aliases: List[str] = Field(default_factory=list)
    description: Optional[str] = None
    external_ids: Dict[str, str] = Field(default_factory=dict)  # {"wikidata": "Q42"} etc.
    tags: List[str] = Field(default_factory=list)
class ConceptOut(BaseModel):
    """Concept representation returned by the API.

    Mutable defaults use Field(default_factory=...) for consistency with the
    other models in this module.
    """

    concept_id: str
    canonical_name: str
    kind: Optional[str] = None
    aliases: List[str] = Field(default_factory=list)
    external_ids: Dict[str, str] = Field(default_factory=dict)
    tags: List[str] = Field(default_factory=list)
    latest_cid: Optional[str] = None
    summary: Optional[str] = None
    created_at: str
    updated_at: str
class ProjectionTrigger(BaseModel):
    """Request to project a release into ES and/or Gremlin."""

    release_name: str = Field(..., min_length=1)
    targets: Literal["es", "gremlin", "both"] = "both"
    concept_table: Optional[str] = None
    dry_run: bool = False
class MessageIngestPayload(BaseModel):
    """Single message to ingest into a lake messages table.

    `metadata` uses Field(default_factory=dict) for consistency with the
    other models in this module.
    """

    thread_id: str = Field(..., min_length=1)
    message_id: str = Field(..., min_length=1)
    sender: str = Field(..., min_length=1)
    channel: str = Field(..., min_length=1)
    sent_at: Optional[str] = None
    body: str = Field(..., min_length=1)
    metadata: Dict[str, Any] = Field(default_factory=dict)
    table: str = "lake.db1.messages"
class MessageIngestItem(BaseModel):
    """One message inside a batch ingest request (no per-item table).

    `metadata` uses Field(default_factory=dict) for consistency with the
    other models in this module.
    """

    thread_id: str = Field(..., min_length=1)
    message_id: str = Field(..., min_length=1)
    sender: str = Field(..., min_length=1)
    channel: str = Field(..., min_length=1)
    sent_at: Optional[str] = None
    body: str = Field(..., min_length=1)
    metadata: Dict[str, Any] = Field(default_factory=dict)
class MessageIngestBatchPayload(BaseModel):
    """Batch of messages to ingest, with an optional dedupe strategy."""

    table: str = "lake.db1.messages"
    dedupe_mode: Literal["none", "message_id", "thread_message"] = "none"
    messages: List[MessageIngestItem] = Field(default_factory=list)
class EmailImapIngestPayload(BaseModel):
    """Parameters for pulling messages from an IMAP mailbox into the lake."""

    host: str = Field(..., min_length=1)
    port: int = 993
    use_ssl: bool = True
    username: str = Field(..., min_length=1)
    # Falls back to the IMAP_PASSWORD env var when omitted
    # (see fetch_imap_messages_blocking).
    password: Optional[str] = None
    mailbox: str = "INBOX"
    search_criteria: str = "ALL"
    max_messages: int = Field(default=50, ge=1, le=500)
    table: str = "lake.db1.messages"
    dedupe_mode: Literal["none", "message_id", "thread_message"] = "message_id"
    channel: str = "email-imap"
    # Incremental mode fetches only UIDs above a checkpoint; since_uid
    # overrides the stored checkpoint when provided.
    incremental: bool = True
    since_uid: Optional[int] = None
class PollAndProjectPayload(BaseModel):
    """Combined request: poll IMAP, then project the resulting release."""

    imap: EmailImapIngestPayload
    # When release_name is omitted, one is presumably derived from release_prefix.
    release_name: Optional[str] = None
    release_prefix: str = "rel"
    targets: Literal["es", "gremlin", "both"] = "es"
    concept_table: str = "lake.db1.messages"
    dry_run: bool = False
    project_if_no_new: bool = False
class AssistantDraftPayload(BaseModel):
    """Request for the assistant to draft text toward a goal."""

    task_type: Literal["message", "finance", "gov", "general"] = "general"
    goal: str = Field(..., min_length=3)
    recipient: Optional[str] = None
    tone: Optional[str] = "professional"
    constraints: List[str] = Field(default_factory=list)
    release_name: Optional[str] = None
    max_sources: int = Field(default=5, ge=1, le=20)
class AssistantDraftSource(BaseModel):
    """Provenance record for one retrieved source used in a draft/answer."""

    concept_id: str
    source_pk: Optional[str] = None
    source_table: Optional[str] = None
    release_name: Optional[str] = None
    score: Optional[float] = None
class AssistantDraftResponse(BaseModel):
    """Draft result with cited sources and a review flag."""

    task_type: str
    draft: str
    sources: List[AssistantDraftSource]
    confidence: float
    needs_review: bool
    release_name: Optional[str] = None
class AssistantPlanPayload(BaseModel):
    """Request for the assistant to plan steps toward an objective."""

    task_type: Literal["message", "finance", "gov", "general"] = "general"
    objective: str = Field(..., min_length=3)
    constraints: List[str] = Field(default_factory=list)
    release_name: Optional[str] = None
    max_sources: int = Field(default=5, ge=1, le=20)
    max_steps: int = Field(default=6, ge=1, le=20)
class AssistantPlanStep(BaseModel):
    """One step of an assistant plan."""

    step_id: str
    title: str
    action_type: Literal["research", "draft", "ask_user", "prepare_data", "review"]
    requires_approval: bool = False
    notes: Optional[str] = None
class AssistantPlanResponse(BaseModel):
    """Plan result: ordered steps plus the sources that informed them."""

    objective: str
    task_type: str
    plan: List[AssistantPlanStep]
    sources: List[AssistantDraftSource]
    needs_review: bool
    confidence: float
    release_name: Optional[str] = None
class AssistantExecuteStepPayload(BaseModel):
    """Request to execute a single step of a previously produced plan."""

    task_type: Literal["message", "finance", "gov", "general"] = "general"
    objective: str = Field(..., min_length=3)
    release_name: Optional[str] = None
    plan: List[AssistantPlanStep]
    step_id: str = Field(..., min_length=1)
    approved: bool = False
    manual_confirm_token: Optional[str] = None
class AssistantExecuteStepResponse(BaseModel):
    """Outcome of a step execution: either blocked or executed with output."""

    action_id: str
    step_id: str
    status: Literal["blocked", "executed"]
    output: Dict[str, Any]
    needs_review: bool
class AssistantFeedbackPayload(BaseModel):
    """User feedback on an assistant draft (accepted / edited / rejected)."""

    outcome: Literal["accepted", "edited", "rejected"]
    task_type: Literal["message", "finance", "gov", "general"] = "general"
    release_name: Optional[str] = None
    goal: Optional[str] = None
    draft: str = Field(..., min_length=1)
    # Present when the user edited the draft before use.
    final_text: Optional[str] = None
    sources: List[AssistantDraftSource] = Field(default_factory=list)
    confidence: Optional[float] = None
    needs_review: bool = True
    notes: Optional[str] = None
class AssistantLearnPayload(BaseModel):
    """Free-form text for the assistant to learn from, optionally driving a code-change flow."""

    text: str = Field(..., min_length=3)
    title: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    release_name: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # Optional code-writing pipeline controls.
    write_code: bool = False
    code_objective: Optional[str] = None
    code_files: List[str] = Field(default_factory=list)
    code_dry_run: bool = True
    code_commit: bool = False
    code_branch: Optional[str] = None
class AssistantChatMessage(BaseModel):
    """One turn of chat history."""

    role: Literal["user", "assistant"]
    content: str
class AssistantChatPayload(BaseModel):
    """Chat request; history may supplement or seed the server-side session."""

    message: str = Field(..., min_length=1)
    session_id: Optional[str] = None
    release_name: Optional[str] = None
    max_sources: int = Field(default=6, ge=1, le=20)
    history: List[AssistantChatMessage] = Field(default_factory=list)
    temperature_hint: Optional[str] = "balanced"
class AssistantChatResponse(BaseModel):
    """Chat answer with retrieval provenance."""

    session_id: str
    answer: str
    sources: List[AssistantDraftSource]
    confidence: float
    release_name: Optional[str] = None
class AssistantSelfImprovePayload(BaseModel):
    """Request to mine feedback/action signals and propose improvements."""

    objective: str = "Improve assistant quality and reliability"
    release_name: Optional[str] = None
    max_proposals: int = Field(default=5, ge=1, le=20)
    feedback_limit: int = Field(default=50, ge=1, le=500)
    action_limit: int = Field(default=50, ge=1, le=500)
    include_edited_feedback: bool = True
    include_rejected_feedback: bool = True
    include_blocked_actions: bool = True
    apply: bool = False
class AssistantSelfImproveProposal(BaseModel):
    """One concrete improvement proposal (problem, change, risk, tests)."""

    proposal_id: str
    title: str
    problem: str
    change: str
    files: List[str] = Field(default_factory=list)
    risk: Literal["low", "medium", "high"] = "medium"
    tests: List[str] = Field(default_factory=list)
    auto_apply_safe: bool = False
class AssistantSelfImproveResponse(BaseModel):
    """Self-improvement run result: proposals plus the signals that drove them."""

    objective: str
    release_name: Optional[str] = None
    proposal_set_id: str
    created_at_utc: str
    summary: str
    proposals: List[AssistantSelfImproveProposal]
    signals: Dict[str, Any]
    # When apply was requested but refused, apply_block_reason explains why.
    apply_blocked: bool
    apply_block_reason: Optional[str] = None
class AssistantSelfImproveApplyPayload(BaseModel):
    """Request to apply one proposal, identified by set/id or given inline."""

    objective: Optional[str] = None
    release_name: Optional[str] = None
    proposal_set_id: Optional[str] = None
    proposal_id: Optional[str] = None
    proposal: Optional[AssistantSelfImproveProposal] = None
    dry_run: bool = False
class AssistantSelfImproveApplyResponse(BaseModel):
    """Result of applying a proposal to the repository."""

    applied: bool
    dry_run: bool
    repo_dir: str
    branch_name: str
    proposal_file: str
    commit: Optional[str] = None
    commands: List[str] = Field(default_factory=list)
    detail: Optional[str] = None
# --------- helpers ---------
def now_iso() -> str:
    """Current UTC time as an ISO-8601 string."""
    moment = datetime.now(timezone.utc)
    return moment.isoformat()
def _tail(text: str, max_chars: int = 8000) -> str:
if len(text) <= max_chars:
return text
return text[-max_chars:]
def check_admin_api_key(x_admin_api_key: Optional[str]) -> None:
    """Validate an admin API key header value.

    Raises 503 when the server has no ADMIN_API_KEY configured (admin
    endpoints are disabled), and 401 when the supplied key does not match.
    """
    if not ADMIN_API_KEY:
        raise HTTPException(status_code=503, detail="ADMIN_API_KEY is not configured")
    if x_admin_api_key != ADMIN_API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized")
def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str]) -> str:
    """SHA-256 fingerprint over normalized name, kind, and sorted external ids."""
    components = [
        (name or "").strip().lower(),
        (kind or "").strip().lower(),
        "|".join(f"{key}:{value}".lower() for key, value in sorted(external_ids.items())),
    ]
    return hashlib.sha256("|".join(components).encode("utf-8")).hexdigest()
def _clean_header_id(v: Optional[str]) -> str:
if not v:
return ""
return v.strip().strip("<>").strip()
def _normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str:
    """Derive a stable thread id for an email message.

    Preference order:
    1. The FIRST entry of the References header — per RFC 5322 the thread
       root — so every reply in a thread maps to the same id. (The previous
       implementation used the last entry, i.e. the immediate parent, which
       fragments deep threads into one id per reply level.)
    2. The In-Reply-To header.
    3. A hash of normalized subject+sender, falling back to the message id.
    """
    refs_clean = _clean_header_id(refs.split()[0] if refs else "")
    in_reply_clean = _clean_header_id(in_reply_to)
    if refs_clean:
        return f"thread:{refs_clean}"
    if in_reply_clean:
        return f"thread:{in_reply_clean}"
    seed = f"{subject.strip().lower()}|{sender.strip().lower()}"
    if not seed.strip("|"):
        # Both subject and sender empty: fall back to the message's own id.
        seed = msg_id
    return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
def _extract_body_text(msg: email.message.Message) -> str:
    """Best-effort plain-text body extraction from a parsed email message.

    Preference order: first non-attachment text/plain part, then the first
    text/html part (raw HTML, not stripped), then the whole-message content.
    Returns "" when nothing usable is found or when parsing raises.
    """
    try:
        if msg.is_multipart():
            # First pass: prefer an inline (non-attachment) text/plain part.
            for part in msg.walk():
                ctype = (part.get_content_type() or "").lower()
                disp = (part.get("Content-Disposition") or "").lower()
                if ctype == "text/plain" and "attachment" not in disp:
                    payload_obj = part.get_content()
                    if isinstance(payload_obj, str):
                        return payload_obj.strip()
                    if isinstance(payload_obj, bytes):
                        return payload_obj.decode(part.get_content_charset() or "utf-8", errors="replace").strip()
            # Second pass: fall back to the first text/html part, as raw HTML.
            for part in msg.walk():
                ctype = (part.get_content_type() or "").lower()
                if ctype == "text/html":
                    html_obj = part.get_content()
                    if isinstance(html_obj, bytes):
                        html_obj = html_obj.decode(part.get_content_charset() or "utf-8", errors="replace")
                    if isinstance(html_obj, str):
                        return html_obj.strip()
            return ""
        # Non-multipart: take the message content directly.
        payload_obj = msg.get_content()
        if isinstance(payload_obj, str):
            return payload_obj.strip()
        if isinstance(payload_obj, bytes):
            return payload_obj.decode(msg.get_content_charset() or "utf-8", errors="replace").strip()
        return ""
    except Exception:
        # Deliberate best-effort: malformed MIME must not abort ingestion.
        return ""
def fetch_imap_messages_blocking(
    payload: EmailImapIngestPayload,
    effective_search_criteria: str,
    since_uid: Optional[int],
) -> List[MessageIngestItem]:
    """Fetch messages from an IMAP mailbox and normalize them for ingestion.

    Blocking (imaplib) — callers should run this off the event loop.
    When `since_uid` is set the function does an incremental UID-window
    fetch (oldest first, up to max_messages); otherwise it runs
    `effective_search_criteria` and keeps the latest max_messages.

    Raises ValueError when no password is available, and RuntimeError on
    IMAP login/select/search failures.
    """
    # Password precedence: explicit payload value, then IMAP_PASSWORD env.
    password = payload.password or os.getenv("IMAP_PASSWORD", "")
    if not password:
        raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD")
    if payload.use_ssl:
        client = imaplib.IMAP4_SSL(payload.host, payload.port)
    else:
        client = imaplib.IMAP4(payload.host, payload.port)
    try:
        status, _ = client.login(payload.username, password)
        if status != "OK":
            raise RuntimeError("IMAP login failed")
        # readonly: never mark messages as seen while polling.
        status, _ = client.select(payload.mailbox, readonly=True)
        if status != "OK":
            raise RuntimeError(f"IMAP select mailbox failed: {payload.mailbox}")
        if since_uid is not None:
            # Incremental mode: everything strictly after the checkpointed UID.
            status, search_data = client.uid("search", None, "UID", f"{int(since_uid) + 1}:*")
        else:
            status, search_data = client.uid("search", None, effective_search_criteria)
        if status != "OK":
            raise RuntimeError(f"IMAP search failed: {effective_search_criteria}")
        uid_bytes = search_data[0] if search_data else b""
        uid_list = [u for u in uid_bytes.decode("utf-8", errors="replace").split() if u]
        if since_uid is not None:
            # Defensive re-filter: keep only UIDs strictly above the checkpoint,
            # regardless of what the server returned for the "N:*" window.
            filtered: List[str] = []
            for u in uid_list:
                try:
                    if int(u) > int(since_uid):
                        filtered.append(u)
                except Exception:
                    continue
            uid_list = filtered
        if not uid_list:
            return []
        # For incremental UID windows, process oldest-new first so checkpointing cannot skip gaps.
        is_uid_window = since_uid is not None
        if is_uid_window:
            selected_uids = uid_list[: payload.max_messages]
        else:
            # For non-incremental scans (e.g. ALL), keep "latest N" behavior.
            selected_uids = uid_list[-payload.max_messages :]
        out: List[MessageIngestItem] = []
        for uid in selected_uids:
            status, msg_data = client.uid("fetch", uid, "(RFC822)")
            if status != "OK" or not msg_data:
                continue
            # imaplib returns a mix of tuples and separators; the RFC822
            # literal is the bytes half of the first tuple entry.
            raw_bytes = None
            for part in msg_data:
                if isinstance(part, tuple) and len(part) >= 2 and isinstance(part[1], (bytes, bytearray)):
                    raw_bytes = bytes(part[1])
                    break
            if not raw_bytes:
                continue
            msg = email.message_from_bytes(raw_bytes, policy=policy.default)
            subject = str(msg.get("Subject") or "").strip()
            from_raw = str(msg.get("From") or "").strip()
            to_raw = str(msg.get("To") or "").strip()
            date_raw = str(msg.get("Date") or "").strip()
            msg_id_raw = str(msg.get("Message-Id") or msg.get("Message-ID") or "").strip()
            refs_raw = str(msg.get("References") or "").strip()
            in_reply_raw = str(msg.get("In-Reply-To") or "").strip()
            sender_email = parseaddr(from_raw)[1] or from_raw or "unknown"
            msg_id_clean = _clean_header_id(msg_id_raw)
            if not msg_id_clean:
                # Synthesize a stable id for messages lacking a Message-Id.
                seed = f"{uid}|{subject}|{sender_email}|{date_raw}"
                msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
            thread_id = _normalize_thread_id(
                msg_id=msg_id_clean,
                refs=refs_raw,
                in_reply_to=in_reply_raw,
                subject=subject,
                sender=sender_email,
            )
            sent_at_iso = None
            if date_raw:
                try:
                    dt = parsedate_to_datetime(date_raw)
                    if dt.tzinfo is None:
                        # Naive Date header: assume UTC.
                        dt = dt.replace(tzinfo=timezone.utc)
                    sent_at_iso = dt.astimezone(timezone.utc).isoformat()
                except Exception:
                    sent_at_iso = None
            body = _extract_body_text(msg)
            if not body:
                body = f"(no body) {subject}".strip()
            metadata = {
                "subject": subject,
                "from": from_raw,
                "to": to_raw,
                "date": date_raw,
                "imap_uid": uid,
                "mailbox": payload.mailbox,
                "host": payload.host,
                "username": payload.username,
            }
            out.append(
                MessageIngestItem(
                    thread_id=thread_id,
                    message_id=msg_id_clean,
                    sender=sender_email,
                    channel=payload.channel,
                    sent_at=sent_at_iso,
                    body=body,
                    metadata=metadata,
                )
            )
        return out
    finally:
        try:
            client.logout()
        except Exception:
            pass
async def run_remote_query_imap_checkpoint(
    host: str,
    mailbox: str,
    username: str,
    table: str,
) -> Optional[int]:
    """Query the remote Spark-side IMAP checkpoint (max ingested UID) over SSH.

    Returns the checkpointed max UID for (host, mailbox, username, table),
    or None when the script reports no checkpoint. Raises HTTPException 504
    on timeout and 502 on a non-zero exit or unparseable output.
    """
    parts = [
        IMAP_CHECKPOINT_REMOTE_SCRIPT,
        host,
        mailbox,
        username,
        table,
    ]
    # Each argument is shell-quoted; the whole command runs via the remote shell.
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="IMAP checkpoint query timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        # Extract the JSON object embedded in stdout (helper defined elsewhere
        # in this module); "max_uid" may legitimately be absent/None.
        obj = _extract_json_object_from_text(out)
        val = obj.get("max_uid")
        if val is None:
            return None
        return int(val)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse IMAP checkpoint output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
async def run_remote_create_messages_release(release_name: str) -> Dict[str, Any]:
    """Run the remote create-messages-release script over SSH.

    Returns run details (host, exit code, output tails); raises
    HTTPException 504 on timeout and 502 on a non-zero exit code.
    """
    script_and_args = [CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT, release_name]
    quoted = " ".join(shlex.quote(p) for p in script_and_args)
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {quoted}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Create messages release timed out")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "release_name": release_name,
        "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")),
        "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
async def ipfs_add_json(payload: Dict[str, Any]) -> str:
    """Add a JSON document to IPFS via the Kubo HTTP API and return its CID."""
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(
            f"{IPFS_API}/api/v0/add",
            files={"file": ("concept.json", body, "application/json")},
        )
        resp.raise_for_status()
        # /api/v0/add may stream one JSON object per line; the final line
        # carries the Hash (CID) of the added file.
        final_line = resp.text.strip().splitlines()[-1]
        return json.loads(final_line)["Hash"]
async def ollama_summary(text: str) -> str:
    """Ask the configured Ollama model for a 1-2 sentence factual summary."""
    prompt = (
        "Summarize the following concept in 1-2 sentences. "
        "Keep it factual and compact.\n\n"
        f"{text}"
    )
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(f"{OLLAMA_URL}/api/generate", json=request_body)
        resp.raise_for_status()
        return (resp.json().get("response") or "").strip()
async def ollama_embed(text: str) -> List[float]:
    """Embed `text` with the Ollama embedding model; [] when no vector is returned."""
    request_body = {"model": OLLAMA_EMBED_MODEL, "prompt": text}
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(f"{OLLAMA_URL}/api/embeddings", json=request_body)
        resp.raise_for_status()
        vector = resp.json().get("embedding")
        return vector if isinstance(vector, list) else []
async def es_ensure_index():
    """Create the concepts index if missing, retrying without the vector field.

    Minimal mapping: text fields + dense_vector (optional). If the ES
    deployment rejects dense_vector, the index is created without it.
    """
    properties = {
        "concept_id": {"type": "keyword"},
        "canonical_name": {"type": "text"},
        "kind": {"type": "keyword"},
        "aliases": {"type": "text"},
        "tags": {"type": "keyword"},
        "summary": {"type": "text"},
        "latest_cid": {"type": "keyword"},
        "fingerprint": {"type": "keyword"},
        "created_at": {"type": "date"},
        "updated_at": {"type": "date"},
        "embedding": {"type": "dense_vector", "dims": 768, "index": False},  # may vary
    }
    mapping = {"mappings": {"properties": properties}}
    async with httpx.AsyncClient(timeout=30) as client:
        existing = await client.get(f"{ES_URL}/{ES_INDEX}")
        if existing.status_code == 200:
            return
        created = await client.put(f"{ES_URL}/{ES_INDEX}", json=mapping)
        if created.status_code >= 400:
            # Likely dense_vector incompatibility: retry without the embedding field.
            properties.pop("embedding", None)
            retry = await client.put(f"{ES_URL}/{ES_INDEX}", json=mapping)
            retry.raise_for_status()
async def es_index(doc: Dict[str, Any]):
    """Upsert a concept document into ES, keyed by its concept_id."""
    url = f"{ES_URL}/{ES_INDEX}/_doc/{doc['concept_id']}"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.put(url, json=doc)
        resp.raise_for_status()
async def es_search(q: str, size: int = 10) -> List[Dict[str, Any]]:
    """Full-text search over the concepts index; returns hit _source docs."""
    body = {
        "size": size,
        "query": {
            "multi_match": {
                "query": q,
                "fields": ["canonical_name^3", "aliases^2", "summary", "tags"],
            }
        },
    }
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(f"{ES_URL}/{ES_INDEX}/_search", json=body)
        resp.raise_for_status()
        # Rename loop variable so it no longer shadows the HTTP client handle.
        return [hit["_source"] for hit in resp.json().get("hits", {}).get("hits", [])]
async def es_search_hits(q: str, size: int = 10, release_name: Optional[str] = None) -> List[Dict[str, Any]]:
    """Search ES for `q`, optionally filtered to one release; returns raw hit dicts."""
    match_clause: Dict[str, Any] = {
        "multi_match": {
            "query": q,
            "fields": [
                "display_name^3",
                "canonical_name^3",
                "description^2",
                "text^2",
                "summary^2",
                "aliases^2",
                "tags",
                "source_pk^2",
                "source_table",
            ],
        }
    }
    body: Dict[str, Any] = {"size": size}
    if release_name:
        # release_name may be mapped as keyword, text, or a multi-field;
        # accept a match through any of the three forms.
        release_filter = {
            "bool": {
                "should": [
                    {"term": {"release_name.keyword": release_name}},
                    {"term": {"release_name": release_name}},
                    {"match_phrase": {"release_name": release_name}},
                ],
                "minimum_should_match": 1,
            }
        }
        body["query"] = {"bool": {"must": [match_clause], "filter": [release_filter]}}
    else:
        body["query"] = match_clause
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(f"{ES_URL}/{ES_INDEX}/_search", json=body)
        resp.raise_for_status()
        return resp.json().get("hits", {}).get("hits", [])
async def es_recent_by_release(release_name: str, size: int = 10) -> List[Dict[str, Any]]:
    """Most recently updated docs for one release, newest first."""
    release_filter = {
        "bool": {
            "should": [
                {"term": {"release_name.keyword": release_name}},
                {"term": {"release_name": release_name}},
                {"match_phrase": {"release_name": release_name}},
            ],
            "minimum_should_match": 1,
        }
    }
    body: Dict[str, Any] = {
        "size": size,
        "query": {"bool": {"filter": [release_filter]}},
        "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}],
    }
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(f"{ES_URL}/{ES_INDEX}/_search", json=body)
        resp.raise_for_status()
        return resp.json().get("hits", {}).get("hits", [])
async def es_recent_messages(
    size: int = 20,
    release_name: Optional[str] = None,
    q: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Recent message-type docs, optionally release-filtered and/or text-matched."""

    def _any_of(clauses: List[Dict[str, Any]]) -> Dict[str, Any]:
        # OR-combine clauses; matches when at least one applies.
        return {"bool": {"should": clauses, "minimum_should_match": 1}}

    filters: List[Dict[str, Any]] = [
        _any_of(
            [
                {"term": {"concept_type.keyword": "message"}},
                {"term": {"concept_type": "message"}},
                {"term": {"kind.keyword": "message"}},
                {"term": {"kind": "message"}},
            ]
        )
    ]
    if release_name:
        filters.append(
            _any_of(
                [
                    {"term": {"release_name.keyword": release_name}},
                    {"term": {"release_name": release_name}},
                    {"match_phrase": {"release_name": release_name}},
                ]
            )
        )
    must: List[Dict[str, Any]] = []
    if q and q.strip():
        must.append(
            {
                "multi_match": {
                    "query": q.strip(),
                    "fields": [
                        "text^3",
                        "description^2",
                        "summary^2",
                        "display_name^2",
                        "canonical_name^2",
                        "source_pk^2",
                    ],
                }
            }
        )
    body: Dict[str, Any] = {
        "size": size,
        "query": {"bool": {"filter": filters, "must": must}},
        "sort": [{"updated_at": {"order": "desc", "unmapped_type": "date"}}],
    }
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(f"{ES_URL}/{ES_INDEX}/_search", json=body)
        resp.raise_for_status()
        return resp.json().get("hits", {}).get("hits", [])
# Heuristic cue phrases marking a sentence as potentially actionable
# (used by extract_pending_tasks_from_source).
TASK_TRIGGER_RE = re.compile(
    r"\b("
    r"please|can you|could you|need to|needs to|todo|to do|follow up|follow-up|"
    r"review|fix|send|reply|schedule|book|call|remind|prepare|draft|submit|pay|"
    r"deadline|due|let me know|check in|confirm|update me|get back to me"
    r")\b",
    flags=re.IGNORECASE,
)
# Completion words: a matching sentence is recorded as "done" instead of "pending".
TASK_DONE_RE = re.compile(r"\b(done|completed|closed|resolved|fixed)\b", flags=re.IGNORECASE)
# Rough due-date hints: relative/weekday words, ISO dates, or HH:MM times.
DUE_HINT_RE = re.compile(
    r"\b("
    r"today|tomorrow|tonight|next week|next month|monday|tuesday|wednesday|thursday|friday|saturday|sunday|"
    r"\d{4}-\d{2}-\d{2}|"
    r"\d{1,2}:\d{2}"
    r")\b",
    flags=re.IGNORECASE,
)
# In-process cache for AI task extraction: per-entry TTL, size-bounded
# (see _task_ai_cache_get/_task_ai_cache_set).
TASK_AI_CACHE_TTL_SEC = int(os.getenv("TASK_AI_CACHE_TTL_SEC", "3600"))
TASK_AI_CACHE_MAX_SIZE = int(os.getenv("TASK_AI_CACHE_MAX_SIZE", "5000"))
TASK_AI_CACHE: Dict[str, Dict[str, Any]] = {}
# In-memory chat state: session_id -> list of {role, content} turns,
# trimmed to ASSISTANT_CHAT_MAX_TURNS * 2 items by _append_chat_turn.
ASSISTANT_CHAT_MAX_TURNS = int(os.getenv("ASSISTANT_CHAT_MAX_TURNS", "20"))
ASSISTANT_CHAT_SESSIONS: Dict[str, List[Dict[str, str]]] = {}
def _split_sentences(text: str) -> List[str]:
raw_parts = re.split(r"[\n\r]+|(?<=[.!?])\s+", text or "")
return [p.strip() for p in raw_parts if p and p.strip()]
def _extract_due_hint(text: str) -> Optional[str]:
    """Return the first due-date-like phrase in `text`, or None."""
    found = DUE_HINT_RE.search(text or "")
    return found.group(1) if found else None
def _extract_who(text: str, default_sender: Optional[str]) -> Optional[str]:
m = re.search(r"\b(?:to|for)\s+([A-Z][a-zA-Z0-9_-]{1,40})\b", text or "")
if m:
return m.group(1)
return default_sender or None
def extract_pending_tasks_from_source(src: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Heuristically extract task records from one ES message source doc.

    Scans sentence-ish chunks for trigger phrases or questions; each hit is
    marked "done" when completion words appear, else "pending". Returns []
    when the doc has no usable text.
    """
    text = str(src.get("text") or src.get("description") or src.get("summary") or "").strip()
    if not text:
        return []
    # attributes_json may arrive as a JSON string or an already-parsed dict.
    attrs_raw = src.get("attributes_json")
    attrs: Dict[str, Any] = {}
    if isinstance(attrs_raw, str) and attrs_raw.strip():
        try:
            parsed = json.loads(attrs_raw)
            if isinstance(parsed, dict):
                attrs = parsed
        except Exception:
            attrs = {}
    elif isinstance(attrs_raw, dict):
        attrs = attrs_raw
    thread_id = attrs.get("thread_id")
    message_id = attrs.get("message_id") or src.get("source_pk")
    sender = attrs.get("sender")
    sent_at = attrs.get("sent_at")
    # task_id hashes (concept, message, sentence) so re-extraction is stable.
    base_source = f"{src.get('concept_id') or ''}|{message_id or ''}"
    tasks: List[Dict[str, Any]] = []
    for sentence in _split_sentences(text):
        if len(sentence) < 8:
            # Too short to be actionable.
            continue
        looks_like_question = "?" in sentence
        if not TASK_TRIGGER_RE.search(sentence) and not looks_like_question:
            continue
        status = "done" if TASK_DONE_RE.search(sentence) else "pending"
        due_hint = _extract_due_hint(sentence)
        who = _extract_who(sentence, sender)
        task_id = "task-" + hashlib.sha256(f"{base_source}|{sentence.lower()}".encode("utf-8")).hexdigest()[:16]
        tasks.append(
            {
                "task_id": task_id,
                "status": status,
                "todo": sentence[:400],
                "due_hint": due_hint,
                "who": who,
                "concept_id": src.get("concept_id"),
                "source_pk": src.get("source_pk"),
                "source_table": src.get("source_table"),
                "release_name": src.get("release_name"),
                "thread_id": thread_id,
                "message_id": message_id,
                "sender": sender,
                "sent_at": sent_at,
                "updated_at": src.get("updated_at"),
            }
        )
    return tasks
def _task_ai_cache_key(src: Dict[str, Any]) -> str:
    """Cache key for AI task extraction: doc identity + text hash + model name."""
    text = str(src.get("text") or src.get("description") or src.get("summary") or "")
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:24]
    components = [
        str(src.get("concept_id") or ""),
        str(src.get("source_pk") or ""),
        str(src.get("updated_at") or ""),
        digest,
        OLLAMA_MODEL,
    ]
    return hashlib.sha256("|".join(components).encode("utf-8")).hexdigest()
def _task_ai_cache_get(key: str) -> Optional[List[Dict[str, Any]]]:
    """Return cached tasks for `key`; expired or malformed entries count as misses."""
    entry = TASK_AI_CACHE.get(key)
    if not entry:
        return None
    if float(entry.get("expires_at") or 0) <= time.time():
        # Evict eagerly on expiry.
        TASK_AI_CACHE.pop(key, None)
        return None
    cached = entry.get("tasks")
    return cached if isinstance(cached, list) else None
def _task_ai_cache_set(key: str, tasks: List[Dict[str, Any]]) -> None:
    """Store tasks under `key`; when full, evict ~10% of soonest-expiring entries."""
    if len(TASK_AI_CACHE) >= TASK_AI_CACHE_MAX_SIZE:
        by_expiry = sorted(TASK_AI_CACHE.items(), key=lambda kv: float(kv[1].get("expires_at") or 0))
        for victim, _ in by_expiry[: max(1, TASK_AI_CACHE_MAX_SIZE // 10)]:
            TASK_AI_CACHE.pop(victim, None)
    TASK_AI_CACHE[key] = {"expires_at": time.time() + TASK_AI_CACHE_TTL_SEC, "tasks": tasks}
def build_task_extraction_prompt(src: Dict[str, Any]) -> str:
    """Build the strict-JSON task-extraction prompt for one message source doc."""
    body = str(src.get("text") or src.get("description") or src.get("summary") or "")[:4000]
    attrs_raw = src.get("attributes_json")
    if isinstance(attrs_raw, str):
        attrs = attrs_raw
    else:
        attrs = json.dumps(attrs_raw or {}, ensure_ascii=False)
    return (
        "Extract actionable tasks from this message. Ignore pure marketing/promotional content.\n"
        "Return strict JSON only with shape:\n"
        '{"tasks":[{"todo":"...","status":"pending|done","due_hint":"...|null","who":"...|null"}]}\n'
        "If no actionable tasks, return {\"tasks\":[]}.\n\n"
        f"Message concept_id: {src.get('concept_id')}\n"
        f"Source pk: {src.get('source_pk')}\n"
        f"Attributes JSON: {attrs}\n"
        f"Text:\n{body}\n"
    )
async def extract_pending_tasks_from_source_ai(src: Dict[str, Any]) -> List[Dict[str, Any]]:
    """LLM-based task extraction from one ES message source doc.

    Prompts the local model for strict JSON, then normalizes each returned
    item into the same task-record shape produced by the heuristic
    extractor. Returns [] when the model output lacks a "tasks" list.
    """
    prompt = build_task_extraction_prompt(src)
    raw = await ollama_generate(prompt)
    # Model output may include prose around the JSON; extract the object.
    obj = _extract_json_object_from_text(raw)
    task_items = obj.get("tasks")
    if not isinstance(task_items, list):
        return []
    # attributes_json may arrive as a JSON string or an already-parsed dict.
    attrs_raw = src.get("attributes_json")
    attrs: Dict[str, Any] = {}
    if isinstance(attrs_raw, str) and attrs_raw.strip():
        try:
            parsed = json.loads(attrs_raw)
            if isinstance(parsed, dict):
                attrs = parsed
        except Exception:
            attrs = {}
    elif isinstance(attrs_raw, dict):
        attrs = attrs_raw
    thread_id = attrs.get("thread_id")
    message_id = attrs.get("message_id") or src.get("source_pk")
    sender = attrs.get("sender")
    sent_at = attrs.get("sent_at")
    # task_id hashes (concept, message, normalized todo) so re-runs are stable.
    base_source = f"{src.get('concept_id') or ''}|{message_id or ''}"
    out: List[Dict[str, Any]] = []
    for item in task_items:
        if not isinstance(item, dict):
            continue
        todo = str(item.get("todo") or "").strip()
        if len(todo) < 4:
            # Too short to be an actionable task.
            continue
        status_raw = str(item.get("status") or "pending").strip().lower()
        status = "done" if status_raw == "done" else "pending"
        due_hint = item.get("due_hint")
        who = item.get("who")
        todo_norm = todo[:400]
        task_id = "task-" + hashlib.sha256(f"{base_source}|{todo_norm.lower()}".encode("utf-8")).hexdigest()[:16]
        out.append(
            {
                "task_id": task_id,
                "status": status,
                "todo": todo_norm,
                "due_hint": str(due_hint) if due_hint is not None else None,
                "who": str(who) if who is not None else sender,
                "concept_id": src.get("concept_id"),
                "source_pk": src.get("source_pk"),
                "source_table": src.get("source_table"),
                "release_name": src.get("release_name"),
                "thread_id": thread_id,
                "message_id": message_id,
                "sender": sender,
                "sent_at": sent_at,
                "updated_at": src.get("updated_at"),
            }
        )
    return out
def build_chat_prompt(
    user_message: str,
    history: List[Dict[str, str]],
    source_docs: List[Dict[str, Any]],
    release_name: Optional[str],
) -> str:
    """Compose the assistant chat prompt from history, retrieved docs, and the user turn."""
    # Keep only the most recent turns; each is truncated to 1200 chars.
    recent_turns: List[str] = []
    for turn in history[-ASSISTANT_CHAT_MAX_TURNS:]:
        turn_role = turn.get("role", "")
        turn_text = (turn.get("content", "") or "").strip()
        if turn_text and turn_role in ("user", "assistant"):
            recent_turns.append(f"{turn_role}: {turn_text[:1200]}")
    # One block per retrieved document, limited to its first usable text field.
    doc_blocks: List[str] = []
    for doc in source_docs:
        fields = doc.get("_source", {}) or {}
        body = str(fields.get("text") or fields.get("description") or fields.get("summary") or "")[:1200]
        doc_blocks.append(
            f"concept_id: {fields.get('concept_id', '')}\n"
            f"source_pk: {fields.get('source_pk', '')}\n"
            f"release_name: {fields.get('release_name', '')}\n"
            f"text: {body}"
        )
    context = "\n\n---\n\n".join(doc_blocks) if doc_blocks else "No retrieved context."
    hist = "\n".join(recent_turns) if recent_turns else "(none)"
    return (
        "You are a practical personal assistant. Be concise, factual, and useful.\n"
        "Use retrieved context when available. If uncertain, say so briefly and ask one clarifying question.\n"
        "Do not claim external actions were already performed.\n\n"
        f"Release filter: {release_name or '(none)'}\n"
        f"Conversation history:\n{hist}\n\n"
        f"Retrieved context:\n{context}\n\n"
        f"User: {user_message}\n"
        "Assistant:"
    )
def _append_chat_turn(session_id: str, role: str, content: str) -> None:
    """Record one chat turn, capping stored history at ASSISTANT_CHAT_MAX_TURNS pairs."""
    session_turns = ASSISTANT_CHAT_SESSIONS.get(session_id, [])
    session_turns.append({"role": role, "content": content})
    # Retain at most user+assistant pairs worth of turns.
    window = ASSISTANT_CHAT_MAX_TURNS * 2
    ASSISTANT_CHAT_SESSIONS[session_id] = session_turns[-window:]
def _slugify(text: str) -> str:
t = re.sub(r"[^a-zA-Z0-9]+", "-", (text or "").strip().lower()).strip("-")
return t[:60] or "proposal"
async def _run_local_cmd(args: List[str], cwd: Path) -> Dict[str, Any]:
proc = await asyncio.create_subprocess_exec(
*args,
cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
out = stdout.decode("utf-8", errors="replace")
err = stderr.decode("utf-8", errors="replace")
return {"code": proc.returncode, "stdout": out, "stderr": err}
def build_self_improve_prompt(
    payload: AssistantSelfImprovePayload,
    feedback_rows: List[Dict[str, Any]],
    action_rows: List[Dict[str, Any]],
) -> str:
    """Build the JSON-only prompt asking the model for code-improvement proposals.

    Feedback and action rows are flattened into compact one-line summaries,
    with per-field truncation, so the prompt stays bounded regardless of row size.
    """
    feedback_lines: List[str] = []
    for r in feedback_rows[: payload.feedback_limit]:
        feedback_lines.append(
            " | ".join(
                [
                    f"outcome={r.get('outcome')}",
                    f"task_type={r.get('task_type')}",
                    f"release={r.get('release_name')}",
                    f"goal={str(r.get('goal') or '')[:200]}",
                    f"notes={str(r.get('notes') or '')[:200]}",
                    f"draft={str(r.get('draft_text') or '')[:220]}",
                    f"final={str(r.get('final_text') or '')[:220]}",
                ]
            )
        )
    action_lines: List[str] = []
    for r in action_rows[: payload.action_limit]:
        action_lines.append(
            " | ".join(
                [
                    f"status={r.get('status')}",
                    f"task_type={r.get('task_type')}",
                    f"step_id={r.get('step_id')}",
                    f"action_type={r.get('action_type')}",
                    f"title={str(r.get('step_title') or '')[:160]}",
                    f"error={str(r.get('error_text') or '')[:220]}",
                ]
            )
        )
    feedback_block = "\n".join(feedback_lines) if feedback_lines else "(none)"
    action_block = "\n".join(action_lines) if action_lines else "(none)"
    # The literal JSON shape below is the contract the response parser expects.
    return (
        "You are a senior reliability engineer. Propose concrete code improvements.\n"
        "Return JSON only with exact shape:\n"
        '{'
        '"summary":"...",'
        '"proposals":[{"proposal_id":"P1","title":"...","problem":"...","change":"...","files":["app.py"],'
        '"risk":"low|medium|high","tests":["..."],"auto_apply_safe":true|false}]}'
        "\n"
        f"Create at most {payload.max_proposals} proposals.\n"
        "Prioritize high-impact, low-risk changes first.\n"
        "Only propose changes grounded in the provided signals.\n\n"
        f"Objective: {payload.objective}\n"
        f"Release filter: {payload.release_name or '(none)'}\n\n"
        f"Feedback signals:\n{feedback_block}\n\n"
        f"Action signals:\n{action_block}\n"
    )
def fallback_self_improve_proposals() -> List[AssistantSelfImproveProposal]:
    """Return canned improvement proposals used when the model path is unavailable."""
    return [
        AssistantSelfImproveProposal(
            proposal_id="P1",
            title="Improve retrieval fallback ordering",
            problem="Low-confidence drafts still occur when release-filtered retrieval returns no hits.",
            change="Add deterministic fallback chain and expose retrieval diagnostics in API responses.",
            files=["app.py"],
            risk="low",
            tests=["Call /assistant/draft with missing release_name and verify non-empty sources fallback."],
            auto_apply_safe=True,
        ),
        AssistantSelfImproveProposal(
            proposal_id="P2",
            title="Add deterministic task extraction benchmark route",
            problem="Task extraction quality is hard to evaluate consistently over time.",
            change="Introduce an eval endpoint with fixed fixtures and compare AI vs heuristic precision.",
            files=["app.py", "tests/"],
            risk="medium",
            tests=["Run fixtures for promotional and actionable emails and assert expected tasks count."],
            auto_apply_safe=False,
        ),
    ]
def build_assistant_prompt(payload: AssistantDraftPayload, source_docs: List[Dict[str, Any]]) -> str:
    """Build the drafting prompt from the request payload and retrieved ES docs.

    Each retrieved doc contributes a provenance block plus up to 1000 chars of
    its first usable text field; the model is instructed to stay draft-only.
    """
    recipient = payload.recipient or "unspecified recipient"
    tone = payload.tone or "professional"
    constraints = payload.constraints or []
    constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None"
    context_chunks = []
    for d in source_docs:
        src = d.get("_source", {}) or {}
        context_chunks.append(
            "\n".join(
                [
                    f"concept_id: {src.get('concept_id', '')}",
                    f"concept_type: {src.get('concept_type', '')}",
                    f"source_pk: {src.get('source_pk', '')}",
                    f"source_table: {src.get('source_table', '')}",
                    f"release_name: {src.get('release_name', '')}",
                    f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:1000]}",
                ]
            )
        )
    context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context."
    return (
        "You are a careful personal assistant. Draft a response based only on provided context.\n"
        "If context is missing, state uncertainty and ask concise follow-up questions.\n"
        "Do not claim actions were sent; provide draft-only text.\n\n"
        f"Task type: {payload.task_type}\n"
        f"Goal: {payload.goal}\n"
        f"Recipient: {recipient}\n"
        f"Tone: {tone}\n"
        f"Constraints:\n{constraint_lines}\n\n"
        "Retrieved context:\n"
        f"{context}\n\n"
        "Output only the draft text."
    )
async def ollama_generate(prompt: str) -> str:
    """Send *prompt* to the configured Ollama model and return the stripped response text.

    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    request_body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}
    async with httpx.AsyncClient(timeout=90) as client:
        resp = await client.post(f"{OLLAMA_URL}/api/generate", json=request_body)
        resp.raise_for_status()
        payload = resp.json()
    return (payload.get("response") or "").strip()
def fallback_draft_text(payload: AssistantDraftPayload) -> str:
    """Produce a deterministic draft when retrieval/model services are unavailable."""
    who = payload.recipient or "there"
    tone_lower = (payload.tone or "").lower()
    if payload.task_type != "message":
        # Generic fallback: echo the goal so the user still has a starting point.
        return (
            "Draft:\n"
            f"{payload.goal}\n\n"
            "I can refine this further once retrieval/model services are available."
        )
    if "friendly" in tone_lower:
        return (
            f"Hi {who},\n\n"
            "Thanks for your message. I received it and will follow up tomorrow.\n\n"
            "Best,"
        )
    return (
        f"Hello {who},\n\n"
        "I confirm receipt of your message and will follow up tomorrow.\n\n"
        "Regards,"
    )
def _extract_json_object_from_text(text: str) -> Dict[str, Any]:
start = text.find("{")
end = text.rfind("}")
if start == -1 or end == -1 or end < start:
raise ValueError("No JSON object found in output")
candidate = text[start : end + 1]
obj = json.loads(candidate)
if not isinstance(obj, dict):
raise ValueError("Parsed value is not a JSON object")
return obj
def _extract_code_text(text: str) -> str:
raw = (text or "").strip()
if raw.startswith("```"):
lines = raw.splitlines()
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip().startswith("```"):
lines = lines[:-1]
return "\n".join(lines).rstrip() + "\n"
return raw
def build_learn_code_prompt(
    objective: str,
    note_title: str,
    note_text: str,
    file_path: str,
    current_content: str,
) -> str:
    """Build the single-file editing prompt for applying a learn note.

    Note text and file content are truncated (4k / 120k chars) to bound
    the prompt size.
    """
    sections = [
        "You are editing one source file in a real codebase.",
        "Make minimal, correct changes to satisfy the objective.",
        "Do not add explanations.",
        "Return ONLY the full updated file content.",
        "",
        f"Objective:\n{objective}",
        "",
        f"Learn note title:\n{note_title}",
        "",
        f"Learn note text:\n{note_text[:4000]}",
        "",
        f"File path:\n{file_path}",
        "",
        "Current file content:",
        f"{current_content[:120000]}",
    ]
    return "\n".join(sections) + "\n"
async def _assistant_write_code_from_learn(
    payload: AssistantLearnPayload,
    note_title: str,
    note_text: str,
) -> Dict[str, Any]:
    """Apply a learn note to workspace files via model-generated rewrites.

    For each requested file (capped at 20), asks the model for a full updated
    file body and writes it on a dedicated git branch. Honors code_dry_run
    (no branch/write/commit) and code_commit flags. Returns a summary dict of
    changed/skipped files and the commit hash when one was created.

    Raises HTTPException 400 when no files were requested and 500 on git failures.
    """
    repo_dir = Path(__file__).resolve().parent
    objective = (payload.code_objective or "").strip() or f"Implement changes from learn note: {note_title}"
    requested_files = [str(x).strip() for x in (payload.code_files or []) if str(x).strip()]
    if not requested_files:
        raise HTTPException(status_code=400, detail="code_files is required when write_code=true")
    branch_name = (payload.code_branch or "").strip()
    if not branch_name:
        branch_name = f"assistant/learn-{_slugify(note_title)}-{uuid.uuid4().hex[:6]}"
    chk = await _run_local_cmd(["git", "rev-parse", "--is-inside-work-tree"], repo_dir)
    if chk["code"] != 0:
        raise HTTPException(status_code=500, detail={"message": "Not a git repository", "stderr": chk["stderr"]})
    if not payload.code_dry_run:
        # -B creates the branch or resets it if it already exists.
        co = await _run_local_cmd(["git", "checkout", "-B", branch_name], repo_dir)
        if co["code"] != 0:
            raise HTTPException(
                status_code=500,
                detail={"message": "Failed to create/switch branch", "stderr": co["stderr"]},
            )
    changed_files: List[str] = []
    skipped_files: List[Dict[str, str]] = []
    file_results: List[Dict[str, Any]] = []
    for rel in requested_files[:20]:
        rel_path = Path(rel)
        # Path-traversal guards: reject absolute paths, "..", and resolved
        # paths escaping the repository root.
        if rel_path.is_absolute() or ".." in rel_path.parts:
            skipped_files.append({"file": rel, "reason": "Only workspace-relative paths are allowed."})
            continue
        abs_path = (repo_dir / rel_path).resolve()
        try:
            abs_path.relative_to(repo_dir)
        except ValueError:
            skipped_files.append({"file": rel, "reason": "Path escapes repository root."})
            continue
        if not abs_path.exists() or not abs_path.is_file():
            skipped_files.append({"file": rel, "reason": "File does not exist."})
            continue
        current = abs_path.read_text(encoding="utf-8", errors="replace")
        prompt = build_learn_code_prompt(
            objective=objective,
            note_title=note_title,
            note_text=note_text,
            file_path=rel,
            current_content=current,
        )
        raw = await ollama_generate(prompt)
        updated = _extract_code_text(raw)
        if not updated.strip():
            skipped_files.append({"file": rel, "reason": "Model returned empty content."})
            continue
        changed = updated != current
        file_results.append({"file": rel, "changed": changed})
        if not changed:
            continue
        changed_files.append(rel)
        if not payload.code_dry_run:
            abs_path.write_text(updated, encoding="utf-8")
    commit_hash: Optional[str] = None
    commit_detail: Optional[str] = None
    if changed_files and not payload.code_dry_run and payload.code_commit:
        add = await _run_local_cmd(["git", "add", *changed_files], repo_dir)
        if add["code"] != 0:
            raise HTTPException(status_code=500, detail={"message": "Failed to add changed files", "stderr": add["stderr"]})
        msg = f"feat(assistant): apply learn code update {note_title[:50]}"
        cm = await _run_local_cmd(["git", "commit", "-m", msg], repo_dir)
        # Commit failure is reported through commit_detail rather than raising.
        commit_detail = (cm["stdout"] or cm["stderr"] or "").strip()
        if cm["code"] == 0:
            head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
            if head["code"] == 0:
                commit_hash = head["stdout"].strip()
    return {
        "attempted": True,
        "objective": objective,
        "dry_run": payload.code_dry_run,
        "branch_name": branch_name,
        "requested_files": requested_files,
        "changed_files": changed_files,
        "skipped_files": skipped_files,
        "file_results": file_results,
        "commit": commit_hash,
        "commit_detail": commit_detail,
    }
def build_assistant_plan_prompt(payload: AssistantPlanPayload, source_docs: List[Dict[str, Any]]) -> str:
    """Build the JSON-only planning prompt from the request payload and retrieved docs.

    The literal "plan" shape in the prompt is the contract the plan parser expects;
    each retrieved doc contributes provenance plus up to 600 chars of text.
    """
    constraints = payload.constraints or []
    constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None"
    context_chunks = []
    for d in source_docs:
        src = d.get("_source", {}) or {}
        context_chunks.append(
            "\n".join(
                [
                    f"concept_id: {src.get('concept_id', '')}",
                    f"source_pk: {src.get('source_pk', '')}",
                    f"source_table: {src.get('source_table', '')}",
                    f"release_name: {src.get('release_name', '')}",
                    f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:600]}",
                ]
            )
        )
    context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context."
    return (
        "You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n"
        "Return valid JSON ONLY with this exact shape:\n"
        '{'
        '"plan": ['
        '{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}'
        "]"
        "}\n"
        f"Use at most {payload.max_steps} steps.\n"
        "Prefer safe read-only and draft actions first.\n\n"
        f"Task type: {payload.task_type}\n"
        f"Objective: {payload.objective}\n"
        f"Constraints:\n{constraint_lines}\n\n"
        "Retrieved context:\n"
        f"{context}\n"
    )
def fallback_plan(payload: AssistantPlanPayload) -> List[AssistantPlanStep]:
    """Return a safe default research→draft→confirm plan, trimmed to max_steps."""
    return [
        AssistantPlanStep(
            step_id="S1",
            title="Gather relevant facts and constraints",
            action_type="research",
            requires_approval=False,
            notes="Review messages/concepts and identify required context.",
        ),
        AssistantPlanStep(
            step_id="S2",
            title="Draft a response or action proposal",
            action_type="draft",
            requires_approval=False,
            notes="Produce a concise draft aligned with objective and constraints.",
        ),
        AssistantPlanStep(
            step_id="S3",
            title="Request user confirmation before any external action",
            action_type="ask_user",
            requires_approval=True,
            notes="Do not send or execute changes until approved.",
        ),
    ][: payload.max_steps]
def find_plan_step(plan: List[AssistantPlanStep], step_id: str) -> Optional[AssistantPlanStep]:
    """Return the first plan step whose step_id matches, or None when absent."""
    return next((step for step in plan if step.step_id == step_id), None)
def is_high_risk_step(step: AssistantPlanStep) -> bool:
    """Heuristically flag steps whose title/notes mention hard-to-undo external actions."""
    haystack = f"{step.title} {step.notes or ''}".lower()
    risky_terms = (
        "send",
        "submit",
        "pay",
        "payment",
        "transfer",
        "wire",
        "sign",
        "file",
        "delete",
        "close account",
        "change account",
    )
    # NOTE(review): plain substring match, so e.g. "design" contains "sign" —
    # confirm the false-positive rate is acceptable.
    return any(term in haystack for term in risky_terms)
def enforce_step_policy(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Optional[str]:
    """Return a human-readable denial reason when approval gates fail, else None."""
    # Gate 1: the plan itself marked this step as needing approval.
    if step.requires_approval and not payload.approved:
        return "Step requires approval but approved=false."
    # Gate 2: risky external actions additionally need an explicit confirm token.
    if is_high_risk_step(step):
        if not payload.approved:
            return "High-risk step requires approved=true."
        token = (payload.manual_confirm_token or "").strip()
        if not token:
            return "High-risk step requires manual_confirm_token."
    return None
async def execute_plan_step(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Dict[str, Any]:
    """Dispatch one approved plan step to its executor and return a result payload.

    Only "draft" calls the model; the other action types return canned
    acknowledgements. Unknown action types get a generic note.
    """
    action = step.action_type
    if action == "draft":
        draft_prompt = (
            "Draft concise text for this approved planning step.\n"
            f"Task type: {payload.task_type}\n"
            f"Objective: {payload.objective}\n"
            f"Step: {step.title}\n"
            f"Notes: {step.notes or ''}\n"
            "Output only final draft text."
        )
        fallback = f"Draft for step '{step.title}'."
        # Model failures and empty output both degrade to the fallback draft.
        try:
            draft = await ollama_generate(draft_prompt)
        except Exception:
            draft = fallback
        if not draft.strip():
            draft = fallback
        return {"draft": draft}
    if action == "research":
        return {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."}
    if action == "prepare_data":
        return {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]}
    if action == "review":
        return {"note": "Review step requires human review before external action."}
    if action == "ask_user":
        return {"question": "Please confirm whether to proceed with the next high-impact action."}
    return {"note": "Step recognized but no executor implemented."}
def _gremlin_submit_blocking(q: str, bindings: Dict[str, Any]) -> Any:
    """Synchronously run a Gremlin query with a per-call client.

    A fresh client is created and closed on every call so the transport loop
    is owned by the worker thread executing this function.
    """
    conn = gremlin_client.Client(
        GREMLIN_URL,
        "g",
        message_serializer=GraphSONSerializersV3d0(),
    )
    try:
        return conn.submit(q, bindings).all().result()
    finally:
        conn.close()
async def gremlin_submit(q: str, bindings: Dict[str, Any]) -> Any:
    # Off-load the blocking Gremlin client call to a thread so the event loop
    # stays responsive.
    return await asyncio.to_thread(_gremlin_submit_blocking, q, bindings)
async def run_remote_projector(payload: ProjectionTrigger) -> Dict[str, Any]:
    """Run the projector script on the remote host over ssh and summarize the run.

    Raises HTTPException 504 on timeout (killing the ssh process) and 502 when
    the remote command exits non-zero.
    """
    parts = [
        PROJECTOR_REMOTE_SCRIPT,
        "--release-name", payload.release_name,
        "--targets", payload.targets,
    ]
    if payload.concept_table:
        parts.extend(["--concept-table", payload.concept_table])
    if payload.dry_run:
        parts.append("--dry-run")
    # Every token is shell-quoted because the remote side runs this through a shell.
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Projector execution timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    # Progress markers emitted by the remote script; surfaced for observability.
    spark_read_done = "[STEP] spark_read_done" in out
    projection_done = "[STEP] projection_done" in out
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "spark_read_done": spark_read_done,
        "projection_done": projection_done,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
def _b64(s: str) -> str:
return base64.b64encode(s.encode("utf-8")).decode("ascii")
async def run_remote_ingest_message(payload: MessageIngestPayload) -> Dict[str, Any]:
    """Ingest one message row by invoking the ingest script on the remote host.

    Free-form fields (body, metadata JSON) are base64-encoded so they survive
    shell argv transport. Raises HTTPException 504 on timeout, 502 on non-zero exit.
    """
    sent_at = payload.sent_at or ""
    parts = [
        INGEST_MESSAGE_REMOTE_SCRIPT,
        payload.table,
        payload.thread_id,
        payload.message_id,
        payload.sender,
        payload.channel,
        sent_at,
        _b64(payload.body),
        _b64(json.dumps(payload.metadata, ensure_ascii=False)),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Message ingest execution timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
async def run_remote_ingest_messages_batch(payload: MessageIngestBatchPayload) -> Dict[str, Any]:
    """Ingest many message rows at once: scp a JSON file, then run the batch script.

    The rows are serialized to a local temp file, copied to a unique remote
    path, and the remote command deletes that file after the script runs
    (preserving the script's exit code). Raises HTTPException 504 on upload or
    execution timeout and 502 on non-zero exit from scp or the batch script.
    """
    rows = []
    for m in payload.messages:
        rows.append(
            {
                "thread_id": m.thread_id,
                "message_id": m.message_id,
                "sender": m.sender,
                "channel": m.channel,
                "sent_at": m.sent_at,
                "body": m.body,
                "metadata": m.metadata,
            }
        )
    if not rows:
        # Nothing to upload; short-circuit with a synthetic success result.
        return {
            "host": PROJECTOR_SSH_HOST,
            "remote_dir": PROJECTOR_REMOTE_DIR,
            "exit_code": 0,
            "rows": 0,
            "stdout_tail": "[INFO] No rows to ingest",
            "stderr_tail": "",
        }
    local_tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8")
    remote_tmp = f"{PROJECTOR_REMOTE_DIR}/.ingest_messages_{uuid.uuid4().hex}.json"
    try:
        json.dump(rows, local_tmp, ensure_ascii=False)
        local_tmp.flush()
        local_tmp.close()
        scp_target = f"{PROJECTOR_SSH_HOST}:{remote_tmp}"
        scp_args = [PROJECTOR_SCP_BIN, *shlex.split(PROJECTOR_SCP_OPTS), local_tmp.name, scp_target]
        scp_proc = await asyncio.create_subprocess_exec(
            *scp_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            scp_stdout, scp_stderr = await asyncio.wait_for(scp_proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
        except asyncio.TimeoutError:
            scp_proc.kill()
            await scp_proc.wait()
            raise HTTPException(status_code=504, detail="Batch payload upload timed out")
        if scp_proc.returncode != 0:
            raise HTTPException(
                status_code=502,
                detail={
                    "host": PROJECTOR_SSH_HOST,
                    "remote_dir": PROJECTOR_REMOTE_DIR,
                    "exit_code": scp_proc.returncode,
                    "stdout_tail": _tail(scp_stdout.decode("utf-8", errors="replace")),
                    "stderr_tail": _tail(scp_stderr.decode("utf-8", errors="replace")),
                },
            )
        # "@<path>" tells the batch script to read its payload from the file.
        payload_arg = f"@{remote_tmp}"
        parts = [
            INGEST_MESSAGES_BATCH_REMOTE_SCRIPT,
            payload.table,
            payload.dedupe_mode,
            payload_arg,
        ]
        batch_cmd = " ".join(shlex.quote(p) for p in parts)
        # Remove the remote temp file after the script, while preserving its exit code.
        command = (
            f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && "
            f"({batch_cmd}); rc=$?; rm -f {shlex.quote(remote_tmp)}; exit $rc"
        )
        ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
        proc = await asyncio.create_subprocess_exec(
            *ssh_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            raise HTTPException(status_code=504, detail="Batch message ingest execution timed out")
    finally:
        # Always remove the local temp file, even on upload/execution failure.
        try:
            os.unlink(local_tmp.name)
        except Exception:
            pass
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "rows": len(rows),
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
async def run_remote_assistant_feedback(
    feedback_id: str,
    payload: AssistantFeedbackPayload,
) -> Dict[str, Any]:
    """Persist one assistant feedback row by running the feedback script remotely.

    Free-form fields (goal, draft, final text, sources JSON, notes) are
    base64-encoded for safe argv transport. Raises HTTPException 504 on
    timeout, 502 on non-zero exit.
    """
    confidence = payload.confidence if payload.confidence is not None else 0.0
    parts = [
        ASSISTANT_FEEDBACK_REMOTE_SCRIPT,
        feedback_id,
        now_iso(),
        payload.outcome,
        payload.task_type,
        payload.release_name or "",
        f"{confidence}",
        "true" if payload.needs_review else "false",
        _b64(payload.goal or ""),
        _b64(payload.draft),
        _b64(payload.final_text or ""),
        _b64(json.dumps([s.model_dump() for s in payload.sources], ensure_ascii=False)),
        _b64(payload.notes or ""),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant feedback execution timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
def _extract_json_array_from_text(text: str) -> List[Dict[str, Any]]:
start = text.find("[")
end = text.rfind("]")
if start == -1 or end == -1 or end < start:
raise ValueError("No JSON array found in output")
candidate = text[start : end + 1]
obj = json.loads(candidate)
if not isinstance(obj, list):
raise ValueError("Parsed value is not a JSON array")
out: List[Dict[str, Any]] = []
for item in obj:
if isinstance(item, dict):
out.append(item)
return out
async def run_remote_query_assistant_feedback(
    outcome: Optional[str],
    task_type: Optional[str],
    release_name: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Query assistant feedback rows via the remote script; empty filters mean "any".

    The script is expected to print a JSON array on stdout. Raises HTTPException
    504 on timeout, 502 on non-zero exit or unparsable output.
    """
    parts = [
        ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT,
        outcome or "",
        task_type or "",
        release_name or "",
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant feedback query timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse feedback query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
async def run_remote_query_assistant_metrics(
    task_type: Optional[str],
    release_name: Optional[str],
    outcome: Optional[str],
    group_by: str,
    limit: int,
) -> Dict[str, Any]:
    """Query aggregated assistant metrics via the remote script; empty filters mean "any".

    The script is expected to print a JSON array on stdout. Raises HTTPException
    504 on timeout, 502 on non-zero exit or unparsable output.
    """
    parts = [
        ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT,
        task_type or "",
        release_name or "",
        outcome or "",
        group_by,
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant metrics query timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse metrics query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
async def run_remote_assistant_action(
    action_id: str,
    payload: AssistantExecuteStepPayload,
    step: AssistantPlanStep,
    status: str,
    output_json: Dict[str, Any],
    error_text: Optional[str],
) -> Dict[str, Any]:
    """Log one executed plan step by running the action script on the remote host.

    Free-form fields (objective, step title, output JSON, error text) are
    base64-encoded for safe argv transport. Raises HTTPException 504 on
    timeout, 502 on non-zero exit.
    """
    parts = [
        ASSISTANT_ACTION_REMOTE_SCRIPT,
        action_id,
        now_iso(),
        payload.task_type,
        payload.release_name or "",
        _b64(payload.objective),
        step.step_id,
        _b64(step.title),
        step.action_type,
        "true" if step.requires_approval else "false",
        "true" if payload.approved else "false",
        status,
        _b64(json.dumps(output_json, ensure_ascii=False)),
        _b64(error_text or ""),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant action logging timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
async def run_remote_query_assistant_actions(
    status: Optional[str],
    task_type: Optional[str],
    release_name: Optional[str],
    step_id: Optional[str],
    action_type: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Query logged assistant actions via the remote script; empty filters mean "any".

    The script is expected to print a JSON array on stdout. Raises HTTPException
    504 on timeout, 502 on non-zero exit or unparsable output.
    """
    parts = [
        ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT,
        status or "",
        task_type or "",
        release_name or "",
        step_id or "",
        action_type or "",
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant actions query timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse actions query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
async def run_remote_record_assistant_proposals(
    proposal_set_id: str,
    created_at_utc: str,
    objective: str,
    release_name: Optional[str],
    summary: str,
    signals: Dict[str, Any],
    proposals: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Persist a self-improvement proposal set by running the proposals script remotely.

    Free-form fields (objective, summary, signals/proposals JSON) are
    base64-encoded for safe argv transport. Raises HTTPException 504 on
    timeout, 502 on non-zero exit.
    """
    parts = [
        ASSISTANT_PROPOSALS_REMOTE_SCRIPT,
        proposal_set_id,
        created_at_utc,
        _b64(objective or ""),
        release_name or "",
        _b64(summary or ""),
        _b64(json.dumps(signals or {}, ensure_ascii=False)),
        _b64(json.dumps(proposals or [], ensure_ascii=False)),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant proposals logging timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    result = {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "exit_code": proc.returncode,
        "stdout_tail": _tail(out),
        "stderr_tail": _tail(err),
    }
    if proc.returncode != 0:
        raise HTTPException(status_code=502, detail=result)
    return result
async def run_remote_query_assistant_proposals(
    release_name: Optional[str],
    proposal_set_id: Optional[str],
    limit: int,
) -> Dict[str, Any]:
    """Query stored proposal sets via the remote script; empty filters mean "any".

    The script is expected to print a JSON array on stdout. Raises HTTPException
    504 on timeout, 502 on non-zero exit or unparsable output.
    """
    parts = [
        ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT,
        release_name or "",
        proposal_set_id or "",
        str(limit),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Assistant proposals query timed out")
    out = stdout.decode("utf-8", errors="replace")
    err = stderr.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "host": PROJECTOR_SSH_HOST,
                "remote_dir": PROJECTOR_REMOTE_DIR,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    try:
        rows = _extract_json_array_from_text(out)
    except Exception as e:
        raise HTTPException(
            status_code=502,
            detail={
                "message": f"Unable to parse proposals query output: {e}",
                "stdout_tail": _tail(out),
                "stderr_tail": _tail(err),
            },
        )
    return {
        "host": PROJECTOR_SSH_HOST,
        "remote_dir": PROJECTOR_REMOTE_DIR,
        "rows": rows,
    }
async def run_remote_record_run(
    run_id: str,
    run_type: str,
    status: str,
    started_at_utc: str,
    finished_at_utc: str,
    actor: str,
    input_json: Dict[str, Any],
    output_json: Optional[Dict[str, Any]],
    error_text: Optional[str],
) -> None:
    """Record one run row in Iceberg by invoking the runs script over SSH.

    JSON payloads and free text are base64-encoded so they survive shell
    quoting. Raises HTTPException 504 on timeout and 502 when the remote
    script exits non-zero.
    """
    parts = [
        RUNS_REMOTE_SCRIPT,
        run_id,
        run_type,
        status,
        started_at_utc,
        finished_at_utc,
        actor,
        _b64(json.dumps(input_json, ensure_ascii=False)),
        _b64(json.dumps(output_json, ensure_ascii=False) if output_json is not None else ""),
        _b64(error_text or ""),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Fix: previously the TimeoutError propagated unhandled (surfacing as a
        # 500) and leaked the still-running ssh process. Kill it and return a
        # 504, matching the other remote runners in this module.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Run recording timed out")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "message": "Failed to record run in Iceberg",
                "host": PROJECTOR_SSH_HOST,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")),
                "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")),
            },
        )
async def run_remote_record_event(
    run_id: str,
    event_type: str,
    detail_json: Dict[str, Any],
) -> None:
    """Record one run event row in Iceberg via the remote run-events script.

    The event detail is base64-encoded JSON to survive shell quoting. Raises
    HTTPException 504 on timeout and 502 when the remote script exits
    non-zero.
    """
    parts = [
        RUN_EVENTS_REMOTE_SCRIPT,
        run_id,
        event_type,
        now_iso(),
        _b64(json.dumps(detail_json, ensure_ascii=False)),
    ]
    command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}"
    ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command]
    proc = await asyncio.create_subprocess_exec(
        *ssh_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        # Fix: previously the TimeoutError propagated unhandled and leaked the
        # ssh process. Kill it and return a 504 like the sibling runners.
        proc.kill()
        await proc.wait()
        raise HTTPException(status_code=504, detail="Run event recording timed out")
    if proc.returncode != 0:
        raise HTTPException(
            status_code=502,
            detail={
                "message": "Failed to record run event in Iceberg",
                "host": PROJECTOR_SSH_HOST,
                "exit_code": proc.returncode,
                "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")),
                "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")),
            },
        )
async def record_event_best_effort(run_id: str, event_type: str, detail_json: Dict[str, Any]) -> None:
    """Record a run event, swallowing and logging any failure."""
    try:
        await run_remote_record_event(run_id, event_type, detail_json)
    except Exception as exc:
        # Event tracing must never break the primary projection flow.
        print(f"[WARN] run event logging failed: run_id={run_id} event={event_type} error={exc}")
# --------- routes ---------
@app.on_event("startup")
async def startup():
    """Ensure the Elasticsearch index exists before the app serves requests."""
    await es_ensure_index()
@app.get("/meta")
async def meta():
    """Report build/version info and assistant feature flags."""
    features = {
        "assistant_chat": True,
        "assistant_learn": True,
        "assistant_tasks_ai": True,
        "assistant_ui": True,
    }
    return {
        "app_name": APP_NAME,
        "version": APP_VERSION,
        "started_at_utc": APP_STARTED_AT_UTC,
        "features": features,
    }
@app.get("/ui", include_in_schema=False)
async def assistant_ui():
    """Serve the assistant UI as a single HTML page with CSS/JS inlined."""
    index_html = UI_DIR / "index.html"
    if not index_html.exists():
        raise HTTPException(status_code=404, detail="UI not found")

    def read_asset(path: Path) -> str:
        # A missing asset degrades to an empty inline block, not an error.
        return path.read_text(encoding="utf-8") if path.exists() else ""

    css = read_asset(UI_ASSETS_DIR / "styles.css")
    js = read_asset(UI_ASSETS_DIR / "app.js")
    html = index_html.read_text(encoding="utf-8")
    html = html.replace(
        '<link rel="stylesheet" href="/ui/assets/styles.css" />',
        f"<style>{css}</style>",
    )
    html = html.replace(
        '<script src="/ui/assets/app.js"></script>',
        f"<script>{js}</script>",
    )
    return HTMLResponse(content=html)
@app.post("/concepts", response_model=ConceptOut)
async def create_concept(payload: ConceptCreate):
    """Create a concept across the three stores: IPFS, JanusGraph, Elasticsearch.

    Flow: store the full content document in IPFS, optionally derive a summary
    and embedding via Ollama (best-effort), create a Concept vertex in
    JanusGraph, then index a flat document in Elasticsearch.
    """
    created = now_iso()
    updated = created
    fingerprint = make_fingerprint(payload.canonical_name, payload.kind, payload.external_ids)
    # Store long content in IPFS (version 1 of the content document).
    content_doc = {
        "canonical_name": payload.canonical_name,
        "kind": payload.kind,
        "aliases": payload.aliases,
        "external_ids": payload.external_ids,
        "tags": payload.tags,
        "description": payload.description,
        "created_at": created,
    }
    latest_cid = await ipfs_add_json(content_doc)
    # LLM extras (optional): failures leave summary None / embedding empty
    # rather than failing the request.
    summary = None
    embedding: List[float] = []
    base_text = payload.description or payload.canonical_name
    if base_text.strip():
        try:
            summary = await ollama_summary(base_text)
        except Exception:
            summary = None
        try:
            embedding = await ollama_embed(base_text)
        except Exception:
            embedding = []
    # Create vertex in JanusGraph (Concept).
    # Uses a stable concept_id (random UUID minted in the Groovy script) as
    # your canonical handle; list/dict fields are stored as JSON strings.
    concept_id = (await gremlin_submit(
        """
        import java.util.UUID
        def id = UUID.randomUUID().toString()
        g.addV('Concept')
          .property('concept_id', id)
          .property('canonical_name', canonical_name)
          .property('kind', kind)
          .property('aliases', aliases_json)
          .property('external_ids', external_ids_json)
          .property('tags', tags_json)
          .property('fingerprint', fingerprint)
          .property('latest_cid', latest_cid)
          .property('summary', summary)
          .property('created_at', created_at)
          .property('updated_at', updated_at)
          .values('concept_id')
        """,
        {
            "canonical_name": payload.canonical_name,
            "kind": payload.kind or "",
            "aliases_json": json.dumps(payload.aliases, ensure_ascii=False),
            "external_ids_json": json.dumps(payload.external_ids, ensure_ascii=False),
            "tags_json": json.dumps(payload.tags, ensure_ascii=False),
            "fingerprint": fingerprint,
            "latest_cid": latest_cid,
            "summary": summary or "",
            "created_at": created,
            "updated_at": updated,
        },
    ))[0]
    out = {
        "concept_id": concept_id,
        "canonical_name": payload.canonical_name,
        "kind": payload.kind,
        "aliases": payload.aliases,
        "external_ids": payload.external_ids,
        "tags": payload.tags,
        "latest_cid": latest_cid,
        "summary": summary,
        "created_at": created,
        "updated_at": updated,
    }
    # Index in Elasticsearch; fingerprint/embedding are index-only fields
    # not returned in the API response.
    doc = dict(out)
    doc["fingerprint"] = fingerprint
    if embedding:
        doc["embedding"] = embedding
    await es_index(doc)
    return out
@app.get("/concepts/{concept_id}", response_model=ConceptOut)
async def get_concept(concept_id: str):
    """Fetch a single Concept vertex from JanusGraph by its concept_id."""
    rows = await gremlin_submit(
        """
        g.V().hasLabel('Concept').has('concept_id', concept_id)
          .project('concept_id','canonical_name','kind','aliases','external_ids','tags','latest_cid','summary','created_at','updated_at')
          .by(values('concept_id'))
          .by(values('canonical_name'))
          .by(values('kind'))
          .by(values('aliases'))
          .by(values('external_ids'))
          .by(values('tags'))
          .by(values('latest_cid'))
          .by(values('summary'))
          .by(values('created_at'))
          .by(values('updated_at'))
        """,
        {"concept_id": concept_id},
    )
    if not rows:
        raise HTTPException(status_code=404, detail="Concept not found")
    row = rows[0]

    def parse_json_list(raw: Any) -> List[Any]:
        # List fields are stored as JSON text on the vertex; tolerate
        # missing or corrupt values by returning an empty list.
        try:
            value = json.loads(raw or "[]")
        except Exception:
            return []
        return value if isinstance(value, list) else []

    try:
        external_ids = json.loads(row.get("external_ids") or "{}")
    except Exception:
        external_ids = {}
    return {
        "concept_id": row.get("concept_id"),
        "canonical_name": row.get("canonical_name"),
        "kind": (row.get("kind") or None),
        "aliases": parse_json_list(row.get("aliases")),
        "external_ids": external_ids,
        "tags": parse_json_list(row.get("tags")),
        "latest_cid": (row.get("latest_cid") or None),
        "summary": (row.get("summary") or None),
        "created_at": row.get("created_at"),
        "updated_at": row.get("updated_at"),
    }
@app.get("/search")
async def search(q: str, size: int = 10):
    """Full-text search over the concepts index."""
    return {"q": q, "results": await es_search(q, size=size)}
@app.get("/assistant/inbox")
async def assistant_inbox(
    release_name: Optional[str] = None,
    q: Optional[str] = None,
    limit: int = 20,
    x_admin_api_key: Optional[str] = Header(default=None),
):
    """List recent inbox messages from Elasticsearch (admin-only)."""
    check_admin_api_key(x_admin_api_key)
    bounded_limit = max(1, min(limit, 200))
    hits = await es_recent_messages(
        size=bounded_limit,
        release_name=release_name,
        q=q,
    )

    def to_row(hit: Dict[str, Any]) -> Dict[str, Any]:
        src = hit.get("_source", {}) or {}
        score = hit.get("_score")
        return {
            "concept_id": src.get("concept_id"),
            "source_pk": src.get("source_pk"),
            "source_table": src.get("source_table"),
            "release_name": src.get("release_name"),
            "concept_type": src.get("concept_type") or src.get("kind"),
            "display_name": src.get("display_name") or src.get("canonical_name"),
            "text": src.get("text"),
            "summary": src.get("summary"),
            "description": src.get("description"),
            "updated_at": src.get("updated_at"),
            "score": float(score) if score is not None else None,
        }

    rows = [to_row(h) for h in hits]
    return {
        "count": len(rows),
        "filters": {
            "release_name": release_name,
            "q": q,
            "limit": bounded_limit,
        },
        "rows": rows,
    }
@app.get("/assistant/tasks")
async def assistant_tasks(
    release_name: Optional[str] = None,
    q: Optional[str] = None,
    only_pending: bool = True,
    use_ai: bool = True,
    limit: int = 50,
    x_admin_api_key: Optional[str] = Header(default=None),
):
    """Extract actionable tasks from recent messages (admin-only).

    When use_ai is true, tries a per-message cached AI extraction first; the
    heuristic extractor is used whenever AI is disabled, errors, or yields
    nothing. Tasks are de-duplicated by task_id across all messages.
    """
    check_admin_api_key(x_admin_api_key)
    bounded_limit = max(1, min(limit, 500))
    # Pull more messages than final task limit because each message can yield 0..N tasks.
    hits = await es_recent_messages(
        size=min(1000, max(100, bounded_limit * 4)),
        release_name=release_name,
        q=q,
    )
    rows: List[Dict[str, Any]] = []
    seen_task_ids: set[str] = set()
    ai_cache_hits = 0
    ai_calls = 0
    for h in hits:
        src = h.get("_source", {}) or {}
        extracted: List[Dict[str, Any]] = []
        if use_ai:
            cache_key = _task_ai_cache_key(src)
            cached = _task_ai_cache_get(cache_key)
            if cached is not None:
                ai_cache_hits += 1
                extracted = cached
            else:
                ai_calls += 1
                try:
                    extracted = await extract_pending_tasks_from_source_ai(src)
                except Exception as e:
                    print(f"[WARN] assistant_tasks ai extraction failed: {e}")
                    extracted = []
                # Cache even empty AI results so failing messages are not retried on every call.
                _task_ai_cache_set(cache_key, extracted)
        if not extracted:
            # Heuristic fallback when AI is disabled or produced no tasks.
            extracted = extract_pending_tasks_from_source(src)
        for t in extracted:
            if only_pending and t.get("status") != "pending":
                continue
            task_id = str(t.get("task_id") or "")
            if not task_id or task_id in seen_task_ids:
                continue
            seen_task_ids.add(task_id)
            rows.append(t)
            if len(rows) >= bounded_limit:
                break
        # Stop scanning messages once the task limit is reached.
        if len(rows) >= bounded_limit:
            break
    return {
        "count": len(rows),
        "filters": {
            "release_name": release_name,
            "q": q,
            "only_pending": only_pending,
            "use_ai": use_ai,
            "limit": bounded_limit,
            "ai_cache_ttl_sec": TASK_AI_CACHE_TTL_SEC,
        },
        "stats": {
            "messages_scanned": len(hits),
            "ai_cache_hits": ai_cache_hits,
            "ai_calls": ai_calls,
            "ai_cache_size": len(TASK_AI_CACHE),
        },
        "rows": rows,
    }
@app.post("/assistant/learn")
async def assistant_learn(payload: AssistantLearnPayload, x_admin_api_key: Optional[str] = Header(default=None)):
    """Store a free-form note as a searchable concept document (admin-only).

    The note is indexed in Elasticsearch under a generated ``note:`` concept
    id. When no title is supplied, the first line of the text (max 80 chars)
    is used. Optionally triggers code-writing from the note.
    """
    check_admin_api_key(x_admin_api_key)
    text = payload.text.strip()
    if not text:
        # Fix: an empty/whitespace body previously crashed with IndexError on
        # splitlines()[0] (HTTP 500). Reject it explicitly instead.
        raise HTTPException(status_code=400, detail="text must not be empty")
    now = now_iso()
    note_id = "note-" + uuid.uuid4().hex[:16]
    title = (payload.title or "").strip()
    if not title:
        title = text.splitlines()[0][:80]
    summary = text[:280]
    doc = {
        "concept_id": f"note:{note_id}",
        "concept_type": "note",
        "display_name": title,
        "description": summary,
        "text": payload.text,
        "source_table": "assistant.learn",
        "source_pk": note_id,
        "release_name": payload.release_name or "",
        "ref_hash": "",
        "attributes_json": json.dumps(payload.metadata or {}, ensure_ascii=False, sort_keys=True),
        "canonical_name": title,
        "kind": "note",
        "aliases": [],
        "external_ids": {},
        "tags": payload.tags or [],
        "latest_cid": None,
        "summary": summary,
        "created_at": now,
        "updated_at": now,
        "fingerprint": make_fingerprint(title, "note", {}),
    }
    await es_index(doc)
    response: Dict[str, Any] = {
        "stored": True,
        "concept_id": doc["concept_id"],
        "release_name": payload.release_name,
        "title": title,
        "tags": payload.tags,
    }
    if payload.write_code:
        response["code_write"] = await _assistant_write_code_from_learn(
            payload=payload,
            note_title=title,
            note_text=payload.text,
        )
    return response
@app.post("/assistant/chat", response_model=AssistantChatResponse)
async def assistant_chat(payload: AssistantChatPayload, x_admin_api_key: Optional[str] = Header(default=None)):
    """RAG-style chat: retrieve context from Elasticsearch, answer via Ollama.

    History comes from the request when provided, otherwise from the
    in-memory session store. Retrieval degrades gracefully through three
    layers (search -> recent-by-release -> recent messages), and generation
    failures produce a canned retry message instead of an error.
    """
    check_admin_api_key(x_admin_api_key)
    session_id = (payload.session_id or str(uuid.uuid4())).strip()
    history: List[Dict[str, str]] = []
    if payload.history:
        # Client-supplied history takes precedence; drop blank turns.
        history = [{"role": h.role, "content": h.content} for h in payload.history if h.content.strip()]
    else:
        history = ASSISTANT_CHAT_SESSIONS.get(session_id, [])
    hits: List[Dict[str, Any]] = []
    try:
        hits = await es_search_hits(
            q=payload.message,
            size=payload.max_sources,
            release_name=payload.release_name,
        )
    except Exception as e:
        print(f"[WARN] assistant_chat retrieval failed: {e}")
        hits = []
    # Fallback 1: most recent docs for the requested release.
    if not hits and payload.release_name:
        try:
            hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
        except Exception as e:
            print(f"[WARN] assistant_chat release fallback failed: {e}")
    # Fallback 2: recent inbox messages regardless of query.
    if not hits:
        try:
            hits = await es_recent_messages(size=payload.max_sources, release_name=payload.release_name, q=None)
        except Exception as e:
            print(f"[WARN] assistant_chat inbox fallback failed: {e}")
    prompt = build_chat_prompt(
        user_message=payload.message,
        history=history,
        source_docs=hits,
        release_name=payload.release_name,
    )
    try:
        answer = await ollama_generate(prompt)
        if not answer.strip():
            answer = "I don't have enough context to answer confidently. Can you share one more detail?"
    except Exception as e:
        print(f"[WARN] assistant_chat generation failed: {e}")
        answer = "I could not generate a response right now. Please retry."
    sources: List[AssistantDraftSource] = []
    for h in hits:
        src = h.get("_source", {}) or {}
        sources.append(
            AssistantDraftSource(
                concept_id=str(src.get("concept_id") or ""),
                source_pk=src.get("source_pk"),
                source_table=src.get("source_table"),
                release_name=src.get("release_name"),
                score=float(h.get("_score")) if h.get("_score") is not None else None,
            )
        )
    # Confidence heuristic: scales with number of usable sources, capped low
    # for very short answers.
    source_count = len([s for s in sources if s.concept_id])
    confidence = 0.35
    if source_count >= 5:
        confidence = 0.85
    elif source_count >= 3:
        confidence = 0.75
    elif source_count >= 1:
        confidence = 0.6
    if len(answer.strip()) < 30:
        confidence = min(confidence, 0.45)
    # Persist the turn in the in-memory session store for follow-ups.
    _append_chat_turn(session_id, "user", payload.message)
    _append_chat_turn(session_id, "assistant", answer)
    return AssistantChatResponse(
        session_id=session_id,
        answer=answer,
        sources=[s for s in sources if s.concept_id],
        confidence=confidence,
        release_name=payload.release_name,
    )
@app.post("/assistant/self-improve", response_model=AssistantSelfImproveResponse)
async def assistant_self_improve(
    payload: AssistantSelfImprovePayload,
    x_admin_api_key: Optional[str] = Header(default=None),
):
    """Generate improvement proposals from feedback/blocked-action signals.

    Collects edited/rejected feedback and blocked actions from the remote
    store (each query best-effort), de-duplicates them, asks the LLM for
    structured proposals, falls back to a static proposal set on failure, and
    persists the resulting proposal set. ``apply`` is accepted but always
    blocked here; application happens via the dedicated apply endpoint.
    """
    check_admin_api_key(x_admin_api_key)
    feedback_rows: List[Dict[str, Any]] = []
    if payload.include_edited_feedback:
        try:
            res = await run_remote_query_assistant_feedback(
                outcome="edited",
                task_type=None,
                release_name=payload.release_name,
                limit=payload.feedback_limit,
            )
            feedback_rows.extend(res.get("rows", []))
        except Exception as e:
            print(f"[WARN] self-improve edited feedback query failed: {e}")
    if payload.include_rejected_feedback:
        try:
            res = await run_remote_query_assistant_feedback(
                outcome="rejected",
                task_type=None,
                release_name=payload.release_name,
                limit=payload.feedback_limit,
            )
            feedback_rows.extend(res.get("rows", []))
        except Exception as e:
            print(f"[WARN] self-improve rejected feedback query failed: {e}")
    action_rows: List[Dict[str, Any]] = []
    if payload.include_blocked_actions:
        try:
            res = await run_remote_query_assistant_actions(
                status="blocked",
                task_type=None,
                release_name=payload.release_name,
                step_id=None,
                action_type=None,
                limit=payload.action_limit,
            )
            action_rows.extend(res.get("rows", []))
        except Exception as e:
            print(f"[WARN] self-improve blocked actions query failed: {e}")
    # De-duplicate simple repeats. Rows without an id are keyed by a hash of
    # their full JSON form.
    seen_feedback: set[str] = set()
    dedup_feedback: List[Dict[str, Any]] = []
    for r in feedback_rows:
        key = str(r.get("feedback_id") or "") or hashlib.sha256(
            json.dumps(r, ensure_ascii=False, sort_keys=True).encode("utf-8")
        ).hexdigest()
        if key in seen_feedback:
            continue
        seen_feedback.add(key)
        dedup_feedback.append(r)
    feedback_rows = dedup_feedback[: payload.feedback_limit]
    seen_actions: set[str] = set()
    dedup_actions: List[Dict[str, Any]] = []
    for r in action_rows:
        key = str(r.get("action_id") or "") or hashlib.sha256(
            json.dumps(r, ensure_ascii=False, sort_keys=True).encode("utf-8")
        ).hexdigest()
        if key in seen_actions:
            continue
        seen_actions.add(key)
        dedup_actions.append(r)
    action_rows = dedup_actions[: payload.action_limit]
    prompt = build_self_improve_prompt(payload, feedback_rows, action_rows)
    proposals: List[AssistantSelfImproveProposal] = []
    summary = "Generated from fallback proposal set."
    try:
        raw = await ollama_generate(prompt)
        parsed = _extract_json_object_from_text(raw)
        summary = str(parsed.get("summary") or "").strip() or summary
        raw_proposals = parsed.get("proposals")
        if isinstance(raw_proposals, list):
            # Sanitize each model-produced proposal: coerce fields to strings,
            # clamp risk to the allowed literals, default missing values.
            for idx, p in enumerate(raw_proposals[: payload.max_proposals]):
                if not isinstance(p, dict):
                    continue
                risk_raw = str(p.get("risk") or "medium").strip().lower()
                risk: Literal["low", "medium", "high"] = "medium"
                if risk_raw in ("low", "medium", "high"):
                    risk = risk_raw  # type: ignore[assignment]
                files = [str(x) for x in (p.get("files") or []) if str(x).strip()]
                tests = [str(x) for x in (p.get("tests") or []) if str(x).strip()]
                proposals.append(
                    AssistantSelfImproveProposal(
                        proposal_id=str(p.get("proposal_id") or f"P{idx+1}"),
                        title=str(p.get("title") or "").strip() or f"Proposal {idx+1}",
                        problem=str(p.get("problem") or "").strip() or "Unspecified problem.",
                        change=str(p.get("change") or "").strip() or "No change details provided.",
                        files=files,
                        risk=risk,
                        tests=tests,
                        auto_apply_safe=bool(p.get("auto_apply_safe", False)),
                    )
                )
    except Exception as e:
        print(f"[WARN] self-improve generation failed: {e}")
    if not proposals:
        proposals = fallback_self_improve_proposals()[: payload.max_proposals]
    created_at_utc = now_iso()
    proposal_set_id = str(uuid.uuid4())
    signals = {
        "feedback_rows": len(feedback_rows),
        "blocked_action_rows": len(action_rows),
    }
    proposal_dicts = [p.model_dump() for p in proposals[: payload.max_proposals]]
    # Persistence is best-effort; a storage failure must not hide proposals
    # from the caller.
    try:
        await run_remote_record_assistant_proposals(
            proposal_set_id=proposal_set_id,
            created_at_utc=created_at_utc,
            objective=payload.objective,
            release_name=payload.release_name,
            summary=summary,
            signals=signals,
            proposals=proposal_dicts,
        )
    except Exception as e:
        print(f"[WARN] self-improve proposal persistence failed: {e}")
    # Auto-apply is intentionally disabled in this endpoint.
    apply_blocked = payload.apply
    apply_block_reason: Optional[str] = None
    if payload.apply:
        apply_block_reason = "Auto-apply is not enabled yet. Use proposals as PR/patch inputs."
    return AssistantSelfImproveResponse(
        objective=payload.objective,
        release_name=payload.release_name,
        proposal_set_id=proposal_set_id,
        created_at_utc=created_at_utc,
        summary=summary,
        proposals=proposals[: payload.max_proposals],
        signals=signals,
        apply_blocked=apply_blocked,
        apply_block_reason=apply_block_reason,
    )
@app.get("/assistant/self-improve/history")
async def assistant_self_improve_history(
    release_name: Optional[str] = None,
    proposal_set_id: Optional[str] = None,
    limit: int = 200,
    x_admin_api_key: Optional[str] = Header(default=None),
):
    """Return persisted self-improve proposal sets, newest first (admin-only).

    Flat rows from the remote store are grouped by proposal_set_id, with the
    JSON-encoded signal/file/test columns decoded defensively.
    """
    check_admin_api_key(x_admin_api_key)
    bounded_limit = max(1, min(limit, 2000))
    result = await run_remote_query_assistant_proposals(
        release_name=release_name,
        proposal_set_id=proposal_set_id,
        limit=bounded_limit,
    )

    def loads_list(raw: Any) -> List[Any]:
        # Decode a JSON-array column; corrupt or non-list payloads become [].
        try:
            value = json.loads(raw or "[]")
        except Exception:
            return []
        return value if isinstance(value, list) else []

    def loads_dict(raw: Any) -> Dict[str, Any]:
        # Decode a JSON-object column; corrupt or non-dict payloads become {}.
        try:
            value = json.loads(raw or "{}")
        except Exception:
            return {}
        return value if isinstance(value, dict) else {}

    grouped: Dict[str, Dict[str, Any]] = {}
    for row in result["rows"]:
        set_id = str(row.get("proposal_set_id") or "")
        if not set_id:
            continue
        bucket = grouped.get(set_id)
        if bucket is None:
            # First row of a set carries the set-level metadata.
            bucket = {
                "proposal_set_id": set_id,
                "created_at_utc": row.get("created_at_utc"),
                "objective": row.get("objective"),
                "release_name": row.get("release_name"),
                "summary": row.get("summary"),
                "signals": loads_dict(row.get("signals_json")),
                "proposals": [],
            }
            grouped[set_id] = bucket
        bucket["proposals"].append(
            {
                "proposal_id": row.get("proposal_id"),
                "title": row.get("title"),
                "problem": row.get("problem"),
                "change": row.get("change_text"),
                "files": loads_list(row.get("files_json")),
                "risk": row.get("risk") or "medium",
                "tests": loads_list(row.get("tests_json")),
                "auto_apply_safe": bool(row.get("auto_apply_safe", False)),
            }
        )
    sets = sorted(grouped.values(), key=lambda x: str(x.get("created_at_utc") or ""), reverse=True)
    return {
        "count": len(sets),
        "filters": {
            "release_name": release_name,
            "proposal_set_id": proposal_set_id,
            "limit": bounded_limit,
        },
        "rows": sets,
    }
@app.post("/assistant/self-improve/apply", response_model=AssistantSelfImproveApplyResponse)
async def assistant_self_improve_apply(
    payload: AssistantSelfImproveApplyPayload,
    x_admin_api_key: Optional[str] = Header(default=None),
):
    """Materialize a self-improve proposal as a git branch + markdown commit.

    The proposal comes either inline in the payload or is looked up from the
    remote proposal store by (proposal_set_id, proposal_id). It is written as
    a markdown file under proposals/ on a fresh `assistant/<slug>` branch of
    this repo. `dry_run` returns the planned commands without touching git.
    """
    check_admin_api_key(x_admin_api_key)
    # The repo being modified is the one containing this source file.
    repo_dir = Path(__file__).resolve().parent
    proposal: Optional[AssistantSelfImproveProposal] = payload.proposal
    if proposal is None:
        if not (payload.proposal_set_id and payload.proposal_id):
            raise HTTPException(
                status_code=400,
                detail="Provide either proposal object or both proposal_set_id and proposal_id.",
            )
        hist = await run_remote_query_assistant_proposals(
            release_name=payload.release_name,
            proposal_set_id=payload.proposal_set_id,
            limit=500,
        )
        matched: Optional[AssistantSelfImproveProposal] = None
        for r in hist.get("rows", []):
            if str(r.get("proposal_id") or "") != payload.proposal_id:
                continue
            # files/tests are stored as JSON-array text; decode defensively.
            files_json = r.get("files_json") or "[]"
            tests_json = r.get("tests_json") or "[]"
            try:
                files = json.loads(files_json)
                if not isinstance(files, list):
                    files = []
            except Exception:
                files = []
            try:
                tests = json.loads(tests_json)
                if not isinstance(tests, list):
                    tests = []
            except Exception:
                tests = []
            risk_raw = str(r.get("risk") or "medium").strip().lower()
            risk: Literal["low", "medium", "high"] = "medium"
            if risk_raw in ("low", "medium", "high"):
                risk = risk_raw  # type: ignore[assignment]
            matched = AssistantSelfImproveProposal(
                proposal_id=str(r.get("proposal_id") or payload.proposal_id),
                title=str(r.get("title") or ""),
                problem=str(r.get("problem") or ""),
                change=str(r.get("change_text") or ""),
                files=[str(x) for x in files if str(x).strip()],
                risk=risk,
                tests=[str(x) for x in tests if str(x).strip()],
                auto_apply_safe=bool(r.get("auto_apply_safe", False)),
            )
            break
        if matched is None:
            raise HTTPException(
                status_code=404,
                detail=f"Proposal not found for set_id={payload.proposal_set_id} proposal_id={payload.proposal_id}",
            )
        proposal = matched
    assert proposal is not None
    branch_slug = _slugify(f"{proposal.proposal_id}-{proposal.title}")
    branch_name = f"assistant/{branch_slug}"
    proposals_dir = repo_dir / "proposals"
    proposals_dir.mkdir(parents=True, exist_ok=True)
    proposal_file = proposals_dir / f"{branch_slug}.md"
    # Human-readable record of what will be (or was) executed.
    commands: List[str] = [
        f"git -C {repo_dir} rev-parse --is-inside-work-tree",
        f"git -C {repo_dir} checkout -B {branch_name}",
        f"write {proposal_file}",
        f"git -C {repo_dir} add {proposal_file}",
        f"git -C {repo_dir} commit -m 'chore(self-improve): apply {proposal.proposal_id} {proposal.title}'",
    ]
    # Markdown body describing the proposal.
    md = "\n".join(
        [
            f"# {proposal.proposal_id}: {proposal.title}",
            "",
            f"- Objective: {payload.objective or ''}",
            f"- Release: {payload.release_name or ''}",
            f"- Risk: {proposal.risk}",
            f"- Auto apply safe: {proposal.auto_apply_safe}",
            "",
            "## Problem",
            proposal.problem,
            "",
            "## Change",
            proposal.change,
            "",
            "## Files",
            *(f"- `{f}`" for f in (proposal.files or [])),
            "",
            "## Tests",
            *(f"- {t}" for t in (proposal.tests or [])),
            "",
            f"_Generated at {now_iso()}_",
            "",
        ]
    )
    if payload.dry_run:
        return AssistantSelfImproveApplyResponse(
            applied=False,
            dry_run=True,
            repo_dir=str(repo_dir),
            branch_name=branch_name,
            proposal_file=str(proposal_file.relative_to(repo_dir)),
            commit=None,
            commands=commands,
            detail="Dry-run only. No branch or commit created.",
        )
    chk = await _run_local_cmd(["git", "rev-parse", "--is-inside-work-tree"], repo_dir)
    if chk["code"] != 0:
        raise HTTPException(status_code=500, detail={"message": "Not a git repository", "stderr": chk["stderr"]})
    co = await _run_local_cmd(["git", "checkout", "-B", branch_name], repo_dir)
    if co["code"] != 0:
        raise HTTPException(status_code=500, detail={"message": "Failed to create/switch branch", "stderr": co["stderr"]})
    proposal_file.write_text(md, encoding="utf-8")
    add = await _run_local_cmd(["git", "add", str(proposal_file.relative_to(repo_dir))], repo_dir)
    if add["code"] != 0:
        raise HTTPException(status_code=500, detail={"message": "Failed to add proposal file", "stderr": add["stderr"]})
    commit_msg = f"chore(self-improve): apply {proposal.proposal_id} {proposal.title}"
    cm = await _run_local_cmd(["git", "commit", "-m", commit_msg], repo_dir)
    if cm["code"] != 0:
        # Common case: nothing to commit. Still return branch + file info with
        # the current HEAD instead of failing the request.
        detail = (cm["stderr"] or cm["stdout"] or "").strip()
        head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
        commit = head["stdout"].strip() if head["code"] == 0 else None
        return AssistantSelfImproveApplyResponse(
            applied=True,
            dry_run=False,
            repo_dir=str(repo_dir),
            branch_name=branch_name,
            proposal_file=str(proposal_file.relative_to(repo_dir)),
            commit=commit,
            commands=commands,
            detail=detail or "No new commit created (possibly no diff).",
        )
    head = await _run_local_cmd(["git", "rev-parse", "HEAD"], repo_dir)
    commit = head["stdout"].strip() if head["code"] == 0 else None
    return AssistantSelfImproveApplyResponse(
        applied=True,
        dry_run=False,
        repo_dir=str(repo_dir),
        branch_name=branch_name,
        proposal_file=str(proposal_file.relative_to(repo_dir)),
        commit=commit,
        commands=commands,
        detail="Branch and proposal commit created.",
    )
@app.post("/assistant/draft", response_model=AssistantDraftResponse)
async def assistant_draft(payload: AssistantDraftPayload, x_admin_api_key: Optional[str] = Header(default=None)):
    """Draft a message/text for a task using retrieved context (admin-only).

    Retrieval degrades through four layers (search -> recent-by-release ->
    recent inbox for release -> recent inbox globally). Generation failures
    fall back to a template draft, and low-confidence drafts get source
    citations appended.
    """
    check_admin_api_key(x_admin_api_key)
    # Combine goal, recipient, task type and constraints into one query string.
    retrieval_query = " ".join(
        [
            payload.goal,
            payload.recipient or "",
            payload.task_type,
            " ".join(payload.constraints),
        ]
    ).strip()
    try:
        hits = await es_search_hits(
            q=retrieval_query,
            size=payload.max_sources,
            release_name=payload.release_name,
        )
    except Exception as e:
        print(f"[WARN] assistant_draft retrieval failed: {e}")
        hits = []
    if not hits and payload.release_name:
        try:
            hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
        except Exception as e:
            print(f"[WARN] assistant_draft release fallback retrieval failed: {e}")
    if not hits and payload.release_name:
        try:
            hits = await es_recent_messages(
                size=payload.max_sources,
                release_name=payload.release_name,
                q=None,
            )
        except Exception as e:
            print(f"[WARN] assistant_draft inbox fallback retrieval failed: {e}")
    if not hits:
        try:
            hits = await es_recent_messages(
                size=payload.max_sources,
                release_name=None,
                q=None,
            )
        except Exception as e:
            print(f"[WARN] assistant_draft global fallback retrieval failed: {e}")
    prompt = build_assistant_prompt(payload, hits)
    used_fallback = False
    try:
        draft = await ollama_generate(prompt)
        if not draft.strip():
            used_fallback = True
            draft = fallback_draft_text(payload)
    except Exception as e:
        print(f"[WARN] assistant_draft generation failed: {e}")
        used_fallback = True
        draft = fallback_draft_text(payload)
    sources: List[AssistantDraftSource] = []
    for h in hits:
        src = h.get("_source", {}) or {}
        sources.append(
            AssistantDraftSource(
                concept_id=str(src.get("concept_id") or ""),
                source_pk=src.get("source_pk"),
                source_table=src.get("source_table"),
                release_name=src.get("release_name"),
                score=float(h.get("_score")) if h.get("_score") is not None else None,
            )
        )
    # Confidence heuristic: scales with usable sources, capped for very short
    # drafts and for fallback drafts.
    source_count = len([s for s in sources if s.concept_id])
    if source_count >= 5:
        confidence = 0.85
    elif source_count >= 3:
        confidence = 0.75
    elif source_count >= 1:
        confidence = 0.6
    else:
        confidence = 0.35
    if len(draft.strip()) < 40:
        confidence = min(confidence, 0.45)
    if used_fallback:
        confidence = min(confidence, 0.4)
    # Low-confidence drafts must cite their sources; append up to three
    # concept ids if the draft does not already mention any.
    source_ids = [s.concept_id for s in sources if s.concept_id]
    citation_required = confidence < 0.75 or used_fallback
    if citation_required and source_ids:
        already_cited = any(cid in draft for cid in source_ids)
        if not already_cited:
            cited = ", ".join(source_ids[:3])
            draft = f"{draft.rstrip()}\n\nSources: {cited}"
    return AssistantDraftResponse(
        task_type=payload.task_type,
        draft=draft,
        sources=[s for s in sources if s.concept_id],
        confidence=confidence,
        needs_review=True,
        release_name=payload.release_name,
    )
@app.post("/assistant/plan", response_model=AssistantPlanResponse)
async def assistant_plan(payload: AssistantPlanPayload, x_admin_api_key: Optional[str] = Header(default=None)):
    """Produce a step-by-step plan for an objective (admin-only).

    Retrieves context from Elasticsearch, asks the LLM for a JSON plan,
    sanitizes each step (ids, allowed action types), and falls back to a
    static plan when generation fails or yields no valid steps.
    """
    check_admin_api_key(x_admin_api_key)
    retrieval_query = " ".join([payload.objective, payload.task_type, " ".join(payload.constraints)]).strip()
    try:
        hits = await es_search_hits(
            q=retrieval_query,
            size=payload.max_sources,
            release_name=payload.release_name,
        )
    except Exception as e:
        print(f"[WARN] assistant_plan retrieval failed: {e}")
        hits = []
    if not hits and payload.release_name:
        try:
            hits = await es_recent_by_release(payload.release_name, size=payload.max_sources)
        except Exception as e:
            print(f"[WARN] assistant_plan release fallback retrieval failed: {e}")
    prompt = build_assistant_plan_prompt(payload, hits)
    used_fallback = False
    plan_steps: List[AssistantPlanStep] = []
    try:
        raw = await ollama_generate(prompt)
        parsed = _extract_json_object_from_text(raw)
        raw_steps = parsed.get("plan")
        if isinstance(raw_steps, list):
            allowed_action_types = {"research", "draft", "ask_user", "prepare_data", "review"}
            for i, s in enumerate(raw_steps[: payload.max_steps]):
                if not isinstance(s, dict):
                    continue
                # Default step ids to S1..Sn; drop steps without a title;
                # coerce unknown action types to "research".
                step_id = str(s.get("step_id") or f"S{i+1}").strip() or f"S{i+1}"
                title = str(s.get("title") or "").strip()
                if not title:
                    continue
                at = str(s.get("action_type") or "research").strip()
                action_type = at if at in allowed_action_types else "research"
                requires_approval = bool(s.get("requires_approval", False))
                notes = str(s.get("notes")).strip() if s.get("notes") is not None else None
                plan_steps.append(
                    AssistantPlanStep(
                        step_id=step_id,
                        title=title,
                        action_type=action_type,  # type: ignore[arg-type]
                        requires_approval=requires_approval,
                        notes=notes,
                    )
                )
    except Exception as e:
        print(f"[WARN] assistant_plan generation failed: {e}")
        used_fallback = True
    if not plan_steps:
        used_fallback = True
        plan_steps = fallback_plan(payload)
    sources: List[AssistantDraftSource] = []
    for h in hits:
        src = h.get("_source", {}) or {}
        sources.append(
            AssistantDraftSource(
                concept_id=str(src.get("concept_id") or ""),
                source_pk=src.get("source_pk"),
                source_table=src.get("source_table"),
                release_name=src.get("release_name"),
                score=float(h.get("_score")) if h.get("_score") is not None else None,
            )
        )
    # Confidence heuristic: scales with usable sources, capped when the
    # fallback plan was used.
    source_count = len([s for s in sources if s.concept_id])
    if source_count >= 5:
        confidence = 0.85
    elif source_count >= 3:
        confidence = 0.75
    elif source_count >= 1:
        confidence = 0.6
    else:
        confidence = 0.35
    if used_fallback:
        confidence = min(confidence, 0.45)
    return AssistantPlanResponse(
        objective=payload.objective,
        task_type=payload.task_type,
        plan=plan_steps,
        sources=[s for s in sources if s.concept_id],
        needs_review=True,
        confidence=confidence,
        release_name=payload.release_name,
    )
@app.post("/assistant/execute-step", response_model=AssistantExecuteStepResponse)
async def assistant_execute_step(payload: AssistantExecuteStepPayload, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
action_id = str(uuid.uuid4())
started_at_utc = now_iso()
run_input = payload.model_dump()
await record_event_best_effort(action_id, "started", {"step_id": payload.step_id, "approved": payload.approved})
step = find_plan_step(payload.plan, payload.step_id)
if step is None:
error_text = f"Step '{payload.step_id}' not found in plan."
await run_remote_assistant_action(
action_id=action_id,
payload=payload,
step=AssistantPlanStep(
step_id=payload.step_id,
title="missing-step",
action_type="review",
requires_approval=True,
notes=error_text,
),
status="blocked",
output_json={"reason": error_text},
error_text=error_text,
)
await run_remote_record_run(
run_id=action_id,
run_type="assistant_execute_step",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json={"reason": error_text},
error_text=error_text,
)
raise HTTPException(status_code=400, detail=error_text)
policy_error = enforce_step_policy(payload, step)
if policy_error:
out = {"policy_blocked": True, "reason": policy_error}
await record_event_best_effort(action_id, "policy_blocked", {"step_id": step.step_id, "reason": policy_error})
await run_remote_assistant_action(
action_id=action_id,
payload=payload,
step=step,
status="blocked",
output_json=out,
error_text=policy_error,
)
await run_remote_record_run(
run_id=action_id,
run_type="assistant_execute_step",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=out,
error_text=policy_error,
)
return AssistantExecuteStepResponse(
action_id=action_id,
step_id=step.step_id,
status="blocked",
output=out,
needs_review=True,
)
try:
output = await execute_plan_step(payload, step)
await record_event_best_effort(action_id, "executed", {"step_id": step.step_id, "action_type": step.action_type})
await run_remote_assistant_action(
action_id=action_id,
payload=payload,
step=step,
status="executed",
output_json=output,
error_text=None,
)
await run_remote_record_run(
run_id=action_id,
run_type="assistant_execute_step",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=output,
error_text=None,
)
await record_event_best_effort(action_id, "recorded", {"status": "succeeded"})
return AssistantExecuteStepResponse(
action_id=action_id,
step_id=step.step_id,
status="executed",
output=output,
needs_review=True,
)
except Exception as e:
err_text = str(e)
await run_remote_assistant_action(
action_id=action_id,
payload=payload,
step=step,
status="blocked",
output_json={"error": err_text},
error_text=err_text,
)
await run_remote_record_run(
run_id=action_id,
run_type="assistant_execute_step",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(action_id, "recorded", {"status": "failed"})
raise HTTPException(status_code=500, detail=err_text)
@app.post("/assistant/feedback")
async def assistant_feedback(payload: AssistantFeedbackPayload, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
run_id = str(uuid.uuid4())
run_input = payload.model_dump()
started_at_utc = now_iso()
await record_event_best_effort(
run_id,
"started",
{"input": {"outcome": payload.outcome, "task_type": payload.task_type, "release_name": payload.release_name}},
)
try:
result = await run_remote_assistant_feedback(run_id, payload)
await record_event_best_effort(
run_id,
"feedback_recorded",
{"outcome": payload.outcome, "task_type": payload.task_type},
)
await run_remote_record_run(
run_id=run_id,
run_type="assistant_feedback",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="assistant_feedback",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="assistant_feedback",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
@app.get("/assistant/feedback")
async def assistant_feedback_list(
outcome: Optional[Literal["accepted", "edited", "rejected"]] = None,
task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
release_name: Optional[str] = None,
limit: int = 50,
x_admin_api_key: Optional[str] = Header(default=None),
):
check_admin_api_key(x_admin_api_key)
bounded_limit = max(1, min(limit, 500))
result = await run_remote_query_assistant_feedback(
outcome=outcome,
task_type=task_type,
release_name=release_name,
limit=bounded_limit,
)
rows = result["rows"]
return {
"count": len(rows),
"filters": {
"outcome": outcome,
"task_type": task_type,
"release_name": release_name,
"limit": bounded_limit,
},
"rows": rows,
}
@app.get("/assistant/metrics")
async def assistant_metrics(
task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
release_name: Optional[str] = None,
outcome: Optional[Literal["accepted", "edited", "rejected"]] = None,
group_by: Literal["task_type", "release_name", "both"] = "both",
limit: int = 100,
x_admin_api_key: Optional[str] = Header(default=None),
):
check_admin_api_key(x_admin_api_key)
bounded_limit = max(1, min(limit, 1000))
result = await run_remote_query_assistant_metrics(
task_type=task_type,
release_name=release_name,
outcome=outcome,
group_by=group_by,
limit=bounded_limit,
)
rows = result["rows"]
return {
"count": len(rows),
"filters": {
"task_type": task_type,
"release_name": release_name,
"outcome": outcome,
"group_by": group_by,
"limit": bounded_limit,
},
"rows": rows,
}
@app.get("/assistant/actions")
async def assistant_actions(
status: Optional[Literal["blocked", "executed"]] = None,
task_type: Optional[Literal["message", "finance", "gov", "general"]] = None,
release_name: Optional[str] = None,
step_id: Optional[str] = None,
action_type: Optional[Literal["research", "draft", "ask_user", "prepare_data", "review"]] = None,
limit: int = 50,
x_admin_api_key: Optional[str] = Header(default=None),
):
check_admin_api_key(x_admin_api_key)
bounded_limit = max(1, min(limit, 500))
result = await run_remote_query_assistant_actions(
status=status,
task_type=task_type,
release_name=release_name,
step_id=step_id,
action_type=action_type,
limit=bounded_limit,
)
rows = result["rows"]
return {
"count": len(rows),
"filters": {
"status": status,
"task_type": task_type,
"release_name": release_name,
"step_id": step_id,
"action_type": action_type,
"limit": bounded_limit,
},
"rows": rows,
}
@app.post("/admin/project-release")
async def project_release(payload: ProjectionTrigger, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
run_id = str(uuid.uuid4())
run_input = payload.model_dump()
started_at_utc = now_iso()
await record_event_best_effort(run_id, "started", {"input": run_input})
try:
result = await run_remote_projector(payload)
if result.get("spark_read_done"):
await record_event_best_effort(run_id, "spark_read_done", {"release_name": payload.release_name})
if result.get("projection_done"):
await record_event_best_effort(
run_id,
"projection_done",
{"targets": payload.targets, "dry_run": payload.dry_run},
)
await run_remote_record_run(
run_id=run_id,
run_type="projection",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="projection",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="projection",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
@app.post("/admin/ingest-email-imap")
async def ingest_email_imap(payload: EmailImapIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
run_id = str(uuid.uuid4())
started_at_utc = now_iso()
run_input = payload.model_dump()
if "password" in run_input:
run_input["password"] = "***"
effective_since_uid = payload.since_uid
if effective_since_uid is None and payload.incremental:
effective_since_uid = await run_remote_query_imap_checkpoint(
host=payload.host,
mailbox=payload.mailbox,
username=payload.username,
table=payload.table,
)
if effective_since_uid is not None:
effective_search_criteria = f"UID {int(effective_since_uid) + 1}:*"
else:
effective_search_criteria = payload.search_criteria
await record_event_best_effort(
run_id,
"started",
{
"input": {
"host": payload.host,
"mailbox": payload.mailbox,
"search_criteria": effective_search_criteria,
"since_uid": effective_since_uid,
"max_messages": payload.max_messages,
"table": payload.table,
"dedupe_mode": payload.dedupe_mode,
}
},
)
try:
items = await asyncio.to_thread(
fetch_imap_messages_blocking,
payload,
effective_search_criteria,
effective_since_uid,
)
max_uid_fetched: Optional[int] = None
for m in items:
uid_raw = m.metadata.get("imap_uid")
try:
uid_int = int(uid_raw)
except Exception:
continue
if max_uid_fetched is None or uid_int > max_uid_fetched:
max_uid_fetched = uid_int
batch_payload = MessageIngestBatchPayload(
table=payload.table,
dedupe_mode=payload.dedupe_mode,
messages=items,
)
ingest_result = await run_remote_ingest_messages_batch(batch_payload)
result = {
"incremental": payload.incremental,
"since_uid": effective_since_uid,
"search_criteria_used": effective_search_criteria,
"max_uid_fetched": max_uid_fetched,
"fetched_messages": len(items),
"ingested_rows_requested": len(items),
"ingest_result": ingest_result,
}
await record_event_best_effort(
run_id,
"ingest_done",
{"fetched_messages": len(items), "table": payload.table, "max_uid_fetched": max_uid_fetched},
)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_email_imap",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_email_imap",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="ingest_email_imap",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
@app.post("/admin/poll-and-project")
async def poll_and_project(payload: PollAndProjectPayload, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
if payload.imap.table != "lake.db1.messages":
raise HTTPException(
status_code=400,
detail="poll-and-project currently supports only table lake.db1.messages",
)
run_id = str(uuid.uuid4())
started_at_utc = now_iso()
run_input = payload.model_dump()
if run_input.get("imap", {}).get("password"):
run_input["imap"]["password"] = "***"
await record_event_best_effort(
run_id,
"started",
{
"input": {
"imap": {
"host": payload.imap.host,
"mailbox": payload.imap.mailbox,
"incremental": payload.imap.incremental,
"since_uid": payload.imap.since_uid,
"max_messages": payload.imap.max_messages,
"table": payload.imap.table,
},
"targets": payload.targets,
"dry_run": payload.dry_run,
"project_if_no_new": payload.project_if_no_new,
}
},
)
try:
ingest = await ingest_email_imap(payload.imap, x_admin_api_key)
ingest_result = ingest["result"]
fetched_messages = int(ingest_result.get("fetched_messages", 0))
await record_event_best_effort(
run_id,
"ingest_done",
{
"ingest_run_id": ingest.get("run_id"),
"fetched_messages": fetched_messages,
"max_uid_fetched": ingest_result.get("max_uid_fetched"),
},
)
if fetched_messages <= 0 and not payload.project_if_no_new:
result = {
"ingest_run_id": ingest.get("run_id"),
"ingest_result": ingest_result,
"skipped": True,
"reason": "No new messages",
}
await run_remote_record_run(
run_id=run_id,
run_type="poll_and_project",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
if payload.release_name:
release_name = payload.release_name
else:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S")
release_name = f"{payload.release_prefix}_{ts}_messages-auto"
create_result = await run_remote_create_messages_release(release_name)
await record_event_best_effort(
run_id,
"release_created",
{"release_name": release_name},
)
projection = ProjectionTrigger(
release_name=release_name,
targets=payload.targets,
concept_table=payload.concept_table,
dry_run=payload.dry_run,
)
projection_result = await run_remote_projector(projection)
await record_event_best_effort(
run_id,
"projection_done",
{"release_name": release_name, "targets": payload.targets},
)
result = {
"ingest_run_id": ingest.get("run_id"),
"ingest_result": ingest_result,
"release": create_result,
"projection": projection_result,
"release_name": release_name,
"skipped": False,
}
await run_remote_record_run(
run_id=run_id,
run_type="poll_and_project",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="poll_and_project",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="poll_and_project",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
@app.post("/admin/ingest-message")
async def ingest_message(payload: MessageIngestPayload, x_admin_api_key: Optional[str] = Header(default=None)):
check_admin_api_key(x_admin_api_key)
run_id = str(uuid.uuid4())
run_input = payload.model_dump()
started_at_utc = now_iso()
await record_event_best_effort(
run_id,
"started",
{"input": {"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id}},
)
try:
result = await run_remote_ingest_message(payload)
await record_event_best_effort(
run_id,
"ingest_done",
{"table": payload.table, "message_id": payload.message_id, "thread_id": payload.thread_id},
)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_message",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_message",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="ingest_message",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
@app.post("/admin/ingest-messages-batch")
async def ingest_messages_batch(
payload: MessageIngestBatchPayload,
x_admin_api_key: Optional[str] = Header(default=None),
):
check_admin_api_key(x_admin_api_key)
if not payload.messages:
raise HTTPException(status_code=400, detail="messages must contain at least 1 item")
run_id = str(uuid.uuid4())
run_input = payload.model_dump()
started_at_utc = now_iso()
await record_event_best_effort(
run_id,
"started",
{"input": {"table": payload.table, "rows": len(payload.messages)}},
)
try:
result = await run_remote_ingest_messages_batch(payload)
await record_event_best_effort(
run_id,
"ingest_done",
{"table": payload.table, "rows": len(payload.messages)},
)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_messages_batch",
status="succeeded",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=result,
error_text=None,
)
await record_event_best_effort(run_id, "recorded", {"status": "succeeded"})
return {"run_id": run_id, "result": result}
except HTTPException as e:
err_text = e.detail if isinstance(e.detail, str) else json.dumps(e.detail, ensure_ascii=False)
await run_remote_record_run(
run_id=run_id,
run_type="ingest_messages_batch",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=err_text,
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise
except Exception as e:
await run_remote_record_run(
run_id=run_id,
run_type="ingest_messages_batch",
status="failed",
started_at_utc=started_at_utc,
finished_at_utc=now_iso(),
actor="admin-api",
input_json=run_input,
output_json=None,
error_text=str(e),
)
await record_event_best_effort(run_id, "recorded", {"status": "failed"})
raise