"""Project a lakehouse release into JanusGraph (Gremlin) and Elasticsearch.

The script resolves a release manifest (from a JSON file or from the
releases registry table), reads the concept table on the matching Nessie
ref through Spark, converts each row into a concept document and upserts it
into the selected projection targets.
"""

import argparse
import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

# python-dotenv is optional; without it the process environment is used as-is.
try:
    from dotenv import load_dotenv
except Exception:
    load_dotenv = None

DEFAULT_SPARK_PACKAGES = (
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,"
    "org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5"
)


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with offset."""
    return datetime.now(timezone.utc).isoformat()


def parse_json_maybe(value: Any, expected_type: type, fallback: Any) -> Any:
    """Coerce *value* to *expected_type*, decoding JSON strings if needed.

    Args:
        value: Either an instance of ``expected_type``, a JSON string that
            decodes to one, or anything else.
        expected_type: The container type wanted (e.g. ``dict`` or ``list``).
        fallback: Returned when *value* is ``None``, cannot be decoded, or
            decodes to a different type.

    Returns:
        The value (or its decoded form) when it matches ``expected_type``,
        otherwise *fallback*.
    """
    if value is None:
        return fallback
    if isinstance(value, expected_type):
        return value
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
            if isinstance(parsed, expected_type):
                return parsed
        except Exception:
            # Best-effort decode: malformed JSON simply yields the fallback.
            return fallback
    return fallback


def first_str(row: Dict[str, Any], keys: List[str]) -> Optional[str]:
    """Return the first non-blank string found under *keys*, stripped.

    Non-string values (including numbers) are ignored. Returns ``None`` when
    no key holds a non-blank string.
    """
    for key in keys:
        val = row.get(key)
        if isinstance(val, str) and val.strip():
            return val.strip()
    return None


def to_iso(value: Any) -> Optional[str]:
    """Render a datetime, date or non-blank string as an ISO-like string.

    ``datetime`` is checked before ``date`` because it is a subclass of it.
    Plain dates are expanded to midnight UTC. Strings are returned stripped
    without validation. Anything else yields ``None``.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time(), timezone.utc).isoformat()
    if isinstance(value, str) and value.strip():
        return value.strip()
    return None


