Author: bleny Date: 2010-05-06 10:25:12 +0200 (Thu, 06 May 2010) New Revision: 23 Url: http://nuiton.org/repositories/revision/diswork/23 Log: logs, lecture/ecriture des fichiers sur le FS, menage Added: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java Removed: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/config/ Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java trunk/diswork-fs/src/main/resources/log4j.properties trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java Copied: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java (from rev 19, trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/config/DisworkConfig.java) =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java (rev 0) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkConfig.java 2010-05-06 08:25:12 UTC (rev 23) @@ -0,0 +1,25 @@ +package org.nuiton.disworkfs; + +import java.io.File; +import java.util.Random; + +import org.nuiton.util.ApplicationConfig; + +public class DisworkConfig extends ApplicationConfig { + + public DisworkConfig() { + Random random = new Random(); + setDefaultOption("storage", "/tmp/disworkfs/storage" + random.nextInt()); + setDefaultOption("jgroups.cluster_name", "diswork-fs"); + + // replication strategy... + } + + public File getStoragePath() { + return getOptionAsFile("storage"); + } + + public String getJGroupsClusterName() { + return getOption("jgroups.cluster_name"); + } +} Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkFileSystem.java 2010-05-06 08:25:12 UTC (rev 23) @@ -8,7 +8,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.nuiton.disworkfs.config.DisworkConfig; import org.nuiton.disworkfs.services.DownloadService; import org.nuiton.disworkfs.services.LookUpService; import org.nuiton.disworkfs.services.UploadService; @@ -21,7 +20,7 @@ private LookUpService lookUpService; private DisworkConfig disworkConfig; - private Log log = LogFactory.getLog(DisworkFileSystem.class); + private static final Log log = LogFactory.getLog(DisworkFileSystem.class); public DisworkFileSystem(DisworkConfig disworkConfig) { @@ -57,7 +56,7 @@ log.info("trying to read " + path); - File file = new File(disworkConfig.getOption("storage"), path); + File file = new File(disworkConfig.getStoragePath(), path); log.info("trying at local file system " + file.getAbsolutePath()); if (!file.exists()) { @@ -89,7 +88,7 @@ public void write(String path, File source) throws IOException { - File target = new File(disworkConfig.getOption("storage"), path); + File target = new File(disworkConfig.getStoragePath(), path); /* if (file.exists()) { // TODO Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/DisworkServicesManager.java 2010-05-06 08:25:12 UTC (rev 23) @@ -3,9 +3,6 @@ import java.util.ArrayList; import java.util.List; -import org.jgroups.Address; -import org.jgroups.View; -import org.nuiton.disworkfs.config.DisworkConfig; import org.nuiton.disworkfs.services.AbstractDisworkService; import org.nuiton.disworkfs.transport.Message; import org.nuiton.disworkfs.transport.Receiver; Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/RunMe.java 2010-05-06 08:25:12 UTC (rev 23) @@ -1,6 +1,5 @@ package org.nuiton.disworkfs; -import org.nuiton.disworkfs.config.DisworkConfig; public class RunMe { Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/SimpleDownload.java 2010-05-06 08:25:12 UTC (rev 23) @@ -18,7 +18,7 @@ private LookUpService lookUpService; private String filePath; - private Log log = LogFactory.getLog(SimpleDownload.class); + private static final Log log = LogFactory.getLog(SimpleDownload.class); // TODO timeout public SimpleDownload(String filePath, LookUpService lookUpService, DownloadService downloadService) throws Exception { @@ -30,10 +30,13 @@ public boolean initiateDownload() { lookUpService.lookForFileName(filePath, this); - + // FIXME bad implementation of a timeout int numberOfSecondsWaited = 0; - while (this.fileDescription == null && numberOfSecondsWaited <= 10) { + boolean responseReceived = false; + + // wait until there is a result or timeout + while (!responseReceived && numberOfSecondsWaited <= 10) { // response not yet received, wait again... try { Thread.sleep(1000); @@ -43,27 +46,35 @@ // TODO Auto-generated catch block e.printStackTrace(); } + + responseReceived = this.fileDescription != null; + if (responseReceived) + log.info("look-up response received"); } - return fileDescription != null; + return responseReceived; } public File startDownload() { + if (log.isDebugEnabled()) + log.info("starting download for " + fileDescription.getFileName()); downloadService.startDownload(fileDescription, this); // TODO throw file not found if timeout exceed while(! downloadFinised) { try { - Thread.sleep(1000); - log.info("waiting for download complete"); + Thread.sleep(10 * 1000); + log.info("still waiting for " + fileDescription.getFileName() + " download to complete"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } + if (log.isDebugEnabled()) + log.info("download " + fileDescription.getFileName() + " is complete"); return new File(fileDescription.getFileName()); } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/AbstractDisworkService.java 2010-05-06 08:25:12 UTC (rev 23) @@ -4,9 +4,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jgroups.Address; -import org.jgroups.View; -import org.nuiton.disworkfs.config.DisworkConfig; +import org.nuiton.disworkfs.DisworkConfig; import org.nuiton.disworkfs.messages.FileRequestMessage; import org.nuiton.disworkfs.messages.FileTransferMessage; import org.nuiton.disworkfs.messages.LookUpMessage; @@ -23,7 +21,7 @@ protected DisworkConfig disworkConfig; - private Log log = LogFactory.getLog(AbstractDisworkService.class); + private static final Log log = LogFactory.getLog(AbstractDisworkService.class); public AbstractDisworkService() {} Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/DownloadService.java 2010-05-06 08:25:12 UTC (rev 23) @@ -1,7 +1,6 @@ package org.nuiton.disworkfs.services; import java.io.File; -import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -30,7 +29,8 @@ public void receiveFileTransferMessage(Message message) { FileTransferMessage fileTransferMessage = (FileTransferMessage) message.getContent(); - + FileDescription fileDescription = fileTransferMessage.getFileDescrition(); + if (log.isDebugEnabled()) { log.info("received file chunk " + fileTransferMessage.getFileDescrition().getFileName() @@ -39,47 +39,51 @@ + ")"); } - Long checkSum = fileTransferMessage.getFileDescrition().getFileCheckSum(); + // Before do anything, check if we already have the complete file - if (! downloadsInProgress.containsKey(checkSum)) { - // Start download + File newFile = new File(disworkConfig.getStoragePath(), fileDescription.getFileName()); + File newFileStatus = new File(disworkConfig.getStoragePath(), "." + fileDescription.getFileName() + ".index"); + + // file must exists, have the expected size and index should not exist + // TODO : use checksum ? + boolean alreadyHaveFile = newFile.exists() + && (newFile.length() == fileDescription.getTotalSize()) + && !newFileStatus.exists(); + + if (!alreadyHaveFile) { + Long checkSum = fileDescription.getFileCheckSum(); - log.info("first chunk received, initiate download"); + if (! downloadsInProgress.containsKey(checkSum)) { + // Start download - SplitFileFromChunks newSplitFile = new SplitFileFromChunks(fileTransferMessage.getFileDescrition()); + log.info("first chunk received, initiate download"); - downloadsInProgress.put(checkSum, newSplitFile); - } + SplitFileFromChunks newSplitFile = new SplitFileFromChunks(fileDescription, newFile); - // we have received a file chunk, let's add it - SplitFileFromChunks downloadingFile = downloadsInProgress.get(checkSum); + downloadsInProgress.put(checkSum, newSplitFile); + } - downloadingFile.addChunk(fileTransferMessage.getFileChunk()); + // we have received a file chunk, let's add it + SplitFileFromChunks downloadingFile = downloadsInProgress.get(checkSum); - // maybe the download is complete - if (downloadingFile.isComplete()) { - // write file to the FS - - try { - File newFile = new File(disworkConfig.getStoragePath(), fileTransferMessage.getFileDescrition().getFileName()); - downloadingFile.writeToLocalFileSystem(newFile); + downloadingFile.addChunk(fileTransferMessage.getFileChunk()); + + // maybe the download is complete + if (downloadingFile.isComplete()) { + // write file to the FS log.info("file " + newFile.getAbsolutePath() + " written"); - } catch (IOException e) { - log.error("can't write file"); - } catch (Exception e) { - log.error("trying to write data with missing chunks"); - } - finishedDownloads.add(checkSum); - downloadsInProgress.remove(checkSum); + finishedDownloads.add(checkSum); + downloadsInProgress.remove(checkSum); - this.notifyAllDownloadObserversForFile(checkSum); + this.notifyAllDownloadObserversForFile(checkSum); + } } } public void registerObserver(FileDescription fileDescription, DownloadObserver downloadObserver) { List<DownloadObserver> observersList = downloadObservers.get(fileDescription.getFileCheckSum()); - + if (observersList == null) { // it's the first observer for this download ever, let's construct a list observersList = new LinkedList<DownloadObserver>(); @@ -90,11 +94,11 @@ private void notifyAllDownloadObserversForFile(Long checksum) { List<DownloadObserver> downloadObserversForThisFile = downloadObservers.get(checksum); - + // important check : maybe no observer for this file download // so the list is null. if (downloadObserversForThisFile != null) { - + for (DownloadObserver downloadObserver : downloadObserversForThisFile) { downloadObserver.updateDownloadStatus(this); } @@ -106,27 +110,7 @@ return finished; } - public double getProgess(FileDescription fileDescription) { - // FIXME this method never return intermediate values - double progress = 0.0; - if (isFinished(fileDescription)) { - progress = 1.0; - } else { - Long checksum = fileDescription.getFileCheckSum(); - if (downloadsInProgress.containsKey(checksum)) { - // download is in progress - long actualSize = downloadsInProgress.get(checksum).getActualSize(); - long expectedSize = fileDescription.getTotalSize(); - progress = actualSize / expectedSize; - } else { - // download not started - progress = -1.0; - } - } - return progress; - } - public void startDownload(FileDescription fileDescription, DownloadObserver downloadObserver) { Message message = transport.newMulticastMessage(); message.setContent(new FileRequestMessage(fileDescription)); Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/services/LookUpService.java 2010-05-06 08:25:12 UTC (rev 23) @@ -23,11 +23,11 @@ private static final Log log = LogFactory.getLog(LookUpService.class); - /** - * - * @param fileName the name of the name - * @return the FileDescription or null of file have not been found - */ + /** + * + * @param fileName the name of the file to search for + * @param lookUpObserver the object to notify when LookUpResponse have been sent back + */ public void lookForFileName(String fileName, LookUpObserver lookUpObserver) { LookUpMessage lookUpMessage = new LookUpMessage(fileName); Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/BytesChunk.java 2010-05-06 08:25:12 UTC (rev 23) @@ -2,7 +2,6 @@ import java.io.Serializable; -// TODO really implement Serializable public class BytesChunk implements Serializable { /** Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromChunks.java 2010-05-06 08:25:12 UTC (rev 23) @@ -1,80 +1,119 @@ package org.nuiton.disworkfs.split; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.RandomAccessFile; +import java.util.BitSet; import java.util.zip.CRC32; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.NotImplementedException; public class SplitFileFromChunks { - private FileDescription fileDescription; - - private List<FileChunk> fileChunks; - + protected FileDescription fileDescription; + protected File destination; + protected File chunkStatusFile; - public SplitFileFromChunks(FileDescription fileDescription) { + public SplitFileFromChunks(FileDescription fileDescription, File destination) { this.fileDescription = fileDescription; - fileChunks = new ArrayList<FileChunk>(); + this.destination = destination; + this.chunkStatusFile = new File(destination.getParent(), "." + destination.getName() + ".index"); + + if (!chunkStatusFile.exists()) { + BitSet bitSet = new BitSet(fileDescription.getNumberOfChunks()); + bitSet.set(0, fileDescription.getNumberOfChunks()); + writeChunkStatusFile(bitSet); + bitSet = readChunkStatusFile(); + } + + if (!destination.exists()) { + try { + RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw"); + randomAccessFile.setLength(fileDescription.getTotalSize()); + randomAccessFile.close(); + } catch (FileNotFoundException e) { + // we just checked !destination.exists() + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } } - public void addChunk(FileChunk fileChunk) { - fileChunks.add(fileChunk); - } - public boolean isComplete() { - return fileChunks.size() == fileDescription.getNumberOfChunks(); + BitSet bitSet = readChunkStatusFile(); + boolean fileIsComplete = bitSet.cardinality() == 0; + return fileIsComplete; } - public long getActualSize() { - int actualSize = 0; - for (FileChunk fileChunk : fileChunks) { - actualSize += fileChunk.getBytesChunk().getChunkSize(); + + protected BitSet readChunkStatusFile() { + BitSet bitSet = null; + try { + FileInputStream is = new FileInputStream(chunkStatusFile); + bitSet = (BitSet) new ObjectInputStream(is).readObject(); + is.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - return actualSize; + return bitSet; } - public List<Integer> getMissingChunksIds() { - throw new NotImplementedException(); + + protected void writeChunkStatusFile(BitSet bitSet) { + try { + FileOutputStream os = new FileOutputStream(chunkStatusFile); + new ObjectOutputStream(os).writeObject(bitSet); + os.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } - public boolean writeToLocalFileSystem() throws IOException, Exception { - File newFile = new File(fileDescription.getFileName()); - return writeToLocalFileSystem(newFile); - } - - /** - * - * @return the result of the check integrity - * @throws IOException - * @throws Exception - */ - public boolean writeToLocalFileSystem(File newFile) throws IOException, Exception { + public void addChunk(FileChunk fileChunk) { + try { + RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw"); + byte[] data = fileChunk.getBytesChunk().getData(); + int off = BytesChunk.MAX_CHUNK_SIZE * fileChunk.getBytesChunk().getChunkNumber(); + int len = fileChunk.getBytesChunk().getChunkSize(); + randomAccessFile.seek(off); + randomAccessFile.write(data, 0, len); + randomAccessFile.close(); + + // updating status + + BitSet chunkStatus = readChunkStatusFile(); + chunkStatus.clear(fileChunk.getBytesChunk().getChunkNumber()); + writeChunkStatusFile(chunkStatus); + + + if (isComplete()) { - if (fileChunks == null) { - throw new Exception("No data to write"); - } - - - long expectedSize = fileDescription.getTotalSize(); - - SplittedBytes splittedBytes = new SplittedBytes(expectedSize); - - for (FileChunk fileChunk : fileChunks) { - splittedBytes.addChunk(fileChunk.getBytesChunk()); - } - - FileUtils.writeByteArrayToFile(newFile, splittedBytes.getBytesFromChunks()); - - // checking integrity - long expectedChecksum = fileDescription.getFileCheckSum(); - long actualCheckSum = FileUtils.checksum(newFile, new CRC32()).getValue(); - - return actualCheckSum == expectedChecksum; + long expectedChecksum = fileDescription.getFileCheckSum(); + long actualCheckSum = FileUtils.checksum(destination, new CRC32()).getValue(); + + if (actualCheckSum != expectedChecksum) { + throw new IOException("checksum fail"); + } + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/split/SplitFileFromLocalFile.java 2010-05-06 08:25:12 UTC (rev 23) @@ -1,7 +1,9 @@ package org.nuiton.disworkfs.split; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.List; import java.util.zip.CRC32; @@ -10,53 +12,56 @@ public class SplitFileFromLocalFile { - private File localFile; + protected File source; - public SplitFileFromLocalFile(File file) { - localFile = file; + public SplitFileFromLocalFile(File source) { + this.source = source; } - + public FileDescription getFileDescription() throws IOException { - String localFileName = localFile.getPath(); - long localFileTotalSize = localFile.length(); - long localFileCheckSum = FileUtils.checksum(localFile, new CRC32()).getValue(); + String localFileName = source.getPath(); + long localFileTotalSize = source.length(); + long localFileCheckSum = FileUtils.checksum(source, new CRC32()).getValue(); return new FileDescription(localFileName, localFileTotalSize, localFileCheckSum); } - - public List<FileChunk> getAllChunks() throws IOException { - byte[] data = FileUtils.readFileToByteArray(localFile); + + public List<FileChunk> getAllChunks() throws IOException, FileNotFoundException { + + RandomAccessFile randomAccessFile = new RandomAccessFile(source, "r"); + + // preparing an empty list to store the result + List<FileChunk> result = new ArrayList<FileChunk>(); - // Creating the file chunks - SplittedBytes splittedBytes = new SplittedBytes(); - splittedBytes.setChunksFromBytes(data); + // this array will contains some bytes read from the file + byte[] read = new byte[BytesChunk.MAX_CHUNK_SIZE]; + + // chunks have to be numbered + int chunkNumber = 0; - List<FileChunk> fileChunks = new ArrayList<FileChunk>(); - for(BytesChunk bytesChunk : splittedBytes.getChunks()) { - fileChunks.add(new FileChunk(bytesChunk)); + // the last chunk will not be complete, so we have to store the + // chunkSize for each chunk + int chunkSize; + + // reading the file until the end + while ((chunkSize = randomAccessFile.read(read)) != -1) { + + // creating a FileChunk from the data read + BytesChunk bytesChunk = new BytesChunk(read); + bytesChunk.setChunkNumber(chunkNumber); + bytesChunk.setChunkSize(chunkSize); + + FileChunk fileChunk = new FileChunk(bytesChunk); + result.add(fileChunk); + + // preparing data for next iteration + read = new byte[BytesChunk.MAX_CHUNK_SIZE]; + chunkNumber += 1; } - return fileChunks; - } - - public List<FileChunk> getSomeChunks(int[] indexes) throws IOException { - // TODO implement - List<FileChunk> allChunks = this.getAllChunks(); - List<FileChunk> someChunks = new ArrayList<FileChunk>(); - for (int index : indexes) { - someChunks.add(allChunks.get(index)); - } - return someChunks; - } - - public FileChunk getChunk(int index) throws IOException { - int singletonIndex[] = { index }; + + randomAccessFile.close(); - return this.getSomeChunks(singletonIndex).get(0); - + return result; } - - public int getNumberOfChunks() throws IOException { - return this.getFileDescription().getNumberOfChunks(); - } - - + + } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/Address.java 2010-05-06 08:25:12 UTC (rev 23) @@ -3,5 +3,10 @@ import java.io.Serializable; public abstract class Address implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 4178387970911345672L; } Modified: trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java =================================================================== --- trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/java/org/nuiton/disworkfs/transport/jgroups/JGroupsTransport.java 2010-05-06 08:25:12 UTC (rev 23) @@ -2,14 +2,15 @@ import java.io.Serializable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.jgroups.Channel; import org.jgroups.ChannelClosedException; import org.jgroups.ChannelException; import org.jgroups.ChannelNotConnectedException; import org.jgroups.JChannel; import org.jgroups.View; -import org.nuiton.disworkfs.config.DisworkConfig; -import org.nuiton.disworkfs.services.AbstractDisworkService; +import org.nuiton.disworkfs.DisworkConfig; import org.nuiton.disworkfs.transport.Address; import org.nuiton.disworkfs.transport.Message; import org.nuiton.disworkfs.transport.Receiver; @@ -21,9 +22,11 @@ protected JChannel jChannel; protected Receiver receiver; + private static final Log log = LogFactory.getLog(JGroupsTransport.class); + /** - * This class is just a delegator. Every message received on the jChannel, - * will be sent to the Receiver who subscribed to transport + * This class is just a delegator. Every message received on this.jChannel, + * will be sent to this.receiver who subscribed to transport * @author bleny */ private class MyReceiver extends org.jgroups.ExtendedReceiverAdapter { @@ -34,16 +37,32 @@ this.tranport = tranport; } + /** + * receive a new message from the JChannel, send it to the + * transport layer + */ public void receive(org.jgroups.Message msg) { + // this method is call be the jChannel when a message is received + if (log.isDebugEnabled()) + log.info("message received from jChannel " + msg); + // let's create a new message for the transport layer Message message = tranport.newEmptyMessage(); + // setting the message attributes, according to the JGroups message message.setSource(new JGroupsAddress(msg.getSrc())); message.setSource(new JGroupsAddress(msg.getDest())); message.setContent((Serializable) msg.getObject()); + // sending our new message to the transport layer + if (log.isDebugEnabled()) + log.info("message received from jChannel sent to transport layer as " + message); receiver.receiveMessage(message); } + + public void viewAccepted(View new_view) { + log.info("now seeing " + new_view.size() + " nodes"); + } } @@ -52,49 +71,61 @@ this.disworkConfig = disworkConfig; try { jChannel = new JChannel("udp.xml"); - jChannel.connect("MonPremierGroupe"); + String clusterName = disworkConfig.getJGroupsClusterName(); + + log.info("connecting to JGroup " + clusterName); + jChannel.connect(clusterName); + // don't receive messages sent by myself jChannel.setOpt(Channel.LOCAL, new Boolean(false)); + // setting the receiver so every message received + // on jChannel will be received by the rest of + // the application MyReceiver myReceiver = new MyReceiver(this); jChannel.setReceiver(myReceiver); } catch (ChannelException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + log.error("error while creating and connecting to the JGroups channel"); } } + @Override public void setReceiver(Receiver receiver) { this.receiver = receiver; } + @Override public Address getMulticastAddress() { // null is the multicast destination in JGroups return new JGroupsAddress(null); } + @Override public boolean send(Message message) { org.jgroups.Message msg = ((JGroupsMessage) message).getMessage(); + boolean success = true; try { jChannel.send(msg); } catch (ChannelNotConnectedException e) { - // TODO log - return false; + log.error("JGroups channel not connected while trying to send a message"); + success = false; } catch (ChannelClosedException e) { - // TODO log - return false; + log.error("JGroups channel was closed while trying to send a message"); + success = false; } - return true; + return success; } + @Override public Message newEmptyMessage() { Message message = new JGroupsMessage(this, new org.jgroups.Message()); message.setSource(this.getLocalAddress()); return message; } + @Override public Message newMulticastMessage() { Message message = this.newEmptyMessage(); message.setDestination(this.getMulticastAddress()); @@ -103,7 +134,6 @@ @Override public Address getLocalAddress() { - // null is the local address in JGroups return new JGroupsAddress(jChannel.getAddress()); } Modified: trunk/diswork-fs/src/main/resources/log4j.properties =================================================================== --- trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/main/resources/log4j.properties 2010-05-06 08:25:12 UTC (rev 23) @@ -6,3 +6,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) %M - %m%n # package level log4j.logger.org.nuiton.disworkfs=DEBUG +log4j.logger.org.nuiton.disworkfs.transport=WARN +#log4j.logger.org.nuiton.disworkfs.services.DownloadService=WARN +#log4j.logger.org.nuiton.disworkfs.services.UploadService=WARN Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/AbstractSplitFileTest.java 2010-05-06 08:25:12 UTC (rev 23) @@ -36,21 +36,22 @@ public void setUp() throws Exception { File tempDirectory = new File(tempDirectoryPath); tempDirectory.mkdir(); - + FileUtils.forceDeleteOnExit(tempDirectory); + // creating random data for the file byte[] randomBytes = new byte[randomFileSize]; random.nextBytes(randomBytes); // dumping random data into the file File randomFile = new File(randomFilePath); - FileUtils.writeByteArrayToFile(randomFile, randomBytes); + FileUtils.writeByteArrayToFile(randomFile, randomBytes); + } @After public void tearDown() throws Exception { // cleaning - new File(randomFilePath).delete(); - new File(tempDirectoryPath).delete(); + FileUtils.forceDelete(new File(tempDirectoryPath)); } } Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/DistributedFileSystemTest.java 2010-05-06 08:25:12 UTC (rev 23) @@ -7,7 +7,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.nuiton.disworkfs.config.DisworkConfig; import org.nuiton.util.FileUtil; public class DistributedFileSystemTest { @@ -27,7 +26,7 @@ /** * The file will have this fixed size */ - static protected int randomFileSize = 10 * 1000; + static protected int randomFileSize = 3000; static protected String storagePath1; static protected String storagePath2; Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromChunksTest.java 2010-05-06 08:25:12 UTC (rev 23) @@ -1,6 +1,5 @@ package org.nuiton.disworkfs.split; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -15,9 +14,8 @@ public class SplitFileFromChunksTest extends AbstractSplitFileTest { - @Test - public void simpleCopy() { + public void simpleCopyRandomAccess() { try { File randomFile = new File(randomFilePath); @@ -30,20 +28,16 @@ String splitedFileCopyPath = fileDescription.getFileName() + "_copy"; fileDescription.setFileName(splitedFileCopyPath); - SplitFileFromChunks splitFileCopy = new SplitFileFromChunks(fileDescription); + SplitFileFromChunks splitFileCopy = new SplitFileFromChunks(fileDescription, new File(splitedFileCopyPath)); // here is the simple copy from fist file to second file for (FileChunk fileChunk : splitFile.getAllChunks()) { assertFalse(splitFileCopy.isComplete()); - assertTrue(splitFileCopy.getActualSize() < fileDescription.getTotalSize()); splitFileCopy.addChunk(fileChunk); } - assertEquals(splitFileCopy.getActualSize(), fileDescription.getTotalSize()); assertTrue(splitFileCopy.isComplete()); // write the copy to the FS - boolean result = splitFileCopy.writeToLocalFileSystem(); - assertTrue("checking copy integrity", result); // compare the original and the copy byte by byte try { Modified: trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java =================================================================== --- trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-04 17:54:32 UTC (rev 22) +++ trunk/diswork-fs/src/test/java/org/nuiton/disworkfs/split/SplitFileFromLocalFileTest.java 2010-05-06 08:25:12 UTC (rev 23) @@ -6,6 +6,7 @@ import java.io.File; import java.io.IOException; +import java.util.List; import org.junit.Test; import org.nuiton.disworkfs.AbstractSplitFileTest; @@ -25,8 +26,16 @@ assertEquals(SplittedBytes.numberOfChunksNeededtoStore(randomFileSize), fileDescription.getNumberOfChunks()); assertEquals(fileDescription.getFileName(), randomFilePath); - splitFileFromLocalFile.getAllChunks(); + List<FileChunk> allChunks = splitFileFromLocalFile.getAllChunks(); + for (FileChunk fileChunk : allChunks) { + int chunkSize = fileChunk.getBytesChunk().getChunkSize(); + + assertTrue( + chunkSize == BytesChunk.MAX_CHUNK_SIZE + || chunkSize == randomFileSize % BytesChunk.MAX_CHUNK_SIZE); + } + } catch (IOException e) { fail(); e.printStackTrace();