diff options
Diffstat (limited to 'org.eclipse.jgit/src')
6 files changed, 263 insertions, 74 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java index 29b319fe65..212cb7fdef 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java @@ -175,6 +175,7 @@ public class JGitText extends TranslationBundle { /***/ public String checkoutUnexpectedResult; /***/ public String classCastNotA; /***/ public String cloneNonEmptyDirectory; + /***/ public String closed; /***/ public String collisionOn; /***/ public String commandRejectedByHook; /***/ public String commandWasCalledInTheWrongState; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java index 9a57349f56..543511609d 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java @@ -45,8 +45,6 @@ package org.eclipse.jgit.lib; import java.io.File; import java.io.IOException; -import java.lang.ref.Reference; -import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -202,8 +200,7 @@ public class RepositoryCache { return false; } FileKey key = new FileKey(gitDir, repo.getFS()); - Reference<Repository> repoRef = cache.cacheMap.get(key); - return repoRef != null && repoRef.get() == repo; + return cache.cacheMap.get(key) == repo; } /** Unregister all repositories from the cache. */ @@ -219,7 +216,7 @@ public class RepositoryCache { cache.configureEviction(repositoryCacheConfig); } - private final ConcurrentHashMap<Key, Reference<Repository>> cacheMap; + private final ConcurrentHashMap<Key, Repository> cacheMap; private final Lock[] openLocks; @@ -228,7 +225,7 @@ public class RepositoryCache { private volatile long expireAfter; private RepositoryCache() { - cacheMap = new ConcurrentHashMap<Key, Reference<Repository>>(); + cacheMap = new ConcurrentHashMap<Key, Repository>(); openLocks = new Lock[4]; for (int i = 0; i < openLocks.length; i++) { openLocks[i] = new Lock(); @@ -261,19 +258,15 @@ public class RepositoryCache { } } - @SuppressWarnings("resource") private Repository openRepository(final Key location, final boolean mustExist) throws IOException { - Reference<Repository> ref = cacheMap.get(location); - Repository db = ref != null ? ref.get() : null; + Repository db = cacheMap.get(location); if (db == null) { synchronized (lockFor(location)) { - ref = cacheMap.get(location); - db = ref != null ? ref.get() : null; + db = cacheMap.get(location); if (db == null) { db = location.open(mustExist); - ref = new SoftReference<Repository>(db); - cacheMap.put(location, ref); + cacheMap.put(location, db); } else { db.incrementOpen(); } @@ -285,16 +278,13 @@ public class RepositoryCache { } private void registerRepository(final Key location, final Repository db) { - SoftReference<Repository> newRef = new SoftReference<Repository>(db); - Reference<Repository> oldRef = cacheMap.put(location, newRef); - Repository oldDb = oldRef != null ? oldRef.get() : null; + Repository oldDb = cacheMap.put(location, db); if (oldDb != null) oldDb.close(); } private Repository unregisterRepository(final Key location) { - Reference<Repository> oldRef = cacheMap.remove(location); - return oldRef != null ? oldRef.get() : null; + return cacheMap.remove(location); } private boolean isExpired(Repository db) { @@ -316,8 +306,7 @@ public class RepositoryCache { } private void clearAllExpired() { - for (Reference<Repository> ref : cacheMap.values()) { - Repository db = ref.get(); + for (Repository db : cacheMap.values()) { if (isExpired(db)) { RepositoryCache.close(db); } @@ -325,7 +314,7 @@ public class RepositoryCache { } private void clearAll() { - for (Iterator<Map.Entry<Key, Reference<Repository>>> i = cacheMap + for (Iterator<Map.Entry<Key, Repository>> i = cacheMap .entrySet().iterator(); i.hasNext();) { unregisterAndCloseRepository(i.next().getKey()); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java index 428dea3e67..28cdaae443 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java @@ -53,8 +53,8 @@ public class RepositoryCacheConfig { /** * Set cleanupDelayMillis to this value in order to switch off time-based - * cache eviction. The JVM can still expire cache entries when heap memory - * runs low. + * cache eviction. Expired cache entries will only be evicted when + * RepositoryCache.clearExpired or RepositoryCache.clear are called. */ public static final long NO_CLEANUP = 0; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java index 1dfe5d9797..fa27bfce5f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java @@ -48,15 +48,14 @@ package org.eclipse.jgit.transport; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.internal.JGitText; -import org.eclipse.jgit.util.io.StreamCopyThread; +import org.eclipse.jgit.util.io.IsolatedOutputStream; import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; @@ -178,33 +177,12 @@ public class JschSession implements RemoteSession { // that we spawn a background thread to shuttle data through a pipe, // as we can issue an interrupted write out of that. Its slower, so // we only use this route if there is a timeout. - final OutputStream out = channel.getOutputStream(); + OutputStream out = channel.getOutputStream(); if (timeout <= 0) { outputStream = out; } else { - final PipedInputStream pipeIn = new PipedInputStream(); - final StreamCopyThread copier = new StreamCopyThread(pipeIn, - out); - final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) { - @Override - public void flush() throws IOException { - super.flush(); - copier.flush(); - } - - @Override - public void close() throws IOException { - super.close(); - try { - copier.join(timeout * 1000); - } catch (InterruptedException e) { - // Just wake early, the thread will terminate - // anyway. - } - } - }; - copier.start(); - outputStream = pipeOut; + IsolatedOutputStream i = new IsolatedOutputStream(out); + outputStream = new BufferedOutputStream(i, 16 * 1024); } errStream = channel.getErrStream(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java new file mode 100644 index 0000000000..cdc4a4d863 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2016, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.util.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jgit.internal.JGitText; + +/** + * OutputStream isolated from interrupts. + * <p> + * Wraps an OutputStream to prevent interrupts during writes from being made + * visible to that stream instance. This works around buggy or difficult + * OutputStream implementations like JSch that cannot gracefully handle an + * interrupt during write. + * <p> + * Every write (or flush) requires a context switch to another thread. Callers + * should wrap this stream with {@code BufferedOutputStream} using a suitable + * buffer size to amortize the cost of context switches. + */ +public class IsolatedOutputStream extends OutputStream { + private final OutputStream dst; + private final ExecutorService copier; + private Future<Void> pending; + + /** + * Wraps an OutputStream. + * + * @param out + * stream to send all writes to. + */ + public IsolatedOutputStream(OutputStream out) { + dst = out; + copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory()); + } + + @Override + public void write(int ch) throws IOException { + write(new byte[] { (byte) ch }, 0, 1); + } + + @Override + public void write(final byte[] buf, final int pos, final int cnt) + throws IOException { + checkClosed(); + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.write(buf, pos, cnt); + return null; + } + }); + } + + @Override + public void flush() throws IOException { + checkClosed(); + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.flush(); + return null; + } + }); + } + + @Override + public void close() throws IOException { + if (!copier.isShutdown()) { + try { + if (pending == null || tryCleanClose()) { + cleanClose(); + } else { + dirtyClose(); + } + } finally { + copier.shutdown(); + } + } + } + + private boolean tryCleanClose() { + /* + * If the caller stopped waiting for a prior write or flush, they could + * be trying to close a stream that is still in-use. Check if the prior + * operation ended in a predictable way. + */ + try { + pending.get(0, TimeUnit.MILLISECONDS); + pending = null; + return true; + } catch (TimeoutException | InterruptedException e) { + return false; + } catch (ExecutionException e) { + pending = null; + return true; + } + } + + private void cleanClose() throws IOException { + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.close(); + return null; + } + }); + } + + private void dirtyClose() throws IOException { + /* + * Interrupt any still pending write or flush operation. This may cause + * massive failures inside of the stream, but its going to be closed as + * the next step. + */ + pending.cancel(true); + + Future<Void> close; + try { + close = copier.submit(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.close(); + return null; + } + }); + } catch (RejectedExecutionException e) { + throw new IOException(e); + } + try { + close.get(200, TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException e) { + close.cancel(true); + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + private void checkClosed() throws IOException { + if (copier.isShutdown()) { + throw new IOException(JGitText.get().closed); + } + } + + private void execute(Callable<Void> task) throws IOException { + if (pending != null) { + // Check (and rethrow) any prior failed operation. + checkedGet(pending); + } + try { + pending = copier.submit(task); + } catch (RejectedExecutionException e) { + throw new IOException(e); + } + checkedGet(pending); + pending = null; + } + + private static void checkedGet(Future<Void> future) throws IOException { + try { + future.get(); + } catch (InterruptedException e) { + throw interrupted(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + private static InterruptedIOException interrupted(InterruptedException c) { + InterruptedIOException e = new InterruptedIOException(); + e.initCause(c); + return e; + } + + private static class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger cnt = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + int n = cnt.incrementAndGet(); + String name = IsolatedOutputStream.class.getSimpleName() + '-' + n; + return new Thread(r, name); + } + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java index ae760e3b8e..329a7a161f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java @@ -59,8 +59,6 @@ public class StreamCopyThread extends Thread { private volatile boolean done; - private final AtomicInteger flushCount = new AtomicInteger(0); - /** Lock held by flush to avoid interrupting a write. */ private final Object writeLock; @@ -88,8 +86,8 @@ public class StreamCopyThread extends Thread { * happen at some future point in time, when the thread wakes up to process * the request. */ + @Deprecated public void flush() { - flushCount.incrementAndGet(); synchronized (writeLock) { interrupt(); } @@ -119,7 +117,6 @@ public class StreamCopyThread extends Thread { public void run() { try { final byte[] buf = new byte[BUFFER_SIZE]; - int flushCountBeforeRead = 0; boolean readInterrupted = false; for (;;) { try { @@ -132,18 +129,11 @@ public class StreamCopyThread extends Thread { } } readInterrupted = false; - if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) { - // There was a flush() call since last blocked read. - // Set interrupt status, so next blocked read will throw - // an InterruptedIOException and we will flush again. - interrupt(); - } } if (done) break; - flushCountBeforeRead = flushCount.get(); final int n; try { n = src.read(buf); @@ -156,19 +146,9 @@ public class StreamCopyThread extends Thread { synchronized (writeLock) { boolean writeInterrupted = Thread.interrupted(); - for (;;) { - try { - dst.write(buf, 0, n); - } catch (InterruptedIOException wakey) { - writeInterrupted = true; - continue; - } - - // set interrupt status, which will be checked - // when we block in src.read - if (writeInterrupted || flushCount.get() > 0) - interrupt(); - break; + dst.write(buf, 0, n); + if (writeInterrupted) { + interrupt(); } } } catch (IOException e) { |