
User-Defined Functions#

User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame. A UDF can be used just like an Expression, allowing users to express computation that Daft executes lazily.

To write a UDF, you should use the @udf decorator, which can decorate either a Python function or a Python class, producing a UDF.

Learn more about UDFs in the Daft User Guide.

Creating UDFs#

udf #

udf(
    *,
    return_dtype: DataTypeLike,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory_bytes: int | None = None,
    batch_size: int | None = None,
    concurrency: int | None = None,
) -> Callable[[UserDefinedPyFuncLike], UDF]

@udf Decorator to convert a Python function/class into a UDF.

UDFs allow users to run arbitrary Python code on the outputs of Expressions.

Parameters:

- `return_dtype` (DataType, required): Return type of the UDF.
- `num_cpus` (float | None, default `None`): Number of CPUs to allocate to each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify `num_cpus=4`, then Daft can run at most 2 instances of your UDF at a time). The default `None` indicates that Daft is free to allocate as many instances of the UDF as it wants.
- `num_gpus` (float | None, default `None`): Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for assigning the appropriate GPU to each UDF via `CUDA_VISIBLE_DEVICES`.
- `memory_bytes` (int | None, default `None`): Amount of memory to allocate to each running instance of your UDF, in bytes. If your UDF is experiencing out-of-memory errors, this parameter hints to Daft that each UDF requires a certain amount of heap memory for execution.
- `batch_size` (int | None, default `None`): Enables batching of the input into batches of at most this size. Results between batches are concatenated.
- `concurrency` (int | None, default `None`): Spins up `N` persistent replicas of the UDF to process all partitions. Defaults to `None`, which spins up one UDF per partition. This is especially useful for expensive initializations that need to be amortized across partitions, such as loading model weights for model batch inference.

Returns:

- `Callable[[UserDefinedPyFuncLike], UDF]`: UDF decorator that converts a user-provided Python function into a UDF that can be called on Expressions.

Note

In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a common use-case that isn't already covered by Daft, you should file a ticket or contribute this functionality back to Daft as a kernel!

Examples:

In the example below, we create a UDF that:

  1. Receives data under the argument name x
  2. Iterates over the x Daft Series
  3. Adds a Python constant value c to every element in x
  4. Returns a new list of Python values which will be coerced to the specified return type: return_dtype=DataType.int64().
  5. We can call our UDF on a dataframe using any of the dataframe projection operations (df.with_column(), df.select(), etc.)
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯

(Showing first 3 of 3 rows)

Resource Requests:

You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once!

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)

Your UDF's resources can also be overridden before you call it like so:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)
Source code in daft/udf.py
def udf(
    *,
    return_dtype: DataTypeLike,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory_bytes: int | None = None,
    batch_size: int | None = None,
    concurrency: int | None = None,
) -> Callable[[UserDefinedPyFuncLike], UDF]:
    """`@udf` Decorator to convert a Python function/class into a `UDF`.

    UDFs allow users to run arbitrary Python code on the outputs of Expressions.

    Args:
        return_dtype (DataType): Returned type of the UDF
        num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your
            machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default `None`
            indicates that Daft is free to allocate as many instances of the UDF as it wants to.
        num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating
            the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`.
        memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors,
            this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
        batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated.
        concurrency: Spin up `N` persistent replicas of the UDF to process all partitions. Defaults to `None`, which will spin up one
            UDF per partition. This is especially useful for expensive initializations that need to be amortized across partitions such as
            loading model weights for model batch inference.

    Returns:
        Callable[[UserDefinedPyFuncLike], UDF]: UDF decorator - converts a user-provided Python function into a UDF that can be called on Expressions

    Note:
        In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If
        your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a
        common use-case that isn't already covered by Daft, you should file a ticket or contribute this functionality back to Daft
        as a kernel!

    Examples:
        In the example below, we create a UDF that:

        1. Receives data under the argument name ``x``
        2. Iterates over the ``x`` Daft Series
        3. Adds a Python constant value ``c`` to every element in ``x``
        4. Returns a new list of Python values which will be coerced to the specified return type: ``return_dtype=DataType.int64()``.
        5. We can call our UDF on a dataframe using any of the dataframe projection operations ([df.with_column()](https://docs.getdaft.io/en/latest/api/dataframe/#daft.DataFrame.with_column),
        [df.select()](https://docs.getdaft.io/en/latest/api/dataframe/#daft.DataFrame.select), etc.)

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64())
        ... def add_constant(x: daft.Series, c=10):
        ...     return [v + c for v in x]
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", add_constant(df["x"], c=20))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 21    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 22    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 23    │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        **Resource Requests:**

        You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a
        machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once!

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
        ... def udf_needs_2_cpus(x: daft.Series):
        ...     return x
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 1     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 2     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 3     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Your UDF's resources can also be overridden before you call it like so:

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
        ... def udf_needs_4_cpus(x: daft.Series):
        ...     return x
        >>>
        >>> # Override the num_cpus to 2 instead
        >>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 1     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 2     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 3     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

    """
    inferred_return_dtype = DataType._infer_type(return_dtype)

    def _udf(f: UserDefinedPyFuncLike) -> UDF:
        # Grab a name for the UDF. It **should** be unique.
        module_name = getattr(f, "__module__", "")
        qual_name = getattr(f, "__qualname__")

        if module_name:
            name = f"{module_name}.{qual_name}"
        else:
            name = qual_name

        resource_request = (
            None
            if num_cpus is None and num_gpus is None and memory_bytes is None
            else ResourceRequest(
                num_cpus=num_cpus,
                num_gpus=num_gpus,
                memory_bytes=memory_bytes,
            )
        )
        udf = UDF(
            inner=f,
            name=name,
            return_dtype=inferred_return_dtype,
            resource_request=resource_request,
            batch_size=batch_size,
            concurrency=concurrency,
        )

        daft.attach_function(udf)
        return udf

    return _udf

