608 lines
22 KiB
Python
608 lines
22 KiB
Python
|
|
import argparse
|
||
|
|
import hashlib
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import urllib.error
|
||
|
|
import urllib.request
|
||
|
|
from datetime import date, datetime, timezone
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
|
||
|
|
# python-dotenv is an optional dependency: when present, main() loads .env
# before reading any configuration from the environment.
try:
    from dotenv import load_dotenv
except Exception:
    load_dotenv = None


# Default Maven coordinates for the Spark runtime: Iceberg Spark runtime,
# the Iceberg AWS bundle (S3FileIO), and the Nessie Spark SQL extensions.
# Overridable at runtime via the SPARK_PACKAGES environment variable.
DEFAULT_SPARK_PACKAGES = (
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,"
    "org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5"
)
|
||
|
|
|
||
|
|
|
||
|
|
def utc_now_iso() -> str:
    """Return the current UTC wall-clock time as an ISO-8601 string."""
    now = datetime.now(timezone.utc)
    return now.isoformat()
|
||
|
|
|
||
|
|
|
||
|
|
def parse_json_maybe(value: Any, expected_type: type, fallback: Any) -> Any:
    """Coerce *value* into *expected_type*, decoding JSON strings on the way.

    Returns *value* when it already has the expected type, the decoded JSON
    when *value* is a string that parses into the expected type, and
    *fallback* in every other case (None, wrong type, malformed JSON).
    """
    if value is None:
        return fallback
    if isinstance(value, expected_type):
        return value
    if not isinstance(value, str):
        return fallback
    try:
        decoded = json.loads(value)
    except Exception:
        return fallback
    return decoded if isinstance(decoded, expected_type) else fallback
|
||
|
|
|
||
|
|
|
||
|
|
def first_str(row: Dict[str, Any], keys: List[str]) -> Optional[str]:
    """Return the first non-blank string among *keys* in *row*, stripped.

    Non-string and blank values are skipped; None when no key qualifies.
    """
    for candidate in keys:
        value = row.get(candidate)
        if not isinstance(value, str):
            continue
        stripped = value.strip()
        if stripped:
            return stripped
    return None
|
||
|
|
|
||
|
|
|
||
|
|
def to_iso(value: Any) -> Optional[str]:
    """Normalize datetimes, dates, and strings to an ISO-8601 string.

    Bare dates become midnight UTC; strings are stripped (blank -> None);
    anything else yields None. datetime is checked before date because
    datetime instances are also date instances.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, date):
        midnight_utc = datetime.combine(value, datetime.min.time(), timezone.utc)
        return midnight_utc.isoformat()
    if isinstance(value, str):
        stripped = value.strip()
        return stripped or None
    return None
|
||
|
|
|
||
|
|
|
||
|
|
def make_fingerprint(name: str, kind: Optional[str], external_ids: Dict[str, str]) -> str:
    """Compute a stable SHA-256 fingerprint for a concept.

    The digest covers the lowercased/stripped name and kind plus the
    external ids sorted by key, so equivalent concepts hash identically
    regardless of dict ordering or surrounding whitespace.
    """
    parts = [
        (name or "").strip().lower(),
        (kind or "").strip().lower(),
        "|".join(f"{key}:{val}".lower() for key, val in sorted(external_ids.items())),
    ]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
|
||
|
|
|
||
|
|
|
||
|
|
def load_manifest(path: str) -> Dict[str, Any]:
    """Load a release manifest from a JSON file.

    Accepts any of three layouts:
      * the manifest object itself,
      * an object with an embedded ``manifest_json`` string,
      * a list of releases_v2 rows whose first row carries ``manifest_json``.

    Args:
        path: Filesystem path to the JSON file.

    Returns:
        The manifest as a dict.

    Raises:
        ValueError: when no manifest object can be recovered.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    if isinstance(raw, dict):
        manifest_json = raw.get("manifest_json")
        if isinstance(manifest_json, str):
            try:
                parsed = json.loads(manifest_json)
                if isinstance(parsed, dict):
                    return parsed
            except Exception:
                pass
        # A dict without a usable manifest_json is treated as the manifest itself.
        return raw

    if isinstance(raw, list) and raw and isinstance(raw[0], dict):
        manifest_json = raw[0].get("manifest_json")
        if isinstance(manifest_json, str):
            # Fix: previously this branch called json.loads unguarded, so a
            # malformed manifest_json surfaced as a raw JSONDecodeError with a
            # confusing message instead of the uniform ValueError below.
            try:
                parsed = json.loads(manifest_json)
            except Exception:
                parsed = None
            if isinstance(parsed, dict):
                return parsed

    raise ValueError("Manifest file must contain a manifest object or releases_v2 row with manifest_json.")
