
Prefetcher.java

/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.storage.dht;

import static org.eclipse.jgit.lib.Constants.OBJ_COMMIT;
import static org.eclipse.jgit.lib.Constants.OBJ_TREE;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.generated.storage.dht.proto.GitStore.ChunkMeta;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.storage.dht.DhtReader.ChunkAndOffset;
import org.eclipse.jgit.storage.dht.spi.Context;
import org.eclipse.jgit.storage.dht.spi.Database;

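/**
 * Prefetches pack chunks of a single object type (commits or trees) from the
 * {@link Database} ahead of the reader that will consume them, keeping no
 * more than the configured prefetch limit of bytes ready or loading at once.
 */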
class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
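	/**
	 * Lifecycle of a chunk key known to this prefetcher: ON_QUEUE (queued but
	 * not yet requested from the database), LOADING (request in flight),
	 * WAITING (request in flight and a caller is blocked in get() for it),
	 * READY (loaded and held in the ready map), DONE (already handed out,
	 * skipped, or left for the caller to load directly).
	 */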
	private static enum Status {
		ON_QUEUE, LOADING, WAITING, READY, DONE;
	}

	private final Database db;

	private final DhtReader.Statistics stats;

	private final int objectType;

	private final HashMap<ChunkKey, PackChunk> ready;

	private final HashMap<ChunkKey, Status> status;

	private final LinkedList<ChunkKey> queue;

	private final boolean followEdgeHints;

	private final int averageChunkSize;

	private final int highWaterMark;

	private final int lowWaterMark;

	private boolean first = true;

	private boolean automaticallyPushHints = true;

	private ChunkKey stopAt;

	private int bytesReady;

	private int bytesLoading;

	private DhtException error;

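	/**
	 * Create a prefetcher for one object type. The prefetch limit becomes the
	 * high water mark; the low water mark is derived from it (roughly four
	 * average-sized chunks lower, or about half the limit when the limit is
	 * very small) so that loading resumes before the buffered data drains.
	 */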
	Prefetcher(DhtReader reader, int objectType, int prefetchLimitInBytes) {
		this.db = reader.getDatabase();
		this.stats = reader.getStatistics();
		this.objectType = objectType;
		this.ready = new HashMap<ChunkKey, PackChunk>();
		this.status = new HashMap<ChunkKey, Status>();
		this.queue = new LinkedList<ChunkKey>();
		this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints();
		this.averageChunkSize = reader.getInserterOptions().getChunkSize();
		this.highWaterMark = prefetchLimitInBytes;

		int lwm = (highWaterMark / averageChunkSize) - 4;
		if (lwm <= 0)
			lwm = (highWaterMark / averageChunkSize) / 2;
		lowWaterMark = lwm * averageChunkSize;
	}

	boolean isType(int type) {
		return objectType == type;
	}

	void push(DhtReader ctx, Collection<RevCommit> roots) {
		// Approximate walk by using hints from the most recent commit.
		// Since the commits were recently parsed by the reader, we can
		// ask the reader for their chunk locations and most likely get
		// cache hits.
		int time = -1;
		PackChunk chunk = null;

		for (RevCommit cmit : roots) {
			if (time < cmit.getCommitTime()) {
				ChunkAndOffset p = ctx.getChunkGently(cmit);
				if (p != null && p.chunk.getMeta() != null) {
					time = cmit.getCommitTime();
					chunk = p.chunk;
				}
			}
		}

		if (chunk != null) {
			synchronized (this) {
				status.put(chunk.getChunkKey(), Status.DONE);
				push(chunk.getMeta());
			}
		}
	}

	void push(DhtReader ctx, RevTree start, RevTree end) throws DhtException,
			MissingObjectException {
		// Unlike commits, trees aren't likely to be loaded when they
		// are pushed into the prefetcher. Find the tree and load it
		// as necessary to get the prefetch meta established.
		//
		Sync<Map<ObjectIndexKey, Collection<ObjectInfo>>> sync = Sync.create();
		Set<ObjectIndexKey> toFind = new HashSet<ObjectIndexKey>();
		toFind.add(ObjectIndexKey.create(ctx.getRepositoryKey(), start));
		toFind.add(ObjectIndexKey.create(ctx.getRepositoryKey(), end));
		db.objectIndex().get(Context.READ_REPAIR, toFind, sync);

		Map<ObjectIndexKey, Collection<ObjectInfo>> trees;
		try {
			trees = sync.get(ctx.getOptions().getTimeout());
		} catch (InterruptedException e) {
			throw new DhtTimeoutException(e);
		} catch (TimeoutException e) {
			throw new DhtTimeoutException(e);
		}

		ChunkKey startKey = chunk(trees.get(start));
		if (startKey == null)
			throw DhtReader.missing(start, OBJ_TREE);

		ChunkKey endKey = chunk(trees.get(end));
		if (endKey == null)
			throw DhtReader.missing(end, OBJ_TREE);

		synchronized (this) {
			stopAt = endKey;
			push(startKey);
			maybeStartGet();
		}
	}

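	/**
	 * Select the chunk to read an object from, given all of its known
	 * locations: the locations are sorted by ObjectInfo.sort() and the first
	 * entry's chunk is returned, or null if there is no known location.
	 */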
	private static ChunkKey chunk(Collection<ObjectInfo> info) {
		if (info == null || info.isEmpty())
			return null;

		List<ObjectInfo> infoList = new ArrayList<ObjectInfo>(info);
		ObjectInfo.sort(infoList);
		return infoList.get(0).getChunkKey();
	}

	void push(ChunkKey key) {
		push(Collections.singleton(key));
	}

	void push(ChunkMeta meta) {
		if (meta == null)
			return;

		ChunkMeta.PrefetchHint hint;
		switch (objectType) {
		case OBJ_COMMIT:
			hint = meta.getCommitPrefetch();
			break;
		case OBJ_TREE:
			hint = meta.getTreePrefetch();
			break;
		default:
			return;
		}

		if (hint != null) {
			synchronized (this) {
				if (followEdgeHints && 0 < hint.getEdgeCount())
					push(hint.getEdgeList());
				else
					push(hint.getSequentialList());
			}
		}
	}

	private void push(List<String> list) {
		List<ChunkKey> keys = new ArrayList<ChunkKey>(list.size());
		for (String keyString : list)
			keys.add(ChunkKey.fromString(keyString));
		push(keys);
	}

	void push(Iterable<ChunkKey> list) {
		synchronized (this) {
			for (ChunkKey key : list) {
				if (status.containsKey(key))
					continue;

				status.put(key, Status.ON_QUEUE);
				queue.add(key);

				if (key.equals(stopAt)) {
					automaticallyPushHints = false;
					break;
				}
			}

			if (!first)
				maybeStartGet();
		}
	}

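	/**
	 * Search the chunks that are already loaded for an object. If one of them
	 * contains it, that chunk is consumed via useReadyChunk() and returned
	 * together with the object's offset; otherwise null is returned.
	 */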
	synchronized ChunkAndOffset find(RepositoryKey repo, AnyObjectId objId) {
		for (PackChunk c : ready.values()) {
			int p = c.findOffset(repo, objId);
			if (0 <= p)
				return new ChunkAndOffset(useReadyChunk(c.getChunkKey()), p);
		}
		return null;
	}

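	/**
	 * Obtain a chunk by key, blocking while it is still being loaded. Returns
	 * null when the key is unknown, the chunk was already handed out, or the
	 * caller should load the chunk itself (for example an out-of-order
	 * request that cannot be moved forward without exceeding the prefetch
	 * limit).
	 */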
	synchronized PackChunk get(ChunkKey key) throws DhtException {
		GET: for (;;) {
			if (error != null)
				throw error;

			Status chunkStatus = status.get(key);
			if (chunkStatus == null)
				return null;

			switch (chunkStatus) {
			case ON_QUEUE:
				if (queue.isEmpty()) {
					// Should never happen, but let the caller load.
					status.put(key, Status.DONE);
					return null;

				} else if (bytesReady + bytesLoading < highWaterMark) {
					// Make sure it's first in the queue, start, and wait.
					if (!queue.getFirst().equals(key)) {
						int idx = queue.indexOf(key);
						if (first && objectType == OBJ_COMMIT) {
							// If the prefetcher has not started yet, skip all
							// chunks up to this first request. Assume this
							// initial out-of-order get occurred because the
							// RevWalk has already parsed all of the commits
							// up to this point and does not need them again.
							//
							for (; 0 < idx; idx--)
								status.put(queue.removeFirst(), Status.DONE);
							forceStartGet();
							continue GET;
						}

						stats.access(key).cntPrefetcher_OutOfOrder++;
						queue.remove(idx);
						queue.addFirst(key);
					}

					forceStartGet();
					continue GET;

				} else {
					// It cannot be moved up to the front of the queue
					// without violating the prefetch size. Let the
					// caller load the chunk out of order.
					stats.access(key).cntPrefetcher_OutOfOrder++;
					status.put(key, Status.DONE);
					return null;
				}

			case LOADING: // Wait for a prefetch that is already started.
				status.put(key, Status.WAITING);
				//$FALL-THROUGH$

			case WAITING:
				stats.access(key).cntPrefetcher_WaitedForLoad++;
				try {
					wait();
				} catch (InterruptedException e) {
					throw new DhtTimeoutException(e);
				}
				continue GET;

			case READY:
				return useReadyChunk(key);

			case DONE:
				stats.access(key).cntPrefetcher_Revisited++;
				return null;

			default:
				throw new IllegalStateException(key + " " + chunkStatus);
			}
		}
	}

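	/**
	 * Hand a loaded chunk to the caller: remove it from the ready map, mark
	 * it DONE, and release its bytes from the prefetch budget. While hints
	 * are still being followed automatically, the chunk's own prefetch hints
	 * are pushed and loading may resume.
	 */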
	private PackChunk useReadyChunk(ChunkKey key) {
		PackChunk chunk = ready.remove(key);
		status.put(chunk.getChunkKey(), Status.DONE);
		bytesReady -= chunk.getTotalSize();

		if (automaticallyPushHints) {
			push(chunk.getMeta());
			maybeStartGet();
		}

		return chunk;
	}

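	// Resume loading only once the bytes that are ready or in flight have
	// dropped to the low water mark.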
	private void maybeStartGet() {
		if (!queue.isEmpty() && bytesReady + bytesLoading <= lowWaterMark)
			forceStartGet();
	}

	private void forceStartGet() {
		// Use a LinkedHashSet so insertion order is iteration order.
		// This may help a provider that loads sequentially in the
		// set's iterator order to load in the order we want data.
		//
		LinkedHashSet<ChunkKey> toLoad = new LinkedHashSet<ChunkKey>();

		while (bytesReady + bytesLoading < highWaterMark && !queue.isEmpty()) {
			ChunkKey key = queue.removeFirst();

			stats.access(key).cntPrefetcher_Load++;
			toLoad.add(key);
			status.put(key, Status.LOADING);
			bytesLoading += averageChunkSize;

			// For the first chunk, start immediately to reduce the
			// startup latency associated with additional chunks.
			if (first)
				break;
		}

		if (!toLoad.isEmpty() && error == null)
			db.chunk().get(Context.LOCAL, toLoad, this);

		if (first) {
			first = false;
			maybeStartGet();
		}
	}

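	/**
	 * Invoked by the asynchronous database get as each batch of requested
	 * chunks arrives: the estimated in-flight bytes are released and each
	 * member is built into a PackChunk and made ready.
	 */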
	public synchronized void onPartialResult(Collection<PackChunk.Members> res) {
		try {
			bytesLoading -= averageChunkSize * res.size();
			for (PackChunk.Members builder : res)
				chunkIsReady(builder.build());
		} catch (DhtException loadError) {
			onError(loadError);
		}
	}

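	// Cache a decoded chunk under its key, add its actual size to the ready
	// byte count, and wake any caller blocked in get() waiting for this key.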
	private void chunkIsReady(PackChunk chunk) {
		ChunkKey key = chunk.getChunkKey();
		ready.put(key, chunk);
		bytesReady += chunk.getTotalSize();

		if (status.put(key, Status.READY) == Status.WAITING)
			notifyAll();
	}

	public synchronized void onSuccess(Collection<PackChunk.Members> result) {
		if (result != null && !result.isEmpty())
			onPartialResult(result);
	}

	public synchronized void onFailure(DhtException asyncError) {
		onError(asyncError);
	}

	private void onError(DhtException asyncError) {
		if (error == null) {
			error = asyncError;
			notifyAll();
		}
	}
}