Using UDFs#

UDF #

UDF(
    inner: UserDefinedPyFuncLike,
    name: str,
    return_dtype: DataType,
    init_args: InitArgsType = None,
    concurrency: int | None = None,
    resource_request: ResourceRequest | None = None,
    batch_size: int | None = None,
)

A class produced by applying the @daft.udf decorator over a Python function or class.

Calling this class produces a daft.Expression that can be used in a DataFrame function.

Examples:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯

(Showing first 3 of 3 rows)

Methods:

- `__call__`
- `override_options`: Replace the resource requests for running each instance of your UDF.
- `with_concurrency`: Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.
- `with_init_args`: Replace initialization arguments for a class UDF when calling `__init__` at runtime on each instance of the UDF.

Attributes:

- `batch_size` (int | None)
- `concurrency` (int | None)
- `init_args` (InitArgsType)
- `inner` (UserDefinedPyFuncLike)
- `name` (str)
- `resource_request` (ResourceRequest | None)
- `return_dtype` (DataType)

batch_size #

batch_size: int | None = None

concurrency #

concurrency: int | None = None

init_args #

init_args: InitArgsType = None

inner #

inner: UserDefinedPyFuncLike

name #

name: str

resource_request #

resource_request: ResourceRequest | None = None

return_dtype #

return_dtype: DataType

__call__ #

__call__(*args: Any, **kwargs: Any) -> Expression
Source code in daft/udf.py
def __call__(self, *args: Any, **kwargs: Any) -> Expression:
    self._validate_init_args()

    bound_args = self._bind_args(*args, **kwargs)
    expressions = list(bound_args.expressions().values())

    return Expression.udf(
        name=self.name,
        inner=self.wrapped_inner,
        bound_args=bound_args,
        expressions=expressions,
        return_dtype=self.return_dtype,
        init_args=self.init_args,
        resource_request=self.resource_request,
        batch_size=self.batch_size,
        concurrency=self.concurrency,
    )

override_options #

override_options(
    *,
    num_cpus: float | None = _UnsetMarker,
    num_gpus: float | None = _UnsetMarker,
    memory_bytes: int | None = _UnsetMarker,
    batch_size: int | None = _UnsetMarker,
) -> UDF

Replace the resource requests for running each instance of your UDF.

Parameters:

- `num_cpus` (float | None, default `_UnsetMarker`): Number of CPUs to allocate to each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify `num_cpus=4`, then Daft can run at most 2 instances of your UDF at a time).
- `num_gpus` (float | None, default `_UnsetMarker`): Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for assigning the appropriate GPU to each UDF via `CUDA_VISIBLE_DEVICES`.
- `memory_bytes` (int | None, default `_UnsetMarker`): Amount of memory to allocate to each running instance of your UDF, in bytes. If your UDF is experiencing out-of-memory errors, this parameter hints to Daft that each UDF requires a certain amount of heap memory for execution.
- `batch_size` (int | None, default `_UnsetMarker`): Enables batching of the input into batches of at most this size. Results between batches are concatenated.

