Commit f7718c03 authored by Koehorst, Jasper

workflow hdt/ngtax improvements

parent f29814e5
Pipeline #17959 failed in 1 minute and 5 seconds
@@ -88,16 +88,16 @@ public class Generic {
dataTransferOperationsAO.getOperation(irodsFile, localFile, null, null);
}
- public static void destinationChecker(Connection connection, File file) {
- Scanner scanner = new Scanner(file.getAbsolutePath());
- while (scanner.hasNextLine()) {
- String line = scanner.nextLine();
- System.err.println(line);
- if (line.startsWith("destination")) {
- System.err.println(line);
- }
- }
- }
+ // public static void destinationChecker(Connection connection, File file) {
+ // Scanner scanner = new Scanner(file.getAbsolutePath());
+ // while (scanner.hasNextLine()) {
+ // String line = scanner.nextLine();
+ // System.err.println(line);
+ // if (line.startsWith("destination")) {
+ // System.err.println(line);
+ // }
+ // }
+ // }
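// Note: the retired implementation handed file.getAbsolutePath() to new Scanner(String), which
// scans the path text itself rather than the file contents (reading the file would need
// new Scanner(file)); presumably the reason the helper was commented out rather than kept.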
public static String getEdamFormat(String fileName) throws Exception {
// Always remove compression end when gz
......
This diff is collapsed.
@@ -17,6 +17,7 @@ import nl.munlock.irods.Connection;
import nl.munlock.irods.Search;
import nl.munlock.options.kubernetes.CommandOptionsKubernetes;
import nl.munlock.objects.Workflow;
+ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger;
import org.irods.jargon.core.exception.JargonException;
import org.irods.jargon.core.query.*;
@@ -25,9 +26,13 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+ import java.security.MessageDigest;
+ import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static nl.munlock.irods.Search.queueAVU;
+ import static nl.munlock.irods.Search.resetFailures;
+ import static nl.munlock.yaml.Workflow.fixClass;
public class Kubernetes {
@@ -39,13 +44,14 @@ public class Kubernetes {
createJobs(commandOptionsKubernetes, connection);
}
- public static void createJobs(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws IOException, ApiException, JargonQueryException, JargonException, InterruptedException {
+ public static void createJobs(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws InterruptedException, ApiException, IOException, JargonException, JargonQueryException {
// Requires the kube config file
ApiClient client = Config.defaultClient();
- client.setDebugging(false);
if (commandOptionsKubernetes.debug) {
client.setDebugging(true);
+ } else {
+ client.setDebugging(false);
}
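// Wire-level request/response logging on the Kubernetes API client follows the debug option.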
Configuration.setDefaultApiClient(client);
@@ -55,7 +61,6 @@
int totalItems = checkKubernetesJobs(client);
for (String yaml : yamls) {
- assayCount = assayCount + 1;
V1Job v1Job = generateKubernetesJobObject(commandOptionsKubernetes, yaml, connection);
@@ -64,7 +69,7 @@
// Sleeping for 1 minute...
while (totalItems >= commandOptionsKubernetes.limit) {
log.info("Sleeping for 1 minute as there are " + totalItems + " jobs running and have " + yamls.size() + " jobs in total");
log.info("Sleeping for 1 minute as there are " + totalItems + " jobs running");
// Count down?
for (int i = 60; i >= 0; i--) {
System.out.print("Sleeping for " + i + " seconds\r");
@@ -77,29 +82,46 @@
BatchV1Api batchV1Api = new BatchV1Api();
// Submit the job to nl.wur.ssb.kubernetes
batchV1Api.createNamespacedJob("unlock", v1Job, false, null, null);
batchV1Api.createNamespacedJobCall("unlock", v1Job, null, null, null, null, null);
try {
batchV1Api.createNamespacedJob("unlock", v1Job, false, null, null);
batchV1Api.createNamespacedJobCall("unlock", v1Job, null, null, null, null, null);
// Changing waiting to queue
queueAVU(connection, yaml);
TimeUnit.SECONDS.sleep(commandOptionsKubernetes.delay);
} catch (ApiException e) {
log.error("Job submission failed");
e.printStackTrace();
}
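// Failed submissions are logged and skipped so the remaining YAMLs are still submitted;
// queueAVU presumably moves the job's iRODS AVU from "waiting" to "queue" on success only.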
TimeUnit.MILLISECONDS.sleep(500);
totalItems = totalItems + 1;
- int remain = yamls.size() - assayCount;
- log.info("Jobs currently running " + totalItems + " of " + remain);
+ log.info("Jobs currently running " + totalItems);
}
+ yamls = new HashSet<>();
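// Clearing the set avoids resubmitting the same YAMLs when createJobs runs again for the
// next project path in getYamls.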
}
private static void getYamls(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws JargonException, GenQueryBuilderException, JargonQueryException, IOException, ApiException, InterruptedException {
// List all project turtle files...
- Search.getAllUnprocessed(commandOptionsKubernetes, connection);
- Kubernetes.createJobs(commandOptionsKubernetes, connection);
if (commandOptionsKubernetes.project.contains("references")) {
- Search.getAllUnprocessedReferences(commandOptionsKubernetes, connection, "/" + commandOptionsKubernetes.zone + "/references/genomes/bacteria%");
+ String path = "/" + commandOptionsKubernetes.zone + "/references/genomes/bacteria%";
+ if (commandOptionsKubernetes.reset)
+ resetFailures(connection, path);
+ Search.getAllUnprocessedReferences(commandOptionsKubernetes, connection, path);
Kubernetes.createJobs(commandOptionsKubernetes, connection);
}
if (commandOptionsKubernetes.project.contains("ampliconlibraries")) {
Search.getAllUnprocessedReferences(commandOptionsKubernetes, connection, "/" + commandOptionsKubernetes.zone + "/landingzone/ampliconlibraries%");
} else if (commandOptionsKubernetes.project.contains("ampliconlibraries")) {
String path ="/" + commandOptionsKubernetes.zone + "/landingzone%";
if (commandOptionsKubernetes.reset) {
resetFailures(connection, path);
}
Search.getAllUnprocessedReferences(commandOptionsKubernetes, connection, path);
Kubernetes.createJobs(commandOptionsKubernetes, connection);
} else {
Search.getAllUnprocessed(commandOptionsKubernetes, connection);
Kubernetes.createJobs(commandOptionsKubernetes, connection);
}
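// Routing: "references" projects scan /<zone>/references/genomes/bacteria%, "ampliconlibraries"
// projects scan /<zone>/landingzone%, and anything else falls back to the generic unprocessed
// search; with -reset, failed jobs are first returned to the waiting state.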
// Search.getAllYamls(commandOptionsKubernetes, connection);
@@ -179,7 +201,6 @@ public class Kubernetes {
}
private static V1Job generateKubernetesJobObject(CommandOptionsKubernetes commandOptions, String yamlFile, Connection connection) throws JargonException, JargonQueryException, YamlException, FileNotFoundException {
// Load yaml file for cores and memory restrictions?
log.info("Loading ." + yamlFile);
fixClass("." + yamlFile);
@@ -200,6 +221,9 @@
if (line.startsWith("memory")) {
workflow.memory = Integer.parseInt(line.split(" ")[1]);
}
if (line.startsWith("provenance")) {
workflow.provenance = Boolean.parseBoolean(line.split(" ")[1]);
}
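// The parser splits on a single space, so the YAML lines are assumed to look like
//   threads: 2
//   memory: 5000
//   provenance: true
// where split(" ")[1] yields the value after the "key:" token.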
}
}
@@ -252,9 +276,25 @@
// List of arguments for the command during startup
List<String> args = new ArrayList<>();
// Execute.sh should sync the collection after the run
args.add("-x");
args.add("/run.sh");
// The cwl workflow file
args.add("-c");
args.add(cwlFile);
// The yaml file
args.add("-y");
args.add(yamlFile);
+ // Provenance
+ args.add("-p");
+ if (workflow.provenance) {
+ args.add("true");
+ } else {
+ args.add("false");
+ }
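// The container entrypoint /run.sh is assumed to accept "-p true|false" for CWL provenance
// capture; the value is always passed explicitly rather than as an optional switch.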
containerItem.args(args);
// Base command
......
@@ -3,6 +3,7 @@ package nl.munlock.objects;
public class Workflow {
public int threads = 2;
public int memory = 5000;
+ public boolean provenance = true;
public void setThreads(int threads) {
this.threads = threads;
@@ -20,4 +21,12 @@ public class Workflow {
public int getMemory() {
return memory;
}
+ public void setProvenance(boolean provenance) {
+ this.provenance = provenance;
+ }
+ public boolean getProvenance() {
+ return provenance;
+ }
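// Bean-style accessors mirror threads/memory above; yamlbeans presumably uses them (or the
// public field) when reading and writing workflow YAMLs, so "provenance: true" round-trips.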
}
package nl.munlock.objects;
import nl.munlock.Generic;
- import nl.munlock.yaml.MetaGenomics;
import nl.munlock.yaml.Yaml;
import org.apache.log4j.Logger;
@@ -9,21 +8,19 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
- public class WorkflowMetagenomics extends Workflow {
- private static final Logger log = Generic.getLogger(WorkflowMetagenomics.class, Yaml.debug);
+ public class WorkflowHDT extends Workflow {
+ private static final Logger log = Generic.getLogger(WorkflowHDT.class, Yaml.debug);
+ // public String destination;
+ public String folder;
public String destination;
- public HashMap<String, String> irods = new HashMap<>();
- public ArrayList<FileClass> forward_reads = new ArrayList<>();
- public ArrayList<FileClass> reverse_reads = new ArrayList<>();
- public ArrayList<FileClass> pacbio_reads = new ArrayList<>();
- public String kraken_database; // = new ArrayList<>();
- public boolean isolate;
- public boolean metagenomics;
- public Map<String, String> clazz;
- public void addIRODS(String irodsPath) {
- irods.put(irods.size() + "_irods", irodsPath);
+ public String getFolder() {
+ return folder;
}
+ public void setFolder(String folder) {
+ this.folder = folder;
+ }
public void setDestination(String destination) {
@@ -33,69 +30,89 @@ public class WorkflowMetagenomics extends Workflow {
public String getDestination() {
return destination;
}
+ // public HashMap<String, String> irods = new HashMap<>();
+ // public ArrayList<FileClass> forward_reads = new ArrayList<>();
+ // public ArrayList<FileClass> reverse_reads = new ArrayList<>();
+ // public ArrayList<FileClass> pacbio_reads = new ArrayList<>();
+ // public String kraken_database; // = new ArrayList<>();
+ // public boolean isolate;
+ // public boolean metagenomics;
+ // public Map<String, String> clazz;
+ // public void addIRODS(String irodsPath) {
+ // irods.put(irods.size() + "_irods", irodsPath);
+ // }
+ // public void setDestination(String destination) {
+ // this.destination = destination;
+ // }
+ //
+ // public String getDestination() {
+ // return destination;
+ // }
+ // public ArrayList<FileClass> getForward_reads() {
+ // return forward_reads;
+ // }
+ // public void addForward_reads(String forward_reads) throws Exception {
+ // addIRODS(forward_reads);
+ // FileClass fileClass = new FileClass();
+ // fileClass.setClazz("File");
+ // fileClass.setLocation(forward_reads);
+ // this.forward_reads.add(fileClass);
+ // }
- public ArrayList<FileClass> getForward_reads() {
- return forward_reads;
- }
- public void addForward_reads(String forward_reads) throws Exception {
- addIRODS(forward_reads);
- FileClass fileClass = new FileClass();
- fileClass.setClazz("File");
- fileClass.setLocation(forward_reads);
- this.forward_reads.add(fileClass);
- }
+ // public boolean isIsolate() {
+ // return isolate;
+ // }
- public boolean isIsolate() {
- return isolate;
- }
- public void setIsolate(boolean isolate) {
- this.isolate = isolate;
- }
+ // public void setIsolate(boolean isolate) {
+ // this.isolate = isolate;
+ // }
- public ArrayList<FileClass> getReverse_reads() {
- return reverse_reads;
- }
+ // public ArrayList<FileClass> getReverse_reads() {
+ // return reverse_reads;
+ // }
- public void addReverse_reads(String reverse_reads) throws Exception {
- addIRODS(reverse_reads);
- FileClass fileClass = new FileClass();
- fileClass.setClazz("File");
- fileClass.setLocation(reverse_reads);
- this.reverse_reads.add(fileClass);
- }
+ // public void addReverse_reads(String reverse_reads) throws Exception {
+ // addIRODS(reverse_reads);
+ // FileClass fileClass = new FileClass();
+ // fileClass.setClazz("File");
+ // fileClass.setLocation(reverse_reads);
+ // this.reverse_reads.add(fileClass);
+ // }
- public ArrayList<FileClass> getPacbio() {
- return pacbio_reads;
- }
+ // public ArrayList<FileClass> getPacbio() {
+ // return pacbio_reads;
+ // }
- public void addPacbio(String pacbio) throws Exception {
- addIRODS(pacbio);
- FileClass fileClass = new FileClass();
- fileClass.setClazz("File");
- fileClass.setLocation(pacbio);
- this.pacbio_reads.add(fileClass);
- }
+ // public void addPacbio(String pacbio) throws Exception {
+ // addIRODS(pacbio);
+ // FileClass fileClass = new FileClass();
+ // fileClass.setClazz("File");
+ // fileClass.setLocation(pacbio);
+ // this.pacbio_reads.add(fileClass);
+ // }
- public boolean isMetagenomics() {
- return metagenomics;
- }
+ // public boolean isMetagenomics() {
+ // return metagenomics;
+ // }
- public void setMetagenomics(boolean metagenomics) {
- this.metagenomics = metagenomics;
- }
+ // public void setMetagenomics(boolean metagenomics) {
+ // this.metagenomics = metagenomics;
+ // }
- public String getKraken_database() {
- return kraken_database;
- }
+ // public String getKraken_database() {
+ // return kraken_database;
+ // }
- public void setKraken_database(String kraken_database) throws Exception {
- this.kraken_database = kraken_database;
+ // public void setKraken_database(String kraken_database) throws Exception {
+ // this.kraken_database = kraken_database;
+ // FileClass fileClass = new FileClass();
+ // fileClass.setClazz("Directory");
+ // fileClass.setLocation(kraken_database);
+ // this.kraken_database.add(fileClass);
- }
+ // }
}
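// Net effect: WorkflowHDT keeps only folder/destination plus the inherited threads, memory and
// provenance settings; the amplicon/metagenomics members above survive only as comments.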
@@ -3,6 +3,8 @@ package nl.munlock.objects;
import java.util.ArrayList;
import java.util.HashMap;
+ import static nl.munlock.yaml.NGTax.fixPrimer;
public class WorkflowNgtax extends Workflow {
public String reference_db; // = new ArrayList<>();
@@ -32,7 +34,7 @@ public class WorkflowNgtax extends Workflow {
}
public void setForward_primer(String forward_primer) {
- this.forward_primer = forward_primer;
+ this.forward_primer = fixPrimer(forward_primer);
}
public String getForward_primer() {
@@ -40,7 +42,7 @@
}
public void setReverse_primer(String reverse_primer) {
- this.reverse_primer = reverse_primer;
+ this.reverse_primer = fixPrimer(reverse_primer);
}
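// fixPrimer (statically imported from nl.munlock.yaml.NGTax) is assumed to normalize primer
// strings, e.g. whitespace or IUPAC degenerate-base cleanup, before they are stored.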
public String getReverse_primer() {
@@ -114,4 +116,5 @@
public double getMinimum_threshold() {
return minimum_threshold;
}
- }
+ }
\ No newline at end of file
@@ -21,6 +21,15 @@ public class CommandOptionsKubernetes extends CommandOptionsIRODS {
@Parameter(names = {"-priority"}, description = "Kubernetes priority value")
public int priority = 0;
@Parameter(names = {"-delay"}, description = "Kubernetes job submission delay (seconds)")
public int delay = 1;
@Parameter(names = {"-disableProv"}, description = "Disable provenance output")
public boolean disableProvenance = false;
@Parameter(names = {"-reset"}, description = "Reset jobs to waiting")
public boolean reset;
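// Hypothetical invocation combining the new options:
//   ... -project references -delay 5 -reset -disableProv
// -reset returns failed jobs to "waiting" before searching; -delay throttles job submission.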
public CommandOptionsKubernetes(String args[]) {
try {
JCommander jc = new JCommander(this);
......
@@ -6,19 +6,10 @@ import com.beust.jcommander.ParameterException;
import java.lang.reflect.Field;
- public class CommandOptionsNGTAX extends CommandOptionsYAML {
+ public class CommandOptionsHDT extends CommandOptionsYAML {
- @Parameter(names = {"-length"}, description = "Read length used in NGTAX (default maximum length)")
- public int read_len = 0;
- @Parameter(names = {"-minimumThreshold"}, description = "Minimum threshold detectable, expressed in percentage")
- public double minimumThreshold = 0.1;
- @Parameter(names = {"-referenceDB"}, description = "Reference database to disable provide empty string -referenceDB ''")
- public String reference_db = "/unlock/references/databases/Silva/SILVA_132_SSURef_tax_silva.fasta.gz";
- public CommandOptionsNGTAX(String args[]) {
+ public CommandOptionsHDT(String args[]) {
try {
JCommander jc = new JCommander(this);
jc.parse(args);
......
@@ -23,7 +23,7 @@ public class CommandOptionsYAML extends CommandOptionsIRODS {
// public String yaml = "";
@Parameter(names = {"-overwrite"}, description = "Overwrites already existing yaml fiels")
- public boolean overwrite;
+ public boolean overwrite = false;
@Parameter(names = {"-threads"}, description = "Number of threads to use")
public int threads = 2;
......
@@ -120,6 +120,7 @@ public class ENA {
workflow = setGenomeSyncWorkflowsDefaults(workflow);
workflow.threads = commandOptions.threads;
workflow.memory = commandOptions.memory;
+ workflow.provenance = false;
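// Genome-sync workflows disable provenance capture, overriding the Workflow default of true.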
workflow.setGca(commandOptions.id);
String lineage = getLineage(model, commandOptions.taxon);
......
- package nl.munlock.yaml;public class HDT {
package nl.munlock.yaml;
import com.esotericsoftware.yamlbeans.YamlWriter;
import nl.munlock.Generic;
import nl.munlock.irods.Connection;
import nl.munlock.objects.WorkflowHDT;
import nl.munlock.options.workflow.CommandOptionsHDT;
import org.apache.log4j.Logger;
import org.irods.jargon.core.pub.DataObjectAO;
import org.irods.jargon.core.pub.DataTransferOperations;
import org.irods.jargon.core.pub.domain.AvuData;
import org.irods.jargon.core.pub.io.IRODSFile;
import java.io.File;
import java.io.FileWriter;
import java.util.Set;
public class HDT {
private static final Logger log = Generic.getLogger(HDT.class, false);
public static void generateHDTWorkflow(CommandOptionsHDT commandOptionsHDT, Connection connection, Set<String> folders) throws Exception {
DataTransferOperations dataTransferOperationsAO = connection.irodsFileSystem.getIRODSAccessObjectFactory().getDataTransferOperations(connection.irodsAccount);
for (String folder : folders) {
String yamlName = commandOptionsHDT.id + ".yaml";
// Save to iRODS
IRODSFile destFile = connection.fileFactory.instanceIRODSFile(folder + "/hdt/" + yamlName);
if (!destFile.exists()) {
// If not exists start building
} else if (commandOptionsHDT.overwrite && destFile.exists())
// If exists and overwrite, delete
destFile.delete();
else {
// Else skip this one
continue;
}
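// Three-way check: missing -> build a fresh YAML; exists with -overwrite -> delete and rebuild;
// exists otherwise -> skip this folder.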
log.info("Uploading " + new File(yamlName) + " to " + destFile);
// System.err.println(folder);
IRODSFile irodsFile = connection.fileFactory.instanceIRODSFile(folder + "/hdt/");
// Prepare directory for YAML file
irodsFile.mkdirs();
WorkflowHDT workflowHDT = new WorkflowHDT();
workflowHDT.setFolder(folder);
workflowHDT.setDestination(folder + "/hdt/");
workflowHDT.setMemory(commandOptionsHDT.memory);
workflowHDT.setThreads(commandOptionsHDT.threads);
workflowHDT.setProvenance(false);
YamlWriter writer = new YamlWriter(new FileWriter(yamlName));
writer.write(workflowHDT);
writer.close();
// Fix Clazz > Class
Workflow.fixClazz(yamlName);
dataTransferOperationsAO.putOperation(new File(yamlName), destFile, null, null);
// Add metadata tag...
DataObjectAO dataObjectAO = connection.irodsFileSystem.getIRODSAccessObjectFactory().getDataObjectAO(connection.irodsAccount);
AvuData avuMetaData = new AvuData("cwl", commandOptionsHDT.cwl, "waiting");
dataObjectAO.setAVUMetadata(destFile.getAbsolutePath(), avuMetaData);
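// The "cwl" AVU set to "waiting" presumably marks the uploaded YAML for pickup by the
// Kubernetes job search, mirroring the queueAVU transition to "queue" at submission time.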
}
}
}
@@ -14,6 +14,7 @@ import nl.munlock.options.workflow.CommandOptionsDemultiplexing;
import nl.munlock.options.workflow.CommandOptionsNGTAX;
import nl.wur.ssb.RDFSimpleCon.ResultLine;
import nl.wur.ssb.RDFSimpleCon.api.Domain;
+ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger;
import org.irods.jargon.core.exception.FileIntegrityException;
import org.irods.jargon.core.exception.JargonException;
@@ -29,10 +30,7 @@ import org.jermontology.ontology.JERMOntology.domain.Data_sample;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
+ import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.esotericsoftware.yamlbeans.YamlConfig.Quote.NONE;
@@ -189,19 +187,16 @@ public class NGTax {
if (destFile.exists()) {
if (commandOptions.overwrite) {
log.debug("Deleting remote file as destination folder does not exists and nl.wur.ssb.yaml information might have changed");
log.info("Deleting remote file as destination folder does not exists and nl.wur.ssb.yaml information might have changed");
destFile.delete();
} else {
log.debug("Destination file already exists " + destFile);
log.info("Destination file already exists " + destFile);
continue;
}
}
log.debug("Saving nl.wur.ssb.yaml file to " + destFile);
- while (destFile.exists()) {
- destFile.delete();
- }
// Generic part here for both single and paired end data
dataTransferOperationsAO.putOperation(new File(yamlFileName), destFile, null, null);
@@ -225,18 +220,27 @@
ResultLine assayLine = domain.getRDFSimpleCon().runQuery("getAssayFromAmpliconLibrary.txt", true, ampliconLibraryAssay.getResource().getURI()).iterator().next();
AmpliconAssay ampliconAssay = domain.make(AmpliconAssay.class, assayLine.getIRI("assay"));