/** Type hint indicating the caller doesn't know the type. */
protected static final int OBJ_ANY = -1;
+ /**
+ * Construct a new reader from the same data.
+ * <p>
+ * Applications can use this method to build a new reader from the same data
+ * source, but for an different thread.
+ *
+ * @return a brand new reader, using the same data source.
+ */
+ public abstract ObjectReader newReader();
+
/**
* Does the requested object exist in this database?
*
--- /dev/null
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.lib;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Wrapper around the general {@link ProgressMonitor} to make it thread safe.
+ */
+public class ThreadSafeProgressMonitor implements ProgressMonitor {
+ private final ProgressMonitor pm;
+
+ private final ReentrantLock lock;
+
+ /**
+ * Wrap a ProgressMonitor to be thread safe.
+ *
+ * @param pm
+ * the underlying monitor to receive events.
+ */
+ public ThreadSafeProgressMonitor(ProgressMonitor pm) {
+ this.pm = pm;
+ this.lock = new ReentrantLock();
+ }
+
+ public void start(int totalTasks) {
+ lock.lock();
+ try {
+ pm.start(totalTasks);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void beginTask(String title, int totalWork) {
+ lock.lock();
+ try {
+ pm.beginTask(title, totalWork);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void update(int completed) {
+ lock.lock();
+ try {
+ pm.update(completed);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isCancelled() {
+ lock.lock();
+ try {
+ return pm.isCancelled();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void endTask() {
+ lock.lock();
+ try {
+ pm.endTask();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
this.db = db;
}
+ @Override
+ public ObjectReader newReader() {
+ return new WindowCursor(db);
+ }
+
public boolean has(AnyObjectId objectId) throws IOException {
return db.has(objectId);
}
// The caller may have had to allocate more space than is
// required. If we are about to waste anything, shrink it.
//
- if (data.length != actLen) {
- byte[] nbuf = new byte[actLen];
- System.arraycopy(data, 0, nbuf, 0, actLen);
- data = nbuf;
- }
+ data = resize(data, actLen);
// When we reserved space for this item we did it for the
// inflated size of the delta, but we were just given the
return new Ref(data, queue);
}
+ byte[] resize(byte[] data, int actLen) {
+ if (data.length != actLen) {
+ byte[] nbuf = new byte[actLen];
+ System.arraycopy(data, 0, nbuf, 0, actLen);
+ data = nbuf;
+ }
+ return data;
+ }
+
private void checkForGarbageCollectedObjects() {
Ref r;
while ((r = (Ref) queue.poll()) != null)
final long bigFileThreshold;
+ final int threads;
+
private PackConfig(Config rc) {
deltaWindow = rc.getInt("pack", "window", PackWriter.DEFAULT_DELTA_SEARCH_WINDOW_SIZE);
deltaWindowMemory = rc.getLong("pack", null, "windowmemory", 0);
compression = compression(rc);
indexVersion = rc.getInt("pack", "indexversion", 2);
bigFileThreshold = rc.getLong("core", null, "bigfilethreshold", PackWriter.DEFAULT_BIG_FILE_THRESHOLD);
+ threads = rc.getInt("pack", "threads", 0);
}
private static int compression(Config rc) {
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
import org.eclipse.jgit.revwalk.ObjectWalk;
import org.eclipse.jgit.revwalk.RevFlag;
import org.eclipse.jgit.revwalk.RevObject;
private long bigFileThreshold = DEFAULT_BIG_FILE_THRESHOLD;
+ private int threads = 1;
+
private boolean thin;
private boolean ignoreMissingUninteresting = true;
compressionLevel = pc.compression;
indexVersion = pc.indexVersion;
bigFileThreshold = pc.bigFileThreshold;
+ threads = pc.threads;
}
private static Config configOf(final Repository repo) {
/**
* Get the number of objects to try when looking for a delta base.
+ * <p>
+ * This limit is per thread, if 4 threads are used the actual memory
+ * used will be 4 times this value.
*
* @return the object count to be searched.
*/
/**
* Get the size of the in-memory delta cache.
+ * <p>
+ * This limit is for the entire writer, even if multiple threads are used.
*
* @return maximum number of bytes worth of delta data to cache in memory.
* If 0 the cache is infinite in size (up to the JVM heap limit
compressionLevel = level;
}
+ /** @return number of threads used for delta compression. */
+ public int getThreads() {
+ return threads;
+ }
+
+ /**
+ * Set the number of threads to use for delta compression.
+ * <p>
+ * During delta compression, if there are enough objects to be considered
+ * the writer will start up concurrent threads and allow them to compress
+ * different sections of the repository concurrently.
+ *
+ * @param threads
+ * number of threads to use. If <= 0 the number of available
+ * processors for this JVM is used.
+ */
+ public void setThread(int threads) {
+ this.threads = threads;
+ }
+
/** @return true if this writer is producing a thin pack. */
public boolean isThin() {
return thin;
return true;
}
- private void searchForDeltas(ProgressMonitor monitor,
- ObjectToPack[] list, int cnt) throws MissingObjectException,
- IncorrectObjectTypeException, LargeObjectException, IOException {
- DeltaCache dc = new DeltaCache(this);
- DeltaWindow dw = new DeltaWindow(this, dc, reader);
- dw.search(monitor, list, 0, cnt);
+ private void searchForDeltas(final ProgressMonitor monitor,
+ final ObjectToPack[] list, final int cnt)
+ throws MissingObjectException, IncorrectObjectTypeException,
+ LargeObjectException, IOException {
+ if (threads == 0)
+ threads = Runtime.getRuntime().availableProcessors();
+
+ if (threads <= 1 || cnt <= 2 * getDeltaSearchWindowSize()) {
+ DeltaCache dc = new DeltaCache(this);
+ DeltaWindow dw = new DeltaWindow(this, dc, reader);
+ dw.search(monitor, list, 0, cnt);
+ return;
+ }
+
+ final List<Throwable> errors = Collections
+ .synchronizedList(new ArrayList<Throwable>());
+ final DeltaCache dc = new ThreadSafeDeltaCache(this);
+ final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
+ final ExecutorService pool = Executors.newFixedThreadPool(threads);
+
+ // Guess at the size of batch we want. Because we don't really
+ // have a way for a thread to steal work from another thread if
+ // it ends early, we over partition slightly so the work units
+ // are a bit smaller.
+ //
+ int estSize = cnt / (threads * 2);
+ if (estSize < 2 * getDeltaSearchWindowSize())
+ estSize = 2 * getDeltaSearchWindowSize();
+
+ for (int i = 0; i < cnt;) {
+ final int start = i;
+ final int batchSize;
+
+ if (cnt - i < estSize) {
+ // If we don't have enough to fill the remaining block,
+ // schedule what is left over as a single block.
+ //
+ batchSize = cnt - i;
+ } else {
+ // Try to split the block at the end of a path.
+ //
+ int end = start + estSize;
+ while (end < cnt) {
+ ObjectToPack a = list[end - 1];
+ ObjectToPack b = list[end];
+ if (a.getPathHash() == b.getPathHash())
+ end++;
+ else
+ break;
+ }
+ batchSize = end - start;
+ }
+ i += batchSize;
+
+ pool.submit(new Runnable() {
+ public void run() {
+ try {
+ final ObjectReader or = reader.newReader();
+ try {
+ DeltaWindow dw;
+ dw = new DeltaWindow(PackWriter.this, dc, or);
+ dw.search(pm, list, start, batchSize);
+ } finally {
+ or.release();
+ }
+ } catch (Throwable err) {
+ errors.add(err);
+ }
+ }
+ });
+ }
+
+ // Tell the pool to stop.
+ //
+ pool.shutdown();
+ for (;;) {
+ try {
+ if (pool.awaitTermination(60, TimeUnit.SECONDS))
+ break;
+ } catch (InterruptedException e) {
+ throw new IOException(
+ JGitText.get().packingCancelledDuringObjectsWriting);
+ }
+ }
+
+ // If any thread threw an error, try to report it back as
+ // though we weren't using a threaded search algorithm.
+ //
+ if (!errors.isEmpty()) {
+ Throwable err = errors.get(0);
+ if (err instanceof Error)
+ throw (Error) err;
+ if (err instanceof RuntimeException)
+ throw (RuntimeException) err;
+ if (err instanceof IOException)
+ throw (IOException) err;
+
+ IOException fail = new IOException(err.getMessage());
+ fail.initCause(err);
+ throw fail;
+ }
}
private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out)
--- /dev/null
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.storage.pack;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+class ThreadSafeDeltaCache extends DeltaCache {
+ private final ReentrantLock lock;
+
+ ThreadSafeDeltaCache(PackWriter pw) {
+ super(pw);
+ lock = new ReentrantLock();
+ }
+
+ @Override
+ boolean canCache(int length, ObjectToPack src, ObjectToPack res) {
+ lock.lock();
+ try {
+ return super.canCache(length, src, res);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ void credit(int reservedSize) {
+ lock.lock();
+ try {
+ super.credit(reservedSize);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ Ref cache(byte[] data, int actLen, int reservedSize) {
+ data = resize(data, actLen);
+ lock.lock();
+ try {
+ return super.cache(data, actLen, reservedSize);
+ } finally {
+ lock.unlock();
+ }
+ }
+}