|
||
|
|
|
||
|
|
|
||
|
|
def infer_manifest_ref(manifest: Dict[str, Any]) -> Optional[str]:
    """Best-effort lookup of the Nessie ref/tag recorded in a manifest.

    Search order: nessie.ref.name, nessie.tag, release.name, then the
    flat keys nessie_tag / tag / release_name. Returns None when nothing
    usable is found.
    """

    def _clean(candidate: Any) -> Optional[str]:
        # Accept only non-blank strings, stripped of surrounding whitespace.
        if isinstance(candidate, str) and candidate.strip():
            return candidate.strip()
        return None

    nessie = manifest.get("nessie")
    if isinstance(nessie, dict):
        ref_obj = nessie.get("ref")
        if isinstance(ref_obj, dict):
            found = _clean(ref_obj.get("name"))
            if found:
                return found
        found = _clean(nessie.get("tag"))
        if found:
            return found

    release_obj = manifest.get("release")
    if isinstance(release_obj, dict):
        found = _clean(release_obj.get("name"))
        if found:
            return found

    for key in ("nessie_tag", "tag", "release_name"):
        found = _clean(manifest.get(key))
        if found:
            return found

    return None
|
||
|
|
|
||
|
|
|
||
|
|
def extract_table_identifiers(manifest: Dict[str, Any]) -> List[str]:
    """Collect table identifiers from a manifest.

    Prefers the 'tables' list (accepting table_identifier / identifier /
    table keys); only when that yields nothing does it fall back to the
    'rows' list's table_identifier values.
    """
    found: List[str] = []

    tables = manifest.get("tables")
    if isinstance(tables, list):
        for entry in tables:
            if not isinstance(entry, dict):
                continue
            ident = entry.get("table_identifier") or entry.get("identifier") or entry.get("table")
            if isinstance(ident, str) and ident.strip():
                found.append(ident.strip())
    if found:
        return found

    rows = manifest.get("rows")
    if isinstance(rows, list):
        for row in rows:
            if not isinstance(row, dict):
                continue
            ident = row.get("table_identifier")
            if isinstance(ident, str) and ident.strip():
                found.append(ident.strip())

    return found
|
||
|
|
|
||
|
|
|
||
|
|
def infer_concept_table(tables: List[str]) -> Optional[str]:
    """Pick the first table whose name mentions 'concept'.

    Falls back to the first table in the list, or None when it is empty.
    """
    match = next((t for t in tables if "concept" in t.lower()), None)
    if match is not None:
        return match
    return tables[0] if tables else None
