Commit 39e9c14c authored by Wit, Allard de's avatar Wit, Allard de
Browse files

Grompy data loading works for sequential loading of CSV files with parcel data.

parent 98a628b8
datasources:
- sentinel2
grompy:
version: 0.9
parcel_info:
dsn: sqlite:////home/wit015/Data/groenmonitor/parcel_info.db3
counts_file: /home/wit015/Data/groenmonitor/Optisch/perceelscount.csv
# shape_file: /home/wit015/Data/groenmonitor/BRP/gewaspercelen_2019.shp
shape_file: /home/wit015/Data/groenmonitor/BRP/BRP_10rows.shp
table_name: parcel_info
datasets:
sentinel2_values:
dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_values.db3
bands:
NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_2019_ADC.csv
B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B02_2019_ADC.csv
B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B03_2019_ADC.csv
B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B04_2019_ADC.csv
B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B05_2019_ADC.csv
B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B06_2019_ADC.csv
B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B07_2019_ADC.csv
B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B08_2019_ADC.csv
B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B11_2019_ADC.csv
B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B12_2019_ADC.csv
B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_mean_B8A_2019_ADC.csv
sentinel2_std:
dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel2_reflectance_std.db3
bands:
NDVI: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_2019_ADC.csv
B02: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B02_2019_ADC.csv
B03: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B03_2019_ADC.csv
B04: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B04_2019_ADC.csv
B05: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B05_2019_ADC.csv
B06: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B06_2019_ADC.csv
B07: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B07_2019_ADC.csv
B08: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B08_2019_ADC.csv
B11: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B11_2019_ADC.csv
B12: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B12_2019_ADC.csv
B8A: /home/wit015/Data/groenmonitor/Optisch/zonal_stats_std_B8A_2019_ADC.csv
sentinel1_backscatter:
dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3
bands:
VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VV_2019_ADC.csv
VH: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_VH_2019_ADC.csv
VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VV_2019_ADC.csv
VH_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_VH_2019_ADC.csv
sentinel1_coherence:
dsn: sqlite:////home/wit015/Data/groenmonitor/sentinel1_backscatter.db3  # NOTE(review): identical to the sentinel1_backscatter dsn above — likely should be sentinel1_coherence.db3; confirm before running
bands:
S1A_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1A_VV_ALL_2019_ADC.csv
S1A_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1A_VV_ALL_2019_ADC.csv
S1B_VV: /home/wit015/Data/groenmonitor/Radar/zonal_stats_mean_coh_S1B_VV_ALL_2019_ADC.csv
S1B_VV_std: /home/wit015/Data/groenmonitor/Radar/zonal_stats_std_coh_S1B_VV_ALL_2019_ADC.csv
# Entry-point script: run the full grompy data load for the hard-coded
# configuration file. Note the config path is machine-specific.
from .load_data import load_data
load_data("/home/wit015/Sources/grompy/grompy.yaml")
\ No newline at end of file
from pathlib import Path
from csv import DictReader
import time
import multiprocessing as mp
import sqlalchemy as sa
import pandas as pd
import geopandas as gpd
import numpy as np
import yaml
from .util import count_lines, prepare_db, printProgressBar
dummy_date = "19000101"
def load_parcel_info(dsn, counts_file, shape_file, table_name):
    """Load parcel attributes from the BRP shapefile, join per-parcel pixel
    counts and write the combined table into the database.

    :param dsn: data source name of the database to write to
    :param counts_file: CSV file from which pixel counts should be read
    :param shape_file: shapefile whose .DBF file should be used as parcel info
    :param table_name: name of the table to write parcel info into
    :return: None
    """
    parcels = gpd.read_file(Path(shape_file))
    parcels["area_ha"] = parcels.geometry.area / 1e4
    parcels = parcels.set_index("fieldid")

    # Join the pixel counts on the parcel (field) ID.
    pixel_counts = pd.read_csv(Path(counts_file))
    pixel_counts.set_index("field_ID", inplace=True)
    parcels["pixcount"] = pixel_counts["pixcount"]

    # Assemble the output table; the key order here defines the column order
    # of the database table.
    df_out = pd.DataFrame({"fieldID": parcels.index,
                           "year": parcels.year.astype(np.int32),
                           "pixcount": parcels.pixcount.astype(np.int32),
                           "area_ha": parcels.area_ha,
                           "cat_gewasc": parcels.cat_gewasc.apply(str),
                           "gws_gewasc": parcels.gws_gewasc.astype(np.int32),
                           "gws_gewas": parcels.gws_gewas.apply(str),
                           "provincie": parcels.provincie.apply(str),
                           "gemeente": parcels.gemeente.apply(str),
                           "regio": parcels.regio.apply(str),
                           "pc4": parcels.PC4.apply(str),
                           "woonplaats": parcels.woonplaats.apply(str),
                           "waterschap": parcels.waterschap.apply(str),
                           })
    df_out.to_sql(table_name, sa.create_engine(dsn), if_exists="replace", index=False)
