Benchmarking a data-intensive operation#
We’re going to compute the zonal, time average of a product of two high-frequency fields from the CFSR dataset.
Specifically we will compute

\[
\left[\overline{vT}\right]
\]

for the month of January (one single year), where the square brackets denote a zonal average, the overbar a time average, \(v\) is the meridional wind, and \(T\) the temperature.
I want to compare various ways of accelerating the calculation.
Compute environment#
I logged into https://jupyterlab.arcc.albany.edu and spawned a server on the batch partition with 4 cores and 32 GB of memory. This resource is open to anyone with UAlbany login credentials. The Python kernel is daes_nov22.
Eager execution in memory#
First we do the “normal” style: open the datasets and execute the calculation immediately.
import xarray as xr
files = ['/network/daes/cfsr/data/2021/v.2021.0p5.anl.nc',
'/network/daes/cfsr/data/2021/t.2021.0p5.anl.nc']
v_ds = xr.open_dataset(files[0])
T_ds = xr.open_dataset(files[1])
v_ds
<xarray.Dataset>
Dimensions:  (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
    v        (time, lev, lat, lon) float32 ...
Attributes:
    description:    v 1000-10 hPa
    year:           2021
    source:         http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
    references:     Saha, et. al., (2010)
    created_by:     User: ab473731
    creation_date:  Sat Jan 2 06:00:41 UTC 2021
Computing the product \(vT\) over one month involves taking the product of two arrays of size 124 x 32 x 361 x 720 (time x lev x lat x lon).
The total number of grid points for the whole year is
v_ds.time.size * v_ds.lev.size * v_ds.lat.size * v_ds.lon.size
12143462400
The month of January alone accounts for about \(1\times 10^9\) points. Using 32-bit real numbers (4 bytes per number), this means we’re taking the product of two arrays of roughly 4 GB each.
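As a quick sanity check on that estimate, we can compute the footprint directly (a rough sketch; the 124 time steps come from 31 January days at 6-hourly resolution):

import numpy as np

n_points = 124 * 32 * 361 * 720                      # time x lev x lat x lon for January
bytes_per_field = n_points * np.dtype('float32').itemsize
print(f"{bytes_per_field / 1e9:.1f} GB per field")   # roughly 4.1 GB each for v and T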
Timing the eager execution#
%time (v_ds.v.groupby(v_ds.time.dt.month)[1] * T_ds.t.groupby(T_ds.time.dt.month)[1]).mean(dim=('lon','time'))
CPU times: user 38 s, sys: 6.03 s, total: 44 s
Wall time: 46.1 s
<xarray.DataArray (lev: 32, lat: 361)> array([[-5.6487140e-03, -4.9008570e+00, 9.0137497e+01, ..., -2.2839991e+01, -1.6109011e+01, -5.6033172e-03], [-5.1496424e-02, -4.9253478e+00, 8.9656609e+01, ..., -1.0113486e+01, -8.7076464e+00, -5.2921850e-02], [-7.3452897e-02, -4.9121408e+00, 8.9197853e+01, ..., 8.5336580e+00, -6.0262263e-01, -6.8927042e-02], ..., [-2.8627560e-01, -2.4832463e-01, -1.9483636e-01, ..., 1.4787066e+01, 8.3816442e+00, -2.6661530e-01], [ 2.6218587e-01, 6.6386479e-01, 9.0554982e-01, ..., 3.2305588e+01, 1.7912203e+01, 2.3270084e-01], [-7.3782736e-01, 1.6531569e-01, 1.1146791e+00, ..., 9.4755917e+00, 6.2672334e+00, -6.2244219e-01]], dtype=float32) Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
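The same eager calculation can also be written with a partial-string time selection instead of groupby. This is just an alternative sketch (not the timing reported above), and it should give the same January mean:

# Alternative eager formulation: select January explicitly, then average
vT_jan = v_ds.v.sel(time='2021-01') * T_ds.t.sel(time='2021-01')
result = vT_jan.mean(dim=('lon', 'time'))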
v_ds.close()
T_ds.close()
Lazy execution using Dask locally#
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
ds
<xarray.Dataset>
Dimensions:  (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
    t        (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
    v        (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
Attributes:
    description:    t 1000-10 hPa
    year:           2021
    source:         http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
    references:     Saha, et. al., (2010)
    created_by:     User: ab473731
    creation_date:  Sat Jan 2 06:00:24 UTC 2021
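Nothing has been read from disk yet. Before triggering any computation, we can inspect the chunk layout of the underlying dask arrays; this is just a sketch using standard dask array properties:

import numpy as np

print(ds.v.data.chunksize)      # (30, 1, 361, 720) with the chunks argument above
print(ds.v.data.npartitions)    # number of chunks (one task per chunk per operation)
print(np.prod(ds.v.data.chunksize) * 4 / 1e6, 'MB per float32 chunk')   # ~31 MB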
vT = ds.v * ds.t
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray> Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 * month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 49.3 s, sys: 3.27 s, total: 52.5 s
Wall time: 43.1 s
<xarray.DataArray (lev: 32, lat: 361)> array([[-5.64870983e-03, -4.90085554e+00, 9.01375198e+01, ..., -2.28399925e+01, -1.61090107e+01, -5.60331950e-03], [-5.14964201e-02, -4.92534685e+00, 8.96566238e+01, ..., -1.01134815e+01, -8.70764828e+00, -5.29218502e-02], [-7.34528974e-02, -4.91214132e+00, 8.91978302e+01, ..., 8.53365612e+00, -6.02623105e-01, -6.89270422e-02], ..., [-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ..., 1.47870646e+01, 8.38164520e+00, -2.66615331e-01], [ 2.62185901e-01, 6.63864732e-01, 9.05549943e-01, ..., 3.23055954e+01, 1.79122047e+01, 2.32700869e-01], [-7.37827361e-01, 1.65315807e-01, 1.11467922e+00, ..., 9.47559261e+00, 6.26723337e+00, -6.22442186e-01]], dtype=float32) Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 month int64 1
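With the local (threaded) scheduler, Dask’s diagnostics can also show a progress bar while the task graph executes. A small optional sketch, equivalent to the timed .load() above:

from dask.diagnostics import ProgressBar

# Same computation as above, but with a live progress bar on the local scheduler
with ProgressBar():
    jan_mean = meanfield.sel(month=1).compute()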
ds.close()
Conclusion#
On this particular system, we get about the same performance either way. That gives the advantage to the lazy method, because the code is nicer to work with.
Lazy execution using Dask and a distributed cluster#
Now we’ll try the same thing but farm the computation out to a dask cluster.
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    processes=4,          # by default Dask runs one Python process per job;
                          # optionally, that job can be split into multiple processes
    cores=80,             # size of a single job -- typically one full node of the HPC cluster
    memory="376.2GB",     # the max memory on the 80-core batch nodes, I believe
    walltime="01:00:00",
    queue="batch",
)
cluster.scale(1)
client = Client(cluster)
client
Client: Client-cc027e39-66f3-11ed-936c-80000086fe80
Connection method: Cluster object | Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://169.226.65.166:8787/status
SLURMCluster (9664ea60) -- Scheduler: tcp://169.226.65.166:33445 | Workers: 0 | Total threads: 0 | Total memory: 0 B
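SLURM may take a while to actually start the job, so the cluster reports zero workers at first. One way to avoid launching the computation too early is to block until the workers have registered; a sketch (the worker count of 4 matches processes=4 above):

# Wait until the SLURM job has started and its 4 worker processes have connected
client.wait_for_workers(n_workers=4)
print(len(client.scheduler_info()['workers']), 'workers ready')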
Effects of different chunking#
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)> dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(30, 1, 361, 720), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00 * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray> Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 * month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 994 ms, sys: 224 ms, total: 1.22 s
Wall time: 18.4 s
<xarray.DataArray (lev: 32, lat: 361)> array([[-5.64870983e-03, -4.90085554e+00, 9.01375198e+01, ..., -2.28399925e+01, -1.61090107e+01, -5.60331950e-03], [-5.14964201e-02, -4.92534685e+00, 8.96566238e+01, ..., -1.01134815e+01, -8.70764828e+00, -5.29218502e-02], [-7.34528974e-02, -4.91214132e+00, 8.91978302e+01, ..., 8.53365612e+00, -6.02623105e-01, -6.89270422e-02], ..., [-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ..., 1.47870646e+01, 8.38164520e+00, -2.66615331e-01], [ 2.62185901e-01, 6.63864732e-01, 9.05549943e-01, ..., 3.23055954e+01, 1.79122047e+01, 2.32700869e-01], [-7.37827361e-01, 1.65315807e-01, 1.11467922e+00, ..., 9.47559261e+00, 6.26723337e+00, -6.22442186e-01]], dtype=float32) Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 month int64 1
ds.close()
Same thing, different chunking#
ds = xr.open_mfdataset(files, chunks={'time':10, 'lev': 32})
meanfield = (ds.v*ds.t).mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 32, 361), chunktype=numpy.ndarray> Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 * month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 261 ms, sys: 62.7 ms, total: 324 ms
Wall time: 20.2 s
<xarray.DataArray (lev: 32, lat: 361)> array([[-5.6487117e-03, -4.9008560e+00, 9.0137527e+01, ..., -2.2839994e+01, -1.6109011e+01, -5.6033176e-03], [-5.1496424e-02, -4.9253478e+00, 8.9656616e+01, ..., -1.0113482e+01, -8.7076483e+00, -5.2921850e-02], [-7.3452897e-02, -4.9121413e+00, 8.9197830e+01, ..., 8.5336571e+00, -6.0262257e-01, -6.8927050e-02], ..., [-2.8627566e-01, -2.4832460e-01, -1.9483612e-01, ..., 1.4787064e+01, 8.3816442e+00, -2.6661530e-01], [ 2.6218590e-01, 6.6386473e-01, 9.0554994e-01, ..., 3.2305595e+01, 1.7912203e+01, 2.3270084e-01], [-7.3782736e-01, 1.6531578e-01, 1.1146792e+00, ..., 9.4755926e+00, 6.2672329e+00, -6.2244225e-01]], dtype=float32) Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 month int64 1
ds.close()
And yet another chunking#
ds = xr.open_mfdataset(files, chunks={'time':124, 'lev': 32})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)> dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(124, 32, 361, 720), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00 * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
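A rough estimate of the per-chunk memory for this layout helps explain what happens next (simple arithmetic, not measured):

# Each chunk now spans 124 time steps and all 32 levels of the full global grid
chunk_bytes = 124 * 32 * 361 * 720 * 4            # float32
print(f"{chunk_bytes / 1e9:.1f} GB per chunk")    # ~4 GB per chunk

With only about a dozen chunks per variable for the whole year, there is little parallelism for the workers to exploit, and each task needs several gigabytes of memory.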
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(2, 32, 361), chunktype=numpy.ndarray> Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 * month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 475 ms, sys: 119 ms, total: 593 ms
Wall time: 45 s
<xarray.DataArray (lev: 32, lat: 361)> array([[-5.6487140e-03, -4.9008555e+00, 9.0137527e+01, ..., -2.2839994e+01, -1.6109011e+01, -5.6033167e-03], [-5.1496420e-02, -4.9253469e+00, 8.9656624e+01, ..., -1.0113482e+01, -8.7076483e+00, -5.2921850e-02], [-7.3452890e-02, -4.9121413e+00, 8.9197838e+01, ..., 8.5336561e+00, -6.0262299e-01, -6.8927050e-02], ..., [-2.8627566e-01, -2.4832465e-01, -1.9483635e-01, ..., 1.4787064e+01, 8.3816452e+00, -2.6661530e-01], [ 2.6218587e-01, 6.6386473e-01, 9.0554988e-01, ..., 3.2305595e+01, 1.7912203e+01, 2.3270085e-01], [-7.3782736e-01, 1.6531581e-01, 1.1146793e+00, ..., 9.4755907e+00, 6.2672334e+00, -6.2244219e-01]], dtype=float32) Coordinates: * lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0 * lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0 month int64 1
ds.close()
Results#
The smallest wall time for this calculation (about 18 s) was achieved using the distributed cluster with the chunking strategy chunks={'time': 30, 'lev': 1}.
For this one-month calculation, using the distributed cluster reduces our wall time from roughly 45 s down to under 20 s, a speed-up of about 2.5x.
Can we do better through more intelligent chunking and/or fine-tuning of the distributed cluster? Probably.
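One direction worth trying, sketched below but not run here, is adaptive scaling plus keeping the intermediate product in distributed memory; the chunk sizes and scaling bounds are guesses, not tested settings:

# Hypothetical follow-up experiment (not executed in this post)
ds = xr.open_mfdataset(files, chunks={'time': 124, 'lev': 1})   # chunk sizes are a guess
cluster.adapt(minimum=1, maximum=4)         # let Dask add/remove SLURM jobs as needed
vT = (ds.v * ds.t).persist()                # keep the product in worker memory
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()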
cluster.close()
2022-11-17 22:49:54,749 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client