|
||
|
|
|
||
|
|
|
||
|
|
def load_manifest_from_registry(
    spark: Any,
    catalog: str,
    release_name: str,
    releases_table: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch the newest manifest_json for *release_name* from the registry table.

    Args:
        spark: An active SparkSession.
        catalog: Spark catalog name used to qualify a bare "db.table" name.
        release_name: Release to look up in the registry.
        releases_table: Registry table override; defaults to the
            RELEASES_TABLE env var, then "db1.releases_v2".

    Returns:
        The parsed manifest dict.

    Raises:
        ValueError: when the release is missing, its manifest_json is
            empty, or the manifest is not a JSON object.
    """
    # Imported lazily so the module stays importable without pyspark.
    from pyspark.sql import functions as F

    table = releases_table or os.getenv("RELEASES_TABLE", "db1.releases_v2")
    # A single dot means "db.table": qualify with the Spark catalog.
    # Fully-qualified names (two dots) pass through untouched.
    if table.count(".") == 1:
        table = f"{catalog}.{table}"

    # Newest row wins: order by ingestion time, nulls last, take one.
    row = (
        spark.table(table)
        .where(F.col("release_name") == release_name)
        .orderBy(F.col("ingested_at_utc").desc_nulls_last())
        .select("manifest_json")
        .limit(1)
        .collect()
    )
    if not row:
        raise ValueError(f"Release '{release_name}' not found in registry table {table}.")

    manifest_json = row[0]["manifest_json"]
    if not isinstance(manifest_json, str) or not manifest_json.strip():
        raise ValueError(f"Release '{release_name}' has empty manifest_json in {table}.")

    manifest = json.loads(manifest_json)
    if not isinstance(manifest, dict):
        raise ValueError(f"Release '{release_name}' manifest_json is not a JSON object.")
    return manifest
|
||
|
|
|
||
|
|
|
||
|
|
def build_spark(ref: str):
    """Build a SparkSession wired to the Nessie-backed Iceberg catalog at *ref*.

    All endpoints and credentials come from the environment with local-dev
    defaults (MinIO on lakehouse-core). Returns (SparkSession, catalog name).

    Raises:
        RuntimeError: when pyspark is not importable.
    """
    try:
        from pyspark.sql import SparkSession
    except Exception as e:
        raise RuntimeError(
            "pyspark is not installed. Install it or run this with spark-submit."
        ) from e

    catalog = os.getenv("SPARK_CATALOG", "lake")

    # All session options gathered in one place; applied in a single loop below.
    settings = {
        "spark.sql.extensions": (
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
        ),
        "spark.jars.packages": os.getenv("SPARK_PACKAGES", DEFAULT_SPARK_PACKAGES),
        f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"spark.sql.catalog.{catalog}.uri": os.getenv("NESSIE_URI", "http://lakehouse-core:19120/api/v2"),
        f"spark.sql.catalog.{catalog}.ref": ref,
        f"spark.sql.catalog.{catalog}.warehouse": os.getenv("NESSIE_WAREHOUSE", "s3a://lakehouse/warehouse"),
        f"spark.sql.catalog.{catalog}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.hadoop.fs.s3a.endpoint": os.getenv("S3_ENDPOINT", "http://lakehouse-core:9000"),
        "spark.hadoop.fs.s3a.path.style.access": os.getenv("S3_PATH_STYLE", "true"),
        "spark.hadoop.fs.s3a.access.key": os.getenv(
            "AWS_ACCESS_KEY_ID", os.getenv("MINIO_ROOT_USER", "minioadmin")
        ),
        "spark.hadoop.fs.s3a.secret.key": os.getenv(
            "AWS_SECRET_ACCESS_KEY", os.getenv("MINIO_ROOT_PASSWORD", "minioadmin")
        ),
    }

    builder = SparkSession.builder.appName("release-projector")
    for key, value in settings.items():
        builder = builder.config(key, value)

    # Optional explicit master (e.g. local[*] or spark://...); otherwise
    # Spark's own resolution applies.
    spark_master = os.getenv("SPARK_MASTER")
    if spark_master:
        builder = builder.master(spark_master)

    return builder.getOrCreate(), catalog
|
||
|
|
|
||
|
|
|
||
|
|
def ensure_es_index(es_url: str, es_index: str) -> None:
    """Create *es_index* with the concept mapping unless it already exists.

    Existence is probed with a GET on the index URL; a 404 triggers
    creation, any other HTTP error propagates.
    """
    mapping = {
        "mappings": {
            "properties": {
                "concept_id": {"type": "keyword"},
                "concept_type": {"type": "keyword"},
                "display_name": {"type": "text"},
                "description": {"type": "text"},
                "text": {"type": "text"},
                "source_table": {"type": "keyword"},
                "source_pk": {"type": "keyword"},
                "release_name": {"type": "keyword"},
                "ref_hash": {"type": "keyword"},
                "attributes_json": {"type": "text"},
                "canonical_name": {"type": "text"},
                "kind": {"type": "keyword"},
                "aliases": {"type": "text"},
                "tags": {"type": "keyword"},
                "summary": {"type": "text"},
                "latest_cid": {"type": "keyword"},
                "fingerprint": {"type": "keyword"},
                "created_at": {"type": "date"},
                "updated_at": {"type": "date"},
            }
        }
    }

    endpoint = f"{es_url.rstrip('/')}/{es_index}"

    probe = urllib.request.Request(endpoint, method="GET")
    try:
        with urllib.request.urlopen(probe, timeout=30) as resp:
            if 200 <= resp.status < 300:
                # Index already present; nothing to do.
                return
    except urllib.error.HTTPError as e:
        if e.code != 404:
            raise

    payload = json.dumps(mapping).encode("utf-8")
    create = urllib.request.Request(endpoint, data=payload, method="PUT")
    create.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(create, timeout=30) as resp:
        if resp.status >= 400:
            raise RuntimeError(f"Failed to create ES index {es_index}: HTTP {resp.status}")
|
||
|
|
|
||
|
|
|
||
|
|
def es_upsert(es_url: str, es_index: str, doc: Dict[str, Any]) -> None:
    """Index *doc* into Elasticsearch keyed by its concept_id (create or replace).

    Raises:
        RuntimeError: on an HTTP error status from the PUT.
    """
    doc_id = doc['concept_id']
    endpoint = f"{es_url.rstrip('/')}/{es_index}/_doc/{doc_id}"
    # default=str keeps non-JSON-native values (datetimes etc.) serializable.
    payload = json.dumps(doc, default=str).encode("utf-8")
    request = urllib.request.Request(endpoint, data=payload, method="PUT")
    request.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(request, timeout=30) as resp:
        if resp.status >= 400:
            raise RuntimeError(f"Failed ES upsert for {doc_id}: HTTP {resp.status}")
|
||
|
|
|
||
|
|
|
||
|
|
def gremlin_upsert(gremlin_url: str, concept: Dict[str, Any]) -> None:
    """Idempotently upsert a Concept vertex via a parameterized Gremlin query.

    The vertex is matched on concept_id. created_at is only set when the
    vertex is first created; every other property is overwritten on each
    call. List/dict fields are stored as JSON strings since graph properties
    are scalar.

    Raises:
        ModuleNotFoundError: when gremlin_python is not installed (the lazy
            import below; callers treat this as "skip Gremlin").
    """
    # Imported lazily so the module works without the gremlin_python package.
    from gremlin_python.driver import client as gremlin_client
    from gremlin_python.driver.serializer import GraphSONSerializersV3d0

    created_at = concept.get("created_at") or utc_now_iso()
    updated_at = concept.get("updated_at") or utc_now_iso()

    # fold()/coalesce(unfold(), addV(...)) is the get-or-create pattern:
    # reuse the existing vertex when found, otherwise create it (and only
    # then stamp created_at). All values are passed as bindings, not
    # interpolated, so no escaping is needed.
    query = """
    g.V().hasLabel('Concept').has('concept_id', concept_id).fold()
      .coalesce(
        unfold(),
        addV('Concept').property('concept_id', concept_id).property('created_at', created_at)
      )
      .property('canonical_name', canonical_name)
      .property('kind', kind)
      .property('concept_type', concept_type)
      .property('display_name', display_name)
      .property('description', description)
      .property('text', text)
      .property('source_table', source_table)
      .property('source_pk', source_pk)
      .property('release_name', release_name)
      .property('ref_hash', ref_hash)
      .property('attributes_json', attributes_json)
      .property('aliases', aliases_json)
      .property('external_ids', external_ids_json)
      .property('tags', tags_json)
      .property('fingerprint', fingerprint)
      .property('latest_cid', latest_cid)
      .property('summary', summary)
      .property('updated_at', updated_at)
      .values('concept_id')
    """

    c = gremlin_client.Client(
        gremlin_url,
        "g",
        message_serializer=GraphSONSerializersV3d0(),
    )
    try:
        # Optional scalar fields default to "" so property() never sees None;
        # list/dict fields are JSON-encoded for scalar storage.
        c.submit(
            query,
            {
                "concept_id": concept["concept_id"],
                "canonical_name": concept.get("canonical_name") or "",
                "kind": concept.get("kind") or "",
                "concept_type": concept.get("concept_type") or "",
                "display_name": concept.get("display_name") or "",
                "description": concept.get("description") or "",
                "text": concept.get("text") or "",
                "source_table": concept.get("source_table") or "",
                "source_pk": concept.get("source_pk") or "",
                "release_name": concept.get("release_name") or "",
                "ref_hash": concept.get("ref_hash") or "",
                "attributes_json": concept.get("attributes_json") or "{}",
                "aliases_json": json.dumps(concept.get("aliases", []), ensure_ascii=False),
                "external_ids_json": json.dumps(concept.get("external_ids", {}), ensure_ascii=False),
                "tags_json": json.dumps(concept.get("tags", []), ensure_ascii=False),
                "fingerprint": concept["fingerprint"],
                "latest_cid": concept.get("latest_cid") or "",
                "summary": concept.get("summary") or "",
                "created_at": created_at,
                "updated_at": updated_at,
            },
        ).all().result()  # block until the server acknowledges the write
    finally:
        c.close()
|
||
|
|
|
||
|
|
|
||
|
|
def _infer_concept_type(row: Dict[str, Any], source_table: Optional[str]) -> str:
    """Classify a row as 'message', 'document', or 'entity'.

    Precedence: an explicit concept_type/kind/type field wins, then the
    source table name, then characteristic id columns; 'entity' is the
    catch-all.
    """
    explicit = first_str(row, ["concept_type", "kind", "type"])
    if explicit:
        return explicit.lower()

    table_lower = (source_table or "").lower()
    # Table-name hints take precedence over id-column hints.
    if "messages" in table_lower:
        return "message"
    if "docs" in table_lower or "documents" in table_lower:
        return "document"
    if "message_id" in row:
        return "message"
    if "doc_id" in row or "document_id" in row:
        return "document"
    return "entity"
|
||
|
|
|
||
|
|
|
||
|
|
def _source_pk(row: Dict[str, Any]) -> Optional[str]:
    """Best-effort source primary key: explicit source_pk, then common id columns."""
    candidate_keys = ["source_pk", "message_id", "doc_id", "document_id", "id", "uuid"]
    return first_str(row, candidate_keys)
|
||
|
|
|
||
|
|
|
||
|
|
def row_to_concept(
    row: Dict[str, Any],
    source_table: Optional[str],
    release_name: Optional[str],
    ref_hash: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Normalize one table row into a projection-ready concept dict.

    Derives a display name, a stable concept_id, and description/text
    fields through a chain of fallbacks; returns None when no usable
    display name can be derived at all.
    """
    concept_type = _infer_concept_type(row, source_table)
    source_pk = _source_pk(row)

    # Display name: explicit name-ish columns, then type:pk, then a body snippet.
    name_columns = [
        "display_name",
        "canonical_name",
        "title",
        "name",
        "subject",
        "doc_name",
        "document_name",
    ]
    display_name = first_str(row, name_columns)
    if not display_name and source_pk:
        display_name = f"{concept_type}:{source_pk}"
    if not display_name:
        snippet = first_str(row, ["body", "text", "content"])
        if snippet:
            display_name = snippet[:120]
    if not display_name:
        # Nothing identifiable to project.
        return None

    # Structured fields may arrive as JSON strings from the table.
    external_ids = parse_json_maybe(row.get("external_ids"), dict, {})
    aliases = parse_json_maybe(row.get("aliases"), list, [])
    tags = parse_json_maybe(row.get("tags"), list, [])

    kind = first_str(row, ["kind", "type", "doc_type", "document_type"]) or concept_type

    # Concept id: explicit id columns, then type:pk, then a deterministic hash.
    concept_id = first_str(row, ["concept_id", "doc_id", "document_id", "id", "uuid"])
    if not concept_id and source_pk:
        concept_id = f"{concept_type}:{source_pk}"
    if not isinstance(concept_id, str) or not concept_id.strip():
        seed = f"{concept_type}|{display_name}|{json.dumps(external_ids, sort_keys=True)}"
        concept_id = hashlib.sha256(seed.encode("utf-8")).hexdigest()

    description = first_str(row, ["description", "summary", "abstract"])
    if not description:
        body = first_str(row, ["content", "text", "body"])
        if body:
            description = body[:512]

    text = first_str(row, ["text", "content", "body"]) or description

    # Keep typed attributes stable and searchable without exploding ES mapping.
    attributes_obj = row

    return {
        "concept_id": concept_id,
        "concept_type": concept_type,
        "display_name": display_name,
        "description": description,
        "text": text,
        "source_table": source_table,
        "source_pk": source_pk,
        "release_name": release_name,
        "ref_hash": ref_hash,
        "attributes_json": json.dumps(attributes_obj, ensure_ascii=False, default=str, sort_keys=True),
        "canonical_name": display_name,
        "kind": kind,
        "aliases": aliases,
        "external_ids": external_ids,
        "tags": tags,
        "latest_cid": first_str(row, ["latest_cid", "cid", "ipfs_cid"]),
        "summary": description,
        "created_at": to_iso(row.get("created_at")) or utc_now_iso(),
        "updated_at": to_iso(row.get("updated_at")) or utc_now_iso(),
        "fingerprint": make_fingerprint(display_name, concept_type, external_ids),
    }
|
||
|
|
|
||
|
|
|
||
|
|
def project_release(
    manifest_file: Optional[str],
    release_name: Optional[str],
    concept_table: Optional[str],
    nessie_ref: Optional[str],
    releases_ref: Optional[str],
    dry_run: bool,
    targets: str,
) -> None:
    """Read a release's concept table from the lakehouse and project it into ES/Gremlin.

    Args:
        manifest_file: Path to a manifest JSON file (alternative to release_name).
        release_name: Release to resolve through the releases_v2 registry.
        concept_table: Explicit concept table identifier; inferred from the
            manifest when omitted.
        nessie_ref: Nessie branch/tag to read tables from; inferred from the
            manifest (then release_name) when omitted.
        releases_ref: Nessie ref for the registry lookup (default: main).
        dry_run: When True, read and validate only — no writes.
        targets: "es", "gremlin", or "both".

    Raises:
        ValueError: when neither input is given, or the ref/table cannot
            be inferred, or the registry lookup fails.
    """
    if not manifest_file and not release_name:
        raise ValueError("Provide either --manifest-file or --release-name.")

    manifest: Optional[Dict[str, Any]] = load_manifest(manifest_file) if manifest_file else None

    # Release-name mode: lookup manifest on registry ref (usually main), then project on release tag.
    if manifest is None and release_name:
        registry_ref = releases_ref or os.getenv("RELEASES_REF", "main")
        spark, catalog = build_spark(registry_ref)
        manifest = load_manifest_from_registry(spark, catalog, release_name)
        ref = nessie_ref or infer_manifest_ref(manifest) or release_name
        if ref != registry_ref:
            # The registry session is pinned to the wrong ref; rebuild on the release ref.
            spark.stop()
            spark, catalog = build_spark(ref)
    else:
        ref = nessie_ref or (infer_manifest_ref(manifest) if manifest else None) or release_name
        if not ref:
            raise ValueError("Unable to infer Nessie ref/tag; pass --nessie-ref explicitly.")
        spark, catalog = build_spark(ref)

    table_identifiers: List[str] = extract_table_identifiers(manifest) if manifest else []
    table = concept_table or (infer_concept_table(table_identifiers) if manifest else None)
    if not table:
        raise ValueError("Unable to infer concept table; pass --concept-table explicitly.")

    # A single dot means "db.table": qualify with the Spark catalog name.
    if table.count(".") == 1:
        table = f"{catalog}.{table}"

    print(f"[INFO] Using Nessie ref/tag: {ref}")
    print(f"[INFO] Reading table: {table}")

    # Resolve the effective release name (manifest wins over the CLI arg)
    # and the Nessie commit hash, both stamped onto every projected concept.
    release_name_effective = None
    ref_hash = None
    if manifest:
        rel = manifest.get("release")
        if isinstance(rel, dict):
            rel_name = rel.get("name")
            if isinstance(rel_name, str) and rel_name.strip():
                release_name_effective = rel_name.strip()
        nes = manifest.get("nessie")
        if isinstance(nes, dict):
            ref_obj = nes.get("ref")
            if isinstance(ref_obj, dict):
                h = ref_obj.get("hash")
                if isinstance(h, str) and h.strip():
                    ref_hash = h.strip()
    if not release_name_effective and release_name and isinstance(release_name, str) and release_name.strip():
        release_name_effective = release_name.strip()

    df = spark.table(table)
    # NOTE(review): collect() pulls the entire table to the driver — assumes
    # concept tables are small enough to fit in driver memory; confirm.
    rows = [r.asDict(recursive=True) for r in df.collect()]
    concepts = [c for c in (row_to_concept(r, table, release_name_effective, ref_hash) for r in rows) if c]

    print(f"[INFO] Read {len(rows)} rows, {len(concepts)} valid concepts")
    print("[STEP] spark_read_done")
    if dry_run:
        print("[INFO] Dry-run enabled. No writes performed.")
        return

    use_es = targets in ("both", "es")
    use_gremlin = targets in ("both", "gremlin")
    print(f"[INFO] Projection targets: {targets}")

    gremlin_url = os.getenv("GREMLIN_URL", "ws://localhost:8182/gremlin")
    es_url = os.getenv("ES_URL", "http://localhost:9200")
    es_index = os.getenv("ES_INDEX", "concepts")

    if use_es:
        ensure_es_index(es_url, es_index)

    success = 0
    failures = 0
    # Once a backend's client library turns out to be missing, skip it for
    # the rest of the run instead of failing every concept.
    gremlin_missing = False
    es_missing = False
    for concept in concepts:
        try:
            # A concept counts as projected when at least one target accepted it.
            wrote_any = False
            if use_gremlin and not gremlin_missing:
                try:
                    gremlin_upsert(gremlin_url, concept)
                    wrote_any = True
                except ModuleNotFoundError as e:
                    gremlin_missing = True
                    print(f"[WARN] Gremlin dependency missing ({e}). Continuing with ES only.")
                except Exception as e:
                    print(f"[WARN] Gremlin upsert failed for {concept.get('concept_id')}: {e}")

            if use_es and not es_missing:
                try:
                    es_upsert(es_url, es_index, concept)
                    wrote_any = True
                except ModuleNotFoundError as e:
                    es_missing = True
                    print(f"[WARN] ES dependency missing ({e}). Continuing with Gremlin only.")
                except Exception as e:
                    print(f"[WARN] ES upsert failed for {concept.get('concept_id')}: {e}")

            if wrote_any:
                success += 1
            else:
                failures += 1
                print(f"[WARN] No projection target succeeded for {concept.get('concept_id')}")
        except Exception as e:
            failures += 1
            print(f"[WARN] Failed concept {concept.get('concept_id')}: {e}")

    print("[STEP] projection_done")
    print(f"[DONE] Projected {success} concepts ({failures} failed)")
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the CLI options for the release projector."""
    parser = argparse.ArgumentParser(
        description="Project a lakehouse release into JanusGraph + Elasticsearch."
    )
    parser.add_argument("--manifest-file", help="Path to release manifest JSON")
    parser.add_argument("--release-name", help="Release name to load from releases_v2 registry")
    parser.add_argument("--concept-table", help="Full Iceberg table identifier holding concepts")
    parser.add_argument("--nessie-ref", help="Nessie branch/tag to read from (defaults to manifest tag)")
    parser.add_argument("--releases-ref", help="Nessie ref used to read releases_v2 (default: main)")
    parser.add_argument(
        "--targets",
        choices=["es", "gremlin", "both"],
        default="both",
        help="Projection targets to write (default: both)",
    )
    parser.add_argument("--dry-run", action="store_true", help="Read and validate only")
    return parser.parse_args()
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """CLI entry point: load .env when python-dotenv is available, then project."""
    # load_dotenv is None when python-dotenv isn't installed (see module top).
    if load_dotenv is not None:
        load_dotenv()
    args = parse_args()
    project_release(
        manifest_file=args.manifest_file,
        release_name=args.release_name,
        concept_table=args.concept_table,
        nessie_ref=args.nessie_ref,
        releases_ref=args.releases_ref,
        dry_run=args.dry_run,
        targets=args.targets,
    )
|
||
|
|
|
||
|
|
|
||
|
|
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|