def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str]) -> str:
    """Return a SHA-256 hex digest over the normalised name, kind and ids.

    Name and kind are stripped and lower-cased; external ids are rendered as
    lower-cased ``key:value`` pairs in sorted key order so the digest does
    not depend on dict ordering.
    """
    norm = (name or "").strip().lower()
    kind_norm = (kind or "").strip().lower()
    ext = "|".join(f"{k}:{v}".lower() for k, v in sorted(external_ids.items()))
    raw = f"{norm}|{kind_norm}|{ext}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def load_manifest(path: str) -> Dict[str, Any]:
    """Load a release manifest from a JSON file.

    Accepted shapes:
      * a JSON object with a ``manifest_json`` string that decodes to an
        object -> the decoded object is returned;
      * any other JSON object -> returned as-is;
      * a JSON array whose first element is an object carrying a
        ``manifest_json`` string that decodes to an object.

    Raises:
        ValueError: If the file matches none of the shapes above. A
            malformed ``manifest_json`` inside an array row surfaces as
            ``json.JSONDecodeError`` (a ``ValueError`` subclass).
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    if isinstance(raw, dict):
        manifest_json = raw.get("manifest_json")
        if isinstance(manifest_json, str):
            try:
                parsed = json.loads(manifest_json)
                if isinstance(parsed, dict):
                    return parsed
            except Exception:
                # Fall back to treating the outer object as the manifest.
                pass
        return raw

    if isinstance(raw, list) and raw and isinstance(raw[0], dict):
        manifest_json = raw[0].get("manifest_json")
        if isinstance(manifest_json, str):
            parsed = json.loads(manifest_json)
            if isinstance(parsed, dict):
                return parsed

    raise ValueError("Manifest file must contain a manifest object or releases_v2 row with manifest_json.")


def infer_manifest_ref(manifest: Dict[str, Any]) -> Optional[str]:
    """Infer the Nessie ref/tag name recorded in *manifest*.

    Lookup order: ``nessie.ref.name``, ``nessie.tag``, ``release.name``, then
    the top-level keys ``nessie_tag``, ``tag`` and ``release_name``. The
    first non-blank string wins; ``None`` if nothing matches.
    """
    nessie = manifest.get("nessie")
    if isinstance(nessie, dict):
        ref_obj = nessie.get("ref")
        if isinstance(ref_obj, dict):
            ref_name = ref_obj.get("name")
            if isinstance(ref_name, str) and ref_name.strip():
                return ref_name.strip()
        tag = nessie.get("tag")
        if isinstance(tag, str) and tag.strip():
            return tag.strip()

    release_obj = manifest.get("release")
    if isinstance(release_obj, dict):
        release_name = release_obj.get("name")
        if isinstance(release_name, str) and release_name.strip():
            return release_name.strip()

    for key in ("nessie_tag", "tag", "release_name"):
        val = manifest.get(key)
        if isinstance(val, str) and val.strip():
            return val.strip()
    return None


def extract_table_identifiers(manifest: Dict[str, Any]) -> List[str]:
    """Collect table identifiers listed in *manifest*.

    Entries of ``manifest["tables"]`` are read first (keys
    ``table_identifier``, ``identifier`` or ``table``). Only when that yields
    nothing are ``manifest["rows"]`` entries consulted (``table_identifier``).
    Non-dict entries and blank identifiers are skipped.
    """
    out: List[str] = []

    tables = manifest.get("tables")
    if isinstance(tables, list):
        for t in tables:
            if not isinstance(t, dict):
                continue
            ident = t.get("table_identifier") or t.get("identifier") or t.get("table")
            if isinstance(ident, str) and ident.strip():
                out.append(ident.strip())
    if out:
        return out

    rows = manifest.get("rows")
    if isinstance(rows, list):
        for row in rows:
            if not isinstance(row, dict):
                continue
            ident = row.get("table_identifier")
            if isinstance(ident, str) and ident.strip():
                out.append(ident.strip())
    return out


def infer_concept_table(tables: List[str]) -> Optional[str]:
    """Pick the first table whose name contains "concept" (case-insensitive).

    Falls back to the first table, or ``None`` when *tables* is empty.
    """
    for t in tables:
        lower = t.lower()
        if "concept" in lower:
            return t
    return tables[0] if tables else None


def load_manifest_from_registry(
    spark: Any,
    catalog: str,
    release_name: str,
    releases_table: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch the newest manifest for *release_name* from the registry table.

    Args:
        spark: An active Spark session.
        catalog: Catalog name prepended when the table name has exactly one
            dot (i.e. looks like ``db.table``).
        release_name: Value matched against the ``release_name`` column.
        releases_table: Registry table; defaults to the ``RELEASES_TABLE``
            environment variable, then ``db1.releases_v2``.

    Returns:
        The decoded ``manifest_json`` of the row with the latest
        ``ingested_at_utc`` (nulls last).

    Raises:
        ValueError: If no row matches, the manifest is empty, or it does not
            decode to a JSON object.
    """
    from pyspark.sql import functions as F

    table = releases_table or os.getenv("RELEASES_TABLE", "db1.releases_v2")
    if table.count(".") == 1:
        table = f"{catalog}.{table}"

    row = (
        spark.table(table)
        .where(F.col("release_name") == release_name)
        .orderBy(F.col("ingested_at_utc").desc_nulls_last())
        .select("manifest_json")
        .limit(1)
        .collect()
    )
    if not row:
        raise ValueError(f"Release '{release_name}' not found in registry table {table}.")

    manifest_json = row[0]["manifest_json"]
    if not isinstance(manifest_json, str) or not manifest_json.strip():
        raise ValueError(f"Release '{release_name}' has empty manifest_json in {table}.")

    manifest = json.loads(manifest_json)
    if not isinstance(manifest, dict):
        raise ValueError(f"Release '{release_name}' manifest_json is not a JSON object.")
    return manifest


