General Guide for Parallel Computing with Dask#
Author: Tom Vo
Last Updated: 03/17/25 (v0.8.0)
Overview#
This notebook serves as a general guide for parallel computing with xCDAT. It covers the following topics:
Dask Best Practices
Xarray and Dask
An Overview of Chunking in Dask
Using a Dask Cluster for Scalable Computations
Code Example - Parallelizing xCDAT Computations with Dask (Local Machine/Login Node)
Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node)
More Resources
FAQs
The data used in this example can be found in the xcdat-data repository.
Users can install their own instance of xcdat and follow these examples using their own environment (e.g., with vscode, Jupyter, Spyder, iPython) or enable xcdat with existing JupyterHub instances. The conda environment used in this notebook includes xcdat, xesmf, matplotlib, ipython, ipykernel, cartopy, and jupyter:
conda create -n xcdat_notebook_dask -c conda-forge xcdat matplotlib ipython ipykernel cartopy nc-time-axis jupyter jupyter-server-proxy pooch
Source#
This notebook excerpts information about Dask from other documentation pages and summarizes the general concepts and principles.
Main pages include:
Dask Best Practices#
Use NumPy
If your data fits comfortably in RAM and you are not performance bound, then using NumPy might be the right choice.
Dask adds another layer of complexity which may get in the way.
If you are just looking for speedups rather than scalability, then you may want to consider a project like Numba.
Select a good chunk size
A common performance problem among Dask Array users is that they have chosen a chunk size that is either too small (leading to lots of overhead) or poorly aligned with their data (leading to inefficient reading).
Orient your chunks
When reading data you should align your chunks with your storage format.
Avoid Oversubscribing Threads
By default Dask will run as many concurrent tasks as you have logical cores. It assumes that each task will consume about one core. However, many array-computing libraries are themselves multi-threaded, which can cause contention and low performance.
Consider Xarray
The Xarray package wraps around Dask Array, and so offers the same scalability, but also adds convenience when dealing with complex datasets.
Build your own Operations
Often we want to perform computations for which there is no exact function in Dask Array. In these cases we may be able to use some of the more generic functions to build our own.
— https://docs.dask.org/en/stable/array-best-practices.html#best-practices
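The advice on oversubscribing threads can be applied from Python by capping the thread pools of the underlying numerical libraries before they are imported. The following is a minimal sketch, assuming the commonly used OMP_NUM_THREADS, MKL_NUM_THREADS, and OPENBLAS_NUM_THREADS environment variables control threading in your environment; the helper name limit_library_threads is hypothetical and not part of Dask or xCDAT.

```python
import os

# Environment variables commonly read by OpenMP, MKL, and OpenBLAS.
# Assumption: your array libraries are built on one of these backends.
THREAD_ENV_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def limit_library_threads(n_threads: int = 1) -> dict:
    """Cap library-level threading so each Dask task uses about one core."""
    for name in THREAD_ENV_VARS:
        os.environ[name] = str(n_threads)
    return {name: os.environ[name] for name in THREAD_ENV_VARS}


# Call this before importing NumPy or other compiled array libraries,
# because their thread pools are usually sized at import time.
settings = limit_library_threads(1)
print(settings)
```

Setting the same variables with export in the shell before launching Python or the Dask workers has the same effect.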
The Basics of Dask Arrays#
Dask divides arrays into many small pieces, called “chunks” (each presumed to be small enough to fit into memory)
Dask Array operations are lazy
Operations queue up a series of tasks mapped over blocks
No computation is performed until values need to be computed (hence “lazy”)
Data is loaded into memory and computation is performed in streaming fashion, block-by-block
Computation is controlled by multi-processing or thread pool
— https://docs.xarray.dev/en/stable/user-guide/dask.html
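The lazy, chunked behavior described above can be seen with a small Dask array. This is an illustrative sketch only; the array shape and chunk sizes are arbitrary choices, not values taken from this notebook.

```python
import dask.array as da

# A 4000 x 4000 array of ones split into 1000 x 1000 chunks (4 x 4 blocks).
x = da.ones((4000, 4000), chunks=(1000, 1000))

# Building the expression only records tasks in a graph; nothing runs yet.
total = (x + 1).sum()
print(type(total))
print(x.numblocks)

# .compute() executes the task graph block by block and returns the value.
result = total.compute()
print(result)
```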

Xarray and Dask#

Why does Xarray integrate with Dask?
Xarray integrates with Dask to support parallel computations and streaming computation on datasets that don’t fit into memory. Currently, Dask is an entirely optional feature for xarray. However, the benefits of using Dask are sufficiently strong that Dask may become a required dependency in a future version of xarray.
— https://docs.xarray.dev/en/stable/use
Which Xarray features support Dask?
Nearly all existing xarray methods (including those for indexing, computation, concatenating and grouped operations) have been extended to work automatically with Dask arrays. When you load data as a Dask array in an xarray data structure, almost all xarray operations will keep it as a Dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory.
— https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray
What is the default Dask behavior for distributing work on compute hardware?
By default, Dask uses its multi-threaded scheduler, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory. For running across a cluster, set up the distributed scheduler.
— https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray
How do I use Dask arrays in an ``xarray.Dataset``?
The usual way to create a Dataset filled with Dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.
What happens if I don’t specify ``chunks`` with ``open_mfdataset()``?
open_mfdataset() called without the chunks argument will return Dask arrays with chunk sizes equal to the individual files. Re-chunking the dataset after creation with ds.chunk() will lead to an ineffective use of memory and is not recommended.
— https://docs.xarray.dev/en/stable/user-guide/dask.html#reading-and-writing-data
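To illustrate how Xarray operations keep data as Dask arrays, the sketch below wraps a Dask array in an xarray.DataArray directly. This is an assumption made so the example runs without any files; in practice the Dask arrays would come from open_dataset() or open_mfdataset() with chunks, as described above. The variable names and shapes are illustrative.

```python
import dask.array as da
import numpy as np
import xarray as xr

# Stand-in for a variable opened from a netCDF file with chunks along "time".
data = da.random.random((60, 145, 192), chunks=(12, 145, 192))
tas = xr.DataArray(data, dims=("time", "lat", "lon"), name="tas")

# Xarray operations on Dask-backed data stay lazy and return Dask-backed data.
anomaly = tas - tas.mean("time")
is_lazy = isinstance(anomaly.data, da.Array)
print(is_lazy)

# Only an explicit compute brings the values into memory as a NumPy array.
values = anomaly.compute()
print(type(values.data))
```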
An Overview of Chunking in Dask#
For performance, a good choice of chunks follows these rules:
A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once
A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that Dask scheduling incurs. A task should take longer than 100ms
Chunk sizes between 10MB-1GB are common, depending on the availability of RAM and the duration of computations
Chunks should align with the computation that you want to do.
For example, if you plan to frequently slice along a particular dimension, then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it’s convenient if those arrays have matching chunk patterns
Chunks should align with your storage, if applicable.
Array data formats are often chunked as well. When loading or saving data, it is useful to have Dask array chunks that are aligned with the chunking of your storage, often an even multiple times larger in each direction
— https://docs.dask.org/en/latest/array-chunks.html
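The size rules above come down to simple arithmetic: the number of elements in a chunk multiplied by the bytes per element. The sketch below is one way to check a candidate chunk shape against the 10MB-1GB range; the helper name chunk_nbytes and the candidate shapes are hypothetical, loosely modeled on a (time, lat, lon) variable.

```python
import math

import numpy as np

MIB = 2**20


def chunk_nbytes(chunk_shape, dtype):
    """Return the in-memory size of one chunk in bytes."""
    return math.prod(chunk_shape) * np.dtype(dtype).itemsize


# Compare a very small, a moderate, and a very large chunk along "time".
for shape in [(1, 145, 192), (120, 145, 192), (12000, 145, 192)]:
    size_mib = chunk_nbytes(shape, "float32") / MIB
    print(shape, f"{size_mib:.2f} MiB")
```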
Good rule of thumb with chunking#
Create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix).
With large arrays (10+ GB), the cost of queueing up Dask operations can be noticeable and you may need even larger chunksizes.
Alternatively, you can let Dask automatically chunk for you then optimize after#
Dask Arrays can look for a .chunks attribute and use that to provide baseline chunking. This can help prevent users from specifying “too many chunks” and “too few chunks” which can lead to performance issues.
Automatic chunking expands or contracts all dimensions marked with "auto" to try to reach chunk sizes with a number of bytes equal to the config value array.chunk-size, which is set to 128MiB by default, but which you can change in your configuration.
Notice: Although Dask’s chunk auto-scaling tries its best to optimally align chunks to the ideal sizes using array.chunk-size, the auto-scaling is not optimal for ALL use cases. It is still recommended to manually chunk for ideal sizes once you are comfortable doing so.
— https://docs.dask.org/en/latest/array-chunks.html#automatic-chunking
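The array.chunk-size setting can be inspected and temporarily overridden through dask.config. The sketch below assumes an arbitrary 16MiB target and an arbitrary array shape purely for illustration; it shows that chunks="auto" picks chunk shapes that stay within the configured size.

```python
import dask
import dask.array as da

# The target chunk size currently in effect.
print(dask.config.get("array.chunk-size"))

# Temporarily lower the target and let Dask choose the chunk shape.
with dask.config.set({"array.chunk-size": "16MiB"}):
    x = da.zeros((8000, 8000), chunks="auto", dtype="float64")

# Size of the largest chunk that automatic chunking produced.
largest_chunk_bytes = max(x.chunks[0]) * max(x.chunks[1]) * x.dtype.itemsize
print(x.chunksize, largest_chunk_bytes)
```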
Chunking with Xarray and xCDAT using the chunks parameter#
To do this in open_dataset() and open_mfdataset(), you need to specify the chunks parameter either by:
chunks={"time": 10} - chunk the specified dimension(s) by a specified integer number of elements
chunks={"time": "auto"} - auto-scale the specified dimension(s) to accommodate ideal chunk sizes. In this example, replace "time" and/or add additional dims to the dictionary for additional auto-scaling.
chunks="auto" - allow chunking all dimensions to accommodate ideal chunk sizes
— https://docs.xarray.dev/en/stable/user-guide/dask.html#chunking-and-performance
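The three forms of chunk specification can be tried without any files by applying them to a small in-memory dataset. This is only a stand-in for demonstration: the sketch assumes that Dataset.chunk() accepts the same specifications as the chunks parameter, and with real files the specification would be passed when opening the data instead. The dataset name ds_demo and its shape are illustrative.

```python
import numpy as np
import xarray as xr

# Small in-memory stand-in for a (time, lat, lon) variable.
ds_demo = xr.Dataset(
    {"tas": (("time", "lat", "lon"), np.zeros((60, 145, 192), dtype="float32"))}
)

# 1. A fixed number of elements per chunk along "time".
by_count = ds_demo.chunk({"time": 10})
# 2. Let Dask pick the chunk length along "time" only.
by_auto_dim = ds_demo.chunk({"time": "auto"})
# 3. Let Dask pick chunk lengths along every dimension.
all_auto = ds_demo.chunk("auto")

print(by_count["tas"].chunks)
print(by_auto_dim["tas"].chunks)
print(all_auto["tas"].chunks)
```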
Using a Dask Cluster for Scalable Computations#
All of the large-scale Dask collections like Dask Array, Dask DataFrame, and Dask Bag and the fine-grained APIs like delayed and futures generate task graphs where each node in the graph is a normal Python function and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task.
After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler.
Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics. Dask has two families of task schedulers:
Single-machine scheduler: This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use, although it can only be used on a single machine and does not scale
Distributed scheduler: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster
— https://docs.dask.org/en/stable/scheduling.html
— https://docs.dask.org/en/stable/scheduling.html#dask-distributed-local
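The point that different schedulers consume the same task graph and return the same result can be checked on a single machine by passing the scheduler keyword to compute(). This is a small sketch with an arbitrary array; it only uses the single-machine schedulers, so no cluster is needed.

```python
import dask.array as da

# One task graph: double every element, then sum.
x = da.arange(1_000_000, chunks=100_000, dtype="int64")
total = (x * 2).sum()

# Run the same graph on a thread pool and then serially in the main thread.
threaded = total.compute(scheduler="threads")
serial = total.compute(scheduler="synchronous")

print(threaded, serial)
```

The synchronous scheduler is slower but useful for debugging, since tasks run one at a time in the calling thread.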
Code Example - Parallelizing xCDAT Computations with Dask (Local Machine/Login Node)#
Notice: This section sets up a local cluster directly on whatever machine you’re working on. If you are on a login node, it will set up the cluster there. The login node might suffice for smaller jobs requiring less compute power. For larger jobs on HPC environments that require more compute power, make sure to look at Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node).
Disclaimer: The dataset used in the example is only a few hundred MBs to make downloading the input fast. This notebook demonstrates how to get up and running with Dask quickly, and does not aim to show the real-world performance improvements on large datasets.
Initial Setup#
[1]:
import xcdat as xc
ds = xc.tutorial.open_dataset("tas_amon_access")
/opt/miniconda3/envs/xcdat_notebook_dask/lib/python3.13/site-packages/esmpy/interface/loadESMF.py:94: VersionWarning: ESMF installation version 8.8.0, ESMPy version 8.8.0b0
warnings.warn("ESMF installation version {}, ESMPy version {}".format(
1. Set Up the Dask Cluster#
Note: You can skip “1. Set Up the Dask Cluster” and “2. Open the Dask Dashboard UI” if you want to use Xarray with Dask’s default multi-threaded scheduler with a chunking configuration of 128 MiB per chunk. However, it is highly recommended that you set up the Dask cluster with the instructions below to enable more precise Dask configuration based on your machine specifications and resource requirements.
We will quickly set up a local cluster using the Dask Client and LocalCluster Python classes.
You can configure the Dask Client (e.g., memory limit) to your needs. In this case, we are deploying a cluster with:
n_workers=2: 2 workers
threads_per_worker=1: 1 thread per worker, since we’re using processes instead of threads
memory_limit="4GB": 4 GB memory limit per worker, depending on the available memory in your system. If the memory_limit given is greater than the available memory, the total available memory will be set for each worker
processes=True: use processes instead of threads (preferred for most Python code)
For info on cluster configurations, visit these links:
[ ]:
from dask.distributed import Client, LocalCluster

# Two single-threaded worker processes, each limited to 4 GB of memory.
cluster = LocalCluster(
    n_workers=2, threads_per_worker=1, memory_limit="4GB", processes=True
)
client = Client(cluster)
2025-03-17 13:24:02,343 [INFO]: scheduler.py(__init__:1754) >> State start
2025-03-17 13:24:02,345 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-f2s9iesu', purging
2025-03-17 13:24:02,347 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/scheduler-7wi1xgzo', purging
2025-03-17 13:24:02,348 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-p55nwy0y', purging
2025-03-17 13:24:02,351 [INFO]: scheduler.py(start_unsafe:4209) >> Scheduler at: tcp://127.0.0.1:59680
2025-03-17 13:24:02,352 [INFO]: scheduler.py(start_unsafe:4224) >> dashboard at: http://127.0.0.1:8787/status
2025-03-17 13:24:02,352 [INFO]: scheduler.py(register_worker_plugin:8109) >> Registering Worker plugin shuffle
2025-03-17 13:24:02,366 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:59683'
2025-03-17 13:24:02,384 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:59685'
2025-03-17 13:24:03,034 [INFO]: scheduler.py(add_worker:4563) >> Register worker addr: tcp://127.0.0.1:59689 name: 1
2025-03-17 13:24:03,037 [INFO]: scheduler.py(handle_worker:6163) >> Starting worker compute stream, tcp://127.0.0.1:59689
2025-03-17 13:24:03,037 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59693
2025-03-17 13:24:03,038 [INFO]: scheduler.py(add_worker:4563) >> Register worker addr: tcp://127.0.0.1:59688 name: 0
2025-03-17 13:24:03,039 [INFO]: scheduler.py(handle_worker:6163) >> Starting worker compute stream, tcp://127.0.0.1:59688
2025-03-17 13:24:03,039 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59692
2025-03-17 13:24:03,064 [INFO]: scheduler.py(add_client:5898) >> Receive client connection: Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:03,064 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59695
2025-03-17 13:24:37,604 [INFO]: scheduler.py(remove_client:5943) >> Remove client Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:37,605 [INFO]: core.py(handle_stream:908) >> Received 'close-stream' from tcp://127.0.0.1:59695; closing.
2025-03-17 13:24:37,607 [INFO]: scheduler.py(remove_client:5943) >> Remove client Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:37,613 [INFO]: scheduler.py(add_client:5935) >> Close client connection: Client-c4724026-036d-11f0-8543-e2398742200b
[3]:
client.cluster
2. Open the Dask Dashboard UI#
The Dask distributed scheduler provides an interactive dashboard containing many plots and tables with live information.
Check this Dask documentation page to learn how to interpret the information. There is also a general guide later in this notebook.
Here’s an example:

Open the link to the dashboard#
[4]:
client.dashboard_link
[4]:
'http://127.0.0.1:8787/status'
3. Open a dataset with xCDAT and chunk it#
[5]:
ds = xc.tutorial.open_dataset("tas_amon_access", chunks="auto")
ds
[5]:
<xarray.Dataset> Size: 7MB
Dimensions: (time: 60, bnds: 2, lat: 145, lon: 192)
Coordinates:
* lat (lat) float64 1kB -90.0 -88.75 -87.5 -86.25 ... 87.5 88.75 90.0
* lon (lon) float64 2kB 0.0 1.875 3.75 5.625 ... 354.4 356.2 358.1
height float64 8B ...
* time (time) object 480B 1870-01-16 12:00:00 ... 1874-12-16 12:00:00
Dimensions without coordinates: bnds
Data variables:
time_bnds (time, bnds) object 960B dask.array<chunksize=(60, 2), meta=np.ndarray>
lat_bnds (lat, bnds) float64 2kB dask.array<chunksize=(145, 2), meta=np.ndarray>
lon_bnds (lon, bnds) float64 3kB dask.array<chunksize=(192, 2), meta=np.ndarray>
tas (time, lat, lon) float32 7MB dask.array<chunksize=(60, 145, 192), meta=np.ndarray>
Attributes: (12/48)
Conventions: CF-1.7 CMIP-6.2
activity_id: CMIP
branch_method: standard
branch_time_in_child: 0.0
branch_time_in_parent: 87658.0
creation_date: 2020-06-05T04:06:11Z
... ...
variant_label: r10i1p1f1
version: v20200605
license: CMIP6 model data produced by CSIRO is li...
cmor_version: 3.4.0
tracking_id: hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...
DODS_EXTRA.Unlimited_Dimension: time
Notice how the data variables contain dask.array.
4. Run your computations while viewing the dashboards in your browser#
Bad signs to watch out for in the dashboard#
Taskstream plot:
Lots of white space in the task stream plot
Nothing is happening.
Chunks may be too small.
Lots and lots of red in the task stream plot
Represents worker communication.
Dask workers need some communication, but if they are doing almost nothing except communication then there is not much productive work going on.
Worker memory plot:
Orange bars, which are a sign you are getting close to the memory limit.
Chunks may be too big.
Gray bars which mean data is being spilled to disk.
Chunks may be too big.
— https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes
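When the dashboard suggests that chunks are too small or too big, it can help to inspect how many chunks an array has and how large the largest one is. The sketch below does this for two Dask arrays with the same shape but different chunking; the helper name describe_chunks is hypothetical, and the shapes are only loosely modeled on the dataset in this notebook.

```python
import dask.array as da


def describe_chunks(arr):
    """Summarize the number of chunks and the size of the largest chunk."""
    chunk_bytes = arr.dtype.itemsize
    for length in arr.chunksize:  # largest chunk length along each dimension
        chunk_bytes *= length
    return {"n_chunks": arr.npartitions, "largest_chunk_mib": chunk_bytes / 2**20}


# Many tiny chunks: one per time step, which adds scheduling overhead.
many_small = da.zeros((60, 145, 192), chunks=(1, 145, 192), dtype="float32")
# One chunk holding the whole array: no parallelism, more memory per task.
one_large = da.zeros((60, 145, 192), chunks=(60, 145, 192), dtype="float32")

print(describe_chunks(many_small))
print(describe_chunks(one_large))
```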
First, we queue up the lazy group_average operation in Dask#
[6]:
tas_avg = ds.temporal.group_average("tas", freq="month")
tas_avg
[6]:
<xarray.Dataset> Size: 13MB
Dimensions: (lat: 145, bnds: 2, lon: 192, time: 60)
Coordinates:
* lat (lat) float64 1kB -90.0 -88.75 -87.5 -86.25 ... 87.5 88.75 90.0
* lon (lon) float64 2kB 0.0 1.875 3.75 5.625 ... 352.5 354.4 356.2 358.1
height float64 8B ...
* time (time) object 480B 1870-01-01 00:00:00 ... 1874-12-01 00:00:00
Dimensions without coordinates: bnds
Data variables:
lat_bnds (lat, bnds) float64 2kB dask.array<chunksize=(145, 2), meta=np.ndarray>
lon_bnds (lon, bnds) float64 3kB dask.array<chunksize=(192, 2), meta=np.ndarray>
tas (time, lat, lon) float64 13MB dask.array<chunksize=(1, 145, 192), meta=np.ndarray>
Attributes: (12/48)
Conventions: CF-1.7 CMIP-6.2
activity_id: CMIP
branch_method: standard
branch_time_in_child: 0.0
branch_time_in_parent: 87658.0
creation_date: 2020-06-05T04:06:11Z
... ...
variant_label: r10i1p1f1
version: v20200605
license: CMIP6 model data produced by CSIRO is li...
cmor_version: 3.4.0
tracking_id: hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...
DODS_EXTRA.Unlimited_Dimension: time
Now we can trigger the queued up operations in the Dask task graph using .compute() or .load()
.compute() - Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return a new dataset. Unlike load, the original dataset is left unaltered.
.load() - Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return this dataset. Unlike compute, the original dataset is modified and returned.
[7]:
# Or call tas_avg.load() to modify the original dataset.
tas_avg_res = tas_avg.compute()
/opt/miniconda3/envs/xcdat_notebook_dask/lib/python3.13/site-packages/esmpy/interface/loadESMF.py:94: VersionWarning: ESMF installation version 8.8.0, ESMPy version 8.8.0b0
warnings.warn("ESMF installation version {}, ESMPy version {}".format(
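The difference between .compute() and .load() can be checked on a small Dask-backed xarray.DataArray. This is an illustrative sketch with made-up data rather than the dataset used in this notebook.

```python
import dask.array as da
import numpy as np
import xarray as xr

lazy = xr.DataArray(da.ones((4, 3), chunks=(2, 3)), dims=("time", "lat"), name="demo")

# .compute() returns a new in-memory object; the original stays Dask-backed.
computed = lazy.compute()
print(isinstance(computed.data, np.ndarray))
print(isinstance(lazy.data, da.Array))

# .load() computes in place and returns the same object, now NumPy-backed.
loaded = lazy.load()
print(loaded is lazy)
print(isinstance(lazy.data, np.ndarray))
```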
Explicitly close the Dask Client#
The client also closes automatically when you close the notebook or kill the Python session.
[8]:
client.close()
That’s it! You just performed parallel computing using Dask on your local machine!#
If you are on an HPC environment where interactive jobs must be requested on compute nodes for larger scale computations, please follow the guide below.
Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node)#
On an HPC environment, you typically need to request an interactive job on one or more compute nodes for your computational needs. The guide below shows you how to do just that.
1. Start an interactive job#
If you’re on an HPC environment, the first step is to start an interactive job to get compute nodes.
Depending on your HPC environment, you might need to run salloc, srun, etc. to start an interactive job. Please refer to the documentation for your HPC environment. Here’s the documentation from NERSC for Interactive Jobs.
In the example below on the NERSC Perlmutter machine, we’re requesting 1 CPU node from the reservation and dividing the resources on that node among 10 Dask workers. If you’re working on a reservation, request the respective number of hours needed for your node (e.g., 4 hrs/240 mins). When you are working outside a reservation, it’s best to limit your requested time to your expected working window.
salloc --reservation=dask_day1 -C cpu -N 1 -n 10 -A ntrain5 -t 240
It may take a moment to start, but once it does, you’ll get your prompt back.
Processes or Threads?#
Python has the global interpreter lock (GIL), which allows only one thread to execute Python bytecode at a time, so pure-Python code gains little from multiple threads.
The general exceptions to this rule include code that is mainly I/O (e.g., downloading data) or code that spends most of its time in compiled, non-Python libraries (e.g., NumPy).
For most use cases, using processes over threads might make more sense. Here’s an example with dask worker:
I have 20 GB of RAM
I have 2 cores. So I want 2 workers.
This means each worker can consume 10 GB of RAM.
$ dask worker tcp://127.0.0.1:8786 --nworkers 2 --nthreads 1 --memory-limit 10GB
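The sizing reasoning above (one single-threaded worker process per core, with total RAM split evenly among workers) can be written as a small calculation. The sketch below assumes that --memory-limit applies to each worker process rather than to the node as a whole; the helper name dask_worker_command is hypothetical and only builds the command string, it does not start anything.

```python
def dask_worker_command(scheduler_address, total_memory_gb, n_cores):
    """Build a dask worker command with one single-threaded process per core."""
    n_workers = n_cores
    # Assumption: --memory-limit is a per-worker limit, so split total RAM evenly.
    memory_per_worker_gb = total_memory_gb / n_workers
    return (
        f"dask worker {scheduler_address} "
        f"--nworkers {n_workers} --nthreads 1 "
        f"--memory-limit {memory_per_worker_gb:g}GB"
    )


# 20 GB of RAM and 2 cores give 2 workers with 10 GB each.
command = dask_worker_command("tcp://127.0.0.1:8786", total_memory_gb=20, n_cores=2)
print(command)
```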
2. Start the Dask Cluster#
There are several ways to deploy a Dask Cluster. You can check them out here. For this section, we’ll be manually deploying a cluster in your HPC environment.
Open a terminal and start the Dask scheduler:
$ conda activate xcdat_notebook_dask
$ dask scheduler
The default address for the dask scheduler is tcp://127.0.0.1:8786.
Open a second terminal and start a Dask worker:
$ conda activate xcdat_notebook_dask
$ dask worker tcp://127.0.0.1:8786 --nworkers 2 --nthreads=1 --memory-limit 20GB
3. Connect the Dask Client to the Cluster#
[9]:
# For the purpose of this notebook, ignore the VersionMismatchWarning if you get
# one.
# Related issue: https://github.com/dask/distributed/issues/3767
client_hpc = Client("tcp://127.0.0.1:8786")
You should see this output in your dask scheduler terminal:
2024-04-30 10:24:42,155 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:47522
2024-04-30 10:24:44,609 - distributed.scheduler - INFO - Receive client connection: Client-894d078d-0716-11ef-9ffd-f4e9d4af2192
2024-04-30 10:24:44,610 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:47748
[10]:
client_hpc
[10]:
Client
Client-0187f5e6-036e-11f0-8543-e2398742200b
| Connection method: Direct | |
| Dashboard: http://127.0.0.1:59876/status |
Scheduler Info
Scheduler
Scheduler-2231af6a-c89f-4001-9c7e-6bddfc517811
| Comm: tcp://192.168.1.102:8786 | Workers: 2 |
| Dashboard: http://192.168.1.102:59876/status | Total threads: 2 |
| Started: Just now | Total memory: 32.00 GiB |
Workers
Worker: tcp://127.0.0.1:59914
| Comm: tcp://127.0.0.1:59914 | Total threads: 1 |
| Dashboard: http://127.0.0.1:59917/status | Memory: 16.00 GiB |
| Nanny: tcp://127.0.0.1:59911 | |
| Local directory: /var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-pt1h62s2 | |
| Tasks executing: | Tasks in memory: |
| Tasks ready: | Tasks in flight: |
| CPU usage: 2.0% | Last seen: Just now |
| Memory usage: 145.27 MiB | Spilled bytes: 0 B |
| Read bytes: 17.97 kiB | Write bytes: 329.44 kiB |
Worker: tcp://127.0.0.1:59915
| Comm: tcp://127.0.0.1:59915 | Total threads: 1 |
| Dashboard: http://127.0.0.1:59916/status | Memory: 16.00 GiB |
| Nanny: tcp://127.0.0.1:59909 | |
| Local directory: /var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-f1czipzq | |
| Tasks executing: | Tasks in memory: |
| Tasks ready: | Tasks in flight: |
| CPU usage: 2.1% | Last seen: Just now |
| Memory usage: 145.34 MiB | Spilled bytes: 0 B |
| Read bytes: 17.97 kiB | Write bytes: 329.53 kiB |
4. Perform Computations#
[11]:
ds_hpc = xc.tutorial.open_dataset("tas_amon_access", chunks="auto")
tas_avg_hpc = ds_hpc.temporal.group_average("tas", freq="month")
[12]:
tas_avg_hpc_res = tas_avg_hpc.compute()
NOTE: You might see these warnings:
From the code cell above:
UserWarning: Sending large graph of size 426.18 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
In the dask worker terminal:
2024-04-30 10:26:04,806 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.60s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
These warnings indicate sub-optimal Dask configurations for the available resources, inappropriate use of Dask for parallelizing (e.g., very small files), etc. As noted earlier, we are using a small dataset in the example for ease of download, which is not necessarily the best use of Dask.
More Resources#
To learn more in-depth about Dask and Xarray, please check these resources out:
FAQs#
Are there any other optimization tips for working with Dask and Xarray?#
We HIGHLY recommend checking out the Optimization Tips section if you are using Dask with Xarray.
Are there cases where xCDAT loads Dask arrays into memory?#
As of xarray=2023.5.0, Xarray does not support updating/setting multi-dimensional dask arrays. The following error is raised if this is attempted: xarray can't set arrays with multiple array indices to dask yet.
As a workaround, xCDAT loads coordinate bounds into memory if they are multi-dimensional Dask arrays before performing operations or computations. This loading occurs in the following APIs:
xcdat.axis.swap_lon_axis
  swapping longitude axis orientation
  aligning longitude bounds to (0, 360) axis
xarray.Dataset.spatial.average
  generating weights using lat/lon coordinate bounds
  swapping longitude axis orientation
  scaling domain bounds to a specified region
xcdat.Dataset.temporal.<average|group_average|climatology|departures>
  generating weights using time coordinate bounds