Commit ecacd19c authored by Koehorst, Jasper's avatar Koehorst, Jasper
Browse files

manager for cron jobs added and test case updates

parent 600f6bc6
Pipeline #16983 canceled with stage
......@@ -20,6 +20,8 @@ public class App {
} else if (Arrays.asList(args).contains("-kubernetes")) {
log.info("Executing kubernetes workflows");
nl.munlock.kubernetes.App.main(args);
} else if (Arrays.asList(args).contains("-manager")) {
nl.munlock.kubernetes.App.manager();
}
}
}
package nl.munlock.kubernetes;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.BatchV1Api;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import io.kubernetes.client.util.Config;
import nl.munlock.options.kubernetes.CommandOptionsKubernetes;
import nl.munlock.irods.Connection;
import org.apache.log4j.Level;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static nl.munlock.kubernetes.Kubernetes.setSecret;
public class App {
public static void main(String[] args) throws Exception {
org.apache.log4j.Logger.getLogger("org").setLevel(Level.OFF);
......@@ -12,4 +28,118 @@ public class App {
Connection connection = new Connection(commandOptions);
Kubernetes.main(commandOptions, connection);
}
/**
* Method to start the cron job manager basically executes this program on a kubernetes cluster for all projects
*/
public static void manager() throws ApiException, IOException {
V1PodTemplateSpec template = new V1PodTemplateSpec();
// Defines the docker container
V1Container containerItem = new V1Container();
containerItem.name("jobmanager");
containerItem.image("docker-registry.wur.nl/unlock/docker");
V1ResourceRequirements resource = new V1ResourceRequirements();
// Set limits to the job
Map<String, Quantity> limit = new HashMap<>();
Quantity quantity_cpu = new Quantity(2000 + "m");
Quantity quantity_mem = new Quantity(1000 + "Mi");
limit.put("cpu", quantity_cpu);
limit.put("memory", quantity_mem);
resource.limits(limit);
// Set requests for the job
Map<String, Quantity> request = new HashMap<>();
quantity_mem = new Quantity(1000 + "Mi");
quantity_cpu = new Quantity(2000 + "m");
request.put("memory", quantity_mem);
request.put("cpu", quantity_cpu);
resource.requests(request);
containerItem.resources(resource);
// List of arguments for the command during startup
List<String> args = new ArrayList<>();
// Execute.sh should sync the collection after the run
args.add("/scripts/jobManager.sh");
containerItem.args(args);
// Base command
List<String> command = new ArrayList<>();
command.add("bash");
containerItem.command(command);
// Unlock mount
containerItem.addVolumeMountsItem(new V1VolumeMount().name("unlock").mountPath("/unlock"));
// Environmental path for java etc...
HashMap<String, String> envMap = new HashMap<>();
envMap.put("PATH", "/root/.sdkman/candidates/maven/current/bin:/root/.sdkman/candidates/java/current/bin:/root/.sdkman/candidates/gradle/current/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin");
List<V1EnvVar> env = new ArrayList();
for (String key : envMap.keySet()) {
V1EnvVar v1EnvVar = new V1EnvVar();
v1EnvVar.setName(key);
v1EnvVar.setValue(envMap.get(key));
env.add(v1EnvVar);
}
containerItem.env(env);
// Set the name
containerItem.name("jobmanager");
// Setting secrets
containerItem.addEnvItem(setSecret("irodsHost","unlock-secret", "irodsHost"));
containerItem.addEnvItem(setSecret("irodsPort","unlock-secret", "irodsPort"));
containerItem.addEnvItem(setSecret("irodsUserName","unlock-secret", "irodsUserName"));
containerItem.addEnvItem(setSecret("irodsZone","unlock-secret", "irodsZone"));
containerItem.addEnvItem(setSecret("irodsAuthScheme","unlock-secret", "irodsAuthScheme"));
containerItem.addEnvItem(setSecret("irodsHome","unlock-secret", "irodsHome"));
containerItem.addEnvItem(setSecret("irodsCwd","unlock-secret", "irodsCwd"));
containerItem.addEnvItem(setSecret("irodsPassword","unlock-secret", "irodsPassword"));
containerItem.addEnvItem(setSecret("irodsSSL","unlock-secret", "irodsSSL"));
// Job specifications
V1JobSpec v1JobSpec = new V1JobSpec();
v1JobSpec.ttlSecondsAfterFinished(1000);
v1JobSpec.setTtlSecondsAfterFinished(1000);
v1JobSpec.setBackoffLimit(1);
// 60 min runtime, disabled as when queue is large jobs get killed when not even started
// specs.activeDeadlineSeconds((long) 3600);
v1JobSpec.template(template);
V1PodSpec v1PodSpec = new V1PodSpec();
// https://rancher.com/docs/k3s/latest/en/storage/
v1PodSpec.addVolumesItem(new V1Volume().name("unlock").persistentVolumeClaim(new V1PersistentVolumeClaimVolumeSource().claimName("unlock")));
v1PodSpec.addContainersItem(containerItem);
v1PodSpec.restartPolicy("OnFailure");
template.spec(v1PodSpec);
// Set priority level
// v1PodSpec.setPriority(commandOptions.priority);
V1ObjectMeta v1ObjectMeta = new V1ObjectMeta();
v1ObjectMeta.generateName("jobmanager");
V1Job v1Job = new V1Job();
v1Job.apiVersion("batch/v1");
v1Job.kind("Job");
v1Job.metadata(v1ObjectMeta);
v1Job.spec(v1JobSpec);
// Load and make connection
ApiClient client = Config.defaultClient();
client.setDebugging(true);
Configuration.setDefaultApiClient(client);
// v1Job.setKind("my-kind");
BatchV1Api batchV1Api = new BatchV1Api();
// Submit the job to nl.wur.ssb.kubernetes
batchV1Api.createNamespacedJob("unlock", v1Job, false, null, null);
batchV1Api.createNamespacedJobCall("unlock", v1Job, null, null, null, null, null);
}
}
......@@ -42,7 +42,12 @@ public class Kubernetes {
public static void createJobs(CommandOptionsKubernetes commandOptionsKubernetes, Connection connection) throws IOException, ApiException, JargonQueryException, JargonException, InterruptedException {
// Requires the kube config file
ApiClient client = Config.defaultClient();
client.setDebugging(false);
if (commandOptionsKubernetes.debug) {
client.setDebugging(true);
}
Configuration.setDefaultApiClient(client);
int assayCount = 0;
......@@ -321,7 +326,7 @@ public class Kubernetes {
return v1Job;
}
private static V1EnvVar setSecret(String name, String secretName, String secretKey) {
static V1EnvVar setSecret(String name, String secretName, String secretKey) {
// Container secrets? TODO validate...
V1EnvVarSource v1EnvVarSource = new V1EnvVarSource();
V1SecretKeySelector v1SecretKeySelector = new V1SecretKeySelector();
......
......@@ -43,4 +43,7 @@ public class CommandOptionsIRODS {
@Parameter(names = {"-assay"}, description = "Specify for which assay this nl.wur.ssb.workflow should be executed")
public String assay;
@Parameter(names = {"-debug"}, description = "Enables debug mode")
public Boolean debug = false;
}
......@@ -18,13 +18,15 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import static nl.munlock.kubernetes.Kubernetes.checkKubernetesJobs;
public class AppTest {
private String project = "P_MIB-Amplicon"; // ""P_EXPLODIV"; // "P_FIRM-Project"; // "P_SIAM"; // "P_NWO_THP"; // "P_UNLOCK";
private String investigation = ""; // "I_BIOGAS"; // "I_FIRM-Broilers"; // I_InvestigationID"; // "I_UNCOUPLED"; // "I_CAMI"; // "I_UNCOUPLED";
private String[] ngtaxReadLength = {"70", "100", "120"};
private String project = "P_UNLOCK"; // "P_MIB-Amplicon"; // "P_FIRM-Project";// "P_E2BN"; // ""P_EXPLODIV"; // "P_FIRM-Project"; // "P_SIAM"; // "P_NWO_THP"; // "P_UNLOCK";
private String investigation = "I_SRA_Amplicon"; // "I_Mocks"; // "I_BIOGAS"; // "I_FIRM-Broilers"; // I_InvestigationID"; // "I_UNCOUPLED"; // "I_CAMI"; // "I_UNCOUPLED";
private String[] ngtaxReadLength = {"100", "70", "120"}; //{"70", "100", "120"};
private String minimumThreshold = "0.001";
private String referenceDB = "/unlock/references/databases/Silva/SILVA_138_SSURef_tax_silva.fasta.gz";
......@@ -43,21 +45,43 @@ public class AppTest {
// Performing the NGTAX workflow test
@Test
public void testNGTAX() throws Exception {
for (String readLength : ngtaxReadLength) {
public void testNGTAX() throws InterruptedException {
while (true) {
try {
for (String readLength : ngtaxReadLength) {
String[] args = {
"-cwl", "workflow_ngtax.cwl",
"-id", "AmpliconAnalysis_NGTAX",
"-project", project,
"-investigation", investigation,
"-length", readLength,
"-referenceDB", referenceDB,
"-memory", "6000" //,
};
// App.main(args);
}
runKubernetes();
return;
} catch (Exception e) {
e.printStackTrace();
TimeUnit.MINUTES.sleep(5);
}
}
}
String[] args = {
"-cwl", "workflow_ngtax.cwl",
"-id", "AmpliconAnalysis_NGTAX",
"-project", project,
"-investigation", investigation,
"-length", readLength,
"-referenceDB", referenceDB,
"-memory", "6000"
};
App.main(args);
// Performing the NGTAX demultiplex test
@Test
public void testNGTAXDemultiplexing() throws Exception {
project = "P_MIB-Amplicon";
String[] args = {
"-cwl", "workflow_demultiplexing.cwl",
"-id", "Demultiplexing_NGTAX",
"-project", project,
"-memory", "6000"
};
App.main(args);
// project = "ampliconlibraries";
// runKubernetes();
}
}
// Performing the Assembly workflow test
......@@ -69,7 +93,7 @@ public class AppTest {
"-project", project,
"-investigation", investigation,
"-memory", "100000",
"-threads", "30"}; // "-yaml","",
"-threads", "30" }; // "-yaml","",
App.main(args);
// runKubernetes();
}
......@@ -84,10 +108,10 @@ public class AppTest {
"-id", "metagenomics",
"-project", project,
"-investigation", investigation,
"-memory","200000",
"-threads", "16",
"-memory", "250000",
"-threads", "20",
};
App.main(args);
// App.main(args);
runKubernetes();
}
......@@ -98,39 +122,70 @@ public class AppTest {
"-cwl", "workflow_indexer.cwl",
"-project", project,
"-investigation", investigation,
"......"}; //, "-id", "Spades_test_run_new_yaml", "-project", "P_NWO_unlock_test", "-memory", "100000", "-threads", "30"}; // "-yaml","",
"......" }; //, "-id", "Spades_test_run_new_yaml", "-project", "P_NWO_unlock_test", "-memory", "100000", "-threads", "30"}; // "-yaml","",
App.main(args);
// runKubernetes();
}
// Performing the genome sync workflow test
@Test
public void testENA() throws Exception {
String[] args = {
public void testENA() {
while (true) {
project = "references";
investigation = "";
String[] args = {
"-cwl", "workflow_ena_annotation.cwl",
"-id","ID",
"-taxon","0",
"-threads", "10",
"-memory", "10000"
"-id", "ID",
"-taxon", "0",
"-threads", "5",
"-memory", "10000"
};
App.main(args);
// runKubernetes();
// App.main(args);
try {
String[] argsK = {
"-kubernetes",
"-project", project,
"-limit", "25"
};
App.main(argsK);
return;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void runKubernetes() throws Exception {
// project = "references";
String[] args = {
"-kubernetes",
"-project", project,
"-investigation", investigation,
// "-study", "S_PREDIMED_Microbiota",
// "-observationUnit", "O_Mock%",
"-limit", "100"
// "-priority", "1000"
};
App.main(args);
}
@Test
public void startManager() throws Exception {
String[] args = { "-manager" };
App.main(args);
}
@Test
public void runEverything() throws Exception {
String[] args = {
"-kubernetes",
"-project", "%",
"-investigation", "%",
"-limit", "100"
};
App.main(args);
}
@Test
public void runReferences() throws Exception {
String[] args = {
......@@ -140,6 +195,7 @@ public class AppTest {
App.main(args);
}
@Test
public void cleanKubernetes() throws ApiException, IOException {
ApiClient client = Config.defaultClient();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment