# jecio/create_release_manifest.py
#
# Create a release tag + manifest + registry row for an Iceberg table
# managed by a Nessie catalog.

import argparse
import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pyspark.sql import SparkSession
from pyspark.sql import types as T
def now_iso() -> str:
    """Current UTC time as an ISO-8601 string with a 'Z' suffix and no microseconds."""
    stamp = datetime.now(timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
def http_json(method: str, url: str, payload: dict | None = None, timeout: float = 30) -> dict:
    """Issue an HTTP request with an optional JSON body and return the parsed JSON response.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        url: Fully-qualified URL to call.
        payload: Optional dict serialized as a JSON request body.
        timeout: Socket timeout in seconds (previously hard-coded to 30;
            now a parameter with the same default, so callers are unaffected).

    Returns:
        The decoded JSON response, or an empty dict when the body is empty.

    Raises:
        urllib.error.HTTPError / urllib.error.URLError on HTTP or transport failures.
    """
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    # Content-Type is always JSON; harmless on body-less GETs.
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = resp.read().decode("utf-8")
    return json.loads(body) if body else {}
def get_ref(nessie_uri: str, ref_name: str) -> dict | None:
    """Fetch a Nessie reference by name; return None when it does not exist (HTTP 404).

    Any other HTTP error is re-raised unchanged.
    """
    base = nessie_uri.rstrip("/")
    encoded = urllib.parse.quote(ref_name, safe="")
    try:
        return http_json("GET", f"{base}/trees/{encoded}")
    except urllib.error.HTTPError as err:
        if err.code != 404:
            raise
        return None
def extract_ref_hash(ref_obj: dict) -> str:
    """Pull the commit hash out of a Nessie reference response.

    Nessie responses can vary by endpoint/version:
      - {"type": "BRANCH", "name": "main", "hash": "..."}
      - {"reference": {"type": "BRANCH", "name": "main", "hash": "..."}}

    Raises:
        KeyError: when neither shape carries a non-empty string hash.
    """
    for candidate in (ref_obj, ref_obj.get("reference")):
        if isinstance(candidate, dict):
            value = candidate.get("hash")
            if isinstance(value, str) and value:
                return value
    raise KeyError("hash")
def ensure_tag(nessie_uri: str, tag_name: str) -> dict:
    """Return the named Nessie tag, creating it from the current head of 'main' if absent.

    Idempotent: an existing tag is returned as-is (it is NOT moved to main's head).

    Raises:
        RuntimeError: if the create call succeeds but the tag is still not retrievable.
    """
    found = get_ref(nessie_uri, tag_name)
    if found is not None:
        return found
    base = nessie_uri.rstrip("/")
    main_ref = http_json("GET", f"{base}/trees/main")
    # The request body identifies the source reference the new tag points at.
    source = {
        "type": "BRANCH",
        "name": "main",
        "hash": extract_ref_hash(main_ref),
    }
    params = urllib.parse.urlencode({"name": tag_name, "type": "TAG"})
    http_json("POST", f"{base}/trees?{params}", source)
    fresh = get_ref(nessie_uri, tag_name)
    if fresh is None:
        raise RuntimeError(f"Tag creation appeared to succeed but tag '{tag_name}' is not retrievable")
    return fresh
def create_registry_table_if_missing(spark: SparkSession, releases_table: str) -> None:
    """Create the Iceberg release-registry table if it does not already exist.

    All columns are nullable STRING/BIGINT fields; the full manifest JSON is
    stored inline alongside its sha256 for integrity checking.

    Args:
        spark: Active SparkSession with the Iceberg catalog configured.
        releases_table: Fully-qualified table identifier (catalog.db.table).
    """
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {releases_table} (
            release_name STRING,
            ref_type STRING,
            ref_name STRING,
            ref_hash STRING,
            created_at_utc STRING,
            ingested_at_utc STRING,
            table_identifier STRING,
            snapshot_id BIGINT,
            metadata_location STRING,
            manifest_sha256 STRING,
            manifest_json STRING
        ) USING iceberg
        """
    )
def _to_utc_datetime(value: str):
# Accept ISO strings with 'Z' suffix.
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def _convert_value_for_type(field: T.StructField, value):
    """Coerce a Python value to match the Spark data type of *field*.

    None passes through unchanged. Unsupported/complex types are returned
    as-is so Spark can still validate and fail clearly.
    """
    if value is None:
        return None
    dtype = field.dataType
    if isinstance(dtype, T.StringType):
        return str(value)
    # All integral Spark types map onto a Python int.
    if isinstance(dtype, (T.LongType, T.IntegerType, T.ShortType, T.ByteType)):
        return int(value)
    if isinstance(dtype, T.BooleanType):
        return bool(value)
    # Both float widths map onto a Python float.
    if isinstance(dtype, (T.FloatType, T.DoubleType)):
        return float(value)
    if isinstance(dtype, T.TimestampType):
        return value if isinstance(value, datetime) else _to_utc_datetime(str(value))
    if isinstance(dtype, T.DateType):
        if isinstance(value, datetime):
            return value.date()
        return _to_utc_datetime(str(value)).date()
    return value
def append_registry_row(
    spark: SparkSession,
    releases_table: str,
    release_name: str,
    ref_type: str,
    ref_name: str,
    ref_hash: str,
    created_at_utc: str,
    ingested_at_utc: str,
    table_identifier: str,
    snapshot_id: int,
    metadata_location: str,
    manifest_sha256: str,
    manifest_json: str,
    created_by: str,
    description: str,
) -> None:
    """Append one release record to the registry table, adapting to its actual schema.

    Columns are filled by name from the known mapping below; schema columns we
    do not recognize are written as NULL when nullable. The description is also
    offered under the alternative column name 'release_description' to tolerate
    schema variants.

    Raises:
        RuntimeError: when the table has a non-nullable column with no known mapping.
    """
    target_schema = spark.table(releases_table).schema
    known = {
        "release_name": release_name,
        "ref_type": ref_type,
        "ref_name": ref_name,
        "ref_hash": ref_hash,
        "created_at_utc": created_at_utc,
        "ingested_at_utc": ingested_at_utc,
        "table_identifier": table_identifier,
        "snapshot_id": int(snapshot_id),
        "metadata_location": metadata_location,
        "manifest_sha256": manifest_sha256,
        "manifest_json": manifest_json,
        "created_by": created_by,
        "description": description,
        "release_description": description,
    }
    row = []
    unmapped = []
    for field in target_schema.fields:
        if field.name in known:
            row.append(_convert_value_for_type(field, known[field.name]))
        elif field.nullable:
            row.append(None)
        else:
            unmapped.append(field.name)
    if unmapped:
        raise RuntimeError(
            "Cannot append to registry table "
            f"{releases_table}. Missing required columns with no known mapping: {', '.join(unmapped)}"
        )
    frame = spark.createDataFrame([tuple(row)], schema=target_schema)
    frame.writeTo(releases_table).append()
def main() -> None:
    """CLI entry point: ensure a Nessie release tag exists, write a release
    manifest JSON file, and (unless --skip-registry) append a row to the
    Iceberg release-registry table.
    """
    p = argparse.ArgumentParser(description="Create a release tag + manifest + registry row for a table.")
    p.add_argument("--release-name", required=True)
    p.add_argument("--table", default="lake.db1.messages")
    p.add_argument("--nessie-uri", default=os.getenv("NESSIE_URI", "http://nessie:19120/api/v2"))
    p.add_argument("--manifest-out", required=True)
    p.add_argument("--description", default="Messages release")
    p.add_argument("--created-by", default=os.getenv("USER", "unknown"))
    p.add_argument("--releases-table", default=os.getenv("RELEASES_TABLE", "lake.db1.releases_v2"))
    p.add_argument("--skip-registry", action="store_true")
    args = p.parse_args()
    created_at = now_iso()
    # Create/fetch the release tag and record the exact commit hash it points at.
    tag_ref = ensure_tag(args.nessie_uri, args.release_name)
    ref_hash = extract_ref_hash(tag_ref)
    spark = SparkSession.builder.appName("create-release-manifest").getOrCreate()
    # Latest snapshot of the target Iceberg table, by commit time.
    # NOTE(review): the snapshot is read from the live table AFTER the tag is
    # created; a commit landing in between could make the manifest diverge
    # from the tagged state — confirm this window is acceptable.
    snap_row = spark.sql(
        f"SELECT snapshot_id FROM {args.table}.snapshots ORDER BY committed_at DESC LIMIT 1"
    ).collect()
    if not snap_row:
        raise RuntimeError(f"No snapshots found for table {args.table}")
    snapshot_id = int(snap_row[0]["snapshot_id"])
    # Most recent metadata.json location from the table's metadata log.
    meta_row = spark.sql(
        f"SELECT file AS metadata_location FROM {args.table}.metadata_log_entries ORDER BY timestamp DESC LIMIT 1"
    ).collect()
    if not meta_row:
        raise RuntimeError(f"No metadata log entries found for table {args.table}")
    metadata_location = str(meta_row[0]["metadata_location"])
    # Manifest payload: who/when, the pinned Nessie ref, and the table pin.
    manifest = {
        "schema_version": "lakehouse-release-manifest/v1",
        "release": {
            "name": args.release_name,
            "created_at_utc": created_at,
            "created_by": args.created_by,
            "description": args.description,
        },
        "nessie": {
            "uri": args.nessie_uri,
            "ref": {
                "type": "tag",
                "name": args.release_name,
                "hash": ref_hash,
            },
        },
        "tables": [
            {
                "identifier": args.table,
                "format": "iceberg",
                "current_snapshot_id": snapshot_id,
                "metadata_location": metadata_location,
            }
        ],
    }
    manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2)
    # sha256 of the exact serialized bytes, stored alongside for integrity checks.
    manifest_sha256 = hashlib.sha256(manifest_json.encode("utf-8")).hexdigest()
    # Write the manifest to disk, creating parent directories as needed
    # ('or "."' handles a bare filename with no directory component).
    os.makedirs(os.path.dirname(args.manifest_out) or ".", exist_ok=True)
    with open(args.manifest_out, "w", encoding="utf-8") as f:
        f.write(manifest_json)
    if not args.skip_registry:
        create_registry_table_if_missing(spark, args.releases_table)
        append_registry_row(
            spark=spark,
            releases_table=args.releases_table,
            release_name=args.release_name,
            ref_type="tag",
            ref_name=args.release_name,
            ref_hash=ref_hash,
            created_at_utc=created_at,
            ingested_at_utc=now_iso(),
            table_identifier=args.table,
            snapshot_id=snapshot_id,
            metadata_location=metadata_location,
            manifest_sha256=manifest_sha256,
            manifest_json=manifest_json,
            created_by=args.created_by,
            description=args.description,
        )
    print(f"[INFO] release_name={args.release_name}")
    print(f"[INFO] table={args.table}")
    print(f"[INFO] ref_hash={ref_hash}")
    print(f"[INFO] snapshot_id={snapshot_id}")
    print(f"[INFO] manifest_out={args.manifest_out}")
    if args.skip_registry:
        print("[INFO] registry=skipped")
    else:
        print(f"[INFO] registry_table={args.releases_table}")
if __name__ == "__main__":
    main()