# NOTE(review): the following header ("280 lines / 9.3 KiB / Python") is
# file-viewer metadata accidentally captured into the source; kept as a
# comment so the module remains importable.
import argparse
import hashlib
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

from pyspark.sql import SparkSession
from pyspark.sql import types as T


def now_iso() -> str:
|
|
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace('+00:00', 'Z')
|
|
|
|
|
|
def http_json(method: str, url: str, payload: dict | None = None) -> dict:
|
|
data = json.dumps(payload).encode("utf-8") if payload is not None else None
|
|
req = urllib.request.Request(url, data=data, method=method)
|
|
req.add_header("Content-Type", "application/json")
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
body = resp.read().decode("utf-8")
|
|
return json.loads(body) if body else {}
|
|
|
|
|
|
def get_ref(nessie_uri: str, ref_name: str) -> dict | None:
|
|
try:
|
|
return http_json("GET", f"{nessie_uri.rstrip('/')}/trees/{urllib.parse.quote(ref_name, safe='')}")
|
|
except urllib.error.HTTPError as e:
|
|
if e.code == 404:
|
|
return None
|
|
raise
|
|
|
|
|
|
def extract_ref_hash(ref_obj: dict) -> str:
|
|
# Nessie responses can vary by endpoint/version:
|
|
# - {"type":"BRANCH","name":"main","hash":"..."}
|
|
# - {"reference":{"type":"BRANCH","name":"main","hash":"..."}}
|
|
if isinstance(ref_obj.get("hash"), str) and ref_obj["hash"]:
|
|
return ref_obj["hash"]
|
|
reference = ref_obj.get("reference")
|
|
if isinstance(reference, dict) and isinstance(reference.get("hash"), str) and reference["hash"]:
|
|
return reference["hash"]
|
|
raise KeyError("hash")
|
|
|
|
|
|
def ensure_tag(nessie_uri: str, tag_name: str) -> dict:
|
|
existing = get_ref(nessie_uri, tag_name)
|
|
if existing is not None:
|
|
return existing
|
|
|
|
main_ref = http_json("GET", f"{nessie_uri.rstrip('/')}/trees/main")
|
|
payload = {
|
|
"type": "BRANCH",
|
|
"name": "main",
|
|
"hash": extract_ref_hash(main_ref),
|
|
}
|
|
query = urllib.parse.urlencode({"name": tag_name, "type": "TAG"})
|
|
http_json("POST", f"{nessie_uri.rstrip('/')}/trees?{query}", payload)
|
|
created = get_ref(nessie_uri, tag_name)
|
|
if created is None:
|
|
raise RuntimeError(f"Tag creation appeared to succeed but tag '{tag_name}' is not retrievable")
|
|
return created
|
|
|
|
|
|
def create_registry_table_if_missing(spark: SparkSession, releases_table: str) -> None:
|
|
spark.sql(
|
|
f"""
|
|
CREATE TABLE IF NOT EXISTS {releases_table} (
|
|
release_name STRING,
|
|
ref_type STRING,
|
|
ref_name STRING,
|
|
ref_hash STRING,
|
|
created_at_utc STRING,
|
|
ingested_at_utc STRING,
|
|
table_identifier STRING,
|
|
snapshot_id BIGINT,
|
|
metadata_location STRING,
|
|
manifest_sha256 STRING,
|
|
manifest_json STRING
|
|
) USING iceberg
|
|
"""
|
|
)
|
|
|
|
|
|
def _to_utc_datetime(value: str):
|
|
# Accept ISO strings with 'Z' suffix.
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
|
|
|
|
|
|
def _convert_value_for_type(field: T.StructField, value):
|
|
if value is None:
|
|
return None
|
|
dt = field.dataType
|
|
if isinstance(dt, T.StringType):
|
|
return str(value)
|
|
if isinstance(dt, T.LongType):
|
|
return int(value)
|
|
if isinstance(dt, T.IntegerType):
|
|
return int(value)
|
|
if isinstance(dt, T.ShortType):
|
|
return int(value)
|
|
if isinstance(dt, T.ByteType):
|
|
return int(value)
|
|
if isinstance(dt, T.BooleanType):
|
|
return bool(value)
|
|
if isinstance(dt, T.FloatType):
|
|
return float(value)
|
|
if isinstance(dt, T.DoubleType):
|
|
return float(value)
|
|
if isinstance(dt, T.TimestampType):
|
|
if isinstance(value, datetime):
|
|
return value
|
|
return _to_utc_datetime(str(value))
|
|
if isinstance(dt, T.DateType):
|
|
if isinstance(value, datetime):
|
|
return value.date()
|
|
return _to_utc_datetime(str(value)).date()
|
|
# Leave unsupported/complex types as-is; Spark can still validate and fail clearly.
|
|
return value
|
|
|
|
|
|
def append_registry_row(
|
|
spark: SparkSession,
|
|
releases_table: str,
|
|
release_name: str,
|
|
ref_type: str,
|
|
ref_name: str,
|
|
ref_hash: str,
|
|
created_at_utc: str,
|
|
ingested_at_utc: str,
|
|
table_identifier: str,
|
|
snapshot_id: int,
|
|
metadata_location: str,
|
|
manifest_sha256: str,
|
|
manifest_json: str,
|
|
created_by: str,
|
|
description: str,
|
|
) -> None:
|
|
target_schema = spark.table(releases_table).schema
|
|
base_values = {
|
|
"release_name": release_name,
|
|
"ref_type": ref_type,
|
|
"ref_name": ref_name,
|
|
"ref_hash": ref_hash,
|
|
"created_at_utc": created_at_utc,
|
|
"ingested_at_utc": ingested_at_utc,
|
|
"table_identifier": table_identifier,
|
|
"snapshot_id": int(snapshot_id),
|
|
"metadata_location": metadata_location,
|
|
"manifest_sha256": manifest_sha256,
|
|
"manifest_json": manifest_json,
|
|
"created_by": created_by,
|
|
"description": description,
|
|
"release_description": description,
|
|
}
|
|
|
|
row_values = []
|
|
missing_required = []
|
|
for field in target_schema.fields:
|
|
name = field.name
|
|
if name in base_values:
|
|
value = _convert_value_for_type(field, base_values[name])
|
|
row_values.append(value)
|
|
continue
|
|
if field.nullable:
|
|
row_values.append(None)
|
|
continue
|
|
missing_required.append(name)
|
|
|
|
if missing_required:
|
|
raise RuntimeError(
|
|
"Cannot append to registry table "
|
|
f"{releases_table}. Missing required columns with no known mapping: {', '.join(missing_required)}"
|
|
)
|
|
|
|
df = spark.createDataFrame([tuple(row_values)], schema=target_schema)
|
|
df.writeTo(releases_table).append()
|
|
|
|
|
|
def main() -> None:
|
|
p = argparse.ArgumentParser(description="Create a release tag + manifest + registry row for a table.")
|
|
p.add_argument("--release-name", required=True)
|
|
p.add_argument("--table", default="lake.db1.messages")
|
|
p.add_argument("--nessie-uri", default=os.getenv("NESSIE_URI", "http://nessie:19120/api/v2"))
|
|
p.add_argument("--manifest-out", required=True)
|
|
p.add_argument("--description", default="Messages release")
|
|
p.add_argument("--created-by", default=os.getenv("USER", "unknown"))
|
|
p.add_argument("--releases-table", default=os.getenv("RELEASES_TABLE", "lake.db1.releases_v2"))
|
|
p.add_argument("--skip-registry", action="store_true")
|
|
args = p.parse_args()
|
|
|
|
created_at = now_iso()
|
|
tag_ref = ensure_tag(args.nessie_uri, args.release_name)
|
|
ref_hash = extract_ref_hash(tag_ref)
|
|
|
|
spark = SparkSession.builder.appName("create-release-manifest").getOrCreate()
|
|
|
|
snap_row = spark.sql(
|
|
f"SELECT snapshot_id FROM {args.table}.snapshots ORDER BY committed_at DESC LIMIT 1"
|
|
).collect()
|
|
if not snap_row:
|
|
raise RuntimeError(f"No snapshots found for table {args.table}")
|
|
snapshot_id = int(snap_row[0]["snapshot_id"])
|
|
|
|
meta_row = spark.sql(
|
|
f"SELECT file AS metadata_location FROM {args.table}.metadata_log_entries ORDER BY timestamp DESC LIMIT 1"
|
|
).collect()
|
|
if not meta_row:
|
|
raise RuntimeError(f"No metadata log entries found for table {args.table}")
|
|
metadata_location = str(meta_row[0]["metadata_location"])
|
|
|
|
manifest = {
|
|
"schema_version": "lakehouse-release-manifest/v1",
|
|
"release": {
|
|
"name": args.release_name,
|
|
"created_at_utc": created_at,
|
|
"created_by": args.created_by,
|
|
"description": args.description,
|
|
},
|
|
"nessie": {
|
|
"uri": args.nessie_uri,
|
|
"ref": {
|
|
"type": "tag",
|
|
"name": args.release_name,
|
|
"hash": ref_hash,
|
|
},
|
|
},
|
|
"tables": [
|
|
{
|
|
"identifier": args.table,
|
|
"format": "iceberg",
|
|
"current_snapshot_id": snapshot_id,
|
|
"metadata_location": metadata_location,
|
|
}
|
|
],
|
|
}
|
|
|
|
manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2)
|
|
manifest_sha256 = hashlib.sha256(manifest_json.encode("utf-8")).hexdigest()
|
|
|
|
os.makedirs(os.path.dirname(args.manifest_out) or ".", exist_ok=True)
|
|
with open(args.manifest_out, "w", encoding="utf-8") as f:
|
|
f.write(manifest_json)
|
|
|
|
if not args.skip_registry:
|
|
create_registry_table_if_missing(spark, args.releases_table)
|
|
append_registry_row(
|
|
spark=spark,
|
|
releases_table=args.releases_table,
|
|
release_name=args.release_name,
|
|
ref_type="tag",
|
|
ref_name=args.release_name,
|
|
ref_hash=ref_hash,
|
|
created_at_utc=created_at,
|
|
ingested_at_utc=now_iso(),
|
|
table_identifier=args.table,
|
|
snapshot_id=snapshot_id,
|
|
metadata_location=metadata_location,
|
|
manifest_sha256=manifest_sha256,
|
|
manifest_json=manifest_json,
|
|
created_by=args.created_by,
|
|
description=args.description,
|
|
)
|
|
|
|
print(f"[INFO] release_name={args.release_name}")
|
|
print(f"[INFO] table={args.table}")
|
|
print(f"[INFO] ref_hash={ref_hash}")
|
|
print(f"[INFO] snapshot_id={snapshot_id}")
|
|
print(f"[INFO] manifest_out={args.manifest_out}")
|
|
if args.skip_registry:
|
|
print("[INFO] registry=skipped")
|
|
else:
|
|
print(f"[INFO] registry_table={args.releases_table}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|