Commit 5ffc8c00 authored by Koehorst, Jasper's avatar Koehorst, Jasper
Browse files

sync improvements with kube info

parent 97f3d4c9
Pipeline #22335 passed with stage
in 1 minute and 17 seconds
......@@ -31,6 +31,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
public class Search {
private static final Logger log = Generic.getLogger(Yaml.class, Yaml.debug);
// static Domain domain;
......@@ -132,6 +134,7 @@ public class Search {
case "DNASeqAssay" :
assays.add(domain.make(DNASeqAssay.class, assayIRI));
break;
case "JERMOntology#Sample" :
case "PairedSequenceDataSet":
case "AmpliconLibraryAssay":
case "SingleSequenceDataSet":
......@@ -321,6 +324,11 @@ public class Search {
Kubernetes.createJobs(commandOptionsKubernetes, connection);
// Reset references...
Kubernetes.yamls = new HashSet<>();
if (yaml.contains("/demultiplexed/")) {
log.info("Demultiplexing job detected, sleeping for 5 minutes due to file transfer?...");
SECONDS.sleep(150);
}
}
}
scanner.close();
......@@ -670,18 +678,22 @@ public class Search {
// Get timestamp HDT file
IRODSFile irodsFolder = connection.fileFactory.instanceIRODSFile(folder + "/hdt/");
for (File file : irodsFolder.listFiles()) {
if (file.getName().endsWith(".hdt")) {
IRODSFile irodsHDTFile = connection.fileFactory.instanceIRODSFile(file.getAbsolutePath());
if (!irodsFolder.exists()) {
return true;
} else {
for (File file : irodsFolder.listFiles()) {
if (file.getName().endsWith(".hdt")) {
IRODSFile irodsHDTFile = connection.fileFactory.instanceIRODSFile(file.getAbsolutePath());
Date dateHDT = new Date(irodsHDTFile.lastModified());
Date dateHDT = new Date(irodsHDTFile.lastModified());
System.err.println(dateTTL);
System.err.println(dateHDT);
System.err.println(dateTTL);
System.err.println(dateHDT);
if (dateHDT.compareTo(dateTTL) == -1) {
// This is a job to be executed as TTL files that are newer has been identified
return true;
if (dateHDT.compareTo(dateTTL) == -1) {
// This is a job to be executed as TTL files that are newer has been identified
return true;
}
}
}
}
......
......@@ -44,7 +44,7 @@ public class Kubernetes {
createJobs(commandOptionsKubernetes, connection);
}
public static void createJobs(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws InterruptedException, ApiException, IOException, JargonException, JargonQueryException {
public static void createJobs(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws ApiException, IOException {
// Requires the kube config file
ApiClient client = Config.defaultClient();
......@@ -61,46 +61,52 @@ public class Kubernetes {
int totalItems = checkKubernetesJobs(client);
for (String yaml : yamls) {
assayCount = assayCount + 1;
try {
assayCount = assayCount + 1;
V1Job v1Job = generateKubernetesJobObject(commandOptionsKubernetes, yaml, connection);
V1Job v1Job = generateKubernetesJobObject(commandOptionsKubernetes, yaml, connection);
if (v1Job == null) continue;
if (v1Job == null) continue;
// Sleeping for 1 minute...
while (totalItems >= commandOptionsKubernetes.limit) {
log.info("Sleeping for 1 minute as there are " + totalItems + " jobs running");
// Count down?
for (int i = 60; i >= 0; i--) {
System.out.print("Sleeping for " + i + " seconds\r");
TimeUnit.SECONDS.sleep(1);
// Sleeping for 1 minute...
while (totalItems >= commandOptionsKubernetes.limit) {
log.info("Sleeping for 1 minute as there are " + totalItems + " jobs running");
// Count down?
for (int i = 60; i >= 0; i--) {
System.out.print("Sleeping for " + i + " seconds\r");
TimeUnit.SECONDS.sleep(1);
}
totalItems = checkKubernetesJobs(client);
}
totalItems = checkKubernetesJobs(client);
}
// v1Job.setKind("my-kind");
BatchV1Api batchV1Api = new BatchV1Api();
// v1Job.setKind("my-kind");
BatchV1Api batchV1Api = new BatchV1Api();
// Submit the job to nl.wur.ssb.kubernetes
try {
batchV1Api.createNamespacedJob("unlock", v1Job, false, null, null);
batchV1Api.createNamespacedJobCall("unlock", v1Job, null, null, null, null, null);
// Submit the job to nl.wur.ssb.kubernetes
try {
batchV1Api.createNamespacedJob("unlock", v1Job, false, null, null);
batchV1Api.createNamespacedJobCall("unlock", v1Job, null, null, null, null, null);
// Changing waiting to queue
queueAVU(connection, yaml);
// Changing waiting to queue
queueAVU(connection, yaml);
TimeUnit.SECONDS.sleep(commandOptionsKubernetes.delay);
} catch (ApiException e) {
log.error("Job submission failed");
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(commandOptionsKubernetes.delay);
} catch (ApiException e) {
log.error("Job submission failed");
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(5);
TimeUnit.MILLISECONDS.sleep(5);
totalItems = totalItems + 1;
totalItems = totalItems + 1;
log.info("Jobs currently running " + totalItems);
} catch (Exception e){
// Failed to submit a job?....
log.error("Failed to submit a job to kubernetes");
}
log.info("Jobs currently running " + totalItems);
}
yamls = new HashSet<>();
}
......@@ -358,7 +364,13 @@ public class Kubernetes {
V1ObjectMeta v1ObjectMeta = new V1ObjectMeta();
v1ObjectMeta.generateName("cwl-" + new File(yamlFile).getName().toLowerCase().replaceAll("[_\\.]", "") +"-");
HashMap<String, String> metadata = new HashMap<>();
// Project information
metadata.put("project", commandOptions.project);
metadata.put("investigation", commandOptions.investigation);
metadata.put("study", commandOptions.study);
// ...
v1ObjectMeta.annotations(metadata);
V1Job v1Job = new V1Job();
v1Job.apiVersion("batch/v1");
v1Job.kind("Job");
......
......@@ -4,7 +4,9 @@ import com.esotericsoftware.yamlbeans.YamlWriter;
import nl.munlock.Generic;
import nl.munlock.irods.Connection;
import nl.munlock.irods.Search;
import nl.munlock.kubernetes.Kubernetes;
import nl.munlock.objects.WorkflowHDT;
import nl.munlock.options.kubernetes.CommandOptionsKubernetes;
import nl.munlock.options.workflow.CommandOptionsHDT;
import org.apache.log4j.Logger;
import org.irods.jargon.core.pub.DataObjectAO;
......@@ -73,6 +75,8 @@ public class HDT {
dataObjectAO.setAVUMetadata(destFile.getAbsolutePath(), avuMetaData);
// Start kubernetes?
// CommandOptionsKubernetes commandOptionsKubernetes = new CommandOptionsKubernetes();
}
}
}
......@@ -94,6 +94,7 @@ public class Yaml {
Set<String> analysis = new HashSet<>();
int count = 0;
for (String folder : folders) {
if (!folder.contains("O_272.0")) continue;
count++;
if (count > 10) {
connection.reconnect(commandOptions);
......@@ -101,6 +102,8 @@ public class Yaml {
boolean check = Search.checkTimeStampsHDT(connection, commandOptionsHDT, folder);
if (check) {
analysis.add(folder);
HDT.generateHDTWorkflow(commandOptionsHDT, connection, analysis);
analysis.removeAll(analysis);
}
}
......
......@@ -54,29 +54,44 @@ public class AppTest {
// String[] studies = {"S_Mocks_3_4"};
try {
// for (String study : studies) {
referenceDB = "/unlock/references/databases/Silva/SILVA_138.1_SSURef_tax_silva.fasta.gz";
project = "P_E2BN";
investigation = "%";
referenceDB = "/unlock/references/databases/Silva/SILVA_138.1_SSURef_tax_silva.fasta.gz";
project = "P_E2BN";
investigation = "%";
// study = "%";
// assay = "%";
limit = "150";
for (String readLength : ngtaxReadLength) {
String[] args = {
"-cwl", "workflow_ngtax.cwl",
"-wid", "AmpliconAnalysis_NGTAX_Silva138.1",
"-project", project,
"-investigation", investigation,
"-study", study,
"-assay", assay,
"-length", readLength,
"-referenceDB", referenceDB,
limit = "150";
for (String readLength : ngtaxReadLength) {
String[] args = {
"-cwl", "workflow_ngtax.cwl",
"-wid", "AmpliconAnalysis_NGTAX_Silva138.1",
"-project", project,
"-investigation", investigation,
"-study", study,
"-assay", assay,
"-length", readLength,
"-referenceDB", referenceDB,
// "-overwrite",
"-memory", "6000",
"-threads", "2"
};
App.main(args);
"-memory", "6000",
"-threads", "2"
};
App.main(args);
// }
// }
runKubernetes();
// Run HDT creation?
String[] args2 = {
"-cwl", "/unlock/infrastructure/cwl/irods/irods_hdt.cwl",
"-wid", "hdt_creation",
"-project", project,
"-investigation", investigation,
"-memory", "6000", // "200000", // "6000",
"-overwrite",
"-threads", "2" //,
};
App.main(args2);
// Ensure hdt overwrite
reset = "-reset";
observationUnit = "%";
runKubernetes();
return;
}
......@@ -99,7 +114,7 @@ public class AppTest {
"-investigation", investigation,
"-memory", "6000"
};
App.main(args);
// App.main(args);
project = "ampliconlibraries";
String[] args2 = {
......@@ -113,7 +128,6 @@ public class AppTest {
"-disableProv"
};
App.main(args2);
}
// Performing the HDT workflow
......@@ -203,7 +217,7 @@ public class AppTest {
// Performing the genome sync workflow test
@Test
public void testENA() throws Exception {
public void testENA() {
while (true) {
project = "references";
investigation = "";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment