# Batch User-Defined Table Functions (UDTFs)

> Use batch UDTFs for N:M transformations like deduplication, clustering, and aggregation across entire tables or partitions.

<Badge>Beta — introduced in Geneva 0.11.0</Badge>

Geneva's standard UDFs operate **row-at-a-time** — one input row produces exactly one output value. Batch User-Defined Table Functions (UDTFs) lift this restriction, enabling **N:M transformations** where the output can have a completely different schema and row count than the input.

| Workflow             | Input  | Output         | Cardinality |
| -------------------- | ------ | -------------- | ----------- |
| Deduplication        | N rows | M rows (M ≤ N) | N:M         |
| Clustering           | N rows | K cluster rows | N:K         |
| Aggregation          | N rows | 1 summary row  | N:1         |
| Cross-row join/merge | N rows | M rows         | N:M         |

## Defining a Batch UDTF

Use the `@udtf` decorator on a class or function. The UDTF receives a query builder over the source data and **yields** `pa.RecordBatch` objects with an arbitrary output schema.

### Class-based

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
from collections.abc import Iterator

import geneva
import pyarrow as pa
from geneva import udtf

@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("is_duplicate", pa.bool_()),
    ]),
    input_columns=["row_id", "phash"],
    num_cpus=4,
    memory=8 * 1024**3,  # 8 GiB
)
class PHashDedupe:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow().select(["row_id", "phash"])
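        # _cluster is your own clustering helper (not shown); it must return
        # equal-length row_id, cluster_id, and is_duplicate columns
        clusters = self._cluster(data)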
        yield pa.RecordBatch.from_pydict({
            "row_id": clusters["row_id"],
            "cluster_id": clusters["cluster_id"],
            "is_duplicate": clusters["is_duplicate"],
        })
```

### Function-based

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
@udtf(output_schema=pa.schema([
    pa.field("label", pa.string()),
    pa.field("count", pa.int64()),
    pa.field("mean_score", pa.float64()),
]))
def group_stats(source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
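    # Needs "label" and "score" columns in the source
    df = source.to_pandas()
    agg = df.groupby("label").agg(
        count=("label", "size"),
        mean_score=("score", "mean"),
    ).reset_index()
    # One output row per distinct label; keep the pandas index out of the batch
    yield pa.RecordBatch.from_pandas(agg, preserve_index=False)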
```

### Decorator Parameters

| Parameter                     | Type                | Description                                                                                         |
| ----------------------------- | ------------------- | --------------------------------------------------------------------------------------------------- |
| `output_schema`               | `pa.Schema`         | **Required.** Arrow schema of the output table.                                                     |
| `input_columns`               | `list[str] \| None` | Restrict which source columns are visible. `None` means all.                                        |
| `partition_by`                | `str \| None`       | Column name for partition-parallel execution.                                                       |
| `partition_by_indexed_column` | `str \| None`       | Column name with an IVF index for index-based partitioning. Mutually exclusive with `partition_by`. |
| `num_cpus`                    | `float`             | Ray CPU resource request per worker.                                                                |
| `num_gpus`                    | `float`             | Ray GPU resource request per worker.                                                                |
| `memory`                      | `int \| None`       | Ray memory resource request in bytes.                                                               |
| `on_error`                    |                     | Error handling configuration (see [Error Handling](#error-handling)).                               |

## Creating and Refreshing a UDTF View

Batch UDTFs are always attached to a persistent view via `create_udtf_view()`. Call `refresh()` to populate or update the view.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
import geneva

conn = geneva.connect("/data/mydb")
images = conn.open_table("images")

# Create the UDTF view
deduped = conn.create_udtf_view(
    "deduped_images",
    source=images.search(None).select(["row_id", "phash"]),
    udtf=PHashDedupe(threshold=4),
)

# Populate the view
deduped.refresh()
```

UDTF views use the same cluster infrastructure as other Geneva jobs:

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
# KubeRay cluster
with conn.context(cluster="my-cluster", manifest="my-manifest"):
    deduped.refresh()

# Local Ray
with conn.local_ray_context():
    deduped.refresh()
```

### Version-aware refresh

On each refresh, Geneva compares the source table's current version with the version stored in the view metadata. If the source has not changed, the refresh is skipped entirely. Only version numbers are compared, so the check is O(1) and does not scan any data.
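The skip logic amounts to a single comparison. A minimal sketch, assuming the view metadata exposes the last-processed source version as an integer; `ViewState` and `refresh_if_stale` are hypothetical names for illustration, not Geneva APIs:

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ViewState:
    """Hypothetical stand-in for the version recorded in a UDTF view's metadata."""
    source_version: Optional[int] = None


def refresh_if_stale(view: ViewState, current_version: int, run_udtf: Callable[[], None]) -> bool:
    """Run the UDTF only if the source changed; return True if it ran."""
    if view.source_version == current_version:
        return False  # O(1): compares two integers, reads no table data
    run_udtf()
    view.source_version = current_version  # record the version just processed
    return True
```

In this sketch the version is recorded only after the UDTF returns, so a run that raises leaves the view marked stale and the next refresh tries again.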

## Execution Modes

### Single-worker (no partitioning)

When neither `partition_by` nor `partition_by_indexed_column` is set, the UDTF runs as a **single Ray task** with access to the entire source dataset. Use this for global operations that need cross-row visibility.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
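@udtf(output_schema=...)  # replace ... with your pa.Schema
class GlobalAggregation:
    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        all_data = source.to_arrow()  # the entire source, in a single Ray task
        # Placeholder for your own computation; assumed to return a pa.Table
        result = expensive_cross_row_computation(all_data)
        # Yield every batch: to_batches()[0] would silently drop later chunks
        yield from result.to_batches()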
```

### Partition-parallel (`partition_by`)

The framework groups source data by the partition column and dispatches each partition as an independent Ray task. Use this when the computation is naturally parallelizable by some grouping key.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
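EDGE_SCHEMA = pa.schema([
    ("row_id_a", pa.string()),
    ("row_id_b", pa.string()),
    ("hamming_dist", pa.int32()),
])

@udtf(
    output_schema=EDGE_SCHEMA,
    partition_by="partition_id",  # one Ray task per distinct partition_id value
    num_cpus=2,
)
class EdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()  # rows of one partition only
        # _pairwise_compare is your own helper: a dict of column lists, empty if no matches
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            # Explicit schema: Python ints would otherwise be inferred as int64, not int32
            yield pa.RecordBatch.from_pydict(edges, schema=EDGE_SCHEMA)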
```

### Index-based partitioning (`partition_by_indexed_column`)

Instead of partitioning by a materialized column, the framework reads partition assignments directly from an existing **IVF vector index** (IVF\_FLAT, IVF\_PQ, IVF\_HNSW\_FLAT, IVF\_HNSW\_SQ, etc.). This avoids materializing a `partition_id` column and keeps partitions synchronized with the index.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
from geneva.partitioning import create_ivf_flat_index

# 1. Build an IVF index on the source table
create_ivf_flat_index(images, "phash", k=16)

# 2. Define the output schema and the UDTF with index-based partitioning
EDGE_SCHEMA = pa.schema([
    ("row_id_a", pa.string()),
    ("row_id_b", pa.string()),
    ("hamming_dist", pa.int32()),
])

@udtf(
    output_schema=EDGE_SCHEMA,
    partition_by_indexed_column="phash",  # partitions come from the IVF index on phash
    num_cpus=2,
)
class IndexPartitionedEdgeDetection:
    def __init__(self, threshold: int = 4):
        self.threshold = threshold

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        data = source.to_arrow()  # rows assigned to one IVF partition
        edges = self._pairwise_compare(data, self.threshold)
        if edges:
            # Explicit schema so hamming_dist matches the declared int32
            yield pa.RecordBatch.from_pydict(edges, schema=EDGE_SCHEMA)
```

|                              | `partition_by`             | `partition_by_indexed_column`         |
| ---------------------------- | -------------------------- | ------------------------------------- |
| Partition source             | Column values (SQL filter) | IVF index partitions (row ID take)    |
| Requires materialized column | Yes                        | No — reads from index metadata        |
| Partition count              | Number of distinct values  | Number of non-empty index partitions  |
| Sync with index              | Manual                     | Automatic — always reads latest index |

<Warning>
  `partition_by` and `partition_by_indexed_column` are **mutually exclusive**. Setting both raises `ValueError`.
</Warning>

## Yielding Batches

The UDTF yields one or more `pa.RecordBatch` or `pa.Table` objects. Each batch must conform to `output_schema`. The framework validates each batch, writes it, and optionally checkpoints it.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
# Streaming: yield one output batch per source batch (memory-efficient)
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    for batch in source.to_batches(batch_size=1024):
        yield transform(batch)  # transform: your own per-batch function

# Bulk: load everything, compute once, then yield every output batch
def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
    all_data = source.to_arrow()
    result = expensive_cross_row_computation(all_data)  # assumed to return a pa.Table
    yield from result.to_batches()
```

<Tip>
  Use the streaming pattern for memory-efficient processing. Use the bulk pattern when the computation inherently requires all data in memory (e.g., clustering, global deduplication).
</Tip>

## Error Handling

Error handling operates at **partition granularity** — the unit of work is the entire `__call__()` execution for a partition (or the full table in single-worker mode).

| Mode               | Behavior                                             | Use case                          |
| ------------------ | ---------------------------------------------------- | --------------------------------- |
| **Fail** (default) | Exception kills the partition, refresh fails         | Correctness-critical UDTFs        |
| **Retry**          | Retry the entire partition with configurable backoff | Transient failures (network, OOM) |
| **Skip**           | Log error, continue with remaining partitions        | Best-effort / tolerant workloads  |

<Warning>
  Unlike standard UDF error handling, there is no row-level skip — UDTFs yield whole batches, so the smallest error unit is the partition.
</Warning>
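To make the three modes concrete, here is a local simulation of partition-granularity error handling. It is not Geneva's `on_error` API: `run_partitions`, its `mode` strings, and the exponential backoff formula are assumptions for illustration. Each partition's generator is drained completely before its output is kept, so a failure partway through discards that partition's partial output, and a retry reruns the whole `__call__()`. In this sketch, exhausting all retries fails the run.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
import time
from collections.abc import Callable, Iterator
from typing import Any


def run_partitions(
    partitions: dict[Any, Any],
    call: Callable[[Any], Iterator[Any]],
    mode: str = "fail",  # "fail", "retry", or "skip" (illustrative names)
    max_attempts: int = 3,
    backoff_s: float = 0.5,
) -> tuple[dict, dict]:
    """Hypothetical runner; returns (outputs, errors) keyed by partition."""
    outputs: dict = {}
    errors: dict = {}
    for pid, data in partitions.items():
        attempts = max_attempts if mode == "retry" else 1
        for attempt in range(1, attempts + 1):
            try:
                # Drain the generator fully: output is kept all-or-nothing per partition
                outputs[pid] = list(call(data))
                break
            except Exception as exc:
                if attempt < attempts:
                    time.sleep(backoff_s * 2 ** (attempt - 1))  # exponential backoff
                elif mode == "skip":
                    errors[pid] = exc  # record the error and continue with other partitions
                else:
                    raise  # "fail", or "retry" with attempts exhausted
    return outputs, errors
```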

## Checkpointing

Each yielded batch is checkpointed before the partition reports completion. When a refresh resumes after a failure, partitions that carry a `__done__` marker are skipped entirely, and batches that were already checkpointed are not written again.

Checkpoint keys include the source table version, so stale checkpoints from a previous source version are automatically ignored when the source changes.

<Warning>
  Checkpointed UDTFs must be **deterministic** — the same input must yield the same batch sequence for resume to work correctly.
</Warning>

## Examples

### K-Means Clustering

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
@udtf(
    output_schema=pa.schema([
        pa.field("row_id", pa.int64()),
        pa.field("cluster_id", pa.int64()),
        pa.field("distance_to_centroid", pa.float64()),
    ]),
    num_cpus=4,
    memory=16 * 1024**3,
)
class KMeansClustering:
    def __init__(self, k: int = 100):
        self.k = k

    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        import numpy as np
        embeddings = source.to_arrow().select(["row_id", "embedding"])
        row_ids = embeddings.column("row_id").to_pylist()
        vectors = np.stack(embeddings.column("embedding").to_pylist())
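        # _fit is your own k-means routine; it must return NumPy arrays
        # (centroids, per-row assignments, per-row distances to the centroid)
        centroids, assignments, distances = self._fit(vectors)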

        chunk_size = 10_000
        for start in range(0, len(row_ids), chunk_size):
            end = min(start + chunk_size, len(row_ids))
            yield pa.RecordBatch.from_pydict({
                "row_id": row_ids[start:end],
                "cluster_id": assignments[start:end].tolist(),
                "distance_to_centroid": distances[start:end].tolist(),
            })
```
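`KMeansClustering` leaves `_fit` to the user. The sketch below is one way to fill it, using plain Lloyd's algorithm in NumPy and returning `(centroids, assignments, distances)` in the shape the example expects. `fit_kmeans` is a hypothetical name. It materializes an n×k×d distance array on every iteration, so for large tables a dedicated library such as scikit-learn or faiss is the practical choice.

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
import numpy as np


def fit_kmeans(vectors: np.ndarray, k: int, n_iter: int = 20, seed: int = 0):
    """Hypothetical _fit: Lloyd's k-means. Returns (centroids, assignments, distances)."""
    vectors = np.asarray(vectors, dtype=np.float64)
    rng = np.random.default_rng(seed)
    k = min(k, len(vectors))
    # Initialize centroids from k distinct input rows
    centroids = vectors[rng.choice(len(vectors), size=k, replace=False)]

    def squared_distances(c: np.ndarray) -> np.ndarray:
        # Shape (n, k): squared Euclidean distance from each vector to each centroid
        return ((vectors[:, None, :] - c[None, :, :]) ** 2).sum(axis=2)

    for _ in range(n_iter):
        assignments = squared_distances(centroids).argmin(axis=1)
        new_centroids = centroids.copy()
        for j in range(k):
            members = vectors[assignments == j]
            if len(members):  # keep the old centroid if a cluster empties
                new_centroids[j] = members.mean(axis=0)
        converged = np.allclose(new_centroids, centroids)
        centroids = new_centroids
        if converged:
            break

    d2 = squared_distances(centroids)
    assignments = d2.argmin(axis=1)
    distances = np.sqrt(d2[np.arange(len(vectors)), assignments])
    return centroids, assignments.astype(np.int64), distances
```

Inside the class, `_fit` could then simply delegate with `return fit_kmeans(vectors, self.k)`.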

### Aggregation

```python theme={"theme":{"light":"vitesse-light","dark":"catppuccin-mocha"}}
@udtf(
    output_schema=pa.schema([
        pa.field("label", pa.string()),
        pa.field("count", pa.int64()),
        pa.field("mean_score", pa.float64()),
    ]),
)
class GroupStats:
    def __init__(self, group_by: str = "label"):
        self.group_by = group_by

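    def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]:
        df = source.to_pandas()
        agg = (
            df.groupby(self.group_by)
            .agg(
                count=(self.group_by, "size"),
                mean_score=("score", "mean"),
            )
            .reset_index()
            # output_schema names the key column "label", whatever group_by is
            .rename(columns={self.group_by: "label"})
        )
        yield pa.RecordBatch.from_pandas(agg, preserve_index=False)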
```

Reference:

* [`create_udtf_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_udtf_view)
* [UDTF](https://lancedb.github.io/geneva/api/udtf/) — full `@udtf` / `@batch_udtf` decorator reference including `output_schema`, `partition_by`, `num_gpus`, and `on_error`
