Benchmarking a data-intensive operation#

We’re going to compute the zonal, time average of a product of two high-frequency fields from the CFSR dataset.

Specifically we will compute

\[[\overline{vT}]\]

for the month of January (one single year)

I want to compare various ways of accelerating the calculation.

Compute environment#

I logged into https://jupyterlab.arcc.albany.edu, and spawned on batch with 4 cores and 32 GB memory

This resource is open to anyone with UAlbany login credentials.

Python kernel is daes_nov22.

Eager execution in memory#

First we do the “normal” style: open the datasets, execute the calculation immediately

import xarray as xr

files = ['/network/daes/cfsr/data/2021/v.2021.0p5.anl.nc',
         '/network/daes/cfsr/data/2021/t.2021.0p5.anl.nc']

v_ds = xr.open_dataset(files[0])
T_ds = xr.open_dataset(files[1])
v_ds
<xarray.Dataset>
Dimensions:  (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
    v        (time, lev, lat, lon) float32 ...
Attributes:
    description:    v 1000-10 hPa
    year:           2021
    source:         http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
    references:     Saha, et. al., (2010)
    created_by:     User: ab473731
    creation_date:  Sat Jan  2 06:00:41 UTC 2021

Computing the product \(vT\) over one month involves taking the product of two arrays of size

124 x 32 x 361 x 720

The total number of grid points for the whole year is

v_ds.time.size * v_ds.lev.size * v_ds.lat.size * v_ds.lon.size
12143462400

The month of January alone is about \(1\times 10^9\) points.

Using 32-bit real numbers (4 bytes per numbers), this means we’re taking the product of two arrays of size around 4 GB each.

Timing the eager execution#

%time (v_ds.v.groupby(v_ds.time.dt.month)[1] * T_ds.t.groupby(T_ds.time.dt.month)[1]).mean(dim=('lon','time'))
CPU times: user 38 s, sys: 6.03 s, total: 44 s
Wall time: 46.1 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487140e-03, -4.9008570e+00,  9.0137497e+01, ...,
        -2.2839991e+01, -1.6109011e+01, -5.6033172e-03],
       [-5.1496424e-02, -4.9253478e+00,  8.9656609e+01, ...,
        -1.0113486e+01, -8.7076464e+00, -5.2921850e-02],
       [-7.3452897e-02, -4.9121408e+00,  8.9197853e+01, ...,
         8.5336580e+00, -6.0262263e-01, -6.8927042e-02],
       ...,
       [-2.8627560e-01, -2.4832463e-01, -1.9483636e-01, ...,
         1.4787066e+01,  8.3816442e+00, -2.6661530e-01],
       [ 2.6218587e-01,  6.6386479e-01,  9.0554982e-01, ...,
         3.2305588e+01,  1.7912203e+01,  2.3270084e-01],
       [-7.3782736e-01,  1.6531569e-01,  1.1146791e+00, ...,
         9.4755917e+00,  6.2672334e+00, -6.2244219e-01]], dtype=float32)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
v_ds.close()
T_ds.close()

Lazy execution using Dask locally#

ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
ds
<xarray.Dataset>
Dimensions:  (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
    t        (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
    v        (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
Attributes:
    description:    t 1000-10 hPa
    year:           2021
    source:         http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
    references:     Saha, et. al., (2010)
    created_by:     User: ab473731
    creation_date:  Sat Jan  2 06:00:24 UTC 2021
vT = ds.v * ds.t
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)>
dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
  * month    (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 49.3 s, sys: 3.27 s, total: 52.5 s
Wall time: 43.1 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.64870983e-03, -4.90085554e+00,  9.01375198e+01, ...,
        -2.28399925e+01, -1.61090107e+01, -5.60331950e-03],
       [-5.14964201e-02, -4.92534685e+00,  8.96566238e+01, ...,
        -1.01134815e+01, -8.70764828e+00, -5.29218502e-02],
       [-7.34528974e-02, -4.91214132e+00,  8.91978302e+01, ...,
         8.53365612e+00, -6.02623105e-01, -6.89270422e-02],
       ...,
       [-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ...,
         1.47870646e+01,  8.38164520e+00, -2.66615331e-01],
       [ 2.62185901e-01,  6.63864732e-01,  9.05549943e-01, ...,
         3.23055954e+01,  1.79122047e+01,  2.32700869e-01],
       [-7.37827361e-01,  1.65315807e-01,  1.11467922e+00, ...,
         9.47559261e+00,  6.26723337e+00, -6.22442186e-01]], dtype=float32)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
    month    int64 1
ds.close()

Conclusion#

On this particular system, we seem to get about the same performance either way – giving advantage to the lazy method, because the code is nicer to work with.

Lazy execution using Dask and a distributed cluster#

