Beta — introduced in Geneva 0.11.0
Standard UDFs produce exactly one output value per input row. Scalar UDTFs enable 1:N row expansion — each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh.
| Source Table | Derived Table | Expansion |
|---|---|---|
| 1 video row | → N clip rows | Video segmentation |
| 1 document row | → N chunk rows | Text chunking |
| 1 image row | → N tile rows | Image tiling |
## Defining a Scalar UDTF
Use the `@scalar_udtf` decorator on a function that yields output rows. Geneva infers the output schema from the return type annotation.
```python
from geneva import scalar_udtf
from typing import Iterator, NamedTuple

class Clip(NamedTuple):
    clip_start: float
    clip_end: float
    clip_bytes: bytes

@scalar_udtf
def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
    """Yields multiple clips per video."""
    clip_length = 10.0
    for start in range(0, int(duration), int(clip_length)):
        end = min(start + clip_length, duration)
        clip_data = extract_video_segment(video_path, start, end)
        yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)
```
Input parameters are bound to source columns by name — the parameter `video_path` binds to the source column `video_path`, just like standard UDFs.
A scalar UDTF can yield zero rows for a source row. The source row is still marked as processed and will not be retried on the next refresh.
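For illustration, here is the zero-row case as a plain generator (decorator omitted so it runs standalone; `min_length` is a made-up cutoff, not a Geneva parameter) — a short video simply yields nothing:

```python
from typing import Iterator, NamedTuple

class Clip(NamedTuple):
    clip_start: float
    clip_end: float

def maybe_extract_clips(duration: float, min_length: float = 5.0) -> Iterator[Clip]:
    # Short videos produce no clips at all; Geneva still marks the
    # parent row as processed, so it is not retried on the next refresh.
    for start in range(0, int(duration), 10):
        end = min(start + 10.0, duration)
        if end - start >= min_length:
            yield Clip(float(start), end)

no_clips = list(maybe_extract_clips(3.0))   # zero child rows
clips = list(maybe_extract_clips(25.0))     # three child rows
```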
### List return pattern
If you prefer to build the full list in memory rather than yielding, you can return a `list` instead of an `Iterator`:
```python
@scalar_udtf
def extract_clips(video_path: str, duration: float) -> list[Clip]:
    clips = []
    for start in range(0, int(duration), 10):
        end = min(start + 10, duration)
        clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"..."))
    return clips
```
### Batched scalar UDTF
For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows:
```python
import pyarrow as pa

@scalar_udtf(batch=True)
def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:
    """Process rows in batches. Same 1:N semantics per row."""
    ...
```
## Creating a Scalar UDTF View
Scalar UDTFs use the existing `create_materialized_view` API with a `udtf=` parameter:
```python
import geneva

db = geneva.connect("/data/mydb")
videos = db.open_table("videos")

# Create the 1:N materialized view
clips = db.create_materialized_view(
    "clips",
    query=videos.search(None).select(["video_path", "metadata"]),
    udtf=extract_clips,
)

# Populate — runs the UDTF on every source row
clips.refresh()
```
The `query` parameter controls which source columns are inherited. Columns listed in `.select()` are carried into every child row automatically.
### Inheriting source columns
```python
# Only video_path and metadata are inherited into the clips table
clips = db.create_materialized_view(
    "clips",
    query=videos.search(None).select(["video_path", "metadata"]),
    udtf=extract_clips,
)
```
## Inherited Columns
Child rows automatically include the parent's columns — no manual join required. The columns available in the child table are determined by the query's `.select()`:
**videos** table (source):

| video_path | duration | metadata |
|---|---|---|
| /v/a.mp4 | 120.0 | {fps: 30} |
| /v/b.mp4 | 60.0 | {fps: 24} |
**clips** table (derived, 1:N):

| video_path | metadata | clip_start | clip_end | clip_bytes |
|---|---|---|---|---|
| /v/a.mp4 | {fps: 30} | 0.0 | 10.0 | b"\x00\x1a…" |
| /v/a.mp4 | {fps: 30} | 10.0 | 20.0 | b"\x00\x2b…" |
| /v/a.mp4 | {fps: 30} | 20.0 | 30.0 | b"\x00\x3c…" |
| /v/b.mp4 | {fps: 24} | 0.0 | 10.0 | b"\x00\x4d…" |
| /v/b.mp4 | {fps: 24} | 10.0 | 20.0 | b"\x00\x5e…" |
The first three rows come from the `/v/a.mp4` source row, the last two from `/v/b.mp4`. Inherited columns (`video_path`, `metadata`) are carried over automatically; `clip_start`, `clip_end`, and `clip_bytes` are generated by the UDTF.
## Adding Computed Columns After Creation
Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them:
```python
@udf(data_type=pa.list_(pa.float32(), 512))
def clip_embedding(clip_bytes: bytes) -> list[float]:
    return embed_model.encode(clip_bytes)

# Add an embedding column to the clips table
clips.add_columns({"embedding": clip_embedding})

# Backfill computes embeddings for all existing clips
clips.refresh()
```
This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs.
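The shape of this pattern can be sketched with plain Python (hypothetical `expand`/`enrich` helpers and toy dict rows, no Geneva involved): a generator fans each parent row out, then a per-row function adds a computed field to every child.

```python
def expand(row):
    # UDTF-style: one parent row yields N child rows.
    for i in range(row["n"]):
        yield {**row, "part": i}

def enrich(child):
    # UDF-style: adds one computed column to each child row.
    return {**child, "label": f"{child['name']}-{child['part']}"}

parents = [{"name": "a", "n": 2}, {"name": "b", "n": 1}]
children = [enrich(c) for p in parents for c in expand(p)]
```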
## Incremental Refresh
Scalar UDTFs support incremental refresh, just like standard materialized views:
- **New source rows:** The UDTF runs on new rows, inserting child rows.
- **Deleted source rows:** Child rows linked to the deleted parent are cascade-deleted.
- **Updated source rows:** Old children are deleted, the UDTF re-runs, and new children are inserted.
```python
# Add new videos to the source table
videos.add(new_video_data)

# Incremental refresh — only processes the new videos
clips.refresh()
```
Only the new source rows are processed. Existing clips from previous refreshes are untouched.
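As a rough in-memory sketch of those three cases (a toy model for intuition, not Geneva's actual refresh machinery), keying child rows by their parent makes the cascade behavior easy to see:

```python
def refresh(view, source, udtf):
    # Deleted parents: cascade-delete their children.
    for key in list(view):
        if key not in source:
            del view[key]
    # New or updated parents: (re)run the UDTF; unchanged parents are skipped.
    for key, row in source.items():
        if key not in view or view[key]["row"] != row:
            view[key] = {"row": row, "children": list(udtf(row))}
    return view

segment = lambda duration: list(range(0, int(duration), 10))  # toy 1:N UDTF

view = {}
refresh(view, {"a": 30, "b": 10}, segment)   # initial populate
refresh(view, {"a": 20}, segment)            # b deleted, a updated
```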
## Chaining UDTF Views
Scalar UDTF views are standard materialized views, so they can serve as the source for further views:
```python
# videos → clips (1:N)
clips = db.create_materialized_view(
    "clips", query=videos.search(None), udtf=extract_clips
)

# clips → frames (1:N)
frames = db.create_materialized_view(
    "frames", query=clips.search(None), udtf=extract_frames
)
```
## Full Example: Document Chunking
```python
from geneva import connect, scalar_udtf, udf
from typing import Iterator, NamedTuple
import pyarrow as pa

class Chunk(NamedTuple):
    chunk_index: int
    chunk_text: str

@scalar_udtf
def chunk_document(text: str) -> Iterator[Chunk]:
    """Split a document into overlapping chunks."""
    words = text.split()
    chunk_size = 500
    overlap = 50
    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):
        chunk_words = words[start:start + chunk_size]
        yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words))

db = connect("/data/mydb")
docs = db.open_table("documents")

# Create chunked view — inherits doc_id, title, etc. from source
chunks = db.create_materialized_view(
    "doc_chunks",
    query=docs.search(None).select(["doc_id", "title", "text"]),
    udtf=chunk_document,
)
chunks.refresh()

# Add embeddings to chunks for semantic search
@udf(data_type=pa.list_(pa.float32(), 1536))
def embed_text(chunk_text: str) -> list[float]:
    return embedding_model.encode(chunk_text)

chunks.add_columns({"embedding": embed_text})
chunks.refresh()  # Backfills embeddings on all existing chunks

# Query — parent columns available alongside chunk columns
chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas()
```
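To sanity-check the overlap arithmetic outside Geneva, the same windowing can be run as plain Python (synthetic words, same `chunk_size`/`overlap` constants as above):

```python
def chunk_words(words, chunk_size=500, overlap=50):
    # Same stride as chunk_document: each chunk starts chunk_size - overlap
    # words after the previous one, so consecutive chunks share `overlap` words.
    step = chunk_size - overlap
    return [words[s:s + chunk_size] for s in range(0, len(words), step)]

words = [f"w{i}" for i in range(1000)]
chunks = chunk_words(words)  # starts at word 0, 450, and 900
```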
For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see Understanding Transforms.