Beta — introduced in Geneva 0.11.0
Geneva’s standard UDFs operate row-at-a-time — one input row produces exactly one output value. Batch User-Defined Table Functions (UDTFs) lift this restriction, enabling N:M transformations where the output can have a completely different schema and row count than the input.
| Workflow | Input | Output | Cardinality |
|---|---|---|---|
| Deduplication | N rows | M rows (M ≤ N) | N:M |
| Clustering | N rows | K cluster rows | N:K |
| Aggregation | N rows | 1 summary row | N:1 |
| Cross-row join/merge | N rows | M rows | N:M |
Defining a Batch UDTF
Use the @udtf decorator on a class or function. The UDTF receives a query builder over the source data and yields pa.RecordBatch objects with an arbitrary output schema.
Class-based
from collections.abc import Iterator

import pyarrow as pa

import geneva
from geneva import udtf

@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("is_duplicate", pa.bool_()),
    ]),
    input_columns=["row_id", "phash"],
    num_cpus=4,
    memory=8 * 1024**3,  # 8 GiB
)
class PHashDedupe:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow().select(["row_id", "phash"])
        clusters = self._cluster(data)
        yield pa.RecordBatch.from_pydict({
            "row_id": clusters["row_id"],
            "cluster_id": clusters["cluster_id"],
            "is_duplicate": clusters["is_duplicate"],
        })
Function-based
@udtf(output_schema=pa.schema([
    pa.field("label", pa.string()),
    pa.field("count", pa.int64()),
    pa.field("mean_score", pa.float64()),
]))
def group_stats(source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    df = source.to_pandas()
    agg = df.groupby("label").agg(
        count=("label", "size"),
        mean_score=("score", "mean"),
    ).reset_index()
    yield pa.RecordBatch.from_pandas(agg)
Decorator Parameters
| Parameter | Type | Description |
|---|---|---|
| output_schema | pa.Schema | Required. Arrow schema of the output table. |
| input_columns | list[str] \| None | Restrict which source columns are visible. None means all. |
| partition_by | str \| None | Column name for partition-parallel execution. |
| partition_by_indexed_column | str \| None | Column name with an IVF index for index-based partitioning. Mutually exclusive with partition_by. |
| num_cpus | float | Ray CPU resource request per worker. |
| num_gpus | float | Ray GPU resource request per worker. |
| memory | int \| None | Ray memory resource request in bytes. |
| on_error | | Error handling configuration (see Error Handling). |
Creating and Refreshing a UDTF View
Batch UDTFs are always attached to a persistent view via create_udtf_view(). Call refresh() to populate or update the view.
conn = geneva.connect("/data/mydb")
images = conn.open_table("images")
# Create the UDTF view
deduped = conn.create_udtf_view(
"deduped_images",
source=images.search(None).select(["row_id", "phash"]),
udtf=PHashDedupe(threshold=4),
)
# Populate the view
deduped.refresh()
UDTF views use the same cluster infrastructure as other Geneva jobs:
# KubeRay cluster
with conn.context(cluster="my-cluster", manifest="my-manifest"):
    deduped.refresh()

# Local Ray
with conn.local_ray_context():
    deduped.refresh()
Version-aware refresh
On each refresh, Geneva checks the source table’s version against the version stored in the view metadata. If the source has not changed, the refresh is skipped entirely — an O(1) check.
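The skip logic amounts to a simple version comparison. The helper below is a hypothetical sketch of that check; `needs_refresh` and the `source_version` metadata key are illustrative names, not Geneva API:

```python
def needs_refresh(source_version: int, view_metadata: dict) -> bool:
    # Compare the source table's current version against the version
    # recorded in the view metadata at the last refresh. This is the
    # O(1) check: no data is scanned when the versions match.
    return view_metadata.get("source_version") != source_version

view_metadata = {"source_version": 42}
assert not needs_refresh(42, view_metadata)  # source unchanged: refresh is a no-op
assert needs_refresh(43, view_metadata)      # source advanced: the UDTF reruns
```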
Execution Modes
Single-worker (no partitioning)
When neither partition_by nor partition_by_indexed_column is set, the UDTF runs as a single Ray task with access to the entire source dataset. Use this for global operations that need cross-row visibility.
@udtf(output_schema=...)
class GlobalAggregation:
    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        all_data = source.to_arrow()
        result = expensive_cross_row_computation(all_data)
        yield result.to_batches()[0]
Partition-parallel (partition_by)
The framework groups source data by the partition column and dispatches each partition as an independent Ray task. Use this when the computation is naturally parallelizable by some grouping key.
@udtf(
    output_schema=pa.schema([
        ("row_id_a", pa.string()),
        ("row_id_b", pa.string()),
        ("hamming_dist", pa.int32()),
    ]),
    partition_by="partition_id",
    num_cpus=2,
)
class EdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            yield pa.RecordBatch.from_pydict(edges)
Index-based partitioning (partition_by_indexed_column)
Instead of partitioning by a materialized column, the framework reads partition assignments directly from an existing IVF vector index (IVF_FLAT, IVF_PQ, IVF_HNSW_SQ, etc.). This avoids materializing a partition_id column and keeps partitions synchronized with the index.
from geneva.partitioning import create_ivf_flat_index

# 1. Build an IVF index on the source table
create_ivf_flat_index(images, "phash", k=16)

# 2. Define the UDTF with index-based partitioning
@udtf(
    output_schema=pa.schema([
        ("row_id_a", pa.string()),
        ("row_id_b", pa.string()),
        ("hamming_dist", pa.int32()),
    ]),
    partition_by_indexed_column="phash",
    num_cpus=2,
)
class IndexPartitionedEdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            yield pa.RecordBatch.from_pydict(edges)
| | partition_by | partition_by_indexed_column |
|---|---|---|
| Partition source | Column values (SQL filter) | IVF index partitions (row ID take) |
| Requires materialized column | Yes | No (reads from index metadata) |
| Partition count | Number of distinct values | Number of non-empty index partitions |
| Sync with index | Manual | Automatic (always reads latest index) |
partition_by and partition_by_indexed_column are mutually exclusive. Setting both raises ValueError.
Yielding Batches
The UDTF yields one or more pa.RecordBatch or pa.Table objects. Each batch must conform to output_schema. The framework validates each batch, writes it, and optionally checkpoints it.
# Streaming — yield per source batch (memory-efficient)
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    for batch in source.to_batches(batch_size=1024):
        yield transform(batch)

# Bulk — load all, compute, yield once
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    all_data = source.to_arrow()
    result = expensive_cross_row_computation(all_data)
    yield result.to_batches()[0]
Use the streaming pattern for memory-efficient processing. Use the bulk pattern when the computation inherently requires all data in memory (e.g., clustering, global deduplication).
Error Handling
Error handling operates at partition granularity — the unit of work is the entire __call__() execution for a partition (or the full table in single-worker mode).
| Mode | Behavior | Use case |
|---|
| Fail (default) | Exception kills the partition, refresh fails | Correctness-critical UDTFs |
| Retry | Retry the entire partition with configurable backoff | Transient failures (network, OOM) |
| Skip | Log error, continue with remaining partitions | Best-effort / tolerant workloads |
Unlike standard UDF error handling, there is no row-level skip — UDTFs yield whole batches, so the smallest error unit is the partition.
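The three modes can be pictured with a driver loop like the following sketch; `run_partitions` and the string modes are illustrative, not the actual on_error configuration. Retry is omitted for brevity (it would re-invoke `fn(p)` with backoff before giving up):

```python
def run_partitions(partitions, fn, on_error="fail"):
    # Each partition is one __call__() execution; the whole partition
    # either succeeds or fails as a unit (no row-level skip).
    results, skipped = [], []
    for p in partitions:
        try:
            results.append(fn(p))
        except Exception as exc:
            if on_error == "skip":
                skipped.append((p, exc))  # log and continue
            else:
                raise  # "fail": the refresh aborts

    return results, skipped

def flaky(p):
    if p == "bad":
        raise RuntimeError("partition failed")
    return p.upper()

results, skipped = run_partitions(["a", "bad", "b"], flaky, on_error="skip")
assert results == ["A", "B"]   # good partitions still complete
assert len(skipped) == 1       # the failed partition is logged, not fatal
```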
Checkpointing
Each yielded batch is checkpointed before completion is reported. On resume after a failure, already-checkpointed batches are skipped, and partitions carrying a __done__ marker are skipped entirely.
Checkpoint keys include the source table version, so stale checkpoints from a previous source version are automatically ignored when the source changes.
Checkpointed UDTFs must be deterministic — the same input must yield the same batch sequence for resume to work correctly.
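Determinism matters because resume replays the batch sequence and skips batches that were already checkpointed. A safe pattern is to impose a stable order before chunking, as in this illustrative sketch:

```python
def deterministic_batches(row_ids, chunk_size=2):
    # Sorting gives a stable order, so the same input always produces
    # the same batch sequence, which checkpoint resume relies on.
    ordered = sorted(row_ids)
    for start in range(0, len(ordered), chunk_size):
        yield ordered[start:start + chunk_size]

# The same set of row ids yields the same batches regardless of input order.
assert list(deterministic_batches([3, 1, 4, 2])) == [[1, 2], [3, 4]]
assert list(deterministic_batches([4, 3, 2, 1])) == [[1, 2], [3, 4]]
```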
Examples
K-Means Clustering
@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("distance_to_centroid", pa.float64()),
    ]),
    num_cpus=4,
    memory=16 * 1024**3,
)
class KMeansClustering:
    def __init__(self, k: int = 100):
        self.k = k

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        import numpy as np

        embeddings = source.to_arrow().select(["row_id", "embedding"])
        row_ids = embeddings.column("row_id").to_pylist()
        vectors = np.stack(embeddings.column("embedding").to_pylist())
        centroids, assignments, distances = self._fit(vectors)
        chunk_size = 10_000
        for start in range(0, len(row_ids), chunk_size):
            end = min(start + chunk_size, len(row_ids))
            yield pa.RecordBatch.from_pydict({
                "row_id": row_ids[start:end],
                "cluster_id": assignments[start:end].tolist(),
                "distance_to_centroid": distances[start:end].tolist(),
            })
Aggregation
@udtf(
    output_schema=pa.schema([
        pa.field("label", pa.string()),
        pa.field("count", pa.int64()),
        pa.field("mean_score", pa.float64()),
    ]),
)
class GroupStats:
    def __init__(self, group_by: str = "label"):
        self.group_by = group_by

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        df = source.to_pandas()
        agg = df.groupby(self.group_by).agg(
            count=("label", "size"),
            mean_score=("score", "mean"),
        ).reset_index()
        yield pa.RecordBatch.from_pandas(agg)