Benchmarking a data-intensive operation (snow version)#
We’re going to compute the zonal, time average of a product of two high-frequency fields from the CFSR dataset.
Specifically we will compute

\[\overline{[vT]} = \frac{1}{N_t}\sum_{t}\left(\frac{1}{N_{lon}}\sum_{lon} v\,T\right)\]

for the month of January (one single year), where the overbar denotes the time mean and the square brackets denote the zonal (longitude) mean.
I want to compare various ways of accelerating the calculation.
Compute environment#
I logged into https://jupyterlab.arcc.albany.edu and spawned a server on snow with 8 cores and 64 GB of memory.
This resource is only open to snow group members.
The Python kernel is daes_nov22.
Eager execution in memory#
First we do the “normal” style: open the datasets and execute the calculation immediately in memory.
import xarray as xr
files = ['/network/daes/cfsr/data/2021/v.2021.0p5.anl.nc',
'/network/daes/cfsr/data/2021/t.2021.0p5.anl.nc']
v_ds = xr.open_dataset(files[0])
T_ds = xr.open_dataset(files[1])
v_ds
<xarray.Dataset>
Dimensions: (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
* time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
v (time, lev, lat, lon) float32 ...
Attributes:
description: v 1000-10 hPa
year: 2021
source: http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
references: Saha, et. al., (2010)
created_by: User: ab473731
creation_date: Sat Jan 2 06:00:41 UTC 2021
Computing the product \(vT\) over one month involves taking the product of two arrays of size
124 × 32 × 361 × 720 (time × lev × lat × lon)
The total number of grid points for the whole year is
v_ds.time.size * v_ds.lev.size * v_ds.lat.size * v_ds.lon.size
12143462400
The month of January alone is about \(1\times 10^9\) points.
Using 32-bit real numbers (4 bytes per number), this means we’re taking the product of two arrays of around 4 GB each.
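These numbers are easy to check with a little arithmetic (the dimension sizes are taken from the dataset printout above):

```python
import numpy as np

# January at 6-hourly resolution: 31 days x 4 analyses/day = 124 time steps
times = np.arange('2021-01-01', '2021-02-01', 6, dtype='datetime64[h]')
n_time = times.size

n_lev, n_lat, n_lon = 32, 361, 720
points = n_time * n_lev * n_lat * n_lon
gigabytes = points * 4 / 1e9          # float32 = 4 bytes per value

print(n_time)                # 124
print(points)                # 1031362560, i.e. about 1e9 grid points
print(round(gigabytes, 1))   # 4.1 GB per array
```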
Timing the eager execution#
%time (v_ds.v.groupby(v_ds.time.dt.month)[1] * T_ds.t.groupby(T_ds.time.dt.month)[1]).mean(dim=('lon','time'))
CPU times: user 56.9 s, sys: 7.19 s, total: 1min 4s
Wall time: 1min 8s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487140e-03, -4.9008570e+00, 9.0137497e+01, ...,
-2.2839991e+01, -1.6109011e+01, -5.6033172e-03],
[-5.1496424e-02, -4.9253478e+00, 8.9656609e+01, ...,
-1.0113486e+01, -8.7076464e+00, -5.2921850e-02],
[-7.3452897e-02, -4.9121408e+00, 8.9197853e+01, ...,
8.5336580e+00, -6.0262263e-01, -6.8927042e-02],
...,
[-2.8627560e-01, -2.4832463e-01, -1.9483636e-01, ...,
1.4787066e+01, 8.3816442e+00, -2.6661530e-01],
[ 2.6218587e-01, 6.6386479e-01, 9.0554982e-01, ...,
3.2305588e+01, 1.7912203e+01, 2.3270084e-01],
[-7.3782736e-01, 1.6531569e-01, 1.1146791e+00, ...,
9.4755917e+00, 6.2672334e+00, -6.2244219e-01]], dtype=float32)
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
v_ds.close()
T_ds.close()
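As an aside, the `groupby(...)[1]` indexing used above is one way to pull out January; selecting with a partial datetime string does the same thing. A minimal sketch on a small synthetic DataArray (the data here is made up for illustration):

```python
import numpy as np
import xarray as xr

# Synthetic stand-in: one value per day for 2021
time = np.arange('2021-01-01', '2022-01-01', dtype='datetime64[D]')
da = xr.DataArray(np.arange(time.size, dtype='float32'),
                  coords={'time': time}, dims='time')

jan_a = da.groupby(da.time.dt.month)[1]   # the pattern used above
jan_b = da.sel(time='2021-01')            # partial datetime-string indexing
print(jan_a.size, jan_b.size)             # 31 31
```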
Lazy execution using Dask locally#
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
ds
<xarray.Dataset>
Dimensions: (time: 1460, lat: 361, lon: 720, lev: 32)
Coordinates:
* time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
Data variables:
t (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
v (time, lev, lat, lon) float32 dask.array<chunksize=(30, 1, 361, 720), meta=np.ndarray>
Attributes:
description: t 1000-10 hPa
year: 2021
source: http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
references: Saha, et. al., (2010)
created_by: User: ab473731
creation_date: Sat Jan 2 06:00:24 UTC 2021
vT = ds.v * ds.t
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray>
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
* month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
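At this point `meanfield` is only a task graph: no data has been read and no arithmetic done. The computation is triggered by `.load()` (or `.compute()`). The same deferred behaviour can be sketched with plain Dask arrays on synthetic data:

```python
import dask.array as da

x = da.ones((1000, 500), chunks=(100, 500), dtype='float32')
y = (2 * x).mean(axis=1)   # builds a task graph; nothing is computed yet
result = y.compute()       # the chunked computation runs here
print(result.shape)        # (1000,)
print(float(result[0]))    # 2.0
```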
%time meanfield.sel(month=1).load()
CPU times: user 1min 12s, sys: 5.86 s, total: 1min 18s
Wall time: 1min 5s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.64870983e-03, -4.90085554e+00, 9.01375198e+01, ...,
-2.28399925e+01, -1.61090107e+01, -5.60331950e-03],
[-5.14964201e-02, -4.92534685e+00, 8.96566238e+01, ...,
-1.01134815e+01, -8.70764828e+00, -5.29218502e-02],
[-7.34528974e-02, -4.91214132e+00, 8.91978302e+01, ...,
8.53365612e+00, -6.02623105e-01, -6.89270422e-02],
...,
[-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ...,
1.47870646e+01, 8.38164520e+00, -2.66615331e-01],
[ 2.62185901e-01, 6.63864732e-01, 9.05549943e-01, ...,
3.23055954e+01, 1.79122047e+01, 2.32700869e-01],
[-7.37827361e-01, 1.65315807e-01, 1.11467922e+00, ...,
9.47559261e+00, 6.26723337e+00, -6.22442186e-01]], dtype=float32)
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
month int64 1
ds.close()
Conclusion#
Similar to running on batch, we get about the same performance either way. That gives the advantage to the lazy method, since the code is nicer to work with.
Snow is slower than batch, probably just because the CPU hardware is significantly older.
Lazy execution using Dask and a distributed cluster#
Now we’ll try the same thing, but farm the computation out to a Dask distributed cluster.
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    processes=8,        # by default Dask runs one Python process per job,
                        # but you can optionally cut that job into multiple processes
    cores=32,           # size of a single job -- typically one node of the HPC cluster
    memory="256GB",     # snow has 8 GB RAM per physical core, I believe
    walltime="01:00:00",
    queue="snow",
    interface="ib0",
)
cluster.scale(1)
client = Client(cluster)
client
Client
Client-974a185b-66f6-11ed-96c3-80000208fe80
Connection method: Cluster object | Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.77.8.107:8787/status

SLURMCluster (407d9434)
Scheduler: Scheduler-04bf2212-5777-4b86-9a7d-beee2a36c4b4 | Comm: tcp://10.77.8.107:40409
Started: Just now | Workers: 0 | Total threads: 0 | Total memory: 0 B
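Under the hood, `dask_jobqueue` submits an ordinary Slurm batch script for each job; `cluster.job_script()` prints the exact one. With the settings above it looks roughly like this (a sketch: paths, flag names, and memory formatting vary by dask-jobqueue version):

```bash
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -p snow
#SBATCH -n 1
#SBATCH --cpus-per-task=32
#SBATCH --mem=239G
#SBATCH -t 01:00:00

# one job -> 8 worker processes x 4 threads each = the 32 requested cores,
# with the 256 GB split into ~32 GB per worker process
python -m distributed.cli.dask_worker tcp://10.77.8.107:40409 \
    --nthreads 4 --nworkers 8 --memory-limit 32GB \
    --nanny --death-timeout 60 --interface ib0
```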
Effects of different chunking#
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)> dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(30, 1, 361, 720), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 1, 361), chunktype=numpy.ndarray>
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
* month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 2.5 s, sys: 455 ms, total: 2.96 s
Wall time: 41.9 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.64870983e-03, -4.90085554e+00, 9.01375198e+01, ...,
-2.28399925e+01, -1.61090107e+01, -5.60331950e-03],
[-5.14964201e-02, -4.92534685e+00, 8.96566238e+01, ...,
-1.01134815e+01, -8.70764828e+00, -5.29218502e-02],
[-7.34528974e-02, -4.91214132e+00, 8.91978302e+01, ...,
8.53365612e+00, -6.02623105e-01, -6.89270422e-02],
...,
[-2.86275655e-01, -2.48324722e-01, -1.94836378e-01, ...,
1.47870646e+01, 8.38164520e+00, -2.66615331e-01],
[ 2.62185901e-01, 6.63864732e-01, 9.05549943e-01, ...,
3.23055954e+01, 1.79122047e+01, 2.32700869e-01],
[-7.37827361e-01, 1.65315807e-01, 1.11467922e+00, ...,
9.47559261e+00, 6.26723337e+00, -6.22442186e-01]], dtype=float32)
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
month int64 1
ds.close()
Same thing, different chunking#
ds = xr.open_mfdataset(files, chunks={'time':10, 'lev': 32})
meanfield = (ds.v*ds.t).mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(1, 32, 361), chunktype=numpy.ndarray>
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
* month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 812 ms, sys: 120 ms, total: 932 ms
Wall time: 29.9 s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487117e-03, -4.9008560e+00, 9.0137527e+01, ...,
-2.2839994e+01, -1.6109011e+01, -5.6033176e-03],
[-5.1496424e-02, -4.9253478e+00, 8.9656616e+01, ...,
-1.0113482e+01, -8.7076483e+00, -5.2921850e-02],
[-7.3452897e-02, -4.9121413e+00, 8.9197830e+01, ...,
8.5336571e+00, -6.0262257e-01, -6.8927050e-02],
...,
[-2.8627566e-01, -2.4832460e-01, -1.9483612e-01, ...,
1.4787064e+01, 8.3816442e+00, -2.6661530e-01],
[ 2.6218590e-01, 6.6386473e-01, 9.0554994e-01, ...,
3.2305595e+01, 1.7912203e+01, 2.3270084e-01],
[-7.3782736e-01, 1.6531578e-01, 1.1146792e+00, ...,
9.4755926e+00, 6.2672329e+00, -6.2244225e-01]], dtype=float32)
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
month int64 1
ds.close()
And yet another chunking#
ds = xr.open_mfdataset(files, chunks={'time':124, 'lev': 32})
vT = ds.v * ds.t
vT
<xarray.DataArray (time: 1460, lev: 32, lat: 361, lon: 720)> dask.array<mul, shape=(1460, 32, 361, 720), dtype=float32, chunksize=(124, 32, 361, 720), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2021-01-01 ... 2021-12-31T18:00:00
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lon (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield
<xarray.DataArray (month: 12, lev: 32, lat: 361)> dask.array<transpose, shape=(12, 32, 361), dtype=float32, chunksize=(2, 32, 361), chunktype=numpy.ndarray>
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
* month (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
%time meanfield.sel(month=1).load()
CPU times: user 1.7 s, sys: 217 ms, total: 1.91 s
Wall time: 1min 13s
<xarray.DataArray (lev: 32, lat: 361)>
array([[-5.6487140e-03, -4.9008555e+00, 9.0137527e+01, ...,
-2.2839994e+01, -1.6109011e+01, -5.6033167e-03],
[-5.1496420e-02, -4.9253469e+00, 8.9656624e+01, ...,
-1.0113482e+01, -8.7076483e+00, -5.2921850e-02],
[-7.3452890e-02, -4.9121413e+00, 8.9197838e+01, ...,
8.5336561e+00, -6.0262299e-01, -6.8927050e-02],
...,
[-2.8627566e-01, -2.4832465e-01, -1.9483635e-01, ...,
1.4787064e+01, 8.3816452e+00, -2.6661530e-01],
[ 2.6218587e-01, 6.6386473e-01, 9.0554988e-01, ...,
3.2305595e+01, 1.7912203e+01, 2.3270085e-01],
[-7.3782736e-01, 1.6531581e-01, 1.1146793e+00, ...,
9.4755907e+00, 6.2672334e+00, -6.2244219e-01]], dtype=float32)
Coordinates:
* lat (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
* lev (lev) float32 1e+03 975.0 950.0 925.0 900.0 ... 50.0 30.0 20.0 10.0
month int64 1
ds.close()
Results#
As with the batch runs, the smallest wall time for this calculation was achieved using the distributed cluster with the chunking strategy chunks={'time':10, 'lev': 32}.
For this one-month calculation, using the distributed cluster reduces our wall time from about 65 s down to about 30 s.
That’s about a factor of two speedup.
Compared to batch, snow is about 1.5x slower (30 seconds vs 20 seconds).
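The per-chunk memory footprints go a long way toward explaining the ranking of the three chunking strategies (straightforward arithmetic from the dimension sizes; float32 = 4 bytes). Very small chunks mean many tasks and more scheduler overhead; very large chunks leave little room for parallelism across the 8 worker processes:

```python
def chunk_mb(time, lev, lat=361, lon=720, bytes_per=4):
    """Memory footprint of one chunk of a float32 variable, in MB."""
    return time * lev * lat * lon * bytes_per / 1e6

print(round(chunk_mb(30, 1), 1))    # 31.2  -> thousands of tiny chunks
print(round(chunk_mb(10, 32), 1))   # 332.7 -> fewer, comfortably sized chunks (fastest)
print(round(chunk_mb(124, 32)))     # 4125  -> ~4 GB chunks, only ~12 per variable (slowest)
```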
cluster.close()