def process_rows(rows, dummy_date="19000101"):
    """Combine one CSV row per band into a single wide dataframe.

    :param rows: dict mapping band/column name -> a row dict as produced by
        csv.DictReader, containing 'field_ID', 'count' and one date-string
        key per observation. Zero values are treated as missing (nulls).
    :param dummy_date: date string used for a placeholder record when a band
        has only zero values; placeholder rows are dropped again before
        returning. Defaults to the module-level convention "19000101".
    :return: dataframe with one row per observation day, one column per band,
        plus a 'fieldID' column.
    :raises RuntimeError: if the rows do not all belong to the same field.
    """
    df = pd.DataFrame()
    fieldIDs = []
    for column_name, row in rows.items():
        fieldIDs.append(int(row.pop("field_ID")))
        row.pop("count")  # pixel count is not stored in the time-series table
        recs = []
        for sdate, value in row.items():
            value = float(value)
            if value == 0.:  # zero encodes "no observation"
                continue
            recs.append({"day": sdate, "value": value, "band": column_name})
        if not recs:  # only zero (null) values for the column
            # Add one dummy record so the dataframe/pivot can still be built.
            recs.append({"day": dummy_date, "value": None, "band": column_name})
        df_tmp = pd.DataFrame(recs)
        try:
            df_tmp["day"] = pd.to_datetime(df_tmp.day).dt.date
        except (ValueError, TypeError):
            # Unparseable date strings are kept as-is; the original code
            # swallowed all exceptions here — narrowed to parse errors only.
            pass
        df = pd.concat([df, df_tmp])

    # One row per day, one column per band.
    df = df.pivot(index="day", columns="band", values="value")
    df.reset_index(inplace=True)

    if len(set(fieldIDs)) > 1:
        msg = f"FieldIDs are not the same for this row: {fieldIDs}"
        raise RuntimeError(msg)
    df["fieldID"] = fieldIDs[0]

    # Remove the placeholder record(s) introduced for all-zero bands.
    ix = (df.day == pd.to_datetime(dummy_date))
    if any(ix):
        df = df[~ix]
    return df
def write_to_database(engine, table_name, mean_csv_readers, nlines):
    """Read one row from every CSV reader in lockstep and append the combined
    record to the database until all readers are exhausted.

    :param engine: SQLAlchemy engine to write with
    :param table_name: name of the target table
    :param mean_csv_readers: dict mapping band name -> csv.DictReader; all
        readers must yield the same number of rows in the same field order
    :param nlines: expected number of rows (kept for progress reporting)
    :return: None
    """
    this_line = 0
    while True:
        try:
            rows = {column_name: next(reader) for column_name, reader in mean_csv_readers.items()}
        except StopIteration:
            break
        try:
            df = process_rows(rows)
            df.to_sql(table_name, engine, if_exists="append", index=False)
        except Exception as e:
            # Best-effort loading: report the failed record and continue,
            # instead of silently swallowing the error as before.
            print(f"Failed writing line {this_line} to table '{table_name}': {e}")
        this_line += 1
def load_satellite_csv(dataset_name, dsn, bands):
    """Load the per-band CSV files of one dataset into its database table.

    :param dataset_name: dataset name; also used as the table name
    :param dsn: data source name of the target database
    :param bands: dict mapping band/column name -> path of its CSV file
    :return: None
    """
    # NOTE(review): row counting is disabled during development; 803016 is
    # presumably the known row count of the 2019 files. Re-enable
    # count_lines(bands) when loading other inputs — confirm.
    # nlines = count_lines(bands)
    nlines = 803016
    open_files = []
    mean_csv_readers = {}
    try:
        for column_name, csv_fname in bands.items():
            f = open(csv_fname)
            open_files.append(f)
            mean_csv_readers[column_name] = DictReader(f)
        engine = prepare_db(dsn, table_name=dataset_name, bands=mean_csv_readers.keys())
        write_to_database(engine, dataset_name, mean_csv_readers, nlines)
    finally:
        # Close the CSV files explicitly; the original left them to the GC.
        for f in open_files:
            f.close()
