Browse Source

Support atomic push in JGit client

This should mirror the behavior of `git push --atomic` where the
client asks the server to apply all-or-nothing. Some JGit servers
already support this based on a custom DFS backend. InMemoryRepository
is extended to support atomic push for unit testing purposes.

Local disk server side support inside of JGit is a more complex animal
due to the excessive amount of file locking required to protect every
reference as a loose reference.

Change-Id: I15083fbe48447678e034afeffb4639572a32f50c
tags/v4.2.0.201601211800-r
Shawn Pearce 8 years ago
parent
commit
3d8e6b1e16

+ 4
- 0
org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java View File

@@ -82,6 +82,9 @@ class Push extends TextBuiltin {
@Option(name = "--all")
private boolean all;

@Option(name = "--atomic")
private boolean atomic;

@Option(name = "--tags")
private boolean tags;

@@ -122,6 +125,7 @@ class Push extends TextBuiltin {
push.setPushTags();
push.setRemote(remote);
push.setThin(thin);
push.setAtomic(atomic);
push.setTimeout(timeout);
Iterable<PushResult> results = push.call();
for (PushResult result : results) {

+ 200
- 0
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/AtomicPushTest.java View File

@@ -0,0 +1,200 @@
/*
* Copyright (C) 2015, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.transport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.dfs.DfsRepositoryDescription;
import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.resolver.ReceivePackFactory;
import org.eclipse.jgit.transport.resolver.ServiceNotAuthorizedException;
import org.eclipse.jgit.transport.resolver.ServiceNotEnabledException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class AtomicPushTest {
private URIish uri;
private TestProtocol<Object> testProtocol;
private Object ctx = new Object();
private InMemoryRepository server;
private InMemoryRepository client;
private ObjectId obj1;
private ObjectId obj2;

@Before
public void setUp() throws Exception {
server = newRepo("server");
client = newRepo("client");
testProtocol = new TestProtocol<>(
null,
new ReceivePackFactory<Object>() {
@Override
public ReceivePack create(Object req, Repository db)
throws ServiceNotEnabledException,
ServiceNotAuthorizedException {
return new ReceivePack(db);
}
});
uri = testProtocol.register(ctx, server);

try (ObjectInserter ins = client.newObjectInserter()) {
obj1 = ins.insert(Constants.OBJ_BLOB, Constants.encode("test"));
obj2 = ins.insert(Constants.OBJ_BLOB, Constants.encode("file"));
ins.flush();
}
}

@After
public void tearDown() {
Transport.unregister(testProtocol);
}

private static InMemoryRepository newRepo(String name) {
return new InMemoryRepository(new DfsRepositoryDescription(name));
}

@Test
public void pushNonAtomic() throws Exception {
PushResult r;
server.setPerformsAtomicTransactions(false);
Transport tn = testProtocol.open(uri, client, "server");
try {
tn.setPushAtomic(false);
r = tn.push(NullProgressMonitor.INSTANCE, commands());
} finally {
tn.close();
}

RemoteRefUpdate one = r.getRemoteUpdate("refs/heads/one");
RemoteRefUpdate two = r.getRemoteUpdate("refs/heads/two");
assertSame(RemoteRefUpdate.Status.OK, one.getStatus());
assertSame(
RemoteRefUpdate.Status.REJECTED_REMOTE_CHANGED,
two.getStatus());
}

@Test
public void pushAtomicClientGivesUpEarly() throws Exception {
PushResult r;
Transport tn = testProtocol.open(uri, client, "server");
try {
tn.setPushAtomic(true);
r = tn.push(NullProgressMonitor.INSTANCE, commands());
} finally {
tn.close();
}

RemoteRefUpdate one = r.getRemoteUpdate("refs/heads/one");
RemoteRefUpdate two = r.getRemoteUpdate("refs/heads/two");
assertSame(
RemoteRefUpdate.Status.REJECTED_OTHER_REASON,
one.getStatus());
assertSame(
RemoteRefUpdate.Status.REJECTED_REMOTE_CHANGED,
two.getStatus());
assertEquals(JGitText.get().transactionAborted, one.getMessage());
}

@Test
public void pushAtomicDisabled() throws Exception {
List<RemoteRefUpdate> cmds = new ArrayList<>();
cmds.add(new RemoteRefUpdate(
null, null,
obj1, "refs/heads/one",
true /* force update */,
null /* no local tracking ref */,
ObjectId.zeroId()));
cmds.add(new RemoteRefUpdate(
null, null,
obj2, "refs/heads/two",
true /* force update */,
null /* no local tracking ref */,
ObjectId.zeroId()));

server.setPerformsAtomicTransactions(false);
Transport tn = testProtocol.open(uri, client, "server");
try {
tn.setPushAtomic(true);
tn.push(NullProgressMonitor.INSTANCE, cmds);
fail("did not throw TransportException");
} catch (TransportException e) {
assertEquals(
uri + ": " + JGitText.get().atomicPushNotSupported,
e.getMessage());
} finally {
tn.close();
}
}

