# Benchmarking a data-intensive operation (snow version)

We're going to compute the zonal, time average of a product of two high-frequency fields from the CFSR dataset.

Specifically we will compute

$$[\overline{vT}]$$

for the month of January (one single year)

I want to compare various ways of accelerating the calculation.

## Compute environment

I logged into https://jupyterlab.arcc.albany.edu, and spawned on `snow` with 8 cores and 64 GB memory

This resource is only open to snow group members

Python kernel is `daes_nov22`.

## Eager execution in memory

First we do the "normal" style: open the datasets, execute the calculation immediately

In [1]:
import xarray as xr

files = ['/network/daes/cfsr/data/2021/v.2021.0p5.anl.nc',
         '/network/daes/cfsr/data/2021/t.2021.0p5.anl.nc']

v_ds = xr.open_dataset(files[0])
T_ds = xr.open_dataset(files[1])
v_ds

Computing the product $vT$ over one month involves taking the product of two arrays of size

```
124 x 32 x 361 x 720
```

The total number of grid points for the whole year is

In [2]:
v_ds.time.size * v_ds.lev.size * v_ds.lat.size * v_ds.lon.size

12143462400

The month of January alone is about $1\times 10^9$ points.

Using 32-bit real numbers (4 bytes per numbers), this means we're taking the product of two arrays of size around 4 GB each.

### Timing the eager execution

In [3]:
%time (v_ds.v.groupby(v_ds.time.dt.month)[1] * T_ds.t.groupby(T_ds.time.dt.month)[1]).mean(dim=('lon','time'))

CPU times: user 56.9 s, sys: 7.19 s, total: 1min 4s
Wall time: 1min 8s


In [4]:
v_ds.close()
T_ds.close()

## Lazy execution using Dask locally

In [5]:
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
ds

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 2 graph layers,1568 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 45.24 GiB 29.75 MiB Shape (1460, 32, 361, 720) (30, 1, 361, 720) Dask graph 1568 chunks in 2 graph layers Data type float32 numpy.ndarray",1460  1  720  361  32,

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 2 graph layers,1568 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 2 graph layers,1568 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 45.24 GiB 29.75 MiB Shape (1460, 32, 361, 720) (30, 1, 361, 720) Dask graph 1568 chunks in 2 graph layers Data type float32 numpy.ndarray",1460  1  720  361  32,

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 2 graph layers,1568 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [6]:
vT = ds.v * ds.t
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,1.41 kiB
Shape,"(12, 32, 361)","(1, 1, 361)"
Dask graph,384 chunks in 60 graph layers,384 chunks in 60 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 541.50 kiB 1.41 kiB Shape (12, 32, 361) (1, 1, 361) Dask graph 384 chunks in 60 graph layers Data type float32 numpy.ndarray",361  32  12,

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,1.41 kiB
Shape,"(12, 32, 361)","(1, 1, 361)"
Dask graph,384 chunks in 60 graph layers,384 chunks in 60 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [7]:
%time meanfield.sel(month=1).load()

CPU times: user 1min 12s, sys: 5.86 s, total: 1min 18s
Wall time: 1min 5s


In [8]:
ds.close()

### Conclusion

Similar to running on `batch`, we get about the same performance either way -- giving advantage to the lazy method, because the code is nicer to work with.

Snow is slower than batch -- probably just because the cpu hardware is significantly older.

## Lazy execution using Dask and a distributed cluster

Now we'll try the same thing but farm the computation out to a dask cluster.


In [9]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(processes=8,  #By default Dask will run one Python process per job. 
                       # However, you can optionally choose to cut up that job into multiple processes
                       cores=32,  # size of a single job -- typically one node of the HPC cluster
                       memory="256GB", # snow has 8 GB ram per physical core I believe
                       walltime="01:00:00",
                       queue="snow",
                       interface="ib0",
                      )
cluster.scale(1)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.77.8.107:8787/status,

0,1
Dashboard: http://10.77.8.107:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.77.8.107:40409,Workers: 0
Dashboard: http://10.77.8.107:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


### Effects of different chunking