def load_data(yaml_file):
    """Load all data pointed to by the YAML configuration file.

    First writes the parcel info table, then loads every satellite CSV
    dataset listed under 'datasets' sequentially.

    :param yaml_file: path of the grompy YAML configuration file
    :return: None
    """
    with open(yaml_file) as f:
        grompy_conf = yaml.safe_load(f)
    parcel_info = grompy_conf.pop("parcel_info")
    load_parcel_info(**parcel_info)
    # Sequential loading; parallel loading with mp.Pool was left disabled.
    # Removed the leftover debug counter that skipped the first two datasets.
    for dataset_name, description in grompy_conf["datasets"].items():
        load_satellite_csv(dataset_name, **description)
\ No newline at end of file
import sqlalchemy
from sqlalchemy import MetaData, Table, Column, Integer, Date, Float, Text, create_engine
def take_first(iterable):
    """Return the first item of *iterable*, or None if it is empty.

    :param iterable: any iterable
    :return: its first element, or None when exhausted

    The original parameter name shadowed the builtin ``iter`` and used a
    hand-rolled loop; ``next(iter(x), None)`` is the idiomatic equivalent.
    """
    return next(iter(iterable), None)
def count_lines(files):
    """Check that all input CSV files have the same number of lines.

    :param files: dict mapping band name -> CSV file path
    :return: the common line count (None when *files* is empty)
    :raises RuntimeError: if the files differ in their number of rows
    """
    print("Checking file row counts...")
    counts = {}
    # Only the file paths are needed; band names are irrelevant here.
    for fname in files.values():
        with open(fname) as my_file:
            c = sum(1 for _ in my_file)
        counts[fname] = c
        print(f" - {fname}: {c}")
    if len(set(counts.values())) > 1:
        msg = "CSV files do not have the same number of rows!"
        raise RuntimeError(msg)
    # First (and only distinct) count; None for an empty input dict.
    return next(iter(counts.values()), None)
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"):
    """Render a single-line terminal progress bar; call once per iteration.

    :param iteration: current iteration (int)
    :param total: total number of iterations (int)
    :param prefix: text placed before the bar
    :param suffix: text placed after the percentage
    :param decimals: number of decimals shown in the percentage
    :param length: bar width in characters
    :param fill: character used for the completed part of the bar
    :param printEnd: line terminator, e.g. "\r" to overwrite in place
    """
    fraction = iteration / float(total)
    percent = f"{100 * fraction:.{decimals}f}"
    n_filled = int(length * iteration // total)
    bar = fill * n_filled + '-' * (length - n_filled)
    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
    if iteration == total:
        # Finish with a newline once the bar is complete.
        print()
def prepare_db(dsn, table_name, bands, has_sensor_info=False):
    """Create (or re-create) the table for one dataset and return the engine.

    Drops any pre-existing table of the same name, then creates a fresh one
    with (fieldID, day) as primary key and one nullable Float column per band.

    :param dsn: data source name for sqlalchemy.create_engine
    :param table_name: name of the table to (re)create
    :param bands: iterable of band/column names
    :param has_sensor_info: when True, add a 'sensor' text column to the
        primary key. Defaults to False — the caller in load_satellite_csv
        omits this argument, so a default is required to avoid a TypeError.
    :return: the SQLAlchemy engine bound to *dsn*
    """
    engine = create_engine(dsn)
    meta = MetaData(engine)
    tbl = Table(table_name, meta,
                Column('fieldID', Integer, primary_key=True, nullable=False),
                Column('day', Date, primary_key=True, nullable=False),
                )
    if has_sensor_info:
        tbl.append_column(Column('sensor', Text, primary_key=True, nullable=False))
    for col_name in bands:
        tbl.append_column(Column(col_name, Float, nullable=True))
    try:
        tbl.drop()
    except sqlalchemy.exc.OperationalError:
        # Table did not exist yet — nothing to drop.
        pass
    tbl.create()
    return engine
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment