package org.eclipse.jgit.internal.storage.dfs;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
cache = nc;
if (oc != null) {
- if (oc.readAheadService != null)
- oc.readAheadService.shutdown();
for (DfsPackFile pack : oc.getPackFiles())
pack.key.cachedSize.set(0);
}
/** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
private final int blockSizeShift;
- /** Number of bytes to read-ahead from current read position. */
- private final int readAheadLimit;
-
- /** Thread pool to handle optimistic read-ahead. */
- private final ThreadPoolExecutor readAheadService;
-
/** Cache of pack files, indexed by description. */
private final Map<DfsPackDescription, DfsPackFile> packCache;
clockHand = new Ref<Object>(new DfsPackKey(), -1, 0, null);
clockHand.next = clockHand;
- readAheadLimit = cfg.getReadAheadLimit();
- readAheadService = cfg.getReadAheadService();
-
packCache = new ConcurrentHashMap<DfsPackDescription, DfsPackFile>(
16, 0.75f, 1);
packFiles = Collections.unmodifiableCollection(packCache.values());
return val;
}
- boolean readAhead(ReadableChannel rc, DfsPackKey key, int size, long pos,
- long len, DfsReader ctx) {
- if (!ctx.wantReadAhead() || readAheadLimit <= 0 || readAheadService == null)
- return false;
-
- int cap = readAheadLimit / size;
- long readAheadEnd = pos + readAheadLimit;
- List<ReadAheadTask.BlockFuture> blocks = new ArrayList<ReadAheadTask.BlockFuture>(cap);
- while (pos < readAheadEnd && pos < len) {
- long end = Math.min(pos + size, len);
- if (!contains(key, pos))
- blocks.add(new ReadAheadTask.BlockFuture(key, pos, end));
- pos = end;
- }
- if (blocks.isEmpty())
- return false;
-
- ReadAheadTask task = new ReadAheadTask(this, rc, blocks);
- ReadAheadTask.TaskFuture t = new ReadAheadTask.TaskFuture(task);
- for (ReadAheadTask.BlockFuture b : blocks)
- b.setTask(t);
- readAheadService.execute(t);
- ctx.startedReadAhead(blocks);
- return true;
- }
-
private <T> T scan(HashEntry n, DfsPackKey pack, long position) {
Ref<T> r = scanRef(n, pack, position);
return r != null ? r.get() : null;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_DFS_SECTION;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_BLOCK_LIMIT;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_BLOCK_SIZE;
-import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_READ_AHEAD_LIMIT;
-import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_READ_AHEAD_THREADS;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jgit.lib.Config;
private int blockSize;
- private int readAheadLimit;
-
- private ThreadPoolExecutor readAheadService;
-
/** Create a default configuration. */
public DfsBlockCacheConfig() {
setBlockLimit(32 * MB);
return this;
}
- /** @return number of bytes to read ahead sequentially by. */
- public int getReadAheadLimit() {
- return readAheadLimit;
- }
-
- /**
- * @param newSize
- * new read-ahead limit, in bytes.
- * @return {@code this}
- */
- public DfsBlockCacheConfig setReadAheadLimit(final int newSize) {
- readAheadLimit = Math.max(0, newSize);
- return this;
- }
-
- /** @return service to perform read-ahead of sequential blocks. */
- public ThreadPoolExecutor getReadAheadService() {
- return readAheadService;
- }
-
- /**
- * @param svc
- * service to perform read-ahead of sequential blocks with. If
- * not null the {@link RejectedExecutionHandler} must be managed
- * by the JGit DFS library and not the application.
- * @return {@code this}.
- */
- public DfsBlockCacheConfig setReadAheadService(ThreadPoolExecutor svc) {
- if (svc != null)
- svc.setRejectedExecutionHandler(ReadAheadRejectedExecutionHandler.INSTANCE);
- readAheadService = svc;
- return this;
- }
-
/**
* Update properties by setting fields from the configuration.
* <p>
CONFIG_DFS_SECTION,
CONFIG_KEY_BLOCK_SIZE,
getBlockSize()));
-
- setReadAheadLimit(rc.getInt(
- CONFIG_CORE_SECTION,
- CONFIG_DFS_SECTION,
- CONFIG_KEY_READ_AHEAD_LIMIT,
- getReadAheadLimit()));
-
- int readAheadThreads = rc.getInt(
- CONFIG_CORE_SECTION,
- CONFIG_DFS_SECTION,
- CONFIG_KEY_READ_AHEAD_THREADS,
- 0);
-
- if (0 < getReadAheadLimit() && 0 < readAheadThreads) {
- setReadAheadService(new ThreadPoolExecutor(
- 1, // Minimum number of threads kept alive.
- readAheadThreads, // Maximum threads active.
- 60, TimeUnit.SECONDS, // Idle threads wait this long before ending.
- new ArrayBlockingQueue<Runnable>(1), // Do not queue deeply.
- new ThreadFactory() {
- private final String name = "JGit-DFS-ReadAhead"; //$NON-NLS-1$
- private final AtomicInteger cnt = new AtomicInteger();
- private final ThreadGroup group = new ThreadGroup(name);
-
- public Thread newThread(Runnable body) {
- int id = cnt.incrementAndGet();
- Thread thread = new Thread(group, body, name + "-" + id); //$NON-NLS-1$
- thread.setDaemon(true);
- thread.setContextClassLoader(getClass().getClassLoader());
- return thread;
- }
- }, ReadAheadRejectedExecutionHandler.INSTANCE));
- }
return this;
}
}
if (invalid)
throw new PackInvalidException(getPackName());
- boolean close = true;
ReadableChannel rc = ctx.db.openFile(packDesc, PACK);
try {
// If the block alignment is not yet known, discover it. Prefer the
}
DfsBlock v = new DfsBlock(key, pos, buf);
- if (v.end < len)
- close = !cache.readAhead(rc, key, size, v.end, len, ctx);
return v;
} finally {
- if (close)
- rc.close();
+ rc.close();
}
}
import static org.eclipse.jgit.lib.Constants.OBJ_TREE;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ProgressMonitor;
-import org.eclipse.jgit.revwalk.ObjectWalk;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.util.BlockList;
/**
private DfsPackFile last;
- private boolean wantReadAhead;
-
private boolean avoidUnreachable;
- private List<ReadAheadTask.BlockFuture> pendingReadAhead;
-
DfsReader(DfsObjDatabase db) {
this.db = db;
}
@Override
public <T extends ObjectId> AsyncObjectLoaderQueue<T> open(
Iterable<T> objectIds, final boolean reportMissing) {
- wantReadAhead = true;
-
Iterable<FoundObject<T>> order;
IOException error = null;
try {
} else if (findAllError != null) {
throw findAllError;
} else {
- cancelReadAhead();
return false;
}
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
- cancelReadAhead();
return true;
}
public void release() {
- cancelReadAhead();
+ // Nothing to clean up.
}
};
}
@Override
public <T extends ObjectId> AsyncObjectSizeQueue<T> getObjectSize(
Iterable<T> objectIds, final boolean reportMissing) {
- wantReadAhead = true;
-
Iterable<FoundObject<T>> order;
IOException error = null;
try {
} else if (findAllError != null) {
throw findAllError;
} else {
- cancelReadAhead();
return false;
}
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
- cancelReadAhead();
return true;
}
public void release() {
- cancelReadAhead();
+ // Nothing to clean up.
}
};
}
- @Override
- public void walkAdviceBeginCommits(RevWalk walk, Collection<RevCommit> roots) {
- wantReadAhead = true;
- }
-
- @Override
- public void walkAdviceBeginTrees(ObjectWalk ow, RevCommit min, RevCommit max) {
- wantReadAhead = true;
- }
-
- @Override
- public void walkAdviceEnd() {
- cancelReadAhead();
- }
-
@Override
public long getObjectSize(AnyObjectId objectId, int typeHint)
throws MissingObjectException, IncorrectObjectTypeException,
if (tmp.isEmpty())
continue;
Collections.sort(tmp, OFFSET_SORT);
- try {
- wantReadAhead = true;
- PackReverseIndex rev = pack.getReverseIdx(this);
- DfsObjectRepresentation rep = new DfsObjectRepresentation(
- pack,
- packIndex);
- for (DfsObjectToPack otp : tmp) {
- pack.representation(rep, otp.getOffset(), this, rev);
- otp.setOffset(0);
- packer.select(otp, rep);
- if (!otp.isFound()) {
- otp.setFound();
- monitor.update(1);
- }
+ PackReverseIndex rev = pack.getReverseIdx(this);
+ DfsObjectRepresentation rep = new DfsObjectRepresentation(
+ pack,
+ packIndex);
+ for (DfsObjectToPack otp : tmp) {
+ pack.representation(rep, otp.getOffset(), this, rev);
+ otp.setOffset(0);
+ packer.select(otp, rep);
+ if (!otp.isFound()) {
+ otp.setFound();
+ monitor.update(1);
}
- } finally {
- cancelReadAhead();
}
}
}
case OBJ_BLOB:
Collections.sort(list, WRITE_SORT);
}
-
- try {
- wantReadAhead = true;
- for (ObjectToPack otp : list)
- out.writeObject(otp);
- } finally {
- cancelReadAhead();
- }
+ for (ObjectToPack otp : list)
+ out.writeObject(otp);
}
public void copyPackAsIs(PackOutputStream out, CachedPack pack,
boolean validate) throws IOException {
- try {
- wantReadAhead = true;
- ((DfsCachedPack) pack).copyAsIs(out, validate, this);
- } finally {
- cancelReadAhead();
- }
+ ((DfsCachedPack) pack).copyAsIs(out, validate, this);
}
/**
// be cleaned up by the GC during the get for the next window.
// So we always clear it, even though we are just going to set
// it again.
- //
block = null;
-
- if (pendingReadAhead != null)
- waitForBlock(pack.key, position);
block = pack.getOrLoadBlock(position, this);
}
}
- boolean wantReadAhead() {
- return wantReadAhead;
- }
-
- void startedReadAhead(List<ReadAheadTask.BlockFuture> blocks) {
- if (pendingReadAhead == null)
- pendingReadAhead = new LinkedList<ReadAheadTask.BlockFuture>();
- pendingReadAhead.addAll(blocks);
- }
-
- private void cancelReadAhead() {
- if (pendingReadAhead != null) {
- for (ReadAheadTask.BlockFuture f : pendingReadAhead)
- f.cancel(true);
- pendingReadAhead = null;
- }
- wantReadAhead = false;
- }
-
- private void waitForBlock(DfsPackKey key, long position)
- throws InterruptedIOException {
- Iterator<ReadAheadTask.BlockFuture> itr = pendingReadAhead.iterator();
- while (itr.hasNext()) {
- ReadAheadTask.BlockFuture f = itr.next();
- if (f.contains(key, position)) {
- try {
- f.get();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- } catch (ExecutionException e) {
- // Exceptions should never be thrown by get(). Ignore
- // this and let the normal load paths identify any error.
- }
- itr.remove();
- if (pendingReadAhead.isEmpty())
- pendingReadAhead = null;
- break;
- }
- }
- }
-
/** Release the current window cursor. */
@Override
public void release() {
- cancelReadAhead();
last = null;
block = null;
baseCache = null;
+++ /dev/null
-/*
- * Copyright (C) 2011, Google Inc.
- * and other copyright owners as documented in the project's IP log.
- *
- * This program and the accompanying materials are made available
- * under the terms of the Eclipse Distribution License v1.0 which
- * accompanies this distribution, is reproduced below, and is
- * available at http://www.eclipse.org/org/documents/edl-v10.php
- *
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * - Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided
- * with the distribution.
- *
- * - Neither the name of the Eclipse Foundation, Inc. nor the
- * names of its contributors may be used to endorse or promote
- * products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
- * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
- * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
- * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
- * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.eclipse.jgit.internal.storage.dfs;
-
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-
-/** This handler aborts a {@link ReadAheadTask} when the queue is full. */
-final class ReadAheadRejectedExecutionHandler implements
- RejectedExecutionHandler {
- static final ReadAheadRejectedExecutionHandler INSTANCE = new ReadAheadRejectedExecutionHandler();
-
- private ReadAheadRejectedExecutionHandler() {
- // Singleton, do not create more instances.
- }
-
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- ((ReadAheadTask.TaskFuture) r).task.abort();
- }
-}
+++ /dev/null
-/*
- * Copyright (C) 2011, Google Inc.
- * and other copyright owners as documented in the project's IP log.
- *
- * This program and the accompanying materials are made available
- * under the terms of the Eclipse Distribution License v1.0 which
- * accompanies this distribution, is reproduced below, and is
- * available at http://www.eclipse.org/org/documents/edl-v10.php
- *
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * - Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided
- * with the distribution.
- *
- * - Neither the name of the Eclipse Foundation, Inc. nor the
- * names of its contributors may be used to endorse or promote
- * products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
- * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
- * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
- * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
- * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.eclipse.jgit.internal.storage.dfs;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.eclipse.jgit.util.IO;
-
-final class ReadAheadTask implements Callable<Void> {
- private final DfsBlockCache cache;
-
- private final ReadableChannel channel;
-
- private final List<BlockFuture> futures;
-
- private boolean running;
-
- ReadAheadTask(DfsBlockCache cache, ReadableChannel channel,
- List<BlockFuture> futures) {
- this.cache = cache;
- this.channel = channel;
- this.futures = futures;
- }
-
- public Void call() {
- int idx = 0;
- try {
- synchronized (this) {
- if (channel.isOpen())
- running = true;
- else
- return null;
- }
-
- long position = channel.position();
- for (; idx < futures.size() && !Thread.interrupted(); idx++) {
- BlockFuture f = futures.get(idx);
- if (cache.contains(f.pack, f.start)) {
- f.done();
- continue;
- }
-
- if (position != f.start)
- channel.position(f.start);
-
- int size = (int) (f.end - f.start);
- byte[] buf = new byte[size];
- if (IO.read(channel, buf, 0, size) != size)
- throw new EOFException();
-
- cache.put(new DfsBlock(f.pack, f.start, buf));
- f.done();
- position = f.end;
- }
- } catch (IOException err) {
- // Ignore read-ahead errors. These will be caught later on.
- } finally {
- for (; idx < futures.size(); idx++)
- futures.get(idx).abort();
- close();
- }
- return null;
- }
-
- void abort() {
- for (BlockFuture f : futures)
- f.abort();
-
- synchronized (this) {
- if (!running)
- close();
- }
- }
-
- private synchronized void close() {
- try {
- if (channel.isOpen())
- channel.close();
- } catch (IOException err) {
- // Ignore close errors on a read-only channel.
- }
- }
-
- static final class TaskFuture extends java.util.concurrent.FutureTask<Void> {
- final ReadAheadTask task;
-
- TaskFuture(ReadAheadTask task) {
- super(task);
- this.task = task;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- task.abort();
- return true;
- }
- return false;
- }
- }
-
- /** A scheduled read-ahead block load. */
- static final class BlockFuture implements Future<Void> {
- private static enum State {
- PENDING, DONE, CANCELLED;
- }
-
- private volatile State state;
-
- private volatile Future<?> task;
-
- private final CountDownLatch latch;
-
- final DfsPackKey pack;
-
- final long start;
-
- final long end;
-
- BlockFuture(DfsPackKey key, long start, long end) {
- this.state = State.PENDING;
- this.latch = new CountDownLatch(1);
- this.pack = key;
- this.start = start;
- this.end = end;
- }
-
- synchronized void setTask(Future<?> task) {
- if (state == State.PENDING)
- this.task = task;
- }
-
- boolean contains(DfsPackKey want, long pos) {
- return pack == want && start <= pos && pos < end;
- }
-
- synchronized void done() {
- if (state == State.PENDING) {
- latch.countDown();
- state = State.DONE;
- task = null;
- }
- }
-
- synchronized void abort() {
- if (state == State.PENDING) {
- latch.countDown();
- state = State.CANCELLED;
- task = null;
- }
- }
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- Future<?> t = task;
- if (t == null)
- return false;
-
- boolean r = t.cancel(mayInterruptIfRunning);
- abort();
- return r;
- }
-
- public Void get() throws InterruptedException, ExecutionException {
- latch.await();
- return null;
- }
-
- public Void get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
- if (latch.await(timeout, unit))
- return null;
- else
- throw new TimeoutException();
- }
-
- public boolean isCancelled() {
- State s = state;
- if (s == State.DONE)
- return false;
- if (s == State.CANCELLED)
- return true;
-
- Future<?> t = task;
- return t != null ? t.isCancelled() : true;
- }
-
- public boolean isDone() {
- return state == State.DONE;
- }
- }
-}
/** The "blockSize" key */
public static final String CONFIG_KEY_BLOCK_SIZE = "blockSize";
- /** The "readAheadLimit" key */
- public static final String CONFIG_KEY_READ_AHEAD_LIMIT = "readAheadLimit";
-
- /** The "readAheadThreads" key */
- public static final String CONFIG_KEY_READ_AHEAD_THREADS = "readAheadThreads";
-
/** The "deltaBaseCacheLimit" key */
public static final String CONFIG_KEY_DELTA_BASE_CACHE_LIMIT = "deltaBaseCacheLimit";