private List<RemoteRefUpdate> commands() throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
cmds.add(new RemoteRefUpdate(
null, null,
obj1, "refs/heads/one",
true /* force update */,
null /* no local tracking ref */,
ObjectId.zeroId()));
cmds.add(new RemoteRefUpdate(
null, null,
obj2, "refs/heads/two",
true /* force update */,
null /* no local tracking ref */,
obj1));
return cmds;
}
}

+ 1
- 0
org.eclipse.jgit/resources/org/eclipse/jgit/internal/JGitText.properties View File

@@ -20,6 +20,7 @@ argumentIsNotAValidCommentString=Invalid comment: {0}
atLeastOnePathIsRequired=At least one path is required.
atLeastOnePatternIsRequired=At least one pattern is required.
atLeastTwoFiltersNeeded=At least two filters needed.
atomicPushNotSupported=Atomic push not supported.
authenticationNotSupported=authentication not supported
badBase64InputCharacterAt=Bad Base64 input character at {0} : {1} (decimal)
badEntryDelimiter=Bad entry delimiter

+ 25
- 2
org.eclipse.jgit/src/org/eclipse/jgit/api/PushCommand.java View File

@@ -89,9 +89,8 @@ public class PushCommand extends
private String receivePack = RemoteConfig.DEFAULT_RECEIVE_PACK;

private boolean dryRun;
private boolean atomic;
private boolean force;

private boolean thin = Transport.DEFAULT_PUSH_THIN;

private OutputStream out;
@@ -145,6 +144,7 @@ public class PushCommand extends
transports = Transport.openAll(repo, remote, Transport.Operation.PUSH);
for (final Transport transport : transports) {
transport.setPushThin(thin);
transport.setPushAtomic(atomic);
if (receivePack != null)
transport.setOptionReceivePack(receivePack);
transport.setDryRun(dryRun);
@@ -396,6 +396,29 @@ public class PushCommand extends
return this;
}

/**
* @return true if all-or-nothing behavior is requested.
* @since 4.2
*/
public boolean isAtomic() {
return atomic;
}

/**
* Requests atomic push (all references updated, or no updates).
*
* Default setting is false.
*
* @param atomic
* @return {@code this}
* @since 4.2
*/
public PushCommand setAtomic(boolean atomic) {
checkCallable();
this.atomic = atomic;
return this;
}

/**
* @return the force preference for push operation
*/

+ 1
- 0
org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java View File

