You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

KetchLeader.java 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. /*
  2. * Copyright (C) 2016, Google Inc. and others
  3. *
  4. * This program and the accompanying materials are made available under the
  5. * terms of the Eclipse Distribution License v. 1.0 which is available at
  6. * https://www.eclipse.org/org/documents/edl-v10.php.
  7. *
  8. * SPDX-License-Identifier: BSD-3-Clause
  9. */
  10. package org.eclipse.jgit.internal.ketch;
  11. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
  12. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
  13. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
  14. import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
  15. import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
  16. import java.io.IOException;
  17. import java.text.MessageFormat;
  18. import java.util.ArrayList;
  19. import java.util.Arrays;
  20. import java.util.Collection;
  21. import java.util.List;
  22. import java.util.concurrent.locks.Lock;
  23. import java.util.concurrent.locks.ReentrantLock;
  24. import org.eclipse.jgit.internal.storage.reftree.RefTree;
  25. import org.eclipse.jgit.lib.ObjectId;
  26. import org.eclipse.jgit.lib.Repository;
  27. import org.eclipse.jgit.revwalk.RevCommit;
  28. import org.eclipse.jgit.revwalk.RevWalk;
  29. import org.slf4j.Logger;
  30. import org.slf4j.LoggerFactory;
  31. /**
  32. * A leader managing consensus across remote followers.
  33. * <p>
  34. * A leader instance starts up in
  35. * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
  36. * to begin a new term by sending an
  37. * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
  38. * term starts if a majority of replicas have accepted this leader instance for
  39. * the term.
  40. * <p>
  41. * Once elected by a majority the instance enters
  42. * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
  43. * proposals offered to {@link #queueProposal(Proposal)}. This continues until
  44. * the leader is timed out for inactivity, or is deposed by a competing leader
  45. * gaining its own majority.
  46. * <p>
  47. * Once timed out or deposed this {@code KetchLeader} instance should be
  48. * discarded, and a new instance takes over.
  49. * <p>
  50. * Each leader instance coordinates a group of
  51. * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
  52. * owned by the leader instance and must be discarded when the leader is
  53. * discarded.
  54. * <p>
  55. * In Ketch all push requests are issued through the leader. The steps are as
  56. * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
  57. * example):
  58. * <ul>
  59. * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
  60. * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
  61. * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
  62. * <li>Wait for consensus with
  63. * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
  64. * <li>To examine the status of the push, check
  65. * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
  66. * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
  67. * </ul>
  68. * <p>
  69. * The leader gains consensus by first pushing the needed objects and a
  70. * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
  71. * desired target repository state to the {@code refs/txn/accepted} branch on
  72. * each of the replicas. Once a majority has succeeded, the leader commits the
  73. * state by either pushing the {@code refs/txn/accepted} value to
  74. * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
  75. * to {@code refs/heads/master}, etc. for stock Git replicas.
  76. * <p>
  77. * Internally, the actual transport to replicas is performed on background
  78. * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
  79. * executor service. For performance, the
  80. * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
  81. * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
  82. * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
  83. * and may invoke each other's methods on different threads. This access is
  84. * protected by the leader's {@link #lock} object. Care must be taken to prevent
  85. * concurrent access by correctly obtaining the leader's lock.
  86. */
  87. public abstract class KetchLeader {
  88. private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
  /** Lifecycle phases of a leader instance. */
  public enum State {
    /** Freshly created instance that is still trying to get itself elected. */
    CANDIDATE,

    /** Instance that a majority has elected as leader. */
    LEADER,

    /** Instance displaced by another one holding a more recent term. */
    DEPOSED,

    /** Instance that was shut down gracefully, for example after inactivity. */
    SHUTDOWN
  }
  100. private final KetchSystem system;
  101. /** Leader's knowledge of replicas for this repository. */
  102. private KetchReplica[] voters;
  103. private KetchReplica[] followers;
  104. private LocalReplica self;
  105. /**
  106. * Lock protecting all data within this leader instance.
  107. * <p>
  108. * This lock extends into the {@link KetchReplica} instances used by the
  109. * leader. They share the same lock instance to simplify concurrency.
  110. */
  111. final Lock lock;
  112. private State state = CANDIDATE;
  113. /** Term of this leader, once elected. */
  114. private long term;
  115. /**
  116. * Pending proposals accepted into the queue in FIFO order.
  117. * <p>
  118. * These proposals were preflighted and do not contain any conflicts with
  119. * each other and their expectations matched the leader's local view of the
  120. * agreed upon {@code refs/txn/accepted} tree.
  121. */
  122. private final List<Proposal> queued;
  123. /**
  124. * State of the repository's RefTree after applying all entries in
  125. * {@link #queued}. New proposals must be consistent with this tree to be
  126. * appended to the end of {@link #queued}.
  127. * <p>
  128. * Must be deep-copied with {@link RefTree#copy()} if
  129. * {@link #roundHoldsReferenceToRefTree} is {@code true}.
  130. */
  131. private RefTree refTree;
  132. /**
  133. * If {@code true} {@link #refTree} must be duplicated before queuing the
  134. * next proposal. The {@link #refTree} was passed into the constructor of a
  135. * {@link ProposalRound}, and that external reference to the {@link RefTree}
  136. * object is held by the proposal until it materializes the tree object in
  137. * the object store. This field is set {@code true} when the proposal begins
  138. * execution and set {@code false} once tree objects are persisted in the
  139. * local repository's object store or {@link #refTree} is replaced with a
  140. * copy to isolate it from any running rounds.
  141. * <p>
  142. * If proposals arrive less frequently than the {@code RefTree} is written
  143. * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
  144. * avoids duplicating {@link #refTree}, reducing both time and memory used.
  145. * However if proposals arrive more frequently {@link #refTree} must be
  146. * duplicated to prevent newly queued proposals from corrupting the
  147. * {@link #runningRound}.
  148. */
  149. volatile boolean roundHoldsReferenceToRefTree;
  150. /** End of the leader's log. */
  151. private LogIndex headIndex;
  152. /** Leader knows this (and all prior) states are committed. */
  153. private LogIndex committedIndex;
  154. /**
  155. * Is the leader idle with no work pending? If {@code true} there is no work
  156. * for the leader (normal state). This field is {@code false} when the
  157. * leader thread is scheduled for execution, or while {@link #runningRound}
  158. * defines a round in progress.
  159. */
  160. private boolean idle;
  161. /** Current round the leader is preparing and waiting for a vote on. */
  162. private Round runningRound;
  163. /**
  164. * Construct a leader for a Ketch instance.
  165. *
  166. * @param system
  167. * Ketch system configuration the leader must adhere to.
  168. */
  169. protected KetchLeader(KetchSystem system) {
  170. this.system = system;
  171. this.lock = new ReentrantLock(true /* fair */);
  172. this.queued = new ArrayList<>(4);
  173. this.idle = true;
  174. }
  175. /** @return system configuration. */
  176. KetchSystem getSystem() {
  177. return system;
  178. }
  179. /**
  180. * Configure the replicas used by this Ketch instance.
  181. * <p>
  182. * Replicas should be configured once at creation before any proposals are
  183. * executed. Once elections happen, <b>reconfiguration is a complicated
  184. * concept that is not currently supported</b>.
  185. *
  186. * @param replicas
  187. * members participating with the same repository.
  188. */
  189. public void setReplicas(Collection<KetchReplica> replicas) {
  190. List<KetchReplica> v = new ArrayList<>(5);
  191. List<KetchReplica> f = new ArrayList<>(5);
  192. for (KetchReplica r : replicas) {
  193. switch (r.getParticipation()) {
  194. case FULL:
  195. v.add(r);
  196. break;
  197. case FOLLOWER_ONLY:
  198. f.add(r);
  199. break;
  200. }
  201. }
  202. Collection<Integer> validVoters = validVoterCounts();
  203. if (!validVoters.contains(Integer.valueOf(v.size()))) {
  204. throw new IllegalArgumentException(MessageFormat.format(
  205. KetchText.get().unsupportedVoterCount,
  206. Integer.valueOf(v.size()),
  207. validVoters));
  208. }
  209. LocalReplica me = findLocal(v);
  210. if (me == null) {
  211. throw new IllegalArgumentException(
  212. KetchText.get().localReplicaRequired);
  213. }
  214. lock.lock();
  215. try {
  216. voters = v.toArray(new KetchReplica[0]);
  217. followers = f.toArray(new KetchReplica[0]);
  218. self = me;
  219. } finally {
  220. lock.unlock();
  221. }
  222. }
  223. private static Collection<Integer> validVoterCounts() {
  224. @SuppressWarnings("boxing")
  225. Integer[] valid = {
  226. // An odd number of voting replicas is required.
  227. 1, 3, 5, 7, 9 };
  228. return Arrays.asList(valid);
  229. }
  230. private static LocalReplica findLocal(Collection<KetchReplica> voters) {
  231. for (KetchReplica r : voters) {
  232. if (r instanceof LocalReplica) {
  233. return (LocalReplica) r;
  234. }
  235. }
  236. return null;
  237. }
  238. /**
  239. * Get an instance of the repository for use by a leader thread.
  240. * <p>
  241. * The caller will close the repository.
  242. *
  243. * @return opened repository for use by the leader thread.
  244. * @throws java.io.IOException
  245. * cannot reopen the repository for the leader.
  246. */
  247. protected abstract Repository openRepository() throws IOException;
  248. /**
  249. * Queue a reference update proposal for consensus.
  250. * <p>
  251. * This method does not wait for consensus to be reached. The proposal is
  252. * checked to look for risks of conflicts, and then submitted into the queue
  253. * for distribution as soon as possible.
  254. * <p>
  255. * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
  256. * to see if the proposal is done.
  257. *
  258. * @param proposal
  259. * the proposed reference updates to queue for consideration.
  260. * Once execution is complete the individual reference result
  261. * fields will be populated with the outcome.
  262. * @throws java.lang.InterruptedException
  263. * current thread was interrupted. The proposal may have been
  264. * aborted if it was not yet queued for execution.
  265. * @throws java.io.IOException
  266. * unrecoverable error preventing proposals from being attempted
  267. * by this leader.
  268. */
  269. public void queueProposal(Proposal proposal)
  270. throws InterruptedException, IOException {
  271. try {
  272. lock.lockInterruptibly();
  273. } catch (InterruptedException e) {
  274. proposal.abort();
  275. throw e;
  276. }
  277. try {
  278. if (refTree == null) {
  279. initialize();
  280. for (Proposal p : queued) {
  281. refTree.apply(p.getCommands());
  282. }
  283. } else if (roundHoldsReferenceToRefTree) {
  284. refTree = refTree.copy();
  285. roundHoldsReferenceToRefTree = false;
  286. }
  287. if (!refTree.apply(proposal.getCommands())) {
  288. // A conflict exists so abort the proposal.
  289. proposal.abort();
  290. return;
  291. }
  292. queued.add(proposal);
  293. proposal.notifyState(QUEUED);
  294. if (idle) {
  295. scheduleLeader();
  296. }
  297. } finally {
  298. lock.unlock();
  299. }
  300. }
  301. private void initialize() throws IOException {
  302. try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
  303. self.initialize(git);
  304. ObjectId accepted = self.getTxnAccepted();
  305. if (!ObjectId.zeroId().equals(accepted)) {
  306. RevCommit c = rw.parseCommit(accepted);
  307. headIndex = LogIndex.unknown(accepted);
  308. refTree = RefTree.read(rw.getObjectReader(), c.getTree());
  309. } else {
  310. headIndex = LogIndex.unknown(ObjectId.zeroId());
  311. refTree = RefTree.newEmptyTree();
  312. }
  313. }
  314. }
  315. private void scheduleLeader() {
  316. idle = false;
  317. system.getExecutor().execute(this::runLeader);
  318. }
  319. private void runLeader() {
  320. Round round;
  321. lock.lock();
  322. try {
  323. switch (state) {
  324. case CANDIDATE:
  325. round = new ElectionRound(this, headIndex);
  326. break;
  327. case LEADER:
  328. round = newProposalRound();
  329. break;
  330. case DEPOSED:
  331. case SHUTDOWN:
  332. default:
  333. log.warn("Leader cannot run {}", state); //$NON-NLS-1$
  334. // TODO(sop): Redirect proposals.
  335. return;
  336. }
  337. } finally {
  338. lock.unlock();
  339. }
  340. try {
  341. round.start();
  342. } catch (IOException e) {
  343. // TODO(sop) Depose leader if it cannot use its repository.
  344. log.error(KetchText.get().leaderFailedToStore, e);
  345. lock.lock();
  346. try {
  347. nextRound();
  348. } finally {
  349. lock.unlock();
  350. }
  351. }
  352. }
  353. private ProposalRound newProposalRound() {
  354. List<Proposal> todo = new ArrayList<>(queued);
  355. queued.clear();
  356. roundHoldsReferenceToRefTree = true;
  357. return new ProposalRound(this, headIndex, todo, refTree);
  358. }
  359. /** @return term of this leader's reign. */
  360. long getTerm() {
  361. return term;
  362. }
  363. /** @return end of the leader's log. */
  364. LogIndex getHead() {
  365. return headIndex;
  366. }
  367. /**
  368. * @return state leader knows it has committed across a quorum of replicas.
  369. */
  370. LogIndex getCommitted() {
  371. return committedIndex;
  372. }
  373. boolean isIdle() {
  374. return idle;
  375. }
  376. void runAsync(Round round) {
  377. lock.lock();
  378. try {
  379. // End of the log is this round. Once transport begins it is
  380. // reasonable to assume at least one replica will eventually get
  381. // this, and there is reasonable probability it commits.
  382. headIndex = round.acceptedNewIndex;
  383. runningRound = round;
  384. for (KetchReplica replica : voters) {
  385. replica.pushTxnAcceptedAsync(round);
  386. }
  387. for (KetchReplica replica : followers) {
  388. replica.pushTxnAcceptedAsync(round);
  389. }
  390. } finally {
  391. lock.unlock();
  392. }
  393. }
  394. /**
  395. * Asynchronous signal from a replica after completion.
  396. * <p>
  397. * Must be called while {@link #lock} is held by the replica.
  398. *
  399. * @param replica
  400. * replica posting a completion event.
  401. */
  402. void onReplicaUpdate(KetchReplica replica) {
  403. if (log.isDebugEnabled()) {
  404. log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
  405. replica.describeForLog(), snapshot());
  406. }
  407. if (replica.getParticipation() == FOLLOWER_ONLY) {
  408. // Followers cannot vote, so votes haven't changed.
  409. return;
  410. } else if (runningRound == null) {
  411. // No round running, no need to tally votes.
  412. return;
  413. }
  414. assert headIndex.equals(runningRound.acceptedNewIndex);
  415. int matching = 0;
  416. for (KetchReplica r : voters) {
  417. if (r.hasAccepted(headIndex)) {
  418. matching++;
  419. }
  420. }
  421. int quorum = voters.length / 2 + 1;
  422. boolean success = matching >= quorum;
  423. if (!success) {
  424. return;
  425. }
  426. switch (state) {
  427. case CANDIDATE:
  428. term = ((ElectionRound) runningRound).getTerm();
  429. state = LEADER;
  430. if (log.isDebugEnabled()) {
  431. log.debug("Won election, running term " + term); //$NON-NLS-1$
  432. }
  433. //$FALL-THROUGH$
  434. case LEADER:
  435. committedIndex = headIndex;
  436. if (log.isDebugEnabled()) {
  437. log.debug("Committed {} in term {}", //$NON-NLS-1$
  438. committedIndex.describeForLog(),
  439. Long.valueOf(term));
  440. }
  441. nextRound();
  442. commitAsync(replica);
  443. notifySuccess(runningRound);
  444. if (log.isDebugEnabled()) {
  445. log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
  446. }
  447. break;
  448. default:
  449. log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
  450. break;
  451. }
  452. }
  453. private void notifySuccess(Round round) {
  454. // Drop the leader lock while notifying Proposal listeners.
  455. lock.unlock();
  456. try {
  457. round.success();
  458. } finally {
  459. lock.lock();
  460. }
  461. }
  462. private void commitAsync(KetchReplica caller) {
  463. for (KetchReplica r : voters) {
  464. if (r == caller) {
  465. continue;
  466. }
  467. if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  468. r.pushCommitAsync(committedIndex);
  469. }
  470. }
  471. for (KetchReplica r : followers) {
  472. if (r == caller) {
  473. continue;
  474. }
  475. if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  476. r.pushCommitAsync(committedIndex);
  477. }
  478. }
  479. }
  480. /** Schedule the next round; invoked while {@link #lock} is held. */
  481. void nextRound() {
  482. runningRound = null;
  483. if (queued.isEmpty()) {
  484. idle = true;
  485. } else {
  486. // Caller holds lock. Reschedule leader on a new thread so
  487. // the call stack can unwind and lock is not held unexpectedly
  488. // during prepare for the next round.
  489. scheduleLeader();
  490. }
  491. }
  492. /**
  493. * Snapshot this leader
  494. *
  495. * @return snapshot of this leader
  496. */
  497. public LeaderSnapshot snapshot() {
  498. lock.lock();
  499. try {
  500. LeaderSnapshot s = new LeaderSnapshot();
  501. s.state = state;
  502. s.term = term;
  503. s.headIndex = headIndex;
  504. s.committedIndex = committedIndex;
  505. s.idle = isIdle();
  506. for (KetchReplica r : voters) {
  507. s.replicas.add(r.snapshot());
  508. }
  509. for (KetchReplica r : followers) {
  510. s.replicas.add(r.snapshot());
  511. }
  512. return s;
  513. } finally {
  514. lock.unlock();
  515. }
  516. }
  517. /**
  518. * Gracefully shutdown this leader and cancel outstanding operations.
  519. */
  520. public void shutdown() {
  521. lock.lock();
  522. try {
  523. if (state != SHUTDOWN) {
  524. state = SHUTDOWN;
  525. for (KetchReplica r : voters) {
  526. r.shutdown();
  527. }
  528. for (KetchReplica r : followers) {
  529. r.shutdown();
  530. }
  531. }
  532. } finally {
  533. lock.unlock();
  534. }
  535. }
  536. /** {@inheritDoc} */
  537. @Override
  538. public String toString() {
  539. return snapshot().toString();
  540. }
  541. }