In [10]:
ds = xr.open_mfdataset(files, chunks={'time':30, 'lev': 1})
vT = ds.v * ds.t
vT

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 5 graph layers,1568 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 45.24 GiB 29.75 MiB Shape (1460, 32, 361, 720) (30, 1, 361, 720) Dask graph 1568 chunks in 5 graph layers Data type float32 numpy.ndarray",1460  1  720  361  32,

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,29.75 MiB
Shape,"(1460, 32, 361, 720)","(30, 1, 361, 720)"
Dask graph,1568 chunks in 5 graph layers,1568 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [11]:
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,1.41 kiB
Shape,"(12, 32, 361)","(1, 1, 361)"
Dask graph,384 chunks in 60 graph layers,384 chunks in 60 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 541.50 kiB 1.41 kiB Shape (12, 32, 361) (1, 1, 361) Dask graph 384 chunks in 60 graph layers Data type float32 numpy.ndarray",361  32  12,

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,1.41 kiB
Shape,"(12, 32, 361)","(1, 1, 361)"
Dask graph,384 chunks in 60 graph layers,384 chunks in 60 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [12]:
%time meanfield.sel(month=1).load()

CPU times: user 2.5 s, sys: 455 ms, total: 2.96 s
Wall time: 41.9 s


In [13]:
ds.close()

### Same thing, different chunking

In [14]:
ds = xr.open_mfdataset(files, chunks={'time':10, 'lev': 32})

In [15]:
meanfield = (ds.v*ds.t).mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,45.12 kiB
Shape,"(12, 32, 361)","(1, 32, 361)"
Dask graph,12 chunks in 62 graph layers,12 chunks in 62 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 541.50 kiB 45.12 kiB Shape (12, 32, 361) (1, 32, 361) Dask graph 12 chunks in 62 graph layers Data type float32 numpy.ndarray",361  32  12,

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,45.12 kiB
Shape,"(12, 32, 361)","(1, 32, 361)"
Dask graph,12 chunks in 62 graph layers,12 chunks in 62 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [16]:
%time meanfield.sel(month=1).load()

CPU times: user 812 ms, sys: 120 ms, total: 932 ms
Wall time: 29.9 s


In [17]:
ds.close()

### And yet another chunking

In [18]:
ds = xr.open_mfdataset(files, chunks={'time':124, 'lev': 32})
vT = ds.v * ds.t
vT

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,3.84 GiB
Shape,"(1460, 32, 361, 720)","(124, 32, 361, 720)"
Dask graph,12 chunks in 5 graph layers,12 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 45.24 GiB 3.84 GiB Shape (1460, 32, 361, 720) (124, 32, 361, 720) Dask graph 12 chunks in 5 graph layers Data type float32 numpy.ndarray",1460  1  720  361  32,

Unnamed: 0,Array,Chunk
Bytes,45.24 GiB,3.84 GiB
Shape,"(1460, 32, 361, 720)","(124, 32, 361, 720)"
Dask graph,12 chunks in 5 graph layers,12 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [19]:
meanfield = vT.mean(dim='lon').groupby(ds.time.dt.month).mean()
meanfield

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,90.25 kiB
Shape,"(12, 32, 361)","(2, 32, 361)"
Dask graph,11 chunks in 47 graph layers,11 chunks in 47 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 541.50 kiB 90.25 kiB Shape (12, 32, 361) (2, 32, 361) Dask graph 11 chunks in 47 graph layers Data type float32 numpy.ndarray",361  32  12,

Unnamed: 0,Array,Chunk
Bytes,541.50 kiB,90.25 kiB
Shape,"(12, 32, 361)","(2, 32, 361)"
Dask graph,11 chunks in 47 graph layers,11 chunks in 47 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [20]:
%time meanfield.sel(month=1).load()

CPU times: user 1.7 s, sys: 217 ms, total: 1.91 s
Wall time: 1min 13s


In [21]:
ds.close()

## Results

Same as for batch, the smallest wall time for this calculation was achieved using the distributed cluster and a chunking strategy `chunks={'time':10, 'lev': 32}`.

For this one-month calculation, using the distributed cluster **reduces our wall time from about 65 s down to about 30 s**. 

That's about a factor of two speedup. 

Compared to batch, snow is about 1.5x slower (30 seconds vs 20 seconds).

In [22]:
cluster.close()