Configuration#
Configure the execution backend, tune Daft's behavior during execution, and control how Daft interacts with storage.
Setting the Runner#
Control the execution backend that Daft will run on by calling these functions once at the start of your application.
set_runner_native #
set_runner_native(
num_threads: int | None = None,
) -> DaftContext
Configure Daft to execute dataframes using native multi-threaded processing.
This is the default execution mode for Daft.
Returns:
Name | Type | Description |
---|---|---|
DaftContext | DaftContext | Updated Daft execution context configured for native execution. |
Note
Can also be configured via environment variable: DAFT_RUNNER=native
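A minimal sketch of selecting the native runner explicitly (the thread count shown is illustrative):

```python
import daft

# Pin Daft to the native multithreaded runner; num_threads is optional
# and, when omitted, Daft picks a default based on the machine.
daft.context.set_runner_native(num_threads=8)

df = daft.from_pydict({"x": [1, 2, 3]})
df.show()
```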
set_runner_ray #
set_runner_ray(
address: str | None = None,
noop_if_initialized: bool = False,
max_task_backlog: int | None = None,
force_client_mode: bool = False,
) -> DaftContext
Configure Daft to execute dataframes using the Ray distributed computing framework.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address | str | None | Ray cluster address to connect to. If None, connects to or starts a local Ray instance. | None |
noop_if_initialized | bool | If True, skip initialization if Ray is already running. | False |
max_task_backlog | int | None | Maximum number of tasks that can be queued. None means Daft will automatically determine a good default. | None |
force_client_mode | bool | If True, forces Ray to run in client mode. | False |
Returns:
Name | Type | Description |
---|---|---|
DaftContext | DaftContext | Updated Daft execution context configured for Ray. |
Note
Can also be configured via environment variable: DAFT_RUNNER=ray
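A minimal sketch of connecting to an existing Ray cluster (the address is a placeholder):

```python
import daft

# Connect to a hypothetical Ray cluster; omit `address` to start or
# attach to a local Ray instance instead.
daft.context.set_runner_ray(address="ray://head-node:10001")
```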
Setting Configurations#
Configure Daft in various ways during execution.
set_planning_config #
set_planning_config(
config: PyDaftPlanningConfig | None = None,
default_io_config: IOConfig | None = None,
) -> DaftContext
Globally sets various configuration parameters which control Daft plan construction behavior.
These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | PyDaftPlanningConfig | None | A PyDaftPlanningConfig object to set the config to, before applying other kwargs. Defaults to None which indicates that the old (current) config should be used. | None |
default_io_config | IOConfig | None | A default IOConfig to use in the absence of one being explicitly passed into any Expression or DataFrame call that accepts an IOConfig. | None |
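For example, a sketch of setting a global default IOConfig for all subsequently constructed DataFrames (the region and anonymous access are assumptions for illustration):

```python
import daft
from daft.io import IOConfig, S3Config

# DataFrames constructed after this call fall back to this IOConfig
# unless one is passed explicitly.
daft.context.set_planning_config(
    default_io_config=IOConfig(s3=S3Config(region_name="us-west-2", anonymous=True))
)
```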
planning_config_ctx #
planning_config_ctx(
**kwargs: Any,
) -> Generator[None, None, None]
Context manager that wraps set_planning_config to reset the config to its original setting afterwards.
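A sketch of scoping a planning config change to a block (the S3 path is a placeholder):

```python
import daft
from daft.io import IOConfig, S3Config

anon_s3 = IOConfig(s3=S3Config(anonymous=True))

# The default IOConfig applies only inside the `with` block; the previous
# planning config is restored on exit.
with daft.context.planning_config_ctx(default_io_config=anon_s3):
    df = daft.read_parquet("s3://example-bucket/data/")  # hypothetical path
```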
set_execution_config #
set_execution_config(
config: PyDaftExecutionConfig | None = None,
scan_tasks_min_size_bytes: int | None = None,
scan_tasks_max_size_bytes: int | None = None,
max_sources_per_scan_task: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool
| None = None,
hash_join_partition_size_leniency: float | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int
| None = None,
partial_aggregation_threshold: int | None = None,
high_cardinality_aggregation_threshold: float
| None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
default_morsel_size: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
flight_shuffle_dirs: list[str] | None = None,
enable_ray_tracing: bool | None = None,
scantask_splitting_level: int | None = None,
native_parquet_writer: bool | None = None,
flotilla: bool | None = None,
min_cpu_per_task: float | None = None,
) -> DaftContext
Globally sets various configuration parameters which control various aspects of Daft execution.
These configuration values are used when a Dataframe is executed (e.g. calls to DataFrame.write_*, DataFrame.collect() or DataFrame.show()).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | PyDaftExecutionConfig | None | A PyDaftExecutionConfig object to set the config to, before applying other kwargs. Defaults to None which indicates that the old (current) config should be used. | None |
scan_tasks_min_size_bytes | int | None | Minimum size in bytes when merging ScanTasks when reading files from storage. Increasing this value will make Daft perform more merging of files into a single partition before yielding, which leads to bigger but fewer partitions. (Defaults to 96 MiB) | None |
scan_tasks_max_size_bytes | int | None | Maximum size in bytes when merging ScanTasks when reading files from storage. Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but fewer partitions. (Defaults to 384 MiB) | None |
max_sources_per_scan_task | int | None | Maximum number of sources in a single ScanTask. (Defaults to 10) | None |
broadcast_join_size_bytes_threshold | int | None | If one side of a join is smaller than this threshold, a broadcast join will be used. Default is 10 MiB. | None |
parquet_split_row_groups_max_files | int | None | Maximum number of files to read in which the row group splitting should happen. (Defaults to 10) | None |
sort_merge_join_sort_with_aligned_boundaries | bool | None | Whether to use a specialized algorithm for sorting both sides of a sort-merge join such that they have aligned boundaries. This can lead to a faster merge-join at the cost of more skewed sorted join inputs, increasing the risk of OOMs. | None |
hash_join_partition_size_leniency | float | None | If the left side of a hash join is already correctly partitioned and the right side isn't, and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal number of partitions as the left. Defaults to 0.5. | None |
sample_size_for_sort | int | None | Number of elements to sample from each partition when running sort. Default is 20. | None |
num_preview_rows | int | None | Number of rows to show when displaying a dataframe preview. Default is 8. | None |
parquet_target_filesize | int | None | Target File Size when writing out Parquet Files. Defaults to 512MB | None |
parquet_target_row_group_size | int | None | Target Row Group Size when writing out Parquet Files. Defaults to 128MB | None |
parquet_inflation_factor | float | None | Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0 | None |
csv_target_filesize | int | None | Target File Size when writing out CSV Files. Defaults to 512MB | None |
csv_inflation_factor | float | None | Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5 | None |
shuffle_aggregation_default_partitions | int | None | Maximum number of partitions to create when performing aggregations on the Ray Runner. Defaults to 200, unless the number of input partitions is less than 200. | None |
partial_aggregation_threshold | int | None | Threshold for performing partial aggregations on the Native Runner. Defaults to 10000 rows. | None |
high_cardinality_aggregation_threshold | float | None | Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8. | None |
read_sql_partition_size_bytes | int | None | Target size of partition when reading from SQL databases. Defaults to 512MB | None |
enable_aqe | bool | None | Enables Adaptive Query Execution. Defaults to False. | None |
default_morsel_size | int | None | Default size of morsels used for the new local executor. Defaults to 131072 rows. | None |
shuffle_algorithm | str | None | The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce" and "pre_shuffle_merge". | None |
pre_shuffle_merge_threshold | int | None | Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB | None |
flight_shuffle_dirs | list[str] | None | The directories to use for flight shuffle. Defaults to ["/tmp"]. | None |
enable_ray_tracing | bool | None | Enable tracing for Ray. | None |
scantask_splitting_level | int | None | How aggressively to split scan tasks. | None |
native_parquet_writer | bool | None | Whether to use the native parquet writer vs the pyarrow parquet writer. | None |
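A sketch of tuning a few of these knobs (the values are illustrative, not recommendations):

```python
import daft

daft.context.set_execution_config(
    enable_aqe=True,                             # turn on Adaptive Query Execution
    num_preview_rows=20,                         # larger previews in .show()
    parquet_target_filesize=256 * 1024 * 1024,   # target ~256 MB output Parquet files
)
```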
execution_config_ctx #
execution_config_ctx(
**kwargs: Any,
) -> Generator[None, None, None]
Context manager that wraps set_execution_config to reset the config to its original setting afterwards.
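A sketch of a scoped override:

```python
import daft

# The smaller preview size applies only inside the block; the previous
# execution config is restored afterwards.
with daft.context.execution_config_ctx(num_preview_rows=5):
    daft.from_pydict({"x": list(range(100))}).show()
```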
I/O Configurations#
Configure behavior when Daft interacts with storage (e.g. credentials, retry policies, and various other knobs to control performance/resource usage).
These configurations are most often passed as inputs to Daft I/O functions, such as those for reading data from storage.
IOConfig #
IOConfig(
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
http: HTTPConfig | None = None,
unity: UnityConfig | None = None,
)
Configuration for the native I/O layer, e.g. credentials for accessing cloud storage systems.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new IOConfig. |
Attributes:
Name | Type | Description |
---|---|---|
azure | AzureConfig | |
gcs | GCSConfig | |
http | HTTPConfig | |
s3 | S3Config | |
unity | UnityConfig | |
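A sketch of building an IOConfig and passing it to a read (the credentials, region, project ID, and path are placeholders):

```python
import daft
from daft.io import GCSConfig, IOConfig, S3Config

io_config = IOConfig(
    s3=S3Config(region_name="us-east-1", key_id="...", access_key="..."),
    gcs=GCSConfig(project_id="my-project"),
)

df = daft.read_parquet("s3://example-bucket/path/", io_config=io_config)
```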
replace #
replace(
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
http: HTTPConfig | None = None,
unity: UnityConfig | None = None,
) -> IOConfig
Replaces values if provided, returning a new IOConfig.
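For example (the regions are illustrative):

```python
from daft.io import IOConfig, S3Config

base = IOConfig(s3=S3Config(region_name="us-east-1"))

# Returns a new IOConfig with the S3 settings swapped out; `base` is unchanged.
west = base.replace(s3=S3Config(region_name="us-west-2"))
```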
S3Config #
S3Config(
region_name: str | None = None,
endpoint_url: str | None = None,
key_id: str | None = None,
session_token: str | None = None,
access_key: str | None = None,
credentials_provider: Callable[[], S3Credentials]
| None = None,
buffer_time: int | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
retry_mode: str | None = None,
anonymous: bool | None = None,
use_ssl: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
force_virtual_addressing: bool | None = None,
profile_name: str | None = None,
)
I/O configuration for accessing an S3-compatible system.
Methods:
Name | Description |
---|---|
from_env | Creates an S3Config, retrieving credentials and configurations from the current environment. |
provide_cached_credentials | Wrapper around call to S3Config.credentials_provider to cache credentials until expiry. |
replace | Replaces values if provided, returning a new S3Config. |
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | None | |
anonymous | bool | |
check_hostname_ssl | bool | |
connect_timeout_ms | int | |
credentials_provider | Callable[[], S3Credentials] | None | |
endpoint_url | str | None | |
force_virtual_addressing | bool | None | |
key_id | str | None | |
max_connections | int | |
num_tries | int | |
profile_name | str | None | |
read_timeout_ms | int | |
region_name | str | None | |
requester_pays | bool | None | |
retry_initial_backoff_ms | int | |
retry_mode | str | None | |
session_token | str | None | |
use_ssl | bool | |
verify_ssl | bool | |
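A sketch of configuring anonymous access against an S3-compatible endpoint (the endpoint URL is a placeholder, e.g. a local MinIO instance):

```python
from daft.io import IOConfig, S3Config

io_config = IOConfig(
    s3=S3Config(
        endpoint_url="http://localhost:9000",  # placeholder S3-compatible endpoint
        anonymous=True,                        # no credentials required
        num_tries=5,                           # retry transient failures a few more times
    )
)
```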
from_env #
from_env() -> S3Config
Creates an S3Config, retrieving credentials and configurations from the current environment.
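For example:

```python
from daft.io import S3Config

# Resolve credentials and region from the environment (e.g. AWS_* variables)
# rather than specifying them explicitly.
s3_config = S3Config.from_env()
```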
provide_cached_credentials #
provide_cached_credentials() -> S3Credentials | None
Wrapper around call to S3Config.credentials_provider to cache credentials until expiry.
replace #
replace(
region_name: str | None = None,
endpoint_url: str | None = None,
key_id: str | None = None,
session_token: str | None = None,
access_key: str | None = None,
credentials_provider: Callable[[], S3Credentials]
| None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
retry_mode: str | None = None,
anonymous: bool | None = None,
use_ssl: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
force_virtual_addressing: bool | None = None,
profile_name: str | None = None,
) -> S3Config
Replaces values if provided, returning a new S3Config.
S3Credentials #
S3Credentials(
key_id: str,
access_key: str,
session_token: str | None = None,
expiry: datetime | None = None,
)
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | |
expiry | datetime | None | |
key_id | str | |
session_token | str | None | |
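S3Credentials is typically returned from a custom credentials_provider on S3Config. A sketch (the key values are placeholders; in a real provider they would come from a secret store or token service):

```python
from datetime import datetime, timedelta, timezone

from daft.io import IOConfig, S3Config, S3Credentials


def my_credentials_provider() -> S3Credentials:
    # Fetch short-lived keys here in a real provider; placeholders shown.
    return S3Credentials(
        key_id="AKIA...",
        access_key="...",
        session_token=None,
        expiry=datetime.now(timezone.utc) + timedelta(hours=1),
    )


io_config = IOConfig(s3=S3Config(credentials_provider=my_credentials_provider))
```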
GCSConfig #
GCSConfig(
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
)
I/O configuration for accessing Google Cloud Storage.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new GCSConfig. |
Attributes:
Name | Type | Description |
---|---|---|
anonymous | bool | |
connect_timeout_ms | int | |
credentials | str | None | |
max_connections | int | |
num_tries | int | |
project_id | str | None | |
read_timeout_ms | int | |
retry_initial_backoff_ms | int | |
token | str | None | |
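A sketch of anonymous access to a public GCS bucket (the project ID is a placeholder):

```python
from daft.io import GCSConfig, IOConfig

io_config = IOConfig(gcs=GCSConfig(project_id="my-project", anonymous=True))
```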
replace #
replace(
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
) -> GCSConfig
Replaces values if provided, returning a new GCSConfig.
AzureConfig #
AzureConfig(
storage_account: str | None = None,
access_key: str | None = None,
sas_token: str | None = None,
bearer_token: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
use_fabric_endpoint: bool | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
)
I/O configuration for accessing Azure Blob Storage.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new AzureConfig. |
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | None | |
anonymous | bool | None | |
bearer_token | str | None | |
client_id | str | None | |
client_secret | str | None | |
endpoint_url | str | None | |
sas_token | str | None | |
storage_account | str | None | |
tenant_id | str | None | |
use_fabric_endpoint | bool | None | |
use_ssl | bool | None | |
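A sketch using a storage account name and access key (both placeholders):

```python
from daft.io import AzureConfig, IOConfig

io_config = IOConfig(
    azure=AzureConfig(
        storage_account="mystorageaccount",
        access_key="...",
    )
)
```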
replace #
replace(
storage_account: str | None = None,
access_key: str | None = None,
sas_token: str | None = None,
bearer_token: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
use_fabric_endpoint: bool | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
) -> AzureConfig
Replaces values if provided, returning a new AzureConfig.