Commit 89844dce authored by Peters, Wouter

Added new aggregation routines, and expanded existing ones. The routines and their changes are:

1) 1x1 fluxes

Fluxes are now written in cycle-by-cycle files instead of all into one file, which makes file handling easier. In addition,
I now write the full ensemble of 1x1 fluxes to each file, which produces considerably more data. The advantage is that these
data can be post-aggregated to any mapping needed, including propagation of the full covariance through the ensemble.
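To illustrate the post-aggregation, a minimal sketch (not part of the commit) that reads the ensemble from one per-cycle
file and aggregates each member over a toy two-region mask. The date in the file name, the variable name
'bio_flux_prior_ensemble', and the mask itself are assumptions for illustration; the reader calls (CT_Read, GetVariable)
follow the usage in the code below.

    import numpy as np
    import da.tools.io4 as io

    # Open one per-cycle 1x1 flux file (the date is a placeholder)
    ncf = io.CT_Read('data_flux1x1_weekly/flux_1x1.2005-01-08.nc', 'read')
    ensemble = np.array(ncf.GetVariable('bio_flux_prior_ensemble')[0])  # (nmembers, 180, 360)
    area = np.array(ncf.GetVariable('cell_area')[:])                    # (180, 360) gridbox area in m2
    ncf.close()

    # Toy mask with two aggregation regions (1 = southern, 2 = northern hemisphere)
    regionmask = np.ones((180, 360), int)
    regionmask[90:, :] = 2
    nregions = int(regionmask.max())

    # Aggregate each ensemble member to per-region totals (mol/region/s)
    members = np.array([[(member * area)[regionmask == r + 1].sum() for r in range(nregions)]
                        for member in ensemble])

From these per-member totals, the covariance over the chosen regions follows from the member deviations, as sketched
under point 4) below.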

2) State fluxes

State fluxes are also written per cycle for the same reasons.

3) Olson fluxes

Mean fluxes as well as the ensemble are aggregated to 240 regions (11*19 Olson ecosystems and 30 ocean regions).

4) TransCom fluxes

Mean fluxes as well as the ensemble are aggregated to 23 TransCom regions. This is similar to the dedicated routine that
makes TransCom fluxes; the disadvantage of that routine is that it relies on a statevector-to-TransCom mapping that must be
part of the StateVector, whereas the new routine does not need one. From the ensemble, aggregated fluxes and the full
TransCom covariance can be constructed.
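As a sketch of that last step (not part of the commit), the covariance follows from the member deviations exactly as in the
existing TransCom code; the date in the file name and the ensemble variable name are assumptions:

    import numpy as np
    import da.tools.io4 as io

    ncf = io.CT_Read('data_transcom_weekly/transcom_fluxes.2005-01-08.nc', 'read')
    members = np.array(ncf.GetVariable('bio_flux_prior_ensemble')[0])   # (nmembers, 23) in mol/region/s
    ncf.close()

    # 23x23 covariance from the ensemble deviations (same formula as used in the code below)
    deviations = members - members.mean(axis=0)
    cov = np.dot(deviations.transpose(), deviations) / (members.shape[0] - 1)
    sigma = np.sqrt(cov.diagonal())   # 1-sigma flux uncertainty per TransCom region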

5) Country fluxes

Mean and ensemble fluxes are written for a number of large countries (USA, China, Russia, EU27, Brazil, ...). This routine can in principle handle all countries of the world, but small countries get unreliable fluxes because of aggregation errors and because the uncertainty on the parameters is large on these smaller scales.

For the aggregation, several dictionaries are used to map from the grid to the aggregate regions. These are pickled to make the routine faster, and are also provided through SVN.
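For example, the country mapping from tools_country.py (included below) can be used as follows; the flux field here is a
made-up placeholder, and the pickled dictionary is assumed to be present so get_countrydict() loads it rather than
rebuilding it from the DBF file:

    import numpy as np
    from da.analysis.tools_regions import globarea
    from da.analysis.tools_country import get_countrydict

    countrydict = get_countrydict()      # loads the pickled grid-to-country mapping when available
    area = globarea()                    # 1x1 degree gridbox areas in m2
    flux = np.ones((180, 360)) * 1.e-7   # hypothetical flux field in mol/m2/s
    # convert to mol/gridbox/s before aggregating (see the CAUTION note in tools_country.py)
    total = countrydict['Netherlands'].agg_1x1(flux * area)
    print(total)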

parent 6a8a7352
......@@ -39,6 +39,7 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
import da.tools.io4 as io
import logging
from da.analysis.tools_regions import globarea
#
dirname = 'data_flux1x1_weekly'
#
......@@ -58,13 +59,14 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
#
# Create or open NetCDF output file
#
saveas = os.path.join(dirname,'flux_1x1.nc')
saveas = os.path.join(dirname,'flux_1x1.%s.nc'% startdate.strftime('%Y-%m-%d') )
ncf = io.CT_CDF(saveas,'write')
#
# Create dimensions and lat/lon grid
#
dimgrid = ncf.AddLatLonDim()
dimensemble = ncf.AddDim('members',StateVector.nmembers)
dimdate = ncf.AddDateDim()
#
# set title and tell GMT that we are using "pixel registration"
......@@ -112,7 +114,7 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
filename = os.path.join(savedir,'savestate.nc')
if os.path.exists(filename):
dummy = StateVector.ReadFromFile(filename,qual=qual_short)
gridmean,gridvariance = StateVector.StateToGrid(lag=n)
gridmean,gridensemble = StateVector.StateToGrid(lag=n)
# Replace the mean statevector by all ones (assumed priors)
......@@ -125,7 +127,7 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
savedir = DaCycle['dir.output']
filename = os.path.join(savedir,'savestate.nc')
dummy = StateVector.ReadFromFile(filename,qual=qual_short)
gridmean,gridvariance = StateVector.StateToGrid(lag=1)
gridmean,gridensemble = StateVector.StateToGrid(lag=1)
msg = 'Read posterior dataset from file %s, sds %d: '%(filename,1) ; logging.debug(msg)
#
......@@ -133,8 +135,8 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
#
biomapped=bio*gridmean
oceanmapped=ocean*gridmean
biovarmapped=bio*gridvariance
oceanvarmapped=ocean*gridvariance
biovarmapped=bio*gridensemble
oceanvarmapped=ocean*gridensemble
#
#
......@@ -152,15 +154,15 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
savedict['count']=next
ncf.AddData(savedict)
savedict=ncf.StandardVar(varname='bio_flux_%s_cov'%qual_short)
savedict=ncf.StandardVar(varname='bio_flux_%s_ensemble'%qual_short)
savedict['values']=biovarmapped.tolist()
savedict['dims']=dimdate+dimgrid
savedict['dims']=dimdate+dimensemble+dimgrid
savedict['count']=next
ncf.AddData(savedict)
#
savedict=ncf.StandardVar(varname='ocn_flux_%s_cov'%qual_short)
savedict=ncf.StandardVar(varname='ocn_flux_%s_ensemble'%qual_short)
savedict['values']=oceanvarmapped.tolist()
savedict['dims']=dimdate+dimgrid
savedict['dims']=dimdate+dimensemble+dimgrid
savedict['count']=next
ncf.AddData(savedict)
......@@ -177,6 +179,12 @@ def SaveWeeklyAvg1x1Data(DaCycle, StateVector):
savedict['dims']=dimdate+dimgrid
savedict['count']=next
ncf.AddData(savedict)
area=globarea()
savedict=ncf.StandardVar(varname='cell_area')
savedict['values']=area.tolist()
savedict['dims']=dimgrid
ncf.AddData(savedict)
#
savedict=ncf.StandardVar(varname='date')
savedict['values']=date2num(startdate)-dectime0+dt.days/2.0
......@@ -542,9 +550,9 @@ def SaveWeeklyAvgTCData(DaCycle, StateVector):
tcdata = np.array(tcdata)
try:
cov = tcdata.transpose().dot(tcdata)/(StateVector.nmembers-1)
except:
cov = np.dot(tcdata.transpose(),tcdata)/(StateVector.nmembers-1) # Huygens fix
cov = tcdata.transpose().dot(tcdata)/(StateVector.nmembers-1)
except:
cov = np.dot(tcdata.transpose(),tcdata)/(StateVector.nmembers-1) # Huygens fix
#print vname,cov.sum()
......@@ -695,6 +703,231 @@ def SaveWeeklyAvgExtTCData(DaCycle):
return saveas
def SaveWeeklyAvgAggData(DaCycle, region_aggregate='olson'):
"""
Function creates a NetCDF file with fluxes aggregated to a set of regions (Olson ecosystems, TransCom
regions, or countries). It uses the gridded flux input written by `SaveWeeklyAvg1x1Data` and aggregates the
mean fluxes as well as the full ensemble onto the selected regions using a gridded region mask.
:param DaCycle: a :class:`~da.tools.initexit.CycleControl` object
:param region_aggregate: string selecting the aggregation ('olson', 'transcom', or 'country')
:rtype: None
This function only reads fluxes from the flux_1x1.<date>.nc files created earlier; from the per-member ensemble
written there, aggregated fluxes and their covariance can be constructed for each region set, in units of
mol/region/s.
"""
from da.analysis.tools_regions import globarea, StateToGrid
import da.analysis.tools_transcom as tc
import da.analysis.tools_country as ct
import da.tools.io4 as io
import logging
#
dirname = 'data_%s_weekly'%region_aggregate
#
dirname = CreateDirs(os.path.join(DaCycle['dir.analysis'],dirname))
#
# Some help variables
#
dectime0 = date2num(datetime(2000,1,1))
dt = DaCycle['cyclelength']
startdate = DaCycle['time.start']
enddate = DaCycle['time.end']
ncfdate = date2num(startdate)-dectime0+dt.days/2.0
msg = "DA Cycle start date is %s" % startdate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
msg = "DA Cycle end date is %s" % enddate.strftime('%Y-%m-%d %H:%M') ; logging.debug(msg)
msg = "Aggregating 1x1 fluxes to %s totals" % region_aggregate ; logging.debug(msg)
# Write/Create NetCDF output file
#
saveas = os.path.join(dirname,'%s_fluxes.%s.nc'%(region_aggregate, startdate.strftime('%Y-%m-%d'),))
ncf = io.CT_CDF(saveas,'write')
dimdate = ncf.AddDateDim()
dimidateformat = ncf.AddDateDimFormat()
dimgrid = ncf.AddLatLonDim() # for mask
#
# Select regions to aggregate to
#
if region_aggregate == "olson":
regionmask = tc.olson240mask
dimname = 'olson'
dimregs = ncf.AddDim(dimname,regionmask.max())
regionnames = []
for i in range(11):
for j in range(19):
regionnames.append("%s_%s"%(tc.transnams[i],tc.olsonnams[j],) )
regionnames.extend(tc.oifnams)
for i,name in enumerate(regionnames):
lab='Aggregate_Region_%03d'%(i+1,)
setattr(ncf,lab,name)
elif region_aggregate == "transcom":
regionmask = tc.transcommask
dimname = 'tc'
dimregs = ncf.AddRegionDim(type='tc')
elif region_aggregate == "country":
countrydict = ct.get_countrydict()
selected = ['Russia','Canada','China','United States','EU27','Brazil','Australia','India'] #,'G8','UNFCCC_annex1','UNFCCC_annex2']
regionmask = np.zeros((180,360,),'float')
for i,name in enumerate(selected):
lab='Country_%03d'%(i+1,)
setattr(ncf,lab,name)
if name == 'EU27':
namelist=ct.EU27
elif name == 'EU25':
namelist=ct.EU25
elif name == 'G8':
namelist=ct.G8
elif name == 'UNFCCC_annex1':
namelist=ct.annex1
elif name == 'UNFCCC_annex2':
namelist=ct.annex2
else:
namelist=[name]
for countryname in namelist:
try:
country=countrydict[countryname]
regionmask.put(country.gridnr,i+1)
except:
continue
dimname = 'country'
dimregs = ncf.AddDim(dimname,regionmask.max())
#
skip = ncf.has_date(ncfdate)
if skip:
msg = 'Skipping writing of data for date %s : already present in file %s' % (startdate.strftime('%Y-%m-%d'),saveas) ; logging.warning(msg)
else:
#
# set title and tell GMT that we are using "pixel registration"
#
setattr(ncf,'Title','CTDAS Aggregated fluxes')
setattr(ncf,'node_offset',1)
savedict = ncf.StandardVar('unknown')
savedict['name'] = 'regionmask'
savedict['comment'] = 'numerical mask used to aggregate 1x1 flux fields, each integer 0,...,N is one region aggregated'
savedict['values'] = regionmask.tolist()
savedict['units'] = '-'
savedict['dims'] = dimgrid
savedict['count'] = 0
dummy = ncf.AddData(savedict)
# Get input data from 1x1 degree flux files
area=globarea()
infile=os.path.join(DaCycle['dir.analysis'],'data_flux1x1_weekly','flux_1x1.%s.nc'%startdate.strftime('%Y-%m-%d') )
if not os.path.exists(infile):
msg="Needed input file (%s) does not exist yet, please create file first, returning..."%infile ; logging.error(msg)
return None
ncf_in = io.CT_Read(infile,'read')
# Transform data one by one
# Get the date variable, and find index corresponding to the DaCycle date
try:
dates = ncf_in.variables['date'][:]
except KeyError:
msg = "The variable date cannot be found in the requested input file (%s) " % infile ; logging.error(msg)
msg = "Please make sure you create gridded fluxes before making TC fluxes " ; logging.error(msg)
raise KeyError
try:
index = dates.tolist().index(ncfdate)
except ValueError:
msg = "The requested cycle date is not yet available in file %s "% infile ; logging.error(msg)
msg = "Please make sure you create state based fluxes before making TC fluxes " ; logging.error(msg)
raise ValueError
# First add the date for this cycle to the file, this grows the unlimited dimension
savedict = ncf.StandardVar(varname='date')
savedict['values'] = ncfdate
savedict['dims'] = dimdate
savedict['count'] = index
dummy = ncf.AddData(savedict)
# Now convert other variables that were inside the statevector file
vardict = ncf_in.variables
for vname, vprop in vardict.iteritems():
if vname=='latitude': continue
elif vname=='longitude': continue
elif vname=='date': continue
elif vname=='idate': continue
elif 'std' in vname: continue
elif 'ensemble' in vname:
data = ncf_in.GetVariable(vname)[index]
dimensemble = ncf.AddDim('members',data.shape[0])
regiondata=[]
for member in data:
aggdata = StateToGrid(member*area,regionmask,reverse=True,mapname=region_aggregate)
regiondata.append( aggdata )
regiondata = np.array(regiondata)
savedict = ncf.StandardVar(varname=vname)
savedict['units'] = 'mol/region/s'
savedict['dims'] = dimdate+dimensemble+dimregs
elif 'flux' in vname:
data = ncf_in.GetVariable(vname)[index]
regiondata = StateToGrid(data*area,regionmask,reverse=True,mapname=region_aggregate)
savedict = ncf.StandardVar(varname=vname)
savedict['dims'] = dimdate+dimregs
savedict['units'] = 'mol/region/s'
else:
data = ncf_in.GetVariable(vname)[:]
regiondata = StateToGrid(data,regionmask,reverse=True,mapname=region_aggregate)
savedict = ncf.StandardVar(varname=vname)
savedict['dims'] = dimdate+dimregs
savedict['count'] = index
savedict['values'] = regiondata
ncf.AddData(savedict)
ncf_in.close()
ncf.close()
msg="%s aggregated weekly average fluxes now written"%dimname ; logging.info(msg)
return saveas
def SaveTimeAvgData(DaCycle,infile,avg='monthly'):
""" Function saves time mean surface flux data to NetCDF files
......@@ -845,9 +1078,12 @@ if __name__ == "__main__":
while DaCycle['time.end'] < DaCycle['time.finish']:
savedas_1x1=SaveWeeklyAvg1x1Data(DaCycle, StateVector)
savedas_state=SaveWeeklyAvgStateData(DaCycle, StateVector)
savedas_tc=SaveWeeklyAvgTCData(DaCycle, StateVector)
savedas_tcext=SaveWeeklyAvgExtTCData(DaCycle)
#savedas_state=SaveWeeklyAvgStateData(DaCycle, StateVector)
#savedas_tc=SaveWeeklyAvgTCData(DaCycle, StateVector)
#savedas_tcext=SaveWeeklyAvgExtTCData(DaCycle)
savedas_olson=SaveWeeklyAvgAggData(DaCycle,region_aggregate='olson')
savedas_transcom=SaveWeeklyAvgAggData(DaCycle,region_aggregate='transcom')
savedas_country=SaveWeeklyAvgAggData(DaCycle,region_aggregate='country')
DaCycle.AdvanceCycleTimes()
......@@ -855,10 +1091,13 @@ if __name__ == "__main__":
for avg in ['monthly','yearly','longterm']:
#savedas_1x1 = SaveTimeAvgData(DaCycle,savedas_1x1,avg)
#savedas_state = SaveTimeAvgData(DaCycle,savedas_state,avg)
savedas_1x1 = SaveTimeAvgData(DaCycle,savedas_1x1,avg)
savedas_state = SaveTimeAvgData(DaCycle,savedas_state,avg)
savedas_tc = SaveTimeAvgData(DaCycle,savedas_tc,avg)
savedas_tcext = SaveTimeAvgData(DaCycle,savedas_tcext,avg)
savedas_olson = SaveTimeAvgData(DaCycle,savedas_olson,avg)
savedas_transcom = SaveTimeAvgData(DaCycle,savedas_transcom,avg)
savedas_country = SaveTimeAvgData(DaCycle,savedas_country,avg)
sys.exit(0)
#!/usr/bin/env python
# tools_country.py
"""
Author : peters
Revision History:
File created on 25 Jul 2008.
This module provides an interface to go from arrays of gridded data to aggregates for countries.
It uses the country information from the UNEP GRID database, which was created for this purpose. See:
http://na.unep.net/metadata/unep/GRID/GRIDCTRY.html
The module sets up a class 'countryinfo' that holds a number of attributes. These attributes can be used to
extract gridded information about the country. A built-in method agg_1x1 is provided for convenience.
The routine get_countrydict() creates a dictionary with this information for a large number of countries.
CAUTION:
The country data only cover the land area of a nation, and aggregation will exclude the fraction of each gridbox
covered by ocean, sea, or open water. The aggregation will thus work best on arrays that are *not* defined per unit area!
"""
try:
from dbfpy import dbf
except:
print "the python DBF lib might be needed, please install from:"
print "http://dbfpy.sourceforge.net/"
print " Trying to complete anyway..."
from numpy import sum, array
from tm5tools import globarea
EU25 = ["Austria", "Belgium", "Cyprus","Czech Republic", "Denmark", "Estonia", "Finland", "France", "Germany", "Greece", "Hungary", "Ireland", "Italy", "Latvia", "Lithuania", "Luxembourg", "Malta", "Netherlands", "Poland", "Portugal", "Slovakia", "Slovenia", "Spain", "Sweden","United Kingdom"]
EU27 = ["Austria", "Belgium", "Bulgaria", "Cyprus","Czech Republic", "Denmark", "Estonia", "Finland", "France", "Germany", "Greece", "Hungary", "Ireland", "Italy", "Latvia", "Lithuania", "Luxembourg", "Malta", "Netherlands", "Poland", "Portugal", "Romania", "Slovakia", "Slovenia", "Spain", "Sweden","United Kingdom"]
G8 = [ "France", "Germany", "Italy", "Japan", "United Kingdom", "United States"]
annex1 = [ "Australia", "Austria", "Belarus", "Belgium", "Bulgaria", "Canada", "Croatia", "Czech Republic", "Denmark", "European Union", "Estonia", "Finland", "France", "Germany", "Greece", "Hungary", "Iceland", "Ireland", "Italy", "Japan", "Latvia", "Liechtenstein", "Lithuania", "Luxembourg", "Monaco", "Netherlands", "New Zealand", "Norway", "Poland", "Portugal", "Romania", "Russia", "Slovakia", "Slovenia", "Spain", "Sweden", "Switzerland", "Turkey", "Ukraine", "United Kingdom", "United States"]
annex2=[ "Australia", "Austria", "Belgium", "Canada", "Denmark", "Finland", "France", "Germany", "Greece", "Iceland", "Ireland", "Italy", "Japan", "Luxembourg", "Netherlands", "New Zealand", "Norway", "Portugal", "Spain", "Sweden", "Switzerland", "Turkey", "United Kingdom", "United States"]
class countryinfo(object):
""" Defines the information about 1x1 gridboxes covering each country """
def __init__(self,code):
self.country=code
self.ngrids=0
self.gridij=[]
self.gridnr=[]
self.gridcoords=[]
self.gridlandmask=[]
self.gridsharedocean=[]
self.gridsharedborder=[]
def add_gridinfo(self, grid_i, grid_j, gridlandmask, shared_border, shared_water):
""" add information from one gridbox to the object """
self.gridij.append( (grid_j, grid_i,) ) # add tuple with j,i coordinates
lon=-180+grid_i+0.5
lat=-90 +grid_j+0.5
self.gridcoords.append( (lat, lon,) ) # add tuple with lon,lat coordinates
self.gridnr.append( grid_j*360+grid_i ) # add grid number for take() function
self.gridlandmask.append( gridlandmask ) # this gives the total fraction of land
self.gridsharedocean.append( shared_water )
self.gridsharedborder.append( shared_border )
self.ngrids= self.ngrids+1
def agg_1x1(self,field):
""" aggregate a 1x1 input field to country total """
#print field.take(self.gridnr)
#print self.gridlandmask
return (field.take(self.gridnr)*array(self.gridlandmask)).sum()
def __str__(self):
return ' Country name : %s\n' % self.country + \
' Number of gridpoints : %d ' % self.ngrids
#' Gridpoint indices : %s ' % self.gridnr
def fix_eu(rec):
""" fix Czech Republic and Slovakia manually """
alternative_slov={
'140202': (2.0, 28.1),\
'140201': (2.0, 34.0),\
'140200': (2.0, 44.0),\
'140199': (3.0, 25.5),\
'139203': (3.0, 18.9),\
'139202': (2.0, 57.5),\
'139201': (2.0, 59.7),\
'139200': (2.0, 87.2),\
'139199': (1.0, 100.0),\
'139198': (2.0, 72.8),\
'138198': (3.0, 7.7),\
'138199':( 2.0, 10.0) }
alternative_czech={
'141193': (2.0, 23.0 ),\
'141194': (2.0, 62.1 ),\
'141195': (3.0, 89.5 ),\
'141196': (2.0, 79.4 ),\
'141197': (2.0, 42.3 ),\
'141198': (2.0, 24.5 ),\
'141199': (2.0, 0.1 ),\
'140193': (2.0, 20.6 ),\
'140194': (2.0, 88.9 ),\
'140195': (1.0, 100.0 ),\
'140196': (1.0, 100.0 ),\
'140197': (1.0, 100.0 ),\
'140198': (1.0, 100.0),\
'140199': (3.0, 50.0),\
'139195': (2.0, 70.6),\
'139196': (2.0, 12.4),\
'139197': (2.0, 30.9),\
'139198': (2.0, 25.0) }
id=str(int(rec['GRID']))
for dict in [alternative_slov,alternative_czech]:
if id in dict:
rec['COVER_ID']=dict[id][0]
rec['RATE_IN_GR']=dict[id][1]
return rec
def get_countrydict():
""" Create a dictionary with grid-to-country information from a dbf file"""
import cPickle
import mysettings
import os
file=os.path.join(mysettings.pylibdir,'datasets','country_dictionary.dat')
try:
countrydict = cPickle.load(open(file,'rb'))
except:
db=dbf.Dbf(os.path.join(mysettings.pylibdir,'datasets/GRIDCTRY.DBF'))
countrydict={}
for n,rec in enumerate(db):
code=rec['COUNTRY']
gridid=str(int(rec['GRID']))
if code in ['Czech Republic','Slovakia']:
rec=fix_eu(rec)
rate_in_gr=rec['RATE_IN_GR']*1.e-2
i=int(gridid[-3::])
j=int(gridid[0:-3])
lat=-91+j+0.5
lon=-181+i+0.5
if code in countrydict:
a=countrydict[code]
else:
a=countryinfo(code)
shared_border=False
shared_water=False
if rec['COVER_ID'] == 0.0:
shared_border=False
shared_water=True
if rec['COVER_ID'] >= 2.0:
shared_border=True
if rec['COVER_ID'] >= 10.0:
shared_water=True
a.add_gridinfo(i-1,j-1,rate_in_gr,shared_border,shared_water)
countrydict[code]=a
dummy = cPickle.dump(countrydict,open(file,'wb'),-1)
db.close()
return countrydict
if __name__ == "__main__":
countrydict=get_countrydict()
area=globarea()
areas=[]
for k,v in countrydict.items():
ar = v.agg_1x1(area)/1.e6
areas.append((ar,k))
areas.sort()
areas.reverse()
for a in areas: print a
v=countrydict['Netherlands']
print v.agg_1x1(area)
v=countrydict['Slovakia']
print v.agg_1x1(area)
v=countrydict['Czech Republic']
print v.agg_1x1(area)
v=countrydict['Czechoslovakia']
print v.agg_1x1(area)
......@@ -6,21 +6,17 @@ from da.analysis.tools_transcom import *
# Aggregated olson ecosystem regions for CT Europe
aggregates=[\
("Forests/Wooded" , (1, 2, 3, 5, 8, 10,) ) ,\
("Grass/Shrubs" , (4, 6, 13) ) ,\
("Crops/Agriculture", (14,) ) ,\
("Tundra/Taiga" , (7,9,16) ) ,\
("Coastal/Wet" , (11,15,17) ) ,\
("Ice/Water/Deserts" , (12,18,19) ) \
]
agg_dict={}
for k,v in aggregates:
agg_dict[k]=v
ext_econams=[a for a,b in aggregates]
ext_ecocomps=[b for a,b in aggregates]
aggregates={
"Forest" : (1, 2, 3, 5, 8, 10,) ,\
"Grass" : (4, 6, 13) ,\
"Crops": (14,) ,\
"Tundra" : (7,9,16) ,\
"Coastal" : (11,15,17) ,\
"IceWaterDeserts" : (12,18,19) \
}
ext_econams=[a for a,b in aggregates.iteritems()]
ext_ecocomps=[b for a,b in aggregates.iteritems()]