Examples:

For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_udf_4CPU = example_udf.override_options(num_cpus=4)
Source code in daft/udf.py
def override_options(
    self,
    *,
    num_cpus: float | None = _UnsetMarker,
    num_gpus: float | None = _UnsetMarker,
    memory_bytes: int | None = _UnsetMarker,
    batch_size: int | None = _UnsetMarker,
) -> UDF:
    """Replace the resource requests for running each instance of your UDF.

    Args:
        num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your
            machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
        num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating
            the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`.
        memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors,
            this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
        batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated.

    Examples:
        For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string())
        ... def example_udf(inputs):
        ...     # You will have access to 4 CPUs here if you configure your UDF correctly!
        ...     return inputs
        >>>
        >>> # Parametrize the UDF to run with 4 CPUs
        >>> example_udf_4CPU = example_udf.override_options(num_cpus=4)

    """
    new_resource_request = ResourceRequest() if self.resource_request is None else self.resource_request
    if num_cpus is not _UnsetMarker:
        new_resource_request = new_resource_request.with_num_cpus(num_cpus)
    if num_gpus is not _UnsetMarker:
        new_resource_request = new_resource_request.with_num_gpus(num_gpus)
    if memory_bytes is not _UnsetMarker:
        new_resource_request = new_resource_request.with_memory_bytes(memory_bytes)

    new_batch_size = self.batch_size if batch_size is _UnsetMarker else batch_size

    return dataclasses.replace(self, resource_request=new_resource_request, batch_size=new_batch_size)

with_concurrency #

with_concurrency(concurrency: int) -> UDF

Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

Examples:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
Source code in daft/udf.py
def with_concurrency(self, concurrency: int) -> UDF:
    """Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

    Examples:
        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
        ... class MyGpuUdf:
        ...     def __init__(self, text=" world"):
        ...         self.text = text
        ...
        ...     def __call__(self, data):
        ...         return [x + self.text for x in data]
        >>>
        >>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
        >>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
    """
    return dataclasses.replace(self, concurrency=concurrency)

with_init_args #

with_init_args(*args: Any, **kwargs: Any) -> UDF

Replace initialization arguments for a class UDF when calling __init__ at runtime on each instance of the UDF.

Examples:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyUdfWithInit:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # Create a customized version of MyUdfWithInit by overriding the init args
>>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
>>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
>>> df.show()
╭───────┬─────────────┬─────────────────────╮
│ foo   ┆ bar_world   ┆ bar_custom          │
│ ---   ┆ ---         ┆ ---                 │
│ Utf8  ┆ Utf8        ┆ Utf8                │
╞═══════╪═════════════╪═════════════════════╡
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
╰───────┴─────────────┴─────────────────────╯

(Showing first 3 of 3 rows)
Source code in daft/udf.py
def with_init_args(self, *args: Any, **kwargs: Any) -> UDF:
    """Replace initialization arguments for a class UDF when calling `__init__` at runtime on each instance of the UDF.

    Examples:
        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string())
        ... class MyUdfWithInit:
        ...     def __init__(self, text=" world"):
        ...         self.text = text
        ...
        ...     def __call__(self, data):
        ...         return [x + self.text for x in data]
        >>>
        >>> # Create a customized version of MyUdfWithInit by overriding the init args
        >>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
        >>>
        >>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
        >>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
        >>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
        >>> df.show()
        ╭───────┬─────────────┬─────────────────────╮
        │ foo   ┆ bar_world   ┆ bar_custom          │
        │ ---   ┆ ---         ┆ ---                 │
        │ Utf8  ┆ Utf8        ┆ Utf8                │
        ╞═══════╪═════════════╪═════════════════════╡
        │ hello ┆ hello world ┆ hello my old friend │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ hello ┆ hello world ┆ hello my old friend │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ hello ┆ hello world ┆ hello my old friend │
        ╰───────┴─────────────┴─────────────────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """
    if not isinstance(self.inner, type):
        raise ValueError("Function UDFs cannot have init args.")

    init_sig = inspect.signature(self.inner.__init__)  # type: ignore
    init_sig.bind(
        # Placeholder for `self`
        None,
        *args,
        **kwargs,
    )
    return dataclasses.replace(self, init_args=(args, kwargs))