def build_spark(ref: str) -> Tuple[Any, str]:
    """Create (or reuse) a Spark session whose catalog is pinned to *ref*.

    Configuration is taken from the environment: ``SPARK_CATALOG``,
    ``SPARK_PACKAGES``, ``NESSIE_URI``, ``NESSIE_WAREHOUSE``, ``S3_ENDPOINT``,
    ``S3_PATH_STYLE``, ``AWS_ACCESS_KEY_ID``/``MINIO_ROOT_USER``,
    ``AWS_SECRET_ACCESS_KEY``/``MINIO_ROOT_PASSWORD`` and ``SPARK_MASTER``.

    Returns:
        A ``(spark_session, catalog_name)`` tuple.

    Raises:
        RuntimeError: If pyspark cannot be imported.
    """
    try:
        from pyspark.sql import SparkSession
    except Exception as e:
        raise RuntimeError(
            "pyspark is not installed. Install it or run this with spark-submit."
        ) from e

    catalog = os.getenv("SPARK_CATALOG", "lake")
    builder = (
        SparkSession.builder.appName("release-projector")
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
        )
        .config("spark.jars.packages", os.getenv("SPARK_PACKAGES", DEFAULT_SPARK_PACKAGES))
        .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
        .config(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
        .config(f"spark.sql.catalog.{catalog}.uri", os.getenv("NESSIE_URI", "http://lakehouse-core:19120/api/v2"))
        .config(f"spark.sql.catalog.{catalog}.ref", ref)
        .config(
            f"spark.sql.catalog.{catalog}.warehouse",
            os.getenv("NESSIE_WAREHOUSE", "s3a://lakehouse/warehouse"),
        )
        .config(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.hadoop.fs.s3a.endpoint", os.getenv("S3_ENDPOINT", "http://lakehouse-core:9000"))
        .config("spark.hadoop.fs.s3a.path.style.access", os.getenv("S3_PATH_STYLE", "true"))
        .config(
            "spark.hadoop.fs.s3a.access.key",
            os.getenv("AWS_ACCESS_KEY_ID", os.getenv("MINIO_ROOT_USER", "minioadmin")),
        )
        .config(
            "spark.hadoop.fs.s3a.secret.key",
            os.getenv("AWS_SECRET_ACCESS_KEY", os.getenv("MINIO_ROOT_PASSWORD", "minioadmin")),
        )
    )

    spark_master = os.getenv("SPARK_MASTER")
    if spark_master:
        builder = builder.master(spark_master)
    return builder.getOrCreate(), catalog


def ensure_es_index(es_url: str, es_index: str) -> None:
    """Create the Elasticsearch index with the concept mapping if missing.

    A GET on the index is issued first; a 2xx answer means the index exists
    and nothing is done. A 404 triggers a PUT with the mapping. Any other
    HTTP error from the GET is re-raised.

    Raises:
        urllib.error.HTTPError: For non-404 errors on the GET, or errors on
            the PUT.
        RuntimeError: If the PUT response reports a status >= 400.
    """
    mapping = {
        "mappings": {
            "properties": {
                "concept_id": {"type": "keyword"},
                "concept_type": {"type": "keyword"},
                "display_name": {"type": "text"},
                "description": {"type": "text"},
                "text": {"type": "text"},
                "source_table": {"type": "keyword"},
                "source_pk": {"type": "keyword"},
                "release_name": {"type": "keyword"},
                "ref_hash": {"type": "keyword"},
                "attributes_json": {"type": "text"},
                "canonical_name": {"type": "text"},
                "kind": {"type": "keyword"},
                "aliases": {"type": "text"},
                "tags": {"type": "keyword"},
                "summary": {"type": "text"},
                "latest_cid": {"type": "keyword"},
                "fingerprint": {"type": "keyword"},
                "created_at": {"type": "date"},
                "updated_at": {"type": "date"},
            }
        }
    }
    url = f"{es_url.rstrip('/')}/{es_index}"

    req_get = urllib.request.Request(url, method="GET")
    try:
        with urllib.request.urlopen(req_get, timeout=30) as resp:
            if 200 <= resp.status < 300:
                return
    except urllib.error.HTTPError as e:
        if e.code != 404:
            raise

    body = json.dumps(mapping).encode("utf-8")
    req_put = urllib.request.Request(url, data=body, method="PUT")
    req_put.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req_put, timeout=30) as resp:
        # urlopen normally raises HTTPError for error statuses; this is a
        # defensive check for responses that slip through.
        if resp.status >= 400:
            raise RuntimeError(f"Failed to create ES index {es_index}: HTTP {resp.status}")


