r102 - in trunk/diswork-daemon/src: main/java/org/nuiton/diswork/daemon test/java/org/nuiton/diswork/daemon
Author: bleny Date: 2010-07-13 15:13:18 +0200 (Tue, 13 Jul 2010) New Revision: 102 Url: http://nuiton.org/repositories/revision/diswork/102 Log: stats manquantes, documentation Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-13 13:13:18 UTC (rev 102) @@ -154,6 +154,7 @@ private static final Log log = LogFactory.getLog(LimitedActivity.class); + /** give three load averages, update them every five minutes */ protected class LoadAverageMonitoring extends Thread { OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); @@ -211,7 +212,7 @@ protected DisworkConfig config; - protected long timeBeforeNextUpdate = 1000L; + protected long timeBeforeNextUpdate = 1000L; // 1 second is the minimum protected ScheduledActivity(DisworkConfig config) { this.config = config; @@ -219,11 +220,26 @@ @Override public boolean canWork() throws DisworkException { + // look in the schedule file if at least one of all the expression + // match the current date Date currentDate = new Date(); boolean result = false; + + // during the reading of the file, we will evaluate the next time + // the canWork method will change, we set it to (now + 1 hour) + // to begin and we will overwrite it if a preceding date is found Date nextChange = new Date(System.currentTimeMillis() + 60 * 60 * 1000); + + // browse all cron expressions found in the config file for (CronExpression pattern : config.getSchedule()) { + // canWork is true if current expression matches current date + // or if canWork was already true due to a previous expression result = result || pattern.isSatisfiedBy(currentDate); + + // for the current expression, look for the date when it will + // be invalid or valid and update nextChange + // Thus, nextChange will be the earliest date when we will have + // to check if canWork change Date aDate = pattern.getNextInvalidTimeAfter(currentDate); if (aDate.before(nextChange)) { nextChange = aDate; @@ -247,13 +263,13 @@ return "scheduled activity"; } } - - /** return true if a job can be run */ + /** + * @return true if a job can be run now + */ boolean canWork() throws DisworkException; - + /** - * * @return time to wait before next update, -1 is never (wait definitly) */ long timeBeforeNextUpdate(); Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,5 +1,12 @@ package org.nuiton.diswork.daemon; +/** + * This exception occurs when the configuration data are not readable. It may be + * do to a mistake in config file. A config file is not found or is not + * correct + * + * @author bleny + */ public class BadConfigurationException extends DisworkException { private static final long serialVersionUID = 1L; Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,5 +1,12 @@ package org.nuiton.diswork.daemon; +/** + * This exception occurs when a job or his description is not valid. It may + * be the code calling DisworkDaemon as client that may be responsible for + * such an error. + * + * @author bleny + */ public class BadJobException extends DisworkException { private static final long serialVersionUID = 1L; Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-13 13:13:18 UTC (rev 102) @@ -139,6 +139,7 @@ setDefaultOption("diswork.total_uptime", "0"); setDefaultOption("diswork.number_of_jobs_done", "0"); setDefaultOption("diswork.number_of_jobs_submitted", "0"); + setDefaultOption("diswork.worked_time", "0"); } @@ -162,12 +163,9 @@ /** * Read the tokens file if one is given in the config and merge the content - * of this file into {@link #tokens} - * @throws DisworkSystemException - * @throws BadConfigurationException - * @throws LocalFileException - * - * @throws DisworkException + * of this file into {@link #tokens} + * @throws BadConfigurationException if token file doesn't exists + * @throws LocalFileException if token file can't be read */ protected void initTokens() throws BadConfigurationException, LocalFileException { tokens = new HashMap<String, String>(); @@ -181,7 +179,8 @@ + " -Djava.io.tmpdir=%tmp" ; tokens.put("%java", java); - + tokens.put("%sep", File.separator); + File tokensFile = getTokensFile(); if (tokensFile != null) { try { @@ -246,8 +245,8 @@ /** * * @return null if no path for a file have been specified - * @throws BadConfigurationException - * @throws LocalFileException + * @throws BadConfigurationException if schedule file doesn't exists + * @throws LocalFileException if schedule file can't be read */ protected List<CronExpression> getSchedule() throws BadConfigurationException, LocalFileException { // lazy instanciation of schedule @@ -347,6 +346,19 @@ return getOptionAsInt("diswork.number_of_jobs_submitted"); } + protected void addWorkedTime(Long workedTime) { + Long newValue = getWorkedTime() + workedTime; + setOption("diswork.worked_time", newValue.toString()); + save(); + } + + protected Long getWorkedTime() { + Long workedTime = Long.parseLong(getOption("diswork.worked_time")); + return workedTime; + } + + + /* ** trivial applicationConfig setters and getters ** */ public String getTempDirectory() { @@ -417,7 +429,9 @@ setOption("diswork.http_front_end.port", httpFrontendPort.toString()); } - /** number of seconds to wait between two look for a jobs (seconds) */ + /** number of seconds to wait between two look for a jobs + * @return a number of seconds + */ public int getJobLooksWaitTime() { return getOptionAsInt("diswork.job_looks_wait_time"); } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-13 13:13:18 UTC (rev 102) @@ -24,17 +24,6 @@ */ package org.nuiton.diswork.daemon; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.OperatingSystemMXBean; -import java.text.NumberFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -45,6 +34,12 @@ import org.nuiton.diswork.fs.DisworkFileSystemException.Type; import org.nuiton.util.FileUtil; +import java.io.*; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.text.NumberFormat; +import java.util.*; + /** * The diswork daemon is the gateway to the global diswork system. Instantiate * this class creates a new node on the system. The new node can be used @@ -87,11 +82,9 @@ * * You can create your application ready for diswork, submit it to the daemon * and then submit as much jobs as you want that depends of this application. - * Think an application as the common stuff (programs, scripts, files etc.) - * all the different job will need. An application is just a set of file. + * Think an application contains the common stuff (programs, scripts, files etc.) + * all the different jobs will need. An application is just a set of file. * - * TODO 20100617 bleny explain it better - * * Once the application has been made available, jobs can be created and * submitted. To do so, a programmer must write a class that * @@ -178,6 +171,18 @@ /** in a home directory, the place where the hardware info must be placed */ protected static final String HARDINFO_PATH = "hardinfo"; + + /** a keyword that will be put in a job log-file on a single line when the job is started */ + protected static final String LOG_KEYWORD_STARTED = "STARTED"; + + /** a keyword that will be put in a job log-file on a single line when the job is started */ + protected static final String LOG_KEYWORD_DONE = "DONE"; + + /** a keyword that will be put in a job log-file on a single line when the job is failed */ + protected static final String LOG_KEYWORD_FAILED = "FAILED"; + + /** a keyword that will be put in a job log-file on a single line when the job is failed */ + protected static final String LOG_KEYWORD_FINISHED = "FINISHED"; /** the distributed file system where jobs, data and results are stored */ @@ -212,7 +217,7 @@ initWorkersManager(); - writeHardwareInfos(); + writeLocalStats(); httpFrontEnd = new HttpFrontEnd(config, this); @@ -310,18 +315,16 @@ } } - protected void writeHardwareInfos() throws DisworkException { - // writing hardware info to homeDir + protected void writeLocalStats() throws DisworkException { try { - OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); - String hardinfos = os.getName() + "\n" + os.getArch() + "\n" + - config.getNumberOfWorkers(); - - // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos - + String infos = ""; + Map<String, String> localStats = getLocalStats(); + for (String key : localStats.keySet()) { + infos += key + "\t" + localStats.get(key) + "\n"; + } fileSystem.write(homeDir + "/" + HARDINFO_PATH, - IOUtils.toInputStream(hardinfos)); - log.info("writing hardware infos " + hardinfos.replaceAll("\n", " ")); + IOUtils.toInputStream(infos)); + log.info("writing local infos " + infos.replaceAll("\n", " ")); } catch (DisworkFileSystemException e) { log.error("can't write hardware infos", e); throw new DisworkException("can't write hardware infos", e); @@ -532,7 +535,7 @@ + " that is not available"); } } else { - log.info("no dependency specified for " + jobDescription); + log.debug("no dependency specified for " + jobDescription); } String jobDir = getPathForJob(jobDescription); @@ -592,15 +595,15 @@ } public boolean isStarted(JobDescription job) throws DisworkException { - return checkLogContains(job, "STARTED"); + return checkLogContains(job, LOG_KEYWORD_STARTED); } public boolean isFinished(JobDescription job) throws DisworkException { - return checkLogContains(job, "FINISHED"); + return checkLogContains(job, LOG_KEYWORD_FINISHED); } public boolean isSuccessful(JobDescription job) throws DisworkException { - return checkLogContains(job, "DONE"); + return checkLogContains(job, LOG_KEYWORD_DONE); } public boolean isFailed(JobDescription job) throws DisworkException { @@ -703,6 +706,9 @@ } public Map<String, String> getLocalStats() throws DisworkException { + + // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos + NumberFormat numberFormat = NumberFormat.getInstance(); numberFormat.setMaximumFractionDigits(2); @@ -712,16 +718,21 @@ result.put("jobs_done", config.getNumberOfJobsDone().toString()); result.put("jobs_submitted", config.getNumberOfJobsSubmitted().toString()); + result.put("worked_time", config.getWorkedTime().toString()); - if (config.getNumberOfJobsSubmitted() == 0) { - result.put("jobs_ratio", "∞"); - } else { + if (config.getNumberOfJobsSubmitted() != 0) { Double jobsRatio = (double) (config.getNumberOfJobsDone() / config.getNumberOfJobsSubmitted()); result.put("jobs_ratio", numberFormat.format(jobsRatio)); } Double karma = ((config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio()); result.put("karma", numberFormat.format(karma)); + + OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + result.put(os.getName(), "1"); + result.put(os.getArch(), "1"); + result.put("workers", config.getNumberOfWorkers().toString()); + return result; } @@ -731,8 +742,7 @@ protected Map<String, String> updateGlobalStats() throws DisworkException { try { log.info("global stats file doesn't exists, creating one"); - - Long currentTime = System.currentTimeMillis(); + Map<String, Long> stats = new HashMap<String, Long>(); Long availableProcessors = 0L; Map<String, String> result = new HashMap<String, String>(); @@ -741,27 +751,26 @@ for (String homeDir : homeDirs) { String hardInfoPath = HOME + "/" + homeDir + "/" + HARDINFO_PATH; if (fileSystem.exists(hardInfoPath)) { - String hardInfoContent = IOUtils.toString( - fileSystem.read(hardInfoPath)); - String[] infos = hardInfoContent.split("\n"); - - // first line, reading the OS name - if (!stats.containsKey(infos[0])) { - stats.put(infos[0], 0L); + BufferedReader in = new BufferedReader( + new InputStreamReader( + fileSystem.read(hardInfoPath))); + String line; + while ((line = in.readLine()) != null) { + if (!line.equals("")) { + log.debug("reading line" + line); + String[] keyValue = line.split("\t"); + String key = keyValue[0]; + Long value = Long.parseLong(keyValue[1]); + if (!stats.containsKey(key)) { + stats.put(key, 0L); + } + stats.put(key, stats.get(key) + value); + } } - stats.put(infos[0], stats.get(infos[0]) + 1); - - // second line, reading the architecture - if (!stats.containsKey(infos[1])) { - stats.put(infos[1], 0L); - } - stats.put(infos[1], stats.get(infos[1]) + 1); - - // third line, reading the number of processors - availableProcessors += Integer.parseInt(infos[2]); } } - stats.put("available_processors", availableProcessors); + + Long currentTime = System.currentTimeMillis(); stats.put("date", currentTime); // write the result @@ -800,12 +809,16 @@ Map<String, String> result = new HashMap<String, String>(); if (fileSystem.exists(GLOBAL_STATS_PATH)) { - String globalStats = IOUtils.toString(fileSystem.read(GLOBAL_STATS_PATH)); - log.debug("global stats file found, reading " + globalStats); - String[] lines = globalStats.split("\n"); - for (String line : lines) { - String[] keyValue = line.split("\t"); - result.put(keyValue[0], keyValue[1]); + log.debug("global stats file found, reading"); + BufferedReader globalStats = new BufferedReader( + new InputStreamReader( + fileSystem.read(GLOBAL_STATS_PATH))); + String line; + while ((line = globalStats.readLine()) != null) { + if (!line.equals("")) { + String[] keyValue = line.split("\t"); + result.put(keyValue[0], keyValue[1]); + } } // delete the file if it's too old @@ -831,7 +844,7 @@ public void activeNoActivityStrategy() throws DisworkException { if (workers == null) { - log.warn("trying to change activy while working is disabled"); + log.warn("trying to change activity while working is disabled"); } else { workers.activeNoActivityStrategy(); } @@ -839,7 +852,7 @@ public void activeUnlimitedActivityStrategy() throws DisworkException { if (workers == null) { - log.warn("trying to change activy while working is disabled"); + log.warn("trying to change activity while working is disabled"); } else { workers.activeUnlimitedActivityStrategy(); } @@ -847,7 +860,7 @@ public void activeLimitedActivityStrategy() throws DisworkException { if (workers == null) { - log.warn("trying to change activy while working is disabled"); + log.warn("trying to change activity while working is disabled"); } else { workers.activeLimitedActivityStrategy(); } @@ -855,7 +868,7 @@ public void activeScheduledActivityStrategy() throws DisworkException { if (workers == null) { - log.warn("trying to change activy while working is disabled"); + log.warn("trying to change activity while working is disabled"); } else { workers.activeScheduledActivityStrategy(); } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-13 13:13:18 UTC (rev 102) @@ -36,8 +36,20 @@ * * This class can be used to run a diswork daemon as a service on the OS. * Thus, it will be started when the machine boot and stopped just before the - * computer is shut down. + * computer is shut down. To install diswork as a daemon, see commons-daemon + * documentation. * + * Usage is + * + * <code> + * java DisworkDaemonRunner [bootstrap_ip bootstrap_port] + * </code> + * + * if no parameter provided, diswork will start and boot alone. If ip and port + * are provided, diswork will connect to this other node. + * + * @link http://commons.apache.org/daemon/ + * * @author bleny */ public class DisworkDaemonRunner implements Daemon { Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-13 13:13:18 UTC (rev 102) @@ -25,6 +25,7 @@ package org.nuiton.diswork.daemon; /** + * Parent class for all exceptions diswork can raise. * * @author bleny */ Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,5 +1,12 @@ package org.nuiton.diswork.daemon; +/** + * This exception is raised when an error occurred in the inner structure of + * diswork. It may be due to a network failure, or to corrupted data into + * the File System. + * + * @author bleny + */ public class DisworkSystemException extends DisworkException { private static final long serialVersionUID = 1L; Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-13 13:13:18 UTC (rev 102) @@ -15,14 +15,25 @@ import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; +/** + * The HttpFrontEnd boot an embedded http server on same machine of the daemon. + * + * A user can open a web-browser to read the single page that is available. + * This page shows statistics and status of the different daemon components. + * + * @author bleny + */ public class HttpFrontEnd { private static final Log log = LogFactory.getLog(HttpFrontEnd.class); + /** config provides directive about the http server */ protected DisworkConfig config; + /** embedded http server */ protected Server server; + /** daemon will permit to retrieve stats */ protected DisworkDaemon daemon; public HttpFrontEnd(DisworkConfig config, DisworkDaemon daemon) @@ -35,13 +46,15 @@ } } + /** lazy instanciation of {@link #server} */ protected void initServer() { log.info("web server use port " + config.getHttpFrontendPort()); server = new Server(config.getHttpFrontendPort()); Context root = new Context(server, "/", Context.NO_SESSIONS); root.addServlet(new ServletHolder(new MainServlet()), "/"); } - + + /** start the server, the page will no longer be available for web-browser */ public void start() throws DisworkException { if (server == null) { initServer(); @@ -55,6 +68,7 @@ } } + /** stop the server, the page will no longer be available for web-browser */ public void stop() throws DisworkException { log.info("stopping web front-end"); try { @@ -67,6 +81,7 @@ } } + /** this servlet send a static html page that shows current status and stats */ public class MainServlet extends HttpServlet { private static final long serialVersionUID = 1L; @@ -76,7 +91,8 @@ log.info("page request"); - String pageContent = "<html>\n" + String pageContent = "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">\n" + + "<html>\n" + "<h1>Diswork Node</h1>" + "\n\n" + "<h2>Submitted jobs</h2>" + "\n\n" + "<table>" + "\n" Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-13 13:13:18 UTC (rev 102) @@ -57,6 +57,9 @@ * <dt>%tmp</dt> * <dd>will be replaced by the the path to dir where the job * is executed, it's nice to use this to set a temp directory</dd> + * <dt>%sep</dt> + * <dd>will be replaced by the file separator (ie "/" under Linux, + * "\" under Windows)</dd> * </dl> * * This class provides methods to read and parse those data to an XML file @@ -133,6 +136,19 @@ return commandLine; } + /** set the command line to run for this job + * command line is intended as a normal shell command line like + * + * program arg1 arg2 arg3 + * + * At all place of the command-line, tokens may be used. They will be + * replaced at runtime. Tokens, like "%java" or "%tmp" are described + * in class documentation. The worker will parse the command-line to + * replace token with something suitable to his environment. + * + * @link {@link DisworkConfig#parseCommandLine(String, String)} + * @param commandLine a string that may contain tokens + */ public void setCommandLine(String commandLine) { this.commandLine = commandLine; } Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,5 +1,12 @@ package org.nuiton.diswork.daemon; +/** + * A raise of this exception is due to an error on the local file system. + * Maybe it's not writable (rights ? not enough space ?) or readable by + * the daemon. + * + * @author bleny + */ public class LocalFileException extends DisworkException { private static final long serialVersionUID = 1L; Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java =================================================================== --- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-13 13:13:18 UTC (rev 102) @@ -24,24 +24,6 @@ */ package org.nuiton.diswork.daemon; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -53,6 +35,10 @@ import org.nuiton.util.StringUtil; import org.nuiton.util.ZipUtil; +import java.io.*; +import java.net.URL; +import java.util.*; + /** * The workers-manager aims to run and manage the different workers. A worker * is a thread that try to find a jobs and execute them. @@ -120,9 +106,12 @@ protected final Object sem = new Object(); protected void updateFlag() throws DisworkException { - synchronized (sem) { - flag = activityStrategy.canWork(); - sem.notifyAll(); + flag = activityStrategy.canWork(); + if (flag) { + synchronized (sem) { + // workers can work, wake up sleeping workers + sem.notifyAll(); + } } } @@ -143,11 +132,48 @@ * to try it and move it the same may. If the job is failed three times, * it's moved to {@link DisworkDaemon#FAILED_3} meaning it will not been * tried again. + * + * Here is what a worker do all life long : + * <ol> + * <li>First, look at flag value (using {@link #getFlag()}), if true, try + * to find a job, if false, sleep until the flag change</li> + * <li>Try to find a job ({@link #findAJob()}), if no job found, wait few + * seconds and restart from the beginning.</li> + * <li>If a job is found, mark it as started + * ({@link #jobIsStarted(String)})</li> + * <li>create a temp directory specific for the job</li> + * <li>download the application for the job + * ({@link #downloadApplication()}) and unzip it in the job dir. + * Download way use a cache to prevent downloading multiple time the + * same application (cache is in {@link #getApplicationData})</li> + * <li>stage input files : get all input files needed for the job from + * diswork FS or from the provided URL. Download everything to the job + * dir ({@link #stageInputFiles()})</li> + * <li>do a last check of {@link #shouldStop} before running the job, if + * shouldStop is true, give up the job using + * {@link #jobIsInterrupted(String)}</li> + * <li>if shouldStop is false, run the process + * ({@link #prepareAndRunJob()}). It implies to start a Thread that + * constantly read the output of the sub process + * ({@link Worker.OutputReader}) and to put data in the input file on + * the standard input of the process</li> + * <li>wait for the process to end. It may be interrupted due to a query + * to the worker manager to stop all worker. If the process is + * interrupted is, mark it as interrupted to put it back to the + * proposed jobs ({@link #jobIsInterrupted(String)})</li> + * <li>When the process ends, whatever the exitValue, upload the results + * ({@link #stageOutputFiles}}.</li> + * <li>look at the exit value. If the job is a success, mark it + * ({@link #jobIsSuccessful(String)}, if it failed, mark it so it will + * be proposed to others nodes ({@link #jobIsFailed(String)}</li> + * </ol> * * @author bleny */ protected class Worker extends Thread { + private final Log log = LogFactory.getLog(Worker.class); + // TODO 20100614 bleny make it configurable /** after this time (ms), a job is considered as no longer running */ protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000; @@ -167,6 +193,9 @@ /** current process, null if nothing is running */ protected Process currentProcess; + /** the date when the currentProcess started to be executed */ + protected Long currentProcessStartDate; + /** read the standard output of the subprocess * * By reading the standard output, this thread has multiple goals : @@ -208,10 +237,10 @@ outputStreamWriter = new OutputStreamWriter(outputFile); wr = new BufferedWriter(outputStreamWriter); } - - String line = null; + try { // read a line from standard output + String line; while ((line = br.readLine()) != null) { // copy this line to output file if (wr != null) { @@ -219,8 +248,8 @@ } // add reading line to logging output - if (log.isInfoEnabled()) { - log.debug(this.toString() + ">" + line); + if (log.isTraceEnabled()) { + log.trace(this.toString() + ">" + line); } } } catch (IOException e) { @@ -244,10 +273,9 @@ /** * this method add a line to a job-specific log * @param jobPath the path to the job concerned - * @param message the line to add the the log - * @throws DisworkFileSystemException if an error occurred while writing - * the log - * @throws IOException + * @param messages the line(s) to add to the log + * @throws DisworkSystemException if an error occurred while writing + * the log */ protected void log(String jobPath, String... messages) throws DisworkSystemException { @@ -348,8 +376,6 @@ log.error("unable to run process for job" + currentJob, e); throw new LocalFileException("unable to run process for job" + currentJob, e); } - - // catch (Throwable processError) {} // start a thread to constantly read on the standard output String standardOutputFileName = currentJob.getStandardOutput(); @@ -435,16 +461,14 @@ * the job, wait for it to end, write all the results. Mark the job * as running at the beginning and move it to DONE or FAILED at * the end, depending of the results - * @param currentJobPath - * @return - * @throws BadJobException - * @throws DisworkFileSystemException - * @throws IOException * @throws DisworkException */ protected void runJob() throws DisworkException { try { + + currentProcessStartDate = System.currentTimeMillis(); log.info("running job at " + currentJobPath); + try { String jsdlPath = currentJobPath + "/" + DisworkDaemon.JSDL_PATH; String jsdl = IOUtils.toString(fileSystem.read(jsdlPath)); @@ -478,51 +502,32 @@ // stop, last check of shouldStrop if (!shouldStop) { prepareAndRunJob(); - - // wait for the process to return + jobIsStarted(currentJobPath); - boolean processFinished = false; - while (!processFinished) { - Integer exitValue; // exitValue if the process is - // finished, null if the process - // is not finished - try { - exitValue = currentProcess.exitValue(); - processFinished = true; - } catch (IllegalThreadStateException e) { - // process is not finished - exitValue = null; - } - - if (exitValue == null) { - if (shouldStop) { - jobIsInterrupted(currentJobPath); - processFinished = true; - } else { - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - log.error("worker interupted", e); - throw new DisworkException("worker interupted", e); - } - } + + try { + // wait for the process to return + int returnValue = currentProcess.waitFor(); + stageOutputFiles(); + if (returnValue == 0) { + // job is successful + jobIsSuccessful(currentJobPath); } else { - stageOutputFiles(); - // job is finished - if (exitValue == 0) { - // job is successful - jobIsSuccessful(currentJobPath); - } else { - jobIsFailed(currentJobPath); - } + jobIsFailed(currentJobPath); } + } catch (InterruptedException e) { + log.debug("process was interrupted", e); + jobIsInterrupted(currentJobPath); } - } else { - jobIsInterrupted(currentJobPath); } } catch (BadJobException e) { jobIsFailed(currentJobPath); } finally { + if (currentProcessStartDate != null) { + Long currentTime = System.currentTimeMillis(); + config.addWorkedTime(currentTime - currentProcessStartDate); + currentProcessStartDate = null; + } currentJob = null; currentProcess = null; // clean up the job directory @@ -546,96 +551,128 @@ return jobsNames.get(0); } } - + /** - * try to find a job, if found, take it and return the path - * @return the path to the job, null if no job found - * @throws DisworkSystemException + * browse all running directories. If a jobs is too old, it is considered + * as interrupted and moved back. All obsolete jobs found are moved to + * be available again. + * @return true if an obsolete job has been found + * @throws DisworkSystemException */ - protected String findAJob() throws DisworkSystemException { - try { - // Once a job is found, those two var will be set - String jobLinkDir = null; - String jobLinkName = null; - - // use a synchronized block because multiple workers - // may try to take a same job - synchronized (fileSystem) { - - // fist, try to find a job declared has running since - // too long to re-run it - String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, - DisworkDaemon.FAILED_1_RUNNING, - DisworkDaemon.TODO_RUNNING - }; - // browsing all "running" dirs - for (String path : runningJobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { + protected boolean checkInteruptedJobs() throws DisworkSystemException { + // use a synchronized block because multiple workers + // may try to do concurrent move + boolean result = false; + synchronized (fileSystem) { + String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING, + DisworkDaemon.FAILED_1_RUNNING, + DisworkDaemon.TODO_RUNNING + }; + for (String path : runningJobsDirs) { + try { + List<String> jobsNames = fileSystem.readDirectory(path); + Collections.sort(jobsNames); + Iterator<String> it = jobsNames.iterator(); + boolean obsoleteJobFound = true; + while (obsoleteJobFound && it.hasNext()) { + String jobName = it.next(); Long linkAge = System.currentTimeMillis() - - Long.parseLong(oldName); + - Long.parseLong(jobName); // check is oldest job is too old and should be // considered has to-be-rerun if (linkAge > MAX_JOB_RUNNING_TIME) { - log.info("taking old job (age = " + linkAge + ")"); - jobLinkDir = path; - jobLinkName = oldName; - // FIXME 20100617 bleny break s*cks - break; + String jobPath = path + "/" + jobName; + String newJobPath = INTERRUPTED_MOVE.get(path) + "/" + jobName; + try { + fileSystem.move(jobPath, newJobPath); + result = true; + } catch (DisworkFileSystemException e) { + log.debug("failed at moving" + jobPath); + // ignore, another node is moving it + // FIXME 20100712 bleny catch the exact exception + } + } else { + obsoleteJobFound = false; } } + } catch (DisworkFileSystemException e) { + log.warn("unable to read jobs directory", e); + throw new DisworkSystemException("unable to read jobs directory", e); } - - // if no job was found, search now in not running jobs - if (jobLinkDir == null) { - String[] jobsDirs = { DisworkDaemon.FAILED_2, - DisworkDaemon.FAILED_1, - DisworkDaemon.TODO - }; - for (String path : jobsDirs) { - String oldName = getFistJobName(path); - if (oldName != null) { // take it - jobLinkDir = path; - jobLinkName = oldName; - // FIXME 20100617 bleny break s*cks - break; - } - } + } + } + return result; + } + + protected String findAJobInDirectory(String dirPath) throws DisworkSystemException { + // use a synchronized block because multiple workers + // may try to take a same job + synchronized (fileSystem) { + List<String> jobsNames = null; + try { + jobsNames = fileSystem.readDirectory(dirPath); + Collections.sort(jobsNames); + } catch (DisworkFileSystemException e) { + log.warn("unable to read jobs directory", e); + throw new DisworkSystemException("unable to read jobs directory", e); + } + log.debug(jobsNames.size() + " jobs found at " + dirPath); + String result = null; + Iterator<String> it = jobsNames.iterator(); + while (result == null && it.hasNext()) { + String jobPath = dirPath + "/" + it.next(); + String newJobPath = RUNNING_MOVE.get(dirPath) + "/" + DisworkDaemon.newJobLinkName(); + try { + log.debug("job found at " + jobPath + ". moving it to " + newJobPath); + fileSystem.move(jobPath, newJobPath); + result = newJobPath; + } catch (DisworkFileSystemException e) { + log.debug("failed at moving" + jobPath); + // ignore, another node taking it + // FIXME 20100712 bleny catch the exact exception } - - if (jobLinkDir != null) { - // move the link before running the job - String oldPath = jobLinkDir + "/" + jobLinkName; - log.info("job found at " + oldPath); - - jobLinkDir = RUNNING_MOVE.get(jobLinkDir); - jobLinkName = DisworkDaemon.newJobLinkName(); - String newPath = jobLinkDir + "/" + jobLinkName; - - log.info("moving " + oldPath + " to " + newPath); - fileSystem.move(oldPath, newPath); - - } } + return result; + } + } + + /** + * try to find a job, if found, take it and return the path + * @return the path to the job, null if no job found + * @throws DisworkSystemException + */ + protected String findAJob() throws DisworkSystemException { + + List<String> jobsDirs = new ArrayList<String>(); + jobsDirs.add(DisworkDaemon.FAILED_2); + jobsDirs.add(DisworkDaemon.FAILED_1); + jobsDirs.add(DisworkDaemon.TODO); + + Iterator<String> it = jobsDirs.iterator(); + String result = null; + while (result == null && it.hasNext()) { + String jobDir = it.next(); + result = findAJobInDirectory(jobDir); + } - String jobPath = null; + if (result == null) { // now, if no job was found - if (jobLinkDir != null) { - jobPath = jobLinkDir + "/" + jobLinkName; + boolean checkResult = checkInteruptedJobs(); + if (checkResult) { + // obsolete jobs are now available, retry + return findAJob(); } else { + // try again later try { - log.info("look for a job was unsucessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try"); + log.info("look for a job was unsuccessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try"); Thread.sleep(config.getJobLooksWaitTime() * 1000); } catch (InterruptedException e) { log.error("worker interrupted while waiting before trying to find a new job", e); throw new DisworkSystemException("worker interrupted while waiting before trying to find a new job", e); } } - return jobPath; - } catch (DisworkFileSystemException e) { - log.error("error while trying to find a job", e); - throw new DisworkSystemException("error while trying to find a job", e); } + return result; } /** update the log of the job @@ -652,7 +689,8 @@ throw new DisworkSystemException("error while moving job link", e); } log.info("marking " + newJobPath + " as done and finished"); - log(newJobPath, "DONE", "FINISHED"); + log(newJobPath, DisworkDaemon.LOG_KEYWORD_DONE, + DisworkDaemon.LOG_KEYWORD_FINISHED); config.addOneJobDone(); } @@ -675,10 +713,11 @@ if (newDir.equals(DisworkDaemon.FAILED_3)) { log.info("marking " + newJobPath + " as failed and finished"); - log(newJobPath, "FAILED", "FINISHED"); + log(newJobPath, DisworkDaemon.LOG_KEYWORD_FAILED, + DisworkDaemon.LOG_KEYWORD_FINISHED); } else { log.info("marking " + newJobPath + " as failed"); - log(newJobPath, "FAILED"); + log(newJobPath, DisworkDaemon.LOG_KEYWORD_FAILED); } } @@ -703,7 +742,7 @@ * @link {@link DisworkDaemon#isStarted(JobDescription)} */ protected void jobIsStarted(String jobPath) throws DisworkSystemException { - log(jobPath, "STARTED"); + log(jobPath, DisworkDaemon.LOG_KEYWORD_STARTED); } /** @@ -711,8 +750,9 @@ */ @Override public void run() { - try { - while (! shouldStop) { + // we want the worker to continue working whatever occurs + while (! shouldStop) { + try { synchronized (sem) { if (getFlag()) { currentJobPath = findAJob(); @@ -731,14 +771,15 @@ sem.wait(waitTime); } } catch (InterruptedException e) { - log.error("interrupted while waiting for a change of activity", e); - throw new DisworkSystemException("interrupted while waiting for a change of activity", e); + log.warn("interrupted while waiting for a change of activity", e); + // throw new DisworkSystemException("interrupted while waiting for a change of activity", e); } } } + } catch (DisworkException e) { + log.warn("exception caught by worker", e); + // throw new RuntimeException("an error occured", e); } - } catch (DisworkException e) { - throw new RuntimeException("an error occured", e); } } @@ -763,7 +804,7 @@ log.info("will start " + config.getNumberOfWorkers() + " workers"); for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) { Worker worker = new Worker(); - worker.setName("disworker-" + i); + // worker.setName("disworker-" + i); worker.start(); workers.add(worker); } @@ -775,6 +816,9 @@ protected File getApplicationData(String applicationName, String applicationVersion) throws DisworkSystemException, LocalFileException { + if (!applicationCache.exists()) { + applicationCache.mkdirs(); + } File cachedApplicationData = new File(applicationCache, applicationName + "-" + applicationVersion + ".zip"); if (!cachedApplicationData.exists()) { @@ -821,15 +865,26 @@ } activeNoActivityStrategy(); - - FileUtil.deleteRecursively(applicationCache); - + + for (Worker worker : workers) { + if (worker.currentProcess != null) { + log.debug("killing " + worker + " process"); + worker.currentProcess.destroy(); + } + } + + if (applicationCache.exists()) { + FileUtil.deleteRecursively(applicationCache); + } + // waiting for them to actually have finished for (Worker worker : workers) { while (worker.isAlive()) { try { // worker may be sleeping - activeNoActivityStrategy(); + synchronized (sem) { + sem.notifyAll(); + } log.debug("waiting for " + worker + " to return"); Thread.sleep(1000); } catch (InterruptedException e) { @@ -838,6 +893,8 @@ } } } + + log.debug("all workers stopped"); } public ActivityStrategy getActivityStrategy() { @@ -847,7 +904,7 @@ public void setActivityStrategy(ActivityStrategy activityStrategy) throws DisworkException { this.activityStrategy = activityStrategy; - log.info("swithching to " + activityStrategy); + log.info("switching to " + activityStrategy); updateFlag(); } Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,6 +1,15 @@ package org.nuiton.diswork.daemon; - +/** + * This test is the same as {@link DisworkDaemonTest} except that + * the single node has more than one worker. + * + * This tests shows that having multiple workers should not raise + * any problem like throwing concurrent exceptions, illegal thread + * state exception etc. + * + * @author bleny + */ public class DisworkDaemonConcurrencyTest extends DisworkDaemonTest { @Override Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-13 13:13:18 UTC (rev 102) @@ -2,13 +2,21 @@ import org.nuiton.diswork.fs.DisworkFileSystemConfig; - +/** + * This test is the same as {@link DisworkDaemonTest} except that + * there is no one but two nodes. One node has no workers and submit + * the jobs test after tests, the other node is configured to do + * the jobs. + * + * @author bleny + */ public class DisworkDaemonMultipleNodesTest extends DisworkDaemonTest { @Override protected void setConfigs() { config = newConfig(); - config.setActivityStrategy("none"); + config.setNumberOfWorkers(0); + // config.setActivityStrategy("none"); config2 = newConfig(); config2.setBootstrapIp(DisworkFileSystemConfig.getIp()); Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java =================================================================== --- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-09 11:41:12 UTC (rev 101) +++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-13 13:13:18 UTC (rev 102) @@ -1,17 +1,17 @@ package org.nuiton.diswork.daemon; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import org.apache.commons.io.IOUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.InputStream; import java.net.URL; import java.util.List; import java.util.Map; -import org.apache.commons.io.IOUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class DisworkDaemonTest { @@ -21,14 +21,14 @@ protected DisworkDaemon daemon; protected DisworkDaemon daemon2; - protected static int port = 45500; + protected static int port = 3200; /** a factory method to ease the creation of configs */ protected DisworkConfig newConfig() { DisworkConfig config = new DisworkConfig(); port += 1; config.setUsedPort(port); - config.setJobLooksWaitTime(1); + config.setJobLooksWaitTime(5); // useless in tests config.setStartHttpFrontend(false); @@ -55,10 +55,14 @@ } @After - public void tearDown() throws Exception { - daemon.close(); - if (daemon2 != null) { - daemon2.close(); + public void tearDown() { + try { + daemon.close(); + if (daemon2 != null) { + daemon2.close(); + } + } catch (Exception e) { + // close raise errors due to DHT doesn't manage peer leaving } } @@ -171,23 +175,19 @@ /** * tests the stats given by the daemon - * @throws Exception */ @Test public void testStats() throws Exception { - daemon.getUptimeRatio(); + Map<String, String> stats = daemon.getLocalStats(); - Map<String, String> stats = daemon.getGlobalStats(); - // deamon should read 3 stats : 1 OS, 1 architecture, the number - // of processors and the date when the stats was computed - assertEquals(4, stats.size()); + // deamon should read 10 stats + stats = daemon.getGlobalStats(); + assertEquals(10, stats.size()); // a second read should return the same data, without re generate them // check logs stats = daemon.getGlobalStats(); - // deamon should read 4 stats : 1 OS, 1 architecture and the number - // of processors - assertEquals(4, stats.size()); + assertEquals(10, stats.size()); } @Test
participants (1)
-
bleny@users.nuiton.org