You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

BatchObjectLookup.java 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /*
  2. * Copyright (C) 2011, Google Inc.
  3. * and other copyright owners as documented in the project's IP log.
  4. *
  5. * This program and the accompanying materials are made available
  6. * under the terms of the Eclipse Distribution License v1.0 which
  7. * accompanies this distribution, is reproduced below, and is
  8. * available at http://www.eclipse.org/org/documents/edl-v10.php
  9. *
  10. * All rights reserved.
  11. *
  12. * Redistribution and use in source and binary forms, with or
  13. * without modification, are permitted provided that the following
  14. * conditions are met:
  15. *
  16. * - Redistributions of source code must retain the above copyright
  17. * notice, this list of conditions and the following disclaimer.
  18. *
  19. * - Redistributions in binary form must reproduce the above
  20. * copyright notice, this list of conditions and the following
  21. * disclaimer in the documentation and/or other materials provided
  22. * with the distribution.
  23. *
  24. * - Neither the name of the Eclipse Foundation, Inc. nor the
  25. * names of its contributors may be used to endorse or promote
  26. * products derived from this software without specific prior
  27. * written permission.
  28. *
  29. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
  30. * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
  31. * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  32. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  33. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  34. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  35. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  36. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  37. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  38. * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  39. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  40. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  41. * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  42. */
  43. package org.eclipse.jgit.storage.dht;
  44. import static java.util.concurrent.TimeUnit.MILLISECONDS;
  45. import java.io.IOException;
  46. import java.util.ArrayList;
  47. import java.util.Collection;
  48. import java.util.Collections;
  49. import java.util.HashMap;
  50. import java.util.Iterator;
  51. import java.util.List;
  52. import java.util.Map;
  53. import java.util.concurrent.Semaphore;
  54. import java.util.concurrent.atomic.AtomicReference;
  55. import java.util.concurrent.locks.ReentrantLock;
  56. import org.eclipse.jgit.lib.NullProgressMonitor;
  57. import org.eclipse.jgit.lib.ObjectId;
  58. import org.eclipse.jgit.lib.ProgressMonitor;
  59. import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
  60. import org.eclipse.jgit.storage.dht.spi.Context;
  61. import org.eclipse.jgit.storage.dht.spi.Database;
  62. abstract class BatchObjectLookup<T extends ObjectId> {
  63. private final RepositoryKey repo;
  64. private final Database db;
  65. private final DhtReader reader;
  66. private final ThreadSafeProgressMonitor progress;
  67. private final Semaphore batches;
  68. private final ReentrantLock resultLock;
  69. private final AtomicReference<DhtException> error;
  70. private final int concurrentBatches;
  71. private final List<T> retry;
  72. private final ArrayList<ObjectInfo> tmp;
  73. private boolean retryMissingObjects;
  74. private boolean cacheLoadedInfo;
  75. BatchObjectLookup(DhtReader reader) {
  76. this(reader, null);
  77. }
  78. BatchObjectLookup(DhtReader reader, ProgressMonitor monitor) {
  79. this.repo = reader.getRepositoryKey();
  80. this.db = reader.getDatabase();
  81. this.reader = reader;
  82. if (monitor != null && monitor != NullProgressMonitor.INSTANCE)
  83. this.progress = new ThreadSafeProgressMonitor(monitor);
  84. else
  85. this.progress = null;
  86. this.concurrentBatches = reader.getOptions()
  87. .getObjectIndexConcurrentBatches();
  88. this.batches = new Semaphore(concurrentBatches);
  89. this.resultLock = new ReentrantLock();
  90. this.error = new AtomicReference<DhtException>();
  91. this.retry = new ArrayList<T>();
  92. this.tmp = new ArrayList<ObjectInfo>(4);
  93. }
  94. void setRetryMissingObjects(boolean on) {
  95. retryMissingObjects = on;
  96. }
  97. void setCacheLoadedInfo(boolean on) {
  98. cacheLoadedInfo = on;
  99. }
  100. void select(Iterable<T> objects) throws IOException {
  101. selectInBatches(Context.FAST_MISSING_OK, lookInCache(objects));
  102. // Not all of the selection ran with fast options.
  103. if (retryMissingObjects && !retry.isEmpty()) {
  104. batches.release(concurrentBatches);
  105. selectInBatches(Context.READ_REPAIR, retry);
  106. }
  107. if (progress != null)
  108. progress.pollForUpdates();
  109. }
  110. private Iterable<T> lookInCache(Iterable<T> objects) {
  111. RecentInfoCache infoCache = reader.getRecentInfoCache();
  112. List<T> missing = null;
  113. for (T obj : objects) {
  114. List<ObjectInfo> info = infoCache.get(obj);
  115. if (info != null) {
  116. onResult(obj, info);
  117. if (progress != null)
  118. progress.update(1);
  119. } else {
  120. if (missing == null) {
  121. if (objects instanceof List<?>)
  122. missing = new ArrayList<T>(((List<?>) objects).size());
  123. else
  124. missing = new ArrayList<T>();
  125. }
  126. missing.add(obj);
  127. }
  128. }
  129. if (missing != null)
  130. return missing;
  131. return Collections.emptyList();
  132. }
  133. private void selectInBatches(Context options, Iterable<T> objects)
  134. throws DhtException {
  135. final int batchSize = reader.getOptions()
  136. .getObjectIndexBatchSize();
  137. Map<ObjectIndexKey, T> batch = new HashMap<ObjectIndexKey, T>();
  138. Iterator<T> otpItr = objects.iterator();
  139. while (otpItr.hasNext()) {
  140. T otp = otpItr.next();
  141. batch.put(ObjectIndexKey.create(repo, otp), otp);
  142. if (batch.size() < batchSize && otpItr.hasNext())
  143. continue;
  144. if (error.get() != null)
  145. break;
  146. try {
  147. if (progress != null) {
  148. while (!batches.tryAcquire(500, MILLISECONDS))
  149. progress.pollForUpdates();
  150. progress.pollForUpdates();
  151. } else {
  152. batches.acquire();
  153. }
  154. } catch (InterruptedException err) {
  155. error.compareAndSet(null, new DhtTimeoutException(err));
  156. break;
  157. }
  158. startQuery(options, batch);
  159. batch = new HashMap<ObjectIndexKey, T>();
  160. }
  161. try {
  162. if (progress != null) {
  163. while (!batches.tryAcquire(concurrentBatches, 500, MILLISECONDS))
  164. progress.pollForUpdates();
  165. progress.pollForUpdates();
  166. } else {
  167. batches.acquire(concurrentBatches);
  168. }
  169. } catch (InterruptedException err) {
  170. error.compareAndSet(null, new DhtTimeoutException(err));
  171. }
  172. if (error.get() != null)
  173. throw error.get();
  174. // Make sure retry changes are visible to us.
  175. resultLock.lock();
  176. resultLock.unlock();
  177. }
  178. private void startQuery(final Context context,
  179. final Map<ObjectIndexKey, T> batch) {
  180. final AsyncCallback<Map<ObjectIndexKey, Collection<ObjectInfo>>> cb;
  181. cb = new AsyncCallback<Map<ObjectIndexKey, Collection<ObjectInfo>>>() {
  182. public void onSuccess(Map<ObjectIndexKey, Collection<ObjectInfo>> r) {
  183. resultLock.lock();
  184. try {
  185. processResults(context, batch, r);
  186. } finally {
  187. resultLock.unlock();
  188. batches.release();
  189. }
  190. }
  191. public void onFailure(DhtException e) {
  192. error.compareAndSet(null, e);
  193. batches.release();
  194. }
  195. };
  196. db.objectIndex().get(context, batch.keySet(), cb);
  197. }
  198. private void processResults(Context context, Map<ObjectIndexKey, T> batch,
  199. Map<ObjectIndexKey, Collection<ObjectInfo>> objects) {
  200. for (T obj : batch.values()) {
  201. Collection<ObjectInfo> matches = objects.get(obj);
  202. if (matches == null || matches.isEmpty()) {
  203. if (retryMissingObjects && context == Context.FAST_MISSING_OK)
  204. retry.add(obj);
  205. continue;
  206. }
  207. tmp.clear();
  208. tmp.addAll(matches);
  209. ObjectInfo.sort(tmp);
  210. if (cacheLoadedInfo)
  211. reader.getRecentInfoCache().put(obj, tmp);
  212. onResult(obj, tmp);
  213. }
  214. if (progress != null)
  215. progress.update(objects.size());
  216. }
  217. protected abstract void onResult(T obj, List<ObjectInfo> info);
  218. }