You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

KetchLeader.java 19KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. /*
  2. * Copyright (C) 2016, Google Inc.
  3. * and other copyright owners as documented in the project's IP log.
  4. *
  5. * This program and the accompanying materials are made available
  6. * under the terms of the Eclipse Distribution License v1.0 which
  7. * accompanies this distribution, is reproduced below, and is
  8. * available at http://www.eclipse.org/org/documents/edl-v10.php
  9. *
  10. * All rights reserved.
  11. *
  12. * Redistribution and use in source and binary forms, with or
  13. * without modification, are permitted provided that the following
  14. * conditions are met:
  15. *
  16. * - Redistributions of source code must retain the above copyright
  17. * notice, this list of conditions and the following disclaimer.
  18. *
  19. * - Redistributions in binary form must reproduce the above
  20. * copyright notice, this list of conditions and the following
  21. * disclaimer in the documentation and/or other materials provided
  22. * with the distribution.
  23. *
  24. * - Neither the name of the Eclipse Foundation, Inc. nor the
  25. * names of its contributors may be used to endorse or promote
  26. * products derived from this software without specific prior
  27. * written permission.
  28. *
  29. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
  30. * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
  31. * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  32. * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  33. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  34. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  35. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  36. * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  37. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  38. * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  39. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  40. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  41. * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  42. */
  43. package org.eclipse.jgit.internal.ketch;
  44. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
  45. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
  46. import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
  47. import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
  48. import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
  49. import java.io.IOException;
  50. import java.text.MessageFormat;
  51. import java.util.ArrayList;
  52. import java.util.Arrays;
  53. import java.util.Collection;
  54. import java.util.List;
  55. import java.util.concurrent.locks.Lock;
  56. import java.util.concurrent.locks.ReentrantLock;
  57. import org.eclipse.jgit.internal.storage.reftree.RefTree;
  58. import org.eclipse.jgit.lib.ObjectId;
  59. import org.eclipse.jgit.lib.Repository;
  60. import org.eclipse.jgit.revwalk.RevCommit;
  61. import org.eclipse.jgit.revwalk.RevWalk;
  62. import org.slf4j.Logger;
  63. import org.slf4j.LoggerFactory;
  64. /**
  65. * A leader managing consensus across remote followers.
  66. * <p>
  67. * A leader instance starts up in
  68. * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
  69. * to begin a new term by sending an
  70. * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
  71. * term starts if a majority of replicas have accepted this leader instance for
  72. * the term.
  73. * <p>
  74. * Once elected by a majority the instance enters
  75. * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
  76. * proposals offered to {@link #queueProposal(Proposal)}. This continues until
  77. * the leader is timed out for inactivity, or is deposed by a competing leader
  78. * gaining its own majority.
  79. * <p>
  80. * Once timed out or deposed this {@code KetchLeader} instance should be
  81. * discarded, and a new instance takes over.
  82. * <p>
  83. * Each leader instance coordinates a group of
  84. * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
  85. * owned by the leader instance and must be discarded when the leader is
  86. * discarded.
  87. * <p>
  88. * In Ketch all push requests are issued through the leader. The steps are as
  89. * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
  90. * example):
  91. * <ul>
  92. * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
  93. * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
  94. * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
  95. * <li>Wait for consensus with
  96. * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
  97. * <li>To examine the status of the push, check
  98. * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
  99. * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
  100. * </ul>
  101. * <p>
  102. * The leader gains consensus by first pushing the needed objects and a
  103. * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
  104. * desired target repository state to the {@code refs/txn/accepted} branch on
  105. * each of the replicas. Once a majority has succeeded, the leader commits the
  106. * state by either pushing the {@code refs/txn/accepted} value to
  107. * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
  108. * to {@code refs/heads/master}, etc. for stock Git replicas.
  109. * <p>
  110. * Internally, the actual transport to replicas is performed on background
  111. * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
  112. * executor service. For performance, the
  113. * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
  114. * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
  115. * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
  116. * and may invoke each other's methods on different threads. This access is
  117. * protected by the leader's {@link #lock} object. Care must be taken to prevent
  118. * concurrent access by correctly obtaining the leader's lock.
  119. */
  120. public abstract class KetchLeader {
  121. private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
  /** Lifecycle states a leader instance can be in. */
  public enum State {
      /** Freshly created instance that is still trying to get itself elected. */
      CANDIDATE,

      /** Instance that a majority has elected as leader. */
      LEADER,

      /** Instance displaced by another one holding a more recent term. */
      DEPOSED,

      /** Instance that was shut down gracefully, e.g. because of inactivity. */
      SHUTDOWN
  }
  133. private final KetchSystem system;
  134. /** Leader's knowledge of replicas for this repository. */
  135. private KetchReplica[] voters;
  136. private KetchReplica[] followers;
  137. private LocalReplica self;
  138. /**
  139. * Lock protecting all data within this leader instance.
  140. * <p>
  141. * This lock extends into the {@link KetchReplica} instances used by the
  142. * leader. They share the same lock instance to simplify concurrency.
  143. */
  144. final Lock lock;
  145. private State state = CANDIDATE;
  146. /** Term of this leader, once elected. */
  147. private long term;
  148. /**
  149. * Pending proposals accepted into the queue in FIFO order.
  150. * <p>
  151. * These proposals were preflighted and do not contain any conflicts with
  152. * each other and their expectations matched the leader's local view of the
  153. * agreed upon {@code refs/txn/accepted} tree.
  154. */
  155. private final List<Proposal> queued;
  156. /**
  157. * State of the repository's RefTree after applying all entries in
  158. * {@link #queued}. New proposals must be consistent with this tree to be
  159. * appended to the end of {@link #queued}.
  160. * <p>
  161. * Must be deep-copied with {@link RefTree#copy()} if
  162. * {@link #roundHoldsReferenceToRefTree} is {@code true}.
  163. */
  164. private RefTree refTree;
  165. /**
  166. * If {@code true} {@link #refTree} must be duplicated before queuing the
  167. * next proposal. The {@link #refTree} was passed into the constructor of a
  168. * {@link ProposalRound}, and that external reference to the {@link RefTree}
  169. * object is held by the proposal until it materializes the tree object in
  170. * the object store. This field is set {@code true} when the proposal begins
  171. * execution and set {@code false} once tree objects are persisted in the
  172. * local repository's object store or {@link #refTree} is replaced with a
  173. * copy to isolate it from any running rounds.
  174. * <p>
  175. * If proposals arrive less frequently than the {@code RefTree} is written
  176. * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
  177. * avoids duplicating {@link #refTree}, reducing both time and memory used.
  178. * However if proposals arrive more frequently {@link #refTree} must be
  179. * duplicated to prevent newly queued proposals from corrupting the
  180. * {@link #runningRound}.
  181. */
  182. volatile boolean roundHoldsReferenceToRefTree;
  183. /** End of the leader's log. */
  184. private LogIndex headIndex;
  185. /** Leader knows this (and all prior) states are committed. */
  186. private LogIndex committedIndex;
  187. /**
  188. * Is the leader idle with no work pending? If {@code true} there is no work
  189. * for the leader (normal state). This field is {@code false} when the
  190. * leader thread is scheduled for execution, or while {@link #runningRound}
  191. * defines a round in progress.
  192. */
  193. private boolean idle;
  194. /** Current round the leader is preparing and waiting for a vote on. */
  195. private Round runningRound;
  196. /**
  197. * Construct a leader for a Ketch instance.
  198. *
  199. * @param system
  200. * Ketch system configuration the leader must adhere to.
  201. */
  202. protected KetchLeader(KetchSystem system) {
  203. this.system = system;
  204. this.lock = new ReentrantLock(true /* fair */);
  205. this.queued = new ArrayList<>(4);
  206. this.idle = true;
  207. }
  208. /** @return system configuration. */
  209. KetchSystem getSystem() {
  210. return system;
  211. }
  212. /**
  213. * Configure the replicas used by this Ketch instance.
  214. * <p>
  215. * Replicas should be configured once at creation before any proposals are
  216. * executed. Once elections happen, <b>reconfiguration is a complicated
  217. * concept that is not currently supported</b>.
  218. *
  219. * @param replicas
  220. * members participating with the same repository.
  221. */
  222. public void setReplicas(Collection<KetchReplica> replicas) {
  223. List<KetchReplica> v = new ArrayList<>(5);
  224. List<KetchReplica> f = new ArrayList<>(5);
  225. for (KetchReplica r : replicas) {
  226. switch (r.getParticipation()) {
  227. case FULL:
  228. v.add(r);
  229. break;
  230. case FOLLOWER_ONLY:
  231. f.add(r);
  232. break;
  233. }
  234. }
  235. Collection<Integer> validVoters = validVoterCounts();
  236. if (!validVoters.contains(Integer.valueOf(v.size()))) {
  237. throw new IllegalArgumentException(MessageFormat.format(
  238. KetchText.get().unsupportedVoterCount,
  239. Integer.valueOf(v.size()),
  240. validVoters));
  241. }
  242. LocalReplica me = findLocal(v);
  243. if (me == null) {
  244. throw new IllegalArgumentException(
  245. KetchText.get().localReplicaRequired);
  246. }
  247. lock.lock();
  248. try {
  249. voters = v.toArray(new KetchReplica[0]);
  250. followers = f.toArray(new KetchReplica[0]);
  251. self = me;
  252. } finally {
  253. lock.unlock();
  254. }
  255. }
  256. private static Collection<Integer> validVoterCounts() {
  257. @SuppressWarnings("boxing")
  258. Integer[] valid = {
  259. // An odd number of voting replicas is required.
  260. 1, 3, 5, 7, 9 };
  261. return Arrays.asList(valid);
  262. }
  263. private static LocalReplica findLocal(Collection<KetchReplica> voters) {
  264. for (KetchReplica r : voters) {
  265. if (r instanceof LocalReplica) {
  266. return (LocalReplica) r;
  267. }
  268. }
  269. return null;
  270. }
  271. /**
  272. * Get an instance of the repository for use by a leader thread.
  273. * <p>
  274. * The caller will close the repository.
  275. *
  276. * @return opened repository for use by the leader thread.
  277. * @throws java.io.IOException
  278. * cannot reopen the repository for the leader.
  279. */
  280. protected abstract Repository openRepository() throws IOException;
  281. /**
  282. * Queue a reference update proposal for consensus.
  283. * <p>
  284. * This method does not wait for consensus to be reached. The proposal is
  285. * checked to look for risks of conflicts, and then submitted into the queue
  286. * for distribution as soon as possible.
  287. * <p>
  288. * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
  289. * to see if the proposal is done.
  290. *
  291. * @param proposal
  292. * the proposed reference updates to queue for consideration.
  293. * Once execution is complete the individual reference result
  294. * fields will be populated with the outcome.
  295. * @throws java.lang.InterruptedException
  296. * current thread was interrupted. The proposal may have been
  297. * aborted if it was not yet queued for execution.
  298. * @throws java.io.IOException
  299. * unrecoverable error preventing proposals from being attempted
  300. * by this leader.
  301. */
  302. public void queueProposal(Proposal proposal)
  303. throws InterruptedException, IOException {
  304. try {
  305. lock.lockInterruptibly();
  306. } catch (InterruptedException e) {
  307. proposal.abort();
  308. throw e;
  309. }
  310. try {
  311. if (refTree == null) {
  312. initialize();
  313. for (Proposal p : queued) {
  314. refTree.apply(p.getCommands());
  315. }
  316. } else if (roundHoldsReferenceToRefTree) {
  317. refTree = refTree.copy();
  318. roundHoldsReferenceToRefTree = false;
  319. }
  320. if (!refTree.apply(proposal.getCommands())) {
  321. // A conflict exists so abort the proposal.
  322. proposal.abort();
  323. return;
  324. }
  325. queued.add(proposal);
  326. proposal.notifyState(QUEUED);
  327. if (idle) {
  328. scheduleLeader();
  329. }
  330. } finally {
  331. lock.unlock();
  332. }
  333. }
  334. private void initialize() throws IOException {
  335. try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
  336. self.initialize(git);
  337. ObjectId accepted = self.getTxnAccepted();
  338. if (!ObjectId.zeroId().equals(accepted)) {
  339. RevCommit c = rw.parseCommit(accepted);
  340. headIndex = LogIndex.unknown(accepted);
  341. refTree = RefTree.read(rw.getObjectReader(), c.getTree());
  342. } else {
  343. headIndex = LogIndex.unknown(ObjectId.zeroId());
  344. refTree = RefTree.newEmptyTree();
  345. }
  346. }
  347. }
  348. private void scheduleLeader() {
  349. idle = false;
  350. system.getExecutor().execute(this::runLeader);
  351. }
  352. private void runLeader() {
  353. Round round;
  354. lock.lock();
  355. try {
  356. switch (state) {
  357. case CANDIDATE:
  358. round = new ElectionRound(this, headIndex);
  359. break;
  360. case LEADER:
  361. round = newProposalRound();
  362. break;
  363. case DEPOSED:
  364. case SHUTDOWN:
  365. default:
  366. log.warn("Leader cannot run {}", state); //$NON-NLS-1$
  367. // TODO(sop): Redirect proposals.
  368. return;
  369. }
  370. } finally {
  371. lock.unlock();
  372. }
  373. try {
  374. round.start();
  375. } catch (IOException e) {
  376. // TODO(sop) Depose leader if it cannot use its repository.
  377. log.error(KetchText.get().leaderFailedToStore, e);
  378. lock.lock();
  379. try {
  380. nextRound();
  381. } finally {
  382. lock.unlock();
  383. }
  384. }
  385. }
  386. private ProposalRound newProposalRound() {
  387. List<Proposal> todo = new ArrayList<>(queued);
  388. queued.clear();
  389. roundHoldsReferenceToRefTree = true;
  390. return new ProposalRound(this, headIndex, todo, refTree);
  391. }
  392. /** @return term of this leader's reign. */
  393. long getTerm() {
  394. return term;
  395. }
  396. /** @return end of the leader's log. */
  397. LogIndex getHead() {
  398. return headIndex;
  399. }
  400. /**
  401. * @return state leader knows it has committed across a quorum of replicas.
  402. */
  403. LogIndex getCommitted() {
  404. return committedIndex;
  405. }
  406. boolean isIdle() {
  407. return idle;
  408. }
  409. void runAsync(Round round) {
  410. lock.lock();
  411. try {
  412. // End of the log is this round. Once transport begins it is
  413. // reasonable to assume at least one replica will eventually get
  414. // this, and there is reasonable probability it commits.
  415. headIndex = round.acceptedNewIndex;
  416. runningRound = round;
  417. for (KetchReplica replica : voters) {
  418. replica.pushTxnAcceptedAsync(round);
  419. }
  420. for (KetchReplica replica : followers) {
  421. replica.pushTxnAcceptedAsync(round);
  422. }
  423. } finally {
  424. lock.unlock();
  425. }
  426. }
  427. /**
  428. * Asynchronous signal from a replica after completion.
  429. * <p>
  430. * Must be called while {@link #lock} is held by the replica.
  431. *
  432. * @param replica
  433. * replica posting a completion event.
  434. */
  435. void onReplicaUpdate(KetchReplica replica) {
  436. if (log.isDebugEnabled()) {
  437. log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
  438. replica.describeForLog(), snapshot());
  439. }
  440. if (replica.getParticipation() == FOLLOWER_ONLY) {
  441. // Followers cannot vote, so votes haven't changed.
  442. return;
  443. } else if (runningRound == null) {
  444. // No round running, no need to tally votes.
  445. return;
  446. }
  447. assert headIndex.equals(runningRound.acceptedNewIndex);
  448. int matching = 0;
  449. for (KetchReplica r : voters) {
  450. if (r.hasAccepted(headIndex)) {
  451. matching++;
  452. }
  453. }
  454. int quorum = voters.length / 2 + 1;
  455. boolean success = matching >= quorum;
  456. if (!success) {
  457. return;
  458. }
  459. switch (state) {
  460. case CANDIDATE:
  461. term = ((ElectionRound) runningRound).getTerm();
  462. state = LEADER;
  463. if (log.isDebugEnabled()) {
  464. log.debug("Won election, running term " + term); //$NON-NLS-1$
  465. }
  466. //$FALL-THROUGH$
  467. case LEADER:
  468. committedIndex = headIndex;
  469. if (log.isDebugEnabled()) {
  470. log.debug("Committed {} in term {}", //$NON-NLS-1$
  471. committedIndex.describeForLog(),
  472. Long.valueOf(term));
  473. }
  474. nextRound();
  475. commitAsync(replica);
  476. notifySuccess(runningRound);
  477. if (log.isDebugEnabled()) {
  478. log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
  479. }
  480. break;
  481. default:
  482. log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
  483. break;
  484. }
  485. }
  486. private void notifySuccess(Round round) {
  487. // Drop the leader lock while notifying Proposal listeners.
  488. lock.unlock();
  489. try {
  490. round.success();
  491. } finally {
  492. lock.lock();
  493. }
  494. }
  495. private void commitAsync(KetchReplica caller) {
  496. for (KetchReplica r : voters) {
  497. if (r == caller) {
  498. continue;
  499. }
  500. if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  501. r.pushCommitAsync(committedIndex);
  502. }
  503. }
  504. for (KetchReplica r : followers) {
  505. if (r == caller) {
  506. continue;
  507. }
  508. if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
  509. r.pushCommitAsync(committedIndex);
  510. }
  511. }
  512. }
  513. /** Schedule the next round; invoked while {@link #lock} is held. */
  514. void nextRound() {
  515. runningRound = null;
  516. if (queued.isEmpty()) {
  517. idle = true;
  518. } else {
  519. // Caller holds lock. Reschedule leader on a new thread so
  520. // the call stack can unwind and lock is not held unexpectedly
  521. // during prepare for the next round.
  522. scheduleLeader();
  523. }
  524. }
  525. /**
  526. * Snapshot this leader
  527. *
  528. * @return snapshot of this leader
  529. */
  530. public LeaderSnapshot snapshot() {
  531. lock.lock();
  532. try {
  533. LeaderSnapshot s = new LeaderSnapshot();
  534. s.state = state;
  535. s.term = term;
  536. s.headIndex = headIndex;
  537. s.committedIndex = committedIndex;
  538. s.idle = isIdle();
  539. for (KetchReplica r : voters) {
  540. s.replicas.add(r.snapshot());
  541. }
  542. for (KetchReplica r : followers) {
  543. s.replicas.add(r.snapshot());
  544. }
  545. return s;
  546. } finally {
  547. lock.unlock();
  548. }
  549. }
  550. /**
  551. * Gracefully shutdown this leader and cancel outstanding operations.
  552. */
  553. public void shutdown() {
  554. lock.lock();
  555. try {
  556. if (state != SHUTDOWN) {
  557. state = SHUTDOWN;
  558. for (KetchReplica r : voters) {
  559. r.shutdown();
  560. }
  561. for (KetchReplica r : followers) {
  562. r.shutdown();
  563. }
  564. }
  565. } finally {
  566. lock.unlock();
  567. }
  568. }
  569. /** {@inheritDoc} */
  570. @Override
  571. public String toString() {
  572. return snapshot().toString();
  573. }
  574. }