Sessions#

Sessions let you attach catalogs and tables and create temporary objects, all of which are accessible through both the Python and SQL APIs. Sessions hold configuration state such as current_catalog and current_namespace, which is used in name resolution and can simplify your workflows. Learn more about Sessions in the Daft User Guide.

Session #

Session()

Session holds a connection's state and orchestrates execution of DataFrame and SQL queries against catalogs.

Examples:

>>> import daft
>>> from daft.session import Session
>>>
>>> sess = Session()
>>>
>>> # Create a temporary table from a DataFrame
>>> sess.create_temp_table("T", daft.from_pydict({"x": [1, 2, 3]}))
>>>
>>> # Read the table as a DataFrame
>>> df = sess.read_table("T")
>>>
>>> # Execute an SQL query
>>> sess.sql("SELECT * FROM T").show()
>>>
>>> # You can also retrieve the current session without creating a new one:
>>> from daft.session import current_session
>>> sess = current_session()

Methods:

Name Description
attach

Attaches a known attachable object like a Catalog, Table or UDF.

attach_catalog

Attaches an external catalog to this session.

attach_function

Attaches a Python function as a UDF in the current session.

attach_table

Attaches an external table instance to this session.

create_namespace

Creates a namespace in the current catalog.

create_namespace_if_not_exists

Creates a namespace in the current catalog if it does not already exist.

create_table

Creates a table in the current catalog.

create_table_if_not_exists

Creates a table in the current catalog if it does not already exist.

create_temp_table

Creates a temp table scoped to this session's lifetime.

current_catalog

Get the session's current catalog or None.

current_namespace

Get the session's current namespace or None.

detach_catalog

Detaches the catalog from this session or raises if the catalog does not exist.

detach_function

Detaches a Python function (UDF) from the current session.

detach_table

Detaches the table from this session or raises if the table does not exist.

drop_namespace

Drop the given namespace in the current catalog.

drop_table

Drop the given table in the current catalog.

get_catalog

Returns the catalog or raises an exception if it does not exist.

get_table

Returns the table or raises an exception if it does not exist.

has_catalog

Returns true if a catalog with the given identifier exists.

has_namespace

Returns true if a namespace with the given identifier exists.

has_table

Returns true if a table with the given identifier exists.

list_catalogs

Returns a list of available catalogs matching the pattern.

list_namespaces

Returns a list of matching namespaces in the current catalog.

list_tables

Returns a list of available tables.

read_table

Returns the table as a DataFrame or raises an exception if it does not exist.

set_catalog

Set the given catalog as current_catalog, or raise an error if it does not exist.

set_namespace

Set the given namespace as current_namespace for table resolution.

sql

Executes the SQL statement using this session.

use

Use sets the current catalog and namespace.

write_table

Writes the DataFrame to the table specified by the identifier.

Source code in daft/session.py
def __init__(self) -> None:
    self._session = PySession.empty()

attach #

attach(
    object: Catalog | Table | UDF, alias: str | None = None
) -> None

Attaches a known attachable object like a Catalog, Table or UDF.

Parameters:

Name Type Description Default
object Catalog | Table | UDF

object which is attachable to a session

required
alias str | None

optional alias for name resolution

None

Returns:

Type Description
None

None

Source code in daft/session.py
def attach(self, object: Catalog | Table | UDF, alias: str | None = None) -> None:
    """Attaches a known attachable object like a Catalog, Table or UDF.

    Args:
        object (Catalog|Table|UDF): object which is attachable to a session

    Returns:
        None
    """
    if isinstance(object, Catalog):
        self.attach_catalog(object, alias)
    elif isinstance(object, Table):
        self.attach_table(object, alias)
    elif isinstance(object, UDF):
        self.attach_function(object, alias)
    else:
        raise ValueError(f"Cannot attach object with type {type(object)}")
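
As a rough illustration of the type-based dispatch in `attach`, here is a stand-alone sketch. `ToyCatalog`, `ToyTable`, `ToyUDF` and `ToySession` are hypothetical stand-ins invented for this example, not Daft classes; only the `isinstance` routing and the `ValueError` for unsupported objects follow the source above.

```python
from __future__ import annotations


class ToyCatalog:
    """Hypothetical stand-in for a catalog (not daft's Catalog)."""


class ToyTable:
    """Hypothetical stand-in for a table (not daft's Table)."""


class ToyUDF:
    """Hypothetical stand-in for a UDF (not daft's UDF)."""


class ToySession:
    """Toy session that records which attach_* method handled each object."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str | None]] = []

    def attach_catalog(self, obj: ToyCatalog, alias: str | None = None) -> None:
        self.calls.append(("catalog", alias))

    def attach_table(self, obj: ToyTable, alias: str | None = None) -> None:
        self.calls.append(("table", alias))

    def attach_function(self, obj: ToyUDF, alias: str | None = None) -> None:
        self.calls.append(("function", alias))

    def attach(self, obj: object, alias: str | None = None) -> None:
        # Route on the runtime type, mirroring the isinstance chain in the source.
        if isinstance(obj, ToyCatalog):
            self.attach_catalog(obj, alias)
        elif isinstance(obj, ToyTable):
            self.attach_table(obj, alias)
        elif isinstance(obj, ToyUDF):
            self.attach_function(obj, alias)
        else:
            raise ValueError(f"Cannot attach object with type {type(obj)}")


sess = ToySession()
sess.attach(ToyCatalog(), "lake")
sess.attach(ToyTable())
print(sess.calls)  # [('catalog', 'lake'), ('table', None)]
```

Anything that is not one of the three known kinds is rejected up front, so a typo such as passing a plain string fails with a `ValueError` rather than being silently ignored.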

attach_catalog #

attach_catalog(
    catalog: Catalog | object, alias: str | None = None
) -> Catalog

Attaches an external catalog to this session.

Parameters:

Name Type Description Default
catalog Catalog | object

catalog instance or supported catalog object

required
alias str | None

optional alias for name resolution

None

Returns:

Name Type Description
Catalog Catalog

new daft catalog instance

Source code in daft/session.py
def attach_catalog(self, catalog: Catalog | object, alias: str | None = None) -> Catalog:
    """Attaches an external catalog to this session.

    Args:
        catalog (object): catalog instance or supported catalog object
        alias (str|None): optional alias for name resolution

    Returns:
        Catalog: new daft catalog instance
    """
    c = catalog if isinstance(catalog, Catalog) else Catalog._from_obj(catalog)
    a = alias if alias else c.name
    self._session.attach_catalog(c, a)
    return c
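
The alias fallback in the source (`alias if alias else c.name`) tests truthiness rather than `is None`, so an empty string is treated like a missing alias. A minimal sketch of just that expression follows; `resolve_alias` is a hypothetical helper name used here for illustration and does not exist in Daft. The same expression appears in `attach_table`.

```python
from __future__ import annotations


def resolve_alias(alias: str | None, name: str) -> str:
    """Mirror `alias if alias else name`: any falsy alias falls back to the name."""
    return alias if alias else name


print(resolve_alias(None, "my_catalog"))   # my_catalog
print(resolve_alias("", "my_catalog"))     # my_catalog
print(resolve_alias("cat", "my_catalog"))  # cat
```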

attach_function #

attach_function(
    function: UDF, alias: str | None = None
) -> None

Attaches a Python function as a UDF in the current session.

Source code in daft/session.py
def attach_function(self, function: UDF, alias: str | None = None) -> None:
    """Attaches a Python function as a UDF in the current session."""
    self._session.attach_function(function, alias)

attach_table #

attach_table(
    table: Table | object, alias: str | None = None
) -> Table

Attaches an external table instance to this session.

Parameters:

Name Type Description Default
table Table | object

table instance or supported table object

required
alias str | None

optional alias for name resolution

None

Returns:

Name Type Description
Table Table

new daft table instance

Source code in daft/session.py
def attach_table(self, table: Table | object, alias: str | None = None) -> Table:
    """Attaches an external table instance to this session.

    Args:
        table (Table | object): table instance or supported table object
        alias (str | None): optional alias for name resolution

    Returns:
        Table: new daft table instance
    """
    t = table if isinstance(table, Table) else Table._from_obj(table)
    a = alias if alias else t.name
    self._session.attach_table(t, a)
    return t

create_namespace #

create_namespace(identifier: Identifier | str) -> None

Creates a namespace in the current catalog.

Source code in daft/session.py
def create_namespace(self, identifier: Identifier | str) -> None:
    """Creates a namespace in the current catalog."""
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot create a namespace without a current catalog")
    return catalog.create_namespace(identifier)
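
Several methods on this page (`create_namespace`, `drop_namespace`, `has_namespace`, `list_namespaces`, `create_table`, `drop_table`) start with the same guard: they raise `ValueError` when no current catalog is set. The sketch below reproduces that guard with hypothetical `ToyCatalog` and `ToySession` classes; note that the toy `set_catalog` takes a catalog object for brevity, whereas the real method takes a catalog name.

```python
from __future__ import annotations


class ToyCatalog:
    """Hypothetical catalog that just remembers its namespaces."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.namespaces: list[str] = []

    def create_namespace(self, identifier: str) -> None:
        self.namespaces.append(identifier)


class ToySession:
    """Hypothetical session holding at most one current catalog."""

    def __init__(self) -> None:
        self._current: ToyCatalog | None = None

    def set_catalog(self, catalog: ToyCatalog | None) -> None:
        self._current = catalog

    def current_catalog(self) -> ToyCatalog | None:
        return self._current

    def create_namespace(self, identifier: str) -> None:
        # Same guard as the source: fail fast when no catalog is current.
        if not (catalog := self.current_catalog()):
            raise ValueError("Cannot create a namespace without a current catalog")
        catalog.create_namespace(identifier)


sess = ToySession()
try:
    sess.create_namespace("analytics")
except ValueError as err:
    print(err)  # Cannot create a namespace without a current catalog

lake = ToyCatalog("lake")
sess.set_catalog(lake)
sess.create_namespace("analytics")
print(lake.namespaces)  # ['analytics']
```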

create_namespace_if_not_exists #

create_namespace_if_not_exists(
    identifier: Identifier | str,
) -> None

Creates a namespace in the current catalog if it does not already exist.

Source code in daft/session.py
def create_namespace_if_not_exists(self, identifier: Identifier | str) -> None:
    """Creates a namespace in the current catalog if it does not already exist."""
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot create a namespace without a current catalog")
    return catalog.create_namespace_if_not_exists(identifier)

create_table #

create_table(
    identifier: Identifier | str,
    source: Schema | DataFrame,
    **properties: Any,
) -> Table

Creates a table in the current catalog.

If no namespace is specified, the current namespace is used.

Returns:

Name Type Description
Table Table

the newly created table instance.

Source code in daft/session.py
def create_table(self, identifier: Identifier | str, source: Schema | DataFrame, **properties: Any) -> Table:
    """Creates a table in the current catalog.

    If no namespace is specified, the current namespace is used.

    Returns:
        Table: the newly created table instance.
    """
    if not (catalog := self.current_catalog()):
        # TODO relax this constraint by joining with the catalog name
        raise ValueError("Cannot create a table without a current catalog")

    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)

    if len(identifier) == 1:
        if ns := self.current_namespace():
            identifier = ns + identifier

    return catalog.create_table(identifier, source, properties)
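
The namespace rule in the source can be sketched on plain tuples: a single-part identifier is prefixed with the current namespace when one is set, and a multi-part identifier is left alone. `qualify` is a hypothetical helper written for this example, and tuples stand in for Daft's `Identifier`.

```python
from __future__ import annotations


def qualify(
    identifier: tuple[str, ...],
    current_namespace: tuple[str, ...] | None,
) -> tuple[str, ...]:
    """Prefix a single-part identifier with the current namespace, if one is set."""
    if len(identifier) == 1 and current_namespace:
        return current_namespace + identifier
    return identifier


print(qualify(("events",), ("analytics",)))        # ('analytics', 'events')
print(qualify(("raw", "events"), ("analytics",)))  # ('raw', 'events')
print(qualify(("events",), None))                  # ('events',)
```

The same qualification step appears in `create_table_if_not_exists`.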

create_table_if_not_exists #

create_table_if_not_exists(
    identifier: Identifier | str,
    source: Schema | DataFrame,
    **properties: Any,
) -> Table

Creates a table in the current catalog if it does not already exist.

If no namespace is specified, the current namespace is used.

Returns:

Name Type Description
Table Table

the newly created instance, or the existing table instance.

Source code in daft/session.py
def create_table_if_not_exists(
    self,
    identifier: Identifier | str,
    source: Schema | DataFrame,
    **properties: Any,
) -> Table:
    """Creates a table in the current catalog if it does not already exist.

    If no namespace is specified, the current namespace is used.

    Returns:
        Table: the newly created instance, or the existing table instance.
    """
    if not (catalog := self.current_catalog()):
        # TODO relax this constraint by joining with the catalog name
        raise ValueError("Cannot create a table without a current catalog")

    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)

    if len(identifier) == 1:
        if ns := self.current_namespace():
            identifier = ns + identifier

    return catalog.create_table_if_not_exists(identifier, source, properties)

create_temp_table #

create_temp_table(
    identifier: str, source: Schema | DataFrame
) -> Table

Creates a temp table scoped to this session's lifetime.

Parameters:

Name Type Description Default
identifier str

table identifier (name)

required
source Schema | DataFrame

table source like a schema or dataframe

required

Returns:

Name Type Description
Table Table

new table instance

Examples:

>>> import daft
>>> from daft.session import Session
>>> sess = Session()
>>> sess.create_temp_table("T", daft.from_pydict({"x": [1, 2, 3]}))
>>> sess.create_temp_table("S", daft.from_pydict({"y": [4, 5, 6]}))
>>> sess.list_tables()
[Identifier(''T''), Identifier(''S'')]

Source code in daft/session.py
def create_temp_table(self, identifier: str, source: Schema | DataFrame) -> Table:
    """Creates a temp table scoped to this session's lifetime.

    Args:
        identifier (str): table identifier (name)
        source (TableSource|object): table source like a schema or dataframe

    Returns:
        Table: new table instance

    Examples:
        >>> import daft
        >>> from daft.session import Session
        >>> sess = Session()
        >>> sess.create_temp_table("T", daft.from_pydict({"x": [1, 2, 3]}))
        >>> sess.create_temp_table("S", daft.from_pydict({"y": [4, 5, 6]}))
        >>> sess.list_tables()
        [Identifier(''T''), Identifier(''S'')]

    Args:
        identifier (str): table identifier (name)
        source (Schema | DataFrame): table source is either a Schema or Dataframe

    Returns:
        Table: new table instance
    """
    if isinstance(source, Schema):
        py_source = PyTableSource.from_pyschema(source._schema)
    elif isinstance(source, DataFrame):
        py_source = PyTableSource.from_pybuilder(source._builder._builder)
    else:
        raise ValueError(
            f"Unsupported create_temp_table source, {type(source)}, expected either Schema or DataFrame."
        )
    return self._session.create_temp_table(identifier, py_source, replace=True)
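
The source passes `replace=True` to the underlying session, which suggests that creating a temp table under a name already in use replaces the earlier table rather than raising. The toy registry below models that reading only; `ToyTempTables` is hypothetical, and the actual behavior should be confirmed against the library.

```python
class ToyTempTables:
    """Hypothetical in-memory registry of temp tables keyed by name."""

    def __init__(self) -> None:
        self._tables = {}

    def create_temp_table(self, name, source, replace=True):
        # With replace=True an existing entry is overwritten; otherwise it is an error.
        if name in self._tables and not replace:
            raise ValueError(f"Temp table {name!r} already exists")
        self._tables[name] = source
        return source

    def list_tables(self):
        return list(self._tables)


registry = ToyTempTables()
registry.create_temp_table("T", {"x": [1, 2, 3]})
registry.create_temp_table("T", {"x": [9]})  # replaces the first "T"
print(registry.list_tables())  # ['T']
```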

current_catalog #

current_catalog() -> Catalog | None

Get the session's current catalog or None.

Returns:

Name Type Description
Catalog Catalog | None

current catalog or None if one is not set

Source code in daft/session.py
def current_catalog(self) -> Catalog | None:
    """Get the session's current catalog or None.

    Returns:
        Catalog: current catalog or None if one is not set
    """
    return self._session.current_catalog()

current_namespace #

current_namespace() -> Identifier | None

Get the session's current namespace or None.

Returns:

Name Type Description
Identifier Identifier | None

current namespace or none if one is not set

Source code in daft/session.py
def current_namespace(self) -> Identifier | None:
    """Get the session's current namespace or None.

    Returns:
        Identifier: current namespace or none if one is not set
    """
    ident = self._session.current_namespace()
    return Identifier._from_pyidentifier(ident) if ident else None

detach_catalog #

detach_catalog(alias: str) -> None

Detaches the catalog from this session or raises if the catalog does not exist.

Parameters:

Name Type Description Default
alias str

catalog alias to detach

required
Source code in daft/session.py
def detach_catalog(self, alias: str) -> None:
    """Detaches the catalog from this session or raises if the catalog does not exist.

    Args:
        alias (str): catalog alias to detach
    """
    return self._session.detach_catalog(alias)

detach_function #

detach_function(alias: str) -> None

Detaches a Python function (UDF) from the current session.

Source code in daft/session.py
def detach_function(self, alias: str) -> None:
    """Detaches a Python function as a UDF in the current session."""
    self._session.detach_function(alias)

detach_table #

detach_table(alias: str) -> None

Detaches the table from this session or raises if the table does not exist.

Parameters:

Name Type Description Default
alias str

table alias to detach

required
Source code in daft/session.py
def detach_table(self, alias: str) -> None:
    """Detaches the table from this session or raises if the table does not exist.

    Args:
        alias (str): catalog alias to detach
    """
    return self._session.detach_table(alias)

drop_namespace #

drop_namespace(identifier: Identifier | str) -> None

Drop the given namespace in the current catalog.

Parameters:

Name Type Description Default
identifier Identifier | str

namespace identifier

required
Source code in daft/session.py
def drop_namespace(self, identifier: Identifier | str) -> None:
    """Drop the given namespace in the current catalog.

    Args:
        identifier (Identifier|str): table identifier
    """
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot drop a namespace without a current catalog")
    return catalog.drop_namespace(identifier)

drop_table #

drop_table(identifier: Identifier | str) -> None

Drop the given table in the current catalog.

Parameters:

Name Type Description Default
identifier Identifier | str

table identifier

required
Source code in daft/session.py
def drop_table(self, identifier: Identifier | str) -> None:
    """Drop the given table in the current catalog.

    Args:
        identifier (Identifier|str): table identifier
    """
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot drop a table without a current catalog")
    # TODO join the identifier with the current namespace
    return catalog.drop_table(identifier)

get_catalog #

get_catalog(identifier: str) -> Catalog

Returns the catalog or raises an exception if it does not exist.

Parameters:

Name Type Description Default
identifier str

catalog identifier (name)

required

Returns:

Name Type Description
Catalog Catalog

The catalog object.

Raises:

Type Description
ValueError

If the catalog does not exist.

Source code in daft/session.py
def get_catalog(self, identifier: str) -> Catalog:
    """Returns the catalog or raises an exception if it does not exist.

    Args:
        identifier (str): catalog identifier (name)

    Returns:
        Catalog: The catalog object.

    Raises:
        ValueError: If the catalog does not exist.
    """
    return self._session.get_catalog(identifier)

get_table #

get_table(identifier: Identifier | str) -> Table

Returns the table or raises an exception if it does not exist.

Parameters:

Name Type Description Default
identifier Identifier | str

table identifier or identifier string

required

Returns:

Name Type Description
Table Table

The table object.

Raises:

Type Description
ValueError

If the table does not exist.

Source code in daft/session.py
def get_table(self, identifier: Identifier | str) -> Table:
    """Returns the table or raises an exception if it does not exist.

    Args:
        identifier (Identifier|str): table identifier or identifier string

    Returns:
        Table: The table object.

    Raises:
        ValueError: If the table does not exist.
    """
    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)
    return self._session.get_table(identifier._ident)

has_catalog #

has_catalog(identifier: str) -> bool

Returns true if a catalog with the given identifier exists.

Source code in daft/session.py
def has_catalog(self, identifier: str) -> bool:
    """Returns true if a catalog with the given identifier exists."""
    return self._session.has_catalog(identifier)

has_namespace #

has_namespace(identifier: Identifier | str) -> bool

Returns true if a namespace with the given identifier exists.

Source code in daft/session.py
def has_namespace(self, identifier: Identifier | str) -> bool:
    """Returns true if a namespace with the given identifier exists."""
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot call has_namespace without a current catalog")
    return catalog.has_namespace(identifier)

has_table #

has_table(identifier: Identifier | str) -> bool

Returns true if a table with the given identifier exists.

Source code in daft/session.py
def has_table(self, identifier: Identifier | str) -> bool:
    """Returns true if a table with the given identifier exists."""
    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)
    return self._session.has_table(identifier._ident)

list_catalogs #

list_catalogs(pattern: str | None = None) -> list[str]

Returns a list of available catalogs matching the pattern.

This API currently returns a list of catalog names for backwards compatibility. In 0.5.0 this API will return a list of Catalog objects.

Parameters:

Name Type Description Default
pattern str | None

catalog name pattern

None

Returns:

Type Description
list[str]

list[str]: list of available catalog names

Source code in daft/session.py
def list_catalogs(self, pattern: str | None = None) -> list[str]:
    """Returns a list of available catalogs matching the pattern.

    This API currently returns a list of catalog names for backwards compatibility.
    In 0.5.0 this API will return a list of Catalog objects.

    Args:
        pattern (str): catalog name pattern

    Returns:
        list[str]: list of available catalog names
    """
    return self._session.list_catalogs(pattern)

list_namespaces #

list_namespaces(
    pattern: str | None = None,
) -> list[Identifier]

Returns a list of matching namespaces in the current catalog.

Source code in daft/session.py
def list_namespaces(self, pattern: str | None = None) -> list[Identifier]:
    """Returns a list of matching namespaces in the current catalog."""
    if not (catalog := self.current_catalog()):
        raise ValueError("Cannot list namespaces without a current catalog")
    return catalog.list_namespaces(pattern)

list_tables #

list_tables(pattern: str | None = None) -> list[Identifier]

Returns a list of available tables.

Parameters:

Name Type Description Default
pattern str | None

table name pattern

None

Returns:

Type Description
list[Identifier]

list[Identifier]: list of available tables

Source code in daft/session.py
def list_tables(self, pattern: str | None = None) -> list[Identifier]:
    """Returns a list of available tables.

    Args:
        pattern (str): table name pattern

    Returns:
        list[Identifier]: list of available tables
    """
    return [Identifier._from_pyidentifier(i) for i in self._session.list_tables(pattern)]

read_table #

read_table(
    identifier: Identifier | str, **options: Any
) -> DataFrame

Returns the table as a DataFrame or raises an exception if it does not exist.

Parameters:

Name Type Description Default
identifier Identifier | str

table identifier

required

Returns:

Name Type Description
DataFrame DataFrame

the table as a DataFrame

Raises:

Type Description
ValueError

If the table does not exist.

Source code in daft/session.py
def read_table(self, identifier: Identifier | str, **options: Any) -> DataFrame:
    """Returns the table as a DataFrame or raises an exception if it does not exist.

    Args:
        identifier (Identifier|str): table identifier

    Returns:
        DataFrame:

    Raises:
        ValueError: If the tables does not exist.
    """
    return self.get_table(identifier).read(**options)

set_catalog #

set_catalog(identifier: str | None) -> None

Set the given catalog as current_catalog, or raise an error if it does not exist.

Parameters:

Name Type Description Default
identifier str | None

name of the catalog to set as the current catalog

required

Raises:

Type Description
ValueError

If the catalog does not exist.

Source code in daft/session.py
def set_catalog(self, identifier: str | None) -> None:
    """Set the given catalog as current_catalog or raises an err if it does not exist.

    Args:
        identifier (str): sets the current catalog

    Raises:
        ValueError: If the catalog does not exist.
    """
    self._session.set_catalog(identifier)

set_namespace #

set_namespace(identifier: Identifier | str | None) -> None

Set the given namespace as current_namespace for table resolution.

Parameters:

Name Type Description Default
identifier Identifier | str | None

namespace identifier

required
Source code in daft/session.py
def set_namespace(self, identifier: Identifier | str | None) -> None:
    """Set the given namespace as current_namespace for table resolution.

    Args:
        identifier (Identifier | str): namespace identifier
    """
    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)
    self._session.set_namespace(identifier._ident if identifier else None)

sql #

sql(sql: str) -> DataFrame | None

Executes the SQL statement using this session.

Parameters:

Name Type Description Default
sql str

input SQL statement

required

Returns:

Name Type Description
DataFrame DataFrame | None

DataFrame instance if this was a data statement (DQL, DDL, DML); otherwise None.

Source code in daft/session.py
def sql(self, sql: str) -> DataFrame | None:
    """Executes the SQL statement using this session.

    Args:
        sql (str): input SQL statement

    Returns:
        DataFrame: dataframe instance if this was a data statement (DQL, DDL, DML).
    """
    py_sess = self._session
    py_config = get_context().daft_planning_config
    py_object = sql_exec(sql, py_sess, {}, py_config)
    if py_object is None:
        return None
    elif isinstance(py_object, PyBuilder):
        return DataFrame(LogicalPlanBuilder(py_object))
    else:
        raise ValueError(f"Unsupported return type from sql exec: {type(py_object)}")

use #

use(identifier: Identifier | str | None = None) -> None

Use sets the current catalog and namespace.

Source code in daft/session.py
def use(self, identifier: Identifier | str | None = None) -> None:
    """Use sets the current catalog and namespace."""
    if identifier is None:
        self.set_catalog(None)
        self.set_namespace(None)
        return
    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)
    if len(identifier) == 1:
        self.set_catalog(str(identifier[0]))
    else:
        self.set_catalog(str(identifier[0]))
        self.set_namespace(identifier.drop(1))
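
Reading the source, `use` treats the first identifier part as the catalog and the remaining parts as the namespace; with a single-part identifier only the catalog is set, so any previously set namespace is left as it was. The sketch below models that on a small state object. `ToyState` and the free function `use` are hypothetical, and splitting on `.` is a simplifying assumption that ignores any quoting rules `Identifier.from_str` may apply.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyState:
    """Hypothetical holder for the session's current catalog and namespace."""

    catalog: str | None = None
    namespace: tuple[str, ...] | None = None


def use(state: ToyState, identifier: str | None = None) -> None:
    if identifier is None:
        # No argument clears both settings.
        state.catalog = None
        state.namespace = None
        return
    parts = tuple(identifier.split("."))  # assumption: no quoted parts
    state.catalog = parts[0]
    if len(parts) > 1:
        # Everything after the catalog becomes the namespace.
        state.namespace = parts[1:]


state = ToyState()
use(state, "lake.analytics.daily")
print(state)  # ToyState(catalog='lake', namespace=('analytics', 'daily'))
use(state, "warehouse")
print(state)  # ToyState(catalog='warehouse', namespace=('analytics', 'daily'))
use(state)
print(state)  # ToyState(catalog=None, namespace=None)
```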

write_table #

write_table(
    identifier: Identifier | str,
    df: DataFrame,
    mode: Literal["append", "overwrite"] = "append",
    **options: dict[str, Any],
) -> None

Writes the DataFrame to the table specified by the identifier.

Parameters:

Name Type Description Default
identifier Identifier | str

table identifier

required
df DataFrame

dataframe to write

required
mode 'append' | 'overwrite'

write mode, defaults to "append"

'append'
**options dict[str, Any]

additional, format-specific write options

{}
Source code in daft/session.py
def write_table(
    self,
    identifier: Identifier | str,
    df: DataFrame,
    mode: Literal["append", "overwrite"] = "append",
    **options: dict[str, Any],
) -> None:
    """Writes the DataFrame to the table specified by the identifier.

    Args:
        identifier (Identifier|str): table identifier
        df (DataFrame): dataframe to write
        mode ("append"|"overwrite"): write mode, defaults to "append"
        **options (dict[str,Any]): additional, format-specific write options
    """
    if isinstance(identifier, str):
        identifier = Identifier.from_str(identifier)

    self._session.get_table(identifier._ident).write(df, mode=mode, **options)
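
`write_table` looks the table up and delegates to its `write` method, so the effect of `mode` depends on the table implementation. As a sketch of the conventional meaning of the two modes, assuming "append" adds rows and "overwrite" replaces them, here is a hypothetical in-memory `ToyTable` (not a Daft class):

```python
from __future__ import annotations


class ToyTable:
    """Hypothetical in-memory table storing rows as dicts."""

    def __init__(self) -> None:
        self.rows: list[dict] = []

    def write(self, rows: list[dict], mode: str = "append") -> None:
        if mode == "append":
            # Keep existing rows and add the new ones after them.
            self.rows.extend(rows)
        elif mode == "overwrite":
            # Discard existing rows and keep only the new ones.
            self.rows = list(rows)
        else:
            raise ValueError(f"Unsupported write mode: {mode!r}")


table = ToyTable()
table.write([{"x": 1}])
table.write([{"x": 2}])                    # default mode is "append"
print(table.rows)                          # [{'x': 1}, {'x': 2}]
table.write([{"x": 3}], mode="overwrite")
print(table.rows)                          # [{'x': 3}]
```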