Beta — introduced in Geneva 0.11.0

Geneva's standard UDFs operate row-at-a-time: one input row produces exactly one output value. Batch User-Defined Table Functions (UDTFs) lift this restriction, enabling N:M transformations where the output can have a completely different schema and row count than the input.
| Workflow | Input | Output | Cardinality |
| --- | --- | --- | --- |
| Deduplication | N rows | M rows (M ≤ N) | N:M |
| Clustering | N rows | K cluster rows | N:K |
| Aggregation | N rows | 1 summary row | N:1 |
| Cross-row join/merge | N rows | M rows | N:M |
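The cardinality contract can be illustrated without Geneva at all. This toy sketch (plain Python, hypothetical helper) collapses N input rows into M ≤ N output rows with an output schema unrelated to the input:

```python
# Toy N:M sketch (no Geneva involved): N rows in, M <= N rows out,
# keyed by a perceptual hash, with a different output schema.
def dedupe(rows: list[dict]) -> list[dict]:
    clusters: dict[str, int] = {}  # phash -> row_id of first occurrence
    for row in rows:
        clusters.setdefault(row["phash"], row["row_id"])
    return [{"phash": p, "canonical_row_id": rid} for p, rid in clusters.items()]

rows = [
    {"row_id": 1, "phash": "a"},
    {"row_id": 2, "phash": "b"},
    {"row_id": 3, "phash": "a"},  # duplicate of row 1
]
deduped = dedupe(rows)  # 3 rows in, 2 rows out
```

A Batch UDTF generalizes exactly this shape of computation to run over a Geneva table with checkpointing and parallelism.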

Defining a Batch UDTF

Use the @udtf decorator on a class or function. The UDTF receives a query builder over the source data and yields pa.RecordBatch objects with an arbitrary output schema.

Class-based

import geneva
from geneva import udtf
import pyarrow as pa
from collections.abc import Iterator

@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("is_duplicate", pa.bool_()),
    ]),
    input_columns=["row_id", "phash"],
    num_cpus=4,
    memory=8 * 1024**3,  # 8 GiB
)
class PHashDedupe:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow().select(["row_id", "phash"])
        clusters = self._cluster(data)
        yield pa.RecordBatch.from_pydict({
            "row_id": clusters["row_id"],
            "cluster_id": clusters["cluster_id"],
            "is_duplicate": clusters["is_duplicate"],
        })
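`_cluster` is left undefined in the example above. One plausible helper it might delegate to (a hypothetical sketch, not part of Geneva) groups rows whose 64-bit perceptual hashes differ by at most `threshold` bits, using union-find:

```python
# Hypothetical _cluster helper: union-find over Hamming-close phashes.
def cluster_phashes(row_ids: list[int], phashes: list[int], threshold: int = 4) -> dict:
    parent = list(range(len(row_ids)))

    def find(i: int) -> int:
        while parent[i] != i:
            parent[i] = parent[parent[i]]  # path halving
            i = parent[i]
        return i

    # Union rows whose hashes are within `threshold` Hamming bits.
    n = len(row_ids)
    for i in range(n):
        for j in range(i + 1, n):
            if bin(phashes[i] ^ phashes[j]).count("1") <= threshold:
                parent[find(i)] = find(j)

    # Assign dense cluster ids; the first row seen per cluster is canonical.
    roots: dict[int, int] = {}
    cluster_id, is_duplicate = [], []
    for i in range(n):
        r = find(i)
        is_duplicate.append(r in roots)
        roots.setdefault(r, len(roots))
        cluster_id.append(roots[r])
    return {"row_id": row_ids, "cluster_id": cluster_id, "is_duplicate": is_duplicate}
```

The O(n²) pairwise loop is fine for a single partition; the partitioned execution modes below exist precisely to keep each partition small.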

Function-based

@udtf(output_schema=pa.schema([
    pa.field("label", pa.string()),
    pa.field("count", pa.int64()),
    pa.field("mean_score", pa.float64()),
]))
def group_stats(source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    df = source.to_pandas()
    agg = df.groupby("label").agg(
        count=("label", "size"),
        mean_score=("score", "mean"),
    ).reset_index()
    yield pa.RecordBatch.from_pandas(agg)

Decorator Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| output_schema | pa.Schema | Required. Arrow schema of the output table. |
| input_columns | list[str] \| None | Restrict which source columns are visible. None means all. |
| partition_by | str \| None | Column name for partition-parallel execution. |
| partition_by_indexed_column | str \| None | Column name with an IVF index for index-based partitioning. Mutually exclusive with partition_by. |
| num_cpus | float | Ray CPU resource request per worker. |
| num_gpus | float | Ray GPU resource request per worker. |
| memory | int \| None | Ray memory resource request in bytes. |
| on_error | | Error handling configuration (see Error Handling). |

Creating and Refreshing a UDTF View

Batch UDTFs are always attached to a persistent view via create_udtf_view(). Call refresh() to populate or update the view.
conn = geneva.connect("/data/mydb")
images = conn.open_table("images")

# Create the UDTF view
deduped = conn.create_udtf_view(
    "deduped_images",
    source=images.search(None).select(["row_id", "phash"]),
    udtf=PHashDedupe(threshold=4),
)

# Populate the view
deduped.refresh()
UDTF views use the same cluster infrastructure as other Geneva jobs:
# KubeRay cluster
with conn.context(cluster="my-cluster", manifest="my-manifest"):
    deduped.refresh()

# Local Ray
with conn.local_ray_context():
    deduped.refresh()

Version-aware refresh

On each refresh, Geneva checks the source table’s version against the version stored in the view metadata. If the source has not changed, the refresh is skipped entirely — an O(1) check.
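In pseudocode, the skip logic amounts to the following (hypothetical names, not the real Geneva internals):

```python
# Sketch of the version-aware skip. `view_metadata` and `run_udtf` are
# stand-ins for Geneva's internal state and execution, not real API.
def maybe_refresh(view_metadata: dict, source_version: int, run_udtf) -> bool:
    """Return True if the view was actually recomputed."""
    if view_metadata.get("source_version") == source_version:
        return False                       # O(1): source unchanged, skip entirely
    run_udtf()                             # recompute the view contents
    view_metadata["source_version"] = source_version
    return True
```

This is why calling `refresh()` in a loop is cheap: only the first call after a source change does any work.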

Execution Modes

Single-worker (no partitioning)

When neither partition_by nor partition_by_indexed_column is set, the UDTF runs as a single Ray task with access to the entire source dataset. Use this for global operations that need cross-row visibility.
@udtf(output_schema=...)
class GlobalAggregation:
    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        all_data = source.to_arrow()
        result = expensive_cross_row_computation(all_data)
        yield from result.to_batches()  # yield every batch, not just the first

Partition-parallel (partition_by)

The framework groups source data by the partition column and dispatches each partition as an independent Ray task. Use this when the computation is naturally parallelizable by some grouping key.
@udtf(
    output_schema=pa.schema([
        ("row_id_a", pa.string()),
        ("row_id_b", pa.string()),
        ("hamming_dist", pa.int32()),
    ]),
    partition_by="partition_id",
    num_cpus=2,
)
class EdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            yield pa.RecordBatch.from_pydict(edges)
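`_pairwise_compare` is left undefined above. A plausible stand-in (hypothetical helper, not Geneva API) computes all-pairs Hamming distances within one partition and returns columnar edge data, or None when no pair is close enough:

```python
# Hypothetical _pairwise_compare helper: all-pairs Hamming edges in a partition.
def pairwise_compare(row_ids: list[str], phashes: list[int], threshold: int):
    edges = {"row_id_a": [], "row_id_b": [], "hamming_dist": []}
    for i in range(len(row_ids)):
        for j in range(i + 1, len(row_ids)):
            dist = bin(phashes[i] ^ phashes[j]).count("1")  # Hamming distance
            if dist <= threshold:
                edges["row_id_a"].append(row_ids[i])
                edges["row_id_b"].append(row_ids[j])
                edges["hamming_dist"].append(dist)
    # Return None for empty partitions so the caller can skip the yield.
    return edges if edges["row_id_a"] else None
```

Returning None rather than an empty dict matches the `if edges:` guard in the example, which avoids yielding zero-row batches.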

Index-based partitioning (partition_by_indexed_column)

Instead of partitioning by a materialized column, the framework reads partition assignments directly from an existing IVF vector index (IVF_FLAT, IVF_PQ, IVF_HNSW_SQ, etc.). This avoids materializing a partition_id column and keeps partitions synchronized with the index.
from geneva.partitioning import create_ivf_flat_index

# 1. Build an IVF index on the source table
create_ivf_flat_index(images, "phash", k=16)

# 2. Define the UDTF with index-based partitioning
@udtf(
    output_schema=pa.schema([
        ("row_id_a", pa.string()),
        ("row_id_b", pa.string()),
        ("hamming_dist", pa.int32()),
    ]),
    partition_by_indexed_column="phash",
    num_cpus=2,
)
class IndexPartitionedEdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            yield pa.RecordBatch.from_pydict(edges)
| | partition_by | partition_by_indexed_column |
| --- | --- | --- |
| Partition source | Column values (SQL filter) | IVF index partitions (row ID take) |
| Requires materialized column | Yes | No — reads from index metadata |
| Partition count | Number of distinct values | Number of non-empty index partitions |
| Sync with index | Manual | Automatic — always reads latest index |
partition_by and partition_by_indexed_column are mutually exclusive. Setting both raises ValueError.

Yielding Batches

The UDTF yields one or more pa.RecordBatch or pa.Table objects. Each batch must conform to output_schema. The framework validates each batch, writes it, and optionally checkpoints it.
# Streaming — yield per source batch (memory-efficient)
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    for batch in source.to_batches(batch_size=1024):
        yield transform(batch)

# Bulk — load all, compute, yield once
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    all_data = source.to_arrow()
    result = expensive_cross_row_computation(all_data)
    yield from result.to_batches()  # yield every batch, not just the first
Use the streaming pattern for memory-efficient processing. Use the bulk pattern when the computation inherently requires all data in memory (e.g., clustering, global deduplication).

Error Handling

Error handling operates at partition granularity — the unit of work is the entire __call__() execution for a partition (or the full table in single-worker mode).
| Mode | Behavior | Use case |
| --- | --- | --- |
| Fail (default) | Exception kills the partition, refresh fails | Correctness-critical UDTFs |
| Retry | Retry the entire partition with configurable backoff | Transient failures (network, OOM) |
| Skip | Log error, continue with remaining partitions | Best-effort / tolerant workloads |
Unlike standard UDF error handling, there is no row-level skip — UDTFs yield whole batches, so the smallest error unit is the partition.
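The Retry mode re-runs the whole partition. A plain-Python sketch of that contract (hypothetical helper, not the actual on_error API):

```python
import time

def run_partition_with_retry(run, max_attempts: int = 3, backoff_s: float = 1.0):
    """Retry the whole partition; the smallest error unit is one __call__()."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run()  # re-executes the entire partition from scratch
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted: surface the failure (Fail behavior)
            time.sleep(backoff_s * 2 ** (attempt - 1))  # exponential backoff
```

Because the retry unit is the full partition, any side effects inside `__call__()` must be safe to repeat.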

Checkpointing

Each yielded batch is checkpointed before reporting completion. On resume after a failure, completed batches are skipped and entire partitions with a __done__ marker are skipped. Checkpoint keys include the source table version, so stale checkpoints from a previous source version are automatically ignored when the source changes.
Checkpointed UDTFs must be deterministic — the same input must yield the same batch sequence for resume to work correctly.
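One way to picture a version-scoped checkpoint key (a hypothetical sketch, not Geneva's real key format):

```python
import hashlib

def checkpoint_key(source_version: int, partition_id: int, batch_index: int) -> str:
    """Deterministic key embedding the source version, so checkpoints
    written against an older source version never match and are ignored."""
    raw = f"{source_version}:{partition_id}:{batch_index}"
    return hashlib.sha256(raw.encode()).hexdigest()
```

Determinism matters for the same reason: on resume, batch `i` of partition `p` must be byte-for-byte the batch the checkpoint claims was already written.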

Examples

K-Means Clustering

@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("distance_to_centroid", pa.float64()),
    ]),
    num_cpus=4,
    memory=16 * 1024**3,
)
class KMeansClustering:
    def __init__(self, k: int = 100):
        self.k = k

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        import numpy as np
        embeddings = source.to_arrow().select(["row_id", "embedding"])
        row_ids = embeddings.column("row_id").to_pylist()
        vectors = np.stack(embeddings.column("embedding").to_pylist())
        centroids, assignments, distances = self._fit(vectors)

        chunk_size = 10_000
        for start in range(0, len(row_ids), chunk_size):
            end = min(start + chunk_size, len(row_ids))
            yield pa.RecordBatch.from_pydict({
                "row_id": row_ids[start:end],
                "cluster_id": assignments[start:end].tolist(),
                "distance_to_centroid": distances[start:end].tolist(),
            })
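`_fit` is left undefined above. A minimal stand-in (assumed helper, plain NumPy Lloyd's iterations, not a Geneva API) could be:

```python
import numpy as np

def fit_kmeans(vectors: np.ndarray, k: int, iters: int = 10, seed: int = 0):
    """Plain Lloyd's algorithm: returns (centroids, assignments, distances)."""
    rng = np.random.default_rng(seed)
    # Initialize centroids from k distinct input points.
    centroids = vectors[rng.choice(len(vectors), size=k, replace=False)]
    for _ in range(iters):
        # (n, k) distance matrix, then nearest-centroid assignment.
        d = np.linalg.norm(vectors[:, None, :] - centroids[None, :, :], axis=2)
        assignments = d.argmin(axis=1)
        for c in range(k):
            members = vectors[assignments == c]
            if len(members):
                centroids[c] = members.mean(axis=0)
    # Final assignment pass with per-row distance to the chosen centroid.
    d = np.linalg.norm(vectors[:, None, :] - centroids[None, :, :], axis=2)
    assignments = d.argmin(axis=1)
    distances = d[np.arange(len(vectors)), assignments]
    return centroids, assignments, distances
```

The (n, k) distance matrix is fine for modest n; a production `_fit` would likely chunk the data or use a library implementation instead.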

Aggregation

@udtf(
    output_schema=pa.schema([
        pa.field("label", pa.string()),
        pa.field("count", pa.int64()),
        pa.field("mean_score", pa.float64()),
    ]),
)
class GroupStats:
    def __init__(self, group_by: str = "label"):
        self.group_by = group_by

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        df = source.to_pandas()
        agg = df.groupby(self.group_by).agg(
            count=("label", "size"),
            mean_score=("score", "mean"),
        ).reset_index()
        yield pa.RecordBatch.from_pandas(agg)