Commit 453db233 authored by Wit, Allard de's avatar Wit, Allard de
Browse files

Multiprocessing works with distributed loading and error handling in subprocesses.

parent acc762c2
......@@ -3,6 +3,7 @@ from pathlib import Path
from csv import DictReader
import time
import multiprocessing as mp
import traceback
import sqlalchemy as sa
import pandas as pd
......@@ -15,6 +16,14 @@ from .util import count_lines, prepare_db, printProgressBar
dummy_date = "19000101"
class CSVLoadingError(Exception):
def __init__(self, error, traceback):
self.error = error
self.traceback = traceback
super().__init__()
def load_parcel_info(dsn, counts_file, shape_file, table_name):
"""Loads the parcel info from the
......@@ -103,8 +112,6 @@ def write_to_database(engine, table_name, mean_csv_readers, nlines):
# printProgressBar(this_line, nlines, prefix='Progress:', suffix='Complete', length=50)
except StopIteration:
break
except Exception as e:
pass
def load_satellite_csv(dataset_name, dsn, bands):
......@@ -117,6 +124,28 @@ def load_satellite_csv(dataset_name, dsn, bands):
write_to_database(engine, dataset_name, mean_csv_readers, nlines)
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
def load_data(yaml_file):
"""Loads data point to by the YAML config file.
......@@ -131,17 +160,27 @@ def load_data(yaml_file):
process_list = []
for dataset_name, description in grompy_conf["datasets"].items():
print(f"Starting loading of: {dataset_name}")
p = mp.Process(target=load_satellite_csv, args=(dataset_name,), kwargs=description)
p = Process(target=load_satellite_csv, args=(dataset_name,), kwargs=description)
process_list.append(p)
for p in process_list:
p.start()
while any([p.is_alive() for p in process_list]):
try:
try:
while any([p.is_alive() for p in process_list]):
print("Data loading running.....")
time.sleep(10)
except KeyboardInterrupt:
for p in process_list:
p.kill()
sys.exit()
for p in process_list:
if p.exception:
error, traceback = p.exception
raise CSVLoadingError(error, traceback)
except KeyboardInterrupt:
for p in process_list:
p.kill()
sys.exit()
except CSVLoadingError as e:
print("Loading failed in one or more files:")
print(e.error)
print(e.traceback)
......@@ -11,16 +11,17 @@ def count_lines(files):
They should all be the same else throw an error.
"""
print("Checking file row counts...")
counts = {}
for band, fname in files.items():
with open(fname) as my_file:
c = sum(1 for _ in my_file)
counts[fname] = c
print(f" - {fname}: {c}")
if len(set(counts.values())) > 1:
msg = "CSV files do not have the same number of rows!"
msg = "CSV files do not have the same number of rows!\n"
for fname, c in counts.items():
msg += f" - {fname}: {c}\n"
raise RuntimeError(msg)
return take_first(counts.values())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment