Author: bleny Date: 2010-06-16 17:55:53 +0200 (Wed, 16 Jun 2010) New Revision: 76 Url: http://nuiton.org/repositories/revision/diswork/76 Log: statistiques globales, parsing JSDL, activity strategy, config, documentation Added: trunk/diswork-daemon/.classpath trunk/diswork-daemon/.project trunk/diswork-daemon/.settings/ trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java Added: trunk/diswork-daemon/.classpath =================================================================== --- trunk/diswork-daemon/.classpath (rev 0) +++ trunk/diswork-daemon/.classpath 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,8 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="src" output="target/classes" path="src/main/java"/> + <classpathentry kind="src" output="target/test-classes" path="src/test/java"/> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/> + <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/> + <classpathentry kind="output" path="target/classes"/> +</classpath> Added: trunk/diswork-daemon/.project =================================================================== --- trunk/diswork-daemon/.project 
(rev 0) +++ trunk/diswork-daemon/.project 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>diswork-daemon</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.maven.ide.eclipse.maven2Builder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + <nature>org.maven.ide.eclipse.maven2Nature</nature> + </natures> +</projectDescription> Added: trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs =================================================================== --- trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs (rev 0) +++ trunk/diswork-daemon/.settings/org.eclipse.jdt.core.prefs 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,6 @@ +#Tue Jun 08 14:19:53 CEST 2010 +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6 +org.eclipse.jdt.core.compiler.compliance=1.6 +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.6 Added: trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs =================================================================== --- trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs (rev 0) +++ trunk/diswork-daemon/.settings/org.maven.ide.eclipse.prefs 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,9 @@ +#Mon Jun 07 18:20:47 CEST 2010 +activeProfiles= +eclipse.preferences.version=1 +fullBuildGoals=process-test-resources +includeModules=false +resolveWorkspaceProjects=true +resourceFilterGoals=process-resources resources\:testResources +skipCompilerPlugin=true +version=1 Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java =================================================================== --- 
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java (rev 0) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,89 @@ +package org.nuiton.diswork.daemon; + +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public interface ActivityStrategy { + + /** use this strategy to never run a job */ + public static class NoActivity implements ActivityStrategy { + @Override + public boolean canWork() { + return false; + } + } + + /** use this strategy to always run a job */ + public static class UnlimitedActivity implements ActivityStrategy { + @Override + public boolean canWork() { + return true; + } + } + + /** use this strategy to run a job only if load average is low */ + public static class LimitedActivity implements ActivityStrategy { + + private static final Log log = LogFactory.getLog(LimitedActivity.class); + + protected static class LoadAverageMonitoring extends Thread { + + OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + + Double loadAverageNow = 1.0; + Double loadAverage5MinutesBefore = 1.0; + Double loadAverage10MinutesBefore = 1.0; + + @Override + public void run() { + while (true) { + try { + Thread.sleep(5 * 60 * 1000); // 5 min + } catch (InterruptedException e) { + // TODO 20100615 bleny Auto-generated catch block + log.info("exception catch", e); + e.printStackTrace(); + } + loadAverage10MinutesBefore = loadAverage5MinutesBefore; + loadAverage5MinutesBefore = loadAverageNow; + loadAverageNow = os.getSystemLoadAverage(); + log.info("load averages : " + loadAverageNow + " " + + loadAverage5MinutesBefore + " " + + loadAverage10MinutesBefore); + } + } + } + + LoadAverageMonitoring monitoring; + + public LimitedActivity() { + monitoring = new LoadAverageMonitoring(); + monitoring.start(); 
+ } + + @Override + public boolean canWork() { + boolean canWork = monitoring.loadAverageNow < 1.0 + && monitoring.loadAverage5MinutesBefore < 1.0 + && monitoring.loadAverage10MinutesBefore < 1.0; + return canWork; + } + } + + /** use this strategy to run a job only at fixed times of the week */ + public static class ScheduledActivity implements ActivityStrategy { + + @Override + public boolean canWork() { + // TODO 20100615 bleny Auto-generated method stub + throw new UnsupportedOperationException("not yet implemented"); + } + } + + + /** return true if a job can be run */ + boolean canWork(); +} \ No newline at end of file Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-06-16 15:55:53 UTC (rev 76) @@ -24,21 +24,68 @@ */ package org.nuiton.diswork.daemon; +import java.io.File; +import java.lang.management.ManagementFactory; + import org.nuiton.diswork.fs.DisworkFileSystemConfig; import org.nuiton.util.ApplicationConfig; +/** + * + * <dl> + * <dt>diswork.workers_number</dt> + * <dd>The number of jobs the daemon can run at the same time. Should be set + * to a value that consider the number of processors, core and RAM + * available. By default, it's the number processors available to the + * JVM. Set it to 0 will disable job-processing.</dd> + * <dt>diswork.temp_directory</dt> + * <dd>Diswork need a temporary directory to store temporary data for each + * jobs. 
By default, the temp dir of the OS is used (ie "/tmp/diswork" + * under Linux).</dd> + * <dt>diswork.activity_strategy</dt> + * <dd>This is the way the daemon is started, different values are + * available: + * <dl> + * <dt>none</dt> + * <dd>never try to do a job</dd> + * <dt>unlimited</dt> + * <dd>always try to job, whatever the cost</dd> + * <dt>limited</dt> + * <dd>run a job only if hardware resources are available (based + * on the system load average)</dd> + * <dt>scheduled</dt> + * <dd>run a job only at fixed time of the week (for example, + * nights, week-end, etc.). It needs to define a pattern.</dd> + * </dl> + * </dd> + * </dl> + * + * @author bleny + */ public class DisworkConfig extends ApplicationConfig { protected DisworkFileSystemConfig fileSystemConfig; public DisworkConfig() { setConfigFileName("diswork.config"); + + Integer availableProcessors = ManagementFactory + . getOperatingSystemMXBean() + . getAvailableProcessors(); + setDefaultOption("diswork.workers_number", availableProcessors.toString()); + + setDefaultOption("diswork.temp_directory", + System.getProperty("java.io.tmpdir") + + File.separator + "diswork"); + + setDefaultOption("diswork.activity_strategy", "unlimited"); + + // if no total_uptime saved, consider daemon has never run + setDefaultOption("diswork.total_uptime", "0"); } public String getTempDirectory() { - return System.getProperty("java.io.tmpdir", - System.getProperty("user.dir", - ".")) + "/diswork"; + return getOption("diswork.temp_directory"); } public static DisworkConfig newConfig() { @@ -90,5 +137,35 @@ public void setFileSystemConfig(DisworkFileSystemConfig fileSystemConfig) { this.fileSystemConfig = fileSystemConfig; } + + public Integer getNumberOfWorkers() { + return getOptionAsInt("diswork.workers_number"); + } + + public String getActivityStrategy() { + return getOption("diswork.activity_strategy"); + } + + public void setActivityStrategy(String activityStrategyName) { + setOption("diswork.activity_strategy", 
activityStrategyName); + } + + public Long getTotalUptime() { + String upTime = getOption("diswork.total_uptime"); + return Long.parseLong(upTime); + } + + public void setTotalUptime(Long upTime) { + setOption("diswork.total_uptime", upTime.toString()); + } + + public void setFirstRunTime(Long time) { + setOption("diswork.first_run_time", time.toString()); + } + public Long getFirstRunTime() { + String firstRunTime = getOption("diswork.first_run_time"); + return Long.parseLong(firstRunTime); + } + } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-06-16 15:55:53 UTC (rev 76) @@ -28,6 +28,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; import java.net.UnknownHostException; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -41,6 +43,7 @@ import org.apache.commons.logging.LogFactory; import org.nuiton.diswork.fs.DisworkFileSystem; import org.nuiton.diswork.fs.DisworkFileSystemConfig; +import org.nuiton.util.FileUtil; /** * @@ -55,16 +58,6 @@ private static final Log log = LogFactory.getLog(DisworkDaemon.class); - protected DisworkFileSystem fileSystem; - - protected DisworkConfig config; - - /** owner id for this node */ - protected String ownerId; - - /** path to owned directory on fileSystem */ - protected String homeDir; - /** contains applications */ protected static final String BIN = "/bin"; @@ -94,9 +87,6 @@ /** a place where are all user-directories */ protected static final String HOME = "/home"; - - /** worker-manager will make the daemon accomplish jobs */ - protected 
WorkersManager workers; /** in a job directory, the place where the JSDL must be placed */ protected static final String JSDL_PATH = ".diswork/job.jsdl"; @@ -104,6 +94,28 @@ /** in a job directory, the place where the log must be placed */ protected static final String LOG_PATH = ".diswork/job.log"; + /** in a home directory, the place where the hardware info must be placed */ + protected static final String HARDINFO_PATH = "hardinfo"; + + + /** the distributed file system where jobs, data and results are stored */ + protected DisworkFileSystem fileSystem; + + /** provide the configuration data about the daemon */ + protected DisworkConfig config; + + /** time when the deamon started this time, used for total uptime stat */ + protected Long sessionStartTime; + + /** owner id for this node */ + protected String ownerId; + + /** path to owned directory on fileSystem */ + protected String homeDir; + + /** worker-manager will make the daemon accomplish jobs */ + protected WorkersManager workers; + public DisworkDaemon(DisworkConfig config) throws DisworkException { this.config = config; @@ -122,39 +134,79 @@ throw new DisworkException("booting diswork file system failed", e); } - - ownerId = config.getOwnerId(); // get job owner id from config - - if (ownerId == null) { + ownerId = config.getOwnerId(); // get job owner id from config + + if (ownerId == null) { // first time running the daemon log.info("can't find owner id, generating a new one"); - // generate a new one by cheking if home dir exists - ownerId = System.getProperty("user.name", "anonymous"); + // check home dir do not exists + try { + Random random = new Random(); + // generate a new one by cheking if home dir exists + String simpleName = System.getProperty("user.name", "anonymous"); + ownerId = simpleName; + boolean alreadyExists = fileSystem.exists(HOME + "/" + ownerId); + + // if simpleName is already taken, try simpleName + random + while (alreadyExists) { + alreadyExists = fileSystem.exists(HOME + 
"/" + ownerId); + ownerId = simpleName + random.nextInt(); + } + homeDir = HOME + "/" + ownerId; + if (!fileSystem.exists(homeDir)) { + fileSystem.createDirectory(homeDir); + } + } catch (ConcurrentModificationException e) { + log.info("can't create home dir", e); + throw new DisworkException("can't create home dir", e); + } catch (IOException e) { + log.info("can't create home dir", e); + throw new DisworkException("can't create home dir", e); + } + config.setOwnerId(ownerId); - + + config.setFirstRunTime(System.currentTimeMillis()); + // config.saveForUser(); } - + log.info("owner id is " + ownerId); - + + // check if config implies to run a worker + if (config.getNumberOfWorkers() >= 0) { + workers = new WorkersManager(fileSystem, config); + } else { + log.info("worker manager disabled"); + } + + sessionStartTime = System.currentTimeMillis(); + + // writing hardware info to homeDir try { - homeDir = HOME + "/" + ownerId; - if (!fileSystem.exists(homeDir)) { - fileSystem.createDirectory(homeDir); - } + OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + String hardinfos = os.getName() + "\n" + os.getArch() + "\n" + + os.getAvailableProcessors(); + + // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos + + fileSystem.write(homeDir + "/" + HARDINFO_PATH, + IOUtils.toInputStream(hardinfos)); + log.info("writing hardware infos " + hardinfos); } catch (ConcurrentModificationException e) { - log.info("can't create home dir", e); - throw new DisworkException("can't create home dir", e); + log.info("can't write hardware info", e); + throw new DisworkException("can't write hardware info", e); } catch (IOException e) { - log.info("can't create home dir", e); - throw new DisworkException("can't create home dir", e); + log.info("can't write hardware info", e); + throw new DisworkException("can't write hardware info", e); } - - // check if config implies to run a worker - workers = new WorkersManager(fileSystem, config); - 
workers.start(); } + /** + * Create all base directories on the File System + * @throws ConcurrentModificationException + * @throws IOException + */ protected void initFileSystem() throws ConcurrentModificationException, IOException { String[] directories = { TODO, TODO_RUNNING, FAILED_1, FAILED_1_RUNNING, @@ -170,6 +222,8 @@ } } + /* *** methods for defining some usual paths *** */ + /** * given a name and a version for an application, returns the path where * application data can be found on diswork file system. @@ -188,6 +242,27 @@ } /** + * Given a job description, returns the place on disworkFS where all data + * for this jobs should be stored + * @param jobDescription + * @return an absolute path + */ + protected String getPathForJob(JobDescription jobDescription) { + return getPathForJob(jobDescription.getJobId()); + } + + /** + * Given a job id, returns the place on disworkFS where all data + * for this jobs should be stored + * @param jobDescription + * @return a path + */ + protected String getPathForJob(String jobId) { + // all jobs are stored in home dir + return homeDir + "/" + jobId; + } + + /** * every-time a link to a job is created or modified in the job, his name * has to be generated by this method * @return the name to use for a link @@ -196,9 +271,22 @@ return ((Long) System.currentTimeMillis()).toString(); } - public void submitApplication(String applicationName, - String applicationVersion, - InputStream applicationData) throws DisworkException { + /** + * Provide an application to all nodes. Once provided, all nodes will be + * able to perform a job with this application + * + * An application can be uploaded in different version. If a given + * application in the given version is already on the File System, nothing + * is done : the application file is <strong>NOT</strong> replaced. 
+ * + * @param applicationName the name of the application + * @param applicationVersion the version of the application + * @param applicationData an InputStream on the application .zip file + * @throws DisworkException if an error occurs while uploading the file + */ + public void submitApplication(String applicationName, + String applicationVersion, InputStream applicationData) + throws DisworkException { // the place where dependency will be stored String path = getPathForDependency(applicationName, applicationVersion); @@ -221,108 +309,87 @@ throw new DisworkException("unable to write", e); } } - - /** - * Given a job description, returns the place on disworkFS where all data - * for this jobs should be stored - * @param jobDescription - * @return a path - */ - protected String getJobPath(JobDescription jobDescription) { - return getJobPath(jobDescription.getJobId()); - } - /** - * Given a job description, returns the place on disworkFS where all data - * for this jobs should be stored - * @param jobDescription - * @return a path - */ - protected String getJobPath(String jobId) { - // all jobs are stored in home dir - return homeDir + "/" + jobId; - } - - /** - * - */ - public JobDescription newJob() throws IOException { - Random random = new Random(); - - boolean alreadyExists = true; - String newJobIntendifier = null; - while (alreadyExists) { - Integer randomInteger = random.nextInt(); - newJobIntendifier = "job_" + randomInteger.toString(); - alreadyExists = fileSystem.exists(getJobPath(newJobIntendifier)); - } - - // create both job path and sub-directory .diswork - fileSystem.createDirectories(getJobPath(newJobIntendifier) + "/" + ".diswork"); - log.info("created new job " + newJobIntendifier); - return new JobDescription(newJobIntendifier); - } - public void submitJob(JobDescription jobDescription) throws DisworkException { - submitJob(jobDescription, new HashMap<String, InputStream>()); - } - - public void submitJob(JobDescription jobDescription, - 
Map<String, InputStream> inputFiles) throws DisworkException { - // check dependencies, throw exception - - if (inputFiles.size() + jobDescription.getStagingInputUrls().size() + if (jobDescription.getInputData().size() + jobDescription.getStagingInputUrls().size() < jobDescription.getStagingInput().size()) { // dependencies are missing } try { - String dependencyPath = - getPathForDependency(jobDescription.getApplicationName(), - jobDescription.getApplicationVersion()); - log.info("looking for " + dependencyPath); - if (!fileSystem.exists(dependencyPath)) { - throw new DisworkException("job require a dependency " + - jobDescription.getApplicationName() + "-" + - jobDescription.getApplicationVersion() + " that is " + - "not available"); + // trying to put the job in a new directory of home + Random random = new Random(); + boolean alreadyExists = true; + String newJobIntendifier = null; + while (alreadyExists) { + Integer randomInteger = random.nextInt(); + newJobIntendifier = "job_" + randomInteger.toString(); + alreadyExists = fileSystem.exists(getPathForJob(newJobIntendifier)); } + + jobDescription.setJobId(newJobIntendifier); - String jobDir = getJobPath(jobDescription); + // create both job path and sub-directory .diswork + fileSystem.createDirectories( + getPathForJob(jobDescription) + "/" + ".diswork"); + + if (jobDescription.applicationName != null) { + String dependencyPath = getPathForDependency( + jobDescription.getApplicationName(), + jobDescription.getApplicationVersion()); + log.info("looking for " + dependencyPath); + + if (!fileSystem.exists(dependencyPath)) { + throw new DisworkException("job require a dependency " + + jobDescription.getApplicationName() + "-" + + jobDescription.getApplicationVersion() + + " that is not available"); + } + } else { + log.info("no dependency specified for " + jobDescription); + } + String jobDir = getPathForJob(jobDescription); + if(!fileSystem.exists(jobDir)) { // strange ! 
} + // creating an empty log file + log.info("creating log file " + jobDir + "/" + LOG_PATH); fileSystem.write(jobDir + "/" + LOG_PATH, IOUtils.toInputStream("")); - + + // writing the JSDL file InputStream jobJSDL = IOUtils.toInputStream(jobDescription.toJSDL()); fileSystem.write(jobDir + "/" + JSDL_PATH, jobJSDL); // file staging - for (String fileName : inputFiles.keySet()) { - fileSystem.write(jobDir + "/" + fileName, inputFiles.get(fileName)); + for (String fileName : jobDescription.getInputData().keySet()) { + fileSystem.write(jobDir + "/" + fileName, + jobDescription.getInputData().get(fileName)); } - + + // FIXME 20100609 bleny may throws exception if jobs are proposed + // at a same time + // propose job String linkName = newJobLinkName(); - - // FIXME 20100609 bleny may throws exception if jobs are proposed - // at a same time fileSystem.createSymbolicLink(TODO + "/" + linkName, jobDir); + log.info("job submited"); + } catch (IOException e) { log.error("file system error", e); throw new DisworkFileSystemException("file system error", e); } } - public boolean checkLogContains(JobDescription job, + protected boolean checkLogContains(JobDescription job, String pattern) throws DisworkException { try { - String jobPath = getJobPath(job); + String jobPath = getPathForJob(job); List<?> entries = IOUtils.readLines(fileSystem.read(jobPath + "/" + LOG_PATH)); return entries.contains(pattern); } catch (FileNotFoundException e) { @@ -345,28 +412,107 @@ public boolean isFailed(JobDescription job) throws DisworkException { return isFinished(job) && !isSuccessful(job); } + + public Map<String, InputStream> getResults(JobDescription job) + throws DisworkException { + if (isFinished(job)) { + Map<String, InputStream> results = new HashMap<String, InputStream>(); + for (String fileName : job.getStagingOutput()) { + String jobPath = getPathForJob(job); + try { + InputStream result = fileSystem.read(jobPath + "/" + fileName); + results.put(fileName, result); + } catch 
(FileNotFoundException e) { + throw new DisworkException("an expected file is missing", e); + } catch (IOException e) { + log.info("file system error ", e); + throw new DisworkException("file system error ", e); + } + } + return results; + } else { + throw new DisworkException("can't get results for unfinished job " + + job); + } + } + + /** close the daemon (stop all workers) + * update statistices and delete all temporary data + */ @Override public void close() throws IOException { workers.stop(); + + // updating total uptime statistic + Long totalUptime = getTotalUptime(); + log.info("saving total uptime: " + totalUptime); + config.setTotalUptime(totalUptime); + //config.saveForUser(); + fileSystem.close(); + + FileUtil.deleteRecursively(config.getTempDirectory()); } - public Map<String, InputStream> getResults(JobDescription job) - throws DisworkException { - Map<String, InputStream> results = new HashMap<String, InputStream>(); - for (String fileName : job.getStagingOutput()) { - String jobPath = getJobPath(job); - try { - InputStream result = fileSystem.read(jobPath + "/" + fileName); - results.put(fileName, result); - } catch (FileNotFoundException e) { - throw new DisworkException("an expected file is missing", e); - } catch (IOException e) { - log.info("file system error ", e); - throw new DisworkException("file system error ", e); + /* *** methods about statistics *** */ + + protected Long getTotalUptime() { + Long currentTime = System.currentTimeMillis(); + Long sessionUptime = currentTime - sessionStartTime; + Long totalUptime = config.getTotalUptime() + sessionUptime; + return totalUptime; + } + + /** + * return a ratio of the total uptime on the time since the first demon + * run. For exemple, if daemon was up 2 hours and was installed 4 hours ago + * this method return 0.5. 
+ * @return + */ + public Double getUptimeRatio() { + Double uptimeRatio = (double) getTotalUptime().longValue() + / ((double) System.currentTimeMillis() + - (double) config.getFirstRunTime().longValue()); + return uptimeRatio; + } + + /** get informations on hardware available on the global Diswork system + * + * @return + * @throws DisworkException + */ + public Map<String, Integer> getGlobalStats() throws DisworkException { + try { + Map<String, Integer> stats = new HashMap<String, Integer>(); + stats.put("available_processors", 0); + + List<String> homeDirs = fileSystem.readDirectory(HOME); + for (String homeDir : homeDirs) { + String hardinfo = IOUtils.toString(fileSystem.read( + HOME + "/" + homeDir + "/" + HARDINFO_PATH)); + String[] infos = hardinfo.split("\n"); + + // reading the OS name + if (!stats.containsKey(infos[0])) { + stats.put(infos[0], 0); + } + // reading the architecture + stats.put(infos[0], stats.get(infos[0]) + 1); + if (!stats.containsKey(infos[1])) { + stats.put(infos[1], 0); + } + stats.put(infos[1], stats.get(infos[1]) + 1); + + // reading the number of processors + stats.put("available_processors", + stats.get("available_processors") + + Integer.parseInt(infos[2])); } + return stats; + } catch (IOException e) { + log.info("file system error ", e); + throw new DisworkException("file system error ", e); } - return results; } } \ No newline at end of file Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-06-16 15:55:53 UTC (rev 76) @@ -24,6 +24,9 @@ */ package org.nuiton.diswork.daemon; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import 
java.lang.management.RuntimeMXBean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,12 +37,21 @@ public class DisworkDaemonRunner { private static final Log log = LogFactory.getLog(DisworkDaemonRunner.class); - + /** * @param args */ public static void main(String[] args) { + OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + System.out.println(os.getArch()); + System.out.println(os.getAvailableProcessors()); + System.out.println(os.getName()); + System.out.println(os.getVersion()); + + RuntimeMXBean run = ManagementFactory.getRuntimeMXBean(); + System.out.println(run.getUptime()); + // consider args // DisworkDaemon node = new DisworkDaemon(config); Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-06-16 15:55:53 UTC (rev 76) @@ -24,46 +24,86 @@ */ package org.nuiton.diswork.daemon; -import java.io.Serializable; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.NullInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.Namespace; +import org.jdom.input.SAXBuilder; + /** * + * When writing your command line, you should use provided tokens. Token + * are piece of command that will be replaced by the job-executor when + * needed. 
* + * <dl> + * <dt>%java</dt> + * </dd>will be replaced by the actual path of java executable + * into the JRE</dd> + * </dl> * + * This class provides methods to read and parse those data to an XML file + * following (as far as possible), the Job Submission Description Language + * Specification. + * + * @see http://en.wikipedia.org/wiki/Job_Submission_Description_Language + * @see http://www.gridforum.org/documents/GFD.56.pdf + * * @author bleny */ -public class JobDescription implements Serializable { +public class JobDescription { - private static final long serialVersionUID = -8493700934802808925L; - - /** an id for diswork */ + private static final Log log = LogFactory.getLog(JobDescription.class); + + /** an id for diswork, not stored in the JSDL file */ protected String jobId; + /** a name for the job, it's a convenience for the user */ protected String jobName; + /** the name of the application needed for this job + * can be null if no application is needed for complete this job + */ protected String applicationName; + + /** the version of the application + * can't be null applicationName is set + */ protected String applicationVersion; + /** the command line to execute this job */ protected String commandLine; /** all files expected at the beginning of the job */ - protected List<String> stagingInput = new ArrayList<String>(); + protected List<String> input = new ArrayList<String>(); /** all files expected at the end of the job */ - protected List<String> stagingOutput = new ArrayList<String>(); + protected List<String> output = new ArrayList<String>(); - /** the name of a file and the URI where to get it */ - protected Map<String, URL> stagingInputUrls = new HashMap<String, URL>(); + /** the name of some input files and the URI where to get it */ + protected Map<String, URL> inputUrls = new HashMap<String, URL>(); + /** */ + protected Map<String, InputStream> inputData = new HashMap<String, InputStream>(); + /** file where to read the standard input, may 
be null */ protected String standardInput; - /** file where to write the standard output */ + /** file where to write the standard output, may be null */ protected String standardOutput; /** TOKENS are piece of string you can use for writing command lines */ @@ -85,9 +125,15 @@ this.jobId = jobId; } + public JobDescription() {} + public String getJobId() { return jobId; } + + protected void setJobId(String jobId) { + this.jobId = jobId; + } public String getCommandLine() { String result = commandLine; @@ -108,19 +154,39 @@ public String getApplicationName() { return applicationName; } + + /** + * this method is protected to force the use of + * {@link #setApplication(String, String)} which need a version. + * This method is still here because it is used by + * {@link #parseJSDL(String)} + */ + protected void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } public String getApplicationVersion() { return applicationVersion; } + /** + * this method is protected to force the use of + * {@link #setApplication(String, String)} which need a version. 
+ * This method is still here because it is used by + * {@link #parseJSDL(String)} + */ + protected void setApplicationVersion(String applicationVersion) { + this.applicationVersion = applicationVersion; + } + public void setJobName(String jobName) { this.jobName = jobName; } public void setApplication(String applicationName, String applicationVersion) { - this.applicationName = applicationName; - this.applicationVersion = applicationVersion; + setApplicationName(applicationName); + setApplicationVersion(applicationVersion); } @Override @@ -132,38 +198,192 @@ protected static Map<String, JobDescription> map = new HashMap<String, JobDescription>(); public String toJSDL() { + /* count += 1; map.put(count.toString(), this); return count.toString(); - } + */ + + String jsdl = + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + + "<jsdl:JobDefinition xmlns=\"http://www.example.org/\"\n" + + " xmlns:jsdl=\"http://schemas.ggf.org/jsdl/2005/11/jsdl\"\n" + + " xmlns:jsdl-posix=\"http://schemas.ggf.org/jsdl/2005/11/jsdl-posix\"\n" + + " xmlns:diswork=\"http://nuiton.org/projects/show/diswork\"\n" + + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">\n" + + "<jsdl:JobDescription>\n" + + " <jsdl:JobIdentification>\n" + + " <jsdl:JobName>" + jobName + "</jsdl:JobName>\n" + + " </jsdl:JobIdentification>\n" + + " <jsdl:Application>\n"; + if (applicationName != null) { + jsdl += " <jsdl:ApplicationName>" + applicationName + "</jsdl:ApplicationName>\n" + + " <jsdl:ApplicationVersion>" + applicationVersion + "</jsdl:ApplicationVersion>\n"; + } + jsdl += " <jsdl-posix:POSIXApplication>\n" + + " <jsdl-posix:Executable />\n" + + " <jsdl-posix:Argument>" + commandLine + "</jsdl-posix:Argument>\n"; + if (standardInput != null) { + jsdl += " <jsdl-posix:Input>" + standardInput + "</jsdl-posix:Input>\n"; + } + if (standardOutput != null) { + jsdl += " <jsdl-posix:Output>" + standardOutput + "</jsdl-posix:Output>\n"; + } + jsdl += " </jsdl-posix:POSIXApplication>\n" + + " 
</jsdl:Application>\n"; + + for (String inputName : input) { + jsdl += " <jsdl:DataStaging diswork:type=\"in\">\n" + + " <jsdl:FileName>" + inputName + "</jsdl:FileName>\n"; + if (inputUrls.containsKey(inputName)) { + jsdl += + " <jsdl:Source>\n" + + " <jsdl:URI>" + inputUrls.get(inputName) + "</jsdl:URI>\n" + + " </jsdl:Source>\n"; + } + jsdl += " </jsdl:DataStaging>\n"; + } - public static JobDescription parseJSDL(String jsdl) { - return map.get(jsdl); + for (String outputName : output) { + jsdl += " <jsdl:DataStaging diswork:type=\"out\">\n" + + " <jsdl:FileName>" + outputName + "</jsdl:FileName>\n" + + " </jsdl:DataStaging>\n"; + } + + jsdl += + "</jsdl:JobDescription>\n" + + "</jsdl:JobDefinition>\n"; + + return jsdl; + } - public void addStagingInput(String fileName, URL source) { - stagingInput.add(fileName); - stagingInputUrls.put(fileName, source); + /** + * Factory method to get a JobDescription from JSDL + * @param jsdl the content of the JSDL file + * @return a job description representing the content of the JSDL + * @throws IOException if JSDL is malformed or if an URL is malformed + */ + public static JobDescription parseJSDL(String jsdl) throws IOException { + // TODO 20100616 bleny correctly set dependency to JDOM in pom.xml + JobDescription result = new JobDescription(); + + try { + SAXBuilder builder = new SAXBuilder(); + Document document = builder.build(IOUtils.toInputStream(jsdl)); + Element jobDefinition = document.getRootElement(); + + // namespaces + Namespace jsdlNamespace = jobDefinition.getNamespace("jsdl"); + Namespace jsdlPosixNamespace = jobDefinition.getNamespace("jsdl-posix"); + Namespace disworkNamespace = jobDefinition.getNamespace("diswork"); + + // main element + Element jobDescription = jobDefinition.getChild("JobDescription", + jsdlNamespace); + + // job identification + Element jobIdentification = jobDescription.getChild + ("JobIdentification", jsdlNamespace); + Element jobName = jobIdentification.getChild("JobName", + 
jsdlNamespace); + result.setJobName(jobName.getText()); + + // application + Element application = jobDescription.getChild("Application", + jsdlNamespace); + Element applicationName = application.getChild("ApplicationName", + jsdlNamespace); + if (application != null) { + Element applicationVersion = application.getChild + ("ApplicationVersion", jsdlNamespace); + result.setApplication(applicationName.getText(), + applicationVersion.getText()); + } + + Element POSIXApplication = application.getChild("POSIXApplication", + jsdlPosixNamespace); + + Element argument = POSIXApplication.getChild("Argument", + jsdlPosixNamespace); + result.setCommandLine(argument.getText()); + + Element input = POSIXApplication.getChild("Input", + jsdlPosixNamespace); + if (input != null) { + result.setStandardInput(input.getText()); + } + Element output = POSIXApplication.getChild("Output", + jsdlPosixNamespace); + if (input != null) { + result.setStandardOutput(output.getText()); + } + + // staging + List<Element> dataStagings = jobDescription.getChildren + ("DataStaging", jsdlNamespace); + for (Element dataStaging : dataStagings) { + Attribute type = dataStaging.getAttribute("type", + disworkNamespace); + + Element fileName = dataStaging.getChild("FileName", + jsdlNamespace); + + if (type != null && "out".equals(type.getValue())) { + // type="out" + result.addOutput(fileName.getText()); + } else { + // type="in" + Element source = dataStaging.getChild("Source", + jsdlNamespace); + if (source != null) { + Element URI = source.getChild("URI", jsdlNamespace); + result.addInput(fileName.getText(), new URL(URI.getText())); + } else { + result.addInput(fileName.getText(), new NullInputStream(0)); + } + + // type not set + if (type == null) { + result.addOutput(fileName.getText()); + } + } + } + + } catch (JDOMException e) { + log.error("can't read malformed JSDL file", e); + throw new IOException("can't read malformed JSDL file", e); + } catch (MalformedURLException e) { + 
log.error("malformed URL", e); + throw new IOException("malformed URL", e); + } + return result; } - public void addStagingInput(String fileName) { - stagingInput.add(fileName); + public void addInput(String fileName, URL source) { + input.add(fileName); + inputUrls.put(fileName, source); } + + public void addInput(String fileName, InputStream source) { + input.add(fileName); + inputData.put(fileName, source); + } - public void addStagingOutput(String fileName) { - stagingOutput.add(fileName); + public void addOutput(String fileName) { + output.add(fileName); } public List<String> getStagingInput() { - return stagingInput; + return input; } public List<String> getStagingOutput() { - return stagingOutput; + return output; } public Map<String, URL> getStagingInputUrls() { - return stagingInputUrls; + return inputUrls; } public String getStandardInput() { @@ -182,4 +402,8 @@ standardOutput = fileName; } + public Map<String, InputStream> getInputData() { + return inputData; + } + } \ No newline at end of file Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-06-16 15:55:53 UTC (rev 76) @@ -32,16 +32,19 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.nuiton.diswork.fs.DisworkFileSystem; +import org.nuiton.util.FileUtil; import org.nuiton.util.ZipUtil; /** @@ -57,14 +60,18 @@ /** time to wait 
between two lookups for a job */ protected static final int JOB_WAIT = 10 * 1000; + // TODO 20100614 bleny make it configurable + /** after this time (ms), a job is considered as no longer running */ + protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000; + static { RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING); RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_1_RUNNING); RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_2_RUNNING); - RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1); - RUNNING_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2); - RUNNING_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3); + FAILED_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.FAILED_1); + FAILED_MOVE.put(DisworkDaemon.FAILED_1, DisworkDaemon.FAILED_2); + FAILED_MOVE.put(DisworkDaemon.FAILED_2, DisworkDaemon.FAILED_3); } private static final Log log = LogFactory.getLog(WorkersManager.class); @@ -72,15 +79,17 @@ protected DisworkFileSystem fileSystem; protected DisworkConfig config; - // Pool of workers ? - protected Worker worker; + // Pool of workers + protected List<Worker> workers = new ArrayList<Worker>(); - // Activity strategy ?
- - protected class Worker implements Runnable { + protected ActivityStrategy activityStrategy; + + protected class Worker extends Thread { public boolean shouldStop = false; + public WorkersManager manager; + protected void log(String jobPath, String message) throws IOException { String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH; InputStream oldLogAsStream = fileSystem.read(logPath); @@ -113,24 +122,32 @@ log.info("will run job " + jobDescription); // create temp dir - File jobDir = new File(config.getTempDirectory(), jobDescription.getJobId()); + Random random = new Random(); + File jobDir = new File(config.getTempDirectory(), + String.valueOf(random.nextInt())); jobDir.mkdirs(); // download application - String applicationPath = DisworkDaemon.getPathForDependency( + if (jobDescription.getApplicationName() != null) { + log.info("dependency needed for " + jobDescription + " (" + + jobDescription.getApplicationName() + "-" + + jobDescription.getApplicationVersion() + ")"); + String applicationPath = DisworkDaemon.getPathForDependency( jobDescription.getApplicationName(), jobDescription.getApplicationVersion()); - InputStream applicationData = fileSystem.read(applicationPath); - - - File application = new File(jobDir, FilenameUtils.getName(applicationPath)); - application.createNewFile(); - log.info("will create " + application.getAbsolutePath()); - OutputStream out = new FileOutputStream(application); - IOUtils.copy(applicationData, out); - // unzip application - ZipUtil.uncompress(application, jobDir); - + InputStream applicationData = fileSystem.read(applicationPath); + + File application = new File(jobDir, + FilenameUtils.getName(applicationPath)); + application.createNewFile(); + log.info("will create " + application.getAbsolutePath()); + OutputStream out = new FileOutputStream(application); + IOUtils.copy(applicationData, out); + // unzip application + ZipUtil.uncompress(application, jobDir); + } else { + log.info("no dependency specified for " + 
jobDescription); + } // staging input files for (String fileName : jobDescription.getStagingInput()) { File localCopy = new File(jobDir, fileName); @@ -151,16 +168,17 @@ // executing the job String commandLine = jobDescription.getCommandLine(); log.info("calling " + commandLine); - String[] bidule = commandLine.split(" "); - ProcessBuilder builder = new ProcessBuilder(bidule); + String[] commandLineElements = commandLine.split(" "); + ProcessBuilder builder = new ProcessBuilder(commandLineElements); builder.directory(jobDir); builder.redirectErrorStream(true); Process job = builder.start(); - // plugin a file on the standard input + // plugging a file on the standard input String standardInputFileName = jobDescription.getStandardInput(); if (standardInputFileName != null) { - InputStream input = new FileInputStream(new File(jobDir, standardInputFileName)); + InputStream input = new FileInputStream( + new File(jobDir, standardInputFileName)); IOUtils.copy(input, job.getOutputStream()); } @@ -177,7 +195,8 @@ // dump the standard output in a file String standardOutputFileName = jobDescription.getStandardOutput(); if (standardOutputFileName != null) { - OutputStream output = new FileOutputStream(new File(jobDir, standardOutputFileName)); + OutputStream output = new FileOutputStream( + new File(jobDir, standardOutputFileName)); IOUtils.copy(job.getInputStream(), output); } @@ -201,78 +220,207 @@ } // clean up the job directory - // FileUtil.deleteRecursively(jobDir); + FileUtil.deleteRecursively(jobDir); boolean success = exitValue == 0; if (success) { - log(jobPath, "DONE\nFINISHED"); + log(jobPath, "DONE"); } else { - log(jobPath, "FAILED\nFINISHED"); + log(jobPath, "FAILED"); } return success; } + + public String getFistJobName(String path) throws IOException { + List<String> jobsNames = fileSystem.readDirectory(path); + if (jobsNames.size() == 0) { + return null; + } else { + Collections.sort(jobsNames); + return jobsNames.get(0); + } + } - @Override - public void 
run() { - while (! shouldStop) { - // try to find a new job - try { - // TODO 20100609 bleny watch for other jobs - List<String> jobsNames = - fileSystem.readDirectory(DisworkDaemon.TODO); - if (jobsNames.size() != 0) { - // sort and choose the first, due to names, it should be - // the more ancient one - Collections.sort(jobsNames); - String oldName = jobsNames.get(0); - - String newName = DisworkDaemon.newJobLinkName(); - fileSystem.move(DisworkDaemon.TODO + "/" + oldName, - DisworkDaemon.TODO_RUNNING + "/" + newName); - - boolean jobSuccess = runJob(DisworkDaemon.TODO_RUNNING + "/" + newName); - - oldName = newName; - newName = DisworkDaemon.newJobLinkName(); - - if (jobSuccess) { - fileSystem.move( - DisworkDaemon.TODO_RUNNING + "/" + oldName, - DisworkDaemon.DONE + "/" + newName); - } else { - fileSystem.move( - DisworkDaemon.TODO_RUNNING + "/" + oldName, - DisworkDaemon.FAILED_3 + "/" + newName); + protected void findAJobAndRunIt() { + // try to find a new job + try { + String jobLinkDir = null; + String jobLinkName = null; + + String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, + DisworkDaemon.FAILED_1_RUNNING, + DisworkDaemon.TODO_RUNNING + }; + for (String path : runningJobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { + Long linkAge = System.currentTimeMillis() + - Long.parseLong(oldName); + if (linkAge <= MAX_JOB_RUNNING_TIME) { + log.info("taking old job (age = " + linkAge + ")"); + jobLinkDir = path; + jobLinkName = oldName; } + } + } + + if (jobLinkDir == null) { + String[] jobsDirs = { DisworkDaemon.FAILED_2, + DisworkDaemon.FAILED_1, + DisworkDaemon.TODO + }; + for (String path : jobsDirs) { + String oldName = getFistJobName(path); + if (oldName != null) { // take it + jobLinkDir = path; + jobLinkName = oldName; + } + } + } + + if (jobLinkDir == null) { + log.info("nothing to do"); + Thread.sleep(JOB_WAIT); + } else { + // move the link before running the job + String oldPath = jobLinkDir + "/" + jobLinkName; + 
log.info("job found at " + oldPath); + + String newPath = RUNNING_MOVE.get(jobLinkDir) + "/" + + DisworkDaemon.newJobLinkName(); + + log.info("moving " + oldPath + " to " + newPath); + fileSystem.move(oldPath, newPath); + + // run the job + boolean jobSuccess = runJob(newPath); + + // move the link after the job + oldPath = newPath; + String newDir = null; + if (jobSuccess) { + newDir = DisworkDaemon.DONE; } else { - log.info("nothing to do"); - Thread.sleep(JOB_WAIT); + newDir = FAILED_MOVE.get(jobLinkDir); } + newPath = newDir + "/" + DisworkDaemon.newJobLinkName(); - } catch (IOException e) { - log.error("error while reading jobs", e); - // TODO 20100611 bleny manage exception + log.info("moving " + oldPath + " to " + newPath); + fileSystem.move(oldPath, newPath); + + // mark the job as finished if done or failed too many + // times + if ( newDir == DisworkDaemon.DONE || + newDir == DisworkDaemon.FAILED_3) { + log.info("marking " + newPath + " as finished"); + log(newPath, "FINISHED"); + } + } + + + } catch (IOException e) { + log.error("error while reading jobs", e); + // TODO 20100611 bleny manage exception + } catch (InterruptedException e) { + log.info("exception catch", e); + // TODO 20100611 bleny manage exception + } + + } + + @Override + public void run() { + while (!
shouldStop) { + if (manager.getActivityStrategy().canWork()) { + findAJobAndRunIt(); + } + try { + Thread.sleep(10*1000); } catch (InterruptedException e) { + // TODO 20100615 bleny Auto-generated catch block log.info("exception catch", e); - // TODO 20100611 bleny manage exception + e.printStackTrace(); } } } - } public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) { this.fileSystem = fileSystem; this.config = config; - } - public void start() { - worker = new Worker(); - Thread t = new Thread(worker); - t.start(); + log.info("will start " + config.getNumberOfWorkers() + " workers"); + for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) { + Worker worker = new Worker(); + worker.manager = this; + worker.start(); + workers.add(worker); + } + + String initialStrategy = config.getActivityStrategy(); + if ( "none".equals(initialStrategy)) { + activeNoActivityStrategy(); + } else if ("unlimited".equals(initialStrategy)) { + activeUnlimitedActivityStrategy(); + } else if ("limited".equals(initialStrategy)) { + activeLimitedActivityStrategy(); + } else if ("scheduled".equals(initialStrategy)) { + activeScheduledActivityStrategy(); + } else { + log.error("wrong config directive " + initialStrategy); + activeNoActivityStrategy(); + } + } public void stop() { - worker.shouldStop = true; + stop(false); } + + public void stop(boolean now) { + // asking to all threads to stop + for (Worker worker : workers) { + worker.shouldStop = true; + } + + if( !now ) { + // waiting for them to actually have finished + for (Worker worker : workers) { + while (worker.isAlive()) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + // TODO 20100615 bleny Auto-generated catch block + log.info("exception catch", e); + e.printStackTrace(); + } + } + } + } + } + + public ActivityStrategy getActivityStrategy() { + return activityStrategy; + } + + public void setActivityStrategy(ActivityStrategy activityStrategy) { + this.activityStrategy = 
activityStrategy; + } + + public void activeNoActivityStrategy() { + activityStrategy = new ActivityStrategy.NoActivity(); + } + + public void activeUnlimitedActivityStrategy() { + activityStrategy = new ActivityStrategy.UnlimitedActivity(); + } + + public void activeLimitedActivityStrategy() { + activityStrategy = new ActivityStrategy.LimitedActivity(); + } + + public void activeScheduledActivityStrategy() { + activityStrategy = new ActivityStrategy.ScheduledActivity(); + } + } \ No newline at end of file Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-14 08:53:35 UTC (rev 75) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-06-16 15:55:53 UTC (rev 76) @@ -5,7 +5,6 @@ import java.io.InputStream; import java.net.URL; -import java.util.HashMap; import java.util.Map; import org.apache.commons.io.IOUtils; @@ -15,9 +14,9 @@ public class DisworkDaemonTest { - protected DisworkDaemon daemon; + protected static DisworkDaemon daemon; - protected static int port = 39999; + protected static int port = 49999; @Before public void setUp() throws Exception { @@ -31,19 +30,27 @@ @After public void tearDown() throws Exception { + Thread.sleep(5000); daemon.close(); } + @Test + public void simpleSubmit() throws Exception { + JobDescription job = new JobDescription(); + job.setCommandLine("java -version"); + daemon.submitJob(job); + } + @Test(expected = DisworkException.class) public void testSubmitWithoutDependency() throws Exception { - JobDescription job = daemon.newJob(); + JobDescription job = new JobDescription(); job.setApplication("non-existing-application", "0.0"); daemon.submitJob(job); } @Test public void testSubmitSuccessfulJob() throws Exception { - JobDescription job = daemon.newJob(); + JobDescription job = new 
JobDescription(); job.setApplication("fake-app", "1.0"); job.setCommandLine("%java -jar fake-app.jar"); daemon.submitJob(job); @@ -53,11 +60,12 @@ } assertTrue(daemon.isSuccessful(job)); + } @Test public void testSubmitFailJob() throws Exception { - JobDescription job = daemon.newJob(); + JobDescription job = new JobDescription(); job.setApplication("fake-app", "1.0"); job.setCommandLine("%java -jar fake-app.jar fail"); daemon.submitJob(job); @@ -69,38 +77,73 @@ assertTrue(daemon.isFailed(job)); } + /** + * Create a complex job, submit it and check the results: + * <ul> + * <li>the job needs an application (fake-app version 1.0);</li> + * <li>the job comes with input data (input.txt);</li> + * <li>the job needs another input file to be downloaded from http;</li> + * <li>the job asks for a file to be provided as a result at the end + * of the job.</li> + * <li>standard output is requested as a result</li> + * </ul> + * + */ @Test public void testStaging() throws Exception { - JobDescription job = daemon.newJob(); + JobDescription job = new JobDescription(); job.setJobName("My Job"); job.setApplication("fake-app", "1.0"); job.setCommandLine("%java -jar fake-app.jar"); - job.addStagingInput("example.com_index", new URL("http://www.example.com/")); - job.addStagingInput("input.txt"); - job.addStagingOutput("output.txt"); + // defining data in input + job.addInput("example.com_index", new URL("http://www.example.com/")); + job.addInput("input.txt", ClassLoader.getSystemResourceAsStream("input.txt")); + // defining expected data in output + job.addOutput("output.txt"); + job.addOutput("example.com_index"); + + // setting standard input and output file job.setStandardInput("input.txt"); - job.setStandardOutput("output.txt"); - - Map<String, InputStream> data = new HashMap<String, InputStream>(); - data.put("input.txt", ClassLoader.getSystemResourceAsStream("input.txt")); - - daemon.submitJob(job, data); + job.setStandardOutput("output.txt"); + + // submit the job +
daemon.submitJob(job); + // waiting for the job to finish while(! daemon.isFinished(job)) { - Thread.sleep(1 * 1000); + Thread.sleep(5 * 1000); } + // check that job is successful assertTrue(daemon.isSuccessful(job)); + // checking the presence of results Map<String, InputStream> results = daemon.getResults(job); - + assertEquals(2, results.size()); assertTrue(results.containsKey("output.txt")); + assertTrue(results.containsKey("example.com_index")); + // checking that results are what was expected String output = IOUtils.toString(results.get("output.txt")); assertEquals("a print on standard output\n", output); - + output = IOUtils.toString(results.get("example.com_index")); + assertTrue(output.contains("Example Web Page")); } + /** + * tests the stats given by the daemon + * @throws Exception + */ + @Test + public void testStats() throws Exception { + daemon.getUptimeRatio(); + + Map<String, Integer> stats = daemon.getGlobalStats(); + // daemon should read 3 stats : 1 OS, 1 architecture and the number + // of processors + assertEquals(3, stats); + } + } Added: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java (rev 0) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-06-16 15:55:53 UTC (rev 76) @@ -0,0 +1,98 @@ +package org.nuiton.diswork.daemon; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URL; + +import org.apache.commons.collections.ListUtils; +import org.apache.commons.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests in this class create a complex JobDescription and check that + * transformation to JSDL and back to an
object keeps all the information + * + * @author bleny + */ +public class JobDescriptionTest { + + protected static JobDescription job; + protected static JobDescription job2; + + @Before + public void setUp() throws Exception { + job = new JobDescription(); + job.setJobName("My Job"); + job.setApplication("fake-app", "1.0"); + job.setCommandLine("%java -jar fake-app.jar"); + + job2 = new JobDescription(); + job2.setJobName("My Job"); + job2.setApplication("fake-app", "1.0"); + job2.setCommandLine("%java -jar fake-app.jar"); + + + // defining data in input + job2.addInput("example.com_index", new URL("http://www.example.com/")); + job2.addInput("input.txt", IOUtils.toInputStream("")); + + // defining expected data in output + job2.addOutput("output.txt"); + job2.addOutput("example.com_index"); + + // setting standard input and output file + job2.setStandardInput("input.txt"); + job2.setStandardOutput("output.txt"); + + } + + @Test + public void testParseJSDL1() throws Exception { + try { + JobDescription jobCopy = JobDescription.parseJSDL(job.toJSDL()); + assertNotNull(jobCopy); + assertEquals(job.getJobName(), jobCopy.getJobName()); + assertEquals(job.getApplicationName(), + jobCopy.getApplicationName()); + assertEquals(job.getApplicationVersion(), + jobCopy.getApplicationVersion()); + assertEquals(job.getCommandLine(), jobCopy.getCommandLine()); + assertEquals(job.getStandardInput(), jobCopy.getStandardInput()); + assertEquals(job.getStandardOutput(), jobCopy.getStandardOutput()); + } catch (IOException e) { + fail(); + throw e; + } + } + + @Test + public void testParseJSDL2() throws Exception { + try { + JobDescription job2Copy = JobDescription.parseJSDL(job2.toJSDL()); + assertEquals(job2.getCommandLine(), job2Copy.getCommandLine()); + assertEquals(job2.getStandardInput(), job2Copy.getStandardInput()); + assertEquals(job2.getStandardOutput(), + job2Copy.getStandardOutput()); + + assertTrue(ListUtils.isEqualList(job2.getStagingInput(), +
job2Copy.getStagingInput())); + assertTrue(ListUtils.isEqualList(job2.getStagingOutput(), + job2Copy.getStagingOutput())); + assertTrue(ListUtils.isEqualList( + job2.getStagingInputUrls().keySet(), + job2Copy.getStagingInputUrls().keySet())); + assertTrue(ListUtils.isEqualList( + job2.getStagingInputUrls().values(), + job2Copy.getStagingInputUrls().values())); + } catch (IOException e) { + fail(); + throw e; + } + } + +}