@@ -79,6 +79,7 @@ public class JGitText extends TranslationBundle {
/***/ public String atLeastOnePathIsRequired;
/***/ public String atLeastOnePatternIsRequired;
/***/ public String atLeastTwoFiltersNeeded;
/***/ public String atomicPushNotSupported;
/***/ public String authenticationNotSupported;
/***/ public String badBase64InputCharacterAt;
/***/ public String badEntryDelimiter;

+ 154
- 26
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/InMemoryRepository.java View File

@@ -13,14 +13,22 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdRef;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Ref.Storage;
import org.eclipse.jgit.lib.SymbolicRef;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTag;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.util.RefList;

/**
@@ -46,8 +54,8 @@ public class InMemoryRepository extends DfsRepository {
static final AtomicInteger packId = new AtomicInteger();

private final DfsObjDatabase objdb;

private final DfsRefDatabase refdb;
private boolean performsAtomicTransactions = true;

/**
* Initialize a new in-memory repository.
@@ -76,6 +84,17 @@ public class InMemoryRepository extends DfsRepository {
return refdb;
}

/**
* Enable (or disable) the atomic reference transaction support.
* <p>
* Useful for testing atomic support enabled or disabled.
*
* @param atomic
*/
public void setPerformsAtomicTransactions(boolean atomic) {
performsAtomicTransactions = atomic;
}

private class MemObjDatabase extends DfsObjDatabase {
private List<DfsPackDescription> packs = new ArrayList<DfsPackDescription>();

@@ -235,41 +254,143 @@ public class InMemoryRepository extends DfsRepository {

private class MemRefDatabase extends DfsRefDatabase {
private final ConcurrentMap<String, Ref> refs = new ConcurrentHashMap<String, Ref>();
private final ReadWriteLock lock = new ReentrantReadWriteLock(true /* fair */);

MemRefDatabase() {
super(InMemoryRepository.this);
}

@Override
public boolean performsAtomicTransactions() {
return performsAtomicTransactions;
}

@Override
public BatchRefUpdate newBatchUpdate() {
return new BatchRefUpdate(this) {
@Override
public void execute(RevWalk walk, ProgressMonitor monitor)
throws IOException {
if (performsAtomicTransactions()) {
try {
lock.writeLock().lock();
batch(walk, getCommands());
} finally {
lock.writeLock().unlock();
}
} else {
super.execute(walk, monitor);
}
}
};
}

@Override
protected RefCache scanAllRefs() throws IOException {
RefList.Builder<Ref> ids = new RefList.Builder<Ref>();
RefList.Builder<Ref> sym = new RefList.Builder<Ref>();
for (Ref ref : refs.values()) {
if (ref.isSymbolic())
sym.add(ref);
ids.add(ref);
try {
lock.readLock().lock();
for (Ref ref : refs.values()) {
if (ref.isSymbolic())
sym.add(ref);
ids.add(ref);
}
} finally {
lock.readLock().unlock();
}
ids.sort();
sym.sort();
return new RefCache(ids.toRefList(), sym.toRefList());
}

private void batch(RevWalk walk, List<ReceiveCommand> cmds) {
// Validate that the target exists in a new RevWalk, as the RevWalk
// from the RefUpdate might be reading back unflushed objects.
Map<ObjectId, ObjectId> peeled = new HashMap<>();
try (RevWalk rw = new RevWalk(getRepository())) {
for (ReceiveCommand c : cmds) {
if (!ObjectId.zeroId().equals(c.getNewId())) {
try {
RevObject o = rw.parseAny(c.getNewId());
if (o instanceof RevTag) {
peeled.put(o, rw.peel(o).copy());
}
} catch (IOException e) {
c.setResult(ReceiveCommand.Result.REJECTED_MISSING_OBJECT);
reject(cmds);
return;
}
}
}
}

// Check all references conform to expected old value.
for (ReceiveCommand c : cmds) {
Ref r = refs.get(c.getRefName());
if (r == null) {
if (c.getType() != ReceiveCommand.Type.CREATE) {
c.setResult(ReceiveCommand.Result.LOCK_FAILURE);
reject(cmds);
return;
}
} else if (r.isSymbolic() || r.getObjectId() == null
|| !r.getObjectId().equals(c.getOldId())) {
c.setResult(ReceiveCommand.Result.LOCK_FAILURE);
reject(cmds);
return;
}
}

// Write references.
for (ReceiveCommand c : cmds) {
if (c.getType() == ReceiveCommand.Type.DELETE) {
refs.remove(c.getRefName());
c.setResult(ReceiveCommand.Result.OK);
continue;
}

ObjectId p = peeled.get(c.getNewId());
Ref r;
if (p != null) {
r = new ObjectIdRef.PeeledTag(Storage.PACKED,
c.getRefName(), c.getNewId(), p);
} else {
r = new ObjectIdRef.PeeledNonTag(Storage.PACKED,
c.getRefName(), c.getNewId());
}
refs.put(r.getName(), r);
c.setResult(ReceiveCommand.Result.OK);
}
clearCache();
}

private void reject(List<ReceiveCommand> cmds) {
for (ReceiveCommand c : cmds) {
if (c.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) {
c.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON,
JGitText.get().transactionAborted);
}
}
}

@Override
protected boolean compareAndPut(Ref oldRef, Ref newRef)
throws IOException {
ObjectId id = newRef.getObjectId();
if (id != null) {
try (RevWalk rw = new RevWalk(getRepository())) {
// Validate that the target exists in a new RevWalk, as the RevWalk
// from the RefUpdate might be reading back unflushed objects.
rw.parseAny(id);
try {
lock.writeLock().lock();
ObjectId id = newRef.getObjectId();
if (id != null) {
try (RevWalk rw = new RevWalk(getRepository())) {
// Validate that the target exists in a new RevWalk, as the RevWalk
// from the RefUpdate might be reading back unflushed objects.
rw.parseAny(id);
}
}
}
String name = newRef.getName();
if (oldRef == null)
return refs.putIfAbsent(name, newRef) == null;
String name = newRef.getName();
if (oldRef == null)
return refs.putIfAbsent(name, newRef) == null;

synchronized (refs) {
Ref cur = refs.get(name);
Ref toCompare = cur;
if (toCompare != null) {
@@ -294,22 +415,29 @@ public class InMemoryRepository extends DfsRepository {
if (eq(toCompare, oldRef))
return refs.replace(name, cur, newRef);
}
}

if (oldRef.getStorage() == Storage.NEW)
return refs.putIfAbsent(name, newRef) == null;
if (oldRef.getStorage() == Storage.NEW)
return refs.putIfAbsent(name, newRef) == null;

return false;
return false;
} finally {
lock.writeLock().unlock();
}
}

@Override
protected boolean compareAndRemove(Ref oldRef) throws IOException {
String name = oldRef.getName();
Ref cur = refs.get(name);
if (cur != null && eq(cur, oldRef))
return refs.remove(name, cur);
else
return false;
try {
lock.writeLock().lock();
String name = oldRef.getName();
Ref cur = refs.get(name);
if (cur != null && eq(cur, oldRef))
return refs.remove(name, cur);
else
return false;
} finally {
lock.writeLock().unlock();
}
}

private boolean eq(Ref a, Ref b) {

+ 12
- 4
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java View File

@@ -44,6 +44,8 @@

package org.eclipse.jgit.transport;

import static org.eclipse.jgit.transport.GitProtocolConstants.CAPABILITY_ATOMIC;

import java.io.IOException;
import java.io.OutputStream;
import java.text.MessageFormat;
@@ -110,17 +112,15 @@ public abstract class BasePackPushConnection extends BasePackConnection implemen
public static final String CAPABILITY_SIDE_BAND_64K = GitProtocolConstants.CAPABILITY_SIDE_BAND_64K;

private final boolean thinPack;
private final boolean atomic;

private boolean capableAtomic;
private boolean capableDeleteRefs;

private boolean capableReport;

private boolean capableSideBand;

private boolean capableOfsDelta;

private boolean sentCommand;

private boolean writePack;

/** Time in milliseconds spent transferring the pack data. */
@@ -135,6 +135,7 @@ public abstract class BasePackPushConnection extends BasePackConnection implemen
public BasePackPushConnection(final PackTransport packTransport) {
super(packTransport);
thinPack = transport.isPushThin();
atomic = transport.isPushAtomic();
}

public void push(final ProgressMonitor monitor,
@@ -224,6 +225,11 @@ public abstract class BasePackPushConnection extends BasePackConnection implemen
private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
final ProgressMonitor monitor, OutputStream outputStream) throws IOException {
final String capabilities = enableCapabilities(monitor, outputStream);
if (atomic && !capableAtomic) {
throw new TransportException(uri,
JGitText.get().atomicPushNotSupported);
}

for (final RemoteRefUpdate rru : refUpdates) {
if (!capableDeleteRefs && rru.isDelete()) {
rru.setStatus(Status.REJECTED_NODELETE);
@@ -259,6 +265,8 @@ public abstract class BasePackPushConnection extends BasePackConnection implemen
private String enableCapabilities(final ProgressMonitor monitor,
OutputStream outputStream) {
final StringBuilder line = new StringBuilder();
if (atomic)
capableAtomic = wantCapability(line, CAPABILITY_ATOMIC);
capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA);

+ 21
- 2
org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java View File

@@ -47,6 +47,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -183,6 +184,7 @@ class PushProcess {

private Map<String, RemoteRefUpdate> prepareRemoteUpdates()
throws TransportException {
boolean atomic = transport.isPushAtomic();
final Map<String, RemoteRefUpdate> result = new HashMap<String, RemoteRefUpdate>();
for (final RemoteRefUpdate rru : toPush.values()) {
final Ref advertisedRef = connection.getRef(rru.getRemoteName());
@@ -205,6 +207,9 @@ class PushProcess {
if (rru.isExpectingOldObjectId()
&& !rru.getExpectedOldObjectId().equals(advertisedOld)) {
rru.setStatus(Status.REJECTED_REMOTE_CHANGED);
if (atomic) {
return rejectAll();
}
continue;
}

@@ -236,14 +241,28 @@ class PushProcess {
JGitText.get().readingObjectsFromLocalRepositoryFailed, x.getMessage()), x);
}
rru.setFastForward(fastForward);
if (!fastForward && !rru.isForceUpdate())
if (!fastForward && !rru.isForceUpdate()) {
rru.setStatus(Status.REJECTED_NONFASTFORWARD);
else
if (atomic) {
return rejectAll();
}
} else {
result.put(rru.getRemoteName(), rru);
}
}
return result;
}

private Map<String, RemoteRefUpdate> rejectAll() {
for (RemoteRefUpdate rru : toPush.values()) {
if (rru.getStatus() == Status.NOT_ATTEMPTED) {
rru.setStatus(RemoteRefUpdate.Status.REJECTED_OTHER_REASON);
rru.setMessage(JGitText.get().transactionAborted);
}
}
return Collections.emptyMap();
}

private void modifyUpdatesForDryRun() {
for (final RemoteRefUpdate rru : toPush.values())
if (rru.getStatus() == Status.NOT_ATTEMPTED)

+ 28
- 0
org.eclipse.jgit/src/org/eclipse/jgit/transport/Transport.java View File

@@ -752,6 +752,9 @@ public abstract class Transport {
/** Should push produce thin-pack when sending objects to remote repository. */
private boolean pushThin = DEFAULT_PUSH_THIN;

/** Should push be all-or-nothing atomic behavior? */
private boolean pushAtomic;

/** Should push just check for operation result, not really push. */
private boolean dryRun;

@@ -969,6 +972,31 @@ public abstract class Transport {
this.pushThin = pushThin;
}

/**
* Default setting is false.
*
* @return true if push requires all-or-nothing atomic behavior.
* @since 4.2
*/
public boolean isPushAtomic() {
return pushAtomic;
}

/**
* Request atomic push (all references succeed, or none do).
* <p>
* Server must also support atomic push. If the server does not support the
* feature the push will abort without making changes.
*
* @param atomic
* true when push should be an all-or-nothing operation.
* @see PackTransport
* @since 4.2
*/
public void setPushAtomic(final boolean atomic) {
this.pushAtomic = atomic;
}

/**
* @return true if destination refs should be removed if they no longer
* exist at the source repository.

Loading…
Cancel
Save