Commit 248bc596 authored by Ingrid Luijkx

Added preprocessing observations pipeline, which reads the obspack files and writes them as pickle files. Small fix in obs_gvp_co2.py for the encoding of special characters in the dataset_summary file. Small bug fix in a logging message in observationoperator_tm5_cteco2.py. Small string-conversion bug fix in initexit_cteco2 for keeping the opts from the initial job. Added module load NCO to the Aether platform file. Fix in statevectorNHgridded_cteco2.py for optimizing ocean fluxes, which should default to true but was false whenever the ocn.optimize key was not present.
parent ec3c057d
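The heart of the new preprocessing pipeline is a pickle cache wrapped around the expensive obspack read in sample_step (diff below). A minimal sketch of that pattern, with a hypothetical load_or_preprocess helper standing in for the inlined logic:

    import os
    import pickle

    def load_or_preprocess(sample, pickle_path, preprocess_only):
        # Hypothetical helper mirroring the caching logic added to sample_step below.
        if os.path.isfile(pickle_path):
            if preprocess_only:
                return None                    # already preprocessed in an earlier run
            with open(pickle_path, 'rb') as f:
                return pickle.load(f)          # reuse the stored samples object
        sample.add_observations()              # the expensive obspack read
        if preprocess_only:
            with open(pickle_path, 'wb') as f:
                pickle.dump(sample, f, -1)     # -1 selects the highest pickle protocol
            return None                        # preprocessing run stops here
        return sample

A preprocessing run (da.preprocessobs set to True) only fills the cache; the subsequent assimilation run loads the pickles instead of re-reading the obspack files.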
da/cyclecontrol/initexit_cteco2.py
@@ -566,9 +566,9 @@ class CycleControl(dict):
         nextrestartfilename = self['da.restart.fname'].replace(jobid,nextjobid)
         nextlogfilename = logfile.replace(jobid,nextjobid)
         if self.daplatform.ID == 'WU capegrim':
-            template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s &\n' % (cycle+1,execcommand, nextrestartfilename, str.join(str(self.opts), ''), nextlogfilename,)
+            template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s &\n' % (cycle+1,execcommand, nextrestartfilename, ''.join(self.opts), nextlogfilename,)
         else:
-            template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s\n' % (cycle+1,execcommand, nextrestartfilename, str.join(str(self.opts), ''), nextlogfilename,)
+            template += '\nexport icycle_in_job=%d\npython3 %s rc=%s %s >&%s\n' % (cycle+1,execcommand, nextrestartfilename, ''.join(self.opts), nextlogfilename,)
         # write and submit
         self.daplatform.write_job(jobfile, template, jobid)
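For reference, the replaced expression above silently dropped the options: str.join(a, b) is simply a.join(b), so the old code joined the elements of an empty string and always evaluated to ''. A two-line demonstration (the opts value here is made up):

    opts = ['-v']
    print(str.join(str(opts), ''))   # ''   -- str(opts).join('') joins an empty sequence
    print(''.join(opts))             # '-v' -- concatenates the stored option strings

So the resubmitted job never received the opts of the initial job, which is what this commit fixes.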
da/observations/obs_gvp_co2.py
@@ -95,7 +95,7 @@ class ObsPackObservations(Observations):
         # Step 1: Read list of available site files in package
         infile = os.path.join(self.obspack_dir, 'summary', '%s_dataset_summary.txt' % (self.obspack_id,))
-        f = open(infile, 'r')
+        f = open(infile, 'r+', encoding="utf-8")
         lines = f.readlines()
         f.close()
da/obsoperators/observationoperator_tm5_cteco2.py
@@ -524,7 +524,7 @@ class TM5ObservationOperator(ObservationOperator):
         tm5submitdir = os.path.join(self.tm_settings[self.rundirkey])
-        logging.info('tm5submitdir', tm5submitdir)
+        logging.info('tm5submitdir: %s' %tm5submitdir)
         # Go to executable directory and start the subprocess, using a new logfile
da/pipelines/pipeline_cteco2.py
@@ -28,6 +28,7 @@ import os
 import sys
 import datetime
 import copy
+import pickle

 header = '\n\n *************************************** '
 footer = ' *************************************** \n '
@@ -168,18 +169,18 @@ def analysis_pipeline(dacycle, platform, dasystem, samples, statevector):
     save_weekly_avg_ext_tc_data(dacycle)
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom')
     save_weekly_avg_agg_data(dacycle,region_aggregate='transcom_extended')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
-    save_weekly_avg_agg_data(dacycle,region_aggregate='country')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='olson_extended')
+    #save_weekly_avg_agg_data(dacycle,region_aggregate='country')

     logging.info(header + "Starting monthly and yearly averages" + footer)

     time_avg(dacycle,'flux1x1')
     time_avg(dacycle,'transcom')
     time_avg(dacycle,'transcom_extended')
-    time_avg(dacycle,'olson')
-    time_avg(dacycle,'olson_extended')
-    time_avg(dacycle,'country')
+    #time_avg(dacycle,'olson')
+    #time_avg(dacycle,'olson_extended')
+    #time_avg(dacycle,'country')

     logging.info(header + "Finished analysis" + footer)
@@ -221,6 +222,21 @@ def archive_pipeline(dacycle, platform, dasystem):
     jobid = platform.submit_job(jobfile, joblog=logfile)

+def preprocessobs_pipeline(dacycle, platform, dasystem, samples, statevector, obsoperator):
+    """ The main point of entry for the pipeline """
+    sys.path.append(os.getcwd())
+
+    samples = samples if isinstance(samples,list) else [samples]
+
+    logging.info(header + "Initializing current cycle" + footer)
+    start_job(dacycle, dasystem, platform, statevector, samples, obsoperator)
+    prepare_state(dacycle, statevector)
+    sample_state(dacycle, samples, statevector, obsoperator)
+    save_and_submit(dacycle, statevector)
+    logging.info("Cycle finished...exiting pipeline")
+
 def start_job(dacycle, dasystem, platform, statevector, samples, obsoperator):
     """ Set up the job specific directory structure and create an expanded rc-file """
@@ -355,12 +371,22 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
     statevector.write_members_to_file(lag, dacycle['dir.input'], obsoperator=obsoperator)

-    for sample in samples:
+    for i,sample in enumerate(samples):
         sample.setup(dacycle)

         # Read observations + perform observation selection
-        sample.add_observations()
+        obspickle_filename = os.path.join(dacycle['dir.restart'], sample.get_samples_type()+'samples_%s.pickle' % dacycle['time.sample.stamp'])
+        if os.path.isfile(obspickle_filename):
+            if dacycle['da.preprocessobs'] == True: continue
+            sample = pickle.load(open(obspickle_filename, 'rb'))
+            logging.info("Loaded the samples object from file: %s"%obspickle_filename)
+        else:
+            sample.add_observations()
+            if dacycle['da.preprocessobs'] == True:
+                pickle.dump(sample, open(obspickle_filename, 'wb'), -1)
+                logging.info("Saved the samples object to file: %s"%obspickle_filename)
+                continue

         # Add model-data mismatch to all samples, this *might* use output from the ensemble in the future??
         sample.add_model_data_mismatch(dacycle.dasystem['obs.sites.rc'])
@@ -370,9 +396,13 @@ def sample_step(dacycle, samples, statevector, obsoperator, lag, advance=False):
         # Write filename to dacycle, and to output collection list
         dacycle['ObsOperator.inputfile.'+sample.get_samples_type()] = sampling_coords_file

+        samples[i] = sample
+        del sample
+
+    if dacycle['da.preprocessobs'] == True: return
+
     # Run the observation operator
     obsoperator.run_forecast_model(samples)
da/platform/aether.py
@@ -101,6 +101,7 @@ class AetherPlatform(Platform):
              """#SBATCH -t jobtime \n""" + \
              """#SBATCH -o joblog \n""" + \
              """module load TM5-MP \n""" + \
+             """module load NCO/4.6.6-intel-2017a \n""" + \
              """module load netcdf4-python/1.2.9-intel-2017a-Python-3.6.1 \n""" + \
              """\n"""
da/rc/cteco2/carbontracker_gcp2021_insitu.rc
!!! Info for the CarbonTracker data assimilation system
datadir : /mnt/beegfs/user/gkoren/ctdas_2012
! For ObsPack
obspack.input.id : obspack_co2_111_MERGE_GVP6.1_NRT6.1.1_2021-07-05
obspack.input.dir : ${datadir}/obspacks/${obspack.input.id}
obs.sites.rc : ${obspack.input.dir}/summary/sites_weights_GCP2021.rc
obs.obspack.localization : False
obs.obspack.localization.length.tower: 500
obs.obspack.localization.length.land : 300
ocn.covariance : ${datadir}/oceans/oif/cov_ocean.2000.01.nc
deltaco2.prefix : oif_p3_era40.dpco2
bio.cov.dir : ${datadir}/covariances/gridded_NH/
bio.cov.prefix : cov_ecoregion
regtype : gridded_oif30
nparameters : 9835
random.seed : 4385
regionsfile : ${datadir}/covariances/gridded_NH/griddedNHparameters.nc
random.seed.init: ${datadir}/randomseedinit.pickle
!random.seed.init: /projects/0/ctdas/ingrid/seed_py2.pickle
! Include a naming scheme for the variables
#include NamingScheme.wp_Mar2011.rc
da/statevectors/statevectorNHgridded_cteco2.py
@@ -89,11 +89,11 @@ class CO2GriddedStateVector(StateVector):
             if 'pco2' in file or 'cov_ocean' in file:
                 parnr = list(range(9805,9835))
-                if 'ocn.optimize' in dacycle.dasystem and dacycle.dasystem['ocn.optimize']:
-                    cov = f.get_variable('CORMAT')
-                else:
-                    cov = np.identity(30)*1E-20
+                if 'ocn.optimize' in dacycle.dasystem and dacycle.dasystem['ocn.optimize'] == False:
+                    cov = np.identity(30)*1E-20
+                    logging.debug('Prior ocean covariance matrix = Identity matrix')
+                else:
+                    cov = f.get_variable('CORMAT')
             elif 'tropic' in file:
                 cov = f.get_variable('covariance')
                 parnr = f.get_variable('parameternumber')
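The practical effect of the statevector change above, when the ocn.optimize key is absent from the dasystem settings: the old code fell through to the near-zero identity covariance (ocean fluxes effectively frozen), while the new code only disables ocean optimization when the key is present and explicitly False. A reduced sketch, assuming dasystem behaves like a plain dict of booleans:

    import numpy as np

    def ocean_prior_cov(dasystem, cormat):
        # Optimize ocean fluxes by default; switch off only on an explicit False.
        if 'ocn.optimize' in dasystem and dasystem['ocn.optimize'] == False:
            return np.identity(30) * 1E-20   # near-zero prior: ocean not optimized
        return cormat                        # prior correlation matrix read from file

    ocean_prior_cov({}, cormat='CORMAT')     # absent key -> 'CORMAT' (optimized)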
template.job
 #! /bin/env bash
-#SBATCH -p short
+#SBATCH -p all
 #SBATCH -t 01:00:00
-#SBATCH -n 48
+#SBATCH -n 56

 echo "All output piped to file template.log"

-export HOST='cartesius'
+export HOST='aether'
 export icycle_in_job=999
-. /projects/0/tm5meteo/admin/bash_profile__2019
+module load TM5-MP
+module load NCO/4.6.6-intel-2017a
+module load netcdf4-python/1.2.9-intel-2017a-Python-3.6.1
-module load Python/3.6.6-foss-2018b
-module load pre2019
-module load hdf5/impi/intel/1.8.9
-module load hdf4/intel/4.2.9
-module load netcdf/impi/intel/4.1.3
-module load szip/intel/2.1
-module load fortran
-module load mpi
-module load nco
-python3 template.py rc=template.rc -v $1 >& template.log
+python template.py rc=template.rc -v $1 >& template.log
template.py
@@ -28,7 +28,7 @@ sys.path.append(os.getcwd())
 from da.cyclecontrol.initexit_cteco2 import start_logger, validate_opts_args, parse_options, CycleControl
 from da.pipelines.pipeline_cteco2 import ensemble_smoother_pipeline, header, footer, analysis_pipeline, archive_pipeline
 from da.dasystems.dasystem_cteco2 import CO2DaSystem
-from da.platform.cartesius import CartesiusPlatform
+from da.platform.aether import AetherPlatform
 from da.statevectors.statevectorNHgridded_cteco2 import CO2GriddedStateVector
 from da.observations.obs_gvp_co2 import ObsPackObservations
 from da.obsoperators.observationoperator_tm5_cteco2 import TM5ObservationOperator
@@ -49,7 +49,7 @@ opts, args = validate_opts_args(opts, args)
 dacycle = CycleControl(opts, args)

-platform = CartesiusPlatform()
+platform = AetherPlatform()
 dasystem = CO2DaSystem(dacycle['da.system.rc'])
 obsoperator = TM5ObservationOperator(dacycle['da.obsoperator.rc'])
 samples = [ObsPackObservations()]
template.rc
@@ -28,8 +28,8 @@
 !
 ! The time for which to start and end the data assimilation experiment in format YYYY-MM-DD HH:MM:SS

-time.start : 2014-12-27 00:00:00
-time.finish : 2016-02-01 00:00:00
+time.start : 2000-01-01 00:00:00
+time.finish : 2021-02-01 00:00:00

 ! Whether to restart the CTDAS system from a previous cycle, or to start the sequence fresh. Valid entries are T/F/True/False/TRUE/FALSE
@@ -37,11 +37,11 @@ time.restart : False
 ! The length of a cycle is given in days, such that the integer 7 denotes the typically used weekly cycle. Valid entries are integers > 1

-time.cycle : 2
+time.cycle : 7

 ! The number of cycles of lag to use for a smoother version of CTDAS. CarbonTracker CO2 typically uses 5 weeks of lag. Valid entries are integers > 0

-time.nlag : 2
+time.nlag : 5

 ! The directory under which the code, input, and output will be stored. This is the base directory for a run. The word
 ! '/' will be replaced through the start_ctdas.sh script by a user-specified folder name. DO NOT REPLACE
@@ -53,19 +53,19 @@ dir.da_run : template
 ! allows you to complete many cycles before resubmitting a job to the queue and having to wait again for resources.
 ! Valid entries are integers > 0

-da.resources.ncycles_per_job : 1
+da.resources.ncycles_per_job : 90

 ! The ntasks specifies the number of threads to use for the MPI part of the code, if relevant. Note that the CTDAS code
 ! itself is not parallelized and the python code underlying CTDAS does not use multiple processors. The chosen observation
 ! operator though might use many processors, like TM5. Valid entries are integers > 0

-da.resources.ntasks : 48
+da.resources.ntasks : 56

 ! This specifies the amount of wall-clock time to request for each job. Its value depends on your computing platform and might take
 ! any form appropriate for your system. Typically, HPC queueing systems allow you a certain number of hours of usage before
 ! your job is killed, and you are expected to finalize and submit a next job before that time. Valid entries are strings.

-da.resources.ntime : 5:00:00
+da.resources.ntime : 168:00:00

 ! The resource settings above will cause the creation of a job file in which 2 cycles will be run, and 30 threads
 ! are asked for a duration of 4 hours
@@ -77,7 +77,7 @@ da.system : CarbonTracker
 ! The specific settings for your system are read from a separate rc-file, which points to the data directories, observations, etc

-da.system.rc : da/rc/carbontracker_gcp2020_insitu_OCO2.rc
+da.system.rc : da/rc/cteco2/carbontracker_gcp2021_insitu.rc

 ! This flag should probably be moved to the da.system.rc file. It denotes which type of filtering to use in the optimizer
@@ -93,7 +93,7 @@ da.obsoperator : TM5
 !
 da.obsoperator.home : ${HOME}/TM5

-da.obsoperator.rc : ${da.obsoperator.home}/tm5-test_py3.rc
+da.obsoperator.rc : ${da.obsoperator.home}/tm5-cte2021-era5-sib4.rc

 !
 ! The number of ensemble members used in the experiment. Valid entries are integers > 2