def es_doc_url(es_url: str, es_index: str, concept_id: Any) -> str:
    """Build the ``_doc`` URL for *concept_id* with the id percent-encoded.

    The id is encoded with no safe characters so that ids containing ``/``,
    spaces, ``#`` or ``?`` stay a single path segment instead of altering the
    request path or being parsed as a query/fragment.
    """
    doc_id = urllib.parse.quote(str(concept_id), safe="")
    return f"{es_url.rstrip('/')}/{es_index}/_doc/{doc_id}"


def es_upsert(es_url: str, es_index: str, doc: Dict[str, Any]) -> None:
    """Index *doc* into Elasticsearch under its ``concept_id``.

    The document is sent with PUT, so an existing document with the same id
    is replaced. Values that are not JSON-serialisable are stringified.

    Raises:
        KeyError: If *doc* has no ``concept_id``.
        urllib.error.HTTPError: If Elasticsearch answers with an error.
        RuntimeError: If the response reports a status >= 400.
    """
    url = es_doc_url(es_url, es_index, doc["concept_id"])
    body = json.dumps(doc, default=str).encode("utf-8")
    req = urllib.request.Request(url, data=body, method="PUT")
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=30) as resp:
        if resp.status >= 400:
            raise RuntimeError(f"Failed ES upsert for {doc['concept_id']}: HTTP {resp.status}")


def gremlin_upsert(gremlin_url: str, concept: Dict[str, Any]) -> None:
    """Upsert *concept* as a ``Concept`` vertex through a Gremlin server.

    The traversal looks the vertex up by ``concept_id``, creates it when
    absent (setting ``created_at`` only then) and overwrites the remaining
    properties. List/dict fields (aliases, external ids, tags) are stored as
    JSON strings. A client is opened for this call and always closed.

    Raises:
        ModuleNotFoundError: If ``gremlin_python`` is not installed.
        KeyError: If *concept* lacks ``concept_id`` or ``fingerprint``.
    """
    from gremlin_python.driver import client as gremlin_client
    from gremlin_python.driver.serializer import GraphSONSerializersV3d0

    created_at = concept.get("created_at") or utc_now_iso()
    updated_at = concept.get("updated_at") or utc_now_iso()

    query = """
    g.V().hasLabel('Concept').has('concept_id', concept_id).fold()
      .coalesce(
        unfold(),
        addV('Concept').property('concept_id', concept_id).property('created_at', created_at)
      )
      .property('canonical_name', canonical_name)
      .property('kind', kind)
      .property('concept_type', concept_type)
      .property('display_name', display_name)
      .property('description', description)
      .property('text', text)
      .property('source_table', source_table)
      .property('source_pk', source_pk)
      .property('release_name', release_name)
      .property('ref_hash', ref_hash)
      .property('attributes_json', attributes_json)
      .property('aliases', aliases_json)
      .property('external_ids', external_ids_json)
      .property('tags', tags_json)
      .property('fingerprint', fingerprint)
      .property('latest_cid', latest_cid)
      .property('summary', summary)
      .property('updated_at', updated_at)
      .values('concept_id')
    """

    c = gremlin_client.Client(
        gremlin_url,
        "g",
        message_serializer=GraphSONSerializersV3d0(),
    )
    try:
        c.submit(
            query,
            {
                "concept_id": concept["concept_id"],
                "canonical_name": concept.get("canonical_name") or "",
                "kind": concept.get("kind") or "",
                "concept_type": concept.get("concept_type") or "",
                "display_name": concept.get("display_name") or "",
                "description": concept.get("description") or "",
                "text": concept.get("text") or "",
                "source_table": concept.get("source_table") or "",
                "source_pk": concept.get("source_pk") or "",
                "release_name": concept.get("release_name") or "",
                "ref_hash": concept.get("ref_hash") or "",
                "attributes_json": concept.get("attributes_json") or "{}",
                "aliases_json": json.dumps(concept.get("aliases", []), ensure_ascii=False),
                "external_ids_json": json.dumps(concept.get("external_ids", {}), ensure_ascii=False),
                "tags_json": json.dumps(concept.get("tags", []), ensure_ascii=False),
                "fingerprint": concept["fingerprint"],
                "latest_cid": concept.get("latest_cid") or "",
                "summary": concept.get("summary") or "",
                "created_at": created_at,
                "updated_at": updated_at,
            },
        ).all().result()
    finally:
        c.close()


def _infer_concept_type(row: Dict[str, Any], source_table: Optional[str]) -> str:
    """Derive the concept type from the row, then the table name, then keys.

    An explicit ``concept_type``/``kind``/``type`` string wins (lower-cased).
    Otherwise the table name and the presence of id columns decide between
    "message" and "document"; the default is "entity".
    """
    explicit = first_str(row, ["concept_type", "kind", "type"])
    if explicit:
        return explicit.lower()

    lower_table = (source_table or "").lower()
    if "messages" in lower_table:
        return "message"
    if "docs" in lower_table or "documents" in lower_table:
        return "document"
    if "message_id" in row:
        return "message"
    if "doc_id" in row or "document_id" in row:
        return "document"
    return "entity"


def _source_pk(row: Dict[str, Any]) -> Optional[str]:
    """Return the first non-blank string id column of *row*, if any."""
    return first_str(row, ["source_pk", "message_id", "doc_id", "document_id", "id", "uuid"])


def row_to_concept(
    row: Dict[str, Any],
    source_table: Optional[str],
    release_name: Optional[str],
    ref_hash: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Convert a source table row into a concept document.

    Args:
        row: The source row as a plain dict.
        source_table: Table the row came from; also used to infer the type.
        release_name: Release name stamped on the concept.
        ref_hash: Nessie ref hash stamped on the concept.

    Returns:
        The concept dict, or ``None`` when no display name can be derived
        (no name-like column, no primary key and no body text).
    """
    concept_type = _infer_concept_type(row, source_table)
    source_pk = _source_pk(row)

    display_name = first_str(
        row,
        [
            "display_name",
            "canonical_name",
            "title",
            "name",
            "subject",
            "doc_name",
            "document_name",
        ],
    )
    if not display_name and source_pk:
        display_name = f"{concept_type}:{source_pk}"
    if not display_name:
        # Last resort: use the leading part of the body text as the name.
        display_name = first_str(row, ["body", "text", "content"])
        if display_name:
            display_name = display_name[:120]
    if not display_name:
        return None

    external_ids = parse_json_maybe(row.get("external_ids"), dict, {})
    aliases = parse_json_maybe(row.get("aliases"), list, [])
    tags = parse_json_maybe(row.get("tags"), list, [])
    kind = first_str(row, ["kind", "type", "doc_type", "document_type"]) or concept_type

    concept_id = first_str(row, ["concept_id", "doc_id", "document_id", "id", "uuid"])
    if not concept_id and source_pk:
        concept_id = f"{concept_type}:{source_pk}"
    if not isinstance(concept_id, str) or not concept_id.strip():
        # No usable id at all: derive a deterministic one from the content.
        concept_id = hashlib.sha256(
            f"{concept_type}|{display_name}|{json.dumps(external_ids, sort_keys=True)}".encode("utf-8")
        ).hexdigest()

    description = first_str(row, ["description", "summary", "abstract"])
    if not description:
        body = first_str(row, ["content", "text", "body"])
        if body:
            description = body[:512]

    text = first_str(row, ["text", "content", "body"])
    if not text:
        text = description

    # Keep typed attributes stable and searchable without exploding ES mapping.
    attributes_obj = row

    return {
        "concept_id": concept_id,
        "concept_type": concept_type,
        "display_name": display_name,
        "description": description,
        "text": text,
        "source_table": source_table,
        "source_pk": source_pk,
        "release_name": release_name,
        "ref_hash": ref_hash,
        "attributes_json": json.dumps(attributes_obj, ensure_ascii=False, default=str, sort_keys=True),
        "canonical_name": display_name,
        "kind": kind,
        "aliases": aliases,
        "external_ids": external_ids,
        "tags": tags,
        "latest_cid": first_str(row, ["latest_cid", "cid", "ipfs_cid"]),
        "summary": description,
        "created_at": to_iso(row.get("created_at")) or utc_now_iso(),
        "updated_at": to_iso(row.get("updated_at")) or utc_now_iso(),
        "fingerprint": make_fingerprint(display_name, concept_type, external_ids),
    }


def project_release(
    manifest_file: Optional[str],
    release_name: Optional[str],
    concept_table: Optional[str],
    nessie_ref: Optional[str],
    releases_ref: Optional[str],
    dry_run: bool,
    targets: str,
) -> None:
    """Read a release's concept table and project it into the targets.

    Args:
        manifest_file: Path to a manifest JSON file; takes precedence over
            the registry lookup.
        release_name: Release to look up in the registry when no manifest
            file is given; also the fallback ref and release name.
        concept_table: Table to read; inferred from the manifest if omitted.
        nessie_ref: Nessie ref/tag to read from; inferred if omitted.
        releases_ref: Ref used for the registry lookup (default: the
            ``RELEASES_REF`` environment variable, then ``main``).
        dry_run: When true, rows are read and converted but nothing is
            written.
        targets: One of ``"es"``, ``"gremlin"`` or ``"both"``.

    Raises:
        ValueError: If neither a manifest file nor a release name is given,
            or the ref or concept table cannot be determined.
    """
    if not manifest_file and not release_name:
        raise ValueError("Provide either --manifest-file or --release-name.")

    manifest: Optional[Dict[str, Any]] = load_manifest(manifest_file) if manifest_file else None

    spark = None
    try:
        # Release-name mode: lookup manifest on registry ref (usually main), then project on release tag.
        if manifest is None and release_name:
            registry_ref = releases_ref or os.getenv("RELEASES_REF", "main")
            spark, catalog = build_spark(registry_ref)
            manifest = load_manifest_from_registry(spark, catalog, release_name)
            ref = nessie_ref or infer_manifest_ref(manifest) or release_name
            if ref != registry_ref:
                # The catalog ref is fixed per session, so start a new one.
                spark.stop()
                spark = None
                spark, catalog = build_spark(ref)
        else:
            ref = nessie_ref or (infer_manifest_ref(manifest) if manifest else None) or release_name
            if not ref:
                raise ValueError("Unable to infer Nessie ref/tag; pass --nessie-ref explicitly.")
            spark, catalog = build_spark(ref)

        table_identifiers: List[str] = extract_table_identifiers(manifest) if manifest else []
        table = concept_table or (infer_concept_table(table_identifiers) if manifest else None)
        if not table:
            raise ValueError("Unable to infer concept table; pass --concept-table explicitly.")
        if table.count(".") == 1:
            table = f"{catalog}.{table}"

        print(f"[INFO] Using Nessie ref/tag: {ref}")
        print(f"[INFO] Reading table: {table}")

        df = spark.table(table)
        rows = [r.asDict(recursive=True) for r in df.collect()]
    finally:
        # Spark is only needed to read; release it on success and on error
        # so the session does not outlive the read phase.
        if spark is not None:
            spark.stop()

    release_name_effective = None
    ref_hash = None
    if manifest:
        rel = manifest.get("release")
        if isinstance(rel, dict):
            rel_name = rel.get("name")
            if isinstance(rel_name, str) and rel_name.strip():
                release_name_effective = rel_name.strip()
        nes = manifest.get("nessie")
        if isinstance(nes, dict):
            ref_obj = nes.get("ref")
            if isinstance(ref_obj, dict):
                h = ref_obj.get("hash")
                if isinstance(h, str) and h.strip():
                    ref_hash = h.strip()
    if not release_name_effective and release_name and isinstance(release_name, str) and release_name.strip():
        release_name_effective = release_name.strip()

    concepts = [c for c in (row_to_concept(r, table, release_name_effective, ref_hash) for r in rows) if c]
    print(f"[INFO] Read {len(rows)} rows, {len(concepts)} valid concepts")
    print("[STEP] spark_read_done")

    if dry_run:
        print("[INFO] Dry-run enabled. No writes performed.")
        return

    use_es = targets in ("both", "es")
    use_gremlin = targets in ("both", "gremlin")
    print(f"[INFO] Projection targets: {targets}")

    gremlin_url = os.getenv("GREMLIN_URL", "ws://localhost:8182/gremlin")
    es_url = os.getenv("ES_URL", "http://localhost:9200")
    es_index = os.getenv("ES_INDEX", "concepts")

    if use_es:
        ensure_es_index(es_url, es_index)

    success = 0
    failures = 0
    # Once a target's client library is found missing it is skipped for the
    # remaining concepts instead of failing on every one.
    gremlin_missing = False
    es_missing = False

    for concept in concepts:
        try:
            wrote_any = False

            if use_gremlin and not gremlin_missing:
                try:
                    gremlin_upsert(gremlin_url, concept)
                    wrote_any = True
                except ModuleNotFoundError as e:
                    gremlin_missing = True
                    print(f"[WARN] Gremlin dependency missing ({e}). Continuing with ES only.")
                except Exception as e:
                    print(f"[WARN] Gremlin upsert failed for {concept.get('concept_id')}: {e}")

            if use_es and not es_missing:
                try:
                    es_upsert(es_url, es_index, concept)
                    wrote_any = True
                except ModuleNotFoundError as e:
                    es_missing = True
                    print(f"[WARN] ES dependency missing ({e}). Continuing with Gremlin only.")
                except Exception as e:
                    print(f"[WARN] ES upsert failed for {concept.get('concept_id')}: {e}")

            # A concept counts as projected when at least one target took it.
            if wrote_any:
                success += 1
            else:
                failures += 1
                print(f"[WARN] No projection target succeeded for {concept.get('concept_id')}")
        except Exception as e:
            failures += 1
            print(f"[WARN] Failed concept {concept.get('concept_id')}: {e}")

    print("[STEP] projection_done")
    print(f"[DONE] Projected {success} concepts ({failures} failed)")


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the projector."""
    p = argparse.ArgumentParser(description="Project a lakehouse release into JanusGraph + Elasticsearch.")
    p.add_argument("--manifest-file", help="Path to release manifest JSON")
    p.add_argument("--release-name", help="Release name to load from releases_v2 registry")
    p.add_argument("--concept-table", help="Full Iceberg table identifier holding concepts")
    p.add_argument("--nessie-ref", help="Nessie branch/tag to read from (defaults to manifest tag)")
    p.add_argument("--releases-ref", help="Nessie ref used to read releases_v2 (default: main)")
    p.add_argument(
        "--targets",
        choices=["es", "gremlin", "both"],
        default="both",
        help="Projection targets to write (default: both)",
    )
    p.add_argument("--dry-run", action="store_true", help="Read and validate only")
    return p.parse_args()


def main() -> None:
    """Load ``.env`` (when python-dotenv is available) and run the projection."""
    if load_dotenv is not None:
        load_dotenv()
    args = parse_args()
    project_release(
        manifest_file=args.manifest_file,
        release_name=args.release_name,
        concept_table=args.concept_table,
        nessie_ref=args.nessie_ref,
        releases_ref=args.releases_ref,
        dry_run=args.dry_run,
        targets=args.targets,
    )


if __name__ == "__main__":
    main()