diff --git a/app.py b/app.py index 6d6b92a..e313d2e 100644 --- a/app.py +++ b/app.py @@ -2,17 +2,10 @@ import os import json import hashlib import asyncio -import shlex import uuid -import base64 -import imaplib -import email -import tempfile import re import time from pathlib import Path -from email import policy -from email.utils import parseaddr, parsedate_to_datetime from datetime import datetime, timezone from typing import Optional, List, Dict, Any, Literal @@ -24,6 +17,21 @@ from pydantic import BaseModel, Field from gremlin_python.driver import client as gremlin_client from gremlin_python.driver.serializer import GraphSONSerializersV3d0 from dotenv import load_dotenv +from services.assistant_retrieval import ( + AssistantRetrievalConfig, + DEFAULT_QUERY_STOPWORDS, + retrieve_sources_two_stage, +) +from services.assistant_planning import ( + build_assistant_plan_prompt as planning_build_assistant_plan_prompt, + execute_plan_step as planning_execute_plan_step, + fallback_plan as planning_fallback_plan, + find_plan_step as planning_find_plan_step, + enforce_step_policy as planning_enforce_step_policy, + is_high_risk_step as planning_is_high_risk_step, +) +from services.remote_ops import RemoteOps, RemoteOpsConfig +from services.imap_ingest import fetch_imap_messages_blocking as imap_fetch_messages_blocking APP_NAME = "concept-api" APP_VERSION = os.getenv("APP_VERSION", "dev-local") @@ -96,6 +104,31 @@ CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT = os.getenv( "CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT", "./create-messages-release-via-spark-container.sh", ) +REMOTE_OPS = RemoteOps( + RemoteOpsConfig( + ssh_host=PROJECTOR_SSH_HOST, + remote_dir=PROJECTOR_REMOTE_DIR, + ssh_bin=PROJECTOR_SSH_BIN, + ssh_opts=PROJECTOR_SSH_OPTS, + scp_bin=PROJECTOR_SCP_BIN, + scp_opts=PROJECTOR_SCP_OPTS, + timeout_sec=PROJECTOR_TIMEOUT_SEC, + projector_remote_script=PROJECTOR_REMOTE_SCRIPT, + ingest_message_remote_script=INGEST_MESSAGE_REMOTE_SCRIPT, + ingest_messages_batch_remote_script=INGEST_MESSAGES_BATCH_REMOTE_SCRIPT, + assistant_feedback_remote_script=ASSISTANT_FEEDBACK_REMOTE_SCRIPT, + assistant_feedback_query_remote_script=ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT, + assistant_metrics_query_remote_script=ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT, + assistant_action_remote_script=ASSISTANT_ACTION_REMOTE_SCRIPT, + assistant_actions_query_remote_script=ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT, + assistant_proposals_remote_script=ASSISTANT_PROPOSALS_REMOTE_SCRIPT, + assistant_proposals_query_remote_script=ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT, + runs_remote_script=RUNS_REMOTE_SCRIPT, + run_events_remote_script=RUN_EVENTS_REMOTE_SCRIPT, + imap_checkpoint_remote_script=IMAP_CHECKPOINT_REMOTE_SCRIPT, + create_messages_release_remote_script=CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT, + ) +) app = FastAPI(title=APP_NAME) UI_DIR = Path(__file__).resolve().parent / "ui" @@ -374,12 +407,6 @@ def now_iso() -> str: return datetime.now(timezone.utc).isoformat() -def _tail(text: str, max_chars: int = 8000) -> str: - if len(text) <= max_chars: - return text - return text[-max_chars:] - - def check_admin_api_key(x_admin_api_key: Optional[str]) -> None: if not ADMIN_API_KEY: raise HTTPException(status_code=503, detail="ADMIN_API_KEY is not configured") @@ -394,183 +421,13 @@ def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str return hashlib.sha256(raw.encode("utf-8")).hexdigest() -def _clean_header_id(v: Optional[str]) -> str: - if not v: - return "" - return v.strip().strip("<>").strip() - - -def _normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str: - refs_clean = _clean_header_id(refs.split()[-1] if refs else "") - in_reply_clean = _clean_header_id(in_reply_to) - if refs_clean: - return f"thread:{refs_clean}" - if in_reply_clean: - return f"thread:{in_reply_clean}" - seed = f"{subject.strip().lower()}|{sender.strip().lower()}" - if not seed.strip("|"): - seed = msg_id - return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] - - -def _extract_body_text(msg: email.message.Message) -> str: - try: - if msg.is_multipart(): - for part in msg.walk(): - ctype = (part.get_content_type() or "").lower() - disp = (part.get("Content-Disposition") or "").lower() - if ctype == "text/plain" and "attachment" not in disp: - payload_obj = part.get_content() - if isinstance(payload_obj, str): - return payload_obj.strip() - if isinstance(payload_obj, bytes): - return payload_obj.decode(part.get_content_charset() or "utf-8", errors="replace").strip() - for part in msg.walk(): - ctype = (part.get_content_type() or "").lower() - if ctype == "text/html": - html_obj = part.get_content() - if isinstance(html_obj, bytes): - html_obj = html_obj.decode(part.get_content_charset() or "utf-8", errors="replace") - if isinstance(html_obj, str): - return html_obj.strip() - return "" - payload_obj = msg.get_content() - if isinstance(payload_obj, str): - return payload_obj.strip() - if isinstance(payload_obj, bytes): - return payload_obj.decode(msg.get_content_charset() or "utf-8", errors="replace").strip() - return "" - except Exception: - return "" - - def fetch_imap_messages_blocking( payload: EmailImapIngestPayload, effective_search_criteria: str, since_uid: Optional[int], ) -> List[MessageIngestItem]: - password = payload.password or os.getenv("IMAP_PASSWORD", "") - if not password: - raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD") - - if payload.use_ssl: - client = imaplib.IMAP4_SSL(payload.host, payload.port) - else: - client = imaplib.IMAP4(payload.host, payload.port) - - try: - status, _ = client.login(payload.username, password) - if status != "OK": - raise RuntimeError("IMAP login failed") - status, _ = client.select(payload.mailbox, readonly=True) - if status != "OK": - raise RuntimeError(f"IMAP select mailbox failed: {payload.mailbox}") - - if since_uid is not None: - status, search_data = client.uid("search", None, "UID", f"{int(since_uid) + 1}:*") - else: - status, search_data = client.uid("search", None, effective_search_criteria) - if status != "OK": - raise RuntimeError(f"IMAP search failed: {effective_search_criteria}") - uid_bytes = search_data[0] if search_data else b"" - uid_list = [u for u in uid_bytes.decode("utf-8", errors="replace").split() if u] - if since_uid is not None: - filtered: List[str] = [] - for u in uid_list: - try: - if int(u) > int(since_uid): - filtered.append(u) - except Exception: - continue - uid_list = filtered - if not uid_list: - return [] - # For incremental UID windows, process oldest-new first so checkpointing cannot skip gaps. - is_uid_window = since_uid is not None - if is_uid_window: - selected_uids = uid_list[: payload.max_messages] - else: - # For non-incremental scans (e.g. ALL), keep "latest N" behavior. - selected_uids = uid_list[-payload.max_messages :] - - out: List[MessageIngestItem] = [] - for uid in selected_uids: - status, msg_data = client.uid("fetch", uid, "(RFC822)") - if status != "OK" or not msg_data: - continue - raw_bytes = None - for part in msg_data: - if isinstance(part, tuple) and len(part) >= 2 and isinstance(part[1], (bytes, bytearray)): - raw_bytes = bytes(part[1]) - break - if not raw_bytes: - continue - msg = email.message_from_bytes(raw_bytes, policy=policy.default) - - subject = str(msg.get("Subject") or "").strip() - from_raw = str(msg.get("From") or "").strip() - to_raw = str(msg.get("To") or "").strip() - date_raw = str(msg.get("Date") or "").strip() - msg_id_raw = str(msg.get("Message-Id") or msg.get("Message-ID") or "").strip() - refs_raw = str(msg.get("References") or "").strip() - in_reply_raw = str(msg.get("In-Reply-To") or "").strip() - - sender_email = parseaddr(from_raw)[1] or from_raw or "unknown" - msg_id_clean = _clean_header_id(msg_id_raw) - if not msg_id_clean: - seed = f"{uid}|{subject}|{sender_email}|{date_raw}" - msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] - - thread_id = _normalize_thread_id( - msg_id=msg_id_clean, - refs=refs_raw, - in_reply_to=in_reply_raw, - subject=subject, - sender=sender_email, - ) - - sent_at_iso = None - if date_raw: - try: - dt = parsedate_to_datetime(date_raw) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - sent_at_iso = dt.astimezone(timezone.utc).isoformat() - except Exception: - sent_at_iso = None - - body = _extract_body_text(msg) - if not body: - body = f"(no body) {subject}".strip() - - metadata = { - "subject": subject, - "from": from_raw, - "to": to_raw, - "date": date_raw, - "imap_uid": uid, - "mailbox": payload.mailbox, - "host": payload.host, - "username": payload.username, - } - - out.append( - MessageIngestItem( - thread_id=thread_id, - message_id=msg_id_clean, - sender=sender_email, - channel=payload.channel, - sent_at=sent_at_iso, - body=body, - metadata=metadata, - ) - ) - return out - finally: - try: - client.logout() - except Exception: - pass + rows = imap_fetch_messages_blocking(payload, effective_search_criteria, since_uid) + return [MessageIngestItem(**row) for row in rows] async def run_remote_query_imap_checkpoint( @@ -579,89 +436,11 @@ async def run_remote_query_imap_checkpoint( username: str, table: str, ) -> Optional[int]: - parts = [ - IMAP_CHECKPOINT_REMOTE_SCRIPT, - host, - mailbox, - username, - table, - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="IMAP checkpoint query timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - try: - obj = _extract_json_object_from_text(out) - val = obj.get("max_uid") - if val is None: - return None - return int(val) - except Exception as e: - raise HTTPException( - status_code=502, - detail={ - "message": f"Unable to parse IMAP checkpoint output: {e}", - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) + return await REMOTE_OPS.run_remote_query_imap_checkpoint(host, mailbox, username, table) async def run_remote_create_messages_release(release_name: str) -> Dict[str, Any]: - parts = [ - CREATE_MESSAGES_RELEASE_REMOTE_SCRIPT, - release_name, - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Create messages release timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "release_name": release_name, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result + return await REMOTE_OPS.run_remote_create_messages_release(release_name) async def ipfs_add_json(payload: Dict[str, Any]) -> str: @@ -918,14 +697,12 @@ TASK_AI_CACHE: Dict[str, Dict[str, Any]] = {} ASSISTANT_CHAT_MAX_TURNS = int(os.getenv("ASSISTANT_CHAT_MAX_TURNS", "20")) ASSISTANT_CHAT_STORE_PATH = os.getenv("ASSISTANT_CHAT_STORE_PATH", ".assistant_chat_sessions.json") ASSISTANT_CHAT_SESSIONS: Dict[str, List[Dict[str, str]]] = {} -ASSISTANT_CHAT_MIN_TOKEN_OVERLAP = int(os.getenv("ASSISTANT_CHAT_MIN_TOKEN_OVERLAP", "1")) -ASSISTANT_SOURCE_CANDIDATE_MULTIPLIER = int(os.getenv("ASSISTANT_SOURCE_CANDIDATE_MULTIPLIER", "4")) -ASSISTANT_SOURCE_MIN_COVERAGE = float(os.getenv("ASSISTANT_SOURCE_MIN_COVERAGE", "0.6")) -ASSISTANT_QUERY_STOPWORDS = { - "the", "and", "for", "with", "that", "this", "from", "have", "has", "had", "are", "was", "were", "will", - "would", "should", "could", "can", "you", "your", "about", "what", "when", "where", "which", "who", "whom", - "why", "how", "tomorrow", "today", "please", "any", "there", "need", "want", "know", "does", "did", "done", -} +ASSISTANT_RETRIEVAL_CONFIG = AssistantRetrievalConfig( + min_token_overlap=int(os.getenv("ASSISTANT_CHAT_MIN_TOKEN_OVERLAP", "1")), + source_candidate_multiplier=int(os.getenv("ASSISTANT_SOURCE_CANDIDATE_MULTIPLIER", "4")), + source_min_coverage=float(os.getenv("ASSISTANT_SOURCE_MIN_COVERAGE", "0.6")), + query_stopwords=set(DEFAULT_QUERY_STOPWORDS), +) def _split_sentences(text: str) -> List[str]: @@ -947,113 +724,21 @@ def _extract_who(text: str, default_sender: Optional[str]) -> Optional[str]: return default_sender or None -def _query_tokens(text: str) -> set[str]: - return { - t for t in re.findall(r"[a-z0-9]{3,}", (text or "").lower()) - if t not in ASSISTANT_QUERY_STOPWORDS - } - - -def _source_text_for_match(src: Dict[str, Any]) -> str: - return " ".join( - [ - str(src.get("text") or ""), - str(src.get("description") or ""), - str(src.get("summary") or ""), - str(src.get("display_name") or ""), - str(src.get("canonical_name") or ""), - ] - ) - - -def _filter_relevant_hits_for_query(hits: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: - q_tokens = _query_tokens(query) - if not q_tokens: - return hits - out: List[Dict[str, Any]] = [] - for h in hits: - src = h.get("_source", {}) or {} - s_tokens = _query_tokens(_source_text_for_match(src)) - overlap = len(q_tokens.intersection(s_tokens)) - if overlap >= ASSISTANT_CHAT_MIN_TOKEN_OVERLAP: - out.append(h) - return out - - -def _is_strong_source_match(query: str, src: Dict[str, Any]) -> bool: - q_tokens = _query_tokens(query) - if not q_tokens: - return True - s_tokens = _query_tokens(_source_text_for_match(src)) - overlap = len(q_tokens.intersection(s_tokens)) - q_len = max(1, len(q_tokens)) - coverage = overlap / q_len - min_overlap = ASSISTANT_CHAT_MIN_TOKEN_OVERLAP - # For multi-term questions, require at least 2 matched terms by default. - if q_len >= 2: - min_overlap = max(min_overlap, 2) - return overlap >= min_overlap and coverage >= ASSISTANT_SOURCE_MIN_COVERAGE - - async def _retrieve_sources_two_stage( query: str, release_name: Optional[str], max_sources: int, include_release_recent_fallback: bool = True, ) -> List[Dict[str, Any]]: - candidate_size = max(max_sources, max_sources * max(2, ASSISTANT_SOURCE_CANDIDATE_MULTIPLIER)) - seen_keys: set[str] = set() - candidates: List[Dict[str, Any]] = [] - - def _add_hits(hs: List[Dict[str, Any]]) -> None: - for h in hs: - src = h.get("_source", {}) or {} - key = str(src.get("concept_id") or src.get("source_pk") or "") - if not key: - key = hashlib.sha256( - json.dumps(src, ensure_ascii=False, sort_keys=True).encode("utf-8") - ).hexdigest()[:20] - if key in seen_keys: - continue - seen_keys.add(key) - candidates.append(h) - - # Stage 1: broad retrieval - try: - _add_hits(await es_search_hits(q=query, size=candidate_size, release_name=release_name)) - except Exception as e: - print(f"[WARN] stage1 release search failed: {e}") - if len(candidates) < max_sources: - try: - _add_hits(await es_search_hits(q=query, size=candidate_size, release_name=None)) - except Exception as e: - print(f"[WARN] stage1 global search failed: {e}") - if len(candidates) < max_sources and include_release_recent_fallback and release_name: - try: - _add_hits(await es_recent_by_release(release_name, size=candidate_size)) - except Exception as e: - print(f"[WARN] stage1 release-recent fallback failed: {e}") - - # Stage 2: relevance rerank and trim - q_tokens = _query_tokens(query) - ranked: List[Dict[str, Any]] = [] - for h in candidates: - src = h.get("_source", {}) or {} - s_tokens = _query_tokens(_source_text_for_match(src)) - overlap = len(q_tokens.intersection(s_tokens)) if q_tokens else 0 - base_score = float(h.get("_score") or 0.0) - ranked.append({"hit": h, "overlap": overlap, "base_score": base_score}) - - ranked.sort(key=lambda x: (x["overlap"], x["base_score"]), reverse=True) - relevant = [] - for x in ranked: - src = x["hit"].get("_source", {}) or {} - if _is_strong_source_match(query, src): - relevant.append(x) - if relevant: - return [x["hit"] for x in relevant[:max_sources]] - # Explicitly return no sources instead of attaching weak matches. - return [] + return await retrieve_sources_two_stage( + query=query, + release_name=release_name, + max_sources=max_sources, + include_release_recent_fallback=include_release_recent_fallback, + cfg=ASSISTANT_RETRIEVAL_CONFIG, + es_search_hits=lambda q, size, rn: es_search_hits(q=q, size=size, release_name=rn), + es_recent_by_release=lambda rn, size: es_recent_by_release(rn, size=size), + ) def extract_pending_tasks_from_source(src: Dict[str, Any]) -> List[Dict[str, Any]]: @@ -1785,131 +1470,27 @@ async def _assistant_write_code_from_learn( def build_assistant_plan_prompt(payload: AssistantPlanPayload, source_docs: List[Dict[str, Any]]) -> str: - constraints = payload.constraints or [] - constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None" - context_chunks = [] - for d in source_docs: - src = d.get("_source", {}) or {} - context_chunks.append( - "\n".join( - [ - f"concept_id: {src.get('concept_id', '')}", - f"source_pk: {src.get('source_pk', '')}", - f"source_table: {src.get('source_table', '')}", - f"release_name: {src.get('release_name', '')}", - f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:600]}", - ] - ) - ) - context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context." - return ( - "You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n" - "Return valid JSON ONLY with this exact shape:\n" - '{' - '"plan": [' - '{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}' - "]" - "}\n" - f"Use at most {payload.max_steps} steps.\n" - "Prefer safe read-only and draft actions first.\n\n" - f"Task type: {payload.task_type}\n" - f"Objective: {payload.objective}\n" - f"Constraints:\n{constraint_lines}\n\n" - "Retrieved context:\n" - f"{context}\n" - ) + return planning_build_assistant_plan_prompt(payload, source_docs) def fallback_plan(payload: AssistantPlanPayload) -> List[AssistantPlanStep]: - return [ - AssistantPlanStep( - step_id="S1", - title="Gather relevant facts and constraints", - action_type="research", - requires_approval=False, - notes="Review messages/concepts and identify required context.", - ), - AssistantPlanStep( - step_id="S2", - title="Draft a response or action proposal", - action_type="draft", - requires_approval=False, - notes="Produce a concise draft aligned with objective and constraints.", - ), - AssistantPlanStep( - step_id="S3", - title="Request user confirmation before any external action", - action_type="ask_user", - requires_approval=True, - notes="Do not send or execute changes until approved.", - ), - ][: payload.max_steps] + return [AssistantPlanStep(**s) for s in planning_fallback_plan(payload.max_steps)] def find_plan_step(plan: List[AssistantPlanStep], step_id: str) -> Optional[AssistantPlanStep]: - for s in plan: - if s.step_id == step_id: - return s - return None + return planning_find_plan_step(plan, step_id) def is_high_risk_step(step: AssistantPlanStep) -> bool: - text = f"{step.title} {step.notes or ''}".lower() - high_risk_terms = [ - "send", - "submit", - "pay", - "payment", - "transfer", - "wire", - "sign", - "file", - "delete", - "close account", - "change account", - ] - return any(t in text for t in high_risk_terms) + return planning_is_high_risk_step(step) def enforce_step_policy(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Optional[str]: - # Plan-declared approval gate. - if step.requires_approval and not payload.approved: - return "Step requires approval but approved=false." - # Extra hard gate for risky external actions. - if is_high_risk_step(step): - if not payload.approved: - return "High-risk step requires approved=true." - if not (payload.manual_confirm_token and payload.manual_confirm_token.strip()): - return "High-risk step requires manual_confirm_token." - return None + return planning_enforce_step_policy(payload, step) async def execute_plan_step(payload: AssistantExecuteStepPayload, step: AssistantPlanStep) -> Dict[str, Any]: - if step.action_type == "draft": - prompt = ( - "Draft concise text for this approved planning step.\n" - f"Task type: {payload.task_type}\n" - f"Objective: {payload.objective}\n" - f"Step: {step.title}\n" - f"Notes: {step.notes or ''}\n" - "Output only final draft text." - ) - try: - text = await ollama_generate(prompt) - if not text.strip(): - text = f"Draft for step '{step.title}'." - except Exception: - text = f"Draft for step '{step.title}'." - return {"draft": text} - if step.action_type == "research": - return {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."} - if step.action_type == "prepare_data": - return {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]} - if step.action_type == "review": - return {"note": "Review step requires human review before external action."} - if step.action_type == "ask_user": - return {"question": "Please confirm whether to proceed with the next high-impact action."} - return {"note": "Step recognized but no executor implemented."} + return await planning_execute_plan_step(payload, step, generate_text=ollama_generate) def _gremlin_submit_blocking(q: str, bindings: Dict[str, Any]) -> Any: # Create/close client per call so transport loop is owned by this worker thread. @@ -1928,256 +1509,22 @@ async def gremlin_submit(q: str, bindings: Dict[str, Any]) -> Any: async def run_remote_projector(payload: ProjectionTrigger) -> Dict[str, Any]: - parts = [ - PROJECTOR_REMOTE_SCRIPT, - "--release-name", payload.release_name, - "--targets", payload.targets, - ] - if payload.concept_table: - parts.extend(["--concept-table", payload.concept_table]) - if payload.dry_run: - parts.append("--dry-run") - - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Projector execution timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - spark_read_done = "[STEP] spark_read_done" in out - projection_done = "[STEP] projection_done" in out - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "spark_read_done": spark_read_done, - "projection_done": projection_done, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result - - -def _b64(s: str) -> str: - return base64.b64encode(s.encode("utf-8")).decode("ascii") + return await REMOTE_OPS.run_remote_projector(payload) async def run_remote_ingest_message(payload: MessageIngestPayload) -> Dict[str, Any]: - sent_at = payload.sent_at or "" - parts = [ - INGEST_MESSAGE_REMOTE_SCRIPT, - payload.table, - payload.thread_id, - payload.message_id, - payload.sender, - payload.channel, - sent_at, - _b64(payload.body), - _b64(json.dumps(payload.metadata, ensure_ascii=False)), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Message ingest execution timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result + return await REMOTE_OPS.run_remote_ingest_message(payload) async def run_remote_ingest_messages_batch(payload: MessageIngestBatchPayload) -> Dict[str, Any]: - rows = [] - for m in payload.messages: - rows.append( - { - "thread_id": m.thread_id, - "message_id": m.message_id, - "sender": m.sender, - "channel": m.channel, - "sent_at": m.sent_at, - "body": m.body, - "metadata": m.metadata, - } - ) - if not rows: - return { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": 0, - "rows": 0, - "stdout_tail": "[INFO] No rows to ingest", - "stderr_tail": "", - } - - local_tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") - remote_tmp = f"{PROJECTOR_REMOTE_DIR}/.ingest_messages_{uuid.uuid4().hex}.json" - try: - json.dump(rows, local_tmp, ensure_ascii=False) - local_tmp.flush() - local_tmp.close() - - scp_target = f"{PROJECTOR_SSH_HOST}:{remote_tmp}" - scp_args = [PROJECTOR_SCP_BIN, *shlex.split(PROJECTOR_SCP_OPTS), local_tmp.name, scp_target] - scp_proc = await asyncio.create_subprocess_exec( - *scp_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - scp_stdout, scp_stderr = await asyncio.wait_for(scp_proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - scp_proc.kill() - await scp_proc.wait() - raise HTTPException(status_code=504, detail="Batch payload upload timed out") - if scp_proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": scp_proc.returncode, - "stdout_tail": _tail(scp_stdout.decode("utf-8", errors="replace")), - "stderr_tail": _tail(scp_stderr.decode("utf-8", errors="replace")), - }, - ) - - payload_arg = f"@{remote_tmp}" - parts = [ - INGEST_MESSAGES_BATCH_REMOTE_SCRIPT, - payload.table, - payload.dedupe_mode, - payload_arg, - ] - batch_cmd = " ".join(shlex.quote(p) for p in parts) - command = ( - f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && " - f"({batch_cmd}); rc=$?; rm -f {shlex.quote(remote_tmp)}; exit $rc" - ) - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Batch message ingest execution timed out") - finally: - try: - os.unlink(local_tmp.name) - except Exception: - pass - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "rows": len(rows), - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result + return await REMOTE_OPS.run_remote_ingest_messages_batch(payload) async def run_remote_assistant_feedback( feedback_id: str, payload: AssistantFeedbackPayload, ) -> Dict[str, Any]: - confidence = payload.confidence if payload.confidence is not None else 0.0 - parts = [ - ASSISTANT_FEEDBACK_REMOTE_SCRIPT, - feedback_id, - now_iso(), - payload.outcome, - payload.task_type, - payload.release_name or "", - f"{confidence}", - "true" if payload.needs_review else "false", - _b64(payload.goal or ""), - _b64(payload.draft), - _b64(payload.final_text or ""), - _b64(json.dumps([s.model_dump() for s in payload.sources], ensure_ascii=False)), - _b64(payload.notes or ""), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant feedback execution timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result - - -def _extract_json_array_from_text(text: str) -> List[Dict[str, Any]]: - start = text.find("[") - end = text.rfind("]") - if start == -1 or end == -1 or end < start: - raise ValueError("No JSON array found in output") - candidate = text[start : end + 1] - obj = json.loads(candidate) - if not isinstance(obj, list): - raise ValueError("Parsed value is not a JSON array") - out: List[Dict[str, Any]] = [] - for item in obj: - if isinstance(item, dict): - out.append(item) - return out + return await REMOTE_OPS.run_remote_assistant_feedback(feedback_id, payload, now_iso()) async def run_remote_query_assistant_feedback( @@ -2186,56 +1533,7 @@ async def run_remote_query_assistant_feedback( release_name: Optional[str], limit: int, ) -> Dict[str, Any]: - parts = [ - ASSISTANT_FEEDBACK_QUERY_REMOTE_SCRIPT, - outcome or "", - task_type or "", - release_name or "", - str(limit), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant feedback query timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - try: - rows = _extract_json_array_from_text(out) - except Exception as e: - raise HTTPException( - status_code=502, - detail={ - "message": f"Unable to parse feedback query output: {e}", - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - return { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "rows": rows, - } + return await REMOTE_OPS.run_remote_query_assistant_feedback(outcome, task_type, release_name, limit) async def run_remote_query_assistant_metrics( @@ -2245,57 +1543,7 @@ async def run_remote_query_assistant_metrics( group_by: str, limit: int, ) -> Dict[str, Any]: - parts = [ - ASSISTANT_METRICS_QUERY_REMOTE_SCRIPT, - task_type or "", - release_name or "", - outcome or "", - group_by, - str(limit), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant metrics query timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - try: - rows = _extract_json_array_from_text(out) - except Exception as e: - raise HTTPException( - status_code=502, - detail={ - "message": f"Unable to parse metrics query output: {e}", - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - return { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "rows": rows, - } + return await REMOTE_OPS.run_remote_query_assistant_metrics(task_type, release_name, outcome, group_by, limit) async def run_remote_assistant_action( @@ -2306,48 +1554,15 @@ async def run_remote_assistant_action( output_json: Dict[str, Any], error_text: Optional[str], ) -> Dict[str, Any]: - parts = [ - ASSISTANT_ACTION_REMOTE_SCRIPT, + return await REMOTE_OPS.run_remote_assistant_action( action_id, - now_iso(), - payload.task_type, - payload.release_name or "", - _b64(payload.objective), - step.step_id, - _b64(step.title), - step.action_type, - "true" if step.requires_approval else "false", - "true" if payload.approved else "false", + payload, + step, status, - _b64(json.dumps(output_json, ensure_ascii=False)), - _b64(error_text or ""), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + output_json, + error_text, + now_iso(), ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant action logging timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result async def run_remote_query_assistant_actions( @@ -2358,58 +1573,14 @@ async def run_remote_query_assistant_actions( action_type: Optional[str], limit: int, ) -> Dict[str, Any]: - parts = [ - ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT, - status or "", - task_type or "", - release_name or "", - step_id or "", - action_type or "", - str(limit), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + return await REMOTE_OPS.run_remote_query_assistant_actions( + status, + task_type, + release_name, + step_id, + action_type, + limit, ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant actions query timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - try: - rows = _extract_json_array_from_text(out) - except Exception as e: - raise HTTPException( - status_code=502, - detail={ - "message": f"Unable to parse actions query output: {e}", - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - return { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "rows": rows, - } async def run_remote_record_assistant_proposals( @@ -2421,42 +1592,15 @@ async def run_remote_record_assistant_proposals( signals: Dict[str, Any], proposals: List[Dict[str, Any]], ) -> Dict[str, Any]: - parts = [ - ASSISTANT_PROPOSALS_REMOTE_SCRIPT, + return await REMOTE_OPS.run_remote_record_assistant_proposals( proposal_set_id, created_at_utc, - _b64(objective or ""), - release_name or "", - _b64(summary or ""), - _b64(json.dumps(signals or {}, ensure_ascii=False)), - _b64(json.dumps(proposals or [], ensure_ascii=False)), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + objective, + release_name, + summary, + signals, + proposals, ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant proposals logging timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - result = { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - } - if proc.returncode != 0: - raise HTTPException(status_code=502, detail=result) - return result async def run_remote_query_assistant_proposals( @@ -2464,55 +1608,7 @@ async def run_remote_query_assistant_proposals( proposal_set_id: Optional[str], limit: int, ) -> Dict[str, Any]: - parts = [ - ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT, - release_name or "", - proposal_set_id or "", - str(limit), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise HTTPException(status_code=504, detail="Assistant proposals query timed out") - - out = stdout.decode("utf-8", errors="replace") - err = stderr.decode("utf-8", errors="replace") - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "exit_code": proc.returncode, - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - try: - rows = _extract_json_array_from_text(out) - except Exception as e: - raise HTTPException( - status_code=502, - detail={ - "message": f"Unable to parse proposals query output: {e}", - "stdout_tail": _tail(out), - "stderr_tail": _tail(err), - }, - ) - return { - "host": PROJECTOR_SSH_HOST, - "remote_dir": PROJECTOR_REMOTE_DIR, - "rows": rows, - } + return await REMOTE_OPS.run_remote_query_assistant_proposals(release_name, proposal_set_id, limit) async def run_remote_record_run( @@ -2526,37 +1622,17 @@ async def run_remote_record_run( output_json: Optional[Dict[str, Any]], error_text: Optional[str], ) -> None: - parts = [ - RUNS_REMOTE_SCRIPT, + await REMOTE_OPS.run_remote_record_run( run_id, run_type, status, started_at_utc, finished_at_utc, actor, - _b64(json.dumps(input_json, ensure_ascii=False)), - _b64(json.dumps(output_json, ensure_ascii=False) if output_json is not None else ""), - _b64(error_text or ""), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + input_json, + output_json, + error_text, ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "message": "Failed to record run in Iceberg", - "host": PROJECTOR_SSH_HOST, - "exit_code": proc.returncode, - "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), - "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), - }, - ) async def run_remote_record_event( @@ -2564,40 +1640,11 @@ async def run_remote_record_event( event_type: str, detail_json: Dict[str, Any], ) -> None: - parts = [ - RUN_EVENTS_REMOTE_SCRIPT, - run_id, - event_type, - now_iso(), - _b64(json.dumps(detail_json, ensure_ascii=False)), - ] - command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" - ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] - proc = await asyncio.create_subprocess_exec( - *ssh_args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) - if proc.returncode != 0: - raise HTTPException( - status_code=502, - detail={ - "message": "Failed to record run event in Iceberg", - "host": PROJECTOR_SSH_HOST, - "exit_code": proc.returncode, - "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), - "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), - }, - ) + await REMOTE_OPS.run_remote_record_event(run_id, event_type, detail_json, now_iso()) async def record_event_best_effort(run_id: str, event_type: str, detail_json: Dict[str, Any]) -> None: - try: - await run_remote_record_event(run_id, event_type, detail_json) - except Exception as e: - # Event tracing must never break the primary projection flow. - print(f"[WARN] run event logging failed: run_id={run_id} event={event_type} error={e}") + await REMOTE_OPS.record_event_best_effort(run_id, event_type, detail_json, now_iso()) # --------- routes --------- diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..917b1db --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,2 @@ +"""Service-layer modules for assistant backend.""" + diff --git a/services/assistant_planning.py b/services/assistant_planning.py new file mode 100644 index 0000000..71bb408 --- /dev/null +++ b/services/assistant_planning.py @@ -0,0 +1,140 @@ +from typing import Any, Awaitable, Callable, Dict, List, Optional + + +def _get(obj: Any, name: str, default: Any = None) -> Any: + return getattr(obj, name, default) + + +def build_assistant_plan_prompt(payload: Any, source_docs: List[Dict[str, Any]]) -> str: + constraints = _get(payload, "constraints", []) or [] + constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None" + context_chunks = [] + for d in source_docs: + src = d.get("_source", {}) or {} + context_chunks.append( + "\n".join( + [ + f"concept_id: {src.get('concept_id', '')}", + f"source_pk: {src.get('source_pk', '')}", + f"source_table: {src.get('source_table', '')}", + f"release_name: {src.get('release_name', '')}", + f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:600]}", + ] + ) + ) + context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context." + return ( + "You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n" + "Return valid JSON ONLY with this exact shape:\n" + '{' + '"plan": [' + '{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}' + "]" + "}\n" + f"Use at most {_get(payload, 'max_steps', 3)} steps.\n" + "Prefer safe read-only and draft actions first.\n\n" + f"Task type: {_get(payload, 'task_type', '')}\n" + f"Objective: {_get(payload, 'objective', '')}\n" + f"Constraints:\n{constraint_lines}\n\n" + "Retrieved context:\n" + f"{context}\n" + ) + + +def fallback_plan(max_steps: int) -> List[Dict[str, Any]]: + return [ + { + "step_id": "S1", + "title": "Gather relevant facts and constraints", + "action_type": "research", + "requires_approval": False, + "notes": "Review messages/concepts and identify required context.", + }, + { + "step_id": "S2", + "title": "Draft a response or action proposal", + "action_type": "draft", + "requires_approval": False, + "notes": "Produce a concise draft aligned with objective and constraints.", + }, + { + "step_id": "S3", + "title": "Request user confirmation before any external action", + "action_type": "ask_user", + "requires_approval": True, + "notes": "Do not send or execute changes until approved.", + }, + ][: max_steps] + + +def find_plan_step(plan: List[Any], step_id: str) -> Optional[Any]: + for s in plan: + if _get(s, "step_id", "") == step_id: + return s + return None + + +def is_high_risk_step(step: Any) -> bool: + text = f"{_get(step, 'title', '')} {_get(step, 'notes', '') or ''}".lower() + high_risk_terms = [ + "send", + "submit", + "pay", + "payment", + "transfer", + "wire", + "sign", + "file", + "delete", + "close account", + "change account", + ] + return any(t in text for t in high_risk_terms) + + +def enforce_step_policy(payload: Any, step: Any) -> Optional[str]: + if bool(_get(step, "requires_approval", False)) and not bool(_get(payload, "approved", False)): + return "Step requires approval but approved=false." + if is_high_risk_step(step): + if not bool(_get(payload, "approved", False)): + return "High-risk step requires approved=true." + token = str(_get(payload, "manual_confirm_token", "") or "") + if not token.strip(): + return "High-risk step requires manual_confirm_token." + return None + + +async def execute_plan_step( + payload: Any, + step: Any, + generate_text: Callable[[str], Awaitable[str]], +) -> Dict[str, Any]: + action_type = str(_get(step, "action_type", "")) + title = str(_get(step, "title", "")) + notes = str(_get(step, "notes", "") or "") + if action_type == "draft": + prompt = ( + "Draft concise text for this approved planning step.\n" + f"Task type: {_get(payload, 'task_type', '')}\n" + f"Objective: {_get(payload, 'objective', '')}\n" + f"Step: {title}\n" + f"Notes: {notes}\n" + "Output only final draft text." + ) + try: + text = await generate_text(prompt) + if not text.strip(): + text = f"Draft for step '{title}'." + except Exception: + text = f"Draft for step '{title}'." + return {"draft": text} + if action_type == "research": + return {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."} + if action_type == "prepare_data": + return {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]} + if action_type == "review": + return {"note": "Review step requires human review before external action."} + if action_type == "ask_user": + return {"question": "Please confirm whether to proceed with the next high-impact action."} + return {"note": "Step recognized but no executor implemented."} + diff --git a/services/assistant_retrieval.py b/services/assistant_retrieval.py new file mode 100644 index 0000000..bef311c --- /dev/null +++ b/services/assistant_retrieval.py @@ -0,0 +1,115 @@ +import hashlib +import json +import re +from dataclasses import dataclass +from typing import Any, Awaitable, Callable, Dict, List, Optional, Set + + +DEFAULT_QUERY_STOPWORDS: Set[str] = { + "the", "and", "for", "with", "that", "this", "from", "have", "has", "had", "are", "was", "were", "will", + "would", "should", "could", "can", "you", "your", "about", "what", "when", "where", "which", "who", "whom", + "why", "how", "tomorrow", "today", "please", "any", "there", "need", "want", "know", "does", "did", "done", +} + + +@dataclass(frozen=True) +class AssistantRetrievalConfig: + min_token_overlap: int = 1 + source_candidate_multiplier: int = 4 + source_min_coverage: float = 0.6 + query_stopwords: Set[str] = frozenset(DEFAULT_QUERY_STOPWORDS) + + +def query_tokens(text: str, stopwords: Set[str]) -> Set[str]: + return { + t for t in re.findall(r"[a-z0-9]{3,}", (text or "").lower()) + if t not in stopwords + } + + +def source_text_for_match(src: Dict[str, Any]) -> str: + return " ".join( + [ + str(src.get("text") or ""), + str(src.get("description") or ""), + str(src.get("summary") or ""), + str(src.get("display_name") or ""), + str(src.get("canonical_name") or ""), + ] + ) + + +def is_strong_source_match(query: str, src: Dict[str, Any], cfg: AssistantRetrievalConfig) -> bool: + q_tokens = query_tokens(query, cfg.query_stopwords) + if not q_tokens: + return True + s_tokens = query_tokens(source_text_for_match(src), cfg.query_stopwords) + overlap = len(q_tokens.intersection(s_tokens)) + q_len = max(1, len(q_tokens)) + coverage = overlap / q_len + min_overlap = cfg.min_token_overlap + if q_len >= 2: + min_overlap = max(min_overlap, 2) + return overlap >= min_overlap and coverage >= cfg.source_min_coverage + + +async def retrieve_sources_two_stage( + query: str, + release_name: Optional[str], + max_sources: int, + include_release_recent_fallback: bool, + cfg: AssistantRetrievalConfig, + es_search_hits: Callable[[str, int, Optional[str]], Awaitable[List[Dict[str, Any]]]], + es_recent_by_release: Callable[[str, int], Awaitable[List[Dict[str, Any]]]], +) -> List[Dict[str, Any]]: + candidate_size = max(max_sources, max_sources * max(2, cfg.source_candidate_multiplier)) + seen_keys: set[str] = set() + candidates: List[Dict[str, Any]] = [] + + def add_hits(hs: List[Dict[str, Any]]) -> None: + for h in hs: + src = h.get("_source", {}) or {} + key = str(src.get("concept_id") or src.get("source_pk") or "") + if not key: + key = hashlib.sha256( + json.dumps(src, ensure_ascii=False, sort_keys=True).encode("utf-8") + ).hexdigest()[:20] + if key in seen_keys: + continue + seen_keys.add(key) + candidates.append(h) + + try: + add_hits(await es_search_hits(query, candidate_size, release_name)) + except Exception as e: + print(f"[WARN] stage1 release search failed: {e}") + if len(candidates) < max_sources: + try: + add_hits(await es_search_hits(query, candidate_size, None)) + except Exception as e: + print(f"[WARN] stage1 global search failed: {e}") + if len(candidates) < max_sources and include_release_recent_fallback and release_name: + try: + add_hits(await es_recent_by_release(release_name, candidate_size)) + except Exception as e: + print(f"[WARN] stage1 release-recent fallback failed: {e}") + + q_tokens = query_tokens(query, cfg.query_stopwords) + ranked: List[Dict[str, Any]] = [] + for h in candidates: + src = h.get("_source", {}) or {} + s_tokens = query_tokens(source_text_for_match(src), cfg.query_stopwords) + overlap = len(q_tokens.intersection(s_tokens)) if q_tokens else 0 + base_score = float(h.get("_score") or 0.0) + ranked.append({"hit": h, "overlap": overlap, "base_score": base_score}) + + ranked.sort(key=lambda x: (x["overlap"], x["base_score"]), reverse=True) + relevant = [] + for x in ranked: + src = x["hit"].get("_source", {}) or {} + if is_strong_source_match(query, src, cfg): + relevant.append(x) + if relevant: + return [x["hit"] for x in relevant[:max_sources]] + return [] + diff --git a/services/imap_ingest.py b/services/imap_ingest.py new file mode 100644 index 0000000..c8ae879 --- /dev/null +++ b/services/imap_ingest.py @@ -0,0 +1,193 @@ +import email +import hashlib +import imaplib +import os +from datetime import timezone +from email import policy +from email.utils import parseaddr, parsedate_to_datetime +from typing import Any, Dict, List, Optional + + +def clean_header_id(v: Optional[str]) -> str: + if not v: + return "" + return v.strip().strip("<>").strip() + + +def normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str: + refs_clean = clean_header_id(refs.split()[-1] if refs else "") + in_reply_clean = clean_header_id(in_reply_to) + if refs_clean: + return f"thread:{refs_clean}" + if in_reply_clean: + return f"thread:{in_reply_clean}" + seed = f"{subject.strip().lower()}|{sender.strip().lower()}" + if not seed.strip("|"): + seed = msg_id + return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] + + +def extract_body_text(msg: email.message.Message) -> str: + try: + if msg.is_multipart(): + for part in msg.walk(): + ctype = (part.get_content_type() or "").lower() + disp = (part.get("Content-Disposition") or "").lower() + if ctype == "text/plain" and "attachment" not in disp: + payload_obj = part.get_content() + if isinstance(payload_obj, str): + return payload_obj.strip() + if isinstance(payload_obj, bytes): + return payload_obj.decode(part.get_content_charset() or "utf-8", errors="replace").strip() + for part in msg.walk(): + ctype = (part.get_content_type() or "").lower() + if ctype == "text/html": + html_obj = part.get_content() + if isinstance(html_obj, bytes): + html_obj = html_obj.decode(part.get_content_charset() or "utf-8", errors="replace") + if isinstance(html_obj, str): + return html_obj.strip() + return "" + payload_obj = msg.get_content() + if isinstance(payload_obj, str): + return payload_obj.strip() + if isinstance(payload_obj, bytes): + return payload_obj.decode(msg.get_content_charset() or "utf-8", errors="replace").strip() + return "" + except Exception: + return "" + + +def fetch_imap_messages_blocking( + payload: Any, + effective_search_criteria: str, + since_uid: Optional[int], +) -> List[Dict[str, Any]]: + password = getattr(payload, "password", None) or os.getenv("IMAP_PASSWORD", "") + if not password: + raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD") + + host = str(getattr(payload, "host")) + port = int(getattr(payload, "port")) + use_ssl = bool(getattr(payload, "use_ssl")) + username = str(getattr(payload, "username")) + mailbox = str(getattr(payload, "mailbox")) + max_messages = int(getattr(payload, "max_messages")) + channel = str(getattr(payload, "channel")) + + if use_ssl: + client = imaplib.IMAP4_SSL(host, port) + else: + client = imaplib.IMAP4(host, port) + + try: + status, _ = client.login(username, password) + if status != "OK": + raise RuntimeError("IMAP login failed") + status, _ = client.select(mailbox, readonly=True) + if status != "OK": + raise RuntimeError(f"IMAP select mailbox failed: {mailbox}") + + if since_uid is not None: + status, search_data = client.uid("search", None, "UID", f"{int(since_uid) + 1}:*") + else: + status, search_data = client.uid("search", None, effective_search_criteria) + if status != "OK": + raise RuntimeError(f"IMAP search failed: {effective_search_criteria}") + uid_bytes = search_data[0] if search_data else b"" + uid_list = [u for u in uid_bytes.decode("utf-8", errors="replace").split() if u] + if since_uid is not None: + filtered: List[str] = [] + for u in uid_list: + try: + if int(u) > int(since_uid): + filtered.append(u) + except Exception: + continue + uid_list = filtered + if not uid_list: + return [] + is_uid_window = since_uid is not None + if is_uid_window: + selected_uids = uid_list[:max_messages] + else: + selected_uids = uid_list[-max_messages:] + + out: List[Dict[str, Any]] = [] + for uid in selected_uids: + status, msg_data = client.uid("fetch", uid, "(RFC822)") + if status != "OK" or not msg_data: + continue + raw_bytes = None + for part in msg_data: + if isinstance(part, tuple) and len(part) >= 2 and isinstance(part[1], (bytes, bytearray)): + raw_bytes = bytes(part[1]) + break + if not raw_bytes: + continue + msg = email.message_from_bytes(raw_bytes, policy=policy.default) + + subject = str(msg.get("Subject") or "").strip() + from_raw = str(msg.get("From") or "").strip() + to_raw = str(msg.get("To") or "").strip() + date_raw = str(msg.get("Date") or "").strip() + msg_id_raw = str(msg.get("Message-Id") or msg.get("Message-ID") or "").strip() + refs_raw = str(msg.get("References") or "").strip() + in_reply_raw = str(msg.get("In-Reply-To") or "").strip() + + sender_email = parseaddr(from_raw)[1] or from_raw or "unknown" + msg_id_clean = clean_header_id(msg_id_raw) + if not msg_id_clean: + seed = f"{uid}|{subject}|{sender_email}|{date_raw}" + msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24] + + thread_id = normalize_thread_id( + msg_id=msg_id_clean, + refs=refs_raw, + in_reply_to=in_reply_raw, + subject=subject, + sender=sender_email, + ) + + sent_at_iso = None + if date_raw: + try: + dt = parsedate_to_datetime(date_raw) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + sent_at_iso = dt.astimezone(timezone.utc).isoformat() + except Exception: + sent_at_iso = None + + body = extract_body_text(msg) + if not body: + body = f"(no body) {subject}".strip() + + metadata = { + "subject": subject, + "from": from_raw, + "to": to_raw, + "date": date_raw, + "imap_uid": uid, + "mailbox": mailbox, + "host": host, + "username": username, + } + out.append( + { + "thread_id": thread_id, + "message_id": msg_id_clean, + "sender": sender_email, + "channel": channel, + "sent_at": sent_at_iso, + "body": body, + "metadata": metadata, + } + ) + return out + finally: + try: + client.logout() + except Exception: + pass + diff --git a/services/remote_ops.py b/services/remote_ops.py new file mode 100644 index 0000000..1595702 --- /dev/null +++ b/services/remote_ops.py @@ -0,0 +1,555 @@ +import asyncio +import base64 +import json +import os +import shlex +import tempfile +import uuid +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from fastapi import HTTPException + + +@dataclass(frozen=True) +class RemoteOpsConfig: + ssh_host: str + remote_dir: str + ssh_bin: str + ssh_opts: str + scp_bin: str + scp_opts: str + timeout_sec: int + projector_remote_script: str + ingest_message_remote_script: str + ingest_messages_batch_remote_script: str + assistant_feedback_remote_script: str + assistant_feedback_query_remote_script: str + assistant_metrics_query_remote_script: str + assistant_action_remote_script: str + assistant_actions_query_remote_script: str + assistant_proposals_remote_script: str + assistant_proposals_query_remote_script: str + runs_remote_script: str + run_events_remote_script: str + imap_checkpoint_remote_script: str + create_messages_release_remote_script: str + + +def _tail(text: str, max_chars: int = 8000) -> str: + if len(text) <= max_chars: + return text + return text[-max_chars:] + + +def _b64(s: str) -> str: + return base64.b64encode(s.encode("utf-8")).decode("ascii") + + +def _extract_json_array_from_text(text: str) -> List[Dict[str, Any]]: + start = text.find("[") + end = text.rfind("]") + if start == -1 or end == -1 or end < start: + raise ValueError("No JSON array found in output") + candidate = text[start : end + 1] + obj = json.loads(candidate) + if not isinstance(obj, list): + raise ValueError("Parsed value is not a JSON array") + out: List[Dict[str, Any]] = [] + for item in obj: + if isinstance(item, dict): + out.append(item) + return out + + +def _extract_json_object_from_text(text: str) -> Dict[str, Any]: + start = text.find("{") + end = text.rfind("}") + if start == -1 or end == -1 or end < start: + raise ValueError("No JSON object found in output") + candidate = text[start : end + 1] + obj = json.loads(candidate) + if not isinstance(obj, dict): + raise ValueError("Parsed value is not a JSON object") + return obj + + +class RemoteOps: + def __init__(self, cfg: RemoteOpsConfig): + self.cfg = cfg + + def _ssh_args(self, command: str) -> List[str]: + return [self.cfg.ssh_bin, *shlex.split(self.cfg.ssh_opts), self.cfg.ssh_host, command] + + async def _run_ssh(self, parts: List[str], timeout_error: str) -> Dict[str, Any]: + command = f"cd {shlex.quote(self.cfg.remote_dir)} && {' '.join(shlex.quote(p) for p in parts)}" + proc = await asyncio.create_subprocess_exec( + *self._ssh_args(command), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.cfg.timeout_sec) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise HTTPException(status_code=504, detail=timeout_error) + out = stdout.decode("utf-8", errors="replace") + err = stderr.decode("utf-8", errors="replace") + return {"code": proc.returncode, "out": out, "err": err} + + def _error_detail(self, code: int, out: str, err: str) -> Dict[str, Any]: + return { + "host": self.cfg.ssh_host, + "remote_dir": self.cfg.remote_dir, + "exit_code": code, + "stdout_tail": _tail(out), + "stderr_tail": _tail(err), + } + + async def run_remote_query_imap_checkpoint( + self, + host: str, + mailbox: str, + username: str, + table: str, + ) -> Optional[int]: + res = await self._run_ssh( + [self.cfg.imap_checkpoint_remote_script, host, mailbox, username, table], + "IMAP checkpoint query timed out", + ) + if res["code"] != 0: + raise HTTPException(status_code=502, detail=self._error_detail(res["code"], res["out"], res["err"])) + try: + obj = _extract_json_object_from_text(res["out"]) + val = obj.get("max_uid") + if val is None: + return None + return int(val) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse IMAP checkpoint output: {e}", + "stdout_tail": _tail(res["out"]), + "stderr_tail": _tail(res["err"]), + }, + ) + + async def run_remote_create_messages_release(self, release_name: str) -> Dict[str, Any]: + res = await self._run_ssh( + [self.cfg.create_messages_release_remote_script, release_name], + "Create messages release timed out", + ) + result = { + **self._error_detail(res["code"], res["out"], res["err"]), + "release_name": release_name, + } + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_projector(self, payload: Any) -> Dict[str, Any]: + parts = [ + self.cfg.projector_remote_script, + "--release-name", str(getattr(payload, "release_name", "")), + "--targets", str(getattr(payload, "targets", "both")), + ] + if getattr(payload, "concept_table", None): + parts.extend(["--concept-table", str(getattr(payload, "concept_table"))]) + if bool(getattr(payload, "dry_run", False)): + parts.append("--dry-run") + + res = await self._run_ssh(parts, "Projector execution timed out") + result = { + **self._error_detail(res["code"], res["out"], res["err"]), + "spark_read_done": "[STEP] spark_read_done" in res["out"], + "projection_done": "[STEP] projection_done" in res["out"], + } + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_ingest_message(self, payload: Any) -> Dict[str, Any]: + parts = [ + self.cfg.ingest_message_remote_script, + str(getattr(payload, "table")), + str(getattr(payload, "thread_id")), + str(getattr(payload, "message_id")), + str(getattr(payload, "sender")), + str(getattr(payload, "channel")), + str(getattr(payload, "sent_at") or ""), + _b64(str(getattr(payload, "body") or "")), + _b64(json.dumps(getattr(payload, "metadata", {}) or {}, ensure_ascii=False)), + ] + res = await self._run_ssh(parts, "Message ingest execution timed out") + result = self._error_detail(res["code"], res["out"], res["err"]) + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_ingest_messages_batch(self, payload: Any) -> Dict[str, Any]: + rows = [] + for m in list(getattr(payload, "messages", []) or []): + rows.append( + { + "thread_id": getattr(m, "thread_id"), + "message_id": getattr(m, "message_id"), + "sender": getattr(m, "sender"), + "channel": getattr(m, "channel"), + "sent_at": getattr(m, "sent_at"), + "body": getattr(m, "body"), + "metadata": getattr(m, "metadata"), + } + ) + if not rows: + return { + "host": self.cfg.ssh_host, + "remote_dir": self.cfg.remote_dir, + "exit_code": 0, + "rows": 0, + "stdout_tail": "[INFO] No rows to ingest", + "stderr_tail": "", + } + + local_tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") + remote_tmp = f"{self.cfg.remote_dir}/.ingest_messages_{uuid.uuid4().hex}.json" + try: + json.dump(rows, local_tmp, ensure_ascii=False) + local_tmp.flush() + local_tmp.close() + + scp_target = f"{self.cfg.ssh_host}:{remote_tmp}" + scp_args = [self.cfg.scp_bin, *shlex.split(self.cfg.scp_opts), local_tmp.name, scp_target] + scp_proc = await asyncio.create_subprocess_exec( + *scp_args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + scp_stdout, scp_stderr = await asyncio.wait_for(scp_proc.communicate(), timeout=self.cfg.timeout_sec) + except asyncio.TimeoutError: + scp_proc.kill() + await scp_proc.wait() + raise HTTPException(status_code=504, detail="Batch payload upload timed out") + if scp_proc.returncode != 0: + raise HTTPException( + status_code=502, + detail={ + "host": self.cfg.ssh_host, + "remote_dir": self.cfg.remote_dir, + "exit_code": scp_proc.returncode, + "stdout_tail": _tail(scp_stdout.decode("utf-8", errors="replace")), + "stderr_tail": _tail(scp_stderr.decode("utf-8", errors="replace")), + }, + ) + + payload_arg = f"@{remote_tmp}" + parts = [ + self.cfg.ingest_messages_batch_remote_script, + str(getattr(payload, "table")), + str(getattr(payload, "dedupe_mode")), + payload_arg, + ] + batch_cmd = " ".join(shlex.quote(p) for p in parts) + command = ( + f"cd {shlex.quote(self.cfg.remote_dir)} && " + f"({batch_cmd}); rc=$?; rm -f {shlex.quote(remote_tmp)}; exit $rc" + ) + proc = await asyncio.create_subprocess_exec( + *self._ssh_args(command), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.cfg.timeout_sec) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise HTTPException(status_code=504, detail="Batch message ingest execution timed out") + finally: + try: + os.unlink(local_tmp.name) + except Exception: + pass + + out = stdout.decode("utf-8", errors="replace") + err = stderr.decode("utf-8", errors="replace") + result = { + "host": self.cfg.ssh_host, + "remote_dir": self.cfg.remote_dir, + "exit_code": proc.returncode, + "rows": len(rows), + "stdout_tail": _tail(out), + "stderr_tail": _tail(err), + } + if proc.returncode != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_assistant_feedback(self, feedback_id: str, payload: Any, created_at_utc: str) -> Dict[str, Any]: + confidence = getattr(payload, "confidence", None) + conf = confidence if confidence is not None else 0.0 + sources = [s.model_dump() for s in list(getattr(payload, "sources", []) or [])] + parts = [ + self.cfg.assistant_feedback_remote_script, + feedback_id, + created_at_utc, + str(getattr(payload, "outcome")), + str(getattr(payload, "task_type")), + str(getattr(payload, "release_name") or ""), + f"{conf}", + "true" if bool(getattr(payload, "needs_review", False)) else "false", + _b64(str(getattr(payload, "goal") or "")), + _b64(str(getattr(payload, "draft") or "")), + _b64(str(getattr(payload, "final_text") or "")), + _b64(json.dumps(sources, ensure_ascii=False)), + _b64(str(getattr(payload, "notes") or "")), + ] + res = await self._run_ssh(parts, "Assistant feedback execution timed out") + result = self._error_detail(res["code"], res["out"], res["err"]) + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_query_assistant_feedback( + self, outcome: Optional[str], task_type: Optional[str], release_name: Optional[str], limit: int + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_feedback_query_remote_script, + outcome or "", + task_type or "", + release_name or "", + str(limit), + ] + res = await self._run_ssh(parts, "Assistant feedback query timed out") + if res["code"] != 0: + raise HTTPException(status_code=502, detail=self._error_detail(res["code"], res["out"], res["err"])) + try: + rows = _extract_json_array_from_text(res["out"]) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse feedback query output: {e}", + "stdout_tail": _tail(res["out"]), + "stderr_tail": _tail(res["err"]), + }, + ) + return {"host": self.cfg.ssh_host, "remote_dir": self.cfg.remote_dir, "rows": rows} + + async def run_remote_query_assistant_metrics( + self, task_type: Optional[str], release_name: Optional[str], outcome: Optional[str], group_by: str, limit: int + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_metrics_query_remote_script, + task_type or "", + release_name or "", + outcome or "", + group_by, + str(limit), + ] + res = await self._run_ssh(parts, "Assistant metrics query timed out") + if res["code"] != 0: + raise HTTPException(status_code=502, detail=self._error_detail(res["code"], res["out"], res["err"])) + try: + rows = _extract_json_array_from_text(res["out"]) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse metrics query output: {e}", + "stdout_tail": _tail(res["out"]), + "stderr_tail": _tail(res["err"]), + }, + ) + return {"host": self.cfg.ssh_host, "remote_dir": self.cfg.remote_dir, "rows": rows} + + async def run_remote_assistant_action( + self, action_id: str, payload: Any, step: Any, status: str, output_json: Dict[str, Any], error_text: Optional[str], created_at_utc: str + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_action_remote_script, + action_id, + created_at_utc, + str(getattr(payload, "task_type")), + str(getattr(payload, "release_name") or ""), + _b64(str(getattr(payload, "objective") or "")), + str(getattr(step, "step_id")), + _b64(str(getattr(step, "title") or "")), + str(getattr(step, "action_type")), + "true" if bool(getattr(step, "requires_approval", False)) else "false", + "true" if bool(getattr(payload, "approved", False)) else "false", + status, + _b64(json.dumps(output_json, ensure_ascii=False)), + _b64(error_text or ""), + ] + res = await self._run_ssh(parts, "Assistant action logging timed out") + result = self._error_detail(res["code"], res["out"], res["err"]) + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_query_assistant_actions( + self, + status: Optional[str], + task_type: Optional[str], + release_name: Optional[str], + step_id: Optional[str], + action_type: Optional[str], + limit: int, + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_actions_query_remote_script, + status or "", + task_type or "", + release_name or "", + step_id or "", + action_type or "", + str(limit), + ] + res = await self._run_ssh(parts, "Assistant actions query timed out") + if res["code"] != 0: + raise HTTPException(status_code=502, detail=self._error_detail(res["code"], res["out"], res["err"])) + try: + rows = _extract_json_array_from_text(res["out"]) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse actions query output: {e}", + "stdout_tail": _tail(res["out"]), + "stderr_tail": _tail(res["err"]), + }, + ) + return {"host": self.cfg.ssh_host, "remote_dir": self.cfg.remote_dir, "rows": rows} + + async def run_remote_record_assistant_proposals( + self, + proposal_set_id: str, + created_at_utc: str, + objective: str, + release_name: Optional[str], + summary: str, + signals: Dict[str, Any], + proposals: List[Dict[str, Any]], + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_proposals_remote_script, + proposal_set_id, + created_at_utc, + _b64(objective or ""), + release_name or "", + _b64(summary or ""), + _b64(json.dumps(signals or {}, ensure_ascii=False)), + _b64(json.dumps(proposals or [], ensure_ascii=False)), + ] + res = await self._run_ssh(parts, "Assistant proposals logging timed out") + result = self._error_detail(res["code"], res["out"], res["err"]) + if res["code"] != 0: + raise HTTPException(status_code=502, detail=result) + return result + + async def run_remote_query_assistant_proposals( + self, + release_name: Optional[str], + proposal_set_id: Optional[str], + limit: int, + ) -> Dict[str, Any]: + parts = [ + self.cfg.assistant_proposals_query_remote_script, + release_name or "", + proposal_set_id or "", + str(limit), + ] + res = await self._run_ssh(parts, "Assistant proposals query timed out") + if res["code"] != 0: + raise HTTPException(status_code=502, detail=self._error_detail(res["code"], res["out"], res["err"])) + try: + rows = _extract_json_array_from_text(res["out"]) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse proposals query output: {e}", + "stdout_tail": _tail(res["out"]), + "stderr_tail": _tail(res["err"]), + }, + ) + return {"host": self.cfg.ssh_host, "remote_dir": self.cfg.remote_dir, "rows": rows} + + async def run_remote_record_run( + self, + run_id: str, + run_type: str, + status: str, + started_at_utc: str, + finished_at_utc: str, + actor: str, + input_json: Dict[str, Any], + output_json: Optional[Dict[str, Any]], + error_text: Optional[str], + ) -> None: + parts = [ + self.cfg.runs_remote_script, + run_id, + run_type, + status, + started_at_utc, + finished_at_utc, + actor, + _b64(json.dumps(input_json, ensure_ascii=False)), + _b64(json.dumps(output_json, ensure_ascii=False) if output_json is not None else ""), + _b64(error_text or ""), + ] + command = f"cd {shlex.quote(self.cfg.remote_dir)} && {' '.join(shlex.quote(p) for p in parts)}" + proc = await asyncio.create_subprocess_exec( + *self._ssh_args(command), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.cfg.timeout_sec) + if proc.returncode != 0: + raise HTTPException( + status_code=502, + detail={ + "message": "Failed to record run in Iceberg", + "host": self.cfg.ssh_host, + "exit_code": proc.returncode, + "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), + "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), + }, + ) + + async def run_remote_record_event(self, run_id: str, event_type: str, detail_json: Dict[str, Any], created_at_utc: str) -> None: + parts = [ + self.cfg.run_events_remote_script, + run_id, + event_type, + created_at_utc, + _b64(json.dumps(detail_json, ensure_ascii=False)), + ] + command = f"cd {shlex.quote(self.cfg.remote_dir)} && {' '.join(shlex.quote(p) for p in parts)}" + proc = await asyncio.create_subprocess_exec( + *self._ssh_args(command), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.cfg.timeout_sec) + if proc.returncode != 0: + raise HTTPException( + status_code=502, + detail={ + "message": "Failed to record run event in Iceberg", + "host": self.cfg.ssh_host, + "exit_code": proc.returncode, + "stdout_tail": _tail(stdout.decode("utf-8", errors="replace")), + "stderr_tail": _tail(stderr.decode("utf-8", errors="replace")), + }, + ) + + async def record_event_best_effort(self, run_id: str, event_type: str, detail_json: Dict[str, Any], created_at_utc: str) -> None: + try: + await self.run_remote_record_event(run_id, event_type, detail_json, created_at_utc) + except Exception as e: + print(f"[WARN] run event logging failed: run_id={run_id} event={event_type} error={e}")