Spark Parquet Input
For Spark users, the recommended pattern is to track the parquet source on disk as the input asset, rather than hashing the in-memory Spark DataFrame object.
Source: examples/custom-serialize.py
"""Example: use a custom wrapper for Spark DataFrames loaded from parquet.
A DataFrame is not a suitable object to hash directly because the in-memory
Python object does not represent the actual dataset contents in a stable way.
For parquet-backed Spark data:
1. Keep using the Spark DataFrame in your compute function.
2. Wrap it in a small Python class.
3. Implement `to_eqty_asset()` so eqty tracks the parquet source as the input
asset instead of the opaque in-memory DataFrame object.
This example assumes the parquet dataset already exists on disk.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from eqty_sdk import Dataset, Signer, compute, init, set_active_signer
# pyspark is needed only by this example, so replace the bare ImportError
# traceback with an actionable message telling the user how to install it.
try:
    from pyspark.sql import DataFrame, SparkSession
except ImportError as exc:  # pragma: no cover - example-only import guard
    raise SystemExit(
        "This example requires pyspark. Install it with `poetry add pyspark` "
        "or adapt the wrapper to your own Spark environment."
    ) from exc
@dataclass
class SparkParquetFrame:
    """Wrap a Spark DataFrame and expose a stable eqty asset for provenance.

    The compute function keeps working with the live Spark ``DataFrame``,
    while ``to_eqty_asset`` points eqty at the parquet dataset on disk that
    the DataFrame was read from.

    Attributes:
        dataframe: The Spark DataFrame handed to the compute function.
        parquet_path: Location of the parquet dataset backing ``dataframe``.
            A plain ``str`` (e.g. the same value passed to
            ``spark.read.parquet``) is accepted and normalized to ``Path``.
        name: Asset name passed to ``Dataset.from_path``.
        description: Asset description passed to ``Dataset.from_path``.
            Defaults to the generic description used by this example.
    """

    dataframe: DataFrame
    parquet_path: Path
    name: str
    # Trailing field with a default keeps existing positional/keyword
    # construction (dataframe, parquet_path, name) working unchanged.
    description: str = "Spark DataFrame backed by a parquet dataset"

    def __post_init__(self) -> None:
        # Normalize so to_eqty_asset always hands a Path to Dataset.from_path,
        # even when the caller passed a string path.
        self.parquet_path = Path(self.parquet_path)

    def to_eqty_asset(self) -> Dataset:
        """Return the parquet dataset on disk as this input's eqty asset.

        Tracking the parquet source is preferable to hashing the Spark
        DataFrame object itself, whose in-memory representation does not
        stably reflect the dataset contents.
        """
        return Dataset.from_path(
            self.parquet_path,
            name=self.name,
            description=self.description,
        )
@compute(
    metadata={
        "description": "Demonstrates how to pass a Spark DataFrame while tracking the parquet source asset.",
    }
)
def count_rows(frame: SparkParquetFrame) -> int:
    """Return how many rows the wrapped Spark DataFrame contains.

    Only ``frame.dataframe`` participates in the computation; the parquet
    location carried by ``frame`` is what ``to_eqty_asset`` exposes for
    provenance tracking.
    """
    spark_df = frame.dataframe
    row_total = spark_df.count()
    return row_total
def main(parquet_path: Path | None = None) -> None:
    """Run the example: read a parquet dataset with Spark and count its rows.

    Args:
        parquet_path: Parquet dataset to read. Defaults to
            ``data/sample.parquet`` next to this script, so the example works
            regardless of the current working directory.

    Raises:
        SystemExit: If the parquet dataset does not exist. This is checked
            before initializing eqty or starting Spark so the failure is
            immediate and readable rather than a Spark analysis error.
    """
    if parquet_path is None:
        # Resolve relative to this file instead of the CWD; when run from the
        # repository root this is the same examples/data/sample.parquet path.
        parquet_path = Path(__file__).resolve().parent / "data" / "sample.parquet"
    parquet_path = Path(parquet_path).resolve()
    if not parquet_path.exists():
        raise SystemExit(
            f"Parquet dataset not found at {parquet_path}. "
            "Create it first or pass a different path to main()."
        )

    cfg = init()
    signer = Signer.new()
    set_active_signer(signer)

    spark = (
        SparkSession.builder.appName("eqty-spark-example")
        .master("local[*]")
        .getOrCreate()
    )
    try:
        spark_df = spark.read.parquet(str(parquet_path))
        eqty_frame = SparkParquetFrame(
            dataframe=spark_df,
            parquet_path=parquet_path,
            name="customer-input-parquet",
        )
        result = count_rows(eqty_frame)
        print(f"default context: {cfg.get_default_context()}")
        print(f"row count: {result}")
    finally:
        # Always release the local Spark session, even if the compute fails.
        spark.stop()
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
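This works because `@compute` inputs that implement `to_eqty_asset()` are converted into SDK assets before the computation is registered, so provenance is recorded against the parquet dataset rather than the in-memory DataFrame object.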