You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

KetchSystem.java 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. /*
  2. * Copyright (C) 2016, Google Inc.
  3. * and other copyright owners as documented in the project's IP log.
  4. *
  5. * This program and the accompanying materials are made available
  6. * under the terms of the Eclipse Distribution License v1.0 which
  7. * accompanies this distribution, is reproduced below, and is
  8. * available at http://www.eclipse.org/org/documents/edl-v10.php
  9. *
  10. * All rights reserved.
  11. *
  12. * Redistribution and use in source and binary forms, with or
  13. * without modification, are permitted provided that the following
  14. * conditions are met:
  15. *
  16. * - Redistributions of source code must retain the above copyright
  17. * notice, this list of conditions and the following disclaimer.
  18. *
  19. * - Redistributions in binary form must reproduce the above
  20. * copyright notice, this list of conditions and the following
  21. * disclaimer in the documentation and/or other materials provided
  22. * with the distribution.
  23. *
  24. * - Neither the name of the Eclipse Foundation, Inc. nor the
  25. * names of its contributors may be used to endorse or promote
  26. * products derived from this software without specific prior
  27. * written permission.
  28. *
  29. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
  30. * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
  31. * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  32. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  33. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  34. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  35. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  36. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  37. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  38. * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  39. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  40. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  41. * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  42. */
  43. package org.eclipse.jgit.internal.ketch;
  44. import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
  45. import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
  46. import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
  47. import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
  48. import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
  49. import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
  50. import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
  51. import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
  52. import java.net.URISyntaxException;
  53. import java.time.Duration;
  54. import java.util.ArrayList;
  55. import java.util.List;
  56. import java.util.Random;
  57. import java.util.concurrent.Executors;
  58. import java.util.concurrent.ScheduledExecutorService;
  59. import java.util.concurrent.ThreadFactory;
  60. import java.util.concurrent.atomic.AtomicInteger;
  61. import org.eclipse.jgit.annotations.Nullable;
  62. import org.eclipse.jgit.lib.Config;
  63. import org.eclipse.jgit.lib.PersonIdent;
  64. import org.eclipse.jgit.lib.Repository;
  65. import org.eclipse.jgit.transport.RemoteConfig;
  66. import org.eclipse.jgit.transport.URIish;
  67. import org.eclipse.jgit.util.time.MonotonicClock;
  68. import org.eclipse.jgit.util.time.MonotonicSystemClock;
  69. import org.eclipse.jgit.util.time.ProposedTimestamp;
  70. import org.slf4j.Logger;
  71. import org.slf4j.LoggerFactory;
  72. /**
  73. * Ketch system-wide configuration.
  74. * <p>
  75. * This class provides useful defaults for testing and small proof of concepts.
  76. * Full scale installations are expected to subclass and override methods to
  77. * provide consistent configuration across all managed repositories.
  78. * <p>
  79. * Servers should configure their own {@link ScheduledExecutorService}.
  80. */
  81. public class KetchSystem {
  82. private static final Random RNG = new Random();
  83. /** @return default executor, one thread per available processor. */
  84. public static ScheduledExecutorService defaultExecutor() {
  85. return DefaultExecutorHolder.I;
  86. }
  87. private final ScheduledExecutorService executor;
  88. private final MonotonicClock clock;
  89. private final String txnNamespace;
  90. private final String txnAccepted;
  91. private final String txnCommitted;
  92. private final String txnStage;
  93. /** Create a default system with a thread pool of 1 thread per CPU. */
  94. public KetchSystem() {
  95. this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
  96. }
  97. /**
  98. * Create a Ketch system with the provided executor service.
  99. *
  100. * @param executor
  101. * thread pool to run background operations.
  102. * @param clock
  103. * clock to create timestamps.
  104. * @param txnNamespace
  105. * reference namespace for the RefTree graph and associated
  106. * transaction state. Must begin with {@code "refs/"} and end
  107. * with {@code '/'}, for example {@code "refs/txn/"}.
  108. */
  109. public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
  110. String txnNamespace) {
  111. this.executor = executor;
  112. this.clock = clock;
  113. this.txnNamespace = txnNamespace;
  114. this.txnAccepted = txnNamespace + ACCEPTED;
  115. this.txnCommitted = txnNamespace + COMMITTED;
  116. this.txnStage = txnNamespace + STAGE;
  117. }
  118. /** @return executor to perform background operations. */
  119. public ScheduledExecutorService getExecutor() {
  120. return executor;
  121. }
  122. /** @return clock to obtain timestamps from. */
  123. public MonotonicClock getClock() {
  124. return clock;
  125. }
  126. /**
  127. * @return how long the leader will wait for the {@link #getClock()}'s
  128. * {@code ProposedTimestamp} used in commits proposed to the RefTree
  129. * graph ({@link #getTxnAccepted()}). Defaults to 5 seconds.
  130. */
  131. public Duration getMaxWaitForMonotonicClock() {
  132. return Duration.ofSeconds(5);
  133. }
  134. /**
  135. * @return true if elections should require monotonically increasing commit
  136. * timestamps. This requires a very good {@link MonotonicClock}.
  137. */
  138. public boolean requireMonotonicLeaderElections() {
  139. return false;
  140. }
  141. /**
  142. * Get the namespace used for the RefTree graph and transaction management.
  143. *
  144. * @return reference namespace such as {@code "refs/txn/"}.
  145. */
  146. public String getTxnNamespace() {
  147. return txnNamespace;
  148. }
  149. /** @return name of the accepted RefTree graph. */
  150. public String getTxnAccepted() {
  151. return txnAccepted;
  152. }
  153. /** @return name of the committed RefTree graph. */
  154. public String getTxnCommitted() {
  155. return txnCommitted;
  156. }
  157. /** @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}. */
  158. public String getTxnStage() {
  159. return txnStage;
  160. }
  161. /**
  162. * @param time
  163. * timestamp for the committer.
  164. * @return identity line for the committer header of a RefTreeGraph.
  165. */
  166. public PersonIdent newCommitter(ProposedTimestamp time) {
  167. String name = "ketch"; //$NON-NLS-1$
  168. String email = "ketch@system"; //$NON-NLS-1$
  169. return new PersonIdent(name, email, time);
  170. }
  171. /**
  172. * Construct a random tag to identify a candidate during leader election.
  173. * <p>
  174. * Multiple processes trying to elect themselves leaders at exactly the same
  175. * time (rounded to seconds) using the same
  176. * {@link #newCommitter(ProposedTimestamp)} identity strings, for the same
  177. * term, may generate the same ObjectId for the election commit and falsely
  178. * assume they have both won.
  179. * <p>
  180. * Candidates add this tag to their election ballot commit to disambiguate
  181. * the election. The tag only needs to be unique for a given triplet of
  182. * {@link #newCommitter(ProposedTimestamp)}, system time (rounded to
  183. * seconds), and term. If every replica in the system uses a unique
  184. * {@code newCommitter} (such as including the host name after the
  185. * {@code "@"} in the email address) the tag could be the empty string.
  186. * <p>
  187. * The default implementation generates a few bytes of random data.
  188. *
  189. * @return unique tag; null or empty string if {@code newCommitter()} is
  190. * sufficiently unique to identify the leader.
  191. */
  192. @Nullable
  193. public String newLeaderTag() {
  194. int n = RNG.nextInt(1 << (6 * 4));
  195. return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
  196. }
  197. /**
  198. * Construct the KetchLeader instance of a repository.
  199. *
  200. * @param repo
  201. * local repository stored by the leader.
  202. * @return leader instance.
  203. * @throws URISyntaxException
  204. * a follower configuration contains an unsupported URI.
  205. */
  206. public KetchLeader createLeader(final Repository repo)
  207. throws URISyntaxException {
  208. KetchLeader leader = new KetchLeader(this) {
  209. @Override
  210. protected Repository openRepository() {
  211. repo.incrementOpen();
  212. return repo;
  213. }
  214. };
  215. leader.setReplicas(createReplicas(leader, repo));
  216. return leader;
  217. }
  218. /**
  219. * Get the collection of replicas for a repository.
  220. * <p>
  221. * The collection of replicas must include the local repository.
  222. *
  223. * @param leader
  224. * the leader driving these replicas.
  225. * @param repo
  226. * repository to get the replicas of.
  227. * @return collection of replicas for the specified repository.
  228. * @throws URISyntaxException
  229. * a configured URI is invalid.
  230. */
  231. protected List<KetchReplica> createReplicas(KetchLeader leader,
  232. Repository repo) throws URISyntaxException {
  233. List<KetchReplica> replicas = new ArrayList<>();
  234. Config cfg = repo.getConfig();
  235. String localName = getLocalName(cfg);
  236. for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
  237. if (!hasParticipation(cfg, name)) {
  238. continue;
  239. }
  240. ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
  241. if (name.equals(localName)) {
  242. replicas.add(new LocalReplica(leader, name, kc));
  243. continue;
  244. }
  245. RemoteConfig rc = new RemoteConfig(cfg, name);
  246. List<URIish> uris = rc.getPushURIs();
  247. if (uris.isEmpty()) {
  248. uris = rc.getURIs();
  249. }
  250. for (URIish uri : uris) {
  251. String n = uris.size() == 1 ? name : uri.getHost();
  252. replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
  253. }
  254. }
  255. return replicas;
  256. }
  257. private static boolean hasParticipation(Config cfg, String name) {
  258. return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
  259. }
  260. private static String getLocalName(Config cfg) {
  261. return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
  262. }
  263. static class DefaultExecutorHolder {
  264. private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
  265. static final ScheduledExecutorService I = create();
  266. private static ScheduledExecutorService create() {
  267. int cores = Runtime.getRuntime().availableProcessors();
  268. int threads = Math.max(5, cores);
  269. log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
  270. return Executors.newScheduledThreadPool(
  271. threads,
  272. new ThreadFactory() {
  273. private final AtomicInteger threadCnt = new AtomicInteger();
  274. @Override
  275. public Thread newThread(Runnable r) {
  276. int id = threadCnt.incrementAndGet();
  277. Thread thr = new Thread(r);
  278. thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
  279. return thr;
  280. }
  281. });
  282. }
  283. private DefaultExecutorHolder() {
  284. }
  285. }
  286. /**
  287. * Compute a delay in a {@code min..max} interval with random jitter.
  288. *
  289. * @param last
  290. * amount of delay waited before the last attempt. This is used
  291. * to seed the next delay interval. Should be 0 if there was no
  292. * prior delay.
  293. * @param min
  294. * shortest amount of allowable delay between attempts.
  295. * @param max
  296. * longest amount of allowable delay between attempts.
  297. * @return new amount of delay to wait before the next attempt.
  298. */
  299. static long delay(long last, long min, long max) {
  300. long r = Math.max(0, last * 3 - min);
  301. if (r > 0) {
  302. int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
  303. r = RNG.nextInt(c);
  304. }
  305. return Math.max(Math.min(min + r, max), min);
  306. }
  307. }