123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- /*
- * Copyright (C) 2016, Google Inc. and others
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Distribution License v. 1.0 which is available at
- * https://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- */
-
- package org.eclipse.jgit.internal.ketch;
-
- import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
- import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
- import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
- import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
- import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicReference;
-
- import org.eclipse.jgit.annotations.Nullable;
- import org.eclipse.jgit.errors.MissingObjectException;
- import org.eclipse.jgit.internal.storage.reftree.Command;
- import org.eclipse.jgit.lib.ObjectId;
- import org.eclipse.jgit.lib.PersonIdent;
- import org.eclipse.jgit.lib.Ref;
- import org.eclipse.jgit.revwalk.RevWalk;
- import org.eclipse.jgit.transport.PushCertificate;
- import org.eclipse.jgit.transport.ReceiveCommand;
- import org.eclipse.jgit.util.time.ProposedTimestamp;
-
- /**
- * A proposal to be applied in a Ketch system.
- * <p>
- * Pushing to a Ketch leader results in the leader making a proposal. The
- * proposal includes the list of reference updates. The leader attempts to send
- * the proposal to a quorum of replicas by pushing the proposal to a "staging"
- * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
- * then the changes are durable and the leader can commit the proposal.
- * <p>
- * Proposals are executed by
- * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)},
- * which runs them asynchronously in the background. Proposals are thread-safe
- * futures allowing callers to {@link #await()} for results or be notified by
- * callback using {@link #addListener(Runnable)}.
- */
- public class Proposal {
- /** Current state of the proposal. */
- public enum State {
- /** Proposal has not yet been given to a {@link KetchLeader}. */
- NEW(false),
-
- /**
- * Proposal was validated and has entered the queue, but a round
- * containing this proposal has not started yet.
- */
- QUEUED(false),
-
- /** Round containing the proposal has begun and is in progress. */
- RUNNING(false),
-
- /**
- * Proposal was executed through a round. Individual results from
- * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
- * the success or failure outcome.
- */
- EXECUTED(true),
-
- /** Proposal was aborted and did not reach consensus. */
- ABORTED(true);
-
- private final boolean done;
-
- private State(boolean done) {
- this.done = done;
- }
-
- /** @return true if this is a terminal state. */
- public boolean isDone() {
- return done;
- }
- }
-
- private final List<Command> commands;
- private PersonIdent author;
- private String message;
- private PushCertificate pushCert;
-
- private List<ProposedTimestamp> timestamps;
- private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
- private final AtomicReference<State> state = new AtomicReference<>(NEW);
-
- /**
- * Create a proposal from a list of Ketch commands.
- *
- * @param cmds
- * prepared list of commands.
- */
- public Proposal(List<Command> cmds) {
- commands = Collections.unmodifiableList(new ArrayList<>(cmds));
- }
-
- /**
- * Create a proposal from a collection of received commands.
- *
- * @param rw
- * walker to assist in preparing commands.
- * @param cmds
- * list of pending commands.
- * @throws org.eclipse.jgit.errors.MissingObjectException
- * newId of a command is not found locally.
- * @throws java.io.IOException
- * local objects cannot be accessed.
- */
- public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
- throws MissingObjectException, IOException {
- commands = asCommandList(rw, cmds);
- }
-
- private static List<Command> asCommandList(RevWalk rw,
- Collection<ReceiveCommand> cmds)
- throws MissingObjectException, IOException {
- List<Command> commands = new ArrayList<>(cmds.size());
- for (ReceiveCommand cmd : cmds) {
- commands.add(new Command(rw, cmd));
- }
- return Collections.unmodifiableList(commands);
- }
-
- /**
- * Get commands from this proposal.
- *
- * @return commands from this proposal.
- */
- public Collection<Command> getCommands() {
- return commands;
- }
-
- /**
- * Get optional author of the proposal.
- *
- * @return optional author of the proposal.
- */
- @Nullable
- public PersonIdent getAuthor() {
- return author;
- }
-
- /**
- * Set the author for the proposal.
- *
- * @param who
- * optional identity of the author of the proposal.
- * @return {@code this}
- */
- public Proposal setAuthor(@Nullable PersonIdent who) {
- author = who;
- return this;
- }
-
- /**
- * Get optional message for the commit log of the RefTree.
- *
- * @return optional message for the commit log of the RefTree.
- */
- @Nullable
- public String getMessage() {
- return message;
- }
-
- /**
- * Set the message to appear in the commit log of the RefTree.
- *
- * @param msg
- * message text for the commit.
- * @return {@code this}
- */
- public Proposal setMessage(@Nullable String msg) {
- message = msg != null && !msg.isEmpty() ? msg : null;
- return this;
- }
-
- /**
- * Get optional certificate signing the references.
- *
- * @return optional certificate signing the references.
- */
- @Nullable
- public PushCertificate getPushCertificate() {
- return pushCert;
- }
-
- /**
- * Set the push certificate signing the references.
- *
- * @param cert
- * certificate, may be null.
- * @return {@code this}
- */
- public Proposal setPushCertificate(@Nullable PushCertificate cert) {
- pushCert = cert;
- return this;
- }
-
- /**
- * Get timestamps that Ketch must block for.
- *
- * @return timestamps that Ketch must block for. These may have been used as
- * commit times inside the objects involved in the proposal.
- */
- public List<ProposedTimestamp> getProposedTimestamps() {
- if (timestamps != null) {
- return timestamps;
- }
- return Collections.emptyList();
- }
-
- /**
- * Request the proposal to wait for the affected timestamps to resolve.
- *
- * @param ts
- * a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object.
- * @return {@code this}.
- */
- public Proposal addProposedTimestamp(ProposedTimestamp ts) {
- if (timestamps == null) {
- timestamps = new ArrayList<>(4);
- }
- timestamps.add(ts);
- return this;
- }
-
- /**
- * Add a callback to be invoked when the proposal is done.
- * <p>
- * A proposal is done when it has entered either
- * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or
- * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If
- * the proposal is already done {@code callback.run()} is immediately
- * invoked on the caller's thread.
- *
- * @param callback
- * method to run after the proposal is done. The callback may be
- * run on a Ketch system thread and should be completed quickly.
- */
- public void addListener(Runnable callback) {
- boolean runNow = false;
- synchronized (state) {
- if (state.get().isDone()) {
- runNow = true;
- } else {
- listeners.add(callback);
- }
- }
- if (runNow) {
- callback.run();
- }
- }
-
- /** Set command result as OK. */
- void success() {
- for (Command c : commands) {
- if (c.getResult() == NOT_ATTEMPTED) {
- c.setResult(OK);
- }
- }
- notifyState(EXECUTED);
- }
-
- /** Mark commands as "transaction aborted". */
- void abort() {
- Command.abort(commands, null);
- notifyState(ABORTED);
- }
-
- /**
- * Read the current state of the proposal.
- *
- * @return read the current state of the proposal.
- */
- public State getState() {
- return state.get();
- }
-
- /**
- * Whether the proposal was attempted
- *
- * @return {@code true} if the proposal was attempted. A true value does not
- * mean consensus was reached, only that the proposal was considered
- * and will not be making any more progress beyond its current
- * state.
- */
- public boolean isDone() {
- return state.get().isDone();
- }
-
- /**
- * Wait for the proposal to be attempted and {@link #isDone()} to be true.
- *
- * @throws java.lang.InterruptedException
- * caller was interrupted before proposal executed.
- */
- public void await() throws InterruptedException {
- synchronized (state) {
- while (!state.get().isDone()) {
- state.wait();
- }
- }
- }
-
- /**
- * Wait for the proposal to be attempted and {@link #isDone()} to be true.
- *
- * @param wait
- * how long to wait.
- * @param unit
- * unit describing the wait time.
- * @return true if the proposal is done; false if the method timed out.
- * @throws java.lang.InterruptedException
- * caller was interrupted before proposal executed.
- */
- public boolean await(long wait, TimeUnit unit) throws InterruptedException {
- synchronized (state) {
- if (state.get().isDone()) {
- return true;
- }
- state.wait(unit.toMillis(wait));
- return state.get().isDone();
- }
- }
-
- /**
- * Wait for the proposal to exit a state.
- *
- * @param notIn
- * state the proposal should not be in to return.
- * @param wait
- * how long to wait.
- * @param unit
- * unit describing the wait time.
- * @return true if the proposal exited the state; false on time out.
- * @throws java.lang.InterruptedException
- * caller was interrupted before proposal executed.
- */
- public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
- throws InterruptedException {
- synchronized (state) {
- if (state.get() != notIn) {
- return true;
- }
- state.wait(unit.toMillis(wait));
- return state.get() != notIn;
- }
- }
-
- void notifyState(State s) {
- synchronized (state) {
- state.set(s);
- state.notifyAll();
- }
- if (s.isDone()) {
- for (Runnable callback : listeners) {
- callback.run();
- }
- listeners.clear();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- StringBuilder s = new StringBuilder();
- s.append("Ketch Proposal {\n"); //$NON-NLS-1$
- s.append(" ").append(state.get()).append('\n'); //$NON-NLS-1$
- if (author != null) {
- s.append(" author ").append(author).append('\n'); //$NON-NLS-1$
- }
- if (message != null) {
- s.append(" message ").append(message).append('\n'); //$NON-NLS-1$
- }
- for (Command c : commands) {
- s.append(" "); //$NON-NLS-1$
- format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
- s.append(' ');
- format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
- s.append(' ').append(c.getRefName());
- if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
- s.append(' ').append(c.getResult()); // $NON-NLS-1$
- }
- s.append('\n');
- }
- s.append('}');
- return s.toString();
- }
-
- private static void format(StringBuilder s, @Nullable Ref r, String n) {
- if (r == null) {
- s.append(n);
- } else if (r.isSymbolic()) {
- s.append(r.getTarget().getName());
- } else {
- ObjectId id = r.getObjectId();
- if (id != null) {
- s.append(id.abbreviate(8).name());
- }
- }
- }
- }
|