summaryrefslogtreecommitdiffstats
path: root/org.eclipse.jgit/src
diff options
context:
space:
mode:
Diffstat (limited to 'org.eclipse.jgit/src')
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java1
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java31
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java4
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java32
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java241
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java28
6 files changed, 263 insertions, 74 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java
index 29b319fe65..212cb7fdef 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java
@@ -175,6 +175,7 @@ public class JGitText extends TranslationBundle {
/***/ public String checkoutUnexpectedResult;
/***/ public String classCastNotA;
/***/ public String cloneNonEmptyDirectory;
+ /***/ public String closed;
/***/ public String collisionOn;
/***/ public String commandRejectedByHook;
/***/ public String commandWasCalledInTheWrongState;
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java
index 9a57349f56..543511609d 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCache.java
@@ -45,8 +45,6 @@ package org.eclipse.jgit.lib;
import java.io.File;
import java.io.IOException;
-import java.lang.ref.Reference;
-import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -202,8 +200,7 @@ public class RepositoryCache {
return false;
}
FileKey key = new FileKey(gitDir, repo.getFS());
- Reference<Repository> repoRef = cache.cacheMap.get(key);
- return repoRef != null && repoRef.get() == repo;
+ return cache.cacheMap.get(key) == repo;
}
/** Unregister all repositories from the cache. */
@@ -219,7 +216,7 @@ public class RepositoryCache {
cache.configureEviction(repositoryCacheConfig);
}
- private final ConcurrentHashMap<Key, Reference<Repository>> cacheMap;
+ private final ConcurrentHashMap<Key, Repository> cacheMap;
private final Lock[] openLocks;
@@ -228,7 +225,7 @@ public class RepositoryCache {
private volatile long expireAfter;
private RepositoryCache() {
- cacheMap = new ConcurrentHashMap<Key, Reference<Repository>>();
+ cacheMap = new ConcurrentHashMap<Key, Repository>();
openLocks = new Lock[4];
for (int i = 0; i < openLocks.length; i++) {
openLocks[i] = new Lock();
@@ -261,19 +258,15 @@ public class RepositoryCache {
}
}
- @SuppressWarnings("resource")
private Repository openRepository(final Key location,
final boolean mustExist) throws IOException {
- Reference<Repository> ref = cacheMap.get(location);
- Repository db = ref != null ? ref.get() : null;
+ Repository db = cacheMap.get(location);
if (db == null) {
synchronized (lockFor(location)) {
- ref = cacheMap.get(location);
- db = ref != null ? ref.get() : null;
+ db = cacheMap.get(location);
if (db == null) {
db = location.open(mustExist);
- ref = new SoftReference<Repository>(db);
- cacheMap.put(location, ref);
+ cacheMap.put(location, db);
} else {
db.incrementOpen();
}
@@ -285,16 +278,13 @@ public class RepositoryCache {
}
private void registerRepository(final Key location, final Repository db) {
- SoftReference<Repository> newRef = new SoftReference<Repository>(db);
- Reference<Repository> oldRef = cacheMap.put(location, newRef);
- Repository oldDb = oldRef != null ? oldRef.get() : null;
+ Repository oldDb = cacheMap.put(location, db);
if (oldDb != null)
oldDb.close();
}
private Repository unregisterRepository(final Key location) {
- Reference<Repository> oldRef = cacheMap.remove(location);
- return oldRef != null ? oldRef.get() : null;
+ return cacheMap.remove(location);
}
private boolean isExpired(Repository db) {
@@ -316,8 +306,7 @@ public class RepositoryCache {
}
private void clearAllExpired() {
- for (Reference<Repository> ref : cacheMap.values()) {
- Repository db = ref.get();
+ for (Repository db : cacheMap.values()) {
if (isExpired(db)) {
RepositoryCache.close(db);
}
@@ -325,7 +314,7 @@ public class RepositoryCache {
}
private void clearAll() {
- for (Iterator<Map.Entry<Key, Reference<Repository>>> i = cacheMap
+ for (Iterator<Map.Entry<Key, Repository>> i = cacheMap
.entrySet().iterator(); i.hasNext();) {
unregisterAndCloseRepository(i.next().getKey());
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java
index 428dea3e67..28cdaae443 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/RepositoryCacheConfig.java
@@ -53,8 +53,8 @@ public class RepositoryCacheConfig {
/**
* Set cleanupDelayMillis to this value in order to switch off time-based
- * cache eviction. The JVM can still expire cache entries when heap memory
- * runs low.
+ * cache eviction. Expired cache entries will only be evicted when
+ * RepositoryCache.clearExpired or RepositoryCache.clear are called.
*/
public static final long NO_CLEANUP = 0;
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java
index 1dfe5d9797..fa27bfce5f 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java
@@ -48,15 +48,14 @@
package org.eclipse.jgit.transport;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.internal.JGitText;
-import org.eclipse.jgit.util.io.StreamCopyThread;
+import org.eclipse.jgit.util.io.IsolatedOutputStream;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
@@ -178,33 +177,12 @@ public class JschSession implements RemoteSession {
// that we spawn a background thread to shuttle data through a pipe,
// as we can issue an interrupted write out of that. Its slower, so
// we only use this route if there is a timeout.
- final OutputStream out = channel.getOutputStream();
+ OutputStream out = channel.getOutputStream();
if (timeout <= 0) {
outputStream = out;
} else {
- final PipedInputStream pipeIn = new PipedInputStream();
- final StreamCopyThread copier = new StreamCopyThread(pipeIn,
- out);
- final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
- @Override
- public void flush() throws IOException {
- super.flush();
- copier.flush();
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- try {
- copier.join(timeout * 1000);
- } catch (InterruptedException e) {
- // Just wake early, the thread will terminate
- // anyway.
- }
- }
- };
- copier.start();
- outputStream = pipeOut;
+ IsolatedOutputStream i = new IsolatedOutputStream(out);
+ outputStream = new BufferedOutputStream(i, 16 * 1024);
}
errStream = channel.getErrStream();
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java
new file mode 100644
index 0000000000..cdc4a4d863
--- /dev/null
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright (C) 2016, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jgit.internal.JGitText;
+
+/**
+ * OutputStream isolated from interrupts.
+ * <p>
+ * Wraps an OutputStream to prevent interrupts during writes from being made
+ * visible to that stream instance. This works around buggy or difficult
+ * OutputStream implementations like JSch that cannot gracefully handle an
+ * interrupt during write.
+ * <p>
+ * Every write (or flush) requires a context switch to another thread. Callers
+ * should wrap this stream with {@code BufferedOutputStream} using a suitable
+ * buffer size to amortize the cost of context switches.
+ */
+public class IsolatedOutputStream extends OutputStream {
+ private final OutputStream dst;
+ private final ExecutorService copier;
+ private Future<Void> pending;
+
+ /**
+ * Wraps an OutputStream.
+ *
+ * @param out
+ * stream to send all writes to.
+ */
+ public IsolatedOutputStream(OutputStream out) {
+ dst = out;
+ copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory());
+ }
+
+ @Override
+ public void write(int ch) throws IOException {
+ write(new byte[] { (byte) ch }, 0, 1);
+ }
+
+ @Override
+ public void write(final byte[] buf, final int pos, final int cnt)
+ throws IOException {
+ checkClosed();
+ execute(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ dst.write(buf, pos, cnt);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkClosed();
+ execute(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ dst.flush();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!copier.isShutdown()) {
+ try {
+ if (pending == null || tryCleanClose()) {
+ cleanClose();
+ } else {
+ dirtyClose();
+ }
+ } finally {
+ copier.shutdown();
+ }
+ }
+ }
+
+ private boolean tryCleanClose() {
+ /*
+ * If the caller stopped waiting for a prior write or flush, they could
+ * be trying to close a stream that is still in-use. Check if the prior
+ * operation ended in a predictable way.
+ */
+ try {
+ pending.get(0, TimeUnit.MILLISECONDS);
+ pending = null;
+ return true;
+ } catch (TimeoutException | InterruptedException e) {
+ return false;
+ } catch (ExecutionException e) {
+ pending = null;
+ return true;
+ }
+ }
+
+ private void cleanClose() throws IOException {
+ execute(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ dst.close();
+ return null;
+ }
+ });
+ }
+
+ private void dirtyClose() throws IOException {
+ /*
+ * Interrupt any still pending write or flush operation. This may cause
+ * massive failures inside of the stream, but its going to be closed as
+ * the next step.
+ */
+ pending.cancel(true);
+
+ Future<Void> close;
+ try {
+ close = copier.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ dst.close();
+ return null;
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ throw new IOException(e);
+ }
+ try {
+ close.get(200, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ close.cancel(true);
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if (copier.isShutdown()) {
+ throw new IOException(JGitText.get().closed);
+ }
+ }
+
+ private void execute(Callable<Void> task) throws IOException {
+ if (pending != null) {
+ // Check (and rethrow) any prior failed operation.
+ checkedGet(pending);
+ }
+ try {
+ pending = copier.submit(task);
+ } catch (RejectedExecutionException e) {
+ throw new IOException(e);
+ }
+ checkedGet(pending);
+ pending = null;
+ }
+
+ private static void checkedGet(Future<Void> future) throws IOException {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ throw interrupted(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ }
+ }
+
+ private static InterruptedIOException interrupted(InterruptedException c) {
+ InterruptedIOException e = new InterruptedIOException();
+ e.initCause(c);
+ return e;
+ }
+
+ private static class NamedThreadFactory implements ThreadFactory {
+ private static final AtomicInteger cnt = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int n = cnt.incrementAndGet();
+ String name = IsolatedOutputStream.class.getSimpleName() + '-' + n;
+ return new Thread(r, name);
+ }
+ }
+}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
index ae760e3b8e..329a7a161f 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
@@ -59,8 +59,6 @@ public class StreamCopyThread extends Thread {
private volatile boolean done;
- private final AtomicInteger flushCount = new AtomicInteger(0);
-
/** Lock held by flush to avoid interrupting a write. */
private final Object writeLock;
@@ -88,8 +86,8 @@ public class StreamCopyThread extends Thread {
* happen at some future point in time, when the thread wakes up to process
* the request.
*/
+ @Deprecated
public void flush() {
- flushCount.incrementAndGet();
synchronized (writeLock) {
interrupt();
}
@@ -119,7 +117,6 @@ public class StreamCopyThread extends Thread {
public void run() {
try {
final byte[] buf = new byte[BUFFER_SIZE];
- int flushCountBeforeRead = 0;
boolean readInterrupted = false;
for (;;) {
try {
@@ -132,18 +129,11 @@ public class StreamCopyThread extends Thread {
}
}
readInterrupted = false;
- if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
- // There was a flush() call since last blocked read.
- // Set interrupt status, so next blocked read will throw
- // an InterruptedIOException and we will flush again.
- interrupt();
- }
}
if (done)
break;
- flushCountBeforeRead = flushCount.get();
final int n;
try {
n = src.read(buf);
@@ -156,19 +146,9 @@ public class StreamCopyThread extends Thread {
synchronized (writeLock) {
boolean writeInterrupted = Thread.interrupted();
- for (;;) {
- try {
- dst.write(buf, 0, n);
- } catch (InterruptedIOException wakey) {
- writeInterrupted = true;
- continue;
- }
-
- // set interrupt status, which will be checked
- // when we block in src.read
- if (writeInterrupted || flushCount.get() > 0)
- interrupt();
- break;
+ dst.write(buf, 0, n);
+ if (writeInterrupted) {
+ interrupt();
}
}
} catch (IOException e) {