Now we’ll try the same thing but farm the computation out to a dask cluster.

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(processes=4,  #By default Dask will run one Python process per job. 
                       # However, you can optionally choose to cut up that job into multiple processes
                       cores=80, # size of a single job -- typically one node of the HPC cluster
                       memory="376.2GB",  # the max memory on the 80-core batch nodes, I believe
                       walltime="01:00:00",
                       queue="batch",
                      )
cluster.scale(1)
client = Client(cluster)
client

Client

Client-cc027e39-66f3-11ed-936c-80000086fe80

Connection method: Cluster object Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://169.226.65.166:8787/status

Cluster Info

Effects of different chunking#

ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)>
dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(30, 1, 361, 720), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)>
dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
  * month    (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 994 ms, sys: 224 ms, total: 1.22 s
Wall time: 18.4 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.64870983e-03, -4.90085554e+00,  9.01375198e+01, ...,
        -2.28399925e+01, -1.61090107e+01, -5.60331950e-03],
       [-5.14964201e-02, -4.92534685e+00,  8.96566238e+01, ...,
        -1.01134815e+01, -8.70764828e+00, -5.29218502e-02],
       [-7.34528974e-02, -4.91214132e+00,  8.91978302e+01, ...,
         8.53365612e+00, -6.02623105e-01, -6.89270422e-02],
       ...,
       [-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ...,
         1.47870646e+01,  8.38164520e+00, -2.66615331e-01],
       [ 2.62185901e-01,  6.63864732e-01,  9.05549943e-01, ...,
         3.23055954e+01,  1.79122047e+01,  2.32700869e-01],
       [-7.37827361e-01,  1.65315807e-01,  1.11467922e+00, ...,
         9.47559261e+00,  6.26723337e+00, -6.22442186e-01]], dtype=float32)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
    month    int64 1
ds.close()

Same thing, different chunking#

ds = xr.open_mfdataset(files, chunks={'time':10, 'lev': 32})
meanfield = (ds.v*ds.t).mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)>
dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 32, 361), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
  * month    (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 261 ms, sys: 62.7 ms, total: 324 ms
Wall time: 20.2 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487117e-03, -4.9008560e+00,  9.0137527e+01, ...,
        -2.2839994e+01, -1.6109011e+01, -5.6033176e-03],
       [-5.1496424e-02, -4.9253478e+00,  8.9656616e+01, ...,
        -1.0113482e+01, -8.7076483e+00, -5.2921850e-02],
       [-7.3452897e-02, -4.9121413e+00,  8.9197830e+01, ...,
         8.5336571e+00, -6.0262257e-01, -6.8927050e-02],
       ...,
       [-2.8627566e-01, -2.4832460e-01, -1.9483612e-01, ...,
         1.4787064e+01,  8.3816442e+00, -2.6661530e-01],
       [ 2.6218590e-01,  6.6386473e-01,  9.0554994e-01, ...,
         3.2305595e+01,  1.7912203e+01,  2.3270084e-01],
       [-7.3782736e-01,  1.6531578e-01,  1.1146792e+00, ...,
         9.4755926e+00,  6.2672329e+00, -6.2244225e-01]], dtype=float32)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
    month    int64 1
ds.close()

And yet another chunking#

ds = xr.open_mfdataset(files, chunks={'time':124, 'lev': 32})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)>
dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(124, 32, 361, 720), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)>
dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(2, 32, 361), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
  * month    (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 475 ms, sys: 119 ms, total: 593 ms
Wall time: 45 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487140e-03, -4.9008555e+00,  9.0137527e+01, ...,
        -2.2839994e+01, -1.6109011e+01, -5.6033167e-03],
       [-5.1496420e-02, -4.9253469e+00,  8.9656624e+01, ...,
        -1.0113482e+01, -8.7076483e+00, -5.2921850e-02],
       [-7.3452890e-02, -4.9121413e+00,  8.9197838e+01, ...,
         8.5336561e+00, -6.0262299e-01, -6.8927050e-02],
       ...,
       [-2.8627566e-01, -2.4832465e-01, -1.9483635e-01, ...,
         1.4787064e+01,  8.3816452e+00, -2.6661530e-01],
       [ 2.6218587e-01,  6.6386473e-01,  9.0554988e-01, ...,
         3.2305595e+01,  1.7912203e+01,  2.3270085e-01],
       [-7.3782736e-01,  1.6531581e-01,  1.1146793e+00, ...,
         9.4755907e+00,  6.2672334e+00, -6.2244219e-01]], dtype=float32)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
    month    int64 1
ds.close()

Results#

The smallest wall time for this calculation was achieved using the distributed cluster and a chunking strategy chunks={'time':10, 'lev': 32}.

For this one-month calculation, using the distributed cluster reduces our wall time from about 50 s down to about 20 s.

That’s close to a factor of three speed-up.

Can we do better through more intelligent chunking and/or fine-tuning of the distributed cluster? Probably.

cluster.close()
2022-11-17 22:49:54,749 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client