2. Computing seasonal and monthly means from CFSR data#

Introduction#

These notes will show how to:

  1. Access the 6-hourly CFSR dataset on our local disks

  2. Use xarray.open_mfdataset() to concatenate data spread across many individual netCDF files into a single dataset

  3. Take advantage of xarray’s “lazy execution” model to define our climatology before explicitly computing any results

  4. Compute the climatologies (seasonal and monthly means) and save the resulting (vastly) reduced datasets to disk as netCDF files

Note: the commented-out code below takes a VERY long time to run, and there is no need to run it again because the results are saved to disk. It is included here for demonstration purposes only.

The CFSR data#

The Climate Forecast System Reanalysis (CFSR) is a reanalysis data product giving a best estimate of the state of the coupled climate system since 1979, based on a blend of observations and a numerical model.

The CFSR is a third generation reanalysis product. It is a global, high resolution, coupled atmosphere-ocean-land surface-sea ice system designed to provide the best estimate of the state of these coupled domains over this period. The CFSR includes (1) coupling of atmosphere and ocean during the generation of the 6 hour guess field, (2) an interactive sea-ice model, and (3) assimilation of satellite radiances. The CFSR global atmosphere resolution is ~38 km (T382) with 64 levels. The global ocean is 0.25° at the equator, extending to a global 0.5° beyond the tropics, with 40 levels. The global land surface model has 4 soil levels and the global sea ice model has 3 levels. The CFSR atmospheric model contains observed variations in carbon dioxide (CO2), together with changes in aerosols and other trace gases and solar variations. With these variable parameters, the analyzed state will include estimates of changes in the Earth system climate due to these factors. The current CFSR will be extended as an operational, real time product into the future.

We maintain a continuously updated local copy of the 6-hourly CFSR data on shared disks here in DAES. You can browse the available data through this catalog:

http://thredds.atmos.albany.edu:8080/thredds/catalog/CFSR/catalog.html

That link takes you to a so-called THREDDS server which provides remote access to the data. However, we’re always going to get much better performance with local access to the file system.

thredds_path = 'http://thredds.atmos.albany.edu:8080/thredds/dodsC/CFSR/' 
local_path = '/cfsr/data/'

path = local_path  # switch this to thredds_path if running remotely ... but much poorer performance

Let’s take a peek at one directory of data from a particular year:

ls /cfsr/data/2000/
g.2000.0p5.anl.nc        q.2000.0p5.anl.nc       u_pv.2000.0p5.anl.nc
pmsl.2000.0p5.anl.nc     t.2000.0p5.anl.nc       v.2000.0p5.anl.nc
pres_pv.2000.0p5.anl.nc  t_pv.2000.0p5.anl.nc    v_isen.2000.0p5.anl.nc
psfc.2000.0p5.anl.nc     tsfc.2000.0p5.anl.nc    v_pv.2000.0p5.anl.nc
pv_isen.2000.0p5.anl.nc  u.2000.0p5.anl.nc       w.2000.0p5.anl.nc
pwat.2000.0p5.anl.nc     u_isen.2000.0p5.anl.nc

We can see 17 different fields stored in individual netCDF files labeled with their calendar year.

Our goal is to compute a 30-year seasonal climatology for all variables. So we are going to begin by opening handles to 30 years of data.

# It's helpful to store all the variable names in this list
variables = ['g',
             'pmsl',
             'pres_pv',
             'psfc',
             'pv_isen',
             'pwat',
             'q',
             't',
             't_pv',
             'tsfc',
             'u',
             'u_isen',
             'u_pv',
             'v',
             'v_isen',
             'v_pv',
             'w',
            ]

Opening handles to the dataset#

Here we are going to assemble a list of paths to every data file that we want to concatenate into a single dataset.

Assembling paths to all the data#

Here’s the logic for a single variable:

pmsl_files = []
var = 'pmsl'
for y in range(1988,2018):  # choosing years 1988 through 2017 because there is some missing data after 2017
    pmsl_filepath = path + str(y) + '/' + var + '.' + str(y) +'.0p5.anl.nc'
    pmsl_files.append(pmsl_filepath)

Let’s take a peek at the list we just created:

pmsl_files
['/cfsr/data/1988/pmsl.1988.0p5.anl.nc',
 '/cfsr/data/1989/pmsl.1989.0p5.anl.nc',
 '/cfsr/data/1990/pmsl.1990.0p5.anl.nc',
 '/cfsr/data/1991/pmsl.1991.0p5.anl.nc',
 '/cfsr/data/1992/pmsl.1992.0p5.anl.nc',
 '/cfsr/data/1993/pmsl.1993.0p5.anl.nc',
 '/cfsr/data/1994/pmsl.1994.0p5.anl.nc',
 '/cfsr/data/1995/pmsl.1995.0p5.anl.nc',
 '/cfsr/data/1996/pmsl.1996.0p5.anl.nc',
 '/cfsr/data/1997/pmsl.1997.0p5.anl.nc',
 '/cfsr/data/1998/pmsl.1998.0p5.anl.nc',
 '/cfsr/data/1999/pmsl.1999.0p5.anl.nc',
 '/cfsr/data/2000/pmsl.2000.0p5.anl.nc',
 '/cfsr/data/2001/pmsl.2001.0p5.anl.nc',
 '/cfsr/data/2002/pmsl.2002.0p5.anl.nc',
 '/cfsr/data/2003/pmsl.2003.0p5.anl.nc',
 '/cfsr/data/2004/pmsl.2004.0p5.anl.nc',
 '/cfsr/data/2005/pmsl.2005.0p5.anl.nc',
 '/cfsr/data/2006/pmsl.2006.0p5.anl.nc',
 '/cfsr/data/2007/pmsl.2007.0p5.anl.nc',
 '/cfsr/data/2008/pmsl.2008.0p5.anl.nc',
 '/cfsr/data/2009/pmsl.2009.0p5.anl.nc',
 '/cfsr/data/2010/pmsl.2010.0p5.anl.nc',
 '/cfsr/data/2011/pmsl.2011.0p5.anl.nc',
 '/cfsr/data/2012/pmsl.2012.0p5.anl.nc',
 '/cfsr/data/2013/pmsl.2013.0p5.anl.nc',
 '/cfsr/data/2014/pmsl.2014.0p5.anl.nc',
 '/cfsr/data/2015/pmsl.2015.0p5.anl.nc',
 '/cfsr/data/2016/pmsl.2016.0p5.anl.nc',
 '/cfsr/data/2017/pmsl.2017.0p5.anl.nc']

Looks good!
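As an aside, for local files the same list could be built with a glob pattern instead of an explicit loop. This is just a sketch of the equivalent logic (the loop above keeps the 1988–2017 year range more explicit):

import glob

# Collect every pmsl file under the local CFSR tree, then keep only 1988-2017.
# The year is the name of the parent directory in these paths.
pmsl_files_glob = sorted(glob.glob(local_path + '*/pmsl.*.0p5.anl.nc'))
pmsl_files_glob = [f for f in pmsl_files_glob if 1988 <= int(f.split('/')[-2]) <= 2017]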

But that was just to demonstrate the list logic. We’re actually going to make a single list with paths for all variables:

files = []
for y in range(1988,2018):   # choosing years 1988 through 2017 because there is some missing data after 2017
    for varname in variables:
        filepath = path + str(y) + '/' + varname + '.' + str(y) +'.0p5.anl.nc'
        files.append(filepath)
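As a quick sanity check, the list should contain 30 years × 17 variables = 510 paths:

len(files)   # expect 30 * 17 = 510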

Passing the paths to xarray.open_mfdataset()#

Now we pass our list of paths and let Xarray (with Dask in the background) handle the concatenation:

Note the chunks= argument, which gives instructions about how to divide up the data into “chunks” that can be moved in and out of memory. I needed some trial and error to find chunk sizes that seemed to make sense here.

import xarray as xr

cfsr_6hourly = xr.open_mfdataset(files, chunks={'time':30*4, 'lev': 4}, parallel=True)
cfsr_6hourly
/knight/anaconda_aug22/envs/aug22_env/lib/python3.10/site-packages/xarray/core/indexing.py:1374: PerformanceWarning: Slicing is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
<xarray.Dataset>
Dimensions:  (time: 43832, lat: 361, lon: 720, lev: 40)
Coordinates:
  * time     (time) datetime64[ns] 1988-01-01 ... 2017-12-31T18:00:00
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 -2e-06 2e-06 10.0 20.0 ... 925.0 950.0 975.0 1e+03
Data variables: (12/17)
    g        (time, lev, lat, lon) float32 dask.array<chunksize=(120, 6, 361, 720), meta=np.ndarray>
    pmsl     (time, lat, lon) float32 dask.array<chunksize=(120, 361, 720), meta=np.ndarray>
    pres_pv  (time, lev, lat, lon) float32 dask.array<chunksize=(120, 40, 361, 720), meta=np.ndarray>
    psfc     (time, lat, lon) float32 dask.array<chunksize=(120, 361, 720), meta=np.ndarray>
    pv_isen  (time, lev, lat, lon) float32 dask.array<chunksize=(120, 18, 361, 720), meta=np.ndarray>
    pwat     (time, lat, lon) float32 dask.array<chunksize=(120, 361, 720), meta=np.ndarray>
    ...       ...
    u_isen   (time, lev, lat, lon) float32 dask.array<chunksize=(120, 18, 361, 720), meta=np.ndarray>
    u_pv     (time, lev, lat, lon) float32 dask.array<chunksize=(120, 40, 361, 720), meta=np.ndarray>
    v        (time, lev, lat, lon) float32 dask.array<chunksize=(120, 6, 361, 720), meta=np.ndarray>
    v_isen   (time, lev, lat, lon) float32 dask.array<chunksize=(120, 18, 361, 720), meta=np.ndarray>
    v_pv     (time, lev, lat, lon) float32 dask.array<chunksize=(120, 40, 361, 720), meta=np.ndarray>
    w        (time, lev, lat, lon) float32 dask.array<chunksize=(120, 6, 361, 720), meta=np.ndarray>
Attributes:
    description:    g 1000-10 hPa
    year:           1988
    source:         http://nomads.ncdc.noaa.gov/data.php?name=access#CFSR-data
    references:     Saha, et. al., (2010)
    created_by:     User: kgriffin
    creation_date:  Fri Apr 13 14:12:27 UTC 2012

We can see that we have a dataset with

  • 17 data variables

  • 1/2 degree lat-lon resolution

  • 40 pressure levels

  • 43,832 time elements (that’s 4x daily for 30 years, accounting for leap years!)
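We can confirm the grid dimensions and see how Dask has chunked one of the variables without reading any data (a quick check, not part of the original workflow):

print(cfsr_6hourly.sizes)                 # dimension lengths: time, lev, lat, lon
print(cfsr_6hourly['t'].data.chunksize)   # shape of a single Dask chunk of air temperature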

Dask and lazy execution#

Note that we’ve just loaded in handles to a LOT of data. Each 4-dimensional field (time, pressure, lat, lon) contains roughly \(4.6 \times 10^{11}\) data points, which at single-precision floating point storage works out to about 1.8 TB per variable!

The total size of this 30-year dataset is about 24 TB.
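The per-variable figure can be verified from metadata alone, since nbytes is derived from the array’s shape and dtype without reading anything from disk:

cfsr_6hourly['t'].nbytes / 1E12   # size of one 4-dimensional variable, in TB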

Actually we can see this in the dataset object like this:

cfsr_6hourly.nbytes / 1E12 # in TB
23.87933732538

How does this work? We definitely do not have 24 TB of available memory on this system!

This is an example of “lazy execution”. We have not actually read all that data off of disk. All we have done is read the metadata, which effectively creates a map of the dataset so that Xarray knows how the data are laid out and which operations are valid.
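To see lazy execution in action, we could force a small computation with .compute(). This is just a sketch (not part of the original workflow); it reads only the few chunks needed for one time slice:

# Select a single time step on the last pressure level (1000 hPa) and average over the globe.
sample = cfsr_6hourly.t.isel(time=0, lev=-1).mean()   # still lazy: nothing has been read yet
sample.compute()                                      # the read and the averaging happen here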

Creating the seasonal climatology#

Now let’s get started on reducing this dataset into a form more amenable to our analysis. We are going to take time averages over individual seasons.

Grouping by season#

Our time axis already understands meteorological seasons: DJF (December, January, February), JJA (June, July, August), and so on. So creating groups for the four seasons is very simple. Here’s a quick example of a groupby operation (using one variable as an example: t, the air temperature):

cfsr_6hourly.t.groupby(cfsr_6hourly.time.dt.season)
DataArrayGroupBy, grouped over 'season'
4 groups with labels 'DJF', 'JJA', 'MAM', 'SON'.
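Under the hood, each 6-hourly timestamp gets a season label from the .dt accessor. A quick peek, just for illustration:

cfsr_6hourly.time.dt.season[:8].values   # season labels for the first few timestamps (all 'DJF' here)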

Defining the climatology#

We use a grouped operation to take averages over all data points within each season.

Note: this operation is very fast because no actual computations are being done at this point; we are just laying out a map for how to do them.

t_seas_clim = cfsr_6hourly.t.groupby(cfsr_6hourly.time.dt.season).mean()
t_seas_clim
/knight/anaconda_aug22/envs/aug22_env/lib/python3.10/site-packages/dask/array/routines.py:1995: PerformanceWarning: Slicing is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return a[(slice(None),) * axis + (indices,)]
<xarray.DataArray 't' (season: 4, lev: 40, lat: 361, lon: 720)>
dask.array<transpose, shape=(4, 40, 361, 720), dtype=float32, chunksize=(1, 7, 361, 720), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 -2e-06 2e-06 10.0 20.0 ... 925.0 950.0 975.0 1e+03
  * season   (season) object 'DJF' 'JJA' 'MAM' 'SON'

Now we can see that the resulting array has a season axis with our four seasons ‘DJF’, ‘MAM’, ‘JJA’, ‘SON’.

With this time averaging, we have reduced the data for this one variable from about 1.8 TB down to about 166 MB.
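We can confirm the reduced size directly from the lazy array’s metadata, again without computing anything:

t_seas_clim.nbytes / 1E6   # size of the seasonal climatology for t, in MB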

Our task is now to save the reduced dataset to disk, so that we can do our analyses on data that fits much more comfortably into memory.

Now comes all the heavy lifting. In order to save the climatology to disk, we have to actually carry out the computations. This part will be very slow.

Saving the climatology to disk#

The code is extremely simple to write, but the execution will be very slow.

output_path = '/nfs/roselab_rit/data/cfsr_climatology/'

#  Here's what the code looks like to save to a netCDF file... don't actually execute this!
# t_seas_clim.to_netcdf(output_path + 't.seas_clim.0p5.nc')

The production run: compute and save seasonal climatologies for all variables#

We’re going to define a function to do all the heavy lifting for us:

def make_seasonal_climatology(varname, dataset=cfsr_6hourly,
                              output_path='/nfs/roselab_rit/data/cfsr_climatology/'):
    var_seas_clim = dataset[varname].groupby(dataset.time.dt.season).mean(skipna=True)
    var_seas_clim.to_netcdf(output_path + varname + '.seas_clim.0p5.nc')

Each of the calls to the make_seasonal_climatology function will take a long time (like several hours for the 4-dimensional fields).
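If you ever did need to re-run one of these, Dask’s ProgressBar context manager (from dask.diagnostics) gives some feedback while the computation grinds away. This is just a sketch; the original production runs did not use it:

from dask.diagnostics import ProgressBar

# Wrap a single (slow!) call so a progress bar is printed as the computation advances.
with ProgressBar():
    make_seasonal_climatology('pmsl')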

First the 3D fields (no pressure levels … much smaller):

# No need to run this code again! It took hours to complete. 
# The results are stored on disk

# for varname in ['pmsl', 'pwat', 'tsfc']:
#     make_seasonal_climatology(varname)

And then the really big ones (4D fields on pressure grids):

# No need to run this code again! It took days (seriously) to complete. 
# The results are stored on disk

# for varname in ['u','v','w','t','q','g',]:
#     make_seasonal_climatology(varname)

Monthly climatologies#

In addition to the seasonal climatologies (DJF, MAM, JJA, SON), we’ll also compute monthly climatologies.

Here’s what that looks like, again using lazy execution. We can easily define the climatology for the entire dataset like this:

cfsr_monthly = cfsr_6hourly.groupby(cfsr_6hourly.time.dt.month).mean()
cfsr_monthly
/knight/anaconda_aug22/envs/aug22_env/lib/python3.10/site-packages/dask/array/routines.py:1995: PerformanceWarning: Slicing is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return a[(slice(None),) * axis + (indices,)]
<xarray.Dataset>
Dimensions:  (month: 12, lev: 40, lat: 361, lon: 720)
Coordinates:
  * lat      (lat) float32 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lon      (lon) float32 -180.0 -179.5 -179.0 -178.5 ... 178.5 179.0 179.5
  * lev      (lev) float32 -2e-06 2e-06 10.0 20.0 ... 925.0 950.0 975.0 1e+03
  * month    (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
Data variables: (12/17)
    g        (month, lev, lat, lon) float32 dask.array<chunksize=(1, 6, 361, 720), meta=np.ndarray>
    pmsl     (month, lat, lon) float32 dask.array<chunksize=(1, 361, 720), meta=np.ndarray>
    pres_pv  (month, lev, lat, lon) float32 dask.array<chunksize=(1, 40, 361, 720), meta=np.ndarray>
    psfc     (month, lat, lon) float32 dask.array<chunksize=(1, 361, 720), meta=np.ndarray>
    pv_isen  (month, lev, lat, lon) float32 dask.array<chunksize=(1, 18, 361, 720), meta=np.ndarray>
    pwat     (month, lat, lon) float32 dask.array<chunksize=(1, 361, 720), meta=np.ndarray>
    ...       ...
    u_isen   (month, lev, lat, lon) float32 dask.array<chunksize=(1, 18, 361, 720), meta=np.ndarray>
    u_pv     (month, lev, lat, lon) float32 dask.array<chunksize=(1, 40, 361, 720), meta=np.ndarray>
    v        (month, lev, lat, lon) float32 dask.array<chunksize=(1, 6, 361, 720), meta=np.ndarray>
    v_isen   (month, lev, lat, lon) float32 dask.array<chunksize=(1, 18, 361, 720), meta=np.ndarray>
    v_pv     (month, lev, lat, lon) float32 dask.array<chunksize=(1, 40, 361, 720), meta=np.ndarray>
    w        (month, lev, lat, lon) float32 dask.array<chunksize=(1, 6, 361, 720), meta=np.ndarray>

Here, instead of a season axis, we have a month axis with integer values: 1 = January, 2 = February, and so on.
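For example, the (still lazy) January climatology for all variables can be pulled out with .sel:

jan_clim = cfsr_monthly.sel(month=1)   # all 17 variables, averaged over every January 1988-2017, still lazy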

Computing monthly results#

In principle we could compute the entire monthly climatology and write it to a single file with one line of code:

cfsr_monthly.to_netcdf(output_path + 'cfsr.mon_clim.0p5.nc')

Instead I’m going to work with one variable at a time, like we did for the seasonal climatology. Here’s another helper function:

def make_monthly_climatology(varname, dataset=cfsr_6hourly,
                              output_path='/nfs/roselab_rit/data/cfsr_climatology/'):
    var_mon_clim = dataset[varname].groupby(dataset.time.dt.month).mean(skipna=True)
    var_mon_clim.to_netcdf(output_path + varname + '.mon_clim.0p5.nc')

Again, this is the heavy lifting. The code is here just for reference; please do not try to re-run it!

# for varname in ['pmsl', 'pwat', 'tsfc']:
#     make_monthly_climatology(varname)
# for varname in ['u','v','w','t','q','g',]:
#     make_monthly_climatology(varname)
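Once the production runs have finished, the reduced files open almost instantly for later analysis. Here is a minimal sketch of reading one of them back (using the seasonal temperature file as an example):

# The saved climatologies are small enough to read comfortably into memory.
t_clim = xr.open_dataset(output_path + 't.seas_clim.0p5.nc')
t_clim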