Async Compute

The compute decorator works with both async functions and async generators.

Source: examples/async-compute.py

import asyncio
from pathlib import Path

from eqty_sdk import Dataset, Signer, compute, init, set_active_signer

cfg = init()
signer = Signer.new()
set_active_signer(signer)

model_url = Dataset.from_object("https://huggingface.co/example", name="Model URL")


@compute(
    metadata={
        "name": "Async Function",
        "description": "Download a model asynchronously and return a dataset.",
    }
)
async def compute_download_async(url: Dataset) -> Dataset:
    await asyncio.sleep(1)
    output_model = f"downloaded bytes of {url.value}"
    return Dataset.from_object(output_model, name="Downloaded model")


@compute(
    metadata={
        "name": "Generator Function",
        "description": "Stream model output chunks from an async generator.",
    }
)
async def compute_download_gen(url: Dataset):
    yield f"starting download for {url.name or 'model'}: "
    await asyncio.sleep(0.2)
    yield "this "
    yield "is "
    await asyncio.sleep(0.3)
    yield "a stream"


async def main():
    downloaded = await compute_download_async(model_url)
    print(downloaded.value)

    async for chunk in compute_download_gen(model_url):
        print(chunk, end="")
    print()


asyncio.run(main())

ctx = cfg.get_default_context()
ctx.export(Path("./manifests/async-compute.json"))

The key distinction:

  • async functions finalize once they return a value
  • async generators stream chunks and finalize when the generator completes