Commit 7a35c19b authored by Peters, Wouter's avatar Peters, Wouter
Browse files

moved package structure to trunk

parent 3f50d921
.DEFAULT_GOAL: clean
.PHONY: help clean
help:
@echo " "
@echo " Usage:"
@echo " make clean # remove temporary (log) files"
@echo " "
clean:
# -------------------
/bin/rm -f jb.[0-9]*.jb
/bin/rm -f jb.[0-9]*.rc
/bin/rm -f jb.[0-9]*.log
/bin/rm -f *.pyc
/bin/rm -f */*/jb.[0-9]*.rc
/bin/rm -f */*/jb.[0-9]*.jb
/bin/rm -f */*/jb.[0-9]*.log
/bin/rm -f */*/*.pyc
/bin/rm -f das.[0-9]*.log
/bin/rm -f *.o*
/bin/rm -f *.po*
/bin/rm -f *.e*
/bin/rm -f *.pe*
!!! Info for the CarbonTracker data assimilation system
!datadir : /lfs0/projects/co2/input/
datadir : /data/CO2/carbontracker/ct08/
obs.input.dir : ${datadir}/obsnc/with_fillvalue/
obs.input.fname : obs_final.nc
ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 240
random.seed : 4385
regionsfile : transcom_olson19_oif30.hdf
!!! Info for the CarbonTracker data assimilation system
datadir : /lfs0/projects/co2/input/ct_new_2010
obs.input.dir : ${datadir}/obsnc/with_fillvalue
obs.input.fname : obs_final.nc
ocn.covariance : ${datadir}/oif_p3_era40.dpco2.2000.01.hdf
bio.covariance : ${datadir}/covariance_bio_olson19.nc
deltaco2.prefix : oif_p3_era40.dpco2
regtype : olson19_oif30
nparameters : 240
random.seed : 4385
regionsfile : transcom_olson19_oif30.hdf
! Info on the data assimilation cycle
time.restart : False
time.start : 2000-01-01 00:00:00
time.finish : 2000-01-08 00:00:00
time.cycle : 7
time.nlag : 2
dir.da_run : ${HOME}/tmp/test_da
! Info on the DA system used
da.system : CarbonTracker
da.system.rc : carbontracker.rc
! Info on the forward model to be used
forecast.model : TM5
forecast.model.rc : ${HOME}/Modeling/TM5/ct_new_intel.rc
forecast.nmembers : 2
#!/usr/bin/env python
# control.py
"""
Author : peters
Revision History:
File created on 26 Aug 2010.
"""
import os
import sys
import logging
import datetime
################### Begin Class DaSystem ###################
class DaSystem(dict):
""" Information on the data assimilation system used. This is normally an rc-file with settings.
"""
def __init__(self,rcfilename):
"""
Initialization occurs from passed rc-file name
"""
self.LoadRc(rcfilename)
def __str__(self):
"""
String representation of a DaInfo object
"""
msg = "===============================================================" ; print msg
msg = "DA System Info rc-file is %s" % self.RcFileName ; print msg
msg = "===============================================================" ; print msg
return ""
def LoadRc(self,RcFileName):
"""
This method loads a DA System Info rc-file with settings for this simulation
"""
import da.tools.rc as rc
for k,v in rc.read(RcFileName).iteritems():
self[k] = v
self.RcFileName = RcFileName
self.DaRcLoaded = True
msg = 'DA System Info rc-file (%s) loaded successfully'%self.RcFileName ; logging.info(msg)
return True
def Initialize(self ):
"""
Initialize the object
"""
def Validate(self ):
"""
Validate the contents of the rc-file given a dictionary of required keys
"""
needed_rc_items={}
for k,v in self.iteritems():
if v == 'True' : self[k] = True
if v == 'False': self[k] = False
for key in needed_rc_items:
if not self.has_key(key):
status,msg = ( False,'Missing a required value in rc-file : %s' % key)
logging.error(msg)
raise IOError,msg
status,msg = ( True,'DA System Info settings have been validated succesfully' ) ; logging.debug(msg)
return None
################### End Class DaSystem ###################
if __name__ == "__main__":
pass
#!/usr/bin/env python
# obs.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'Observation baseclass'
version = '0.0'
################### Begin Class Observation ###################
class Observation(object):
""" an object that holds data + methods and attributes needed to manipulate observations for a DA cycle """
def __init__(self):
self.Identifier = self.getid()
self.Version = self.getversion()
self.Data = ObservationList([]) # Initialize with an empty list of obs
msg = 'Observation object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return identifier
def __str__(self):
""" Prints a list of Observation objects"""
return "This is a %s object, version %s"%(self.Identifier,self.Version)
def Initialize(self):
""" Perform all steps needed to start working with observational data, this can include moving data, concatenating files,
selecting datasets, etc.
"""
def Validate(self):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
def AddObs(self):
""" Add the observation data to the Observation object. This is in a form of a list that goes under the name Data. The
list has as only requirement that it can return the observed+simulated values through a method "getvalues"
"""
def AddSimulations(self):
""" Add the simulation data to the Observation object.
"""
def AddModelDataMismatch(self, DaCycle):
"""
Get the model-data mismatch values for this cycle.
"""
def WriteSampleInfo(self, DaCycle):
"""
Write the information needed by the observation operator to a file. Return the filename that was written for later use
"""
################### End Class Observations ###################
################### Begin Class ObservationList ###################
class ObservationList(list):
""" This is a special type of list that holds observed sample objects. It has methods to extract data from such a list easily """
from numpy import array, ndarray
def getvalues(self,name,constructor=array):
from numpy import ndarray
result = constructor([getattr(o,name) for o in self])
if isinstance(result,ndarray):
return result.squeeze()
else:
return result
################### End Class MixingRatioList ###################
#!/usr/bin/env python
# model.py
"""
Author : peters
Revision History:
File created on 30 Aug 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'GeneralObservationOperator'
version = '0.0'
################### Begin Class ObservationOperator ###################
class ObservationOperator(object):
"""
This is a class that defines an ObervationOperator. This object is used to control the sampling of
a statevector in the ensemble Kalman filter framework. The methods of this class specify which (external) code
is called to perform the sampling, and which files should be read for input and are written for output.
The baseclasses consist mainly of empty mehtods that require an application specific application
"""
def __init__(self):
""" The instance of an ObservationOperator is application dependent """
self.Identifier = self.getid()
self.Version = self.getversion()
msg = 'Observation Operator object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return version
def __str__(self):
return "This is a %s object, version %s"%(self.Identifier,self.Version)
def Initialize(self,DaCycle):
""" Perform all steps necessary to start the observation operator through a simple Run() call """
def ValidateInput(self,DaCycle):
""" Make sure that data needed for the ObservationOperator (such as observation input lists, or parameter files)
are present.
"""
def SaveData(self):
""" Write the data that is needed for a restart or recovery of the Observation Operator to the save directory """
################### End Class ObservationOperator ###################
if __name__ == "__main__":
pass
#!/usr/bin/env python
# optimizer.py
"""
Author : peters
Revision History:
File created on 28 Jul 2010.
"""
import os
import sys
import logging
import datetime
identifier = 'Optimizer baseclass'
version = '0.0'
################### Begin Class Optimizer ###################
class Optimizer(object):
"""
This creates an instance of an optimization object. It handles the minimum least squares optimization
of the state vector given a set of sample objects. Two routines will be implemented: one where the optimization
is sequential and one where it is the equivalent matrix solution. The choice can be made based on considerations of speed
and efficiency.
"""
def __init__(self):
self.Identifier = self.getid()
self.Version = self.getversion()
msg = 'Optimizer object initialized: %s'%self.Identifier ; logging.info(msg)
def getid(self):
return identifier
def getversion(self):
return version
def Initialize(self, dims):
self.nlag = dims[0]
self.nmembers = dims[1]
self.nparams = dims[2]
self.nobs = dims[3]
self.localization = False
self.localization = False
self.CreateMatrices()
return None
def CreateMatrices(self):
""" Create Matrix space needed in optimization routine """
import numpy as np
# mean state [X]
self.x = np.zeros( (self.nlag*self.nparams,), float)
# deviations from mean state [X']
self.X_prime = np.zeros( (self.nlag*self.nparams,self.nmembers,), float)
# mean state, transported to observation space [ H(X) ]
self.Hx = np.zeros( (self.nobs,), float)
# deviations from mean state, transported to observation space [ H(X') ]
self.HX_prime = np.zeros( (self.nobs,self.nmembers), float)
# observations
self.obs = np.zeros( (self.nobs,), float)
# covariance of observations
self.R = np.zeros( (self.nobs,self.nobs,), float)
# Total covariance of fluxes and obs in units of obs [H P H^t + R]
self.HPHR = np.zeros( (self.nobs,self.nobs,), float)
# Kalman Gain matrix
self.KG = np.zeros( (self.nlag*self.nparams,self.nobs,), float)
# flags of observations
self.flags = np.zeros( (self.nobs,), int)
def StateToMatrix(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
self.x[n*self.nparams:(n+1)*self.nparams] = members[0].ParameterValues[n]
self.X_prime[n*self.nparams:(n+1)*self.nparams,:] = np.transpose(np.array([m.ParameterValues for m in members]))
self.X_prime = self.X_prime - self.x[:,np.newaxis] # make into a deviation matrix
self.obs[:] = StateVector.EnsembleMembers[-1][0].ModelSample.Data.getvalues('obs')
self.Hx[:] = StateVector.EnsembleMembers[-1][0].ModelSample.Data.getvalues('simulated')
for m,mem in enumerate(StateVector.EnsembleMembers[-1]):
self.HX_prime[:,m] = mem.ModelSample.Data.getvalues('simulated')
self.HX_prime = self.HX_prime - self.Hx[:,np.newaxis] # make a deviation matrix
self.R[:,:] = np.identity(self.nobs)
return None
def MatrixToState(self,StateVector):
import numpy as np
for n in range(self.nlag):
members = StateVector.EnsembleMembers[n]
for m,mem in enumerate(members):
members[m].ParameterValues[:] = self.X_prime[n*self.nparams:(n+1)*self.nparams,m] + self.x[n*self.nparams:(n+1)*self.nparams]
return None
def SerialMinimumLeastSquares(self):
""" Make minimum least squares solution by looping over obs"""
import numpy as np
import numpy .linalg as la
tvalue=1.97591
for n in range(self.nobs):
if self.flags[n] == 1: continue
PHt = 1./(self.nmembers-1)*np.dot(self.X_prime,self.HX_prime[n,:])
self.HPHR[n,n] = 1./(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[n,:]).sum()+self.R[n,n]
self.KG[:,n] = PHt/self.HPHR[n,n]
dummy = self.Localize(n)
alpha = np.double(1.0)/(np.double(1.0)+np.sqrt( (self.R[n,n])/self.HPHR[n,n] ) )
res = self.obs[n]-self.Hx[n]
self.x[:] = self.x + self.KG[:,n]*res
for r in range(self.nmembers):
self.X_prime[:,r] = self.X_prime[:,r]-alpha*self.KG[:,n]*(self.HX_prime[n,r])
#WP !!!! Very important to first do all obervations from n=1 through the end, and only then update 1,...,n. The current observation
#WP should always be updated last because it features in the loop of the adjustments !!!!
for m in range(n+1,self.nobs):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
for m in range(1,n+1):
res = self.obs[n]-self.Hx[n]
fac = 1.0/(self.nmembers-1)*(self.HX_prime[n,:]*self.HX_prime[m,:]).sum()/self.HPHR[n,n]
self.Hx[m] = self.Hx[m] + fac * res
self.HX_prime[m,:] = self.HX_prime[m,:] - alpha*fac*self.HX_prime[n,:]
def BulkMinimumLeastSquares(self):
""" Make minimum least squares solution by solving matrix equations"""
import numpy as np
import numpy.linalg as la
# Create full solution, first calculate the mean of the posterior analysis
HPH = np.dot(self.HX_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HPH = 1/N * HX' * (HX')^T
self.HPHR[:,:] = HPH+self.R # HPHR = HPH + R
HPb = np.dot(self.X_prime,np.transpose(self.HX_prime))/(self.nmembers-1) # HP = 1/N X' * (HX')^T
self.KG[:,:] = np.dot(HPb,la.inv(self.HPHR)) # K = HP/(HPH+R)
for n in range(self.nobs):
dummy = self.Localize(n)
self.x[:] = self.x + np.dot(self.KG,self.obs-self.Hx) # xa = xp + K (y-Hx)
# And next make the updated ensemble deviations. Note that we calculate P by using the full equation (10) at once, and
# not in a serial update fashion as described in Whitaker and Hamill.
# For the current problem with limited N_obs this is easier, or at least more straightforward to do.
I = np.identity(self.nlag*self.nparams)
sHPHR = la.cholesky(self.HPHR) # square root of HPH+R
part1 = np.dot(HPb,np.transpose(la.inv(sHPHR))) # HP(sqrt(HPH+R))^-1
part2 = la.inv(sHPHR+np.sqrt(self.R)) # (sqrt(HPH+R)+sqrt(R))^-1
Kw = np.dot(part1,part2) # K~
self.X_prime[:,:] = np.dot(I,self.X_prime)-np.dot(Kw,self.HX_prime) # HX' = I - K~ * HX'
P_opt = np.dot(self.X_prime,np.transpose(self.X_prime))/(self.nmembers-1)
# Now do the adjustments of the modeled mixing ratios using the linearized ensemble. These are not strictly needed but can be used
# for diagnosis.
part3 = np.dot(HPH,np.transpose(la.inv(sHPHR))) # HPH(sqrt(HPH+R))^-1
Kw = np.dot(part3,part2) # K~
self.Hx[:] = self.Hx + np.dot(np.dot(HPH,la.inv(self.HPHR)),self.obs-self.Hx) # Hx = Hx+ HPH/HPH+R (y-Hx)
self.HX_prime[:,:] = self.HX_prime-np.dot(Kw,self.HX_prime) # HX' = HX'- K~ * HX'
msg = 'Minimum Least Squares solution was calculated, returning' ; logging.info(msg)
return None
def SetLocalization(self):
""" determine which localization to use """
self.localization = True
self.localizetype = "None"
msg = "Current localization option is set to %s"%self.localizetype ; logging.info(msg)
def Localize(self,n):
""" localize the Kalman Gain matrix """
import numpy as np
if not self.localization: return
return
################### End Class Optimizer ###################
if __name__ == "__main__":
sys.path.append('../../')
import os
import sys
from da.tools.general import StartLogger
from da.tools.initexit import CycleControl
from da.ct.statevector import CtStateVector, PrepareState
from da.ct.obs import CtObservations
import numpy as np
import datetime
import da.tools.rc as rc
opts = ['-v']
args = {'rc':'../../da.rc','logfile':'da_initexit.log','jobrcfilename':'test.rc'}
StartLogger()
DaCycle = CycleControl(opts,args)
DaCycle.Initialize()
print DaCycle
StateVector = PrepareState(DaCycle)
samples = CtObservations(DaCycle.DaSystem,datetime.datetime(2005,3,5))
dummy = samples.AddObs()
dummy = samples.AddSimulations('/Users/peters/tmp/test_da/output/20050305/samples.000.nc')
nobs = len(samples.Data)
dims = ( int(DaCycle['time.nlag']),
int(DaCycle['forecast.nmembers']),
int(DaCycle.DaSystem['nparameters']),
nobs, )
opt = CtOptimizer(dims)
opt.StateToMatrix(StateVector)
opt.MinimumLeastSquares()
opt.MatrixToState(StateVector)
#!/usr/bin/env python
# jobcontrol.py
"""
Author : peters
Revision History:
File created on 06 Sep 2010.
"""
import sys
import os
import logging
import subprocess
std_joboptions={'jobname':'test','jobaccount':'co2','jobnodes':'nserial 1','jobshell':'/bin/sh','depends':'','jobtime':'00:30:00'}