Diswork-commits
Threads by month
- ----- 2026 -----
- June
- May
- April
- March
- February
- January
- ----- 2025 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2010 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
July 2010
- 2 participants
- 25 discussions
r107 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 22 Jul '10
by bleny@users.nuiton.org 22 Jul '10
22 Jul '10
Author: bleny
Date: 2010-07-22 14:45:54 +0200 (Thu, 22 Jul 2010)
New Revision: 107
Url: http://nuiton.org/repositories/revision/diswork/107
Log:
bug fix
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-22 10:22:03 UTC (rev 106)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-22 12:45:54 UTC (rev 107)
@@ -670,25 +670,25 @@
throw new NullPointerException("job is null");
}
- if (isFinished(job)) {
- Map<String, InputStream> results = new HashMap<String, InputStream>();
- for (String fileName : job.getOutput()) {
- String jobPath = getPathForJob(job);
- try {
+ if (!isFinished(job)) {
+ throw new DisworkException("can't get results for unfinished job "
+ + job);
+ }
+
+ Map<String, InputStream> results = new HashMap<String, InputStream>();
+ for (String fileName : job.getOutput()) {
+ String jobPath = getPathForJob(job);
+ try {
InputStream result = fileSystem.read(jobPath + "/" + fileName);
results.put(fileName, result);
- } catch (DisworkFileSystemException e) {
- log.error("file system error ", e);
- throw new DisworkException("file system error ", e);
- } catch (FileNotFoundException e) {
- log.warn("expected output file was not found : " + fileName, e);
- }
+ } catch (DisworkFileSystemException e) {
+ log.error("file system error ", e);
+ throw new DisworkException("file system error ", e);
+ } catch (FileNotFoundException e) {
+ log.warn("expected output file was not found : " + fileName, e);
}
- return results;
- } else {
- throw new DisworkException("can't get results for unfinished job "
- + job);
- }
+ }
+ return results;
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-22 10:22:03 UTC (rev 106)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-22 12:45:54 UTC (rev 107)
@@ -48,7 +48,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.daemon.ActivityStrategy.ActivityStrategies;
-import org.nuiton.diswork.daemon.WorkersManager.Worker;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemException;
import org.nuiton.util.FileUtil;
@@ -269,9 +268,10 @@
}
}
} catch (IOException e) {
- // FIXME 20100701 bleny throw exception
+ // may occur if process is destroyed
log.warn("error while reading the output of the subprocess", e);
} finally {
+ // close file to make it contain as much data as possible
try {
if (wr != null) {
wr.close();
@@ -577,27 +577,26 @@
int returnValue = currentProcess.waitFor();
log.info("process ended");
- // process returned and was not interrupted, output files
- // are interesting in both successful and failure case
- // upload them
- stageOutputFiles();
-
- // now check is the process ended as a success or a failure
- // and act accordingly
- if (returnValue == 0) {
- // job is successful
- jobIsSuccessful(currentJobPath);
+ if (shouldStop) {
+ // process has returned because stop() called Process.destroy()
+ // so it's not job's fault and should not be considered has a failure
+ jobIsInterrupted(currentJobPath);
} else {
- // check if it's due to a destroy call
- if (shouldStop) {
- // process has returned because stop() called Process.destroy()
- // so it's not job's fault and should not be considered has a failure
- jobIsInterrupted(currentJobPath);
+
+ // process returned and was not interrupted, output files
+ // are interesting in both successful and failure case
+ // upload them
+ stageOutputFiles();
+
+ if (returnValue == 0) {
+ // job is successful
+ jobIsSuccessful(currentJobPath);
} else {
// job is a failure, the process returned an error
jobIsFailed(currentJobPath);
}
}
+
} catch (InterruptedException e) {
// job was interrupted maybe by a call to WokersManager#stop()
log.debug("process was interrupted", e);
@@ -820,7 +819,7 @@
String newJobPath = newDir + "/" + DisworkDaemon.newJobLinkName();
try {
- // FIXME 20100720 bleny really useful to move to FAILED_3, a dir never read by anyone
+ // FIXME 20100720 bleny really useful to move to FAILED_3 ? a dir never read by anyone
fileSystem.move(jobPath, newJobPath);
log.info("moved " + jobPath + " to " + newJobPath);
} catch (DisworkFileSystemException e) {
@@ -952,14 +951,25 @@
throws DisworkException {
// method is synchronized to prevent multiple workers to download
// the same application at the same time
+
+ // a file on local File system is a copy of this application
if (!applicationCache.exists()) {
applicationCache.mkdirs();
}
File cachedApplicationData = new File(applicationCache,
applicationName + "-" + applicationVersion + ".zip");
- if (!cachedApplicationData.exists()) {
+
+ if (cachedApplicationData.exists()) {
+ // file is already available locally, it has been downloaded before
+ // cache worked
+ log.debug("cache matches for " + applicationName + "-" + applicationVersion);
+ } else {
+ // cache doesn't contains a copy of this application, creating one
+ // by downloading the application from diswork FS
+
log.debug("cache fail for " + applicationName + "-" + applicationVersion);
synchronized (cachedApplicationData) {
+ // getting application data from diswork File System
String applicationPath = DisworkDaemon.getPathForDependency(
applicationName, applicationVersion);
InputStream applicationData = null;
@@ -974,7 +984,8 @@
} finally {
IOUtils.closeQuietly(applicationData);
}
-
+
+ // writing data to local file system
OutputStream out = null;
try {
cachedApplicationData.createNewFile();
@@ -990,8 +1001,6 @@
IOUtils.closeQuietly(out);
}
}
- } else {
- log.debug("cache matches for " + applicationName + "-" + applicationVersion);
}
return cachedApplicationData;
}
@@ -1055,8 +1064,11 @@
worker.shouldStop = true;
}
+ // workers should not take new jobs
activeNoActivityStrategy();
+ // killing all the processes, workers waiting for the end of the process
+ // will wake up
for (Worker worker : workers) {
if (worker.currentProcess != null) {
log.debug("killing " + worker + " process");
@@ -1064,19 +1076,21 @@
}
}
+ // empty the application-cache
if (applicationCache.exists()) {
FileUtil.deleteRecursively(applicationCache);
}
- // waiting for them to actually have finished
+ // waiting for all the workers to actually have finished
for (Worker worker : workers) {
while (worker.isAlive()) {
+ log.debug("waiting for " + worker + " to return");
+ // worker may be sleeping, wake it up
+ synchronized (sem) {
+ sem.notifyAll();
+ }
+
try {
- // worker may be sleeping
- synchronized (sem) {
- sem.notifyAll();
- }
- log.debug("waiting for " + worker + " to return");
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("interrupted while waiting for a worker to " +
1
0
22 Jul '10
Author: bleny
Date: 2010-07-22 12:22:03 +0200 (Thu, 22 Jul 2010)
New Revision: 106
Url: http://nuiton.org/repositories/revision/diswork/106
Log:
bugfix quand ip bootstrap non renseignée
Modified:
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-22 08:22:04 UTC (rev 105)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-22 10:22:03 UTC (rev 106)
@@ -137,14 +137,19 @@
setOption("diswork.fs.use_port", port.toString());
}
+ /**
+ *
+ * @return null if none specified
+ */
public String getBootstrapIp() {
- return getOption("diswork.fs.bootstrap.ip");
+ String value = getOption("diswork.fs.bootstrap.ip");
+ return "".equals(value) ? null : value;
}
public void setBootstrapIp(String ip) {
setOption("diswork.fs.bootstrap.ip", ip);
}
-
+
public Integer getBootstrapPort() {
return getOptionAsInt("diswork.fs.bootstrap.port");
}
1
0
Author: bleny
Date: 2010-07-22 10:22:04 +0200 (Thu, 22 Jul 2010)
New Revision: 105
Url: http://nuiton.org/repositories/revision/diswork/105
Log:
bugfix, refactor config FS
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/main/resources/log4j.properties
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
trunk/diswork-daemon/src/test/resources/log4j.properties
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/peerunit/DisworkFileSystemTest.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/package-info.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
trunk/diswork-fs/src/test/resources/log4j.properties
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -129,7 +129,7 @@
setOption("diswork.http_front_end.start", "true");
setOption("diswork.http_front_end.port", "8080");
- setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
+ setFileSystemConfig(new DisworkFileSystemConfig());
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -417,7 +417,17 @@
public void submitApplication(String applicationName,
String applicationVersion, InputStream applicationData)
throws DisworkException {
-
+
+ if (applicationName == null) {
+ throw new NullPointerException("application name is null");
+ }
+ if (applicationVersion == null) {
+ throw new NullPointerException("application version can't be null");
+ }
+ if (applicationData == null) {
+ throw new NullPointerException("application data can't be null");
+ }
+
// the place where dependency will be stored
String path = getPathForDependency(applicationName, applicationVersion);
@@ -491,6 +501,10 @@
* @throws DisworkException
*/
public void deleteJob(JobDescription jobDescription) throws DisworkException {
+ if (jobDescription == null) {
+ throw new NullPointerException("job is null");
+ }
+
try {
String jobPath = getPathForJob(jobDescription);
if (fileSystem.exists(jobPath)) {
@@ -506,7 +520,11 @@
}
public void submitJob(JobDescription jobDescription) throws DisworkException {
+ if (jobDescription == null) {
+ throw new NullPointerException("job is null");
+ }
+
// check all dependencies are provided
for (String name : jobDescription.getInput()) {
if (!jobDescription.getInputData().containsKey(name) &&
@@ -606,18 +624,34 @@
}
public boolean isStarted(JobDescription job) throws DisworkException {
+ if (job == null) {
+ throw new NullPointerException("job is null");
+ }
+
return checkLogContains(job, LOG_KEYWORD_STARTED);
}
public boolean isFinished(JobDescription job) throws DisworkException {
+ if (job == null) {
+ throw new NullPointerException("job is null");
+ }
+
return checkLogContains(job, LOG_KEYWORD_FINISHED);
}
public boolean isSuccessful(JobDescription job) throws DisworkException {
+ if (job == null) {
+ throw new NullPointerException("job is null");
+ }
+
return checkLogContains(job, LOG_KEYWORD_DONE);
}
public boolean isFailed(JobDescription job) throws DisworkException {
+ if (job == null) {
+ throw new NullPointerException("job is null");
+ }
+
return isFinished(job) && !isSuccessful(job);
}
@@ -631,6 +665,11 @@
*/
public Map<String, InputStream> getResults(JobDescription job)
throws DisworkException {
+
+ if (job == null) {
+ throw new NullPointerException("job is null");
+ }
+
if (isFinished(job)) {
Map<String, InputStream> results = new HashMap<String, InputStream>();
for (String fileName : job.getOutput()) {
@@ -684,7 +723,6 @@
Long totalUptime = getTotalUptime();
log.info("saving total uptime: " + totalUptime);
config.setTotalUptime(totalUptime);
- //config.saveForUser();
try {
fileSystem.close();
@@ -757,7 +795,6 @@
log.info("global stats file doesn't exists, creating one");
Map<String, Long> stats = new HashMap<String, Long>();
- Long availableProcessors = 0L;
Map<String, String> result = new HashMap<String, String>();
List<String> homeDirs = fileSystem.readDirectory(HOME);
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -76,11 +76,9 @@
*/
protected void init(String[] args) throws DisworkException {
config = new DisworkConfig();
- config.setFileSystemConfig(
- DisworkFileSystemConfig.newKademliaDisworkConfig());
+ config.setFileSystemConfig(new DisworkFileSystemConfig());
if (args.length == 2) {
- config.setFileSystemConfig(
- DisworkFileSystemConfig.newKademliaDisworkConfig(args[0],
+ config.setFileSystemConfig(new DisworkFileSystemConfig(args[0],
Integer.parseInt(args[1])));
}
config.setActivityStrategy("unlimited");
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -133,8 +133,7 @@
DisworkConfig config = new DisworkConfig();
Integer port = Integer.parseInt(args[1]);
System.out.println("port = " + port);
- config.setFileSystemConfig(
- DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], port));
+ config.setFileSystemConfig(new DisworkFileSystemConfig(args[0], port));
config.setActivityStrategy("none");
config.setUsedPort(30000);
config.setStartHttpFrontend(false);
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -442,6 +442,7 @@
OutputReader outputReader = new OutputReader(currentProcess.getInputStream(),
outputFileStream);
+ log.info("starting job now");
outputReader.start();
// plugging a file on the standard input
@@ -472,7 +473,6 @@
protected void stageOutputFiles() throws DisworkSystemException, BadJobException {
// output file staging
for (String fileName : currentJob.getOutput()) {
- log.info("staging file " + fileName);
File localCopy = new File(currentJobDir, fileName);
if (localCopy.exists()) {
@@ -487,7 +487,8 @@
if (fileSystem.exists(filePath)) {
fileSystem.delete(filePath);
}
-
+
+ log.info("staging file " + fileName);
fileSystem.write(filePath, localCopyStream);
} catch (FileNotFoundException e) {
// file exists, tested just before
@@ -498,7 +499,7 @@
IOUtils.closeQuietly(localCopyStream);
}
} else {
- throw new BadJobException("job " + currentJob + " do not produces a file" + fileName);
+ log.warn("job " + currentJob + " do not produces a file " + fileName);
}
}
}
@@ -549,6 +550,7 @@
currentJobDir = FileUtil.createTempDirectory("job", "",
new File(config.getTempDirectory()));
currentJobDir.mkdirs();
+ log.info("created directory " + currentJobDir.getAbsolutePath());
} catch (IOException e) {
log.error("unable to create temp directory for job", e);
throw new LocalFileException("unable to create temp directory for job", e);
@@ -571,7 +573,9 @@
try {
// wait for the process to return
+ log.info("waiting for the end of the process");
int returnValue = currentProcess.waitFor();
+ log.info("process ended");
// process returned and was not interrupted, output files
// are interesting in both successful and failure case
@@ -584,8 +588,15 @@
// job is successful
jobIsSuccessful(currentJobPath);
} else {
- // job is a failure
- jobIsFailed(currentJobPath);
+ // check if it's due to a destroy call
+ if (shouldStop) {
+ // process has returned because stop() called Process.destroy()
+ // so it's not job's fault and should not be considered has a failure
+ jobIsInterrupted(currentJobPath);
+ } else {
+ // job is a failure, the process returned an error
+ jobIsFailed(currentJobPath);
+ }
}
} catch (InterruptedException e) {
// job was interrupted maybe by a call to WokersManager#stop()
@@ -787,8 +798,8 @@
String newJobPath = DisworkDaemon.DONE + "/" + DisworkDaemon.newJobLinkName();
try {
// FIXME 20100720 bleny really useful ?
- log.info("moving " + jobPath + " to " + newJobPath);
fileSystem.move(jobPath, newJobPath);
+ log.info("moved " + jobPath + " to " + newJobPath);
} catch (DisworkFileSystemException e) {
log.error("error while moving job link", e);
throw new DisworkSystemException("error while moving job link", e);
@@ -809,9 +820,9 @@
String newJobPath = newDir + "/" + DisworkDaemon.newJobLinkName();
try {
- log.info("moving " + jobPath + " to " + newJobPath);
// FIXME 20100720 bleny really useful to move to FAILED_3, a dir never read by anyone
fileSystem.move(jobPath, newJobPath);
+ log.info("moved " + jobPath + " to " + newJobPath);
} catch (DisworkFileSystemException e) {
log.error("error while moving job link", e);
throw new DisworkSystemException("error while moving job link", e);
@@ -850,6 +861,8 @@
protected void jobIsStarted(String jobPath) throws DisworkSystemException {
log(jobPath, DisworkDaemon.LOG_KEYWORD_STARTED);
}
+
+ protected static final boolean NEVER_STOP_MODE = false;
/**
* Until shouldStop become true, check {@link WorkersManager#flag} value.
@@ -894,8 +907,12 @@
}
} catch (DisworkException e) {
log.warn("exception caught by worker", e);
- // no exception is thrown because we want the worker to continue
- // whatever occurs
+ if (NEVER_STOP_MODE) {
+ // no exception is thrown because we want the worker to continue
+ // whatever occurs
+ } else {
+ throw new RuntimeException(e);
+ }
}
}
}
@@ -930,9 +947,11 @@
/** read an application from the file system and use a cache
* @throws DisworkSystemException
* @throws LocalFileException */
- protected File getApplicationData(String applicationName,
- String applicationVersion)
- throws DisworkException {
+ protected synchronized File getApplicationData(String applicationName,
+ String applicationVersion)
+ throws DisworkException {
+ // method is synchronized to prevent multiple workers to download
+ // the same application at the same time
if (!applicationCache.exists()) {
applicationCache.mkdirs();
}
Modified: trunk/diswork-daemon/src/main/resources/log4j.properties
===================================================================
--- trunk/diswork-daemon/src/main/resources/log4j.properties 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/main/resources/log4j.properties 2010-07-22 08:22:04 UTC (rev 105)
@@ -4,6 +4,7 @@
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+
# package level
-log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
-log4j.logger.org.nuiton.diswork.daemon=INFO
\ No newline at end of file
+log4j.logger.org.nuiton.diswork.daemon=DEBUG
+log4j.logger.org.nuiton.diswork=INFO
\ No newline at end of file
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -43,7 +43,7 @@
// config.setActivityStrategy("none");
config2 = newConfig();
- config2.setBootstrapIp(DisworkFileSystemConfig.getIp());
+ config2.setBootstrapIp(DisworkFileSystemConfig.getLocalIp());
config2.setBootstrapPort(config.getUsedPort());
config2.setActivityStrategy("unlimited");
}
Modified: trunk/diswork-daemon/src/test/resources/log4j.properties
===================================================================
--- trunk/diswork-daemon/src/test/resources/log4j.properties 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-daemon/src/test/resources/log4j.properties 2010-07-22 08:22:04 UTC (rev 105)
@@ -4,7 +4,7 @@
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+
# package level
-log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
log4j.logger.org.nuiton.diswork.daemon=DEBUG
-log4j.logger.org.planx.xmlstore.routing.messaging.MessageServer=INFO
\ No newline at end of file
+log4j.logger.org.nuiton.diswork=INFO
\ No newline at end of file
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/Demo.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -195,8 +195,7 @@
*/
public static void main(String[] args) throws Exception {
if (args.length == 2) {
- DisworkFileSystemConfig config =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config = new DisworkFileSystemConfig();
config.setOption("diswork.fs.use_port", args[1]);
fileSystem = new DisworkFileSystem(config);
if ("producer".equals(args[0])) {
@@ -209,8 +208,7 @@
t.start();
}
} else if (args.length == 4) {
- DisworkFileSystemConfig config =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config = new DisworkFileSystemConfig();
config.setOption("diswork.fs.use_port", args[1]);
config.setOption("diswork.fs.bootstrap.ip", args[2]);
config.setOption("diswork.fs.bootstrap.port", args[3]);
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -583,7 +583,7 @@
}
}
- log.info("readDirectory " + path + " returns " + result.size() + " results");
+ log.debug("readDirectory " + path + " returns " + result.size() + " results");
return result;
}
@@ -776,7 +776,7 @@
*/
protected String walk(String path) throws DisworkFileSystemException {
String result = walk(path, null, null);
- log.info("walking to " + path + " returns " + result);
+ log.debug("walking to " + path + " returns " + result);
return result;
}
@@ -817,7 +817,7 @@
String tail = path.substring(current.length());
String p = EntryUtil.getNameFromPath(tail);
- log.info("in final dir " + current + ", looking for " + p);
+ log.trace("in final dir " + current + ", looking for " + p);
String entry = EntryUtil.findEntryInDirectory(content, p);
result = entry;
@@ -835,12 +835,12 @@
tail = path.substring(current.length() + 1);
}
- log.debug("current = " + current);
- log.debug("tail = " + tail);
+ log.trace("current = " + current);
+ log.trace("tail = " + tail);
String[] elementsNames = tail.split(EntryUtil.PATH_SEPARATOR);
String p = elementsNames[0];
- log.info("in intermediate dir " + current + ", looking for " + p);
+ log.trace("in intermediate dir " + current + ", looking for " + p);
// updating current for recursion
if (current.equals(EntryUtil.ROOT_DIRECTORY)) {
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -51,11 +51,11 @@
* a new port and the local IP.
*
* <pre>
- * c = newDisworkConfig(); // create a config for a bootstrap node
- * c2 = newDisworkConfig(c.getUsedPort()) // creates a config for a node that
- * // will bootstrap by joining the
- * // first node
- * c3 = newDisworkConfig(c.getUsedPort())
+ * c = DisworkFileSystemConfig(); // create a config for a bootstrap node
+ * c2 = DisworkFileSystemConfig(c.getUsedPort()) // creates a config for a node that
+ * // will bootstrap by joining the
+ * // first node
+ * c3 = DisworkFileSystemConfig(c.getUsedPort())
* </pre>
*/
public class DisworkFileSystemConfig extends ApplicationConfig {
@@ -80,7 +80,7 @@
* @return
* @throws UnknownHostException
*/
- public static String getIp() {
+ public static String getLocalIp() {
InetAddress result = null;
try {
result = InetAddress.getLocalHost();
@@ -102,10 +102,23 @@
}
public DisworkFileSystemConfig() {
+ this(null);
+ }
+
+ public DisworkFileSystemConfig(Integer bootstrapPort) {
+ this(bootstrapPort == null ? null : getLocalIp(), bootstrapPort);
+ }
+
+ public DisworkFileSystemConfig(String bootstrapIP, Integer bootstrapPort) {
setDefaultOption("diswork.fs.blocks_size", "10485760"); // 10 MiB
- setDefaultOption("diswork.fs.map_type", "inmemory");
- setDefaultOption("diswork.fs.use_port", port.toString());
+ setDefaultOption("diswork.fs.map_type", "kademlia");
+ setDefaultOption("diswork.fs.use_port", getPort().toString());
+
+ if (bootstrapIP != null) {
+ setOption("diswork.fs.bootstrap.ip", bootstrapIP);
+ setOption("diswork.fs.bootstrap.port", bootstrapPort.toString());
+ }
}
public void setBlockSize(Integer size) {
@@ -144,60 +157,7 @@
return getOption("diswork.fs.map_type");
}
- /**
- * returns a @link {@link DisworkFileSystemConfig} ready to be use as a
- * config for a single-node instance of DisworkFS
- * @return
- * @throws UnknownHostException
- */
- public static DisworkFileSystemConfig newPastryDisworkConfig() {
- return newPastryDisworkConfig(null);
+ public void setMapType(String mapType) {
+ setOption("diswork.fs.map_type", mapType);
}
-
- /**
- * returns a @link {@link DisworkFileSystemConfig} ready to be use as a
- * config for a multiple-node instance of DisworkFS on a same machine.
- * @param bootstrapPort the port on the same machine where another node can
- * be found to bootstrap
- * @return a complete config
- * @throws UnknownHostException
- */
- public static DisworkFileSystemConfig
- newPastryDisworkConfig(Integer bootstrapPort) {
- DisworkFileSystemConfig result = new DisworkFileSystemConfig();
- String port = getPort().toString();
- String ip = getIp();
- result.setOption("diswork.fs.map_type", "pastry");
- result.setOption("diswork.fs.use_port", port);
- result.setOption("diswork.fs.bootstrap.ip", ip);
- if (bootstrapPort == null) {
- result.setOption("diswork.fs.bootstrap.port", port);
- } else {
- result.setOption("diswork.fs.bootstrap.port", bootstrapPort.toString());
- }
- return result;
- }
-
- public static DisworkFileSystemConfig newKademliaDisworkConfig() {
- return newKademliaDisworkConfig(null);
- }
-
- public static DisworkFileSystemConfig
- newKademliaDisworkConfig (Integer bootstrapPort) {
- return newKademliaDisworkConfig(getIp(), bootstrapPort);
- }
-
- public static DisworkFileSystemConfig
- newKademliaDisworkConfig (String bootstrapIp,
- Integer bootstrapPort) {
- DisworkFileSystemConfig result = new DisworkFileSystemConfig();
- String port = getPort().toString();
- result.setOption("diswork.fs.map_type", "kademlia");
- result.setOption("diswork.fs.use_port", port);
- if (bootstrapPort != null) {
- result.setOption("diswork.fs.bootstrap.port", bootstrapPort.toString());
- result.setOption("diswork.fs.bootstrap.ip", bootstrapIp);
- }
- return result;
- }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -69,23 +69,22 @@
/** the symlink point to a target that is not valid */
INVALID_TARGET
}
+
+ protected Type type;
- protected Type type;
-
public DisworkFileSystemException(Type type, String message,
Throwable cause) {
- super(message, cause);
+ super("[" + type + "]" + message, cause);
this.type = type;
}
public DisworkFileSystemException(Type type, String message) {
- super(message);
+ super("[" + type + "]" + message);
this.type = type;
}
public DisworkFileSystemException(Type type, Throwable cause) {
- super(cause);
- this.type = type;
+ this(type, "", cause);
}
public Type getType() {
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/peerunit/DisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/peerunit/DisworkFileSystemTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/peerunit/DisworkFileSystemTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -24,8 +24,8 @@
*/
package org.nuiton.diswork.fs.peerunit;
+import static fr.inria.peerunit.test.assertion.Assert.assertEquals;
import static fr.inria.peerunit.test.assertion.Assert.assertTrue;
-import static fr.inria.peerunit.test.assertion.Assert.assertEquals;
import static fr.inria.peerunit.test.assertion.Assert.fail;
import java.io.ByteArrayInputStream;
@@ -55,15 +55,15 @@
@TestStep(range="0",timeout=1000000, order = 0)
public void testConnect() throws Exception {
- DisworkFileSystemConfig config =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config = new DisworkFileSystemConfig();
+ config.setMapType("kademlia");
config.setUsedPort(31000);
fileSystem = new DisworkFileSystem(config);
config.printConfig();
- log.info("i bootstrap : " + DisworkFileSystemConfig.getIp() + ":" + 31000);
- put(0, DisworkFileSystemConfig.getIp());
+ log.info("i bootstrap : " + DisworkFileSystemConfig.getLocalIp() + ":" + 31000);
+ put(0, DisworkFileSystemConfig.getLocalIp());
put(1, 31000);
}
@@ -72,8 +72,8 @@
public void testConnect1() throws Exception {
// Integer myPort = port + getId();
Integer myPort = 31001;
- DisworkFileSystemConfig config =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config = new DisworkFileSystemConfig();
+ config.setMapType("kademlia");
config.setUsedPort(myPort);
// get bootstrap info
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -310,7 +310,7 @@
if (lockAcquired) {
try {
- log.info("lock on " + key + " acquired");
+ log.debug("lock on " + key + " acquired");
// here, we know we can write or an exception
// would have been thrown earlier
@@ -419,7 +419,7 @@
*/
public boolean tryToLock(String key) {
// trying to acquire lock
- log.info("trying to acquire a lock on " + key);
+ log.debug("trying to acquire a lock on " + key);
String lockKey = keyToLockKey(key);
byte[] lock = map.put(lockKey, EntryUtil.newLock(ownerId));
@@ -473,8 +473,8 @@
if (obsoleteMetaBlock != null) {
String[] obsoleteBlocksIds =
EntryUtil.getBlockIdsFromMetaBlock(obsoleteMetaBlock);
- log.info("removing " + obsoleteBlocksIds.length +
- " old blocks");
+ log.debug("removing " + obsoleteBlocksIds.length +
+ " old blocks");
for (String obsoleteBlockId : obsoleteBlocksIds) {
removeKey(obsoleteBlockId);
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/package-info.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/package-info.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/package-info.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -24,7 +24,7 @@
*/
/**
* <p>
- * This package provides to {@link org.nuiton.disworkfs.DisworkFileSystem} a
+ * This package provides to {@link org.nuiton.diswork.fs.DisworkFileSystem} a
* way to persistently store data. This is done by the
* {@link org.nuiton.diswork.fs.storage.Storage} class
* which permit to store different type of data.
@@ -224,7 +224,7 @@
* when needed (see
* {@link org.nuiton.diswork.fs.storage.Storage.SplitBlocksInputStream}).
* When writing, data are split in blocks of a maximum configurable size
- * (see {@link org.nuiton.disworkfs.DisworkFileSystemConfig}).
+ * (see {@link org.nuiton.diswork.fs.DisworkFileSystemConfig}).
* </p>
*/
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -30,8 +30,6 @@
import org.junit.Before;
import org.junit.Test;
-import org.nuiton.diswork.fs.DisworkFileSystem;
-import org.nuiton.diswork.fs.DisworkFileSystemConfig;
public class DisworkFileSystemInMemoryTest extends AbstractDisworkFileSystemTest {
@@ -43,7 +41,7 @@
public void setUpFileSystem() throws Exception {
// finally, initiate the fileSystem
DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig();
- disworkConfig.setOption("diswork.fs.map_type", "inmemory");
+ disworkConfig.setMapType("inmemory");
fileSystem = new DisworkFileSystem(disworkConfig);
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -47,8 +47,8 @@
@Before
public void setUpFileSystem() throws Exception {
// finally, initiate the fileSystem
- DisworkFileSystemConfig disworkConfig1 =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig disworkConfig1 = new DisworkFileSystemConfig();
+ disworkConfig1.setMapType("kademlia");
bootstrapPort = disworkConfig1.getUsedPort();
fileSystem = new DisworkFileSystem(disworkConfig1);
}
@@ -56,8 +56,8 @@
@Test
public void testMultipleNodes1() throws Exception {
- DisworkFileSystemConfig disworkConfig =
- DisworkFileSystemConfig.newKademliaDisworkConfig(bootstrapPort);
+ DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig(bootstrapPort);
+ disworkConfig.setMapType("kademlia");
DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig);
fileSystem.write("/file", new FileInputStream(randomFilePath));
@@ -66,8 +66,8 @@
@Test
public void testMultipleNodes2() throws Exception {
- DisworkFileSystemConfig disworkConfig =
- DisworkFileSystemConfig.newKademliaDisworkConfig(bootstrapPort);
+ DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig(bootstrapPort);
+ disworkConfig.setMapType("kademlia");
DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig);
InputStream source = null;
@@ -109,8 +109,8 @@
@Test
public void testMultipleNodes3() throws Exception {
- DisworkFileSystemConfig disworkConfig =
- DisworkFileSystemConfig.newKademliaDisworkConfig(bootstrapPort);
+ DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig(bootstrapPort);
+ disworkConfig.setMapType("kademlia");
DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig);
fileSystem.createDirectory("/mydir");
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -35,8 +35,6 @@
import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
-import org.nuiton.diswork.fs.DisworkFileSystem;
-import org.nuiton.diswork.fs.DisworkFileSystemConfig;
public class DisworkFileSystemPastryTest extends AbstractDisworkFileSystemTest {
@@ -49,7 +47,8 @@
@Before
public void setUpFileSystem() throws Exception {
// finally, initiate the fileSystem
- DisworkFileSystemConfig disworkConfig1 = DisworkFileSystemConfig.newPastryDisworkConfig();
+ DisworkFileSystemConfig disworkConfig1 = new DisworkFileSystemConfig();
+ disworkConfig1.setMapType("pastry");
bootstrapPort = disworkConfig1.getUsedPort();
fileSystem = new DisworkFileSystem(disworkConfig1);
@@ -58,7 +57,8 @@
@Test
public void testMultipleNodes1() throws Exception {
- DisworkFileSystemConfig disworkConfig = DisworkFileSystemConfig.newPastryDisworkConfig(bootstrapPort);
+ DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig(bootstrapPort);
+ disworkConfig.setMapType("pastry");
DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig);
assertTrue(fileSystem.exists("/"));
@@ -67,7 +67,8 @@
@Test
public void testMultipleNodes2() throws Exception {
- DisworkFileSystemConfig disworkConfig = DisworkFileSystemConfig.newPastryDisworkConfig(bootstrapPort);
+ DisworkFileSystemConfig disworkConfig = new DisworkFileSystemConfig(bootstrapPort);
+ disworkConfig.setMapType("pastry");
DisworkFileSystem fileSystem2 = new DisworkFileSystem(disworkConfig);
byte[] bytes = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-07-22 08:22:04 UTC (rev 105)
@@ -44,12 +44,10 @@
@Before
public void setUp() throws Exception {
- DisworkFileSystemConfig config1 =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config1 = new DisworkFileSystemConfig();
map1 = new KademliaDisworkMap(config1);
- DisworkFileSystemConfig config2 = DisworkFileSystemConfig
- .newKademliaDisworkConfig(config1.getUsedPort());
+ DisworkFileSystemConfig config2 = new DisworkFileSystemConfig(config1.getUsedPort());
map2 = new KademliaDisworkMap(config2);
}
@@ -99,8 +97,7 @@
*/
@Test
public void testBadBootrap() throws Exception {
- DisworkFileSystemConfig config1 =
- DisworkFileSystemConfig.newKademliaDisworkConfig();
+ DisworkFileSystemConfig config1 = new DisworkFileSystemConfig();
config1.setBootstrapIp("microsoft.com");
config1.setBootstrapPort(80);
try {
Modified: trunk/diswork-fs/src/test/resources/log4j.properties
===================================================================
--- trunk/diswork-fs/src/test/resources/log4j.properties 2010-07-21 09:05:57 UTC (rev 104)
+++ trunk/diswork-fs/src/test/resources/log4j.properties 2010-07-22 08:22:04 UTC (rev 105)
@@ -4,9 +4,15 @@
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n
+
# package level
-log4j.logger.org.nuiton.diswork.fs=WARN
+log4j.logger.org.nuiton.diswork.fs=INFO
+
+log4j.logger.org.nuiton.diswork.fs.storage.MojitoDisworkMap=DEBUG
log4j.logger.org.nuiton.diswork.fs.storage.KademliaDisworkMap=INFO
-log4j.logger.org.nuiton.diswork.fs.Demo=INFO
-log4j.logger.org.planx.xmlstore.routing=DEBUG
-log4j.logger.org.nuiton.diswork.fs.storage.AbstractDisworkMapTest=DEBUG
\ No newline at end of file
+
+# dependencies
+log4j.logger.org.planx.xmlstore=WARN
+
+# executables
+log4j.logger.org.nuiton.diswork.fs.Demo=INFO
\ No newline at end of file
1
0
21 Jul '10
Author: bleny
Date: 2010-07-21 11:05:57 +0200 (Wed, 21 Jul 2010)
New Revision: 104
Url: http://nuiton.org/repositories/revision/diswork/104
Log:
màj en-tete des fichiers (tests), doc
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/AbstractDisworkMapTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/EntryUtilTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMapTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/PastryDisworkMapTest.java
trunk/src/license/project.xml
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
/**
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
/**
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -294,7 +294,7 @@
// FIXME 20100607 bleny its not config data, it should be moved to a persistent file
protected void save() {
- // saveForUser(null);
+ saveForUser();
}
protected String getOwnerId() {
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -24,6 +24,20 @@
*/
package org.nuiton.diswork.daemon;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@@ -34,12 +48,6 @@
import org.nuiton.diswork.fs.DisworkFileSystemException.Type;
import org.nuiton.util.FileUtil;
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.text.NumberFormat;
-import java.util.*;
-
/**
* The diswork daemon is the gateway to the global diswork system. Instantiate
* this class creates a new node on the system. The new node can be used
@@ -420,7 +428,10 @@
fileSystem.createDirectory(applicationDirectory);
}
- if (!fileSystem.exists(path)) {
+ if (fileSystem.exists(path)) {
+ log.info("application " + applicationName + " is already available"
+ + " in version " + applicationVersion + " submit ignored");
+ } else {
fileSystem.write(path, applicationData);
}
} catch (DisworkFileSystemException e) {
@@ -630,6 +641,8 @@
} catch (DisworkFileSystemException e) {
log.error("file system error ", e);
throw new DisworkException("file system error ", e);
+ } catch (FileNotFoundException e) {
+ log.warn("expected output file was not found : " + fileName, e);
}
}
return results;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
/**
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import java.io.IOException;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
/**
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -24,21 +24,37 @@
*/
package org.nuiton.diswork.daemon;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.daemon.ActivityStrategy.ActivityStrategies;
+import org.nuiton.diswork.daemon.WorkersManager.Worker;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemException;
import org.nuiton.util.FileUtil;
import org.nuiton.util.StringUtil;
import org.nuiton.util.ZipUtil;
-import java.io.*;
-import java.net.URL;
-import java.util.*;
-
/**
* The workers-manager aims to run and manage the different workers. A worker
* is a thread that try to find a jobs and execute them.
@@ -271,7 +287,7 @@
}
/**
- * this method add a line to a job-specific log
+ * add a line to a job-specific log
* @param jobPath the path to the job concerned
* @param messages the line(s) to add to the log
* @throws DisworkSystemException if an error occurred while writing
@@ -298,8 +314,14 @@
throw new DisworkSystemException("unable to read log file", e);
}
}
-
- protected void downloadApplication() throws DisworkSystemException, LocalFileException {
+
+ /**
+ * Check if the current job need an application. If needed,
+ * download the application and unzip it in current job temp
+ * dir
+ * @throws DisworkException
+ */
+ protected void downloadApplication() throws DisworkException {
// download application
if (currentJob.getApplicationName() != null) {
log.info("dependency needed for " + currentJob + " (" +
@@ -321,12 +343,24 @@
log.info("no dependency specified for " + currentJob);
}
}
-
- protected void stageInputFiles() throws DisworkSystemException, LocalFileException {
+
+ /**
+ * For all the input files of the current job, download them
+ * to the current job temp dir.
+ * @throws DisworkException
+ */
+ protected void stageInputFiles() throws DisworkException {
// staging input files
for (String fileName : currentJob.getInput()) {
log.info("staging " + fileName);
+
+ // source is a stream containing the data of the
+ // input file
InputStream source = null;
+
+ // set source according to job description, source
+ // may be obtained via an URL or via the Diswork
+ // File system
if (currentJob.getInputUrls().containsKey(fileName)) {
// download this file from URL
URL url = currentJob.getInputUrls().get(fileName);
@@ -344,9 +378,14 @@
} catch (DisworkFileSystemException e) {
log.error("unable to read input file from diswork", e);
throw new DisworkSystemException("unable to read input file from diswork", e);
+ } catch (FileNotFoundException e) {
+ log.warn("input file " + fileName + " is not provided", e);
+ throw new BadJobException("input file " + fileName + " is not provided", e);
}
}
+ // now, source is set, read those data and write it
+ // to a local copy in the current job temp dir
try {
File localCopy = new File(currentJobDir, fileName);
localCopy.createNewFile();
@@ -357,7 +396,15 @@
}
}
}
-
+
+ /**
+ * Compute the command-line for the current job, prepare the process
+ * by pluggin files in standard input/output. Start a thread to
+ * constantly read standard output. Finally, start the process and
+ * set {@link #currentProcess}.
+ *
+ * @throws DisworkException
+ */
protected void prepareAndRunJob() throws DisworkException {
log.info("preparing the job");
// prepare the job and run it
@@ -457,23 +504,38 @@
}
/**
- * Download all the files needed for a job in a temp directory, run
- * the job, wait for it to end, write all the results. Mark the job
- * as running at the beginning and move it to DONE or FAILED at
- * the end, depending of the results
+ * Read the JSDL file of the curernt job. create a temp directory for
+ * the current job and set {@link #currentJobDir}. Prepare the job
+ * (download application, stage input files and prepare the process).
+ *
+ * Then, if current worker is still allowed to work (if WorkersManager
+ * didn't ask to stop working recently), run the process until it returns
+ *
+ * Once the process is finished, check why and how the process returned
+ * (interrupted, successful run or failure) and act accordingly
+ * (update log and stage output files).
+ *
+ * Finally, remove temp directory created and the start of the method,
+ * unset all current variables, and update statistics about worked time.
+ *
* @throws DisworkException
*/
protected void runJob() throws DisworkException {
try {
+ // We want to have a stat about how many time is passed at
+ // working, start counting
currentProcessStartDate = System.currentTimeMillis();
log.info("running job at " + currentJobPath);
+ // read the JSDL for the current job, create the corresponding
+ // JobDescription and set currentJob
try {
String jsdlPath = currentJobPath + "/" + DisworkDaemon.JSDL_PATH;
String jsdl = IOUtils.toString(fileSystem.read(jsdlPath));
log.info("read jsdl " + jsdl);
currentJob = JobDescription.parseJSDL(jsdl);
+ log.info("will run job " + currentJob);
} catch (IOException e) {
log.error("unable to read or parse JSDL", e);
throw new DisworkSystemException("unable to read or parse JSDL", e);
@@ -482,9 +544,7 @@
throw new DisworkSystemException("unable to read JSDL", e);
}
- log.info("will run job " + currentJob);
-
- // create temp dir
+ // create a unique temp directory for this job only
try {
currentJobDir = FileUtil.createTempDirectory("job", "",
new File(config.getTempDirectory()));
@@ -493,66 +553,69 @@
log.error("unable to create temp directory for job", e);
throw new LocalFileException("unable to create temp directory for job", e);
}
-
+
+ // put application data in the temp dir
downloadApplication();
-
+
+ // put input data in temp dir
stageInputFiles();
// until there we didn't started the job, it's not too late to
- // stop, last check of shouldStrop
+ // stop, last check of shouldStrop before running the process
if (!shouldStop) {
+ // prepare the process and start currentProcess
prepareAndRunJob();
+ // mark this job as started
jobIsStarted(currentJobPath);
try {
// wait for the process to return
int returnValue = currentProcess.waitFor();
+
+ // process returned and was not interrupted, output files
+ // are interesting in both successful and failure case
+ // upload them
stageOutputFiles();
+
+ // now check is the process ended as a success or a failure
+ // and act accordingly
if (returnValue == 0) {
// job is successful
jobIsSuccessful(currentJobPath);
} else {
+ // job is a failure
jobIsFailed(currentJobPath);
}
} catch (InterruptedException e) {
+ // job was interrupted maybe by a call to WokersManager#stop()
log.debug("process was interrupted", e);
jobIsInterrupted(currentJobPath);
}
}
} catch (BadJobException e) {
+ // if job is a bad one, consider it as a failure
jobIsFailed(currentJobPath);
} finally {
+
+ // update stat about how many time was worked
if (currentProcessStartDate != null) {
Long currentTime = System.currentTimeMillis();
config.addWorkedTime(currentTime - currentProcessStartDate);
currentProcessStartDate = null;
}
+
+ // clean up the job directory
+ FileUtil.deleteRecursively(currentJobDir);
+
+ // unset all variables for this job
currentJob = null;
currentProcess = null;
- // clean up the job directory
- FileUtil.deleteRecursively(currentJobDir);
currentJobDir = null;
}
}
/**
- * In a directory, list the content, sort the content, and returns
- * the first element.
- * @throws DisworkFileSystemException
- */
- protected String getFistJobName(String path)
- throws DisworkFileSystemException {
- List<String> jobsNames = fileSystem.readDirectory(path);
- if (jobsNames.size() == 0) {
- return null;
- } else {
- Collections.sort(jobsNames);
- return jobsNames.get(0);
- }
- }
-
- /**
* browse all running directories. If a jobs is too old, it is considered
* as interrupted and moved back. All obsolete jobs found are moved to
* be available again.
@@ -564,27 +627,46 @@
// may try to do concurrent move
boolean result = false;
synchronized (fileSystem) {
+
+ // all those dirs will be browsed for unfinished obsolete jobs
String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
DisworkDaemon.FAILED_1_RUNNING,
DisworkDaemon.TODO_RUNNING
};
+
+
for (String path : runningJobsDirs) {
try {
List<String> jobsNames = fileSystem.readDirectory(path);
+
+ // link names are dates so if we sort the content of this
+ // directory by names, old jobs are first and recent are
+ // last, so we can try to read from the beginning and
+ // stop without reading all the list
Collections.sort(jobsNames);
+
+
+ boolean obsoleteJobFound = true;
Iterator<String> it = jobsNames.iterator();
- boolean obsoleteJobFound = true;
+
+ // iterate until a non-obsolete job is found or until
+ // there is no more file in this directory
while (obsoleteJobFound && it.hasNext()) {
String jobName = it.next();
Long linkAge = System.currentTimeMillis()
- Long.parseLong(jobName);
+
// check is oldest job is too old and should be
// considered has to-be-rerun
if (linkAge > MAX_JOB_RUNNING_TIME) {
+
+ // this link is too old, move it back to proposed jobs
String jobPath = path + "/" + jobName;
String newJobPath = INTERRUPTED_MOVE.get(path) + "/" + jobName;
try {
fileSystem.move(jobPath, newJobPath);
+
+ // an obsolete job was found, update the return value
result = true;
} catch (DisworkFileSystemException e) {
log.debug("failed at moving" + jobPath);
@@ -592,6 +674,8 @@
// FIXME 20100712 bleny catch the exact exception
}
} else {
+ // break the iteration, the last elements of the
+ // list are still valid
obsoleteJobFound = false;
}
}
@@ -604,6 +688,14 @@
return result;
}
+ /**
+ * In a given directory, try to find a job. If a job is found, immediately
+ * move it to running dir and return it.
+ *
+ * @param dirPath the directory to browse for a job
+ * @return the path to the job found or null if no job found
+ * @throws DisworkSystemException
+ */
protected String findAJobInDirectory(String dirPath) throws DisworkSystemException {
// use a synchronized block because multiple workers
// may try to take a same job
@@ -637,32 +729,43 @@
}
/**
- * try to find a job, if found, take it and return the path
- * @return the path to the job, null if no job found
- * @throws DisworkSystemException
+ * Check one-by-one the job directories to find a job. If a job
+ * is found, take it and return its path.
+ *
+ * @return the path to the job, null if no job found
+ * @throws DisworkSystemException
*/
protected String findAJob() throws DisworkSystemException {
+ // create a list with all directories where a job can be found
List<String> jobsDirs = new ArrayList<String>();
jobsDirs.add(DisworkDaemon.FAILED_2);
jobsDirs.add(DisworkDaemon.FAILED_1);
jobsDirs.add(DisworkDaemon.TODO);
+ // For all those directories, call findAJobInDirectory until a job
+ // is found
Iterator<String> it = jobsDirs.iterator();
String result = null;
while (result == null && it.hasNext()) {
String jobDir = it.next();
result = findAJobInDirectory(jobDir);
}
-
+
+ // if all directories have been read without finding any job,
+ // check for obsolete jobs, and if an obsolete job was found,
+ // retry to find a job
if (result == null) {
+
// now, if no job was found
boolean checkResult = checkInteruptedJobs();
+
if (checkResult) {
- // obsolete jobs are now available, retry
+ // jobs that were interrupted are now made available, retry
return findAJob();
} else {
- // try again later
+ // even after checking obsolete jobs, nothing was found; wait
+ // before trying again later
try {
log.info("look for a job was unsuccessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try");
Thread.sleep(config.getJobLooksWaitTime() * 1000);
@@ -675,13 +778,15 @@
return result;
}
- /** update the log of the job
- * permit the user to use
- * {@link DisworkDaemon#isSuccessful(JobDescription)}
+ /** update the log of a given job
+ * mark it as done and finished (permit the user to use
+ * {@link DisworkDaemon#isSuccessful(JobDescription)})
+ * and move the link
*/
protected void jobIsSuccessful(String jobPath) throws DisworkSystemException {
String newJobPath = DisworkDaemon.DONE + "/" + DisworkDaemon.newJobLinkName();
try {
+ // FIXME 20100720 bleny really useful ?
log.info("moving " + jobPath + " to " + newJobPath);
fileSystem.move(jobPath, newJobPath);
} catch (DisworkFileSystemException e) {
@@ -705,6 +810,7 @@
try {
log.info("moving " + jobPath + " to " + newJobPath);
+ // FIXME 20100720 bleny really useful to move to FAILED_3, a dir never read by anyone
fileSystem.move(jobPath, newJobPath);
} catch (DisworkFileSystemException e) {
log.error("error while moving job link", e);
@@ -746,7 +852,17 @@
}
/**
- * find
+ * Until shouldStop becomes true, check {@link WorkersManager#flag} value.
+ * The flag tells the worker if the {@link WorkersManager#activityStrategy}
+ * allows the worker to run a job.
+ *
+ * If flag is true : try to find a job. If a job is found, {@link #currentJobPath}
+ * is set and {@link #runJob()} is called.
+ *
+ * If flag is false : it means that the Activity Strategy doesn't want the worker
+ * to work so the worker will sleep until the Activity Strategy updates the
+ * flag. Sleep time will depend on the activity strategy,
+ * see {@link ActivityStrategy#timeBeforeNextUpdate()}.
*/
@Override
public void run() {
@@ -765,7 +881,7 @@
long waitTime = activityStrategy.timeBeforeNextUpdate();
try {
if (waitTime == -1) {
- // wait until notify
+ // wait until flag is updated
sem.wait();
} else {
sem.wait(waitTime);
@@ -778,7 +894,8 @@
}
} catch (DisworkException e) {
log.warn("exception caught by worker", e);
- // throw new RuntimeException("an error occured", e);
+ // no exception is thrown because we want the worker to continue
+ // whatever occurs
}
}
}
@@ -815,7 +932,7 @@
* @throws LocalFileException */
protected File getApplicationData(String applicationName,
String applicationVersion)
- throws DisworkSystemException, LocalFileException {
+ throws DisworkException {
if (!applicationCache.exists()) {
applicationCache.mkdirs();
}
@@ -832,6 +949,9 @@
} catch (DisworkFileSystemException e) {
log.error("unable to get application", e);
throw new DisworkSystemException("unable to get application", e);
+ } catch (FileNotFoundException e) {
+ log.error("application data is not available", e);
+ throw new BadJobException("application data is not available", e);
} finally {
IOUtils.closeQuietly(applicationData);
}
@@ -857,44 +977,17 @@
return cachedApplicationData;
}
- public void stop() throws DisworkException {
- // asking to all threads to stop
+ /** Get the list of jobs that workers are doing. Only for monitoring purposes.
+ *
+ * Returns a list whose size is the number of workers. Thus, the list may
+ * contain null elements for workers that are doing nothing.
+ */
+ public List<JobDescription> getAllWorkersCurrentJobs() {
+ List<JobDescription> result = new ArrayList<JobDescription>();
for (Worker worker : workers) {
- log.debug("asking " + worker + " to stop");
- worker.shouldStop = true;
+ result.add(worker.currentJob);
}
-
- activeNoActivityStrategy();
-
- for (Worker worker : workers) {
- if (worker.currentProcess != null) {
- log.debug("killing " + worker + " process");
- worker.currentProcess.destroy();
- }
- }
-
- if (applicationCache.exists()) {
- FileUtil.deleteRecursively(applicationCache);
- }
-
- // waiting for them to actually have finished
- for (Worker worker : workers) {
- while (worker.isAlive()) {
- try {
- // worker may be sleeping
- synchronized (sem) {
- sem.notifyAll();
- }
- log.debug("waiting for " + worker + " to return");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- log.warn("interrupted while waiting for a worker to " +
- "stop", e);
- }
- }
- }
-
- log.debug("all workers stopped");
+ return result;
}
public ActivityStrategy getActivityStrategy() {
@@ -936,12 +1029,43 @@
setActivityStrategy(ActivityStrategies.SCHEDULED);
}
- /** this is only for monitoring purpose */
- public List<JobDescription> getAllWorkersCurrentJobs() {
- List<JobDescription> result = new ArrayList<JobDescription>();
+ public void stop() throws DisworkException {
+ // asking to all threads to stop
for (Worker worker : workers) {
- result.add(worker.currentJob);
+ log.debug("asking " + worker + " to stop");
+ worker.shouldStop = true;
}
- return result;
+
+ activeNoActivityStrategy();
+
+ for (Worker worker : workers) {
+ if (worker.currentProcess != null) {
+ log.debug("killing " + worker + " process");
+ worker.currentProcess.destroy();
+ }
+ }
+
+ if (applicationCache.exists()) {
+ FileUtil.deleteRecursively(applicationCache);
+ }
+
+ // waiting for them to actually have finished
+ for (Worker worker : workers) {
+ while (worker.isAlive()) {
+ try {
+ // worker may be sleeping
+ synchronized (sem) {
+ sem.notifyAll();
+ }
+ log.debug("waiting for " + worker + " to return");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.warn("interrupted while waiting for a worker to " +
+ "stop", e);
+ }
+ }
+ }
+
+ log.debug("all workers stopped");
}
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
/**
*
* {@link org.nuiton.diswork.daemon.DisworkDaemon} is the class the that
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import static org.junit.Assert.assertEquals;
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
/**
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import org.apache.commons.io.IOUtils;
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork daemon
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.daemon;
import static org.junit.Assert.assertEquals;
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -57,11 +57,27 @@
*/
public class DisworkFileSystem {
+ public static class PathUtil {
+ public static final String separator = EntryUtil.PATH_SEPARATOR;
+ public static final String root = EntryUtil.ROOT_DIRECTORY;
+
+ /** build an absolute path from its elements */
+ static String path(String... elements) {
+ String path = "";
+ if (elements.length == 0) {
+ path = root;
+ } else {
+ for(String element : elements) {
+ path += separator + element;
+ }
+ path.replaceFirst(separator, root);
+ }
+ return path;
+ }
+ }
+
private static final Log log = LogFactory.getLog(DisworkFileSystem.class);
- /** the symbol to use to separate directories in a path */
- public static final String separator = EntryUtil.PATH_SEPARATOR;
-
/** storage will permit to save and read directories, files and links */
protected Storage storage;
@@ -90,8 +106,8 @@
/** tests the existence of a file/dir/link at a given path
* return true if something exists at path p, it will be true if a call
- * to <code>mkdir(p)</code>, <code>write(p, ?)</code> or
- * <code>ln(p, ?)</code> has been done before.
+ * to {@link #createDirectory(String)}, {@link #write(String, InputStream)}
+ * or {@link #createSymbolicLink(String, String)} has been done before.
* @param path a path in the virtual FS
* @return true is something (a link, a file, or a directory) exists at path
* @throws IOException
@@ -111,12 +127,13 @@
* @throws FileNotFoundException if no file exists at this path
* @throws IOException if path exists but is a directory
*/
- public InputStream read(String path) throws DisworkFileSystemException {
+ public InputStream read(String path) throws DisworkFileSystemException,
+ FileNotFoundException {
checkPathSyntax(path);
String entry = walk(path);
if (entry == null) {
- throw new DisworkFileSystemException(Type.NO_SUCH_FILE, path);
+ throw new FileNotFoundException(path);
}
InputStream result = null;
@@ -149,8 +166,7 @@
throws DisworkFileSystemException {
checkPathSyntax(path);
if (source == null) {
- throw new DisworkFileSystemException(Type.READ_LOCAL_DATA_FAILURE,
- "source stream is null");
+ throw new NullPointerException("source stream is null");
}
String parent = EntryUtil.getParentFromPath(path);
String name = EntryUtil.getNameFromPath(path);
@@ -875,15 +891,13 @@
* check a path is absolute and syntactically correct, throw exception if
* that's not the case.
*/
- protected void checkPathSyntax(String path)
- throws DisworkFileSystemException {
+ protected void checkPathSyntax(String path) {
if (path == null) {
- throw new DisworkFileSystemException(Type.INVALID_PATH,
- new NullPointerException());
+ throw new NullPointerException("path is null");
}
if (!path.startsWith(EntryUtil.ROOT_DIRECTORY)) {
- throw new DisworkFileSystemException(Type.INVALID_PATH,
+ throw new IllegalArgumentException(
"\"" + path + "\" is not correct, all pathes " +
"have to be absolute (thus, starts with)" +
EntryUtil.ROOT_DIRECTORY);
@@ -892,7 +906,7 @@
String doubleSeparator = EntryUtil.PATH_SEPARATOR
+ EntryUtil.PATH_SEPARATOR;
if (path.contains(doubleSeparator)) {
- throw new DisworkFileSystemException(Type.INVALID_PATH,
+ throw new IllegalArgumentException(
"\"" + path + "\" is not correct, it contains "
+ doubleSeparator);
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemException.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -40,8 +40,6 @@
ALREADY_EXISTS,
/** Error while writing because another node is writing at this place */
CONCURRENT_MODIFICATION,
- /** String do not describe a valid path */
- INVALID_PATH,
/** The directory has a content */
DIRECTORY_NOT_EMPTY,
/** directory doesn't exists */
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -246,14 +246,14 @@
log.debug("getLink(\"" + id + "\") returns \"" + content + "\"");
return content;
}
-
+
public void putDirectory(String id, String content)
throws DisworkFileSystemException {
log.debug("putDirectory(\"" + id + "\", \"" + content + "\")");
InputStream value = IOUtils.toInputStream(content);
put(id, value);
}
-
+
public void putFile(String id, InputStream content)
throws DisworkFileSystemException {
try {
@@ -264,29 +264,30 @@
throw new DisworkFileSystemException(Type.READ_LOCAL_DATA_FAILURE, e);
}
}
-
+
public void putLink(String id, String content) {
+ // FIXME 20100715 bleny since put() is not used to skip split, there is no concurrency management
log.debug("putLink(\"" + id + "\", \"" + content + "\")");
byte[] contentAsBytes = EntryUtil.stringToBytes(content);
map.put(id, contentAsBytes);
}
-
+
public void removeDirectory(String id) throws DisworkFileSystemException {
log.debug("removeDirectory(\"" + id + "\")");
remove(id);
}
-
+
public void removeFile(String id) throws DisworkFileSystemException {
log.debug("removeFile(\"" + id + "\")");
remove(id);
}
-
+
public void removeLink(String id) {
log.debug("removeLink(\"" + id + "\")");
removeKey(id);
}
-
-
+
+
/**
* see {@link #get(String)}
*/
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs;
import static org.junit.Assert.assertArrayEquals;
@@ -9,6 +33,7 @@
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.ConcurrentModificationException;
import java.util.List;
@@ -116,8 +141,8 @@
try {
fileSystem.read("/not_existing_file");
fail();
- } catch (DisworkFileSystemException e) {
- assertEquals(Type.NO_SUCH_FILE, e.getType());
+ } catch (FileNotFoundException e) {
+ assertTrue(true);
}
}
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemInMemoryTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs;
import static org.junit.Assert.assertEquals;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemKademliaTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs;
import static org.junit.Assert.assertEquals;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/DisworkFileSystemPastryTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs;
import static org.junit.Assert.assertArrayEquals;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/AbstractDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/AbstractDisworkMapTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/AbstractDisworkMapTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs.storage;
import static org.junit.Assert.assertArrayEquals;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/EntryUtilTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/EntryUtilTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/EntryUtilTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
/* *##%
* Copyright (c) 2010 poussin. All rights reserved.
*
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMapTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/InMemoryDisworkMapTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs.storage;
import org.junit.Before;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/KademliaDisworkMapTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs.storage;
import static org.junit.Assert.assertEquals;
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/PastryDisworkMapTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/PastryDisworkMapTest.java 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/storage/PastryDisworkMapTest.java 2010-07-21 09:05:57 UTC (rev 104)
@@ -1,3 +1,27 @@
+/*
+ * #%L
+ * Diswork File-System
+ *
+ * $Id$
+ * $HeadURL$
+ * %%
+ * Copyright (C) 2010 CodeLutin
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU General Lesser Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/lgpl-3.0.html>.
+ * #L%
+ */
package org.nuiton.diswork.fs.storage;
//package org.nuiton.disworkfs.storage;
//
Modified: trunk/src/license/project.xml
===================================================================
--- trunk/src/license/project.xml 2010-07-13 13:31:41 UTC (rev 103)
+++ trunk/src/license/project.xml 2010-07-21 09:05:57 UTC (rev 104)
@@ -9,9 +9,10 @@
<commentStyle>java</commentStyle>
<fileSets>
<fileSet>
- <basedir>src/main/java</basedir>
+ <basedir>src</basedir>
<includes>
- <include>**/*.java</include>
+ <include>main/java/**/*.java</include>
+ <include>test/java/**/*.java</include>
</includes>
</fileSet>
</fileSets>
1
0
r103 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 13 Jul '10
by bleny@users.nuiton.org 13 Jul '10
13 Jul '10
Author: bleny
Date: 2010-07-13 15:31:41 +0200 (Tue, 13 Jul 2010)
New Revision: 103
Url: http://nuiton.org/repositories/revision/diswork/103
Log:
package doc
Added:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/package-info.java 2010-07-13 13:31:41 UTC (rev 103)
@@ -0,0 +1,25 @@
+/**
+ *
+ * {@link org.nuiton.diswork.daemon.DisworkDaemon} is the class that
+ * provides all the code needed to submit a job, monitor it, and download
+ * the results.
+ *
+ * {@link org.nuiton.diswork.daemon.WorkersManager} is the component run by the
+ * daemon. Its purpose is to look for the jobs proposed on the global system.
+ * Its role is to find jobs, execute the process, publish the status and the
+ * results.
+ *
+ * For the user responsible for the machine that will host the daemon,
+ * {@link org.nuiton.diswork.daemon.HttpFrontEnd} provides a web-based
+ * interface to monitor the status of the daemon and obtain local and global
+ * statistics about diswork global system.
+ *
+ * {@link org.nuiton.diswork.daemon.ActivityStrategy} interface and its
+ * implementations provide a means for the daemon administrator to make the
+ * daemon consume more or fewer hardware resources.
+ *
+ * {@link org.nuiton.diswork.daemon.DisworkDaemonRunner} is a convenience class
+ * that makes it simple to run a daemon.
+ */
+
+package org.nuiton.diswork.daemon;
\ No newline at end of file
1
0
r102 - in trunk/diswork-daemon/src: main/java/org/nuiton/diswork/daemon test/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 13 Jul '10
by bleny@users.nuiton.org 13 Jul '10
13 Jul '10
Author: bleny
Date: 2010-07-13 15:13:18 +0200 (Tue, 13 Jul 2010)
New Revision: 102
Url: http://nuiton.org/repositories/revision/diswork/102
Log:
stats manquantes, documentation
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -154,6 +154,7 @@
private static final Log log = LogFactory.getLog(LimitedActivity.class);
+ /** give three load averages, update them every five minutes */
protected class LoadAverageMonitoring extends Thread {
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
@@ -211,7 +212,7 @@
protected DisworkConfig config;
- protected long timeBeforeNextUpdate = 1000L;
+ protected long timeBeforeNextUpdate = 1000L; // 1 second is the minimum
protected ScheduledActivity(DisworkConfig config) {
this.config = config;
@@ -219,11 +220,26 @@
@Override
public boolean canWork() throws DisworkException {
+ // look in the schedule file if at least one of all the expression
+ // match the current date
Date currentDate = new Date();
boolean result = false;
+
+ // during the reading of the file, we will evaluate the next time
+ // the canWork method will change, we set it to (now + 1 hour)
+ // to begin and we will overwrite it if a preceding date is found
Date nextChange = new Date(System.currentTimeMillis() + 60 * 60 * 1000);
+
+ // browse all cron expressions found in the config file
for (CronExpression pattern : config.getSchedule()) {
+ // canWork is true if current expression matches current date
+ // or if canWork was already true due to a previous expression
result = result || pattern.isSatisfiedBy(currentDate);
+
+ // for the current expression, look for the date when it will
+ // be invalid or valid and update nextChange
+ // Thus, nextChange will be the earliest date when we will have
+ // to check if canWork change
Date aDate = pattern.getNextInvalidTimeAfter(currentDate);
if (aDate.before(nextChange)) {
nextChange = aDate;
@@ -247,13 +263,13 @@
return "scheduled activity";
}
}
-
- /** return true if a job can be run */
+ /**
+ * @return true if a job can be run now
+ */
boolean canWork() throws DisworkException;
-
+
/**
- *
* @return time to wait before next update, -1 means never (wait indefinitely)
*/
long timeBeforeNextUpdate();
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,5 +1,12 @@
package org.nuiton.diswork.daemon;
+/**
+ * This exception occurs when the configuration data are not readable. It may be
+ * due to a mistake in the config file: a config file is not found or is not
+ * correct.
+ *
+ * @author bleny
+ */
public class BadConfigurationException extends DisworkException {
private static final long serialVersionUID = 1L;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,5 +1,12 @@
package org.nuiton.diswork.daemon;
+/**
+ * This exception occurs when a job or its description is not valid. The
+ * code calling DisworkDaemon as a client may be responsible for
+ * such an error.
+ *
+ * @author bleny
+ */
public class BadJobException extends DisworkException {
private static final long serialVersionUID = 1L;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -139,6 +139,7 @@
setDefaultOption("diswork.total_uptime", "0");
setDefaultOption("diswork.number_of_jobs_done", "0");
setDefaultOption("diswork.number_of_jobs_submitted", "0");
+ setDefaultOption("diswork.worked_time", "0");
}
@@ -162,12 +163,9 @@
/**
* Read the tokens file if one is given in the config and merge the content
- * of this file into {@link #tokens}
- * @throws DisworkSystemException
- * @throws BadConfigurationException
- * @throws LocalFileException
- *
- * @throws DisworkException
+ * of this file into {@link #tokens}
+ * @throws BadConfigurationException if token file doesn't exist
+ * @throws LocalFileException if token file can't be read
*/
protected void initTokens() throws BadConfigurationException, LocalFileException {
tokens = new HashMap<String, String>();
@@ -181,7 +179,8 @@
+ " -Djava.io.tmpdir=%tmp"
;
tokens.put("%java", java);
-
+ tokens.put("%sep", File.separator);
+
File tokensFile = getTokensFile();
if (tokensFile != null) {
try {
@@ -246,8 +245,8 @@
/**
*
* @return null if no path for a file have been specified
- * @throws BadConfigurationException
- * @throws LocalFileException
+ * @throws BadConfigurationException if schedule file doesn't exist
+ * @throws LocalFileException if schedule file can't be read
*/
protected List<CronExpression> getSchedule() throws BadConfigurationException, LocalFileException {
// lazy instanciation of schedule
@@ -347,6 +346,19 @@
return getOptionAsInt("diswork.number_of_jobs_submitted");
}
+ protected void addWorkedTime(Long workedTime) {
+ Long newValue = getWorkedTime() + workedTime;
+ setOption("diswork.worked_time", newValue.toString());
+ save();
+ }
+
+ protected Long getWorkedTime() {
+ Long workedTime = Long.parseLong(getOption("diswork.worked_time"));
+ return workedTime;
+ }
+
+
+
/* ** trivial applicationConfig setters and getters ** */
public String getTempDirectory() {
@@ -417,7 +429,9 @@
setOption("diswork.http_front_end.port", httpFrontendPort.toString());
}
- /** number of seconds to wait between two look for a jobs (seconds) */
+ /** number of seconds to wait between two lookups for a job
+ * @return a number of seconds
+ */
public int getJobLooksWaitTime() {
return getOptionAsInt("diswork.job_looks_wait_time");
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -24,17 +24,6 @@
*/
package org.nuiton.diswork.daemon;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@@ -45,6 +34,12 @@
import org.nuiton.diswork.fs.DisworkFileSystemException.Type;
import org.nuiton.util.FileUtil;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.text.NumberFormat;
+import java.util.*;
+
/**
* The diswork daemon is the gateway to the global diswork system. Instantiate
* this class creates a new node on the system. The new node can be used
@@ -87,11 +82,9 @@
*
* You can create your application ready for diswork, submit it to the daemon
* and then submit as much jobs as you want that depends of this application.
- * Think an application as the common stuff (programs, scripts, files etc.)
- * all the different job will need. An application is just a set of file.
+ * Think of an application as the common stuff (programs, scripts, files etc.)
+ * all the different jobs will need. An application is just a set of files.
*
- * TODO 20100617 bleny explain it better
- *
* Once the application has been made available, jobs can be created and
* submitted. To do so, a programmer must write a class that
*
@@ -178,6 +171,18 @@
/** in a home directory, the place where the hardware info must be placed */
protected static final String HARDINFO_PATH = "hardinfo";
+
+ /** a keyword that will be put in a job log-file on a single line when the job is started */
+ protected static final String LOG_KEYWORD_STARTED = "STARTED";
+
+ /** a keyword that will be put in a job log-file on a single line when the job has completed successfully */
+ protected static final String LOG_KEYWORD_DONE = "DONE";
+
+ /** a keyword that will be put in a job log-file on a single line when the job is failed */
+ protected static final String LOG_KEYWORD_FAILED = "FAILED";
+
+ /** a keyword that will be put in a job log-file on a single line when the job is finished */
+ protected static final String LOG_KEYWORD_FINISHED = "FINISHED";
/** the distributed file system where jobs, data and results are stored */
@@ -212,7 +217,7 @@
initWorkersManager();
- writeHardwareInfos();
+ writeLocalStats();
httpFrontEnd = new HttpFrontEnd(config, this);
@@ -310,18 +315,16 @@
}
}
- protected void writeHardwareInfos() throws DisworkException {
- // writing hardware info to homeDir
+ protected void writeLocalStats() throws DisworkException {
try {
- OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
- String hardinfos = os.getName() + "\n" + os.getArch() + "\n" +
- config.getNumberOfWorkers();
-
- // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos
-
+ String infos = "";
+ Map<String, String> localStats = getLocalStats();
+ for (String key : localStats.keySet()) {
+ infos += key + "\t" + localStats.get(key) + "\n";
+ }
fileSystem.write(homeDir + "/" + HARDINFO_PATH,
- IOUtils.toInputStream(hardinfos));
- log.info("writing hardware infos " + hardinfos.replaceAll("\n", " "));
+ IOUtils.toInputStream(infos));
+ log.info("writing local infos " + infos.replaceAll("\n", " "));
} catch (DisworkFileSystemException e) {
log.error("can't write hardware infos", e);
throw new DisworkException("can't write hardware infos", e);
@@ -532,7 +535,7 @@
+ " that is not available");
}
} else {
- log.info("no dependency specified for " + jobDescription);
+ log.debug("no dependency specified for " + jobDescription);
}
String jobDir = getPathForJob(jobDescription);
@@ -592,15 +595,15 @@
}
public boolean isStarted(JobDescription job) throws DisworkException {
- return checkLogContains(job, "STARTED");
+ return checkLogContains(job, LOG_KEYWORD_STARTED);
}
public boolean isFinished(JobDescription job) throws DisworkException {
- return checkLogContains(job, "FINISHED");
+ return checkLogContains(job, LOG_KEYWORD_FINISHED);
}
public boolean isSuccessful(JobDescription job) throws DisworkException {
- return checkLogContains(job, "DONE");
+ return checkLogContains(job, LOG_KEYWORD_DONE);
}
public boolean isFailed(JobDescription job) throws DisworkException {
@@ -703,6 +706,9 @@
}
public Map<String, String> getLocalStats() throws DisworkException {
+
+ // TODO 20100615 bleny add RAM size and HDD capacities to hardinfos
+
NumberFormat numberFormat = NumberFormat.getInstance();
numberFormat.setMaximumFractionDigits(2);
@@ -712,16 +718,21 @@
result.put("jobs_done", config.getNumberOfJobsDone().toString());
result.put("jobs_submitted", config.getNumberOfJobsSubmitted().toString());
+ result.put("worked_time", config.getWorkedTime().toString());
- if (config.getNumberOfJobsSubmitted() == 0) {
- result.put("jobs_ratio", "∞");
- } else {
+ if (config.getNumberOfJobsSubmitted() != 0) {
Double jobsRatio = (double) (config.getNumberOfJobsDone() / config.getNumberOfJobsSubmitted());
result.put("jobs_ratio", numberFormat.format(jobsRatio));
}
Double karma = ((config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio());
result.put("karma", numberFormat.format(karma));
+
+ OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+ result.put(os.getName(), "1");
+ result.put(os.getArch(), "1");
+ result.put("workers", config.getNumberOfWorkers().toString());
+
return result;
}
@@ -731,8 +742,7 @@
protected Map<String, String> updateGlobalStats() throws DisworkException {
try {
log.info("global stats file doesn't exists, creating one");
-
- Long currentTime = System.currentTimeMillis();
+
Map<String, Long> stats = new HashMap<String, Long>();
Long availableProcessors = 0L;
Map<String, String> result = new HashMap<String, String>();
@@ -741,27 +751,26 @@
for (String homeDir : homeDirs) {
String hardInfoPath = HOME + "/" + homeDir + "/" + HARDINFO_PATH;
if (fileSystem.exists(hardInfoPath)) {
- String hardInfoContent = IOUtils.toString(
- fileSystem.read(hardInfoPath));
- String[] infos = hardInfoContent.split("\n");
-
- // first line, reading the OS name
- if (!stats.containsKey(infos[0])) {
- stats.put(infos[0], 0L);
+ BufferedReader in = new BufferedReader(
+ new InputStreamReader(
+ fileSystem.read(hardInfoPath)));
+ String line;
+ while ((line = in.readLine()) != null) {
+ if (!line.equals("")) {
+ log.debug("reading line" + line);
+ String[] keyValue = line.split("\t");
+ String key = keyValue[0];
+ Long value = Long.parseLong(keyValue[1]);
+ if (!stats.containsKey(key)) {
+ stats.put(key, 0L);
+ }
+ stats.put(key, stats.get(key) + value);
+ }
}
- stats.put(infos[0], stats.get(infos[0]) + 1);
-
- // second line, reading the architecture
- if (!stats.containsKey(infos[1])) {
- stats.put(infos[1], 0L);
- }
- stats.put(infos[1], stats.get(infos[1]) + 1);
-
- // third line, reading the number of processors
- availableProcessors += Integer.parseInt(infos[2]);
}
}
- stats.put("available_processors", availableProcessors);
+
+ Long currentTime = System.currentTimeMillis();
stats.put("date", currentTime);
// write the result
@@ -800,12 +809,16 @@
Map<String, String> result = new HashMap<String, String>();
if (fileSystem.exists(GLOBAL_STATS_PATH)) {
- String globalStats = IOUtils.toString(fileSystem.read(GLOBAL_STATS_PATH));
- log.debug("global stats file found, reading " + globalStats);
- String[] lines = globalStats.split("\n");
- for (String line : lines) {
- String[] keyValue = line.split("\t");
- result.put(keyValue[0], keyValue[1]);
+ log.debug("global stats file found, reading");
+ BufferedReader globalStats = new BufferedReader(
+ new InputStreamReader(
+ fileSystem.read(GLOBAL_STATS_PATH)));
+ String line;
+ while ((line = globalStats.readLine()) != null) {
+ if (!line.equals("")) {
+ String[] keyValue = line.split("\t");
+ result.put(keyValue[0], keyValue[1]);
+ }
}
// delete the file if it's too old
@@ -831,7 +844,7 @@
public void activeNoActivityStrategy() throws DisworkException {
if (workers == null) {
- log.warn("trying to change activy while working is disabled");
+ log.warn("trying to change activity while working is disabled");
} else {
workers.activeNoActivityStrategy();
}
@@ -839,7 +852,7 @@
public void activeUnlimitedActivityStrategy() throws DisworkException {
if (workers == null) {
- log.warn("trying to change activy while working is disabled");
+ log.warn("trying to change activity while working is disabled");
} else {
workers.activeUnlimitedActivityStrategy();
}
@@ -847,7 +860,7 @@
public void activeLimitedActivityStrategy() throws DisworkException {
if (workers == null) {
- log.warn("trying to change activy while working is disabled");
+ log.warn("trying to change activity while working is disabled");
} else {
workers.activeLimitedActivityStrategy();
}
@@ -855,7 +868,7 @@
public void activeScheduledActivityStrategy() throws DisworkException {
if (workers == null) {
- log.warn("trying to change activy while working is disabled");
+ log.warn("trying to change activity while working is disabled");
} else {
workers.activeScheduledActivityStrategy();
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemonRunner.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -36,8 +36,20 @@
*
* This class can be used to run a diswork daemon as a service on the OS.
* Thus, it will be started when the machine boot and stopped just before the
- * computer is shut down.
+ * computer is shut down. To install diswork as a daemon, see commons-daemon
+ * documentation.
*
+ * Usage is
+ *
+ * <code>
+ * java DisworkDaemonRunner [bootstrap_ip bootstrap_port]
+ * </code>
+ *
+ * if no parameter provided, diswork will start and boot alone. If ip and port
+ * are provided, diswork will connect to this other node.
+ *
+ * @link http://commons.apache.org/daemon/
+ *
* @author bleny
*/
public class DisworkDaemonRunner implements Daemon {
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -25,6 +25,7 @@
package org.nuiton.diswork.daemon;
/**
+ * Parent class for all exceptions diswork can raise.
*
* @author bleny
*/
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,5 +1,12 @@
package org.nuiton.diswork.daemon;
+/**
+ * This exception is raised when an error occurred in the inner structure of
+ * diswork. It may be due to a network failure, or to corrupted data into
+ * the File System.
+ *
+ * @author bleny
+ */
public class DisworkSystemException extends DisworkException {
private static final long serialVersionUID = 1L;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -15,14 +15,25 @@
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
+/**
+ * The HttpFrontEnd boots an embedded http server on the same machine as the daemon.
+ *
+ * A user can open a web-browser to read the single page that is available.
+ * This page shows statistics and status of the different daemon components.
+ *
+ * @author bleny
+ */
public class HttpFrontEnd {
private static final Log log = LogFactory.getLog(HttpFrontEnd.class);
+ /** config provides directive about the http server */
protected DisworkConfig config;
+ /** embedded http server */
protected Server server;
+ /** daemon will permit to retrieve stats */
protected DisworkDaemon daemon;
public HttpFrontEnd(DisworkConfig config, DisworkDaemon daemon)
@@ -35,13 +46,15 @@
}
}
+ /** lazy instantiation of {@link #server} */
protected void initServer() {
log.info("web server use port " + config.getHttpFrontendPort());
server = new Server(config.getHttpFrontendPort());
Context root = new Context(server, "/", Context.NO_SESSIONS);
root.addServlet(new ServletHolder(new MainServlet()), "/");
}
-
+
+ /** start the server, the page will then be available for web-browser */
public void start() throws DisworkException {
if (server == null) {
initServer();
@@ -55,6 +68,7 @@
}
}
+ /** stop the server, the page will no longer be available for web-browser */
public void stop() throws DisworkException {
log.info("stopping web front-end");
try {
@@ -67,6 +81,7 @@
}
}
+ /** this servlet sends a static html page that shows current status and stats */
public class MainServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@@ -76,7 +91,8 @@
log.info("page request");
- String pageContent = "<html>\n"
+ String pageContent = "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">\n"
+ + "<html>\n"
+ "<h1>Diswork Node</h1>" + "\n\n"
+ "<h2>Submitted jobs</h2>" + "\n\n"
+ "<table>" + "\n"
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -57,6 +57,9 @@
* <dt>%tmp</dt>
* <dd>will be replaced by the the path to dir where the job
* is executed, it's nice to use this to set a temp directory</dd>
+ * <dt>%sep</dt>
+ * <dd>will be replaced by the file separator (ie "/" under Linux,
+ * "\" under Windows)</dd>
* </dl>
*
* This class provides methods to read and parse those data to an XML file
@@ -133,6 +136,19 @@
return commandLine;
}
+ /** set the command line to run for this job
+ * command line is intended as a normal shell command line like
+ *
+ * program arg1 arg2 arg3
+ *
+ * Tokens may be used anywhere in the command line. They will be
+ * replaced at runtime. Tokens, like "%java" or "%tmp" are described
+ * in class documentation. The worker will parse the command-line to
+ * replace tokens with something suitable for its environment.
+ *
+ * @link {@link DisworkConfig#parseCommandLine(String, String)}
+ * @param commandLine a string that may contain tokens
+ */
public void setCommandLine(String commandLine) {
this.commandLine = commandLine;
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,5 +1,12 @@
package org.nuiton.diswork.daemon;
+/**
+ * A raise of this exception is due to an error on the local file system.
+ * Maybe it's not writable (rights ? not enough space ?) or readable by
+ * the daemon.
+ *
+ * @author bleny
+ */
public class LocalFileException extends DisworkException {
private static final long serialVersionUID = 1L;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -24,24 +24,6 @@
*/
package org.nuiton.diswork.daemon;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@@ -53,6 +35,10 @@
import org.nuiton.util.StringUtil;
import org.nuiton.util.ZipUtil;
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+
/**
* The workers-manager aims to run and manage the different workers. A worker
* is a thread that try to find a jobs and execute them.
@@ -120,9 +106,12 @@
protected final Object sem = new Object();
protected void updateFlag() throws DisworkException {
- synchronized (sem) {
- flag = activityStrategy.canWork();
- sem.notifyAll();
+ flag = activityStrategy.canWork();
+ if (flag) {
+ synchronized (sem) {
+ // workers can work, wake up sleeping workers
+ sem.notifyAll();
+ }
}
}
@@ -143,11 +132,48 @@
* to try it and move it the same may. If the job is failed three times,
* it's moved to {@link DisworkDaemon#FAILED_3} meaning it will not been
* tried again.
+ *
+ * Here is what a worker do all life long :
+ * <ol>
+ * <li>First, look at flag value (using {@link #getFlag()}), if true, try
+ * to find a job, if false, sleep until the flag change</li>
+ * <li>Try to find a job ({@link #findAJob()}), if no job found, wait few
+ * seconds and restart from the beginning.</li>
+ * <li>If a job is found, mark it as started
+ * ({@link #jobIsStarted(String)})</li>
+ * <li>create a temp directory specific for the job</li>
+ * <li>download the application for the job
+ * ({@link #downloadApplication()}) and unzip it in the job dir.
+ * Download way use a cache to prevent downloading multiple time the
+ * same application (cache is in {@link #getApplicationData})</li>
+ * <li>stage input files : get all input files needed for the job from
+ * diswork FS or from the provided URL. Download everything to the job
+ * dir ({@link #stageInputFiles()})</li>
+ * <li>do a last check of {@link #shouldStop} before running the job, if
+ * shouldStop is true, give up the job using
+ * {@link #jobIsInterrupted(String)}</li>
+ * <li>if shouldStop is false, run the process
+ * ({@link #prepareAndRunJob()}). It implies to start a Thread that
+ * constantly read the output of the sub process
+ * ({@link Worker.OutputReader}) and to put data in the input file on
+ * the standard input of the process</li>
+ * <li>wait for the process to end. It may be interrupted due to a query
+ * to the worker manager to stop all worker. If the process is
+ * interrupted is, mark it as interrupted to put it back to the
+ * proposed jobs ({@link #jobIsInterrupted(String)})</li>
+ * <li>When the process ends, whatever the exitValue, upload the results
+ * ({@link #stageOutputFiles}}.</li>
+ * <li>look at the exit value. If the job is a success, mark it
+ * ({@link #jobIsSuccessful(String)}, if it failed, mark it so it will
+ * be proposed to others nodes ({@link #jobIsFailed(String)}</li>
+ * </ol>
*
* @author bleny
*/
protected class Worker extends Thread {
+ private final Log log = LogFactory.getLog(Worker.class);
+
// TODO 20100614 bleny make it configurable
/** after this time (ms), a job is considered as no longer running */
protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000;
@@ -167,6 +193,9 @@
/** current process, null if nothing is running */
protected Process currentProcess;
+ /** the date when the currentProcess started to be executed */
+ protected Long currentProcessStartDate;
+
/** read the standard output of the subprocess
*
* By reading the standard output, this thread has multiple goals :
@@ -208,10 +237,10 @@
outputStreamWriter = new OutputStreamWriter(outputFile);
wr = new BufferedWriter(outputStreamWriter);
}
-
- String line = null;
+
try {
// read a line from standard output
+ String line;
while ((line = br.readLine()) != null) {
// copy this line to output file
if (wr != null) {
@@ -219,8 +248,8 @@
}
// add reading line to logging output
- if (log.isInfoEnabled()) {
- log.debug(this.toString() + ">" + line);
+ if (log.isTraceEnabled()) {
+ log.trace(this.toString() + ">" + line);
}
}
} catch (IOException e) {
@@ -244,10 +273,9 @@
/**
* this method add a line to a job-specific log
* @param jobPath the path to the job concerned
- * @param message the line to add the the log
- * @throws DisworkFileSystemException if an error occurred while writing
- * the log
- * @throws IOException
+ * @param messages the line(s) to add to the log
+ * @throws DisworkSystemException if an error occurred while writing
+ * the log
*/
protected void log(String jobPath, String... messages)
throws DisworkSystemException {
@@ -348,8 +376,6 @@
log.error("unable to run process for job" + currentJob, e);
throw new LocalFileException("unable to run process for job" + currentJob, e);
}
-
- // catch (Throwable processError) {}
// start a thread to constantly read on the standard output
String standardOutputFileName = currentJob.getStandardOutput();
@@ -435,16 +461,14 @@
* the job, wait for it to end, write all the results. Mark the job
* as running at the beginning and move it to DONE or FAILED at
* the end, depending of the results
- * @param currentJobPath
- * @return
- * @throws BadJobException
- * @throws DisworkFileSystemException
- * @throws IOException
* @throws DisworkException
*/
protected void runJob() throws DisworkException {
try {
+
+ currentProcessStartDate = System.currentTimeMillis();
log.info("running job at " + currentJobPath);
+
try {
String jsdlPath = currentJobPath + "/" + DisworkDaemon.JSDL_PATH;
String jsdl = IOUtils.toString(fileSystem.read(jsdlPath));
@@ -478,51 +502,32 @@
// stop, last check of shouldStrop
if (!shouldStop) {
prepareAndRunJob();
-
- // wait for the process to return
+
jobIsStarted(currentJobPath);
- boolean processFinished = false;
- while (!processFinished) {
- Integer exitValue; // exitValue if the process is
- // finished, null if the process
- // is not finished
- try {
- exitValue = currentProcess.exitValue();
- processFinished = true;
- } catch (IllegalThreadStateException e) {
- // process is not finished
- exitValue = null;
- }
-
- if (exitValue == null) {
- if (shouldStop) {
- jobIsInterrupted(currentJobPath);
- processFinished = true;
- } else {
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException e) {
- log.error("worker interupted", e);
- throw new DisworkException("worker interupted", e);
- }
- }
+
+ try {
+ // wait for the process to return
+ int returnValue = currentProcess.waitFor();
+ stageOutputFiles();
+ if (returnValue == 0) {
+ // job is successful
+ jobIsSuccessful(currentJobPath);
} else {
- stageOutputFiles();
- // job is finished
- if (exitValue == 0) {
- // job is successful
- jobIsSuccessful(currentJobPath);
- } else {
- jobIsFailed(currentJobPath);
- }
+ jobIsFailed(currentJobPath);
}
+ } catch (InterruptedException e) {
+ log.debug("process was interrupted", e);
+ jobIsInterrupted(currentJobPath);
}
- } else {
- jobIsInterrupted(currentJobPath);
}
} catch (BadJobException e) {
jobIsFailed(currentJobPath);
} finally {
+ if (currentProcessStartDate != null) {
+ Long currentTime = System.currentTimeMillis();
+ config.addWorkedTime(currentTime - currentProcessStartDate);
+ currentProcessStartDate = null;
+ }
currentJob = null;
currentProcess = null;
// clean up the job directory
@@ -546,96 +551,128 @@
return jobsNames.get(0);
}
}
-
+
/**
- * try to find a job, if found, take it and return the path
- * @return the path to the job, null if no job found
- * @throws DisworkSystemException
+ * browse all running directories. If a jobs is too old, it is considered
+ * as interrupted and moved back. All obsolete jobs found are moved to
+ * be available again.
+ * @return true if an obsolete job has been found
+ * @throws DisworkSystemException
*/
- protected String findAJob() throws DisworkSystemException {
- try {
- // Once a job is found, those two var will be set
- String jobLinkDir = null;
- String jobLinkName = null;
-
- // use a synchronized block because multiple workers
- // may try to take a same job
- synchronized (fileSystem) {
-
- // fist, try to find a job declared has running since
- // too long to re-run it
- String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
- DisworkDaemon.FAILED_1_RUNNING,
- DisworkDaemon.TODO_RUNNING
- };
- // browsing all "running" dirs
- for (String path : runningJobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) {
+ protected boolean checkInteruptedJobs() throws DisworkSystemException {
+ // use a synchronized block because multiple workers
+ // may try to do concurrent move
+ boolean result = false;
+ synchronized (fileSystem) {
+ String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
+ DisworkDaemon.FAILED_1_RUNNING,
+ DisworkDaemon.TODO_RUNNING
+ };
+ for (String path : runningJobsDirs) {
+ try {
+ List<String> jobsNames = fileSystem.readDirectory(path);
+ Collections.sort(jobsNames);
+ Iterator<String> it = jobsNames.iterator();
+ boolean obsoleteJobFound = true;
+ while (obsoleteJobFound && it.hasNext()) {
+ String jobName = it.next();
Long linkAge = System.currentTimeMillis()
- - Long.parseLong(oldName);
+ - Long.parseLong(jobName);
// check is oldest job is too old and should be
// considered has to-be-rerun
if (linkAge > MAX_JOB_RUNNING_TIME) {
- log.info("taking old job (age = " + linkAge + ")");
- jobLinkDir = path;
- jobLinkName = oldName;
- // FIXME 20100617 bleny break s*cks
- break;
+ String jobPath = path + "/" + jobName;
+ String newJobPath = INTERRUPTED_MOVE.get(path) + "/" + jobName;
+ try {
+ fileSystem.move(jobPath, newJobPath);
+ result = true;
+ } catch (DisworkFileSystemException e) {
+ log.debug("failed at moving" + jobPath);
+ // ignore, another node is moving it
+ // FIXME 20100712 bleny catch the exact exception
+ }
+ } else {
+ obsoleteJobFound = false;
}
}
+ } catch (DisworkFileSystemException e) {
+ log.warn("unable to read jobs directory", e);
+ throw new DisworkSystemException("unable to read jobs directory", e);
}
-
- // if no job was found, search now in not running jobs
- if (jobLinkDir == null) {
- String[] jobsDirs = { DisworkDaemon.FAILED_2,
- DisworkDaemon.FAILED_1,
- DisworkDaemon.TODO
- };
- for (String path : jobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) { // take it
- jobLinkDir = path;
- jobLinkName = oldName;
- // FIXME 20100617 bleny break s*cks
- break;
- }
- }
+ }
+ }
+ return result;
+ }
+
+ protected String findAJobInDirectory(String dirPath) throws DisworkSystemException {
+ // use a synchronized block because multiple workers
+ // may try to take a same job
+ synchronized (fileSystem) {
+ List<String> jobsNames = null;
+ try {
+ jobsNames = fileSystem.readDirectory(dirPath);
+ Collections.sort(jobsNames);
+ } catch (DisworkFileSystemException e) {
+ log.warn("unable to read jobs directory", e);
+ throw new DisworkSystemException("unable to read jobs directory", e);
+ }
+ log.debug(jobsNames.size() + " jobs found at " + dirPath);
+ String result = null;
+ Iterator<String> it = jobsNames.iterator();
+ while (result == null && it.hasNext()) {
+ String jobPath = dirPath + "/" + it.next();
+ String newJobPath = RUNNING_MOVE.get(dirPath) + "/" + DisworkDaemon.newJobLinkName();
+ try {
+ log.debug("job found at " + jobPath + ". moving it to " + newJobPath);
+ fileSystem.move(jobPath, newJobPath);
+ result = newJobPath;
+ } catch (DisworkFileSystemException e) {
+ log.debug("failed at moving" + jobPath);
+ // ignore, another node taking it
+ // FIXME 20100712 bleny catch the exact exception
}
-
- if (jobLinkDir != null) {
- // move the link before running the job
- String oldPath = jobLinkDir + "/" + jobLinkName;
- log.info("job found at " + oldPath);
-
- jobLinkDir = RUNNING_MOVE.get(jobLinkDir);
- jobLinkName = DisworkDaemon.newJobLinkName();
- String newPath = jobLinkDir + "/" + jobLinkName;
-
- log.info("moving " + oldPath + " to " + newPath);
- fileSystem.move(oldPath, newPath);
-
- }
}
+ return result;
+ }
+ }
+
+ /**
+ * try to find a job, if found, take it and return the path
+ * @return the path to the job, null if no job found
+ * @throws DisworkSystemException
+ */
+ protected String findAJob() throws DisworkSystemException {
+
+ List<String> jobsDirs = new ArrayList<String>();
+ jobsDirs.add(DisworkDaemon.FAILED_2);
+ jobsDirs.add(DisworkDaemon.FAILED_1);
+ jobsDirs.add(DisworkDaemon.TODO);
+
+ Iterator<String> it = jobsDirs.iterator();
+ String result = null;
+ while (result == null && it.hasNext()) {
+ String jobDir = it.next();
+ result = findAJobInDirectory(jobDir);
+ }
- String jobPath = null;
+ if (result == null) {
// now, if no job was found
- if (jobLinkDir != null) {
- jobPath = jobLinkDir + "/" + jobLinkName;
+ boolean checkResult = checkInteruptedJobs();
+ if (checkResult) {
+ // obsolete jobs are now available, retry
+ return findAJob();
} else {
+ // try again later
try {
- log.info("look for a job was unsucessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try");
+ log.info("look for a job was unsuccessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try");
Thread.sleep(config.getJobLooksWaitTime() * 1000);
} catch (InterruptedException e) {
log.error("worker interrupted while waiting before trying to find a new job", e);
throw new DisworkSystemException("worker interrupted while waiting before trying to find a new job", e);
}
}
- return jobPath;
- } catch (DisworkFileSystemException e) {
- log.error("error while trying to find a job", e);
- throw new DisworkSystemException("error while trying to find a job", e);
}
+ return result;
}
/** update the log of the job
@@ -652,7 +689,8 @@
throw new DisworkSystemException("error while moving job link", e);
}
log.info("marking " + newJobPath + " as done and finished");
- log(newJobPath, "DONE", "FINISHED");
+ log(newJobPath, DisworkDaemon.LOG_KEYWORD_DONE,
+ DisworkDaemon.LOG_KEYWORD_FINISHED);
config.addOneJobDone();
}
@@ -675,10 +713,11 @@
if (newDir.equals(DisworkDaemon.FAILED_3)) {
log.info("marking " + newJobPath + " as failed and finished");
- log(newJobPath, "FAILED", "FINISHED");
+ log(newJobPath, DisworkDaemon.LOG_KEYWORD_FAILED,
+ DisworkDaemon.LOG_KEYWORD_FINISHED);
} else {
log.info("marking " + newJobPath + " as failed");
- log(newJobPath, "FAILED");
+ log(newJobPath, DisworkDaemon.LOG_KEYWORD_FAILED);
}
}
@@ -703,7 +742,7 @@
* @link {@link DisworkDaemon#isStarted(JobDescription)}
*/
protected void jobIsStarted(String jobPath) throws DisworkSystemException {
- log(jobPath, "STARTED");
+ log(jobPath, DisworkDaemon.LOG_KEYWORD_STARTED);
}
/**
@@ -711,8 +750,9 @@
*/
@Override
public void run() {
- try {
- while (! shouldStop) {
+ // we want the worker to continue working whatever occurs
+ while (! shouldStop) {
+ try {
synchronized (sem) {
if (getFlag()) {
currentJobPath = findAJob();
@@ -731,14 +771,15 @@
sem.wait(waitTime);
}
} catch (InterruptedException e) {
- log.error("interrupted while waiting for a change of activity", e);
- throw new DisworkSystemException("interrupted while waiting for a change of activity", e);
+ log.warn("interrupted while waiting for a change of activity", e);
+ // throw new DisworkSystemException("interrupted while waiting for a change of activity", e);
}
}
}
+ } catch (DisworkException e) {
+ log.warn("exception caught by worker", e);
+ // throw new RuntimeException("an error occured", e);
}
- } catch (DisworkException e) {
- throw new RuntimeException("an error occured", e);
}
}
@@ -763,7 +804,7 @@
log.info("will start " + config.getNumberOfWorkers() + " workers");
for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) {
Worker worker = new Worker();
- worker.setName("disworker-" + i);
+ // worker.setName("disworker-" + i);
worker.start();
workers.add(worker);
}
@@ -775,6 +816,9 @@
protected File getApplicationData(String applicationName,
String applicationVersion)
throws DisworkSystemException, LocalFileException {
+ if (!applicationCache.exists()) {
+ applicationCache.mkdirs();
+ }
File cachedApplicationData = new File(applicationCache,
applicationName + "-" + applicationVersion + ".zip");
if (!cachedApplicationData.exists()) {
@@ -821,15 +865,26 @@
}
activeNoActivityStrategy();
-
- FileUtil.deleteRecursively(applicationCache);
-
+
+ for (Worker worker : workers) {
+ if (worker.currentProcess != null) {
+ log.debug("killing " + worker + " process");
+ worker.currentProcess.destroy();
+ }
+ }
+
+ if (applicationCache.exists()) {
+ FileUtil.deleteRecursively(applicationCache);
+ }
+
// waiting for them to actually have finished
for (Worker worker : workers) {
while (worker.isAlive()) {
try {
// worker may be sleeping
- activeNoActivityStrategy();
+ synchronized (sem) {
+ sem.notifyAll();
+ }
log.debug("waiting for " + worker + " to return");
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -838,6 +893,8 @@
}
}
}
+
+ log.debug("all workers stopped");
}
public ActivityStrategy getActivityStrategy() {
@@ -847,7 +904,7 @@
public void setActivityStrategy(ActivityStrategy activityStrategy)
throws DisworkException {
this.activityStrategy = activityStrategy;
- log.info("swithching to " + activityStrategy);
+ log.info("switching to " + activityStrategy);
updateFlag();
}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,6 +1,15 @@
package org.nuiton.diswork.daemon;
-
+/**
+ * This test is the same as {@link DisworkDaemonTest} except that
+ * the single node has more than one worker.
+ *
+ * This tests shows that having multiple workers should not raise
+ * any problem like throwing concurrent exceptions, illegal thread
+ * state exception etc.
+ *
+ * @author bleny
+ */
public class DisworkDaemonConcurrencyTest extends DisworkDaemonTest {
@Override
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -2,13 +2,21 @@
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
-
+/**
+ * This test is the same as {@link DisworkDaemonTest} except that
+ * there is no one but two nodes. One node has no workers and submit
+ * the jobs test after tests, the other node is configured to do
+ * the jobs.
+ *
+ * @author bleny
+ */
public class DisworkDaemonMultipleNodesTest extends DisworkDaemonTest {
@Override
protected void setConfigs() {
config = newConfig();
- config.setActivityStrategy("none");
+ config.setNumberOfWorkers(0);
+ // config.setActivityStrategy("none");
config2 = newConfig();
config2.setBootstrapIp(DisworkFileSystemConfig.getIp());
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-09 11:41:12 UTC (rev 101)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-13 13:13:18 UTC (rev 102)
@@ -1,17 +1,17 @@
package org.nuiton.diswork.daemon;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.IOUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class DisworkDaemonTest {
@@ -21,14 +21,14 @@
protected DisworkDaemon daemon;
protected DisworkDaemon daemon2;
- protected static int port = 45500;
+ protected static int port = 3200;
/** a factory method to ease the creation of configs */
protected DisworkConfig newConfig() {
DisworkConfig config = new DisworkConfig();
port += 1;
config.setUsedPort(port);
- config.setJobLooksWaitTime(1);
+ config.setJobLooksWaitTime(5);
// useless in tests
config.setStartHttpFrontend(false);
@@ -55,10 +55,14 @@
}
@After
- public void tearDown() throws Exception {
- daemon.close();
- if (daemon2 != null) {
- daemon2.close();
+ public void tearDown() {
+ try {
+ daemon.close();
+ if (daemon2 != null) {
+ daemon2.close();
+ }
+ } catch (Exception e) {
+ // close raise errors due to DHT doesn't manage peer leaving
}
}
@@ -171,23 +175,19 @@
/**
* tests the stats given by the daemon
- * @throws Exception
*/
@Test
public void testStats() throws Exception {
- daemon.getUptimeRatio();
+ Map<String, String> stats = daemon.getLocalStats();
- Map<String, String> stats = daemon.getGlobalStats();
- // deamon should read 3 stats : 1 OS, 1 architecture, the number
- // of processors and the date when the stats was computed
- assertEquals(4, stats.size());
+ // deamon should read 10 stats
+ stats = daemon.getGlobalStats();
+ assertEquals(10, stats.size());
// a second read should return the same data, without re generate them
// check logs
stats = daemon.getGlobalStats();
- // deamon should read 4 stats : 1 OS, 1 architecture and the number
- // of processors
- assertEquals(4, stats.size());
+ assertEquals(10, stats.size());
}
@Test
1
0
r101 - in trunk: diswork-daemon/src/main/java/org/nuiton/diswork/daemon diswork-daemon/src/test/java/org/nuiton/diswork/daemon diswork-fs/src/main/java/org/nuiton/diswork/fs
by bleny@users.nuiton.org 09 Jul '10
by bleny@users.nuiton.org 09 Jul '10
09 Jul '10
Author: bleny
Date: 2010-07-09 13:41:12 +0200 (Fri, 09 Jul 2010)
New Revision: 101
Url: http://nuiton.org/repositories/revision/diswork/101
Log:
stats, gestion des exceptions dans le démon
Added:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -112,6 +112,7 @@
/** use this strategy to never run a job */
public static class NoActivity implements ActivityStrategy {
+
@Override
public boolean canWork() {
return false;
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadConfigurationException.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -0,0 +1,19 @@
+package org.nuiton.diswork.daemon;
+
+public class BadConfigurationException extends DisworkException {
+
+ private static final long serialVersionUID = 1L;
+
+ public BadConfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BadConfigurationException(String message) {
+ super(message);
+ }
+
+ public BadConfigurationException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/BadJobException.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -0,0 +1,19 @@
+package org.nuiton.diswork.daemon;
+
+public class BadJobException extends DisworkException {
+
+ private static final long serialVersionUID = 1L;
+
+ public BadJobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BadJobException(String message) {
+ super(message);
+ }
+
+ public BadJobException(Throwable cause) {
+ super(cause);
+ }
+
+}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -78,6 +78,8 @@
* <dt>diswork.tokens_file</dt>
* <dd>a path to a file containing token-names and the replacement strings
* this is a property file</dd>
+ * <dt>diswork.job_looks_wait_time</dt>
+ * <dd>the number of seconds to wait between two looks for a job.</dd>
* </dl>
*
* For the http front-end :
@@ -122,6 +124,7 @@
+ File.separator + "diswork");
setDefaultOption("diswork.activity_strategy", "unlimited");
+ setDefaultOption("diswork.job_looks_wait_time", "60"); // 1 minute
setOption("diswork.http_front_end.start", "true");
setOption("diswork.http_front_end.port", "8080");
@@ -160,10 +163,13 @@
/**
* Read the tokens file if one is given in the config and merge the content
* of this file into {@link #tokens}
+ * @throws DisworkSystemException
+ * @throws BadConfigurationException
+ * @throws LocalFileException
*
* @throws DisworkException
*/
- protected void initTokens() throws DisworkException {
+ protected void initTokens() throws BadConfigurationException, LocalFileException {
tokens = new HashMap<String, String>();
String java = // full java path
@@ -193,11 +199,11 @@
}
} catch (FileNotFoundException e) {
log.warn("tokens file not found, 0 tokens loaded", e);
- throw new DisworkException("tokens file not found,"
+ throw new BadConfigurationException("tokens file not found,"
+ "no token loaded", e);
} catch (IOException e) {
log.error("can't read tokens file", e);
- throw new DisworkException("can't read tokens file", e);
+ throw new LocalFileException("can't read tokens file", e);
}
}
}
@@ -218,7 +224,7 @@
}
protected String parseCommandLine(String commandLine, String tempDir)
- throws DisworkException {
+ throws BadConfigurationException, LocalFileException {
if (tokens == null) {
initTokens();
}
@@ -240,9 +246,10 @@
/**
*
* @return null if no path for a file have been specified
- * @throws DisworkException
+ * @throws BadConfigurationException
+ * @throws LocalFileException
*/
- protected List<CronExpression> getSchedule() throws DisworkException {
+ protected List<CronExpression> getSchedule() throws BadConfigurationException, LocalFileException {
// lazy instanciation of schedule
if (schedule == null) {
String path = getOption("diswork.schedule_file");
@@ -263,12 +270,14 @@
}
}
} catch (FileNotFoundException e) {
- log.error(e);
- throw new DisworkException(e);
+ log.error("schedule file doesn't exists", e);
+ throw new BadConfigurationException("schedule file doesn't exists", e);
} catch (IOException e) {
log.error(e);
- throw new DisworkException(e);
+ throw new LocalFileException("can't read schedule file", e);
}
+ } else {
+ throw new BadConfigurationException("schedule file has not been specified");
}
}
return schedule;
@@ -407,4 +416,13 @@
public void setHttpFrontendPort(Integer httpFrontendPort) {
setOption("diswork.http_front_end.port", httpFrontendPort.toString());
}
+
+ /** number of seconds to wait between two look for a jobs (seconds) */
+ public int getJobLooksWaitTime() {
+ return getOptionAsInt("diswork.job_looks_wait_time");
+ }
+
+ public void setJobLooksWaitTime(Integer seconds) {
+ setOption("diswork.job_looks_wait_time", seconds.toString());
+ }
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -493,13 +493,15 @@
public void submitJob(JobDescription jobDescription) throws DisworkException {
- if (jobDescription.getInputData().size() + jobDescription.getInputUrls().size()
- < jobDescription.getInput().size()) {
- // dependencies are missing
+ // check all dependencies are provided
+ for (String name : jobDescription.getInput()) {
+ if (!jobDescription.getInputData().containsKey(name) &&
+ !jobDescription.getInputUrls().containsKey(name)) {
+ throw new BadJobException("dependency " + name + " is missing");
+ }
}
-
+
try {
-
// trying to put the job in a new directory of home
Random random = new Random();
boolean alreadyExists = true;
@@ -510,8 +512,9 @@
alreadyExists = fileSystem.exists(getPathForJob(newJobIntendifier));
}
+ // XXX side-effect
jobDescription.setJobId(newJobIntendifier);
-
+
// create both job path and sub-directory .diswork
fileSystem.createDirectories(
getPathForJob(jobDescription) + "/" + ".diswork");
@@ -565,11 +568,8 @@
}
}
}
-
config.addOneJobSubmitted();
-
log.info("job submited");
-
} catch (DisworkFileSystemException e) {
log.error("file system error", e);
throw new DisworkException(e);
@@ -710,12 +710,17 @@
result.put("total_uptime", getTotalUptime().toString());
result.put("uptime_ratio", numberFormat.format(getUptimeRatio()));
- // TODO 20100706 bleny compute number of jobs done, number of jobs submitted, ratio,
result.put("jobs_done", config.getNumberOfJobsDone().toString());
result.put("jobs_submitted", config.getNumberOfJobsSubmitted().toString());
- result.put("jobs_ratio", "?");
+
+ if (config.getNumberOfJobsSubmitted() == 0) {
+ result.put("jobs_ratio", "∞");
+ } else {
+ Double jobsRatio = (double) (config.getNumberOfJobsDone() / config.getNumberOfJobsSubmitted());
+ result.put("jobs_ratio", numberFormat.format(jobsRatio));
+ }
- Double karma = (config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio();
+ Double karma = ((config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio());
result.put("karma", numberFormat.format(karma));
return result;
}
@@ -855,4 +860,16 @@
workers.activeScheduledActivityStrategy();
}
}
+
+ /**
+ *
+ * @return null if working is disabled
+ * @throws DisworkException
+ */
+ public List<JobDescription> getAllWorkersCurrentJobs() throws DisworkException {
+ if (workers != null) {
+ return workers.getAllWorkersCurrentJobs();
+ }
+ return null;
+ }
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSimpleClient.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -137,9 +137,10 @@
DisworkFileSystemConfig.newKademliaDisworkConfig(args[0], port));
config.setActivityStrategy("none");
config.setUsedPort(30000);
+ config.setStartHttpFrontend(false);
daemon = new DisworkDaemon(config);
- //userPrompt();
- isisSubmit();
+ userPrompt();
+ //isisSubmit();
}
}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkSystemException.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -0,0 +1,19 @@
+package org.nuiton.diswork.daemon;
+
+public class DisworkSystemException extends DisworkException {
+
+ private static final long serialVersionUID = 1L;
+
+ public DisworkSystemException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DisworkSystemException(String message) {
+ super(message);
+ }
+
+ public DisworkSystemException(Throwable cause) {
+ super(cause);
+ }
+
+}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -132,6 +132,45 @@
pageContent += "</table>\n\n";
+
+
+
+ pageContent += "<h2>Currently running jobs</h2>" + "\n\n"
+ + "<table>" + "\n"
+ + " <tr>" + "\n"
+ + " <th>Name</th>" + "\n"
+ + " <th>Application</th>" + "\n"
+ + " </tr>" + "\n";
+
+ try {
+ jobs = daemon.getAllWorkersCurrentJobs();
+ } catch (DisworkException e) {
+ log.error("error while retrieving local stats", e);
+ throw new ServletException("error while retrieving local stats", e);
+ }
+
+ if (jobs.isEmpty()) {
+ pageContent += " <tr>\n"
+ + " <td colspan=\"2\"><em>working is disabled</em></td>\n"
+ + " </tr>\n";
+ } else {
+ for (JobDescription job : jobs) {
+
+ if (job == null) {
+ pageContent += " <tr>\n"
+ + " <td colspan=\"2\"><em>no job</em></td>\n"
+ + " </tr>\n";
+ } else {
+ pageContent += " <tr>\n"
+ + " <td>" + job.getJobName() + "</td>\n"
+ + " <td>" + job.getApplicationName() + "</td>\n"
+ + " </tr>\n";
+ }
+ }
+ }
+ pageContent += "</table>\n\n";
+
+
pageContent += "<h2>Diswork statistics</h2>\n\n";
Map<String, String> stats;
try {
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/JobDescription.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -243,9 +243,10 @@
* Factory method to get a JobDescription from JSDL
* @param jsdl the content of the JSDL file
* @return a job description representing the content of the JSDL
+ * @throws BadJobException
* @throws IOException if JSDL is malformed or if an URL is malformed
*/
- public static JobDescription parseJSDL(String jsdl) throws IOException {
+ public static JobDescription parseJSDL(String jsdl) throws BadJobException {
// TODO 20100616 bleny correctly set dependency to JDOM in pom.xml
JobDescription result = new JobDescription();
@@ -324,10 +325,10 @@
}
} catch (JDOMException e) {
log.error("can't read malformed JSDL file", e);
- throw new IOException("can't read malformed JSDL file", e);
+ throw new BadJobException("can't read malformed JSDL file", e);
} catch (MalformedURLException e) {
log.error("malformed URL", e);
- throw new IOException("malformed URL", e);
+ throw new BadJobException("malformed URL", e);
}
return result;
}
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/LocalFileException.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -0,0 +1,19 @@
+package org.nuiton.diswork.daemon;
+
+public class LocalFileException extends DisworkException {
+
+ private static final long serialVersionUID = 1L;
+
+ public LocalFileException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public LocalFileException(String message) {
+ super(message);
+ }
+
+ public LocalFileException(Throwable cause) {
+ super(cause);
+ }
+
+}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -28,6 +28,7 @@
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -41,6 +42,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,6 +83,9 @@
/** a job found in dir "key" that fail should be moved to "value" */
protected static Map<String, String> FAILED_MOVE = new HashMap<String, String>();
+ /** a job found in dir "key" that is interrupted should be moved to "value" */
+ protected static Map<String, String> INTERRUPTED_MOVE = new HashMap<String, String>();
+
static {
// initialize RUNNING_MOVE and FAILED_MOVE constants
RUNNING_MOVE.put(DisworkDaemon.TODO, DisworkDaemon.TODO_RUNNING);
@@ -93,6 +98,10 @@
FAILED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.FAILED_1);
FAILED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_2);
FAILED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_3);
+
+ INTERRUPTED_MOVE.put(DisworkDaemon.TODO_RUNNING, DisworkDaemon.TODO);
+ INTERRUPTED_MOVE.put(DisworkDaemon.FAILED_1_RUNNING, DisworkDaemon.FAILED_1);
+ INTERRUPTED_MOVE.put(DisworkDaemon.FAILED_2_RUNNING, DisworkDaemon.FAILED_2);
}
protected DisworkFileSystem fileSystem;
@@ -106,17 +115,14 @@
protected File applicationCache;
- protected Boolean flag = Boolean.FALSE;
+ protected Boolean flag = false;
protected final Object sem = new Object();
protected void updateFlag() throws DisworkException {
- boolean newStatus = activityStrategy.canWork();
- if (newStatus != flag) {
- synchronized (sem) {
- flag = newStatus;
- sem.notifyAll();
- }
+ synchronized (sem) {
+ flag = activityStrategy.canWork();
+ sem.notifyAll();
}
}
@@ -142,10 +148,6 @@
*/
protected class Worker extends Thread {
- // TODO 20100611 bleny make it configurable
- /** time to wait between two look for a job */
- protected static final int JOB_WAIT = 10 * 1000;
-
// TODO 20100614 bleny make it configurable
/** after this time (ms), a job is considered as no longer running */
protected static final long MAX_JOB_RUNNING_TIME = 24 * 60 * 60 * 1000;
@@ -153,6 +155,18 @@
/** set this field to true will make run() return and thread stop */
protected boolean shouldStop = false;
+ /** the current job, null if worker do nothing */
+ protected JobDescription currentJob = null;
+
+ /** current job path on diwork FS */
+ protected String currentJobPath = null;
+
+ /** current job temp directory on the local file-system */
+ protected File currentJobDir = null;
+
+ /** current process, null if nothing is running */
+ protected Process currentProcess;
+
/** read the standard output of the subprocess
*
* By reading the standard output, this thread has multiple goals :
@@ -224,7 +238,7 @@
log.warn("error while closing the output of the subprocess", e);
}
}
- }
+ }
}
/**
@@ -235,162 +249,286 @@
* the log
* @throws IOException
*/
- protected void log(String jobPath, String message)
- throws DisworkFileSystemException, IOException {
- String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH;
- InputStream oldLogAsStream = fileSystem.read(logPath);
- String oldLog = IOUtils.toString(oldLogAsStream);
- String logEntry = message + "\n";
- String newLog = oldLog + logEntry;
- fileSystem.write(logPath, IOUtils.toInputStream(newLog));
+ protected void log(String jobPath, String... messages)
+ throws DisworkSystemException {
+ try {
+ String logPath = jobPath + "/" + DisworkDaemon.LOG_PATH;
+ InputStream oldLogAsStream = fileSystem.read(logPath);
+ String oldLog = IOUtils.toString(oldLogAsStream);
+ String logEntry = "";
+ for (String message : messages) {
+ logEntry += message + "\n";
+ }
+ String newLog = oldLog + logEntry;
+ log.debug("writing new log content" + newLog);
+ fileSystem.write(logPath, IOUtils.toInputStream(newLog));
+ } catch (DisworkFileSystemException e) {
+ log.error("unable to read or write log file", e);
+ throw new DisworkSystemException("unable to read or write log file", e);
+ } catch (IOException e) {
+ log.error("unable to read log file", e);
+ throw new DisworkSystemException("unable to read log file", e);
+ }
}
- /**
- * Download all the files needed for a job in a temp directory, run
- * the job, wait for it to end, write all the results. Mark the job
- * as running at the beginning and move it to DONE or FAILED at
- * the end, depending of the results
- * @param jobPath
- * @return
- * @throws DisworkFileSystemException
- * @throws IOException
- * @throws DisworkException
- */
- protected boolean runJob(String jobPath)
- throws IOException,
- DisworkFileSystemException,
- DisworkException {
- log.info("running job at " + jobPath);
-
- String jsdlPath = jobPath + "/" + DisworkDaemon.JSDL_PATH;
- String jsdl = IOUtils.toString(fileSystem.read(jsdlPath));
-
-
- log.info("read jsdl " + jsdl);
-
- JobDescription jobDescription = JobDescription.parseJSDL(jsdl);
-
- log.info("will run job " + jobDescription);
-
- // create temp dir
- File jobDir = FileUtil.createTempDirectory("job", "",
- new File(config.getTempDirectory()));
- jobDir.mkdirs();
-
+ protected void downloadApplication() throws DisworkSystemException, LocalFileException {
// download application
- if (jobDescription.getApplicationName() != null) {
- log.info("dependency needed for " + jobDescription + " (" +
- jobDescription.getApplicationName() + "-" +
- jobDescription.getApplicationVersion() + ")");
+ if (currentJob.getApplicationName() != null) {
+ log.info("dependency needed for " + currentJob + " (" +
+ currentJob.getApplicationName() + "-" +
+ currentJob.getApplicationVersion() + ")");
File application = getApplicationData(
- jobDescription.getApplicationName(),
- jobDescription.getApplicationVersion());
+ currentJob.getApplicationName(),
+ currentJob.getApplicationVersion());
// unzip application
log.info("unzip application start");
- ZipUtil.uncompress(application, jobDir);
+ try {
+ ZipUtil.uncompress(application, currentJobDir);
+ } catch (IOException e) {
+ log.error("error occured while extracting the application", e);
+ throw new LocalFileException("error occured while extracting the application", e);
+ }
log.info("unzip application finished");
} else {
- log.info("no dependency specified for " + jobDescription);
- }
-
+ log.info("no dependency specified for " + currentJob);
+ }
+ }
+
+ protected void stageInputFiles() throws DisworkSystemException, LocalFileException {
// staging input files
- for (String fileName : jobDescription.getInput()) {
+ for (String fileName : currentJob.getInput()) {
log.info("staging " + fileName);
- File localCopy = new File(jobDir, fileName);
- localCopy.createNewFile();
InputStream source = null;
- if (jobDescription.getInputUrls().containsKey(fileName)) {
+ if (currentJob.getInputUrls().containsKey(fileName)) {
// download this file from URL
- URL url = jobDescription.getInputUrls().get(fileName);
+ URL url = currentJob.getInputUrls().get(fileName);
log.info("downloading from " + url);
- source = url.openStream();
+ try {
+ source = url.openStream();
+ } catch (IOException e) {
+ log.error("failed to download input data from" + url, e);
+ throw new DisworkSystemException("failed to download input data from" + url, e);
+ }
} else {
// download this file from diswork
- source = fileSystem.read(jobPath + "/" + fileName);
+ try {
+ source = fileSystem.read(currentJobPath + "/" + fileName);
+ } catch (DisworkFileSystemException e) {
+ log.error("unable to read input file from diswork", e);
+ throw new DisworkSystemException("unable to read input file from diswork", e);
+ }
}
- IOUtils.copy(source, new FileOutputStream(localCopy));
+
+ try {
+ File localCopy = new File(currentJobDir, fileName);
+ localCopy.createNewFile();
+ IOUtils.copy(source, new FileOutputStream(localCopy));
+ } catch (IOException e) {
+ log.error("unable to write input file to local dir", e);
+ throw new LocalFileException("unable to write input file to local dir", e);
+ }
}
-
+ }
+
+ protected void prepareAndRunJob() throws DisworkException {
log.info("preparing the job");
// prepare the job and run it
String commandLine = config.parseCommandLine(
- jobDescription.getCommandLine(),
- jobDir.getAbsolutePath());
+ currentJob.getCommandLine(),
+ currentJobDir.getAbsolutePath());
// String[] commandLineElements = commandLine.split(" ");
String[] commandLineElements = StringUtil.split(commandLine, " ");
ProcessBuilder builder = new ProcessBuilder(commandLineElements);
- builder.directory(jobDir);
+ builder.directory(currentJobDir);
builder.redirectErrorStream(true);
- log.info("calling " + commandLine);
- Process job = builder.start();
+ log.info("process will call " + commandLine);
+ try {
+ currentProcess = builder.start();
+ } catch (IOException e) {
+ log.error("unable to run process for job" + currentJob, e);
+ throw new LocalFileException("unable to run process for job" + currentJob, e);
+ }
+
+ // catch (Throwable processError) {}
// start a thread to constantly read on the standard output
- String standardOutputFileName = jobDescription.getStandardOutput();
+ String standardOutputFileName = currentJob.getStandardOutput();
log.info("standardOutputFileName is " + standardOutputFileName);
OutputStream outputFileStream = null;
if (standardOutputFileName != null) {
- File outputFile = new File(jobDir, standardOutputFileName);
+ File outputFile = new File(currentJobDir, standardOutputFileName);
log.info("writing standard output in " + outputFile);
- outputFileStream = new FileOutputStream(outputFile);
+ try {
+ outputFileStream = new FileOutputStream(outputFile);
+ } catch (FileNotFoundException e) {
+ // should not occur since we just created the file
+ log.error("unable to find standard output file", e);
+ throw new LocalFileException("unable to find standard output file", e);
+ }
}
- OutputReader outputReader = new OutputReader(job.getInputStream(),
+ OutputReader outputReader = new OutputReader(currentProcess.getInputStream(),
outputFileStream);
outputReader.start();
// plugging a file on the standard input
- String standardInputFileName = jobDescription.getStandardInput();
+ String standardInputFileName = currentJob.getStandardInput();
if (standardInputFileName != null) {
log.info("writing " + standardInputFileName + " on standard "
+ "input");
- InputStream input = new FileInputStream(
- new File(jobDir, standardInputFileName));
- IOUtils.copy(input, job.getOutputStream());
- }
-
- // run the process and wait for it to return
- int exitValue = -1;
- try {
- log.info("waiting for the end of the process");
- exitValue = job.waitFor();
- } catch (InterruptedException e) {
- log.error("job " + jobDescription + " was interrupted", e);
- // FIXME 20100611 bleny job is considered has failed
- exitValue = 1;
+ try {
+ InputStream input = new FileInputStream(
+ new File(currentJobDir, standardInputFileName));
+ IOUtils.copy(input, currentProcess.getOutputStream());
+ } catch (FileNotFoundException e) {
+ // file may not have been provided by job submitter
+ if (currentJob.getInput().contains(standardInputFileName)) {
+ log.error("standard input file is not found", e);
+ throw new DisworkSystemException("standard input file is not found", e);
+ } else {
+ throw new BadJobException("standard input file is not provided in " + currentJob);
+ }
+ } catch (IOException e) {
+ log.error("unable to read data from input file", e);
+ throw new DisworkSystemException("unable to read data from input file", e);
+ }
}
- log.info("job returned " + exitValue);
+ }
+ protected void stageOutputFiles() throws DisworkSystemException, BadJobException {
// output file staging
- for (String fileName : jobDescription.getOutput()) {
+ for (String fileName : currentJob.getOutput()) {
log.info("staging file " + fileName);
- File localCopy = new File(jobDir, fileName);
-
+ File localCopy = new File(currentJobDir, fileName);
+
if (localCopy.exists()) {
- InputStream localCopyStream = new FileInputStream(localCopy);
-
- String filePath = jobPath + "/" + fileName;
-
- // erase before write
- if (fileSystem.exists(filePath)) {
- fileSystem.delete(filePath);
+ String filePath = currentJobPath + "/" + fileName;
+ // copy local file to diswork FS
+
+ InputStream localCopyStream = null;
+ try {
+ localCopyStream = new FileInputStream(localCopy);
+
+ // erase before write if an input file as an output too
+ if (fileSystem.exists(filePath)) {
+ fileSystem.delete(filePath);
+ }
+
+ fileSystem.write(filePath, localCopyStream);
+ } catch (FileNotFoundException e) {
+ // file exists, tested just before
+ } catch (DisworkFileSystemException e) {
+ log.error("error while uploading results", e);
+ throw new DisworkSystemException("error while uploading results", e);
+ } finally {
+ IOUtils.closeQuietly(localCopyStream);
}
+ } else {
+ throw new BadJobException("job " + currentJob + " do not produces a file" + fileName);
+ }
+ }
+ }
+
+ /**
+ * Download all the files needed for a job in a temp directory, run
+ * the job, wait for it to end, write all the results. Mark the job
+ * as running at the beginning and move it to DONE or FAILED at
+ * the end, depending of the results
+ * @param currentJobPath
+ * @return
+ * @throws BadJobException
+ * @throws DisworkFileSystemException
+ * @throws IOException
+ * @throws DisworkException
+ */
+ protected void runJob() throws DisworkException {
+ try {
+ log.info("running job at " + currentJobPath);
+ try {
+ String jsdlPath = currentJobPath + "/" + DisworkDaemon.JSDL_PATH;
+ String jsdl = IOUtils.toString(fileSystem.read(jsdlPath));
+ log.info("read jsdl " + jsdl);
+ currentJob = JobDescription.parseJSDL(jsdl);
+ } catch (IOException e) {
+ log.error("unable to read or parse JSDL", e);
+ throw new DisworkSystemException("unable to read or parse JSDL", e);
+ } catch (DisworkFileSystemException e) {
+ log.error("unable to read JSDL", e);
+ throw new DisworkSystemException("unable to read JSDL", e);
+ }
- fileSystem.write(filePath, localCopyStream);
- localCopyStream.close();
+ log.info("will run job " + currentJob);
+
+ // create temp dir
+ try {
+ currentJobDir = FileUtil.createTempDirectory("job", "",
+ new File(config.getTempDirectory()));
+ currentJobDir.mkdirs();
+ } catch (IOException e) {
+ log.error("unable to create temp directory for job", e);
+ throw new LocalFileException("unable to create temp directory for job", e);
}
+
+ downloadApplication();
+
+ stageInputFiles();
+
+ // until there we didn't started the job, it's not too late to
+ // stop, last check of shouldStrop
+ if (!shouldStop) {
+ prepareAndRunJob();
+
+ // wait for the process to return
+ jobIsStarted(currentJobPath);
+ boolean processFinished = false;
+ while (!processFinished) {
+ Integer exitValue; // exitValue if the process is
+ // finished, null if the process
+ // is not finished
+ try {
+ exitValue = currentProcess.exitValue();
+ processFinished = true;
+ } catch (IllegalThreadStateException e) {
+ // process is not finished
+ exitValue = null;
+ }
+
+ if (exitValue == null) {
+ if (shouldStop) {
+ jobIsInterrupted(currentJobPath);
+ processFinished = true;
+ } else {
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ log.error("worker interupted", e);
+ throw new DisworkException("worker interupted", e);
+ }
+ }
+ } else {
+ stageOutputFiles();
+ // job is finished
+ if (exitValue == 0) {
+ // job is successful
+ jobIsSuccessful(currentJobPath);
+ } else {
+ jobIsFailed(currentJobPath);
+ }
+ }
+ }
+ } else {
+ jobIsInterrupted(currentJobPath);
+ }
+ } catch (BadJobException e) {
+ jobIsFailed(currentJobPath);
+ } finally {
+ currentJob = null;
+ currentProcess = null;
+ // clean up the job directory
+ FileUtil.deleteRecursively(currentJobDir);
+ currentJobDir = null;
}
-
- // clean up the job directory
- FileUtil.deleteRecursively(jobDir);
-
- boolean success = exitValue == 0;
- if (success) {
- log(jobPath, "DONE");
- } else {
- log(jobPath, "FAILED");
- }
- return success;
}
/**
@@ -409,156 +547,205 @@
}
}
- protected void findAJobAndRunIt() throws IOException,
- DisworkFileSystemException,
- DisworkException {
-
- // Once a job is found, those two var will be set
- String jobLinkDir = null;
- String jobLinkName = null;
-
- // use a synchronized block because multiple workers
- // may try to take a same job
- synchronized (fileSystem) {
+ /**
+ * try to find a job, if found, take it and return the path
+ * @return the path to the job, null if no job found
+ * @throws DisworkSystemException
+ */
+ protected String findAJob() throws DisworkSystemException {
+ try {
+ // Once a job is found, those two var will be set
+ String jobLinkDir = null;
+ String jobLinkName = null;
- // fist, try to find a job declared has running since
- // too long to re-run it
- String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
- DisworkDaemon.FAILED_1_RUNNING,
- DisworkDaemon.TODO_RUNNING
- };
- // browsing all "running" dirs
- for (String path : runningJobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) {
- Long linkAge = System.currentTimeMillis()
- - Long.parseLong(oldName);
- // check is oldest job is too old and should be
- // considered has to-be-rerun
- if (linkAge > MAX_JOB_RUNNING_TIME) {
- log.info("taking old job (age = " + linkAge + ")");
- jobLinkDir = path;
- jobLinkName = oldName;
- // FIXME 20100617 bleny break s*cks
- break;
+ // use a synchronized block because multiple workers
+ // may try to take a same job
+ synchronized (fileSystem) {
+
+ // fist, try to find a job declared has running since
+ // too long to re-run it
+ String[] runningJobsDirs = { DisworkDaemon.FAILED_2_RUNNING,
+ DisworkDaemon.FAILED_1_RUNNING,
+ DisworkDaemon.TODO_RUNNING
+ };
+ // browsing all "running" dirs
+ for (String path : runningJobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) {
+ Long linkAge = System.currentTimeMillis()
+ - Long.parseLong(oldName);
+ // check is oldest job is too old and should be
+ // considered has to-be-rerun
+ if (linkAge > MAX_JOB_RUNNING_TIME) {
+ log.info("taking old job (age = " + linkAge + ")");
+ jobLinkDir = path;
+ jobLinkName = oldName;
+ // FIXME 20100617 bleny break s*cks
+ break;
+ }
}
}
- }
-
- // if no job was found, search now in not running jobs
- if (jobLinkDir == null) {
- String[] jobsDirs = { DisworkDaemon.FAILED_2,
- DisworkDaemon.FAILED_1,
- DisworkDaemon.TODO
- };
- for (String path : jobsDirs) {
- String oldName = getFistJobName(path);
- if (oldName != null) { // take it
- jobLinkDir = path;
- jobLinkName = oldName;
- // FIXME 20100617 bleny break s*cks
- break;
+
+ // if no job was found, search now in not running jobs
+ if (jobLinkDir == null) {
+ String[] jobsDirs = { DisworkDaemon.FAILED_2,
+ DisworkDaemon.FAILED_1,
+ DisworkDaemon.TODO
+ };
+ for (String path : jobsDirs) {
+ String oldName = getFistJobName(path);
+ if (oldName != null) { // take it
+ jobLinkDir = path;
+ jobLinkName = oldName;
+ // FIXME 20100617 bleny break s*cks
+ break;
+ }
}
}
+
+ if (jobLinkDir != null) {
+ // move the link before running the job
+ String oldPath = jobLinkDir + "/" + jobLinkName;
+ log.info("job found at " + oldPath);
+
+ jobLinkDir = RUNNING_MOVE.get(jobLinkDir);
+ jobLinkName = DisworkDaemon.newJobLinkName();
+ String newPath = jobLinkDir + "/" + jobLinkName;
+
+ log.info("moving " + oldPath + " to " + newPath);
+ fileSystem.move(oldPath, newPath);
+
+ }
}
-
+
+ String jobPath = null;
+ // now, if no job was found
if (jobLinkDir != null) {
- // move the link before running the job
- String oldPath = jobLinkDir + "/" + jobLinkName;
- log.info("job found at " + oldPath);
-
- jobLinkDir = RUNNING_MOVE.get(jobLinkDir);
- jobLinkName = DisworkDaemon.newJobLinkName();
- String newPath = jobLinkDir + "/" + jobLinkName;
-
- log.info("moving " + oldPath + " to " + newPath);
- fileSystem.move(oldPath, newPath);
-
+ jobPath = jobLinkDir + "/" + jobLinkName;
+ } else {
+ try {
+ log.info("look for a job was unsucessful, will wait " + config.getJobLooksWaitTime() + " seconds before next try");
+ Thread.sleep(config.getJobLooksWaitTime() * 1000);
+ } catch (InterruptedException e) {
+ log.error("worker interrupted while waiting before trying to find a new job", e);
+ throw new DisworkSystemException("worker interrupted while waiting before trying to find a new job", e);
+ }
}
+ return jobPath;
+ } catch (DisworkFileSystemException e) {
+ log.error("error while trying to find a job", e);
+ throw new DisworkSystemException("error while trying to find a job", e);
}
-
- // now, if no job was found, do nothing
- if (jobLinkDir == null) {
- log.info("nothing to do");
- try {
- Thread.sleep(JOB_WAIT);
- } catch (InterruptedException e) {
- log.warn("worker interrupted while waiting", e);
- // TODO 20100629 bleny ?
- }
- // if a job was found, take it
+ }
+
+ /** update the log of the job
+ * permit the user to use
+ * {@link DisworkDaemon#isSuccessful(JobDescription)}
+ */
+ protected void jobIsSuccessful(String jobPath) throws DisworkSystemException {
+ String newJobPath = DisworkDaemon.DONE + "/" + DisworkDaemon.newJobLinkName();
+ try {
+ log.info("moving " + jobPath + " to " + newJobPath);
+ fileSystem.move(jobPath, newJobPath);
+ } catch (DisworkFileSystemException e) {
+ log.error("error while moving job link", e);
+ throw new DisworkSystemException("error while moving job link", e);
+ }
+ log.info("marking " + newJobPath + " as done and finished");
+ log(newJobPath, "DONE", "FINISHED");
+ config.addOneJobDone();
+ }
+
+ /** update the log of the job and move the link
+ * permit the job submitter to use
+ * {@link DisworkDaemon#isFailed(JobDescription)}
+ */
+ protected void jobIsFailed(String jobPath) throws DisworkSystemException {
+ String jobDir = FilenameUtils.getFullPathNoEndSeparator(jobPath);
+ String newDir = FAILED_MOVE.get(jobDir);
+ String newJobPath = newDir + "/" + DisworkDaemon.newJobLinkName();
+
+ try {
+ log.info("moving " + jobPath + " to " + newJobPath);
+ fileSystem.move(jobPath, newJobPath);
+ } catch (DisworkFileSystemException e) {
+ log.error("error while moving job link", e);
+ throw new DisworkSystemException("error while moving job link", e);
+ }
+
+ if (newDir.equals(DisworkDaemon.FAILED_3)) {
+ log.info("marking " + newJobPath + " as failed and finished");
+ log(newJobPath, "FAILED", "FINISHED");
} else {
+ log.info("marking " + newJobPath + " as failed");
+ log(newJobPath, "FAILED");
+ }
+ }
- String jobPath = jobLinkDir + "/" + jobLinkName;
-
- if (fileSystem.exists(jobPath + "/" + DisworkDaemon.JSDL_PATH)) {
-
- log(jobPath, "STARTED");
-
- boolean jobSuccess = runJob(jobPath);
-
- // move the link after the job
- String newDir = null;
- if (jobSuccess) {
- newDir = DisworkDaemon.DONE;
- config.addOneJobDone();
- } else {
- newDir = FAILED_MOVE.get(jobLinkDir);
- }
- String newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
-
- log.info("moving " + jobPath + " to " + newPath);
- fileSystem.move(jobPath, newPath);
-
- // mark the job has finished if done or failed too many
- // times
- if ( newDir == DisworkDaemon.DONE ||
- newDir == DisworkDaemon.FAILED_3) {
- log.info("marking " + newPath + " as finished");
- log(newPath, "FINISHED");
- }
- } else {
- // job has been canceled
- fileSystem.delete(jobPath);
- }
+ /** put the job back where it was found */
+ protected void jobIsInterrupted(String jobPath) throws DisworkSystemException {
+ String jobDir = FilenameUtils.getFullPathNoEndSeparator(jobPath);
+ String jobName = FilenameUtils.getName(jobPath);
+ String newDir = INTERRUPTED_MOVE.get(jobDir);
+ String newJobPath = newDir + "/" + jobName;
+
+ try {
+ log.info("moving " + jobPath + " to " + newJobPath);
+ fileSystem.move(jobPath, newJobPath);
+ } catch (DisworkFileSystemException e) {
+ log.error("error while moving job link", e);
+ throw new DisworkSystemException("error while moving job link", e);
}
}
-
+
+ /** update the log the job
+ * permit the job submitter to use
+ * @link {@link DisworkDaemon#isStarted(JobDescription)}
+ */
+ protected void jobIsStarted(String jobPath) throws DisworkSystemException {
+ log(jobPath, "STARTED");
+ }
+
/**
* find
*/
@Override
public void run() {
- while (! shouldStop) {
- try {
+ try {
+ while (! shouldStop) {
synchronized (sem) {
if (getFlag()) {
- findAJobAndRunIt();
+ currentJobPath = findAJob();
+ if (currentJobPath != null) {
+ runJob();
+ currentJobPath = null;
+ }
} else {
log.debug("sleeping until a change on flag");
long waitTime = activityStrategy.timeBeforeNextUpdate();
- if (waitTime == -1) {
- sem.wait();
- } else {
- sem.wait(waitTime);
+ try {
+ if (waitTime == -1) {
+ // wait until notify
+ sem.wait();
+ } else {
+ sem.wait(waitTime);
+ }
+ } catch (InterruptedException e) {
+ log.error("interrupted while waiting for a change of activity", e);
+ throw new DisworkSystemException("interrupted while waiting for a change of activity", e);
}
}
}
- } catch (DisworkException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
- } catch (IOException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
- } catch (DisworkFileSystemException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
- } catch (InterruptedException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
}
+ } catch (DisworkException e) {
+ throw new RuntimeException("an error occured", e);
}
}
+
+ @Override
+ public String toString() {
+ return getName();
+ }
}
public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config)
@@ -582,11 +769,12 @@
}
}
- /** read an application from the file system and use a cache */
+ /** read an application from the file system and use a cache
+ * @throws DisworkSystemException
+ * @throws LocalFileException */
protected File getApplicationData(String applicationName,
String applicationVersion)
- throws DisworkFileSystemException,
- IOException {
+ throws DisworkSystemException, LocalFileException {
File cachedApplicationData = new File(applicationCache,
applicationName + "-" + applicationVersion + ".zip");
if (!cachedApplicationData.exists()) {
@@ -594,12 +782,30 @@
synchronized (cachedApplicationData) {
String applicationPath = DisworkDaemon.getPathForDependency(
applicationName, applicationVersion);
- InputStream applicationData = fileSystem.read(applicationPath);
- cachedApplicationData.createNewFile();
- log.info("will create " + cachedApplicationData.getAbsolutePath());
- OutputStream out = new FileOutputStream(cachedApplicationData);
- log.debug("starting copy of " + applicationData.available() + " bytes");
- IOUtils.copy(applicationData, out);
+ InputStream applicationData = null;
+ try {
+ applicationData = fileSystem.read(applicationPath);
+ } catch (DisworkFileSystemException e) {
+ log.error("unable to get application", e);
+ throw new DisworkSystemException("unable to get application", e);
+ } finally {
+ IOUtils.closeQuietly(applicationData);
+ }
+
+ OutputStream out = null;
+ try {
+ cachedApplicationData.createNewFile();
+ log.info("will create " + cachedApplicationData.getAbsolutePath());
+ out = new FileOutputStream(cachedApplicationData);
+ log.debug("starting copy of " + applicationData.available() + " bytes");
+ IOUtils.copy(applicationData, out);
+ } catch (IOException e) {
+ log.error("unable to write application in cache", e);
+ throw new LocalFileException("unable to write application in cache", e);
+ } finally {
+ IOUtils.closeQuietly(applicationData);
+ IOUtils.closeQuietly(out);
+ }
}
} else {
log.debug("cache matches for " + applicationName + "-" + applicationVersion);
@@ -608,12 +814,9 @@
}
public void stop() throws DisworkException {
- stop(false);
- }
-
- public void stop(boolean now) throws DisworkException {
// asking to all threads to stop
for (Worker worker : workers) {
+ log.debug("asking " + worker + " to stop");
worker.shouldStop = true;
}
@@ -621,16 +824,17 @@
FileUtil.deleteRecursively(applicationCache);
- if( !now ) {
- // waiting for them to actually have finished
- for (Worker worker : workers) {
- while (worker.isAlive()) {
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException e) {
- log.warn("interrupted while waiting for a worker to " +
- "stop", e);
- }
+ // waiting for them to actually have finished
+ for (Worker worker : workers) {
+ while (worker.isAlive()) {
+ try {
+ // worker may be sleeping
+ activeNoActivityStrategy();
+ log.debug("waiting for " + worker + " to return");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.warn("interrupted while waiting for a worker to " +
+ "stop", e);
}
}
}
@@ -674,4 +878,13 @@
public void activeScheduledActivityStrategy() throws DisworkException {
setActivityStrategy(ActivityStrategies.SCHEDULED);
}
+
+ /** this is only for monitoring purpose */
+ public List<JobDescription> getAllWorkersCurrentJobs() {
+ List<JobDescription> result = new ArrayList<JobDescription>();
+ for (Worker worker : workers) {
+ result.add(worker.currentJob);
+ }
+ return result;
+ }
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -5,7 +5,7 @@
@Override
protected void setConfigs() {
- config = super.newConfig();
+ config = newConfig();
config.setNumberOfWorkers(32);
}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -7,10 +7,10 @@
@Override
protected void setConfigs() {
- config = super.newConfig();
+ config = newConfig();
config.setActivityStrategy("none");
- config2 = super.newConfig();
+ config2 = newConfig();
config2.setBootstrapIp(DisworkFileSystemConfig.getIp());
config2.setBootstrapPort(config.getUsedPort());
config2.setActivityStrategy("unlimited");
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -17,10 +17,10 @@
protected DisworkConfig config;
protected DisworkConfig config2;
-
- protected static DisworkDaemon daemon;
- protected static DisworkDaemon daemon2;
-
+
+ protected DisworkDaemon daemon;
+ protected DisworkDaemon daemon2;
+
protected static int port = 45500;
/** a factory method to ease the creation of configs */
@@ -28,6 +28,7 @@
DisworkConfig config = new DisworkConfig();
port += 1;
config.setUsedPort(port);
+ config.setJobLooksWaitTime(1);
// useless in tests
config.setStartHttpFrontend(false);
@@ -66,7 +67,6 @@
JobDescription job = new JobDescription();
job.setCommandLine("java -version");
daemon.submitJob(job);
-
}
@Test(expected = DisworkException.class)
@@ -87,6 +87,7 @@
Thread.sleep(5 * 1000);
}
+ assertTrue(daemon.isStarted(job));
assertTrue(daemon.isSuccessful(job));
// check getAllJobs return
@@ -193,4 +194,25 @@
public void testJobsManagement() throws Exception {
assertEquals(0, daemon.getAllJobs().size());
}
+
+ @Test
+ public void testJobError() throws Exception {
+ JobDescription job = new JobDescription();
+ job.setJobName("My Job with wrong command line");
+ job.setApplication("fake-app", "1.0");
+ job.setCommandLine("%java -jar no_such_jar.jar");
+ job.setStandardOutput("output.txt");
+ job.addOutput("output.txt");
+
+ daemon.submitJob(job);
+
+ while(! daemon.isFinished(job)) {
+ Thread.sleep(5 * 1000);
+ }
+
+ assertTrue(daemon.isFailed(job));
+ assertTrue(daemon.getResults(job).containsKey("output.txt"));
+ String output = IOUtils.toString(daemon.getResults(job).get("output.txt"));
+ assertTrue(output.contains("Unable to access jarfile no_such_jar.jar"));
+ }
}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/JobDescriptionTest.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -5,7 +5,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
import java.net.URL;
import org.apache.commons.collections.ListUtils;
@@ -53,8 +52,13 @@
@Test
public void testParseJSDL1() throws Exception {
- try {
- JobDescription jobCopy = JobDescription.parseJSDL(job.toJSDL());
+ JobDescription jobCopy;
+ try {
+ jobCopy = JobDescription.parseJSDL(job.toJSDL());
+ } catch (BadJobException e) {
+ fail();
+ throw e;
+ }
assertNotNull(jobCopy);
assertEquals(job.getJobName(), jobCopy.getJobName());
assertEquals(job.getApplicationName(),
@@ -64,35 +68,31 @@
assertEquals(job.getCommandLine(), jobCopy.getCommandLine());
assertEquals(job.getStandardInput(), jobCopy.getStandardInput());
assertEquals(job.getStandardOutput(), jobCopy.getStandardOutput());
- } catch (IOException e) {
- fail();
- throw e;
- }
}
@Test
public void testParseJSDL2() throws Exception {
+ JobDescription job2Copy;
try {
- JobDescription job2Copy = JobDescription.parseJSDL(job2.toJSDL());
- assertEquals(job2.getCommandLine(), job2Copy.getCommandLine());
- assertEquals(job2.getStandardInput(), job2Copy.getStandardInput());
- assertEquals(job2.getStandardOutput(),
- job2Copy.getStandardOutput());
-
- assertTrue(ListUtils.isEqualList(job2.getInput(),
- job2Copy.getInput()));
- assertTrue(ListUtils.isEqualList(job2.getOutput(),
- job2Copy.getOutput()));
- assertTrue(ListUtils.isEqualList(
- job2.getInputUrls().keySet(),
- job2Copy.getInputUrls().keySet()));
- assertTrue(ListUtils.isEqualList(
- job2.getInputUrls().values(),
- job2Copy.getInputUrls().values()));
- } catch (IOException e) {
+ job2Copy = JobDescription.parseJSDL(job2.toJSDL());
+ } catch (BadJobException e) {
fail();
throw e;
}
+ assertEquals(job2.getCommandLine(), job2Copy.getCommandLine());
+ assertEquals(job2.getStandardInput(), job2Copy.getStandardInput());
+ assertEquals(job2.getStandardOutput(),
+ job2Copy.getStandardOutput());
+
+ assertTrue(ListUtils.isEqualList(job2.getInput(),
+ job2Copy.getInput()));
+ assertTrue(ListUtils.isEqualList(job2.getOutput(),
+ job2Copy.getOutput()));
+ assertTrue(ListUtils.isEqualList(
+ job2.getInputUrls().keySet(),
+ job2Copy.getInputUrls().keySet()));
+ assertTrue(ListUtils.isEqualList(
+ job2.getInputUrls().values(),
+ job2Copy.getInputUrls().values()));
}
-
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-07 16:23:54 UTC (rev 100)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystemConfig.java 2010-07-09 11:41:12 UTC (rev 101)
@@ -63,7 +63,7 @@
private static final Log log =
LogFactory.getLog(DisworkFileSystemConfig.class);
- protected static Integer port = 19000;
+ protected static Integer port = 18999;
/**
* returns a new port, returned value change at each call.
1
0
r100 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 07 Jul '10
by bleny@users.nuiton.org 07 Jul '10
07 Jul '10
Author: bleny
Date: 2010-07-07 18:23:54 +0200 (Wed, 07 Jul 2010)
New Revision: 100
Url: http://nuiton.org/repositories/revision/diswork/100
Log:
config
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-07 14:56:36 UTC (rev 99)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-07 16:23:54 UTC (rev 100)
@@ -82,11 +82,11 @@
*
* For the http front-end :
* <dl>
- * <dt>diswork.httpfrontend.start</dt>
+ * <dt>diswork.http_front_end.start</dt>
* <dd>"true" and the front end will start at diswork boot. Note that
* the front-end can be started later even if this config directive is
* at "false"</dd>
- * <dt>diswork.httpfrontend.port</dt>
+ * <dt>diswork.http_front_end.port</dt>
* <dd>the port to use for HTTP server. It means that the browser while
* have to use this port to get connected to the front-end. Default
* port is 8080.</dd>
@@ -123,13 +123,19 @@
setDefaultOption("diswork.activity_strategy", "unlimited");
- // if no total_uptime saved, consider daemon has never run
- setDefaultOption("diswork.total_uptime", "0");
+ setOption("diswork.http_front_end.start", "true");
+ setOption("diswork.http_front_end.port", "8080");
+
+ setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
+
+
+
- setOption("diswork.httpfrontend.start", "true");
- setOption("diswork.httpfrontend.port", "8080");
- setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
+ // init data never initialized
+ setDefaultOption("diswork.total_uptime", "0");
+ setDefaultOption("diswork.number_of_jobs_done", "0");
+ setDefaultOption("diswork.number_of_jobs_submitted", "0");
}
@@ -277,12 +283,19 @@
/* ** those config data are not set by the user but computed by daemon ** */
+ // FIXME 20100607 bleny its not config data, it should be moved to a persistent file
+
+ protected void save() {
+ // saveForUser(null);
+ }
+
protected String getOwnerId() {
return getOption("diswork.owner");
}
protected void setOwnerId(String ownerId) {
setOption("diswork.owner", ownerId);
+ save();
}
protected Long getTotalUptime() {
@@ -292,10 +305,12 @@
protected void setTotalUptime(Long upTime) {
setOption("diswork.total_uptime", upTime.toString());
+ save();
}
protected void setFirstRunTime(Long time) {
setOption("diswork.first_run_time", time.toString());
+ save();
}
protected Long getFirstRunTime() {
@@ -303,6 +318,26 @@
return Long.parseLong(firstRunTime);
}
+ protected void addOneJobDone() {
+ Integer newValue = getNumberOfJobsDone() + 1;
+ setOption("diswork.number_of_jobs_done", newValue.toString());
+ save();
+ }
+
+ protected Integer getNumberOfJobsDone() {
+ return getOptionAsInt("diswork.number_of_jobs_done");
+ }
+
+ protected void addOneJobSubmitted() {
+ Integer newValue = getNumberOfJobsDone() + 1;
+ setOption("diswork.number_of_jobs_submitted", newValue.toString());
+ save();
+ }
+
+ protected Integer getNumberOfJobsSubmitted() {
+ return getOptionAsInt("diswork.number_of_jobs_submitted");
+ }
+
/* ** trivial applicationConfig setters and getters ** */
public String getTempDirectory() {
@@ -358,18 +393,18 @@
}
public Boolean getStartHttpFrontend() {
- return getOptionAsBoolean("diswork.httpfrontend.start");
+ return getOptionAsBoolean("diswork.http_front_end.start");
}
public void setStartHttpFrontend(Boolean startHttpFrontend) {
- setOption("diswork.httpfrontend.start", startHttpFrontend.toString());
+ setOption("diswork.http_front_end.start", startHttpFrontend.toString());
}
public Integer getHttpFrontendPort() {
- return getOptionAsInt("diswork.httpfrontend.port");
+ return getOptionAsInt("diswork.http_front_end.port");
}
public void setHttpFrontendPort(Integer httpFrontendPort) {
- setOption("diswork.httpfrontend.port", httpFrontendPort.toString());
+ setOption("diswork.http_front_end.port", httpFrontendPort.toString());
}
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 14:56:36 UTC (rev 99)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 16:23:54 UTC (rev 100)
@@ -28,6 +28,7 @@
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
+import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -565,6 +566,8 @@
}
}
+ config.addOneJobSubmitted();
+
log.info("job submited");
} catch (DisworkFileSystemException e) {
@@ -700,17 +703,20 @@
}
public Map<String, String> getLocalStats() throws DisworkException {
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMaximumFractionDigits(2);
+
Map<String, String> result = new HashMap<String, String>();
result.put("total_uptime", getTotalUptime().toString());
- result.put("uptime_ratio", getUptimeRatio().toString());
+ result.put("uptime_ratio", numberFormat.format(getUptimeRatio()));
// TODO 20100706 bleny compute number of jobs done, number of jobs submitted, ratio,
- result.put("jobs_done", "?");
- result.put("jobs_submitted", "?");
+ result.put("jobs_done", config.getNumberOfJobsDone().toString());
+ result.put("jobs_submitted", config.getNumberOfJobsSubmitted().toString());
result.put("jobs_ratio", "?");
-
- // TODO 20100706 bleny compute score
- result.put("score", "0");
+
+ Double karma = (config.getNumberOfJobsDone() - config.getNumberOfJobsSubmitted()) * getUptimeRatio();
+ result.put("karma", numberFormat.format(karma));
return result;
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-07 14:56:36 UTC (rev 99)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-07 16:23:54 UTC (rev 100)
@@ -68,6 +68,8 @@
}
public class MainServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 14:56:36 UTC (rev 99)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 16:23:54 UTC (rev 100)
@@ -167,7 +167,6 @@
* </ul>
*
* @author bleny
- *
*/
protected class OutputReader extends Thread {
@@ -225,8 +224,7 @@
log.warn("error while closing the output of the subprocess", e);
}
}
- }
-
+ }
}
/**
@@ -503,6 +501,7 @@
String newDir = null;
if (jobSuccess) {
newDir = DisworkDaemon.DONE;
+ config.addOneJobDone();
} else {
newDir = FAILED_MOVE.get(jobLinkDir);
}
@@ -519,7 +518,7 @@
log(newPath, "FINISHED");
}
} else {
- // job has been cancelled
+ // job has been canceled
fileSystem.delete(jobPath);
}
}
@@ -562,7 +561,8 @@
}
}
- public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) throws DisworkException {
+ public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config)
+ throws DisworkException {
this.fileSystem = fileSystem;
this.config = config;
@@ -640,18 +640,23 @@
return activityStrategy;
}
- public void setActivityStrategy(ActivityStrategy activityStrategy) throws DisworkException {
+ public void setActivityStrategy(ActivityStrategy activityStrategy)
+ throws DisworkException {
this.activityStrategy = activityStrategy;
log.info("swithching to " + activityStrategy);
updateFlag();
}
- public void setActivityStrategy(String activityStrategyLabel) throws DisworkException {
- setActivityStrategy(ActivityStrategies.getNewInstance(config, activityStrategyLabel));
+ public void setActivityStrategy(String activityStrategyLabel)
+ throws DisworkException {
+ setActivityStrategy(ActivityStrategies.getNewInstance
+ (config, activityStrategyLabel));
}
- public void setActivityStrategy(ActivityStrategies activityStrategies) throws DisworkException {
- setActivityStrategy(ActivityStrategies.getNewInstance(config, activityStrategies));
+ public void setActivityStrategy(ActivityStrategies activityStrategies)
+ throws DisworkException {
+ setActivityStrategy(ActivityStrategies.getNewInstance
+ (config, activityStrategies));
}
public void activeNoActivityStrategy() throws DisworkException {
1
0
r99 - trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon
by bleny@users.nuiton.org 07 Jul '10
by bleny@users.nuiton.org 07 Jul '10
07 Jul '10
Author: bleny
Date: 2010-07-07 16:56:36 +0200 (Wed, 07 Jul 2010)
New Revision: 99
Url: http://nuiton.org/repositories/revision/diswork/99
Log:
suppression des attentes actives
Modified:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-07 09:22:04 UTC (rev 98)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-07 14:56:36 UTC (rev 99)
@@ -61,13 +61,72 @@
*
*/
public interface ActivityStrategy {
+
+ public enum ActivityStrategies {
+ NONE ("none"),
+ UNLIMITED ("unlimited"),
+ LIMITED ("limited"),
+ SCHEDULED ("scheduled");
+ /** those labels will be used in the config file */
+ public String label;
+
+ ActivityStrategies(String name) {
+ this.label = name;
+ }
+
+ /** return the enumeration element given its label */
+ static ActivityStrategies valueOfLabel(String label) {
+ for (ActivityStrategies aStrategy : values()) {
+ if (aStrategy.label == label) {
+ return aStrategy;
+ }
+ }
+ throw new IllegalArgumentException("wrong strategy label : " + label);
+ }
+
+ /** factory method, instantiate the good strategy given its label */
+ static ActivityStrategy getNewInstance(DisworkConfig config, String label) {
+ return getNewInstance(config, valueOfLabel(label));
+ }
+
+ static ActivityStrategy getNewInstance(DisworkConfig config, ActivityStrategies strat) {
+ ActivityStrategy result = null;
+ switch (strat) {
+ case NONE:
+ result = new NoActivity();
+ break;
+ case UNLIMITED:
+ result = new UnlimitedActivity();
+ break;
+ case LIMITED:
+ result = new LimitedActivity();
+ break;
+ case SCHEDULED:
+ result = new ScheduledActivity(config);
+ break;
+ }
+ return result;
+ }
+ }
+
/** use this strategy to never run a job */
public static class NoActivity implements ActivityStrategy {
@Override
public boolean canWork() {
return false;
}
+
+ @Override
+ public long timeBeforeNextUpdate() {
+ // this will never change
+ return -1;
+ }
+
+ @Override
+ public String toString() {
+ return "no activity";
+ }
}
/** use this strategy to always run a job */
@@ -76,21 +135,32 @@
public boolean canWork() {
return true;
}
+
+ @Override
+ public long timeBeforeNextUpdate() {
+ // this will never change
+ return -1;
+ }
+
+ @Override
+ public String toString() {
+ return "unlimited activity";
+ }
}
/** use this strategy to run a job only if computer is idling */
public static class LimitedActivity implements ActivityStrategy {
-
+
private static final Log log = LogFactory.getLog(LimitedActivity.class);
-
- protected static class LoadAverageMonitoring extends Thread {
+ protected class LoadAverageMonitoring extends Thread {
+
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
Double loadAverageNow = 1.0;
Double loadAverage5MinutesBefore = 1.0;
Double loadAverage10MinutesBefore = 1.0;
-
+
@Override
public void run() {
while (true) {
@@ -108,9 +178,9 @@
}
}
}
-
- LoadAverageMonitoring monitoring;
-
+
+ LoadAverageMonitoring monitoring;
+
public LimitedActivity() {
monitoring = new LoadAverageMonitoring();
monitoring.start();
@@ -123,13 +193,25 @@
&& monitoring.loadAverage10MinutesBefore < 1.0;
return canWork;
}
+
+ @Override
+ public long timeBeforeNextUpdate() {
+ return 5 * 60 * 1000;
+ }
+
+ @Override
+ public String toString() {
+ return "limited activity";
+ }
}
/** use this strategy to run a job only at fixed times of the week */
public static class ScheduledActivity implements ActivityStrategy {
protected DisworkConfig config;
-
+
+ protected long timeBeforeNextUpdate = 1000L;
+
protected ScheduledActivity(DisworkConfig config) {
this.config = config;
}
@@ -138,14 +220,40 @@
public boolean canWork() throws DisworkException {
Date currentDate = new Date();
boolean result = false;
+ Date nextChange = new Date(System.currentTimeMillis() + 60 * 60 * 1000);
for (CronExpression pattern : config.getSchedule()) {
result = result || pattern.isSatisfiedBy(currentDate);
+ Date aDate = pattern.getNextInvalidTimeAfter(currentDate);
+ if (aDate.before(nextChange)) {
+ nextChange = aDate;
+ }
+ aDate = pattern.getNextValidTimeAfter(currentDate);
+ if (aDate.before(nextChange)) {
+ nextChange = aDate;
+ }
}
+ timeBeforeNextUpdate = nextChange.getTime() - currentDate.getTime();
return result;
}
+
+ @Override
+ public long timeBeforeNextUpdate() {
+ return timeBeforeNextUpdate;
+ }
+
+ @Override
+ public String toString() {
+ return "scheduled activity";
+ }
}
/** return true if a job can be run */
- boolean canWork() throws DisworkException;
+ boolean canWork() throws DisworkException;
+
+ /**
+ *
+ * @return time to wait before next update, -1 is never (wait definitly)
+ */
+ long timeBeforeNextUpdate();
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 09:22:04 UTC (rev 98)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 14:56:36 UTC (rev 99)
@@ -196,7 +196,7 @@
/** worker-manager will make the daemon accomplish jobs */
protected WorkersManager workers;
-
+
protected HttpFrontEnd httpFrontEnd;
public DisworkDaemon(DisworkConfig config) throws DisworkException {
@@ -817,4 +817,36 @@
throw new DisworkException("can't read hardware infos ", e);
}
}
+
+ public void activeNoActivityStrategy() throws DisworkException {
+ if (workers == null) {
+ log.warn("trying to change activy while working is disabled");
+ } else {
+ workers.activeNoActivityStrategy();
+ }
+ }
+
+ public void activeUnlimitedActivityStrategy() throws DisworkException {
+ if (workers == null) {
+ log.warn("trying to change activy while working is disabled");
+ } else {
+ workers.activeUnlimitedActivityStrategy();
+ }
+ }
+
+ public void activeLimitedActivityStrategy() throws DisworkException {
+ if (workers == null) {
+ log.warn("trying to change activy while working is disabled");
+ } else {
+ workers.activeLimitedActivityStrategy();
+ }
+ }
+
+ public void activeScheduledActivityStrategy() throws DisworkException {
+ if (workers == null) {
+ log.warn("trying to change activy while working is disabled");
+ } else {
+ workers.activeScheduledActivityStrategy();
+ }
+ }
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 09:22:04 UTC (rev 98)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 14:56:36 UTC (rev 99)
@@ -44,6 +44,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.nuiton.diswork.daemon.ActivityStrategy.ActivityStrategies;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemException;
import org.nuiton.util.FileUtil;
@@ -105,6 +106,25 @@
protected File applicationCache;
+ protected Boolean flag = Boolean.FALSE;
+
+ protected final Object sem = new Object();
+
+ protected void updateFlag() throws DisworkException {
+ boolean newStatus = activityStrategy.canWork();
+ if (newStatus != flag) {
+ synchronized (sem) {
+ flag = newStatus;
+ sem.notifyAll();
+ }
+ }
+ }
+
+ protected Boolean getFlag() throws DisworkException {
+ updateFlag();
+ return flag;
+ }
+
/** A worker search a job and execute it.
*
* Jobs are found on the file-system by browsing some special directories,
@@ -512,11 +532,18 @@
public void run() {
while (! shouldStop) {
try {
- if (activityStrategy.canWork()) {
- findAJobAndRunIt();
- } else {
- // waiting for strategy to change state
- Thread.sleep(10 * 1000);
+ synchronized (sem) {
+ if (getFlag()) {
+ findAJobAndRunIt();
+ } else {
+ log.debug("sleeping until a change on flag");
+ long waitTime = activityStrategy.timeBeforeNextUpdate();
+ if (waitTime == -1) {
+ sem.wait();
+ } else {
+ sem.wait(waitTime);
+ }
+ }
}
} catch (DisworkException e) {
log.error("worker error " + e);
@@ -535,28 +562,15 @@
}
}
- public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) {
+ public WorkersManager(DisworkFileSystem fileSystem, DisworkConfig config) throws DisworkException {
this.fileSystem = fileSystem;
this.config = config;
applicationCache = new File(config.getTempDirectory(), "cache");
applicationCache.mkdirs();
- // initialize activityStrategy according to config
- String initialStrategy = config.getActivityStrategy();
- if ("none".equals(initialStrategy)) {
- activeNoActivityStrategy();
- } else if ("unlimited".equals(initialStrategy)) {
- activeUnlimitedActivityStrategy();
- } else if ("limited".equals(initialStrategy)) {
- activeLimitedActivityStrategy();
- } else if ("scheduled".equals(initialStrategy)) {
- activeScheduledActivityStrategy();
- } else {
- log.error("wrong config directive for initial strategy" +
- initialStrategy);
- activeNoActivityStrategy();
- }
+ // initialize activityStrategy according to config
+ setActivityStrategy(config.getActivityStrategy());
// start as many workers as needed
log.info("will start " + config.getNumberOfWorkers() + " workers");
@@ -593,16 +607,18 @@
return cachedApplicationData;
}
- public void stop() {
+ public void stop() throws DisworkException {
stop(false);
}
- public void stop(boolean now) {
+ public void stop(boolean now) throws DisworkException {
// asking to all threads to stop
for (Worker worker : workers) {
worker.shouldStop = true;
}
+ activeNoActivityStrategy();
+
FileUtil.deleteRecursively(applicationCache);
if( !now ) {
@@ -624,24 +640,33 @@
return activityStrategy;
}
- public void setActivityStrategy(ActivityStrategy activityStrategy) {
+ public void setActivityStrategy(ActivityStrategy activityStrategy) throws DisworkException {
this.activityStrategy = activityStrategy;
+ log.info("swithching to " + activityStrategy);
+ updateFlag();
}
-
- public void activeNoActivityStrategy() {
- setActivityStrategy(new ActivityStrategy.NoActivity());
+
+ public void setActivityStrategy(String activityStrategyLabel) throws DisworkException {
+ setActivityStrategy(ActivityStrategies.getNewInstance(config, activityStrategyLabel));
}
-
- public void activeUnlimitedActivityStrategy() {
- setActivityStrategy(new ActivityStrategy.UnlimitedActivity());
+
+ public void setActivityStrategy(ActivityStrategies activityStrategies) throws DisworkException {
+ setActivityStrategy(ActivityStrategies.getNewInstance(config, activityStrategies));
}
-
- public void activeLimitedActivityStrategy() {
- setActivityStrategy(new ActivityStrategy.LimitedActivity());
+
+ public void activeNoActivityStrategy() throws DisworkException {
+ setActivityStrategy(ActivityStrategies.NONE);
}
-
- public void activeScheduledActivityStrategy() {
- setActivityStrategy(new ActivityStrategy.ScheduledActivity(config));
+
+ public void activeUnlimitedActivityStrategy() throws DisworkException {
+ setActivityStrategy(ActivityStrategies.UNLIMITED);
}
-
+
+ public void activeLimitedActivityStrategy() throws DisworkException {
+ setActivityStrategy(ActivityStrategies.LIMITED);
+ }
+
+ public void activeScheduledActivityStrategy() throws DisworkException {
+ setActivityStrategy(ActivityStrategies.SCHEDULED);
+ }
}
\ No newline at end of file
1
0
07 Jul '10
Author: bleny
Date: 2010-07-07 11:22:04 +0200 (Wed, 07 Jul 2010)
New Revision: 98
Url: http://nuiton.org/repositories/revision/diswork/98
Log:
scheduler, doc, ajout fonctionnalité dans le FS, front-end web
Added:
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java
Modified:
trunk/diswork-daemon/pom.xml
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
trunk/pom.xml
trunk/src/site/rst/diswork-daemon/devel.rst
trunk/src/site/rst/user/how_to_use.rst
Modified: trunk/diswork-daemon/pom.xml
===================================================================
--- trunk/diswork-daemon/pom.xml 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/pom.xml 2010-07-07 09:22:04 UTC (rev 98)
@@ -31,6 +31,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.nuiton</groupId>
<artifactId>nuiton-utils</artifactId>
</dependency>
@@ -38,6 +42,19 @@
<groupId>commons-daemon</groupId>
<artifactId>commons-daemon</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ <version>6.1.24</version>
+ </dependency>
<!-- test -->
<dependency>
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/ActivityStrategy.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -26,9 +26,11 @@
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
+import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.quartz.CronExpression;
/**
* This interface introduce the concept of "activity strategy" in
@@ -59,7 +61,7 @@
*
*/
public interface ActivityStrategy {
-
+
/** use this strategy to never run a job */
public static class NoActivity implements ActivityStrategy {
@Override
@@ -126,14 +128,24 @@
/** use this strategy to run a job only at fixed times of the week */
public static class ScheduledActivity implements ActivityStrategy {
+ protected DisworkConfig config;
+
+ protected ScheduledActivity(DisworkConfig config) {
+ this.config = config;
+ }
+
@Override
- public boolean canWork() {
- // TODO 20100615 bleny Auto-generated method stub
- throw new UnsupportedOperationException("not yet implemented");
+ public boolean canWork() throws DisworkException {
+ Date currentDate = new Date();
+ boolean result = false;
+ for (CronExpression pattern : config.getSchedule()) {
+ result = result || pattern.isSatisfiedBy(currentDate);
+ }
+ return result;
}
}
/** return true if a job can be run */
- boolean canWork();
+ boolean canWork() throws DisworkException;
}
\ No newline at end of file
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkConfig.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -24,20 +24,28 @@
*/
package org.nuiton.diswork.daemon;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Reader;
import java.lang.management.ManagementFactory;
+import java.text.ParseException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
import org.nuiton.util.ApplicationConfig;
+import org.quartz.CronExpression;
/**
*
@@ -72,6 +80,18 @@
* this is a property file</dd>
* </dl>
*
+ * For the http front-end :
+ * <dl>
+ * <dt>diswork.httpfrontend.start</dt>
+ * <dd>"true" and the front end will start at diswork boot. Note that
+ * the front-end can be started later even if this config directive is
+ * at "false"</dd>
+ * <dt>diswork.httpfrontend.port</dt>
+ * <dd>the port to use for HTTP server. It means that the browser while
+ * have to use this port to get connected to the front-end. Default
+ * port is 8080.</dd>
+ * </dl>
+ *
* @author bleny
*/
public class DisworkConfig extends ApplicationConfig {
@@ -105,15 +125,33 @@
// if no total_uptime saved, consider daemon has never run
setDefaultOption("diswork.total_uptime", "0");
- }
- public static DisworkConfig newConfig() {
- DisworkConfig newConfig = new DisworkConfig();
- newConfig.setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
- return newConfig;
+ setOption("diswork.httpfrontend.start", "true");
+ setOption("diswork.httpfrontend.port", "8080");
+
+ setFileSystemConfig(DisworkFileSystemConfig.newKademliaDisworkConfig());
}
+
/**
+ *
+ * @return null if no file specified
+ */
+ public File getTokensFile() {
+ String path = getOption("diswork.tokens_file");
+ if (path == null) {
+ return null;
+ } else {
+ File file = new File(path);
+ return file;
+ }
+ }
+
+ public void setTokensFile(String tokensFilePath) {
+ setOption("diswork.tokens_file", tokensFilePath);
+ }
+
+ /**
* Read the tokens file if one is given in the config and merge the content
* of this file into {@link #tokens}
*
@@ -139,9 +177,13 @@
Properties userTokens = new Properties();
userTokens.load(tokensStream);
for (String token : userTokens.stringPropertyNames()) {
- log.debug("adding token " + token + " → " +
- userTokens.getProperty(token));
- tokens.put(token, userTokens.getProperty(token));
+ String replacement = userTokens.getProperty(token);
+ // prevent java to replace \t by a tab in C:\temp
+ String escapedReplacement = StringEscapeUtils.escapeJava
+ (StringEscapeUtils.escapeJava(replacement));
+
+ log.debug("adding token " + token + " → " + escapedReplacement);
+ tokens.put(token, escapedReplacement);
}
} catch (FileNotFoundException e) {
log.warn("tokens file not found, 0 tokens loaded", e);
@@ -154,6 +196,8 @@
}
}
+
+
protected String applyTokensRecursively(String commandLine) {
String result = commandLine;
for (String token : tokens.keySet()) {
@@ -173,7 +217,9 @@
initTokens();
}
- tokens.put("%tmp", tempDir);
+ if (tempDir != null) {
+ tokens.put("%tmp", tempDir);
+ }
return applyTokensRecursively(commandLine);
}
@@ -183,18 +229,86 @@
- public String getTempDirectory() {
- return getOption("diswork.temp_directory");
+ protected List<CronExpression> schedule;
+
+ /**
+ *
+ * @return null if no path for a file have been specified
+ * @throws DisworkException
+ */
+ protected List<CronExpression> getSchedule() throws DisworkException {
+ // lazy instanciation of schedule
+ if (schedule == null) {
+ String path = getOption("diswork.schedule_file");
+ if (path != null) {
+ File file = new File(path);
+ schedule = new ArrayList<CronExpression>();
+ try {
+ Reader reader = new FileReader(file);
+ BufferedReader in = new BufferedReader(reader);
+ String line;
+ while ((line = in.readLine()) != null) {
+ if (!line.startsWith("#") && !"".equals(line)) {
+ try {
+ schedule.add(new CronExpression(line));
+ } catch (ParseException e) {
+ log.warn("failed to parse " + line + " : line ignored");
+ }
+ }
+ }
+ } catch (FileNotFoundException e) {
+ log.error(e);
+ throw new DisworkException(e);
+ } catch (IOException e) {
+ log.error(e);
+ throw new DisworkException(e);
+ }
+ }
+ }
+ return schedule;
}
+
+ public void setSheduleFile(String path) {
+ setOption("diswork.schedule_file", path);
+ }
+
+
+
- public String getOwnerId() {
+ /* ** those config data are not set by the user but computed by daemon ** */
+
+ protected String getOwnerId() {
return getOption("diswork.owner");
}
- public void setOwnerId(String ownerId) {
+ protected void setOwnerId(String ownerId) {
setOption("diswork.owner", ownerId);
}
-
+
+ protected Long getTotalUptime() {
+ String upTime = getOption("diswork.total_uptime");
+ return Long.parseLong(upTime);
+ }
+
+ protected void setTotalUptime(Long upTime) {
+ setOption("diswork.total_uptime", upTime.toString());
+ }
+
+ protected void setFirstRunTime(Long time) {
+ setOption("diswork.first_run_time", time.toString());
+ }
+
+ protected Long getFirstRunTime() {
+ String firstRunTime = getOption("diswork.first_run_time");
+ return Long.parseLong(firstRunTime);
+ }
+
+ /* ** trivial applicationConfig setters and getters ** */
+
+ public String getTempDirectory() {
+ return getOption("diswork.temp_directory");
+ }
+
public String getBootstrapIp() {
return fileSystemConfig.getBootstrapIp();
}
@@ -242,40 +356,20 @@
public void setActivityStrategy(String activityStrategyName) {
setOption("diswork.activity_strategy", activityStrategyName);
}
-
- public Long getTotalUptime() {
- String upTime = getOption("diswork.total_uptime");
- return Long.parseLong(upTime);
+
+ public Boolean getStartHttpFrontend() {
+ return getOptionAsBoolean("diswork.httpfrontend.start");
}
- public void setTotalUptime(Long upTime) {
- setOption("diswork.total_uptime", upTime.toString());
+ public void setStartHttpFrontend(Boolean startHttpFrontend) {
+ setOption("diswork.httpfrontend.start", startHttpFrontend.toString());
}
- public void setFirstRunTime(Long time) {
- setOption("diswork.first_run_time", time.toString());
+ public Integer getHttpFrontendPort() {
+ return getOptionAsInt("diswork.httpfrontend.port");
}
-
- public Long getFirstRunTime() {
- String firstRunTime = getOption("diswork.first_run_time");
- return Long.parseLong(firstRunTime);
- }
- /**
- *
- * @return null if no file specified
- */
- public File getTokensFile() {
- String path = getOption("diswork.tokens_file");
- if (path == null) {
- return null;
- } else {
- File file = new File(path);
- return file;
- }
+ public void setHttpFrontendPort(Integer httpFrontendPort) {
+ setOption("diswork.httpfrontend.port", httpFrontendPort.toString());
}
-
- public void setTokensFile(String tokensFilePath) {
- setOption("diswork.tokens_file", tokensFilePath);
- }
}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkDaemon.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -24,7 +24,6 @@
*/
package org.nuiton.diswork.daemon;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
@@ -133,7 +132,7 @@
*
* @author bleny
*/
-public class DisworkDaemon implements Closeable {
+public class DisworkDaemon {
private static final Log log = LogFactory.getLog(DisworkDaemon.class);
@@ -198,12 +197,14 @@
/** worker-manager will make the daemon accomplish jobs */
protected WorkersManager workers;
+ protected HttpFrontEnd httpFrontEnd;
+
public DisworkDaemon(DisworkConfig config) throws DisworkException {
this.config = config;
+ sessionStartTime = System.currentTimeMillis();
// step by step, set all the dependencies of the daemon
- // init the file-system
initFileSystem();
initOwnerIdAndHomeDir();
@@ -212,7 +213,8 @@
writeHardwareInfos();
- sessionStartTime = System.currentTimeMillis();
+ httpFrontEnd = new HttpFrontEnd(config, this);
+
}
/* *** init methods, used once by the constructor */
@@ -281,7 +283,7 @@
config.setOwnerId(ownerId);
- config.setFirstRunTime(System.currentTimeMillis());
+ config.setFirstRunTime(sessionStartTime);
// config.saveForUser();
}
@@ -324,7 +326,17 @@
throw new DisworkException("can't write hardware infos", e);
}
}
-
+
+ /* *** methods about the web based front end *** */
+
+ public void startHttpFrontEnd() throws DisworkException {
+ httpFrontEnd.start();
+ }
+
+ public void stopHttpFrontEnd() throws DisworkException {
+ httpFrontEnd.stop();
+ }
+
/* *** methods for defining some usual paths *** */
/**
@@ -412,22 +424,31 @@
throw new DisworkException("unable to publish application", e);
}
}
-
+
/**
- * Returns a list with all jobs submitted before.
- * @return
+ * return a list of jobs submitted before for a particular application
+ * @param applicationName the name of the application, can be null
+ * @return the submitted jobs (all if parameter was null, and only jobs
+ * for a particular application if parameter was set)
* @throws DisworkException
*/
- public List<JobDescription> getAllJobs() throws DisworkException {
+ public List<JobDescription> getAllJobs(String applicationName)
+ throws DisworkException {
List<JobDescription> result = new ArrayList<JobDescription>();
try {
- List<String> jobs = fileSystem.readDirectory(homeDir + "/" + JOBS_DIR);
- for (String jobId : jobs) {
- String jsdl = IOUtils.toString(fileSystem.read(
+ String jobsDir = homeDir + "/" + JOBS_DIR;
+ if (fileSystem.exists(jobsDir)) {
+ List<String> jobs = fileSystem.readDirectory(jobsDir);
+ for (String jobId : jobs) {
+ String jsdl = IOUtils.toString(fileSystem.read(
getPathForJob(jobId) + "/" + JSDL_PATH));
- JobDescription jobDescription = JobDescription.parseJSDL(jsdl);
- jobDescription.setJobId(jobId);
- result.add(jobDescription);
+ JobDescription jobDescription = JobDescription.parseJSDL(jsdl);
+ if (applicationName == null // don't filter application
+ || jobDescription.getApplicationName().equals(applicationName)) {
+ jobDescription.setJobId(jobId);
+ result.add(jobDescription);
+ }
+ }
}
} catch (IOException e) {
log.error("error in file while reading home-dir", e);
@@ -437,18 +458,38 @@
throw new DisworkException("file-system error", e);
}
return result;
+
}
-
+
/**
+ * Returns a list with all jobs submitted before.
+ * @return
+ * @throws DisworkException
+ */
+ public List<JobDescription> getAllJobs() throws DisworkException {
+ return getAllJobs(null);
+ }
+
+ /**
* Cancel the submission of a job
* @param jobDescription
* @throws DisworkException
*/
- public void deleteJob(JobDescription jobDescription) {
- // TODO 20100618 bleny stub
- throw new UnsupportedOperationException();
+ public void deleteJob(JobDescription jobDescription) throws DisworkException {
+ try {
+ String jobPath = getPathForJob(jobDescription);
+ if (fileSystem.exists(jobPath)) {
+ log.info("cancelling job " + jobDescription);
+ fileSystem.deleteRecursively(jobPath);
+ } else {
+ log.warn("can't cancel unknown job " + jobDescription);
+ }
+ } catch (DisworkFileSystemException e) {
+ log.info("error while deleting a job", e);
+ throw new DisworkException("error while deleting a job", e);
+ }
}
-
+
public void submitJob(JobDescription jobDescription) throws DisworkException {
if (jobDescription.getInputData().size() + jobDescription.getInputUrls().size()
@@ -531,7 +572,7 @@
throw new DisworkException(e);
}
}
-
+
protected boolean checkLogContains(JobDescription job,
String pattern) throws DisworkException {
try {
@@ -542,23 +583,27 @@
log.info("unable to read log ", e);
throw new DisworkException("unable to read log ", e);
} catch (DisworkFileSystemException e) {
- log.error("unable to read log file" + job, e);
- throw new DisworkException("unable to read log file" + job, e);
+ log.error("unable to read log file " + job, e);
+ throw new DisworkException("unable to read log file " + job, e);
}
}
-
+
+ public boolean isStarted(JobDescription job) throws DisworkException {
+ return checkLogContains(job, "STARTED");
+ }
+
public boolean isFinished(JobDescription job) throws DisworkException {
return checkLogContains(job, "FINISHED");
}
-
+
public boolean isSuccessful(JobDescription job) throws DisworkException {
return checkLogContains(job, "DONE");
}
-
+
public boolean isFailed(JobDescription job) throws DisworkException {
return isFinished(job) && !isSuccessful(job);
}
-
+
/**
* Must not be called until {@link #isFinished(JobDescription)} returns
* true for this job.
@@ -589,22 +634,45 @@
}
+
+ /**
+ *
+ */
+ public void leave() throws DisworkException {
+ try {
+ fileSystem.deleteRecursively(homeDir);
+ } catch (DisworkFileSystemException e) {
+ log.error("error while leaving the system", e);
+ throw new DisworkException("error while leaving the system", e);
+ }
+ }
+
/** close the daemon (stop all workers)
* update statistics and delete all temporary data
*/
- @Override
- public void close() throws IOException {
+ public void close() throws DisworkException {
if (workers != null) {
workers.stop();
}
+ try {
+ fileSystem.delete(homeDir + "/" + HARDINFO_PATH);
+ } catch (DisworkFileSystemException e) {
+ log.warn(e);
+ }
+
// updating total uptime statistic
- Long totalUptime = getTotalUptime();
+ Long totalUptime = getTotalUptime();
log.info("saving total uptime: " + totalUptime);
config.setTotalUptime(totalUptime);
//config.saveForUser();
- fileSystem.close();
+ try {
+ fileSystem.close();
+ } catch (DisworkFileSystemException e) {
+ log.error("unable to close file system", e);
+ throw new DisworkException(e);
+ }
FileUtil.deleteRecursively(config.getTempDirectory());
}
@@ -624,20 +692,93 @@
* this method return 0.5.
* @return
*/
- public Double getUptimeRatio() {
+ protected Double getUptimeRatio() {
Double uptimeRatio = (double) getTotalUptime().longValue()
/ ((double) System.currentTimeMillis()
- (double) config.getFirstRunTime().longValue());
return uptimeRatio;
}
+
+ public Map<String, String> getLocalStats() throws DisworkException {
+ Map<String, String> result = new HashMap<String, String>();
+ result.put("total_uptime", getTotalUptime().toString());
+ result.put("uptime_ratio", getUptimeRatio().toString());
+
+ // TODO 20100706 bleny compute number of jobs done, number of jobs submitted, ratio,
+ result.put("jobs_done", "?");
+ result.put("jobs_submitted", "?");
+ result.put("jobs_ratio", "?");
+
+ // TODO 20100706 bleny compute score
+ result.put("score", "0");
+ return result;
+ }
+
+ /** a file that contains global stats about diswork */
+ protected static final String GLOBAL_STATS_PATH = "/proc/globalstats";
+ protected Map<String, String> updateGlobalStats() throws DisworkException {
+ try {
+ log.info("global stats file doesn't exists, creating one");
+
+ Long currentTime = System.currentTimeMillis();
+ Map<String, Long> stats = new HashMap<String, Long>();
+ Long availableProcessors = 0L;
+ Map<String, String> result = new HashMap<String, String>();
+
+ List<String> homeDirs = fileSystem.readDirectory(HOME);
+ for (String homeDir : homeDirs) {
+ String hardInfoPath = HOME + "/" + homeDir + "/" + HARDINFO_PATH;
+ if (fileSystem.exists(hardInfoPath)) {
+ String hardInfoContent = IOUtils.toString(
+ fileSystem.read(hardInfoPath));
+ String[] infos = hardInfoContent.split("\n");
+
+ // first line, reading the OS name
+ if (!stats.containsKey(infos[0])) {
+ stats.put(infos[0], 0L);
+ }
+ stats.put(infos[0], stats.get(infos[0]) + 1);
+
+ // second line, reading the architecture
+ if (!stats.containsKey(infos[1])) {
+ stats.put(infos[1], 0L);
+ }
+ stats.put(infos[1], stats.get(infos[1]) + 1);
+
+ // third line, reading the number of processors
+ availableProcessors += Integer.parseInt(infos[2]);
+ }
+ }
+ stats.put("available_processors", availableProcessors);
+ stats.put("date", currentTime);
+
+ // write the result
+ String statsFileContent = "";
+ for (String key : stats.keySet()) {
+ result.put(key, stats.get(key).toString());
+ statsFileContent += key + "\t" + stats.get(key) + "\n";
+ }
+
+ log.debug("writing stats file " + statsFileContent);
+ fileSystem.write(GLOBAL_STATS_PATH, IOUtils.toInputStream(statsFileContent));
+
+ return result;
+ } catch (DisworkFileSystemException e) {
+ log.error("file system error ", e);
+ throw new DisworkException("file system error ", e);
+ } catch (IOException e) {
+ log.error("can't write hardware infos ", e);
+ throw new DisworkException("can't read hardware infos ", e);
+ }
+ }
+
/** get infos on hardware available on the global Diswork system
*
* @return
* @throws DisworkException
*/
public Map<String, String> getGlobalStats() throws DisworkException {
- final String globalStatsPath = "/proc/globalstats";
final int timeBeforeGlobalStatsAreObsolete = 1 * 60 * 60 * 1000;
// in this file, key and values are split with \t and entries are split
@@ -645,11 +786,10 @@
// it and date it. If file is obsolete read it and delete it
try {
- Long currentTime = System.currentTimeMillis();
Map<String, String> result = new HashMap<String, String>();
- if (fileSystem.exists(globalStatsPath)) {
- String globalStats = IOUtils.toString(fileSystem.read(globalStatsPath));
+ if (fileSystem.exists(GLOBAL_STATS_PATH)) {
+ String globalStats = IOUtils.toString(fileSystem.read(GLOBAL_STATS_PATH));
log.debug("global stats file found, reading " + globalStats);
String[] lines = globalStats.split("\n");
for (String line : lines) {
@@ -658,49 +798,14 @@
}
// delete the file if it's too old
+ Long currentTime = System.currentTimeMillis();
Long statsTime = Long.parseLong(result.get("date"));
if (currentTime - statsTime > timeBeforeGlobalStatsAreObsolete) {
log.info("deleting global stats file");
- fileSystem.delete(globalStatsPath);
+ fileSystem.delete(GLOBAL_STATS_PATH);
}
} else {
- log.info("global stats file doesn't exists, creating one");
- Map<String, Long> stats = new HashMap<String, Long>();
- Long availableProcessors = 0L;
-
- List<String> homeDirs = fileSystem.readDirectory(HOME);
- for (String homeDir : homeDirs) {
- String hardinfo = IOUtils.toString(fileSystem.read(
- HOME + "/" + homeDir + "/" + HARDINFO_PATH));
- String[] infos = hardinfo.split("\n");
-
- // reading the OS name
- if (!stats.containsKey(infos[0])) {
- stats.put(infos[0], 0L);
- }
- stats.put(infos[0], stats.get(infos[0]) + 1);
-
- // reading the architecture
- if (!stats.containsKey(infos[1])) {
- stats.put(infos[1], 0L);
- }
- stats.put(infos[1], stats.get(infos[1]) + 1);
-
- // reading the number of processors
- availableProcessors += Integer.parseInt(infos[2]);
- }
- stats.put("available_processors", availableProcessors);
- stats.put("date", currentTime);
-
- // write the result
- String statsFileContent = "";
- for (String key : stats.keySet()) {
- result.put(key, stats.get(key).toString());
- statsFileContent += key + "\t" + stats.get(key) + "\n";
- }
-
- log.debug("writing stats file " + statsFileContent);
- fileSystem.write(globalStatsPath, IOUtils.toInputStream(statsFileContent));
+ result = updateGlobalStats();
}
return result;
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/DisworkException.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -30,7 +30,7 @@
*/
public class DisworkException extends Exception {
- private static final long serialVersionUID = -6434751198109021511L;
+ private static final long serialVersionUID = 1L;
public DisworkException(String message, Throwable cause) {
super(message, cause);
Added: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java (rev 0)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/HttpFrontEnd.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -0,0 +1,190 @@
+package org.nuiton.diswork.daemon;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class HttpFrontEnd {
+
+ private static final Log log = LogFactory.getLog(HttpFrontEnd.class);
+
+ protected DisworkConfig config;
+
+ protected Server server;
+
+ protected DisworkDaemon daemon;
+
+ public HttpFrontEnd(DisworkConfig config, DisworkDaemon daemon)
+ throws DisworkException {
+ this.config = config;
+ this.daemon = daemon;
+
+ if (config.getStartHttpFrontend()) {
+ start();
+ }
+ }
+
+ protected void initServer() {
+ log.info("web server use port " + config.getHttpFrontendPort());
+ server = new Server(config.getHttpFrontendPort());
+ Context root = new Context(server, "/", Context.NO_SESSIONS);
+ root.addServlet(new ServletHolder(new MainServlet()), "/");
+ }
+
+ public void start() throws DisworkException {
+ if (server == null) {
+ initServer();
+ }
+ log.info("starting web front-end");
+ try {
+ server.start();
+ } catch (Exception e) {
+ log.error("error while booting http server", e);
+ throw new DisworkException("error while booting http server", e);
+ }
+ }
+
+ public void stop() throws DisworkException {
+ log.info("stopping web front-end");
+ try {
+ if (server != null) {
+ server.stop();
+ } // else server has never been started
+ } catch (Exception e) {
+ log.error("error while stopping server", e);
+ throw new DisworkException("error while stopping server", e);
+ }
+ }
+
+ public class MainServlet extends HttpServlet {
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ log.info("page request");
+
+ String pageContent = "<html>\n"
+ + "<h1>Diswork Node</h1>" + "\n\n"
+ + "<h2>Submitted jobs</h2>" + "\n\n"
+ + "<table>" + "\n"
+ + " <tr>" + "\n"
+ + " <th>Name</th>" + "\n"
+ + " <th>Application</th>" + "\n"
+ + " <th>Status</th>" + "\n"
+ + " </tr>" + "\n";
+
+ List<JobDescription> jobs;
+ try {
+ jobs = daemon.getAllJobs();
+ } catch (DisworkException e) {
+ log.error("error while retrieving local stats", e);
+ throw new ServletException("error while retrieving local stats", e);
+ }
+
+ if (jobs.isEmpty()) {
+ pageContent += " <tr>\n"
+ + " <td colspan=\"3\"><em>no job submitted</em></td>\n"
+ + " </tr>\n";
+ } else {
+ for (JobDescription job : jobs) {
+
+ String status = "unknown";
+ try {
+ if (daemon.isFinished(job)) {
+ if (daemon.isSuccessful(job)) {
+ status = "done";
+ } else {
+ status = "failed";
+ }
+ } else {
+ if (daemon.isStarted(job)) {
+ status = "started";
+ } else {
+ status = "waiting";
+ }
+ }
+ } catch (DisworkException e) {
+ log.error("unable to read job status", e);
+ throw new ServletException("unable to read job status", e);
+ }
+
+
+ pageContent += " <tr>\n"
+ + " <td>" + job.getJobName() + "</td>\n"
+ + " <td>" + job.getApplicationName() + "</td>\n"
+ + " <td>" + status + "</td>\n"
+ + " </tr>\n";
+ }
+ }
+
+ pageContent += "</table>\n\n";
+
+ pageContent += "<h2>Diswork statistics</h2>\n\n";
+ Map<String, String> stats;
+ try {
+ stats = daemon.getLocalStats();
+ } catch (DisworkException e) {
+ log.error("error while retrieving local stats", e);
+ throw new ServletException("error while retrieving local stats", e);
+ }
+
+ pageContent += "<h3>Local statistics</h3>" + "\n"
+ + "<table>" + "\n"
+ + " <tr>" + "\n"
+ + " <th>Key</th>" + "\n"
+ + " <th>Value</th>" + "\n"
+ + " </tr>" + "\n";
+ for (String stat : stats.keySet()) {
+ pageContent += " <tr>" + "\n"
+ + " <td>" + "\n"
+ + " " + stat + "\n"
+ + " </td>" + "\n"
+ + " <td>" + "\n"
+ + " " + stats.get(stat) + "\n"
+ + " </td>" + "\n"
+ + " </tr>" + "\n";
+ }
+
+ pageContent += "</table>" + "\n"
+ + "<h3>Global statistics</h3>" + "\n"
+ + "<table>" + "\n"
+ + " <tr>" + "\n"
+ + " <th>Key</th>" + "\n"
+ + " <th>Value</th>" + "\n"
+ + " </tr>" + "\n";
+
+ try {
+ stats = daemon.getGlobalStats();
+ } catch (DisworkException e) {
+ log.error("error while retrieving local stats", e);
+ throw new ServletException("error while retrieving local stats", e);
+ }
+ for (String stat : stats.keySet()) {
+ pageContent += " <tr>" + "\n"
+ + " <td>" + "\n"
+ + " " + stat + "\n"
+ + " </td>" + "\n"
+ + " <td>" + "\n"
+ + " " + stats.get(stat) + "\n"
+ + " </td>" + "\n"
+ + " </tr>" + "\n";
+ }
+
+ pageContent += "</table>\n<html>" + "\n";
+
+ resp.getWriter().write(pageContent);
+ }
+ }
+
+}
Modified: trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java
===================================================================
--- trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/main/java/org/nuiton/diswork/daemon/WorkersManager.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -41,13 +41,13 @@
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuiton.diswork.fs.DisworkFileSystem;
import org.nuiton.diswork.fs.DisworkFileSystemException;
import org.nuiton.util.FileUtil;
+import org.nuiton.util.StringUtil;
import org.nuiton.util.ZipUtil;
/**
@@ -103,6 +103,8 @@
/** the current activity strategy followed by all the workers */
protected ActivityStrategy activityStrategy;
+ protected File applicationCache;
+
/** A worker search a job and execute it.
*
* Jobs are found on the file-system by browsing some special directories,
@@ -262,20 +264,11 @@
log.info("dependency needed for " + jobDescription + " (" +
jobDescription.getApplicationName() + "-" +
jobDescription.getApplicationVersion() + ")");
- String applicationPath = DisworkDaemon.getPathForDependency(
+ File application = getApplicationData(
jobDescription.getApplicationName(),
jobDescription.getApplicationVersion());
- InputStream applicationData = fileSystem.read(applicationPath);
-
- File application = new File(jobDir,
- FilenameUtils.getName(applicationPath));
- application.createNewFile();
- log.info("will create " + application.getAbsolutePath());
- OutputStream out = new FileOutputStream(application);
- log.debug("starting copy of " + applicationData.available() + " bytes");
- IOUtils.copy(applicationData, out);
+ // unzip application
log.info("unzip application start");
- // unzip application
ZipUtil.uncompress(application, jobDir);
log.info("unzip application finished");
} else {
@@ -305,7 +298,8 @@
String commandLine = config.parseCommandLine(
jobDescription.getCommandLine(),
jobDir.getAbsolutePath());
- String[] commandLineElements = commandLine.split(" ");
+ // String[] commandLineElements = commandLine.split(" ");
+ String[] commandLineElements = StringUtil.split(commandLine, " ");
ProcessBuilder builder = new ProcessBuilder(commandLineElements);
builder.directory(jobDir);
builder.redirectErrorStream(true);
@@ -353,18 +347,20 @@
for (String fileName : jobDescription.getOutput()) {
log.info("staging file " + fileName);
File localCopy = new File(jobDir, fileName);
- // FIXME 20100616 bleny may not exists if job has fail
- InputStream localCopyStream = new FileInputStream(localCopy);
- String filePath = jobPath + "/" + fileName;
+ if (localCopy.exists()) {
+ InputStream localCopyStream = new FileInputStream(localCopy);
- // erase before write
- if (fileSystem.exists(filePath)) {
- fileSystem.delete(filePath);
+ String filePath = jobPath + "/" + fileName;
+
+ // erase before write
+ if (fileSystem.exists(filePath)) {
+ fileSystem.delete(filePath);
+ }
+
+ fileSystem.write(filePath, localCopyStream);
+ localCopyStream.close();
}
-
- fileSystem.write(filePath, localCopyStream);
- localCopyStream.close();
}
// clean up the job directory
@@ -477,27 +473,35 @@
String jobPath = jobLinkDir + "/" + jobLinkName;
- boolean jobSuccess = runJob(jobPath);
-
- // move the link after the job
- String newDir = null;
- if (jobSuccess) {
- newDir = DisworkDaemon.DONE;
+ if (fileSystem.exists(jobPath + "/" + DisworkDaemon.JSDL_PATH)) {
+
+ log(jobPath, "STARTED");
+
+ boolean jobSuccess = runJob(jobPath);
+
+ // move the link after the job
+ String newDir = null;
+ if (jobSuccess) {
+ newDir = DisworkDaemon.DONE;
+ } else {
+ newDir = FAILED_MOVE.get(jobLinkDir);
+ }
+ String newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
+
+ log.info("moving " + jobPath + " to " + newPath);
+ fileSystem.move(jobPath, newPath);
+
+ // mark the job has finished if done or failed too many
+ // times
+ if ( newDir == DisworkDaemon.DONE ||
+ newDir == DisworkDaemon.FAILED_3) {
+ log.info("marking " + newPath + " as finished");
+ log(newPath, "FINISHED");
+ }
} else {
- newDir = FAILED_MOVE.get(jobLinkDir);
+ // job has been cancelled
+ fileSystem.delete(jobPath);
}
- String newPath = newDir + "/" + DisworkDaemon.newJobLinkName();
-
- log.info("moving " + jobPath + " to " + newPath);
- fileSystem.move(jobPath, newPath);
-
- // mark the job has finished if done or failed too many
- // times
- if ( newDir == DisworkDaemon.DONE ||
- newDir == DisworkDaemon.FAILED_3) {
- log.info("marking " + newPath + " as finished");
- log(newPath, "FINISHED");
- }
}
}
@@ -507,19 +511,25 @@
@Override
public void run() {
while (! shouldStop) {
- if (activityStrategy.canWork()) {
- try {
+ try {
+ if (activityStrategy.canWork()) {
findAJobAndRunIt();
- } catch (DisworkException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
- } catch (IOException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
- } catch (DisworkFileSystemException e) {
- log.error("worker error " + e);
- throw new RuntimeException("worker error " + e);
+ } else {
+ // waiting for strategy to change state
+ Thread.sleep(10 * 1000);
}
+ } catch (DisworkException e) {
+ log.error("worker error " + e);
+ throw new RuntimeException("worker error " + e);
+ } catch (IOException e) {
+ log.error("worker error " + e);
+ throw new RuntimeException("worker error " + e);
+ } catch (DisworkFileSystemException e) {
+ log.error("worker error " + e);
+ throw new RuntimeException("worker error " + e);
+ } catch (InterruptedException e) {
+ log.error("worker error " + e);
+ throw new RuntimeException("worker error " + e);
}
}
}
@@ -529,6 +539,9 @@
this.fileSystem = fileSystem;
this.config = config;
+ applicationCache = new File(config.getTempDirectory(), "cache");
+ applicationCache.mkdirs();
+
// initialize activityStrategy according to config
String initialStrategy = config.getActivityStrategy();
if ("none".equals(initialStrategy)) {
@@ -549,11 +562,37 @@
log.info("will start " + config.getNumberOfWorkers() + " workers");
for (int i = 1 ; i <= config.getNumberOfWorkers() ; i++) {
Worker worker = new Worker();
+ worker.setName("disworker-" + i);
worker.start();
workers.add(worker);
}
}
+ /** read an application from the file system and use a cache */
+ protected File getApplicationData(String applicationName,
+ String applicationVersion)
+ throws DisworkFileSystemException,
+ IOException {
+ File cachedApplicationData = new File(applicationCache,
+ applicationName + "-" + applicationVersion + ".zip");
+ if (!cachedApplicationData.exists()) {
+ log.debug("cache fail for " + applicationName + "-" + applicationVersion);
+ synchronized (cachedApplicationData) {
+ String applicationPath = DisworkDaemon.getPathForDependency(
+ applicationName, applicationVersion);
+ InputStream applicationData = fileSystem.read(applicationPath);
+ cachedApplicationData.createNewFile();
+ log.info("will create " + cachedApplicationData.getAbsolutePath());
+ OutputStream out = new FileOutputStream(cachedApplicationData);
+ log.debug("starting copy of " + applicationData.available() + " bytes");
+ IOUtils.copy(applicationData, out);
+ }
+ } else {
+ log.debug("cache matches for " + applicationName + "-" + applicationVersion);
+ }
+ return cachedApplicationData;
+ }
+
public void stop() {
stop(false);
}
@@ -564,6 +603,8 @@
worker.shouldStop = true;
}
+ FileUtil.deleteRecursively(applicationCache);
+
if( !now ) {
// waiting for them to actually have finished
for (Worker worker : workers) {
@@ -600,7 +641,7 @@
}
public void activeScheduledActivityStrategy() {
- setActivityStrategy(new ActivityStrategy.ScheduledActivity());
+ setActivityStrategy(new ActivityStrategy.ScheduledActivity(config));
}
}
\ No newline at end of file
Added: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java (rev 0)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkConfigTest.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -0,0 +1,47 @@
+package org.nuiton.diswork.daemon;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.junit.Test;
+import org.nuiton.util.FileUtil;
+
+public class DisworkConfigTest {
+
+ @Test
+ public void testSchedule() throws Exception {
+ String schedule = "# every night of the week on working day" + "\n"
+ + "* * 22-8 ? * MON-FRI" + "\n"
+ + "" + "\n"
+ + "# all day long during the week-end" + "\n"
+ + "* * * ? * SAT-SUN" + "\n"
+ + "" + "\n";
+ File scheduleFile = FileUtil.getTempFile(schedule);
+
+ DisworkConfig config = new DisworkConfig();
+ config.setSheduleFile(scheduleFile.getAbsolutePath());
+
+ // schedule file contains 2 expressions
+ assertEquals(2, config.getSchedule().size());
+ }
+
+ @Test
+ public void testTokens() throws Exception {
+ // directories can be fake, only strings are considered
+ String tokens = "# setting the path to python" + "\n"
+ + "%python=/usr/bin/python" + "\n"
+ + "" + "\n"
+ + "# overwriting temp dir C:\temp" + "\n"
+ + "%tmp=C:\temp" + "\n"
+ + "" + "\n";
+ File tokensFile = FileUtil.getTempFile(tokens);
+
+ DisworkConfig config = new DisworkConfig();
+ config.setTokensFile(tokensFile.getAbsolutePath());
+
+ assertEquals("/usr/bin/python C:\\temp",
+ config.parseCommandLine("%python %tmp", null));
+ }
+
+}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonConcurrencyTest.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -1,21 +1,12 @@
package org.nuiton.diswork.daemon;
-import java.io.InputStream;
-import org.junit.Before;
-
public class DisworkDaemonConcurrencyTest extends DisworkDaemonTest {
- @Before
- public void setUp() throws Exception {
- DisworkConfig config = DisworkConfig.newConfig();
- port += 1;
- config.setUsedPort(port);
+ @Override
+ protected void setConfigs() {
+ config = super.newConfig();
config.setNumberOfWorkers(32);
- daemon = new DisworkDaemon(config);
- InputStream application = ClassLoader.getSystemResourceAsStream("fake-app-1.0.zip");
- daemon.submitApplication("fake-app", "1.0", application);
}
-
}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonMultipleNodesTest.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -1,46 +1,19 @@
package org.nuiton.diswork.daemon;
-import java.io.InputStream;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.nuiton.diswork.fs.DisworkFileSystemConfig;
+
public class DisworkDaemonMultipleNodesTest extends DisworkDaemonTest {
- /** another deamon
- * this second deamon will be connected to {@link DisworkDaemonTest#daemon}
- * and will to his jobs.
- */
- protected static DisworkDaemon daemon2;
-
- @Before
@Override
- public void setUp() throws Exception {
- DisworkConfig config = DisworkConfig.newConfig();
- port += 1;
- config.setUsedPort(port);
+ protected void setConfigs() {
+ config = super.newConfig();
config.setActivityStrategy("none");
- config.fileSystemConfig.setBlockSize(500);
- daemon = new DisworkDaemon(config);
- InputStream application = ClassLoader.getSystemResourceAsStream("fake-app-1.0.zip");
- daemon.submitApplication("fake-app", "1.0", application);
-
-
- DisworkConfig config2 = DisworkConfig.newConfig();
+ config2 = super.newConfig();
config2.setBootstrapIp(DisworkFileSystemConfig.getIp());
- config2.setBootstrapPort(port);
- port += 1;
- config2.setUsedPort(port);
+ config2.setBootstrapPort(config.getUsedPort());
config2.setActivityStrategy("unlimited");
- daemon2 = new DisworkDaemon(config2);
}
-
- @After
- public void tearDown() throws Exception {
- daemon2.close();
- }
}
Modified: trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java
===================================================================
--- trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-daemon/src/test/java/org/nuiton/diswork/daemon/DisworkDaemonTest.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -15,23 +15,50 @@
public class DisworkDaemonTest {
+ protected DisworkConfig config;
+ protected DisworkConfig config2;
+
protected static DisworkDaemon daemon;
+ protected static DisworkDaemon daemon2;
- protected static int port = 49999;
+ protected static int port = 45500;
+
+ /** a factory method to ease the creation of configs */
+ protected DisworkConfig newConfig() {
+ DisworkConfig config = new DisworkConfig();
+ port += 1;
+ config.setUsedPort(port);
+
+ // useless in tests
+ config.setStartHttpFrontend(false);
+ return config;
+ }
+ /** initialize config and config2
+ * override this method permit to create different configuration to test,
+ * see sub-classes
+ */
+ protected void setConfigs() {
+ config = newConfig();
+ }
+
@Before
public void setUp() throws Exception {
- DisworkConfig config = DisworkConfig.newConfig();
- port += 1;
- config.setUsedPort(port);
+ setConfigs();
daemon = new DisworkDaemon(config);
+ if (config2 != null) {
+ daemon2 = new DisworkDaemon(config2);
+ }
InputStream application = ClassLoader.getSystemResourceAsStream("fake-app-1.0.zip");
- daemon.submitApplication("fake-app", "1.0", application);
+ daemon.submitApplication("fake-app", "1.0", application);
}
@After
public void tearDown() throws Exception {
daemon.close();
+ if (daemon2 != null) {
+ daemon2.close();
+ }
}
@Test
@@ -162,4 +189,8 @@
assertEquals(4, stats.size());
}
+ @Test
+ public void testJobsManagement() throws Exception {
+ assertEquals(0, daemon.getAllJobs().size());
+ }
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/DisworkFileSystem.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -24,7 +24,6 @@
*/
package org.nuiton.diswork.fs;
-import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -56,7 +55,7 @@
* lead to errors.
*
*/
-public class DisworkFileSystem implements Closeable {
+public class DisworkFileSystem {
private static final Log log = LogFactory.getLog(DisworkFileSystem.class);
@@ -435,7 +434,7 @@
}
String parent = EntryUtil.getParentFromPath(path);
String name = EntryUtil.getNameFromPath(path);
- log.info("trying to remove " + path);
+ log.info("trying to delete " + path);
delete(parent, name);
}
@@ -700,7 +699,34 @@
}
}
+
+ public boolean isLink(String path) throws DisworkFileSystemException {
+ checkPathSyntax(path);
+ String entry = walk(path);
+ if (entry == null) {
+ throw new DisworkFileSystemException(Type.NO_SUCH_ENTITY, path);
+ }
+ return EntryUtil.isLink(entry);
+ }
+ public boolean isFile(String path) throws DisworkFileSystemException {
+ checkPathSyntax(path);
+ String entry = walk(path);
+ if (entry == null) {
+ throw new DisworkFileSystemException(Type.NO_SUCH_FILE, path);
+ }
+ return EntryUtil.isFile(entry);
+ }
+
+ public boolean isDirectory(String path) throws DisworkFileSystemException {
+ checkPathSyntax(path);
+ String entry = walk(path);
+ if (entry == null) {
+ throw new DisworkFileSystemException(Type.NO_SUCH_FILE, path);
+ }
+ return EntryUtil.isDirectory(entry);
+ }
+
public void createDirectories(String path)
throws DisworkFileSystemException {
log.info("trying create directories for " + path);
@@ -716,7 +742,17 @@
}
}
}
-
+
+ public void deleteRecursively(String path) throws DisworkFileSystemException {
+ if (isDirectory(path)) {
+ List<String> directoryContent = readDirectory(path);
+ for (String contentName : directoryContent) {
+ deleteRecursively(path + EntryUtil.PATH_SEPARATOR + contentName);
+ }
+ }
+ delete(path);
+ }
+
/**
* return the entry of the element at the end of <code>path</code>
* @param path
@@ -827,8 +863,7 @@
return result;
}
- @Override
- public void close() throws IOException {
+ public void close() throws DisworkFileSystemException {
storage.close();
}
Modified: trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java
===================================================================
--- trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-fs/src/main/java/org/nuiton/diswork/fs/storage/Storage.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -25,7 +25,6 @@
package org.nuiton.diswork.fs.storage;
import java.io.ByteArrayInputStream;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -46,7 +45,7 @@
*
* It needs a place to store data called a {@link DisworkMap}.
*/
-public class Storage implements Closeable {
+public class Storage {
private static final Log log = LogFactory.getLog(Storage.class);
@@ -401,10 +400,15 @@
}
}
- @Override
- public void close() throws IOException {
+ public void close() throws DisworkFileSystemException {
clean();
- map.close();
+ try {
+ map.close();
+ } catch (IOException e) {
+ log.error("error while closing map", e);
+ throw new DisworkFileSystemException(Type.NETWORK_FAILURE,
+ "unable to close the map", e);
+ }
}
/**
Modified: trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java
===================================================================
--- trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/diswork-fs/src/test/java/org/nuiton/diswork/fs/AbstractDisworkFileSystemTest.java 2010-07-07 09:22:04 UTC (rev 98)
@@ -505,4 +505,17 @@
fail();
}
}
+
+ @Test
+ public void testDeleteRecursively() throws Exception {
+ try {
+ fileSystem.createDirectories("/dir/subdir/subsubdir");
+ fileSystem.createDirectory("/dir/subdir/other");
+ fileSystem.createSymbolicLink("/dir/subdir/link", "/dir/subdir/other");
+ fileSystem.deleteRecursively("/dir/subdir");
+ assertEquals(0, fileSystem.readDirectory("/dir").size());
+ } catch (DisworkFileSystemException e) {
+ fail();
+ }
+ }
}
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/pom.xml 2010-07-07 09:22:04 UTC (rev 98)
@@ -49,6 +49,11 @@
<version>1.4</version>
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
<groupId>org.nuiton</groupId>
<artifactId>nuiton-utils</artifactId>
<version>1.2.2</version>
@@ -80,6 +85,16 @@
<artifactId>commons-daemon</artifactId>
<version>1.0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>1.8.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <version>6.1.9</version>
+ </dependency>
Modified: trunk/src/site/rst/diswork-daemon/devel.rst
===================================================================
--- trunk/src/site/rst/diswork-daemon/devel.rst 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/src/site/rst/diswork-daemon/devel.rst 2010-07-07 09:22:04 UTC (rev 98)
@@ -10,8 +10,98 @@
Overview
--------
-Diswork Daemon uses Diswork File System.
+Diswork Daemon uses Diswork File System. Since it's distributed, all diswork
+nodes access the same data.
+Jobs are stored on the FS with all necessary data. A simple directory structure
+makes it easy for diswork nodes to find those jobs.
+
+When jobs are finished, results are written on the file system.
+
+Job specification
+-----------------
+
+First, a job has an owner. This is important because each owner puts
+his jobs in his own home directory. For user john_doe, jobs are stored in
+/home/john_doe/jobs/
+
+A job is a folder that contains
+
++ a job description
+
++ a log file
+
++ the specific data for this job
+
+The job description describes the details of the job: command line to run,
+input files, output files, standard input, standard output, etc.
+
+Here is a sample job description, as generated by isis-fish :
+
+::
+
+ <jsdl:JobDefinition xmlns="http://www.example.org/"
+ xmlns:jsdl="http://schemas.ggf.org/jsdl/2005/11/jsdl"
+ xmlns:jsdl-posix="http://schemas.ggf.org/jsdl/2005/11/jsdl-posix"
+ xmlns:diswork="http://nuiton.org/projects/show/diswork"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <jsdl:JobDescription>
+ <jsdl:JobIdentification>
+ <jsdl:JobName>sim__2010-07-02-12-34</jsdl:JobName>
+ </jsdl:JobIdentification>
+ <jsdl:Application>
+ <jsdl:ApplicationName>isis-fish</jsdl:ApplicationName>
+ <jsdl:ApplicationVersion>3.3.0.4</jsdl:ApplicationVersion>
+ <jsdl-posix:POSIXApplication>
+ <jsdl-posix:Executable />
+ <jsdl-posix:Argument>%java -Xmx2500M -jar isis-fish-3.3.0.4/isis-fish-3.3.0.4.jar --option launch.ui false --option perform.vcsupdate false --option perform.migration false --option perform.cron false --simulateRemotelly sim__2010-07-02-12-34 simulation-sim__2010-07-02-12-34-preparation.zip simulation-sim__2010-07-02-12-34-result.zip</jsdl-posix:Argument>
+ <jsdl-posix:Output>simulation-sim__2010-07-02-12-34-output.txt</jsdl-posix:Output>
+ </jsdl-posix:POSIXApplication>
+ </jsdl:Application>
+ <jsdl:DataStaging diswork:type="in">
+ <jsdl:FileName>simulation-sim__2010-07-02-12-34-preparation.zip</jsdl:FileName>
+ </jsdl:DataStaging>
+ <jsdl:DataStaging diswork:type="out">
+ <jsdl:FileName>simulation-sim__2010-07-02-12-34-result.zip</jsdl:FileName>
+ </jsdl:DataStaging>
+ <jsdl:DataStaging diswork:type="out">
+ <jsdl:FileName>simulation-sim__2010-07-02-12-34-output.txt</jsdl:FileName>
+ </jsdl:DataStaging>
+ </jsdl:JobDescription>
+ </jsdl:JobDefinition>
+
+
+As you can see, the job description references an application (isis-fish
+3.3.0.4). The application is a simple zip file containing all the data that
+is common to all isis-related jobs.
+
+The log file is a simple text file. Each time a worker tries to execute the job,
+it writes "DONE" or "FAILED". If this is the last time the job is tried, the worker
+adds "FINISHED". Thus, a typical log file can be
+
+::
+
+ DONE
+ FINISHED
+
+and another would be :
+
+::
+
+ FAILED
+ FAILED
+ FAILED
+ FINISHED
+
+The files needed for this specific job are uploaded to the job directory.
+According to the above JSDL file, the job directory will contain a file called
+"simulation-sim__2010-07-02-12-34-preparation.zip".
+
+
+Once the job is done, the job directory will contain the files requested as results.
+In the above example, two files will be available for the requester to download:
+"simulation-sim__2010-07-02-12-34-result.zip" and "simulation-sim__2010-07-02-12-34-output.txt"
+
Submit a job
------------
Modified: trunk/src/site/rst/user/how_to_use.rst
===================================================================
--- trunk/src/site/rst/user/how_to_use.rst 2010-07-01 14:35:38 UTC (rev 97)
+++ trunk/src/site/rst/user/how_to_use.rst 2010-07-07 09:22:04 UTC (rev 98)
@@ -12,13 +12,13 @@
svn checkout http://svn.nuiton.org/svn/diswork/trunk/ diswork
cd diswork/diswork-daemon
- mvn assembly:assembly -Dmaven.test.skip -DdescriptorId=jar-with-dependencies
+ mvn assembly:assembly -DdescriptorId=jar-with-dependencies
cd ../..
# under debian commons-daemon can be found at /usr/share/java/commons-daemon.jar
- jsvc -cp /usr/share/java/commons-daemon.jar:diswork/diswork-daemon/target/diswork-daemon-0.0.1-SNAPSHOT-jar-with-dependencies.jar -pidfile ./pid -outfile ./out -errfile ./err DisworkDaemonRunner
+ # run as root :
+ jsvc -cp /usr/share/java/commons-daemon.jar:/usr/share/maven-repo/log4j/log4j/1.2.15/log4j-1.2.15.jar:diswork/diswork-daemon/target/diswork-daemon-0.0.1-SNAPSHOT-jar-with-dependencies.jar -pidfile ./pid -outfile ./out -errfile ./err org.nuiton.diswork.daemon.DisworkDaemonRunner
-
How to make my own application ready for being run by all diswork nodes
-----------------------------------------------------------------------
1
0