141 lines
5.3 KiB
Python
141 lines
5.3 KiB
Python
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||
|
|
|
||
|
|
|
||
|
|
def _get(obj: Any, name: str, default: Any = None) -> Any:
|
||
|
|
return getattr(obj, name, default)
|
||
|
|
|
||
|
|
|
||
|
|
def build_assistant_plan_prompt(payload: Any, source_docs: List[Dict[str, Any]]) -> str:
|
||
|
|
constraints = _get(payload, "constraints", []) or []
|
||
|
|
constraint_lines = "\n".join(f"- {c}" for c in constraints) if constraints else "- None"
|
||
|
|
context_chunks = []
|
||
|
|
for d in source_docs:
|
||
|
|
src = d.get("_source", {}) or {}
|
||
|
|
context_chunks.append(
|
||
|
|
"\n".join(
|
||
|
|
[
|
||
|
|
f"concept_id: {src.get('concept_id', '')}",
|
||
|
|
f"source_pk: {src.get('source_pk', '')}",
|
||
|
|
f"source_table: {src.get('source_table', '')}",
|
||
|
|
f"release_name: {src.get('release_name', '')}",
|
||
|
|
f"text: {str(src.get('text') or src.get('description') or src.get('summary') or '')[:600]}",
|
||
|
|
]
|
||
|
|
)
|
||
|
|
)
|
||
|
|
context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No retrieved context."
|
||
|
|
return (
|
||
|
|
"You are a cautious personal assistant planner. Produce an execution plan only; do not execute anything.\n"
|
||
|
|
"Return valid JSON ONLY with this exact shape:\n"
|
||
|
|
'{'
|
||
|
|
'"plan": ['
|
||
|
|
'{"step_id":"S1","title":"...","action_type":"research|draft|ask_user|prepare_data|review","requires_approval":true|false,"notes":"..."}'
|
||
|
|
"]"
|
||
|
|
"}\n"
|
||
|
|
f"Use at most {_get(payload, 'max_steps', 3)} steps.\n"
|
||
|
|
"Prefer safe read-only and draft actions first.\n\n"
|
||
|
|
f"Task type: {_get(payload, 'task_type', '')}\n"
|
||
|
|
f"Objective: {_get(payload, 'objective', '')}\n"
|
||
|
|
f"Constraints:\n{constraint_lines}\n\n"
|
||
|
|
"Retrieved context:\n"
|
||
|
|
f"{context}\n"
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def fallback_plan(max_steps: int) -> List[Dict[str, Any]]:
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"step_id": "S1",
|
||
|
|
"title": "Gather relevant facts and constraints",
|
||
|
|
"action_type": "research",
|
||
|
|
"requires_approval": False,
|
||
|
|
"notes": "Review messages/concepts and identify required context.",
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"step_id": "S2",
|
||
|
|
"title": "Draft a response or action proposal",
|
||
|
|
"action_type": "draft",
|
||
|
|
"requires_approval": False,
|
||
|
|
"notes": "Produce a concise draft aligned with objective and constraints.",
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"step_id": "S3",
|
||
|
|
"title": "Request user confirmation before any external action",
|
||
|
|
"action_type": "ask_user",
|
||
|
|
"requires_approval": True,
|
||
|
|
"notes": "Do not send or execute changes until approved.",
|
||
|
|
},
|
||
|
|
][: max_steps]
|
||
|
|
|
||
|
|
|
||
|
|
def find_plan_step(plan: List[Any], step_id: str) -> Optional[Any]:
|
||
|
|
for s in plan:
|
||
|
|
if _get(s, "step_id", "") == step_id:
|
||
|
|
return s
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def is_high_risk_step(step: Any) -> bool:
|
||
|
|
text = f"{_get(step, 'title', '')} {_get(step, 'notes', '') or ''}".lower()
|
||
|
|
high_risk_terms = [
|
||
|
|
"send",
|
||
|
|
"submit",
|
||
|
|
"pay",
|
||
|
|
"payment",
|
||
|
|
"transfer",
|
||
|
|
"wire",
|
||
|
|
"sign",
|
||
|
|
"file",
|
||
|
|
"delete",
|
||
|
|
"close account",
|
||
|
|
"change account",
|
||
|
|
]
|
||
|
|
return any(t in text for t in high_risk_terms)
|
||
|
|
|
||
|
|
|
||
|
|
def enforce_step_policy(payload: Any, step: Any) -> Optional[str]:
|
||
|
|
if bool(_get(step, "requires_approval", False)) and not bool(_get(payload, "approved", False)):
|
||
|
|
return "Step requires approval but approved=false."
|
||
|
|
if is_high_risk_step(step):
|
||
|
|
if not bool(_get(payload, "approved", False)):
|
||
|
|
return "High-risk step requires approved=true."
|
||
|
|
token = str(_get(payload, "manual_confirm_token", "") or "")
|
||
|
|
if not token.strip():
|
||
|
|
return "High-risk step requires manual_confirm_token."
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
async def execute_plan_step(
|
||
|
|
payload: Any,
|
||
|
|
step: Any,
|
||
|
|
generate_text: Callable[[str], Awaitable[str]],
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
action_type = str(_get(step, "action_type", ""))
|
||
|
|
title = str(_get(step, "title", ""))
|
||
|
|
notes = str(_get(step, "notes", "") or "")
|
||
|
|
if action_type == "draft":
|
||
|
|
prompt = (
|
||
|
|
"Draft concise text for this approved planning step.\n"
|
||
|
|
f"Task type: {_get(payload, 'task_type', '')}\n"
|
||
|
|
f"Objective: {_get(payload, 'objective', '')}\n"
|
||
|
|
f"Step: {title}\n"
|
||
|
|
f"Notes: {notes}\n"
|
||
|
|
"Output only final draft text."
|
||
|
|
)
|
||
|
|
try:
|
||
|
|
text = await generate_text(prompt)
|
||
|
|
if not text.strip():
|
||
|
|
text = f"Draft for step '{title}'."
|
||
|
|
except Exception:
|
||
|
|
text = f"Draft for step '{title}'."
|
||
|
|
return {"draft": text}
|
||
|
|
if action_type == "research":
|
||
|
|
return {"note": "Research step acknowledged. Use /search or /assistant/draft for grounded retrieval."}
|
||
|
|
if action_type == "prepare_data":
|
||
|
|
return {"note": "Prepare-data step acknowledged.", "checklist": ["Collect required inputs", "Normalize format", "Validate completeness"]}
|
||
|
|
if action_type == "review":
|
||
|
|
return {"note": "Review step requires human review before external action."}
|
||
|
|
if action_type == "ask_user":
|
||
|
|
return {"question": "Please confirm whether to proceed with the next high-impact action."}
|
||
|
|
return {"note": "Step recognized but no executor implemented."}
|
||
|
|
|