From 60f5f07dd68254ff61b3e1fed65c30a19d284631 Mon Sep 17 00:00:00 2001 From: kapodamy Date: Tue, 2 Jul 2019 21:07:21 -0300 Subject: [PATCH] commit (3 changes) * re-write download segmenting logic (issue #). * clean-up download threads handling. * fix race-condition if "pause" option in download context menu was selected, in the transition from "pending" to "finished" state. --- .../giga/get/DownloadInitializer.java | 42 +-- .../us/shandian/giga/get/DownloadMission.java | 254 +++++++++--------- .../shandian/giga/get/DownloadRunnable.java | 86 +++--- .../giga/get/DownloadRunnableFallback.java | 27 +- .../giga/ui/adapter/MissionAdapter.java | 2 + 5 files changed, 191 insertions(+), 220 deletions(-) diff --git a/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java b/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java index 5239c5bb70c..8f1b9ba4ebb 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java @@ -35,9 +35,7 @@ public void run() { int retryCount = 0; while (true) { try { - mMission.currentThreadCount = mMission.threadCount; - - if (mMission.blocks < 0 && mMission.current == 0) { + if (mMission.blocks == null && mMission.current == 0) { // calculate the whole size of the mission long finalLength = 0; long lowestSize = Long.MAX_VALUE; @@ -83,11 +81,9 @@ public void run() { // check for dynamic generated content if (mMission.length == -1 && mConn.getResponseCode() == 200) { - mMission.blocks = 0; + mMission.blocks = new int[0]; mMission.length = 0; - mMission.fallback = true; mMission.unknownLength = true; - mMission.currentThreadCount = 1; if (DEBUG) { Log.d(TAG, "falling back (unknown length)"); @@ -99,24 +95,17 @@ public void run() { if (!mMission.running || Thread.interrupted()) return; - synchronized (mMission.blockState) { + synchronized (mMission.LOCK) { if (mConn.getResponseCode() == 206) { - if (mMission.currentThreadCount > 1) { - mMission.blocks = mMission.length / DownloadMission.BLOCK_SIZE; - - if (mMission.currentThreadCount > mMission.blocks) { - mMission.currentThreadCount = (int) mMission.blocks; - } - if (mMission.currentThreadCount <= 0) { - mMission.currentThreadCount = 1; - } - if (mMission.blocks * DownloadMission.BLOCK_SIZE < mMission.length) { - mMission.blocks++; - } + + if (mMission.threadCount > 1) { + int count = (int) (mMission.length / DownloadMission.BLOCK_SIZE); + if ((count * DownloadMission.BLOCK_SIZE) < mMission.length) count++; + + mMission.blocks = new int[count]; } else { - // if one thread is solicited don't calculate blocks, is useless - mMission.blocks = 1; - mMission.fallback = true; + // if one thread is required don't calculate blocks, is useless + mMission.blocks = new int[0]; mMission.unknownLength = false; } @@ -125,20 +114,13 @@ public void run() { } } else { // Fallback to single thread - mMission.blocks = 0; - mMission.fallback = true; + mMission.blocks = new int[0]; mMission.unknownLength = false; - mMission.currentThreadCount = 1; if (DEBUG) { Log.d(TAG, "falling back due http response code = " + mConn.getResponseCode()); } } - - for (long i = 0; i < mMission.currentThreadCount; i++) { - mMission.threadBlockPositions.add(i); - mMission.threadBytePositions.add(0L); - } } if (!mMission.running || Thread.interrupted()) return; diff --git a/app/src/main/java/us/shandian/giga/get/DownloadMission.java b/app/src/main/java/us/shandian/giga/get/DownloadMission.java index b3e32a43cdf..eb96d53f4b5 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadMission.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadMission.java @@ -9,15 +9,14 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.Serializable; import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.SocketTimeoutException; import java.net.URL; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import javax.annotation.Nullable; import javax.net.ssl.SSLException; import us.shandian.giga.io.StoredFileHelper; @@ -28,10 +27,13 @@ import static org.schabi.newpipe.BuildConfig.DEBUG; public class DownloadMission extends Mission { - private static final long serialVersionUID = 4L;// last bump: 27 march 2019 + private static final long serialVersionUID = 5L;// last bump: 30 june 2019 static final int BUFFER_SIZE = 64 * 1024; - final static int BLOCK_SIZE = 512 * 1024; + static final int BLOCK_SIZE = 512 * 1024; + + @SuppressWarnings("SpellCheckingInspection") + private static final String INSUFFICIENT_STORAGE = "ENOSPC"; private static final String TAG = "DownloadMission"; @@ -57,11 +59,6 @@ public class DownloadMission extends Mission { */ public String[] urls; - /** - * Number of blocks the size of {@link DownloadMission#BLOCK_SIZE} - */ - long blocks = -1; - /** * Number of bytes downloaded */ @@ -92,7 +89,7 @@ public class DownloadMission extends Mission { public Postprocessing psAlgorithm; /** - * The current resource to download, see {@code urls[current]} and {@code offsets[current]} + * The current resource to download, {@code urls[current]} and {@code offsets[current]} */ public int current; @@ -111,33 +108,42 @@ public class DownloadMission extends Mission { */ public long nearLength; + /** + * Download blocks, the size is multiple of {@link DownloadMission#BLOCK_SIZE}. + * Every entry (block) in this array holds an offset, used to resume the download. + * An block offset can be -1 if the block was downloaded successfully. + */ + int[] blocks; + + /** + * Download/File resume offset in fallback mode (if applicable) {@link DownloadRunnableFallback} + */ + long fallbackResumeOffset; + + /** + * Maximum of download threads running, chosen by the user + */ public int threadCount = 3; - boolean fallback; - private int finishCount; + + private transient int finishCount; public transient boolean running; public boolean enqueued; public int errCode = ERROR_NOTHING; - public Exception errObject = null; + public transient boolean recovered; public transient Handler mHandler; private transient boolean mWritingToFile; + private transient boolean[] blockAcquired; - @SuppressWarnings("UseSparseArrays")// LongSparseArray is not serializable - final HashMap blockState = new HashMap<>(); - final List threadBlockPositions = new ArrayList<>(); - final List threadBytePositions = new ArrayList<>(); + final Object LOCK = new Lock(); private transient boolean deleted; - int currentThreadCount; + public transient volatile Thread[] threads = new Thread[0]; private transient Thread init = null; - protected DownloadMission() { - - } - public DownloadMission(String[] urls, StoredFileHelper storage, char kind, Postprocessing psInstance) { if (urls == null) throw new NullPointerException("urls is null"); if (urls.length < 1) throw new IllegalArgumentException("urls is empty"); @@ -154,69 +160,40 @@ public DownloadMission(String[] urls, StoredFileHelper storage, char kind, Postp } } - private void checkBlock(long block) { - if (block < 0 || block >= blocks) { - throw new IllegalArgumentException("illegal block identifier"); - } - } - /** - * Check if a block is reserved + * Acquire a block * - * @param block the block identifier - * @return true if the block is reserved and false if otherwise - */ - boolean isBlockPreserved(long block) { - checkBlock(block); - //noinspection ConstantConditions - return blockState.containsKey(block) ? blockState.get(block) : false; - } - - void preserveBlock(long block) { - checkBlock(block); - synchronized (blockState) { - blockState.put(block, true); + * @return the block or {@code null} if no more blocks left + */ + @Nullable + Block acquireBlock() { + synchronized (LOCK) { + for (int i = 0; i < blockAcquired.length; i++) { + if (!blockAcquired[i] && blocks[i] >= 0) { + Block block = new Block(); + block.position = i; + block.done = blocks[i]; + + blockAcquired[i] = true; + return block; + } + } } - } - - /** - * Set the block of the file - * - * @param threadId the identifier of the thread - * @param position the block of the thread - */ - void setBlockPosition(int threadId, long position) { - threadBlockPositions.set(threadId, position); - } - - /** - * Get the block of a file - * - * @param threadId the identifier of the thread - * @return the block for the thread - */ - long getBlockPosition(int threadId) { - return threadBlockPositions.get(threadId); - } - /** - * Save the position of the desired thread - * - * @param threadId the identifier of the thread - * @param position the relative position in bytes or zero - */ - void setThreadBytePosition(int threadId, long position) { - threadBytePositions.set(threadId, position); + return null; } /** - * Get position inside of the thread, where thread will be resumed + * Release an block * - * @param threadId the identifier of the thread - * @return the relative position in bytes or zero + * @param position the index of the block + * @param done amount of bytes downloaded */ - long getThreadBytePosition(int threadId) { - return threadBytePositions.get(threadId); + void releaseBlock(int position, int done) { + synchronized (LOCK) { + blockAcquired[position] = false; + blocks[position] = done; + } } /** @@ -341,12 +318,11 @@ synchronized void notifyError(Exception err) { public synchronized void notifyError(int code, Exception err) { Log.e(TAG, "notifyError() code = " + code, err); - if (err instanceof IOException) { if (!storage.canWrite() || err.getMessage().contains("Permission denied")) { code = ERROR_PERMISSION_DENIED; err = null; - } else if (err.getMessage().contains("ENOSPC")) { + } else if (err.getMessage().contains(INSUFFICIENT_STORAGE)) { code = ERROR_INSUFFICIENT_STORAGE; err = null; } @@ -368,9 +344,13 @@ public synchronized void notifyError(int code, Exception err) { if (code < 500 || code > 599) enqueued = false; } - pause(); - notify(DownloadManagerService.MESSAGE_ERROR); + + if (running) { + running = false; + recovered = true; + if (threads != null) selfPause(); + } } synchronized void notifyFinished() { @@ -378,11 +358,11 @@ synchronized void notifyFinished() { finishCount++; - if (finishCount == currentThreadCount) { + if (blocks.length < 1 || threads == null || finishCount == threads.length) { if (errCode != ERROR_NOTHING) return; if (DEBUG) { - Log.d(TAG, "onFinish" + (current + 1) + "/" + urls.length); + Log.d(TAG, "onFinish: " + (current + 1) + "/" + urls.length); } if ((current + 1) < urls.length) { @@ -421,7 +401,7 @@ private void notifyPostProcessing(int state) { Log.d(TAG, action + " postprocessing on " + storage.getName()); - synchronized (blockState) { + synchronized (LOCK) { // don't return without fully write the current state psState = state; Utility.writeToFile(metadata, DownloadMission.this); @@ -442,39 +422,40 @@ public void start() { running = true; errCode = ERROR_NOTHING; - if (current >= urls.length && psAlgorithm != null) { - runAsync(1, () -> { - if (doPostprocessing()) { - running = false; - deleteThisFromFile(); - - notify(DownloadManagerService.MESSAGE_FINISHED); - } - }); - + if (current >= urls.length) { + threads = null; + runAsync(1, this::notifyFinished); return; } - if (blocks < 0) { + if (blocks == null) { initializer(); return; } init = null; + finishCount = 0; + blockAcquired = new boolean[blocks.length]; - if (threads == null || threads.length < 1) { - threads = new Thread[currentThreadCount]; - } - - if (fallback) { + if (blocks.length < 1) { if (unknownLength) { done = 0; length = 0; } - threads[0] = runAsync(1, new DownloadRunnableFallback(this)); + threads = new Thread[]{runAsync(1, new DownloadRunnableFallback(this))}; } else { - for (int i = 0; i < currentThreadCount; i++) { + int remainingBlocks = 0; + for (int block : blocks) if (block >= 0) remainingBlocks++; + + if (remainingBlocks < 1) { + runAsync(1, this::notifyFinished); + return; + } + + threads = new Thread[Math.min(threadCount, remainingBlocks)]; + + for (int i = 0; i < threads.length; i++) { threads[i] = runAsync(i + 1, new DownloadRunnable(this, i)); } } @@ -483,7 +464,7 @@ public void start() { /** * Pause the mission */ - public synchronized void pause() { + public void pause() { if (!running) return; if (isPsRunning()) { @@ -496,38 +477,42 @@ public synchronized void pause() { running = false; recovered = true; - if (init != null && Thread.currentThread() != init && init.isAlive()) { + if (init != null && init.isAlive()) { + // NOTE: if start() method is running ¡will no have effect! init.interrupt(); - synchronized (blockState) { + synchronized (LOCK) { resetState(false, true, ERROR_NOTHING); } return; } - if (DEBUG && blocks == 0) { + if (DEBUG && unknownLength) { Log.w(TAG, "pausing a download that can not be resumed (range requests not allowed by the server)."); } - if (threads == null || Thread.currentThread().isInterrupted()) { + // check if the calling thread (alias UI thread) is interrupted + if (Thread.currentThread().isInterrupted()) { writeThisToFile(); return; } // wait for all threads are suspended before save the state - runAsync(-1, () -> { - try { - for (Thread thread : threads) { - if (thread.isAlive()) { - thread.interrupt(); - thread.join(5000); - } + if (threads != null) runAsync(-1, this::selfPause); + } + + private void selfPause() { + try { + for (Thread thread : threads) { + if (thread.isAlive()) { + thread.interrupt(); + thread.join(5000); } - } catch (Exception e) { - // nothing to do - } finally { - writeThisToFile(); } - }); + } catch (Exception e) { + // nothing to do + } finally { + writeThisToFile(); + } } /** @@ -553,16 +538,13 @@ public boolean delete() { */ public void resetState(boolean rollback, boolean persistChanges, int errorCode) { done = 0; - blocks = -1; errCode = errorCode; errObject = null; - fallback = false; unknownLength = false; - finishCount = 0; - threadBlockPositions.clear(); - threadBytePositions.clear(); - blockState.clear(); - threads = new Thread[0]; + threads = null; + fallbackResumeOffset = 0; + blocks = null; + blockAcquired = null; if (rollback) current = 0; @@ -572,7 +554,6 @@ public void resetState(boolean rollback, boolean persistChanges, int errorCode) private void initializer() { init = runAsync(DownloadInitializer.mId, new DownloadInitializer(this)); - } /** @@ -580,7 +561,7 @@ private void initializer() { * if no thread is already running. */ private void writeThisToFile() { - synchronized (blockState) { + synchronized (LOCK) { if (deleted) return; Utility.writeToFile(metadata, DownloadMission.this); } @@ -626,7 +607,7 @@ public boolean isPsRunning() { * @return true, otherwise, false */ public boolean isInitialized() { - return blocks >= 0; // DownloadMissionInitializer was executed + return blocks != null; // DownloadMissionInitializer was executed } /** @@ -727,7 +708,7 @@ private boolean doPostprocessing() { } private boolean deleteThisFromFile() { - synchronized (blockState) { + synchronized (LOCK) { return metadata.delete(); } } @@ -789,7 +770,7 @@ private void joinForThread(Thread thread) { static class HttpError extends Exception { - int statusCode; + final int statusCode; HttpError(int statusCode) { this.statusCode = statusCode; @@ -797,7 +778,16 @@ static class HttpError extends Exception { @Override public String getMessage() { - return "HTTP " + String.valueOf(statusCode); + return "HTTP " + statusCode; } } + + static class Block { + int position; + int done; + } + + private static class Lock implements Serializable { + // java.lang.Object cannot be used because is not serializable + } } diff --git a/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java b/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java index 4380c0c68d0..6d1d2bfbce8 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java @@ -9,8 +9,11 @@ import java.net.HttpURLConnection; import java.nio.channels.ClosedByInterruptException; +import us.shandian.giga.get.DownloadMission.Block; + import static org.schabi.newpipe.BuildConfig.DEBUG; + /** * Runnable to download blocks of a file until the file is completely downloaded, * an error occurs or the process is stopped. @@ -29,14 +32,19 @@ public class DownloadRunnable extends Thread { mId = id; } + private void releaseBlock(Block block, long remain) { + // set the block offset to -1 if it is completed + mMission.releaseBlock(block.position, remain < 0 ? -1 : block.done); + } + @Override public void run() { - boolean retry = mMission.recovered; - long blockPosition = mMission.getBlockPosition(mId); + boolean retry = false; + Block block = null; + int retryCount = 0; if (DEBUG) { - Log.d(TAG, mId + ":default pos " + blockPosition); Log.d(TAG, mId + ":recovered: " + mMission.recovered); } @@ -50,65 +58,57 @@ public void run() { return; } - while (mMission.running && mMission.errCode == DownloadMission.ERROR_NOTHING && blockPosition < mMission.blocks) { - - if (DEBUG && retry) { - Log.d(TAG, mId + ":retry is true. Resuming at " + blockPosition); - } - - // Wait for an unblocked position - while (!retry && blockPosition < mMission.blocks && mMission.isBlockPreserved(blockPosition)) { - - if (DEBUG) { - Log.d(TAG, mId + ":position " + blockPosition + " preserved, passing"); - } - - blockPosition++; + while (mMission.running && mMission.errCode == DownloadMission.ERROR_NOTHING) { + if (!retry) { + block = mMission.acquireBlock(); } - retry = false; - - if (blockPosition >= mMission.blocks) { + if (block == null) { + if (DEBUG) Log.d(TAG, mId + ":no more blocks left, exiting"); break; } if (DEBUG) { - Log.d(TAG, mId + ":preserving position " + blockPosition); + if (retry) + Log.d(TAG, mId + ":retry block at position=" + block.position + " from the start"); + else + Log.d(TAG, mId + ":acquired block at position=" + block.position + " done=" + block.done); } - mMission.preserveBlock(blockPosition); - mMission.setBlockPosition(mId, blockPosition); - - long start = blockPosition * DownloadMission.BLOCK_SIZE; + long start = block.position * DownloadMission.BLOCK_SIZE; long end = start + DownloadMission.BLOCK_SIZE - 1; - long offset = mMission.getThreadBytePosition(mId); - start += offset; + start += block.done; if (end >= mMission.length) { end = mMission.length - 1; } - long total = 0; - try { mConn = mMission.openConnection(mId, start, end); mMission.establishConnection(mId, mConn); // check if the download can be resumed - if (mConn.getResponseCode() == 416 && offset > 0) { - retryCount--; + if (mConn.getResponseCode() == 416) { + if (block.done > 0) { + // try again from the start (of the block) + block.done = 0; + retry = true; + mConn.disconnect(); + continue; + } + throw new DownloadMission.HttpError(416); } + retry = false; + // The server may be ignoring the range request if (mConn.getResponseCode() != 206) { - mMission.notifyError(new DownloadMission.HttpError(mConn.getResponseCode())); - if (DEBUG) { Log.e(TAG, mId + ":Unsupported " + mConn.getResponseCode()); } - + mMission.notifyError(new DownloadMission.HttpError(mConn.getResponseCode())); break; } @@ -122,26 +122,14 @@ public void run() { while (start < end && mMission.running && (len = is.read(buf, 0, buf.length)) != -1) { f.write(buf, 0, len); start += len; - total += len; + block.done += len; mMission.notifyProgress(len); } if (DEBUG && mMission.running) { - Log.d(TAG, mId + ":position " + blockPosition + " finished, " + total + " bytes downloaded"); + Log.d(TAG, mId + ":position " + block.position + " stopped " + start + "/" + end); } - - if (mMission.running) - mMission.setThreadBytePosition(mId, 0L);// clear byte position for next block - else - mMission.setThreadBytePosition(mId, total);// download paused, save progress for this block - } catch (Exception e) { - if (DEBUG) { - Log.d(TAG, mId + ": position=" + blockPosition + " total=" + total + " stopped due exception", e); - } - - mMission.setThreadBytePosition(mId, total); - if (!mMission.running || e instanceof ClosedByInterruptException) break; if (retryCount++ >= mMission.maxRetry) { @@ -150,6 +138,8 @@ public void run() { } retry = true; + } finally { + if (!retry) releaseBlock(block, end - start); } } diff --git a/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java b/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java index d7ff208ce8e..d93053881ad 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java @@ -41,17 +41,25 @@ private void dispose() { if (mF != null) mF.close(); } + private long loadPosition() { + synchronized (mMission.LOCK) { + return mMission.fallbackResumeOffset; + } + } + + private void savePosition(long position) { + synchronized (mMission.LOCK) { + mMission.fallbackResumeOffset = position; + } + } + @Override public void run() { boolean done; + long start = loadPosition(); - long start = 0; - - if (!mMission.unknownLength) { - start = mMission.getThreadBytePosition(0); - if (DEBUG && start > 0) { - Log.i(TAG, "Resuming a single-thread download at " + start); - } + if (DEBUG && !mMission.unknownLength && start > 0) { + Log.i(TAG, "Resuming a single-thread download at " + start); } try { @@ -91,8 +99,7 @@ public void run() { } catch (Exception e) { dispose(); - // save position - mMission.setThreadBytePosition(0, start); + savePosition(start); if (!mMission.running || e instanceof ClosedByInterruptException) return; @@ -114,7 +121,7 @@ public void run() { if (done) { mMission.notifyFinished(); } else { - mMission.setThreadBytePosition(0, start); + savePosition(start); } } diff --git a/app/src/main/java/us/shandian/giga/ui/adapter/MissionAdapter.java b/app/src/main/java/us/shandian/giga/ui/adapter/MissionAdapter.java index 21cae18352d..ebc6e94c2de 100644 --- a/app/src/main/java/us/shandian/giga/ui/adapter/MissionAdapter.java +++ b/app/src/main/java/us/shandian/giga/ui/adapter/MissionAdapter.java @@ -540,6 +540,8 @@ public void clearFinishedDownloads() { } private boolean handlePopupItem(@NonNull ViewHolderItem h, @NonNull MenuItem option) { + if (h.item == null) return true; + int id = option.getItemId(); DownloadMission mission = h.item.mission instanceof DownloadMission ? (DownloadMission) h.item.mission : null;