General Guide for Parallel Computing with Dask#

Author: Tom Vo

Last Updated: 03/17/25 (v0.8.0)

Overview#

This notebook serves as a general guide for parallel computing with xCDAT. It covers the following topics:

  • Dask Best Practices

  • Xarray and Dask

  • An Overview of Chunking in Dask

  • Using a Dask Cluster for Scalable Computations

  • Code Example - Parallelizing xCDAT Computations with Dask (Local Machine/Login Node)

  • Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node)

  • More Resources

  • FAQs

The data used in this example can be found in the xcdat-data repository.

Users can install their own instance of xcdat and follow these examples in their own environment (e.g., with VS Code, Jupyter, Spyder, or IPython) or enable xcdat with existing JupyterHub instances. The conda environment used in this notebook includes xcdat, xesmf, matplotlib, ipython, ipykernel, cartopy, and jupyter:

conda create -n xcdat_notebook_dask -c conda-forge xcdat matplotlib ipython ipykernel cartopy nc-time-axis jupyter jupyter-server-proxy pooch

Source#

This notebook excerpts information about Dask from other documentation pages and summarizes the general concepts and principles.

Main pages include:

Dask Best Practices#


  • Use NumPy

    • If your data fits comfortably in RAM and you are not performance bound, then using NumPy might be the right choice.

    • Dask adds another layer of complexity which may get in the way.

    • If you are just looking for speedups rather than scalability then you may want to consider a project like Numba

  • Select a good chunk size

    • A common performance problem among Dask Array users is that they have chosen a chunk size that is either too small (leading to lots of overhead) or poorly aligned with their data (leading to inefficient reading).

  • Orient your chunks

    • When reading data you should align your chunks with your storage format.

  • Avoid Oversubscribing Threads

    • By default Dask will run as many concurrent tasks as you have logical cores. It assumes that each task will consume about one core. However, many array-computing libraries are themselves multi-threaded, which can cause contention and low performance.

  • Consider Xarray

    • The Xarray package wraps around Dask Array, and so offers the same scalability, but also adds convenience when dealing with complex datasets

  • Build your own Operations

    • Often we want to perform computations for which there is no exact function in Dask Array. In these cases we may be able to use some of the more generic functions to build our own.

https://docs.dask.org/en/stable/array-best-practices.html#best-practices

The Basics of Dask Arrays#

  • Dask divides arrays into many small pieces, called “chunks” (each presumed to be small enough to fit into memory)

  • Dask Array operations are lazy

    • Operations queue up a series of tasks mapped over blocks

    • No computation is performed until values need to be computed (hence “lazy”)

    • Data is loaded into memory and computation is performed in streaming fashion, block-by-block

  • Computation is controlled by a multi-processing or thread pool

https://docs.xarray.dev/en/stable/user-guide/dask.html
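The lazy, chunked behavior described above can be sketched with a small Dask array. This is a minimal illustration that assumes `dask` and NumPy are installed; the array shape and chunk sizes are arbitrary values chosen for the example, not recommendations.

```python
import dask.array as da

# Build a 4000x4000 array of ones split into 1000x1000 chunks (a 4x4 grid of blocks).
x = da.ones((4000, 4000), chunks=(1000, 1000))

# Lazy: this only records tasks in the task graph; no values are computed yet.
lazy_mean = (x + 1).mean()

# Computation is triggered here and runs block by block.
result = float(lazy_mean.compute())

print(x.numblocks, result)
```

Until `.compute()` is called, `lazy_mean` is only a description of the work to be done, which is why building it is fast regardless of the array size.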

Dask Array

Xarray and Dask#


Why does Xarray integrate with Dask?

Xarray integrates with Dask to support parallel computations and streaming computation on datasets that don’t fit into memory. Currently, Dask is an entirely optional feature for xarray. However, the benefits of using Dask are sufficiently strong that Dask may become a required dependency in a future version of xarray.

https://docs.xarray.dev/en/stable/use

Which Xarray features support Dask?

Nearly all existing xarray methods (including those for indexing, computation, concatenating and grouped operations) have been extended to work automatically with Dask arrays. When you load data as a Dask array in an xarray data structure, almost all xarray operations will keep it as a Dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory.

https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray

What is the default Dask behavior for distributing work on compute hardware?

By default, Dask uses its multi-threaded scheduler, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory. To run across a cluster, set up the distributed scheduler.

https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray

How do I use Dask arrays in an ``xarray.Dataset``?

The usual way to create a Dataset filled with Dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.

What happens if I don’t specify ``chunks`` with ``open_mfdataset()``?

open_mfdataset() called without chunks argument will return dask arrays with chunk sizes equal to the individual files. Re-chunking the dataset after creation with ds.chunk() will lead to an ineffective use of memory and is not recommended.

https://docs.xarray.dev/en/stable/user-guide/dask.html#reading-and-writing-data

An Overview of Chunking in Dask#

For performance, a good choice of chunks follows these rules:

  1. A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once

  2. A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that Dask scheduling incurs. A task should take longer than 100ms

  3. Chunk sizes between 10MB-1GB are common, depending on the availability of RAM and the duration of computations

  4. Chunks should align with the computation that you want to do.

    • For example, if you plan to frequently slice along a particular dimension, then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it’s convenient if those arrays have matching chunk patterns

  5. Chunks should align with your storage, if applicable.

    • Array data formats are often chunked as well. When loading or saving data, it is useful to have Dask array chunks that are aligned with the chunking of your storage, often an even multiple times larger in each direction

https://docs.dask.org/en/latest/array-chunks.html
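To make rule 4 concrete, the sketch below counts how many chunks a slice along one dimension has to touch for different chunk lengths. It uses only the standard library; `chunks_touched` is a hypothetical helper written for this illustration and is not part of Dask or Xarray.

```python
def chunks_touched(start, stop, chunk_len):
    """Count chunks along one dimension that overlap the half-open slice [start, stop)."""
    if stop <= start:
        return 0
    first = start // chunk_len        # index of the chunk holding the first element
    last = (stop - 1) // chunk_len    # index of the chunk holding the last element
    return last - first + 1


# Selecting 120 consecutive time steps from a dimension chunked in different ways.
many_small = chunks_touched(0, 120, chunk_len=12)    # small chunks along time
one_aligned = chunks_touched(0, 120, chunk_len=120)  # selection matches one chunk
misaligned = chunks_touched(6, 126, chunk_len=120)   # same length, shifted start

print(many_small, one_aligned, misaligned)
```

The same selection length can touch one chunk or several, depending on how well the chunk boundaries line up with the slices you plan to take.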

Good rule of thumb with chunking#

  • Create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix).

  • With large arrays (10+ GB), the cost of queueing up Dask operations can be noticeable and you may need even larger chunksizes.
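A quick way to check a candidate chunk shape against this rule of thumb is to count its elements and bytes. The sketch below uses only the standard library; `describe_chunk` is a hypothetical helper, and the shapes and item sizes (4 bytes for float32, 8 bytes for float64) are illustration values.

```python
import math

MIN_ELEMENTS = 1_000_000  # rule-of-thumb minimum from the text above


def describe_chunk(chunk_shape, itemsize):
    """Return (number of elements, size in bytes) for a single chunk."""
    n_elements = math.prod(chunk_shape)
    return n_elements, n_elements * itemsize


candidates = [
    ((1000, 1000), 8),    # float64 matrix
    ((60, 145, 192), 4),  # float32 (time, lat, lon) chunk
    ((1, 145, 192), 8),   # one time step per chunk, float64
]

for shape, itemsize in candidates:
    n_elements, nbytes = describe_chunk(shape, itemsize)
    ok = n_elements >= MIN_ELEMENTS
    print(f"{shape}: {n_elements:,} elements, {nbytes / 2**20:.2f} MiB, meets minimum: {ok}")
```

A chunk holding a single time step of a 145x192 grid falls well below the one-million-element guideline, which is one reason very fine chunking along time tends to add scheduling overhead.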

Alternatively, you can let Dask automatically chunk for you then optimize after#

  • Dask Arrays can look for a .chunks attribute and use that to provide baseline chunking. This can help prevent users from specifying “too many chunks” and “too few chunks” which can lead to performance issues.

  • Automatic chunking expands or contracts all dimensions marked with "auto" to try to reach chunk sizes with a number of bytes equal to the config value array.chunk-size, which is set to 128MiB by default, but which you can change in your configuration.

  • Notice: Although Dask’s chunk auto-scaling tries its best to align chunks to the ideal sizes using array.chunk-size, the auto-scaling is not optimal for ALL use cases. It is still recommended to manually chunk for ideal sizes once you are comfortable doing so.

https://docs.dask.org/en/latest/array-chunks.html#automatic-chunking
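As a minimal sketch of automatic chunking, the example below reads the `array.chunk-size` config value and temporarily lowers it while creating an array with `chunks="auto"`. It assumes `dask` and NumPy are installed; the `64MiB` target and the array shape are arbitrary illustration values.

```python
import dask
import dask.array as da

# The target chunk size Dask currently aims for when a dimension is marked "auto".
print(dask.config.get("array.chunk-size"))

# Temporarily lower the target, then let Dask choose chunks for every dimension.
with dask.config.set({"array.chunk-size": "64MiB"}):
    x = da.zeros((8000, 8000), chunks="auto")  # float64, roughly 488 MiB in total

# Inspect what Dask picked; the array is split into several chunks.
print(x.chunksize, x.npartitions)
```

Inspecting `chunksize` and `npartitions` after auto-chunking is a reasonable starting point before tuning chunk sizes by hand.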

Chunking with Xarray and xCDAT using the chunks parameter#

To do this in open_dataset() and open_mfdataset(), you need to specify the chunks parameter either by:

  1. chunks={"time": 10} - chunk the specified dimension(s) into pieces of the given integer length

  2. chunks={"time": "auto"} - auto-scale the specified dimension(s) to accommodate ideal chunk sizes. In this example, replace "time" and/or add additional dims to the dictionary for additional auto-scaling.

  3. chunks="auto" - allow chunking all dimensions to accommodate ideal chunk sizes

https://docs.xarray.dev/en/stable/user-guide/dask.html#chunking-and-performance
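To see what an integer entry in the `chunks` dictionary means for a dimension, the sketch below splits dimension lengths into consecutive chunk lengths. It is a standard-library illustration only: `chunk_lengths` is a hypothetical helper, and treating dimensions missing from the dictionary as a single chunk is a simplifying assumption made here, not a statement about what Xarray does in every case.

```python
def chunk_lengths(dim_length, chunk_size):
    """Split a dimension into consecutive chunks of `chunk_size` (the last may be shorter)."""
    full, remainder = divmod(dim_length, chunk_size)
    return (chunk_size,) * full + ((remainder,) if remainder else ())


dims = {"time": 60, "lat": 145, "lon": 192}
chunks = {"time": 10}  # same form as the `chunks` argument described above

# Assumption for this sketch: dimensions not listed in `chunks` stay as one chunk.
layout = {name: chunk_lengths(length, chunks.get(name, length)) for name, length in dims.items()}
print(layout)

# A chunk size that does not divide the dimension evenly leaves a shorter final chunk.
print(chunk_lengths(60, 7))
```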

Using a Dask Cluster for Scalable Computations#

  1. All of the large-scale Dask collections like Dask Array, Dask DataFrame, and Dask Bag and the fine-grained APIs like delayed and futures generate task graphs where each node in the graph is a normal Python function and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task.

  2. After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler.

  3. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics. Dask has two families of task schedulers:

    • Single-machine scheduler: This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use, although it can only be used on a single machine and does not scale

    • Distributed scheduler: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster

https://docs.dask.org/en/stable/scheduling.html

Dask Schedulers

https://docs.dask.org/en/stable/scheduling.html#dask-distributed-local
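The point that different schedulers consume the same task graph and produce the same result can be sketched as follows. The example assumes `dask` and NumPy are installed and only uses the single-machine schedulers, selected through the `scheduler` argument of `.compute()`; the array size is an arbitrary illustration value.

```python
import dask.array as da

# One task graph: the sum of the integers 0..999,999, split into 10 chunks.
x = da.arange(1_000_000, chunks=100_000, dtype="int64")
total = x.sum()

# Execute the same graph with two different single-machine schedulers.
threaded = int(total.compute(scheduler="threads"))         # local thread pool
synchronous = int(total.compute(scheduler="synchronous"))  # single thread, handy for debugging

print(threaded, synchronous)
```

The results match; what changes between schedulers is how the work is executed, and therefore the performance characteristics.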

Code Example - Parallelizing xCDAT Computations with Dask (Local Machine/Login Node)#

Notice: This section sets up a local cluster directly on whatever machine you’re working on. If you are on a login node, the cluster will be set up there. The login node might suffice for smaller jobs requiring less compute power. For larger jobs on HPC environments that require more compute power, see Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node).

Disclaimer: The dataset used in the example is only a few hundred MBs to make downloading the input fast. This notebook demonstrates how to get up and running with Dask quickly, and does not aim to show the real-world performance improvements on large datasets.

Initial Setup#

[1]:
import xcdat as xc

ds = xc.tutorial.open_dataset("tas_amon_access")
/opt/miniconda3/envs/xcdat_notebook_dask/lib/python3.13/site-packages/esmpy/interface/loadESMF.py:94: VersionWarning: ESMF installation version 8.8.0, ESMPy version 8.8.0b0
  warnings.warn("ESMF installation version {}, ESMPy version {}".format(

1. Setup the Dask Cluster#

Note: You can skip “1. Setup the Dask Cluster” and “2. Open the Dask Dashboard UI” if you want to use Xarray with Dask’s default multi-threaded scheduler with a chunking configuration of 128 MiB per chunk. However, it is highly recommended that you set up the Dask cluster with the instructions below to enable more precise Dask configuration based on your machine specifications and resource requirements.

We will quickly set up a local cluster using the Dask Client and LocalCluster classes.

You can configure the Dask Client (e.g., memory limit) to your needs. In this case, we are deploying a cluster with:

  • n_workers=2: 2 workers

  • threads_per_worker=1: 1 thread per worker, since we’re using processes instead of threads

  • memory_limit="4GB": 4 GB memory limit per worker, dependent on the available memory in your system. If the memory_limit given is greater than the available memory, each worker’s limit is set to the total available memory

  • processes=True: use processes instead of threads (preferred for most Python code)

For info on cluster configurations, visit these links:

[ ]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=2, threads_per_worker=1, memory_limit="4GB", processes=True
)

client = Client(cluster)
2025-03-17 13:24:02,343 [INFO]: scheduler.py(__init__:1754) >> State start
2025-03-17 13:24:02,345 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-f2s9iesu', purging
2025-03-17 13:24:02,347 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/scheduler-7wi1xgzo', purging
2025-03-17 13:24:02,348 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-p55nwy0y', purging
2025-03-17 13:24:02,351 [INFO]: scheduler.py(start_unsafe:4209) >>   Scheduler at:     tcp://127.0.0.1:59680
2025-03-17 13:24:02,352 [INFO]: scheduler.py(start_unsafe:4224) >>   dashboard at:  http://127.0.0.1:8787/status
2025-03-17 13:24:02,352 [INFO]: scheduler.py(register_worker_plugin:8109) >> Registering Worker plugin shuffle
2025-03-17 13:24:02,366 [INFO]: nanny.py(start_unsafe:368) >>         Start Nanny at: 'tcp://127.0.0.1:59683'
2025-03-17 13:24:02,384 [INFO]: nanny.py(start_unsafe:368) >>         Start Nanny at: 'tcp://127.0.0.1:59685'
2025-03-17 13:24:03,034 [INFO]: scheduler.py(add_worker:4563) >> Register worker addr: tcp://127.0.0.1:59689 name: 1
2025-03-17 13:24:03,037 [INFO]: scheduler.py(handle_worker:6163) >> Starting worker compute stream, tcp://127.0.0.1:59689
2025-03-17 13:24:03,037 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59693
2025-03-17 13:24:03,038 [INFO]: scheduler.py(add_worker:4563) >> Register worker addr: tcp://127.0.0.1:59688 name: 0
2025-03-17 13:24:03,039 [INFO]: scheduler.py(handle_worker:6163) >> Starting worker compute stream, tcp://127.0.0.1:59688
2025-03-17 13:24:03,039 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59692
2025-03-17 13:24:03,064 [INFO]: scheduler.py(add_client:5898) >> Receive client connection: Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:03,064 [INFO]: core.py(handle_stream:883) >> Starting established connection to tcp://127.0.0.1:59695
2025-03-17 13:24:37,604 [INFO]: scheduler.py(remove_client:5943) >> Remove client Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:37,605 [INFO]: core.py(handle_stream:908) >> Received 'close-stream' from tcp://127.0.0.1:59695; closing.
2025-03-17 13:24:37,607 [INFO]: scheduler.py(remove_client:5943) >> Remove client Client-c4724026-036d-11f0-8543-e2398742200b
2025-03-17 13:24:37,613 [INFO]: scheduler.py(add_client:5935) >> Close client connection: Client-c4724026-036d-11f0-8543-e2398742200b
[3]:
client.cluster

2. Open the Dask Dashboard UI#

The Dask distributed scheduler provides an interactive dashboard containing many plots and tables with live information.

Check this Dask documentation page to learn how to interpret the information. There is also a general guide later in the notebook.

Here’s an example:

Dask Dashboard UI Example

3. Open a dataset with xCDAT and chunk it#

[5]:
ds = xc.tutorial.open_dataset("tas_amon_access", chunks="auto")

ds
[5]:
<xarray.Dataset> Size: 7MB
Dimensions:    (time: 60, bnds: 2, lat: 145, lon: 192)
Coordinates:
  * lat        (lat) float64 1kB -90.0 -88.75 -87.5 -86.25 ... 87.5 88.75 90.0
  * lon        (lon) float64 2kB 0.0 1.875 3.75 5.625 ... 354.4 356.2 358.1
    height     float64 8B ...
  * time       (time) object 480B 1870-01-16 12:00:00 ... 1874-12-16 12:00:00
Dimensions without coordinates: bnds
Data variables:
    time_bnds  (time, bnds) object 960B dask.array<chunksize=(60, 2), meta=np.ndarray>
    lat_bnds   (lat, bnds) float64 2kB dask.array<chunksize=(145, 2), meta=np.ndarray>
    lon_bnds   (lon, bnds) float64 3kB dask.array<chunksize=(192, 2), meta=np.ndarray>
    tas        (time, lat, lon) float32 7MB dask.array<chunksize=(60, 145, 192), meta=np.ndarray>
Attributes: (12/48)
    Conventions:                     CF-1.7 CMIP-6.2
    activity_id:                     CMIP
    branch_method:                   standard
    branch_time_in_child:            0.0
    branch_time_in_parent:           87658.0
    creation_date:                   2020-06-05T04:06:11Z
    ...                              ...
    variant_label:                   r10i1p1f1
    version:                         v20200605
    license:                         CMIP6 model data produced by CSIRO is li...
    cmor_version:                    3.4.0
    tracking_id:                     hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...
    DODS_EXTRA.Unlimited_Dimension:  time

Notice how the data variables contain dask.array.

4. Run your computations while viewing the dashboards in your browser#

Bad signs to watch out for in the dashboard#

Task stream plot:

  • Lots of white space in the task stream plot

    • Nothing is happening.

    • Chunks may be too small.

  • Lots and lots of red in the task stream plot

    • Represents worker communication.

    • Dask workers need some communication, but if they are doing almost nothing except communication then there is not much productive work going on.

Worker memory plot:

  • Orange bars, which are a sign you are getting close to the memory limit.

    • Chunks may be too big.

  • Gray bars which mean data is being spilled to disk.

    • Chunks may be too big.

https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes

First, we queue up the lazy group_average operation in Dask#

[6]:
tas_avg = ds.temporal.group_average("tas", freq="month")

tas_avg
[6]:
<xarray.Dataset> Size: 13MB
Dimensions:   (lat: 145, bnds: 2, lon: 192, time: 60)
Coordinates:
  * lat       (lat) float64 1kB -90.0 -88.75 -87.5 -86.25 ... 87.5 88.75 90.0
  * lon       (lon) float64 2kB 0.0 1.875 3.75 5.625 ... 352.5 354.4 356.2 358.1
    height    float64 8B ...
  * time      (time) object 480B 1870-01-01 00:00:00 ... 1874-12-01 00:00:00
Dimensions without coordinates: bnds
Data variables:
    lat_bnds  (lat, bnds) float64 2kB dask.array<chunksize=(145, 2), meta=np.ndarray>
    lon_bnds  (lon, bnds) float64 3kB dask.array<chunksize=(192, 2), meta=np.ndarray>
    tas       (time, lat, lon) float64 13MB dask.array<chunksize=(1, 145, 192), meta=np.ndarray>
Attributes: (12/48)
    Conventions:                     CF-1.7 CMIP-6.2
    activity_id:                     CMIP
    branch_method:                   standard
    branch_time_in_child:            0.0
    branch_time_in_parent:           87658.0
    creation_date:                   2020-06-05T04:06:11Z
    ...                              ...
    variant_label:                   r10i1p1f1
    version:                         v20200605
    license:                         CMIP6 model data produced by CSIRO is li...
    cmor_version:                    3.4.0
    tracking_id:                     hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...
    DODS_EXTRA.Unlimited_Dimension:  time

Now we can trigger the queued up operations in the Dask task graph using .compute() or .load()

  • .compute() - Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return a new dataset. Unlike load, the original dataset is left unaltered.

  • .load() - Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return this dataset. Unlike compute, the original dataset is modified and returned.
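The difference between the two methods can be sketched on a tiny Dask-backed dataset. This is a minimal illustration that assumes `xarray`, `dask`, and NumPy are installed; the variable name `a` and the array size are made up for the example. It relies on `DataArray.chunks` being `None` when the underlying data is not a Dask array.

```python
import dask.array as da
import xarray as xr

# A small dataset whose single variable is backed by a chunked Dask array.
ds_demo = xr.Dataset({"a": (("x",), da.arange(10, chunks=5))})

# .compute() returns a new in-memory dataset and leaves the original lazy.
computed = ds_demo.compute()
print(ds_demo["a"].chunks)   # still chunked (Dask-backed)
print(computed["a"].chunks)  # None, the data is now a NumPy array

# .load() loads the data into the original dataset and returns that same object.
loaded = ds_demo.load()
print(loaded is ds_demo)
print(ds_demo["a"].chunks)   # None, the original now holds NumPy data
```

Use `.compute()` when you want to keep the lazy dataset around for further operations, and `.load()` when you are done building the task graph and want the original object to hold the values.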

[7]:
# Or call tas_avg.load() to modify the original dataset.
tas_avg_res = tas_avg.compute()
/opt/miniconda3/envs/xcdat_notebook_dask/lib/python3.13/site-packages/esmpy/interface/loadESMF.py:94: VersionWarning: ESMF installation version 8.8.0, ESMPy version 8.8.0b0
  warnings.warn("ESMF installation version {}, ESMPy version {}".format(

Explicitly close the Dask Client#

The client also closes automatically when you close the notebook or end the Python session.

[8]:
client.close()

That’s it! You just performed parallel computing using Dask on your local machine!#

If you are on an HPC environment where interactive jobs must be requested on compute nodes for larger scale computations, please follow the guide below.

Code Example - Parallelizing xCDAT Computations with Dask (HPC/Compute Node)#

On an HPC environment, you typically need to request an interactive job on one or more compute nodes for your computational needs. The guide below shows you how to do just that.

1. Start an interactive job#

The first step if you’re on an HPC environment is to start an interactive job to get nodes.

Depending on your HPC environment, you might need to run salloc, srun, etc. to start an interactive job. Please refer to the documentation for your HPC environment. Here’s the documentation from NERSC for Interactive Jobs.

In the example below on the NERSC Perlmutter machine, we’re requesting 1 CPU node from the reservation and dividing the resources on that node among 10 Dask workers. If you’re working on a reservation, request the respective number of hours needed for your node (e.g., 4 hrs/240 mins). When you are working outside a reservation, it’s best to limit your requested time to your expected working window.

salloc --reservation=dask_day1 -C cpu -N 1 -n 10 -A ntrain5 -t 240

It may take a moment to start, but once it does, you’ll get your prompt back.

Processes or Threads?#

Python has the global interpreter lock (GIL), which basically means that Python does not leverage multiple threads well.

The general exceptions to this rule include code that is mainly I/O (e.g., downloading data) or code that leverages mostly C++ and other non-Python libraries (e.g., NumPy).

For most use cases, using processes over threads might make more sense. Here’s an example with dask worker:

  • I have 20 GB of RAM

  • I have 2 cores. So I want 2 workers.

    • This means each worker can consume 10 GB of RAM.

$ dask worker tcp://127.0.0.1:8786 --nworkers 2 --nthreads 1 --memory-limit 10GB

https://saturncloud.io/blog/local-cluster/
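The arithmetic in the example above (20 GB of RAM shared by 2 single-threaded workers) can be written out as a small helper. This is a standard-library sketch; `plan_workers` is a hypothetical function for illustration and not part of Dask, and it assumes one worker process per group of `threads_per_worker` cores.

```python
def plan_workers(total_memory_gb, n_cores, threads_per_worker=1):
    """Divide cores and memory evenly among worker processes."""
    n_workers = n_cores // threads_per_worker
    return {
        "n_workers": n_workers,
        "threads_per_worker": threads_per_worker,
        "memory_per_worker_gb": total_memory_gb / n_workers,
    }


plan = plan_workers(total_memory_gb=20, n_cores=2)
print(plan)
```

With 20 GB and 2 cores, each of the 2 workers gets 10 GB, which matches the per-worker figure in the bullets above.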

2. Start the Dask Cluster#

There are several ways to deploy a Dask Cluster. You can check them out here. For this section, we’ll be manually deploying a cluster in your HPC environment.

Open a terminal and start the Dask scheduler:

$ conda activate xcdat_notebook_dask
$ dask scheduler

The default address for the dask scheduler is tcp://127.0.0.1:8786.

Open a second terminal and start a Dask worker:

$ conda activate xcdat_notebook_dask
$ dask worker tcp://127.0.0.1:8786 --nworkers 2 --nthreads 1 --memory-limit 10GB

3. Connect the Dask Client to the Cluster#

[9]:
# For the purpose of this notebook, ignore the VersionMismatchWarning if you get
# one.
# Related issue: https://github.com/dask/distributed/issues/3767
client_hpc = Client("tcp://127.0.0.1:8786")

You should see this output in your dask scheduler terminal:

2024-04-30 10:24:42,155 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:47522
2024-04-30 10:24:44,609 - distributed.scheduler - INFO - Receive client connection: Client-894d078d-0716-11ef-9ffd-f4e9d4af2192
2024-04-30 10:24:44,610 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:47748
[10]:
client_hpc
[10]:

Client

Client-0187f5e6-036e-11f0-8543-e2398742200b

Connection method: Direct
Dashboard: http://127.0.0.1:59876/status

Scheduler Info

Scheduler

Scheduler-2231af6a-c89f-4001-9c7e-6bddfc517811

Comm: tcp://192.168.1.102:8786 Workers: 2
Dashboard: http://192.168.1.102:59876/status Total threads: 2
Started: Just now Total memory: 32.00 GiB

Workers

Worker: tcp://127.0.0.1:59914

Comm: tcp://127.0.0.1:59914 Total threads: 1
Dashboard: http://127.0.0.1:59917/status Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:59911
Local directory: /var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-pt1h62s2
Tasks executing: Tasks in memory:
Tasks ready: Tasks in flight:
CPU usage: 2.0% Last seen: Just now
Memory usage: 145.27 MiB Spilled bytes: 0 B
Read bytes: 17.97 kiB Write bytes: 329.44 kiB

Worker: tcp://127.0.0.1:59915

Comm: tcp://127.0.0.1:59915 Total threads: 1
Dashboard: http://127.0.0.1:59916/status Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:59909
Local directory: /var/folders/_h/t3wvkks5643fxnv07_kx9cx8000zpt/T/dask-scratch-space/worker-f1czipzq
Tasks executing: Tasks in memory:
Tasks ready: Tasks in flight:
CPU usage: 2.1% Last seen: Just now
Memory usage: 145.34 MiB Spilled bytes: 0 B
Read bytes: 17.97 kiB Write bytes: 329.53 kiB

4. Perform Computations#

[11]:
ds_hpc = xc.tutorial.open_dataset("tas_amon_access", chunks="auto")

tas_avg_hpc = ds_hpc.temporal.group_average("tas", freq="month")
[12]:
tas_avg_hpc_res = tas_avg_hpc.compute()

NOTE: You might see these warnings:

From the code cell above:

UserWarning: Sending large graph of size 426.18 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.

In the dask worker terminal:

2024-04-30 10:26:04,806 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.60s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

These warnings indicate sub-optimal Dask configurations for the available resources, inappropriate use of Dask for parallelization (e.g., very small files), etc. As noted earlier, we are using a small dataset in the example for ease of download, which is not necessarily the best use of Dask.

More Resources#

To learn more in-depth about Dask and Xarray, please check these resources out:

FAQs#

Are there any other optimization tips for working with Dask and Xarray?#

We HIGHLY recommend checking out the Optimization Tips section if you are using Dask with Xarray.

Are there cases where xCDAT loads Dask arrays into memory?#

As of xarray=2023.5.0, Xarray does not support updating/setting multi-dimensional dask arrays. The following error is raised if this is attempted: xarray can't set arrays with multiple array indices to dask yet.

As a workaround, xCDAT loads coordinate bounds into memory if they are multi-dimensional Dask arrays before performing operations or computations. This loading occurs in the following APIs:

  • xcdat.axis.swap_lon_axis

    • swapping longitude axis orientation

    • aligning longitude bounds to (0, 360) axis

  • xarray.Dataset.spatial.average

    • generating weights using lat/lon coordinate bounds

    • swapping longitude axis orientation

    • scaling domain bounds to a specified region

  • xcdat.Dataset.temporal.<average|group_average|climatology|departures>

    • generating weights using time coordinate bounds