Spark Parquet Input

For Spark users, the recommended pattern is to track the parquet source on disk as the input asset, rather than hashing the in-memory Spark DataFrame object.

Source: examples/custom-serialize.py

"""Example: use a custom wrapper for Spark DataFrames loaded from parquet.

A DataFrame is not a suitable object to hash directly because the in-memory
Python object does not represent the actual dataset contents in a stable way.

For parquet-backed Spark data:

1. Keep using the Spark DataFrame in your compute function.
2. Wrap it in a small Python class.
3. Implement `to_eqty_asset()` so eqty tracks the parquet source as the input
   asset instead of the opaque in-memory DataFrame object.

This example assumes the parquet dataset already exists on disk.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

from eqty_sdk import Dataset, Signer, compute, init, set_active_signer

try:
    from pyspark.sql import DataFrame, SparkSession
except ImportError as exc:  # pragma: no cover - example-only import guard
    raise SystemExit(
        "This example requires pyspark. Install it with `poetry add pyspark` "
        "or adapt the wrapper to your own Spark environment."
    ) from exc


@dataclass
class SparkParquetFrame:
    """Carry a Spark DataFrame together with the parquet dataset it came from.

    Compute functions work with ``dataframe`` directly, while provenance is
    recorded against the parquet location (via ``to_eqty_asset``) instead of
    the in-memory DataFrame object.
    """

    # Live Spark DataFrame consumed inside compute functions.
    dataframe: DataFrame
    # On-disk parquet location; expected to be where ``dataframe`` was read from.
    parquet_path: Path
    # Asset name forwarded to ``Dataset.from_path``.
    name: str

    def to_eqty_asset(self) -> Dataset:
        """Describe the backing parquet dataset as an eqty ``Dataset``.

        The asset is built from ``parquet_path`` rather than from the
        DataFrame object, so provenance points at the data on disk.
        """
        # NOTE(review): Spark parquet datasets are often directories; confirm
        # that Dataset.from_path accepts directory paths as well as files.
        asset_kwargs = {
            "name": self.name,
            "description": "Spark DataFrame backed by a parquet dataset",
        }
        return Dataset.from_path(self.parquet_path, **asset_kwargs)


@compute(
    metadata=dict(
        description="Demonstrates how to pass a Spark DataFrame while tracking the parquet source asset.",
    ),
)
def count_rows(frame: SparkParquetFrame) -> int:
    """Return how many rows the wrapped Spark DataFrame contains.

    Only ``frame.dataframe`` takes part in the computation; the wrapper's
    ``parquet_path`` and ``name`` are there for provenance tracking.
    """
    spark_df = frame.dataframe
    total_rows = spark_df.count()
    return total_rows


def main() -> None:
    cfg = init()
    signer = Signer.new()
    set_active_signer(signer)

    parquet_path = Path("examples/data/sample.parquet").resolve()
    spark = SparkSession.builder.appName("eqty-spark-example").master("local[*]").getOrCreate()

    try:
        spark_df = spark.read.parquet(str(parquet_path))
        eqty_frame = SparkParquetFrame(
            dataframe=spark_df,
            parquet_path=parquet_path,
            name="customer-input-parquet",
        )

        result = count_rows(eqty_frame)
        print(f"default context: {cfg.get_default_context()}")
        print(f"row count: {result}")
    finally:
        spark.stop()


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

This works because arguments to a `@compute`-decorated function that implement `to_eqty_asset()` are converted into eqty SDK assets before the computation is registered.