"""Create a release for an Iceberg table tracked by a Nessie catalog.

Creates (or reuses) a Nessie tag named after the release, writes a JSON
manifest describing the released table state, and appends a row to an
Iceberg registry table.
"""

import argparse
import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

from pyspark.sql import SparkSession
from pyspark.sql import types as T


def now_iso() -> str:
    """Current UTC time as an ISO-8601 string with a 'Z' suffix."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def http_json(method: str, url: str, payload: dict | None = None) -> dict:
    """Issue an HTTP request with an optional JSON body and decode the JSON response."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = resp.read().decode("utf-8")
    return json.loads(body) if body else {}


def get_ref(nessie_uri: str, ref_name: str) -> dict | None:
    """Fetch a Nessie reference by name; return None if it does not exist."""
    try:
        return http_json("GET", f"{nessie_uri.rstrip('/')}/trees/{urllib.parse.quote(ref_name, safe='')}")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise


def extract_ref_hash(ref_obj: dict) -> str:
    # Nessie responses can vary by endpoint/version:
    # - {"type":"BRANCH","name":"main","hash":"..."}
    # - {"reference":{"type":"BRANCH","name":"main","hash":"..."}}
    if isinstance(ref_obj.get("hash"), str) and ref_obj["hash"]:
        return ref_obj["hash"]
    reference = ref_obj.get("reference")
    if isinstance(reference, dict) and isinstance(reference.get("hash"), str) and reference["hash"]:
        return reference["hash"]
    raise KeyError("hash")


def ensure_tag(nessie_uri: str, tag_name: str) -> dict:
    """Return the tag if it already exists; otherwise cut it from the current head of main."""
    existing = get_ref(nessie_uri, tag_name)
    if existing is not None:
        return existing
    main_ref = http_json("GET", f"{nessie_uri.rstrip('/')}/trees/main")
    # The request body names the source reference (and hash) the tag is created from.
    payload = {
        "type": "BRANCH",
        "name": "main",
        "hash": extract_ref_hash(main_ref),
    }
    query = urllib.parse.urlencode({"name": tag_name, "type": "TAG"})
    http_json("POST", f"{nessie_uri.rstrip('/')}/trees?{query}", payload)
    created = get_ref(nessie_uri, tag_name)
    if created is None:
        raise RuntimeError(f"Tag creation appeared to succeed but tag '{tag_name}' is not retrievable")
    return created


def create_registry_table_if_missing(spark: SparkSession, releases_table: str) -> None:
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {releases_table} (
            release_name STRING,
            ref_type STRING,
            ref_name STRING,
            ref_hash STRING,
            created_at_utc STRING,
            ingested_at_utc STRING,
            table_identifier STRING,
            snapshot_id BIGINT,
            metadata_location STRING,
            manifest_sha256 STRING,
            manifest_json STRING
        ) USING iceberg
        """
    )


def _to_utc_datetime(value: str) -> datetime:
    # Accept ISO strings with a 'Z' suffix.
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)


def _convert_value_for_type(field: T.StructField, value):
    """Coerce a Python value to match the Spark type of the target column."""
    if value is None:
        return None
    dt = field.dataType
    if isinstance(dt, T.StringType):
        return str(value)
    if isinstance(dt, (T.LongType, T.IntegerType, T.ShortType, T.ByteType)):
        return int(value)
    if isinstance(dt, T.BooleanType):
        return bool(value)
    if isinstance(dt, (T.FloatType, T.DoubleType)):
        return float(value)
    if isinstance(dt, T.TimestampType):
        if isinstance(value, datetime):
            return value
        return _to_utc_datetime(str(value))
    if isinstance(dt, T.DateType):
        if isinstance(value, datetime):
            return value.date()
        return _to_utc_datetime(str(value)).date()
    # Leave unsupported/complex types as-is; Spark can still validate and fail clearly.
    return value


def append_registry_row(
    spark: SparkSession,
    releases_table: str,
    release_name: str,
    ref_type: str,
    ref_name: str,
    ref_hash: str,
    created_at_utc: str,
    ingested_at_utc: str,
    table_identifier: str,
    snapshot_id: int,
    metadata_location: str,
    manifest_sha256: str,
    manifest_json: str,
    created_by: str,
    description: str,
) -> None:
    """Append one release row, adapting to whatever schema the registry table has."""
    target_schema = spark.table(releases_table).schema
    # Known values keyed by column name; "release_description" is an alias so
    # registries that use either column name for the description are covered.
    base_values = {
        "release_name": release_name,
        "ref_type": ref_type,
        "ref_name": ref_name,
        "ref_hash": ref_hash,
        "created_at_utc": created_at_utc,
        "ingested_at_utc": ingested_at_utc,
        "table_identifier": table_identifier,
        "snapshot_id": int(snapshot_id),
        "metadata_location": metadata_location,
        "manifest_sha256": manifest_sha256,
        "manifest_json": manifest_json,
        "created_by": created_by,
        "description": description,
        "release_description": description,
    }
    # Map values by column name; fill unknown nullable columns with NULL and
    # abort if the table requires a column this script cannot provide.
    row_values = []
    missing_required = []
    for field in target_schema.fields:
        name = field.name
        if name in base_values:
            row_values.append(_convert_value_for_type(field, base_values[name]))
            continue
        if field.nullable:
            row_values.append(None)
            continue
        missing_required.append(name)
    if missing_required:
        raise RuntimeError(
            f"Cannot append to registry table {releases_table}. "
            f"Missing required columns with no known mapping: {', '.join(missing_required)}"
        )
    df = spark.createDataFrame([tuple(row_values)], schema=target_schema)
    df.writeTo(releases_table).append()


def main() -> None:
    p = argparse.ArgumentParser(description="Create a release tag + manifest + registry row for a table.")
    p.add_argument("--release-name", required=True)
    p.add_argument("--table", default="lake.db1.messages")
    p.add_argument("--nessie-uri", default=os.getenv("NESSIE_URI", "http://nessie:19120/api/v2"))
    p.add_argument("--manifest-out", required=True)
    p.add_argument("--description", default="Messages release")
    p.add_argument("--created-by", default=os.getenv("USER", "unknown"))
    p.add_argument("--releases-table", default=os.getenv("RELEASES_TABLE", "lake.db1.releases_v2"))
    p.add_argument("--skip-registry", action="store_true")
    args = p.parse_args()

    created_at = now_iso()
    tag_ref = ensure_tag(args.nessie_uri, args.release_name)
    ref_hash = extract_ref_hash(tag_ref)

    spark = SparkSession.builder.appName("create-release-manifest").getOrCreate()

    # Latest snapshot, from Iceberg's `snapshots` metadata table.
    snap_row = spark.sql(
        f"SELECT snapshot_id FROM {args.table}.snapshots ORDER BY committed_at DESC LIMIT 1"
    ).collect()
    if not snap_row:
        raise RuntimeError(f"No snapshots found for table {args.table}")
    snapshot_id = int(snap_row[0]["snapshot_id"])

    # Latest metadata file location, from Iceberg's `metadata_log_entries` metadata table.
    meta_row = spark.sql(
        f"SELECT file AS metadata_location FROM {args.table}.metadata_log_entries ORDER BY timestamp DESC LIMIT 1"
    ).collect()
    if not meta_row:
        raise RuntimeError(f"No metadata log entries found for table {args.table}")
    metadata_location = str(meta_row[0]["metadata_location"])

    manifest = {
        "schema_version": "lakehouse-release-manifest/v1",
        "release": {
            "name": args.release_name,
            "created_at_utc": created_at,
            "created_by": args.created_by,
            "description": args.description,
        },
        "nessie": {
            "uri": args.nessie_uri,
            "ref": {
                "type": "tag",
                "name": args.release_name,
                "hash": ref_hash,
            },
        },
        "tables": [
            {
                "identifier": args.table,
                "format": "iceberg",
                "current_snapshot_id": snapshot_id,
                "metadata_location": metadata_location,
            }
        ],
    }
    manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2)
    manifest_sha256 = hashlib.sha256(manifest_json.encode("utf-8")).hexdigest()

    os.makedirs(os.path.dirname(args.manifest_out) or ".", exist_ok=True)
    with open(args.manifest_out, "w", encoding="utf-8") as f:
        f.write(manifest_json)

    if not args.skip_registry:
        create_registry_table_if_missing(spark, args.releases_table)
        append_registry_row(
            spark=spark,
            releases_table=args.releases_table,
            release_name=args.release_name,
            ref_type="tag",
            ref_name=args.release_name,
            ref_hash=ref_hash,
            created_at_utc=created_at,
            ingested_at_utc=now_iso(),
            table_identifier=args.table,
            snapshot_id=snapshot_id,
            metadata_location=metadata_location,
            manifest_sha256=manifest_sha256,
            manifest_json=manifest_json,
            created_by=args.created_by,
            description=args.description,
        )

    print(f"[INFO] release_name={args.release_name}")
    print(f"[INFO] table={args.table}")
    print(f"[INFO] ref_hash={ref_hash}")
    print(f"[INFO] snapshot_id={snapshot_id}")
    print(f"[INFO] manifest_out={args.manifest_out}")
    if args.skip_registry:
        print("[INFO] registry=skipped")
    else:
        print(f"[INFO] registry_table={args.releases_table}")


if __name__ == "__main__":
    main()
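
# Example invocation (the script filename and paths are illustrative; the Spark
# session must already be configured with the Nessie-backed Iceberg catalog
# that owns the target table):
#
#   spark-submit create_release_manifest.py \
#       --release-name release-2024-06-01 \
#       --table lake.db1.messages \
#       --manifest-out ./manifests/release-2024-06-01.json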