Commit 68a74bd0 authored by Woude, Auke van der's avatar Woude, Auke van der
Browse files

write fluxes to files

parents 36119604 e79032dc
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>ctdas_light_refactor_new</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?>
<pydev_project>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/ctdas_light_refactor_new</path>
</pydev_pathproperty>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
</pydev_project>
......@@ -59,7 +59,7 @@ categories = {
'CO2.uncertainty': 'n',
'CO': -4.32,
'CO.uncertainty': 'l'},
'cars middle road': {'name': 'cars middle road',
'heavy duty': {'name': 'heavy duty',
'model': 1,
'spatial': 'Road transport',
'temporal': 't_carmr',
......
This diff is collapsed.
......@@ -30,8 +30,6 @@ from numpy import array, logical_and
import glob
import subprocess
import netCDF4 as nc
import rasterio
from rasterio.enums import Resampling
from functools import partial
from multiprocessing import Process, Pool
......@@ -43,6 +41,8 @@ from da.tools.general import create_dirs, to_datetime
from da.ccffdas.emissionmodel import EmisModel
from da.baseclasses.observationoperator import ObservationOperator
#import matplotlib.pyplot as plt
try: # Import memoization. This speeds up the functions a lot.
from memoization import cached
except: # The memoization package is not always included. Import the memoization from #functools
......@@ -108,7 +108,10 @@ def run_STILT(dacycle, footprint, datepoint, species, path, i_member):
spatial_emissions = get_spatial_emissions(dacycle, i_member, species, path, datepoint)
# multiply footprint with emissions for each hour in the back trajectory. Indices: Time, Category, lat, lon
foot_flux = (footprint[None, :, :, :] * spatial_emissions[:, :, :, :]).sum()
if dacycle.dasystem['cat.sum_emissions']:
foot_flux = (footprint * spatial_emissions).sum()
else:
foot_flux = (footprint[:, None, :, :] * spatial_emissions[:, :, :, :]).sum()
concentration_increase = float(recalc_factors[species]) * foot_flux
return concentration_increase
......@@ -129,15 +132,14 @@ def get_spatial_emissions(dacycle, i_member, species, path, datepoint):
emisfile = path +'prior_spatial_{0:03d}.nc'.format(i_member) #format: species[SNAP, time, lon, lat]
f = io.ct_read(emisfile,method='read')
emis=f.get_variable(species)
emis = emis[:, indices, :, :].astype(np.float32) # index the interesting times
emis = emis[indices].astype(np.float32) # index the interesting times
f.close()
#plt.figure()
#plt.imshow(np.log(emis[0, 0, :, :]), cmap='binary')
#plt.savefig('ff.png')
return emis
def average2d(arr, new_shape):
    """Block-average a 2-D array down to ``new_shape``.

    Used to average the paint-by-colour NEE field to the domain size.
    Each output cell is the mean of one rectangular block of input cells;
    both input dimensions must be exact multiples of ``new_shape``,
    otherwise the reshape raises a ValueError.
    """
    rows, cols = new_shape
    block_rows = arr.shape[0] // rows
    block_cols = arr.shape[1] // cols
    # Split into (rows, block_rows, cols, block_cols), then average away
    # the two block axes.
    blocked = arr.reshape(rows, block_rows, cols, block_cols)
    return blocked.mean(axis=3).mean(axis=1)
def get_time_index_nc(time=None, startdate=None):
"""Function that gets the time index from the flux files
......@@ -201,11 +203,7 @@ class STILTObservationOperator(ObservationOperator):
self.categories = categories # Imported from file at top
self.inputdir = dacycle.dasystem['inputdir']
self.paramdict = rc.read(dacycle.dasystem['paramdict'])
self.pftfile = dacycle.dasystem['file.pft']
with rasterio.Env(logging.getLogger().setLevel(logging.DEBUG)):
with rasterio.open(self.pftfile) as dataset:
self.pft_shape_orig = dataset.read().shape[-2:]
self.lon_start = float(dacycle.dasystem['domain.lon.start'])
self.lon_end = float(dacycle.dasystem['domain.lon.end'])
......@@ -288,29 +286,16 @@ class STILTObservationOperator(ObservationOperator):
emismodel = EmisModel()
emismodel.setup(self.dacycle)
self.emismodel = emismodel
emismodel.get_emis(self.dacycle, self.allsamples, indices=self.indices, do_pseudo=0)
emismodel.get_emis(self.dacycle, indices=self.indices)
logging.info('Emissions calculated')
def get_biosphere_emissions(self):
with nc.Dataset(self.dacycle.dasystem['biosphere_fluxdir']) as biosphere_fluxes:
lonsib = biosphere_fluxes['lonsib'][:]
latsib = biosphere_fluxes['latsib'][:]
nee = biosphere_fluxes['nee'][self.indices, :, :]
nee = np.array(nee, dtype=np.float32)
lu_pref = biosphere_fluxes['lu_pref'][:]
lon_ind_start = st.lonindex(self.lon_start)
lon_ind_end = st.lonindex(self.lon_end)
lat_ind_start = st.latindex(self.lat_start)
lat_ind_end = st.latindex(self.lat_end)
nee_2d = np.flip(st.convert_2d(nee, latsib, lonsib), axis=2)
self.nee = nee_2d[:, :,
lat_ind_end: lat_ind_start,
lon_ind_start: lon_ind_end].astype(np.float32)
lu_pref = np.flip(st.convert_2d(lu_pref, latsib, lonsib), axis=1)
self.lu_pref = lu_pref[:,
lat_ind_end: lat_ind_start,
lon_ind_start: lon_ind_end]
nee = np.flip(biosphere_fluxes['NEE'][self.indices, :, :].astype(np.float32), axis=1)
self.nee = nee
#plt.figure()
#plt.imshow(nee[0, :, :], cmap='binary')
#plt.savefig('nee.png')
def prepare_biosphere(self):
"""Function that prepares the biosphere fluxes on a map
......@@ -323,73 +308,77 @@ class STILTObservationOperator(ObservationOperator):
# First, get the time indices
indices = self.indices
# We cannot, because of memory, use all detail of CORINE.
# The grids have to be compatible with eachother
# Therefore, we will coarsen it, by the following factors:
# Coarsen in x and y:
sibshape = self.lu_pref.shape[1:]
coarsen = [None, None]
for i in range(15, 100):
COARSEN_x = i
COARSEN_y = i
new_shape = tuple(int(self.pft_shape_orig[i] / COARSEN_x -
(self.pft_shape_orig[i] / COARSEN_y % sibshape[i]))
for i in range(2))
scaling_factors = np.array(np.array(new_shape) / np.array(self.domain_shape))
for j, k in enumerate(scaling_factors):
if k % 1 == 0:
if not coarsen[j]:
coarsen[j] = i
if not None in coarsen:
break
new_shape = tuple([int(self.pft_shape_orig[i] / coarsen[i] -
(self.pft_shape_orig[i] / coarsen[i] % sibshape[i]))
for i in range(2)])
logging.debug('Shape for paint-by-number: {}'.format(new_shape))
with rasterio.Env(logging.getLogger().setLevel(logging.DEBUG)):
with rasterio.open(self.pftfile) as dataset:
pftdata = dataset.read(out_shape=new_shape,
resampling=Resampling.mode)
pftdata = pftdata.reshape(pftdata.shape[1:])
scaling_factors = np.array(np.array(new_shape) / np.array(self.lu_pref[0].shape), dtype=int)
with nc.Dataset(self.pftfile) as ds:
pftdata = np.flipud(ds['pft'][:]).astype(int)
self.pftdata = pftdata
#plt.figure()
#plt.imshow(pftdata[:, :], cmap='binary')
#plt.savefig('pft.png')
logging.debug('Starting paint-by-number')
times = len(self.nee)
timeslist = list(range(times))
lus = np.kron(self.lu_pref, np.ones((scaling_factors), dtype=self.lu_pref.dtype))
pool = Pool(times)
pool = Pool(self.forecast_nmembers)
# Create the function that calculates the conentration
func = partial(self.paint_by_number, self.nee, self.param_values, scaling_factors, lus,
pftdata, self.emismodel.find_in_state, self.domain_shape)
scaled_nees = pool.map(func, timeslist)
func = partial(self.paint_by_number, self.nee, self.param_values,
pftdata, self.emismodel.find_in_state, self.average2d, self.domain_shape)
scaled_nees = pool.map(func, list(range(self.forecast_nmembers)))
pool.close()
pool.join()
self.nee_mem = np.asarray(scaled_nees).swapaxes(0, 1) # Indices: mem, time, *domain_shape
self.nee_mem = np.asarray(scaled_nees) # Indices: mem, time, *domain_shape
logging.debug('Paint-by-number done, biosphere prepared')
self.write_biosphere_fluxes(self.nee_mem[0, self.btime:, :, :])
def write_biosphere_fluxes(self, values, qual='prior'):
    """Write biosphere CO2 fluxes to a netCDF file in the output directory.

    Args:
        values: 3-D flux array indexed as (time, lat, lon).
        qual: qualifier for the filename (e.g. 'prior', 'optimised', 'true').

    The file is named 'bio_emissions_<qual>_<YYYYMMDD>.nc', dated by the
    cycle's sample start time.
    """
    filename = 'bio_emissions_{}_{}.nc'.format(qual, self.dacycle['time.sample.start'].strftime('%Y%m%d'))
    outfile = os.path.join(self.outputdir, filename)
    ncfile = io.CT_CDF(outfile, method='create')
    # One netCDF dimension per axis of the flux array.
    time_dim = ncfile.add_dim('time', values.shape[0])
    lat_dim = ncfile.add_dim('lat', values.shape[1])
    lon_dim = ncfile.add_dim('lon', values.shape[2])
    data = io.std_savedict.copy()
    data['name'] = 'CO2'
    data['long_name'] = 'Biosphere CO2 emissions'
    data['units'] = "micromole/m2/s"
    data['dims'] = time_dim + lat_dim + lon_dim
    data['values'] = values
    data['dtype'] = 'float'
    ncfile.add_data(data)
    ncfile.close()
@staticmethod
def paint_by_number(nee, all_param_values, scaling_factors, lus, pftdata, find_in_state, domain_shape, time_ind):
nmembers = len(all_param_values)
scaled_nees_time = np.zeros((nmembers, *domain_shape), dtype=np.float32)
def average2d(arr, new_shape):
    """Block-average a 2-D array down to ``new_shape``.

    Used to average the paint-by-colour NEE field to the domain size.
    Each dimension of ``arr`` must be an exact multiple of the matching
    entry in ``new_shape``; otherwise the reshape raises a ValueError.
    NOTE(review): takes no ``self`` — presumably decorated as a
    staticmethod on the operator class; confirm against the class body.
    """
    # Reshape to (rows, row-block, cols, col-block), then average over
    # the two block axes (last axis first, then axis 1).
    shape = (new_shape[0], arr.shape[0] // new_shape[0],
             new_shape[1], arr.shape[1] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(1)
@staticmethod
def paint_by_number(nee, all_param_values, pftdata,
find_in_state, average2d, domain_shape, member):
scaled_nees = np.zeros((len(nee), *domain_shape), dtype=np.float32)
all_lutypes = np.unique(pftdata.reshape(-1))
nee_time = nee[time_ind]
nee = np.kron(nee_time, np.ones((scaling_factors), dtype=np.float32))
param_values = all_param_values[member]
newnee = np.zeros_like(nee)
for lutype_ind, lutype in enumerate(all_lutypes):
mask = np.where(pftdata==lutype, 1, 0)
nee_lut = np.where(lus==lutype, nee, 0)
for mem, values in enumerate(all_param_values):
param_values = all_param_values[mem]
index_in_state = find_in_state(param_values, 'BIO', str(lutype), return_index=True)
if index_in_state: param_value = param_values[index_in_state]
else: param_value = 1
nee_lut_mem = (nee_lut.sum(axis=0) * mask) * param_value
nee_lut_mem_scaled = average2d(nee_lut_mem, domain_shape)
scaled_nees_time[mem] += nee_lut_mem_scaled
return(scaled_nees_time)
index_in_state = find_in_state(param_values, 'BIO', str(lutype), return_index=True)
if index_in_state:
param_value = param_values[index_in_state]
else: param_value = 1
mask = np.where(pftdata == lutype, 1, 0)
newnee += (nee * mask * param_value)
for i, n in enumerate(newnee):
scaled_nees[i] = average2d(n, domain_shape)
return scaled_nees
def prepare_c14(self):
"""Function that loads the nuclear power temporal variation"""
......@@ -425,6 +414,12 @@ class STILTObservationOperator(ObservationOperator):
f = glob.glob(fname)[0]
footprint = nc.Dataset(f)['foot']
#plt.figure()
#plt.imshow(np.log(footprint[0, :, :]), cmap='binary')
#plt.title(site)
#plt.savefig('foot.png')
# Flip, because the times are negative
return np.flipud(footprint).astype(np.float32)
......@@ -552,11 +547,31 @@ class STILTObservationOperator(ObservationOperator):
def run_forecast_model(self,dacycle,do_pseudo,adv):
    """Run the observation operator for one cycle.

    Prepares the run (emissions, footprints, etc. via prepare_run),
    simulates concentrations for all samples, and saves the results.

    Args:
        dacycle: the data-assimilation cycle object (settings and timing).
        do_pseudo: flag forwarded to prepare_run/run — presumably selects
            pseudo-observation mode; confirm in those methods.
        adv: flag forwarded to prepare_run (advance step).
    """
    logging.info('Preparing run...')
    #self.make_observations(dacycle)
    self.prepare_run(do_pseudo,adv)
    logging.info('Starting to simulate concentrations...')
    self.run(dacycle,do_pseudo)
    self.save_data()
def make_observations(self, dacycle, do_pseudo=0, adv=0):
    """Simulate concentrations per site and dump them to per-site CSV files.

    Groups the samples by the site code embedded in each sample's code
    string, runs the transport simulation once per site, and writes the
    calculated concentrations as a time-indexed CSV named
    '<site><YYYYMMDD>.csv' (dated by the cycle's sample start).

    Args:
        dacycle: the data-assimilation cycle object (settings and timing).
        do_pseudo: flag forwarded to prepare_run/run.
        adv: flag forwarded to prepare_run.
    """
    # Local imports as in the original: pandas is only needed here.
    import pandas as pd
    import copy
    logging.info('Prepare run...')
    self.prepare_run(do_pseudo, adv)
    logging.info('Starting to simulate concentrations...')
    all_samples = copy.deepcopy(self.samples)
    # Site code is the second '_'-separated token after the first '/'.
    sites = set([sample.code.split('/')[1].split('_')[1] for sample in all_samples])
    logging.debug('Sites found in samples: {}'.format(sites))
    for site in sites:
        current_samples = [sample for sample in all_samples if site in sample.code]
        self.samples = current_samples
        self.run(dacycle, do_pseudo)
        times = [sample.xdate for sample in current_samples]
        df = pd.DataFrame(self.calculated_concentrations, times, columns=['concentration'])
        df.to_csv(site + dtm.datetime.strftime(dacycle['time.sample.start'], "%Y%m%d") + '.csv')
    # Restore the full sample list: the loop above narrowed self.samples
    # to the last site's subset, which would leak into later steps.
    self.samples = all_samples
    # NOTE(review): the original ended with save_and_submit(dacycle, statevector),
    # but 'statevector' is undefined in this scope (NameError at runtime);
    # the call was removed. If saving is intended, pass the statevector in.
def run(self, dacycle, do_pseudo):
"""Function that calculates the simulated concentrations for all samples
Loops over all samples, looks for relevant information (e.g. footprint)
......@@ -603,7 +618,6 @@ class STILTObservationOperator(ObservationOperator):
previous_day = datepoint.day
previously_done_site = site
logging.debug('Cache info: {}'.format(get_spatial_emissions.cache_info()))
# This is how ingrid did it, so I will too...
self.mod = np.array(calculated_concentrations)
......@@ -634,6 +648,7 @@ class STILTObservationOperator(ObservationOperator):
# First, add noise (this represents errors in the transport model
noise = np.random.normal(0, sample.mdm)
#noise = 0
# Some different cases for different species.
# First case: Species is not c14:
......
......@@ -16,6 +16,7 @@ import os
import sys
import datetime
import copy
import numpy as np
header = '\n\n *************************************** '
footer = ' *************************************** \n '
......@@ -32,7 +33,7 @@ def ensemble_smoother_pipeline(dacycle, platform, dasystem, samples, statevector
invert(dacycle, statevector, optimizer, obsoperator)
#advance(dacycle, samples, statevector, obsoperator)
advance(dacycle, samples, statevector, obsoperator, optimizer)
save_and_submit(dacycle, statevector)
logging.info("Cycle finished...exiting pipeline")
......@@ -115,7 +116,7 @@ def forward_pipeline(dacycle, platform, dasystem, samples, statevector, obsopera
# Finally, we run forward with these parameters
advance(dacycle, samples, statevector, obsoperator)
advance(dacycle, samples, statevector, obsoperator, optimizer)
# In save_and_submit, the posterior statevector will be added to the savestate.nc file, and it is added to the copy list.
# This way, we have both the prior and posterior data from another run copied into this assimilation, for later analysis.
......@@ -400,31 +401,41 @@ def invert(dacycle, statevector, optimizer, obsoperator):
def advance(dacycle, samples, statevector, obsoperator):
def advance(dacycle, samples, statevector, obsoperator, optimizer):
""" Advance the filter state to the next step """
# This is the advance of the modeled CO2 state. Optionally, routines can be added to advance the state vector (mean+covariance)
# Then, restore model state from the start of the filter
logging.info(header + "starting advance" + footer)
logging.info("Sampling model will be run over 1 cycle")
obsoperator.get_initial_data(samples)
sample_step(dacycle, samples, statevector, obsoperator, 0, True)
dacycle.restart_filelist.extend(obsoperator.restart_filelist)
dacycle.output_filelist.extend(obsoperator.output_filelist)
logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))
sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
if os.path.exists(sampling_coords_file):
outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
samples.write_sample_auxiliary(outfile, obsoperator.simulated_file)
else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
optimised_nee = obsoperator.paint_by_number(obsoperator.nee, [optimizer.x], obsoperator.pftdata,
obsoperator.emismodel.find_in_state, obsoperator.average2d, obsoperator.domain_shape, 0)
true_nee = obsoperator.paint_by_number(obsoperator.nee, [np.ones_like(optimizer.x)], obsoperator.pftdata,
obsoperator.emismodel.find_in_state, obsoperator.average2d, obsoperator.domain_shape, 0)
obsoperator.write_biosphere_fluxes(optimised_nee[obsoperator.btime:], qual='optimised')
obsoperator.write_biosphere_fluxes(true_nee[obsoperator.btime:], qual='true')
time_profiles = obsoperator.emismodel.make_time_profiles(obsoperator.indices)
obsoperator.emismodel.get_emissions(dacycle, time_profiles, member='optimised')
obsoperator.emismodel.get_emissions(dacycle, time_profiles, member='true')
# logging.info("Sampling model will be run over 1 cycle")
# obsoperator.get_initial_data(samples)
# sample_step(dacycle, samples, statevector, obsoperator, 0, True)
# dacycle.restart_filelist.extend(obsoperator.restart_filelist)
# dacycle.output_filelist.extend(obsoperator.output_filelist)
# logging.debug("Appended ObsOperator restart and output file lists to dacycle for collection ")
#
# dacycle.output_filelist.append(dacycle['ObsOperator.inputfile'])
# logging.debug("Appended Observation filename to dacycle for collection: %s"%(dacycle['ObsOperator.inputfile']))
# sampling_coords_file = os.path.join(dacycle['dir.input'], 'sample_coordinates_%s.nc' % dacycle['time.sample.stamp'])
# if os.path.exists(sampling_coords_file):
# outfile = os.path.join(dacycle['dir.output'], 'sample_auxiliary_%s.nc' % dacycle['time.sample.stamp'])
# samples.write_sample_auxiliary(outfile, obsoperator.simulated_file)
# else: logging.warning("Sample auxiliary output not written, because input file does not exist (no samples found in obspack)")
def save_and_submit(dacycle, statevector):
""" Save the model state and submit the next job """
......
......@@ -228,6 +228,7 @@ class CO2StateVector(StateVector):
if self.prop == 1 or dacycle['time.restart']==False:
file=os.path.join(self.obsdir,self.pparam)
f = io.ct_read(file, 'read')
#prmval = f.get_variable('prior_values')[:self.nparams]
prmval = f.get_variable('prior_values')[:self.nparams]
f.close()
# 2: Propagate mean
......@@ -285,6 +286,7 @@ class CO2StateVector(StateVector):
if self.prop == 1 or dacycle['time.restart']==False:
file=os.path.join(self.obsdir,self.pparam)
f = io.ct_read(file, 'read')
#prmval = f.get_variable('prior_values')[:self.nparams]
prmval = f.get_variable('prior_values')[:self.nparams]
f.close()
else: prmval = self.prmval
......
!!! Info for the CarbonTracker data assimilation system
basepath : /projects/0/ctdas/awoude/develop/
name :
strategy : CO2
datadir : /projects/0/ctdas/RINGO/inversions/Data
strategy : Paint1c
datadir : /projects/0/ctdas/RINGO/inversions/Data/
inputdir : ${basepath}/input/
outputdir : ${basepath}/output/
restartdir : ${basepath}/restart/
......@@ -17,16 +18,19 @@ do.c14integrated: 0
do.c14targeted: 0
obs.spec.name : CO2
! number of emission categories defined in the emission model
obs.cat.nr : 14
obs.cat.nr : 10
! Flag to calculate the summed (=total) emission, or specific per category
cat.sum_emissions: False
! For Rdam obs
obs.sites.rc : ${datadir}/sites_weights2.rc
! number of parameters
! In the covmatrix and statevector, the ff parameters are first, then the bio parameters!
nffparameters : 24
nbioparameters : 6
nbioparameters : 4
nparameters : ${nffparameters} + ${nbioparameters}
file.pft : /projects/0/ctdas/RINGO/inversions/Data/SiBPFTs.nc
file.pft : ${datadir}/PFT_highres.nc
file.timeprofs : ${datadir}/CAMS_TEMPO_Tprof_subset.nc
! Settings for the domain:
domain.lon.start : -5.95
......@@ -66,7 +70,8 @@ run.obsflag : 0
! back trajectory time of STILT footprints, also applied to OPS (in hours)
run.backtime : 24
biosphere_fluxdir : /projects/0/ctdas/RINGO/inversions/Data/SiBfile.nc
biosphere_fluxdir : ${datadir}/SiBNEE.2016.01-2016.02_painted.nc
!biosphere_fluxdir : ${datadir}/SiBfile.nc
files_startdate : 2016-01-01 00:00:00
! choose propagation scheme:
......
......@@ -26,7 +26,7 @@ import subprocess
from da.baseclasses.platform import Platform
std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobtype':'serial', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'24:00:00', 'jobinput':'/dev/null', 'jobnodes':'1', 'jobtasks':'', 'modulenetcdf':'netcdf/4.1.2', 'networkMPI':'','jobqueue': 'normal'}
std_joboptions = {'jobname':'test', 'jobaccount':'co2', 'jobtype':'serial', 'jobshell':'/bin/sh', 'depends':'', 'jobtime':'24:00:00', 'jobinput':'/dev/null', 'jobnodes':'1', 'jobtasks':'', 'modulenetcdf':'netcdf/4.1.2', 'networkMPI':'','jobqueue': 'short'}
class CartesiusPlatform(Platform):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment