/*
 * Copyright (C) 2008, 2010 Google Inc.
 * Copyright (C) 2008, Robin Rosenberg
 * Copyright (C) 2008, 2022 Shawn O. Pearce and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.transport;

import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_DELIM;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_DEEPEN;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_DEEPEN_NOT;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_DEEPEN_SINCE;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_DONE;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_END;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_ERR;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_HAVE;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_SHALLOW;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_UNSHALLOW;
import static org.eclipse.jgit.transport.GitProtocolConstants.PACKET_WANT;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.errors.RemoteRepositoryException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.MutableObjectId;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectDatabase;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevCommitList;
import org.eclipse.jgit.revwalk.RevFlag;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevSort;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.revwalk.filter.CommitTimeRevFilter;
import org.eclipse.jgit.revwalk.filter.RevFilter;
import org.eclipse.jgit.transport.GitProtocolConstants.MultiAck;
import org.eclipse.jgit.transport.PacketLineIn.AckNackResult;
import org.eclipse.jgit.util.StringUtils;
import org.eclipse.jgit.util.TemporaryBuffer;

/**
 * Fetch implementation using the native Git pack transfer service.
 * <p>
 * This is the canonical implementation for transferring objects from the remote
 * repository to the local repository by talking to the 'git-upload-pack'
 * service. Objects are packed on the remote side into a pack file and then sent
 * down the pipe to us.
 * <p>
 * This connection requires only a bi-directional pipe or socket, and thus is
 * easily wrapped up into a local process pipe, anonymous TCP socket, or a
 * command executed through an SSH tunnel.
 * <p>
 * If {@link org.eclipse.jgit.transport.BasePackConnection#statelessRPC} is
 * {@code true}, this connection can be tunneled over a request-response style
 * RPC system like HTTP. The RPC call boundary is determined by this class
 * switching from writing to the OutputStream to reading from the InputStream.
 * <p>
 * Concrete implementations should just call
 * {@link #init(java.io.InputStream, java.io.OutputStream)} and
 * {@link #readAdvertisedRefs()} methods in constructor or before any use. They
 * should also handle resources releasing in {@link #close()} method if needed.
 */
public abstract class BasePackFetchConnection extends BasePackConnection
		implements FetchConnection {
	/**
	 * Maximum number of 'have' lines to send before giving up.
	 * <p>
	 * During {@link #negotiate(ProgressMonitor, boolean, Set)} we send at most
	 * this many commits to the remote peer as 'have' lines without an ACK
	 * response before we give up.
	 */
	private static final int MAX_HAVES = 256;

	/**
	 * Amount of data the client sends before starting to read.
	 * <p>
	 * Any output stream given to the client must be able to buffer this many
	 * bytes before the client will stop writing and start reading from the
	 * input stream. If the output stream blocks before this many bytes are in
	 * the send queue, the system will deadlock.
	 */
	protected static final int MIN_CLIENT_BUFFER = 2 * 32 * 46 + 8;

	/**
	 * Include tags if we are also including the referenced objects.
	 * @since 2.0
	 */
	public static final String OPTION_INCLUDE_TAG = GitProtocolConstants.OPTION_INCLUDE_TAG;

	/**
	 * Multi-ACK support for improved negotiation.
	 * @since 2.0
	 */
	public static final String OPTION_MULTI_ACK = GitProtocolConstants.OPTION_MULTI_ACK;

	/**
	 * Multi-ACK detailed support for improved negotiation.
	 * @since 2.0
	 */
	public static final String OPTION_MULTI_ACK_DETAILED = GitProtocolConstants.OPTION_MULTI_ACK_DETAILED;

	/**
	 * The client supports packs with deltas but not their bases.
	 * @since 2.0
	 */
	public static final String OPTION_THIN_PACK = GitProtocolConstants.OPTION_THIN_PACK;

	/**
	 * The client supports using the side-band for progress messages.
	 * @since 2.0
	 */
	public static final String OPTION_SIDE_BAND = GitProtocolConstants.OPTION_SIDE_BAND;

	/**
	 * The client supports using the 64K side-band for progress messages.
	 * @since 2.0
	 */
	public static final String OPTION_SIDE_BAND_64K = GitProtocolConstants.OPTION_SIDE_BAND_64K;

	/**
	 * The client supports packs with OFS deltas.
	 * @since 2.0
	 */
	public static final String OPTION_OFS_DELTA = GitProtocolConstants.OPTION_OFS_DELTA;

	/**
	 * The client supports shallow fetches.
	 * @since 2.0
	 */
	public static final String OPTION_SHALLOW = GitProtocolConstants.OPTION_SHALLOW;

	/**
	 * The client does not want progress messages and will ignore them.
	 * @since 2.0
	 */
	public static final String OPTION_NO_PROGRESS = GitProtocolConstants.OPTION_NO_PROGRESS;

	/**
	 * The client supports receiving a pack before it has sent "done".
	 * @since 2.0
	 */
	public static final String OPTION_NO_DONE = GitProtocolConstants.OPTION_NO_DONE;

	/**
	 * The client supports fetching objects at the tip of any ref, even if not
	 * advertised.
	 * @since 3.1
	 */
	public static final String OPTION_ALLOW_TIP_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_TIP_SHA1_IN_WANT;

	/**
	 * The client supports fetching objects that are reachable from a tip of a
	 * ref that is allowed to fetch.
	 * @since 4.1
	 */
	public static final String OPTION_ALLOW_REACHABLE_SHA1_IN_WANT = GitProtocolConstants.OPTION_ALLOW_REACHABLE_SHA1_IN_WANT;

	/**
	 * The client specified a filter expression.
	 *
	 * @since 5.0
	 */
	public static final String OPTION_FILTER = GitProtocolConstants.OPTION_FILTER;

	private final RevWalk walk;

	/** All commits that are immediately reachable by a local ref. */
	private RevCommitList<RevCommit> reachableCommits;

	/** Marks an object as having all its dependencies. */
	final RevFlag REACHABLE;

	/** Marks a commit known to both sides of the connection. */
	final RevFlag COMMON;

	/** Like {@link #COMMON} but means it's also in {@link #pckState}. */
	private final RevFlag STATE;

	/** Marks a commit listed in the advertised refs. */
	final RevFlag ADVERTISED;

	private MultiAck multiAck = MultiAck.OFF;

	private boolean thinPack;

	private boolean sideband;

	private boolean includeTags;

	private boolean allowOfsDelta;

	private boolean useNegotiationTip;

	private boolean noDone;

	private boolean noProgress;

	private String lockMessage;

	private PackLock packLock;

	private int maxHaves;

	private Integer depth;

	private Instant deepenSince;

	private List<String> deepenNots;

	/**
	 * RPC state, if {@link BasePackConnection#statelessRPC} is true or protocol
	 * V2 is used.
	 */
	private TemporaryBuffer.Heap state;

	private PacketLineOut pckState;

	/**
	 * Either FilterSpec.NO_FILTER for a filter that doesn't filter
	 * anything, or a filter that indicates what and what not to send to the
	 * server.
	 */
	private final FilterSpec filterSpec;

	/**
	 * Create a new connection to fetch using the native git transport.
	 *
	 * @param packTransport
	 *            the transport.
	 */
	public BasePackFetchConnection(PackTransport packTransport) {
		super(packTransport);

		if (local != null) {
			final FetchConfig cfg = getFetchConfig();
			allowOfsDelta = cfg.allowOfsDelta;
			maxHaves = cfg.maxHaves;
			useNegotiationTip = cfg.useNegotiationTip;
		} else {
			allowOfsDelta = true;
			maxHaves = Integer.MAX_VALUE;
			useNegotiationTip = false;
		}

		includeTags = transport.getTagOpt() != TagOpt.NO_TAGS;
		thinPack = transport.isFetchThin();
		filterSpec = transport.getFilterSpec();
		depth = transport.getDepth();
		deepenSince = transport.getDeepenSince();
		deepenNots = transport.getDeepenNots();

		if (local != null) {
			walk = new RevWalk(local);
			walk.setRetainBody(false);
			reachableCommits = new RevCommitList<>();
			REACHABLE = walk.newFlag("REACHABLE"); //$NON-NLS-1$
			COMMON = walk.newFlag("COMMON"); //$NON-NLS-1$
			STATE = walk.newFlag("STATE"); //$NON-NLS-1$
			ADVERTISED = walk.newFlag("ADVERTISED"); //$NON-NLS-1$

			walk.carry(COMMON);
			walk.carry(REACHABLE);
			walk.carry(ADVERTISED);
		} else {
			walk = null;
			REACHABLE = null;
			COMMON = null;
			STATE = null;
			ADVERTISED = null;
		}
	}

	/** Fetch related settings, normally parsed from a {@link Config}. */
	static class FetchConfig {
		final boolean allowOfsDelta;

		final int maxHaves;

		final boolean useNegotiationTip;

		FetchConfig(Config c) {
			allowOfsDelta = c.getBoolean("repack", "usedeltabaseoffset", true); //$NON-NLS-1$ //$NON-NLS-2$
			maxHaves = c.getInt("fetch", "maxhaves", Integer.MAX_VALUE); //$NON-NLS-1$ //$NON-NLS-2$
			useNegotiationTip = c.getBoolean("fetch", "usenegotiationtip", //$NON-NLS-1$ //$NON-NLS-2$
					false);
		}

		FetchConfig(boolean allowOfsDelta, int maxHaves) {
			this(allowOfsDelta, maxHaves, false);
		}

		/**
		 * @param allowOfsDelta
		 *            when true optimizes the pack size by deltafying base
		 *            object
		 * @param maxHaves
		 *            max haves to be sent per negotiation
		 * @param useNegotiationTip
		 *            if true uses the wanted refs instead of all refs as source
		 *            of the "have" list to send.
		 * @since 6.6
		 */
		FetchConfig(boolean allowOfsDelta, int maxHaves,
				boolean useNegotiationTip) {
			this.allowOfsDelta = allowOfsDelta;
			this.maxHaves = maxHaves;
			this.useNegotiationTip = useNegotiationTip;
		}
	}

	@Override
	public final void fetch(final ProgressMonitor monitor,
			final Collection<Ref> want, final Set<ObjectId> have)
			throws TransportException {
		fetch(monitor, want, have, null);
	}

	@Override
	public final void fetch(final ProgressMonitor monitor,
			final Collection<Ref> want, final Set<ObjectId> have,
			OutputStream outputStream) throws TransportException {
		markStartedOperation();
		doFetch(monitor, want, have, outputStream);
	}

	@Override
	public boolean didFetchIncludeTags() {
		return false;
	}

	@Override
	public boolean didFetchTestConnectivity() {
		return false;
	}

	@Override
	public void setPackLockMessage(String message) {
		lockMessage = message;
	}

	@Override
	public Collection<PackLock> getPackLocks() {
		if (packLock != null) {
			return Collections.singleton(packLock);
		}
		return Collections.emptyList();
	}

	/**
	 * Disposes the walker and drops the references to the negotiation state
	 * (reachable commits and the stateless RPC buffer).
	 */
	private void clearState() {
		walk.dispose();
		reachableCommits = null;
		state = null;
		pckState = null;
	}

	/**
	 * Execute common ancestor negotiation and fetch the objects.
	 *
	 * @param monitor
	 *            progress monitor to receive status updates. If the monitor is
	 *            the {@link org.eclipse.jgit.lib.NullProgressMonitor#INSTANCE},
	 *            then the no-progress option is enabled.
	 * @param want
	 *            the advertised remote references the caller wants to fetch.
	 * @param have
	 *            additional objects to assume that already exist locally. This
	 *            will be added to the set of objects reachable from the
	 *            destination repository's references.
	 * @param outputStream
	 *            outputStream to write sideband messages to
	 * @throws org.eclipse.jgit.errors.TransportException
	 *             if any exception occurs.
	 * @since 3.0
	 */
	protected void doFetch(final ProgressMonitor monitor,
			final Collection<Ref> want, final Set<ObjectId> have,
			OutputStream outputStream) throws TransportException {
		boolean hasObjects = !have.isEmpty();
		try {
			noProgress = monitor == NullProgressMonitor.INSTANCE;

			if (hasObjects) {
				markRefsAdvertised();
			}
			markReachable(want, have, maxTimeWanted(want, hasObjects));

			if (TransferConfig.ProtocolVersion.V2
					.equals(getProtocolVersion())) {
				// Protocol V2 always is a "stateless" protocol, even over a
				// bidirectional pipe: the server serves one "fetch" request and
				// then forgets anything it has learned, so the next fetch
				// request has to re-send all wants and previously determined
				// common objects as "have"s again.
				state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
				pckState = new PacketLineOut(state);
				try {
					doFetchV2(monitor, want, outputStream, hasObjects);
				} finally {
					clearState();
				}
				return;
			}
			// Protocol V0/1
			if (statelessRPC) {
				state = new TemporaryBuffer.Heap(Integer.MAX_VALUE);
				pckState = new PacketLineOut(state);
			}
			PacketLineOut output = statelessRPC ? pckState : pckOut;
			if (sendWants(want, output, hasObjects)) {
				boolean mayHaveShallow = isShallowRequested();
				Set<ObjectId> shallowCommits = local.getObjectDatabase()
						.getShallowCommits();
				if (isCapableOf(GitProtocolConstants.CAPABILITY_SHALLOW)) {
					sendShallow(shallowCommits, output);
				} else if (mayHaveShallow) {
					throw new PackProtocolException(
							JGitText.get().shallowNotSupported);
				}
				output.end();
				outNeedsEnd = false;

				negotiate(monitor, mayHaveShallow, shallowCommits);

				clearState();

				receivePack(monitor, outputStream);
			}
		} catch (CancelledException ce) {
			close();
			return; // Caller should test (or just know) this themselves.
		} catch (IOException | RuntimeException err) {
			close();
			throw new TransportException(err.getMessage(), err);
		}
	}

	/**
	 * Performs a fetch using protocol V2: writes the "fetch" command with its
	 * arguments into the state buffer, alternates between sending batches of
	 * "have"s and reading acknowledgments, and finally receives the pack.
	 *
	 * @param monitor
	 *            for progress reporting and cancellation
	 * @param want
	 *            the refs to fetch
	 * @param outputStream
	 *            to write sideband messages to
	 * @param hasObjects
	 *            whether the caller passed a non-empty "have" set
	 * @throws IOException
	 *             on errors
	 * @throws CancelledException
	 *             on cancellation
	 */
	private void doFetchV2(ProgressMonitor monitor, Collection<Ref> want,
			OutputStream outputStream, boolean hasObjects)
			throws IOException, CancelledException {
		sideband = true;
		negotiateBegin();

		pckState.writeString("command=" + GitProtocolConstants.COMMAND_FETCH); //$NON-NLS-1$
		// Capabilities are sent as command arguments in protocol V2
		String agent = UserAgent.get();
		if (agent != null && isCapableOf(GitProtocolConstants.OPTION_AGENT)) {
			pckState.writeString(
					GitProtocolConstants.OPTION_AGENT + '=' + agent);
		}
		Set<String> capabilities = new HashSet<>();
		String advertised = getCapability(GitProtocolConstants.COMMAND_FETCH);
		if (!StringUtils.isEmptyOrNull(advertised)) {
			capabilities.addAll(Arrays.asList(advertised.split("\\s+"))); //$NON-NLS-1$
		}
		// Arguments
		pckState.writeDelim();
		for (String capability : getCapabilitiesV2(capabilities)) {
			pckState.writeString(capability);
		}

		if (!sendWants(want, pckState, hasObjects)) {
			// We already have everything we wanted.
			return;
		}

		Set<ObjectId> shallowCommits = local.getObjectDatabase()
				.getShallowCommits();
		if (capabilities.contains(GitProtocolConstants.CAPABILITY_SHALLOW)) {
			sendShallow(shallowCommits, pckState);
		} else if (isShallowRequested()) {
			throw new PackProtocolException(
					JGitText.get().shallowNotSupported);
		}
		// If we send something, we always close it properly ourselves.
		outNeedsEnd = false;

		FetchStateV2 fetchState = new FetchStateV2();
		boolean sentDone = false;
		for (;;) {
			// The "state" buffer contains the full fetch request with all
			// common objects found so far.
			state.writeTo(out, monitor);
			sentDone = sendNextHaveBatch(fetchState, pckOut, monitor);
			if (sentDone) {
				break;
			}
			if (readAcknowledgments(fetchState, pckIn, monitor)) {
				// We got a "ready": next should be a pack file.
				break;
			}
			// Note: C git reads and requires here (and after a packfile) a
			// "0002" packet in stateless RPC transports (https). This "response
			// end" packet is even mentioned in the protocol V2 technical
			// documentation. However, it is not actually part of the public
			// protocol; it occurs only in an internal protocol wrapper in the C
			// git implementation.
		}
		clearState();
		String line = pckIn.readString();
		// If we sent a done, we may have an error reply here.
		if (sentDone && line.startsWith(PACKET_ERR)) {
			throw new RemoteRepositoryException(uri,
					line.substring(PACKET_ERR.length()));
		}

		if (GitProtocolConstants.SECTION_SHALLOW_INFO.equals(line)) {
			line = handleShallowUnshallow(shallowCommits, pckIn);
			if (!PacketLineIn.isDelimiter(line)) {
				throw new PackProtocolException(MessageFormat
						.format(JGitText.get().expectedGot, PACKET_DELIM,
								line));
			}
			line = pckIn.readString();
		}

		// "wanted-refs" and "packfile-uris" would have to be
		// handled here in that order.
		if (!GitProtocolConstants.SECTION_PACKFILE.equals(line)) {
			throw new PackProtocolException(
					MessageFormat.format(JGitText.get().expectedGot,
							GitProtocolConstants.SECTION_PACKFILE, line));
		}
		receivePack(monitor, outputStream);
	}

	/**
	 * Sends the next batch of "have"s and terminates the {@code output}.
	 *
	 * @param fetchState
	 *            is updated with information about the number of items written,
	 *            and whether to expect a packfile next
	 * @param output
	 *            to write to
	 * @param monitor
	 *            for progress reporting and cancellation
	 * @return {@code true} if a "done" was written and we should thus expect a
	 *         packfile next
	 * @throws IOException
	 *             on errors
	 * @throws CancelledException
	 *             on cancellation
	 */
	private boolean sendNextHaveBatch(FetchStateV2 fetchState,
			PacketLineOut output, ProgressMonitor monitor)
			throws IOException, CancelledException {
		long n = 0;
		while (n < fetchState.havesToSend) {
			final RevCommit c = walk.next();
			if (c == null) {
				break;
			}
			output.writeString(PACKET_HAVE + c.getId().name() + '\n');
			n++;
			if (n % 10 == 0 && monitor.isCancelled()) {
				throw new CancelledException();
			}
		}
		fetchState.havesTotal += n;
		if (n == 0
				|| (fetchState.hadAcks
						&& fetchState.havesWithoutAck > MAX_HAVES)
				|| fetchState.havesTotal > maxHaves) {
			output.writeString(PACKET_DONE + '\n');
			output.end();
			return true;
		}
		// Increment only after the test above. Of course we have no ACKs yet
		// for the newly added "have"s, so it makes no sense to count them
		// against the MAX_HAVES limit.
		fetchState.havesWithoutAck += n;
		output.end();
		fetchState.incHavesToSend(statelessRPC);
		return false;
	}

	/**
	 * Reads and processes acknowledgments, adding ACKed objects as "have"s to
	 * the global state {@link TemporaryBuffer}.
	 *
	 * @param fetchState
	 *            to update
	 * @param input
	 *            to read from
	 * @param monitor
	 *            for progress reporting and cancellation
	 * @return {@code true} if a "ready" was received and a packfile is expected
	 *         next
	 * @throws IOException
	 *             on errors
	 * @throws CancelledException
	 *             on cancellation
	 */
	private boolean readAcknowledgments(FetchStateV2 fetchState,
			PacketLineIn input, ProgressMonitor monitor)
			throws IOException, CancelledException {
		String line = input.readString();
		if (!GitProtocolConstants.SECTION_ACKNOWLEDGMENTS.equals(line)) {
			throw new PackProtocolException(MessageFormat.format(
					JGitText.get().expectedGot,
					GitProtocolConstants.SECTION_ACKNOWLEDGMENTS, line));
		}
		MutableObjectId returnedId = new MutableObjectId();
		line = input.readString();
		boolean gotReady = false;
		long n = 0;
		while (!PacketLineIn.isEnd(line) && !PacketLineIn.isDelimiter(line)) {
			AckNackResult ack = PacketLineIn.parseACKv2(line, returnedId);
			// If we got a "ready", we just skip the remaining lines after
			// having checked them for being valid. (Normally, the "ready"
			// should be the last line anyway.)
			if (!gotReady) {
				if (ack == AckNackResult.ACK_COMMON) {
					// markCommon appends the object to the "state"
					markCommon(walk.parseAny(returnedId), ack, true);
					fetchState.havesWithoutAck = 0;
					fetchState.hadAcks = true;
				} else if (ack == AckNackResult.ACK_READY) {
					gotReady = true;
				}
			}
			n++;
			if (n % 10 == 0 && monitor.isCancelled()) {
				throw new CancelledException();
			}
			line = input.readString();
		}
		if (gotReady) {
			if (!PacketLineIn.isDelimiter(line)) {
				throw new PackProtocolException(MessageFormat
						.format(JGitText.get().expectedGot, PACKET_DELIM,
								line));
			}
		} else if (!PacketLineIn.isEnd(line)) {
			throw new PackProtocolException(MessageFormat
					.format(JGitText.get().expectedGot, PACKET_END, line));
		}
		return gotReady;
	}

	@Override
	public void close() {
		if (walk != null) {
			walk.close();
		}
		super.close();
	}

	/**
	 * Reads the fetch configuration from the local repository's config.
	 *
	 * @return the parsed {@link FetchConfig}
	 */
	FetchConfig getFetchConfig() {
		return local.getConfig().get(FetchConfig::new);
	}

	/**
	 * Finds the largest commit time among the wanted refs whose target can be
	 * parsed locally as a commit.
	 *
	 * @param wants
	 *            the refs to fetch
	 * @param hasObjects
	 *            whether the caller passed a non-empty "have" set; if
	 *            {@code false}, 0 is returned without looking at the refs
	 * @return the maximum commit time found, or 0 if none
	 */
	private int maxTimeWanted(Collection<Ref> wants, boolean hasObjects) {
		int maxTime = 0;
		if (!hasObjects) {
			// we don't have any objects locally, we can immediately bail out
			return maxTime;
		}
		for (Ref r : wants) {
			try {
				final RevObject obj = walk.parseAny(r.getObjectId());
				if (obj instanceof RevCommit) {
					final int cTime = ((RevCommit) obj).getCommitTime();
					if (maxTime < cTime) {
						maxTime = cTime;
					}
				}
			} catch (IOException error) {
				// We don't have it, but we want to fetch (thus fixing error).
			}
		}
		return maxTime;
	}

	/**
	 * Marks the tips of the local refs (or only of the refs matching the
	 * wanted ones if negotiation tips are enabled), the repository's
	 * additional haves and the given {@code have} objects as reachable.
	 *
	 * @param want
	 *            the refs to fetch
	 * @param have
	 *            additional objects to mark as reachable
	 * @param maxTime
	 *            if greater than 0, commits newer than this time (in seconds)
	 *            are walked and advertised ones are flagged as common
	 * @throws IOException
	 *             on errors
	 */
	private void markReachable(Collection<Ref> want, Set<ObjectId> have,
			int maxTime) throws IOException {
		Collection<Ref> refsToMark;
		if (useNegotiationTip) {
			refsToMark = translateToLocalTips(want);
			if (refsToMark.size() < want.size()) {
				refsToMark.addAll(local.getRefDatabase().getRefs());
			}
		} else {
			refsToMark = local.getRefDatabase().getRefs();
		}
		markReachableRefTips(refsToMark);

		for (ObjectId id : local.getAdditionalHaves()) {
			markReachable(id);
		}
		for (ObjectId id : have) {
			markReachable(id);
		}

		if (maxTime > 0) {
			// Mark reachable commits until we reach maxTime. These may
			// wind up later matching up against things we want and we
			// can avoid asking for something we already happen to have.
			//
			final Date maxWhen = new Date(maxTime * 1000L);
			walk.sort(RevSort.COMMIT_TIME_DESC);
			walk.markStart(reachableCommits);
			walk.setRevFilter(CommitTimeRevFilter.after(maxWhen));
			for (;;) {
				final RevCommit c = walk.next();
				if (c == null) {
					break;
				}
				if (c.has(ADVERTISED) && !c.has(COMMON)) {
					// This is actually going to be a common commit, but
					// our peer doesn't know that fact yet.
					//
					c.add(COMMON);
					c.carry(COMMON);
					reachableCommits.add(c);
				}
			}
		}
	}

	/**
	 * Looks up the local refs having the same names as the wanted refs.
	 *
	 * @param want
	 *            the refs to fetch
	 * @return a mutable collection of the local refs found that point to an
	 *         object; the caller may add further refs to it
	 * @throws IOException
	 *             if the ref database cannot be read
	 */
	private Collection<Ref> translateToLocalTips(Collection<Ref> want)
			throws IOException {
		String[] refs = want.stream().map(Ref::getName)
				.collect(Collectors.toSet()).toArray(String[]::new);
		Map<String, Ref> wantRefMap = local.getRefDatabase().exactRef(refs);
		// Collectors.toList() gives no mutability guarantee, but the caller
		// adds to the result; hence request an ArrayList explicitly.
		return wantRefMap.values().stream()
				.filter(r -> getRefObjectId(r) != null)
				.collect(Collectors.toCollection(ArrayList::new));
	}

	/**
	 * Marks commits reachable.
	 *
	 * @param refsToMark
	 *            references that client is requesting to be marked.
	 */
	private void markReachableRefTips(Collection<Ref> refsToMark) {
		refsToMark.stream().map(BasePackFetchConnection::getRefObjectId)
				.filter(Objects::nonNull)
				.forEach(oid -> markReachable(oid));
	}

	/**
	 * Gets the peeled object id of a ref, falling back to its plain object id
	 * if there is no peeled id.
	 *
	 * @param ref
	 *            to get the id of
	 * @return the id; may be {@code null}
	 */
	private static ObjectId getRefObjectId(Ref ref) {
		ObjectId id = ref.getPeeledObjectId();
		if (id == null) {
			id = ref.getObjectId();
		}
		return id;
	}

	/**
	 * Flags the commit with the given id as {@link #REACHABLE} and records it
	 * in {@link #reachableCommits}. Ids that cannot be parsed as a commit are
	 * silently skipped.
	 *
	 * @param id
	 *            of the commit to mark
	 */
	private void markReachable(ObjectId id) {
		try {
			RevCommit o = walk.parseCommit(id);
			if (!o.has(REACHABLE)) {
				o.add(REACHABLE);
				reachableCommits.add(o);
			}
		} catch (IOException readError) {
			// If we cannot read the value of the ref skip it.
		}
	}

	/**
	 * Writes a "want" line for each wanted ref we do not have yet, followed by
	 * the filter line if a filter is set.
	 *
	 * @param want
	 *            the refs to fetch
	 * @param p
	 *            to write to
	 * @param hasObjects
	 *            whether the caller passed a non-empty "have" set
	 * @return {@code true} if at least one "want" line was written,
	 *         {@code false} if there is nothing to fetch
	 * @throws IOException
	 *             on errors
	 */
	private boolean sendWants(Collection<Ref> want, PacketLineOut p,
			boolean hasObjects) throws IOException {
		boolean first = true;
		for (Ref r : want) {
			ObjectId objectId = r.getObjectId();
			if (objectId == null) {
				continue;
			}
			// if depth is set we need to fetch the objects even if they are
			// already available
			if (transport.getDepth() == null
					// only check reachable objects when we have objects
					&& hasObjects) {
				try {
					if (walk.parseAny(objectId).has(REACHABLE)) {
						// We already have this object. Asking for it is
						// not a very good idea.
						//
						continue;
					}
				} catch (IOException err) {
					// Its OK, we don't have it, but we want to fix that
					// by fetching the object from the other side.
				}
			}

			final StringBuilder line = new StringBuilder(46);
			line.append(PACKET_WANT).append(objectId.name());
			if (first && TransferConfig.ProtocolVersion.V0
					.equals(getProtocolVersion())) {
				line.append(enableCapabilities());
			}
			first = false;
			line.append('\n');
			p.writeString(line.toString());
		}
		if (first) {
			return false;
		}
		if (!filterSpec.isNoOp()) {
			p.writeString(filterSpec.filterLine());
		}
		return true;
	}

	/**
	 * Collects the fetch arguments to send in protocol V2 according to the
	 * options of this connection.
	 *
	 * @param advertisedCapabilities
	 *            the capabilities the server advertised for "fetch"
	 * @return the arguments to send, in insertion order
	 * @throws TransportException
	 *             if a filter is set but the server did not advertise the
	 *             filter capability
	 */
	private Set<String> getCapabilitiesV2(Set<String> advertisedCapabilities)
			throws TransportException {
		Set<String> capabilities = new LinkedHashSet<>();
		// Protocol V2 is implicitly capable of all these.
		if (noProgress) {
			capabilities.add(OPTION_NO_PROGRESS);
		}
		if (includeTags) {
			capabilities.add(OPTION_INCLUDE_TAG);
		}
		if (allowOfsDelta) {
			capabilities.add(OPTION_OFS_DELTA);
		}
		if (thinPack) {
			capabilities.add(OPTION_THIN_PACK);
		}
		if (!filterSpec.isNoOp()
				&& !advertisedCapabilities.contains(OPTION_FILTER)) {
			throw new PackProtocolException(uri,
					JGitText.get().filterRequiresCapability);
		}
		// The FilterSpec will be added later in sendWants().
		return capabilities;
	}

	/**
	 * Builds the capability string appended to the first "want" line in
	 * protocol V0 and updates the connection options according to what could
	 * be requested.
	 *
	 * @return the capabilities to request
	 * @throws TransportException
	 *             if a capability this connection depends on could not be
	 *             requested
	 */
	private String enableCapabilities() throws TransportException {
		final StringBuilder line = new StringBuilder();
		if (noProgress) {
			wantCapability(line, OPTION_NO_PROGRESS);
		}
		if (includeTags) {
			includeTags = wantCapability(line, OPTION_INCLUDE_TAG);
		}
		if (allowOfsDelta) {
			wantCapability(line, OPTION_OFS_DELTA);
		}

		if (wantCapability(line, OPTION_MULTI_ACK_DETAILED)) {
			multiAck = MultiAck.DETAILED;
			if (statelessRPC) {
				noDone = wantCapability(line, OPTION_NO_DONE);
			}
		} else if (wantCapability(line, OPTION_MULTI_ACK)) {
			multiAck = MultiAck.CONTINUE;
		} else {
			multiAck = MultiAck.OFF;
		}

		if (thinPack) {
			thinPack = wantCapability(line, OPTION_THIN_PACK);
		}
		if (wantCapability(line, OPTION_SIDE_BAND_64K)) {
			sideband = true;
		} else if (wantCapability(line, OPTION_SIDE_BAND)) {
			sideband = true;
		}

		if (statelessRPC && multiAck != MultiAck.DETAILED) {
			// Our stateless RPC implementation relies upon the detailed
			// ACK status to tell us common objects for reuse in future
			// requests. If its not enabled, we can't talk to the peer.
			//
			throw new PackProtocolException(uri, MessageFormat.format(
					JGitText.get().statelessRPCRequiresOptionToBeEnabled,
					OPTION_MULTI_ACK_DETAILED));
		}

		if (!filterSpec.isNoOp() && !wantCapability(line, OPTION_FILTER)) {
			throw new PackProtocolException(uri,
					JGitText.get().filterRequiresCapability);
		}

		addUserAgentCapability(line);
		return line.toString();
	}

	/**
	 * Runs the protocol V0/V1 "have" negotiation: sends "have" lines in blocks
	 * of 32, processes the ACK/NAK replies, writes "done" and consumes the
	 * remaining replies.
	 *
	 * @param monitor
	 *            for cancellation
	 * @param mayHaveShallow
	 *            whether shallow/unshallow lines are to be read after "done"
	 * @param shallowCommits
	 *            the shallow commits of the local repository
	 * @throws IOException
	 *             on errors
	 * @throws CancelledException
	 *             on cancellation
	 */
	private void negotiate(ProgressMonitor monitor, boolean mayHaveShallow,
			Set<ObjectId> shallowCommits)
			throws IOException, CancelledException {
		final MutableObjectId ackId = new MutableObjectId();
		int resultsPending = 0;
		int havesSent = 0;
		int havesSinceLastContinue = 0;
		boolean receivedContinue = false;
		boolean receivedAck = false;
		boolean receivedReady = false;

		if (statelessRPC) {
			state.writeTo(out, null);
		}

		negotiateBegin();
		SEND_HAVES: for (;;) {
			final RevCommit c = walk.next();
			if (c == null) {
				break SEND_HAVES;
			}

			ObjectId o = c.getId();
			pckOut.writeString(PACKET_HAVE + o.name() + '\n');
			havesSent++;
			havesSinceLastContinue++;

			if ((31 & havesSent) != 0) {
				// We group the have lines into blocks of 32, each marked
				// with a flush (aka end). This one is within a block so
				// continue with another have line.
				//
				continue;
			}

			if (monitor.isCancelled()) {
				throw new CancelledException();
			}

			pckOut.end();
			resultsPending++; // Each end will cause a result to come back.

			if (havesSent == 32 && !statelessRPC) {
				// On the first block we race ahead and try to send
				// more of the second block while waiting for the
				// remote to respond to our first block request.
				// This keeps us one block ahead of the peer.
				//
				continue;
			}

			READ_RESULT: for (;;) {
				final AckNackResult anr = pckIn.readACK(ackId);
				switch (anr) {
				case NAK:
					// More have lines are necessary to compute the
					// pack on the remote side. Keep doing that.
					//
					resultsPending--;
					break READ_RESULT;

				case ACK:
					// The remote side is happy and knows exactly what
					// to send us. There is no further negotiation and
					// we can break out immediately.
					//
					multiAck = MultiAck.OFF;
					resultsPending = 0;
					receivedAck = true;
					if (statelessRPC) {
						state.writeTo(out, null);
					}
					break SEND_HAVES;

				case ACK_CONTINUE:
				case ACK_COMMON:
				case ACK_READY:
					// The server knows this commit (ackId). We don't
					// need to send any further along its ancestry, but
					// we need to continue to talk about other parts of
					// our local history.
					//
					markCommon(walk.parseAny(ackId), anr, statelessRPC);
					receivedAck = true;
					receivedContinue = true;
					havesSinceLastContinue = 0;
					if (anr == AckNackResult.ACK_READY) {
						receivedReady = true;
					}
					break;
				}

				if (monitor.isCancelled()) {
					throw new CancelledException();
				}
			}

			if (noDone && receivedReady) {
				break SEND_HAVES;
			}
			if (statelessRPC) {
				state.writeTo(out, null);
			}

			if ((receivedContinue && havesSinceLastContinue > MAX_HAVES)
					|| havesSent >= maxHaves) {
				// Our history must be really different from the remote's.
				// We just sent a whole slew of have lines, and it did not
				// recognize any of them. Avoid sending our entire history
				// to them by giving up early.
				//
				break SEND_HAVES;
			}
		}

		// Tell the remote side we have run out of things to talk about.
		//
		if (monitor.isCancelled()) {
			throw new CancelledException();
		}

		if (!receivedReady || !noDone) {
			// When statelessRPC is true we should always leave SEND_HAVES
			// loop above while in the middle of a request. This allows us
			// to just write done immediately.
			//
			pckOut.writeString(PACKET_DONE + '\n');
			pckOut.flush();
		}

		if (!receivedAck) {
			// Apparently if we have never received an ACK earlier
			// there is one more result expected from the done we
			// just sent to the remote.
			//
			multiAck = MultiAck.OFF;
			resultsPending++;
		}

		if (mayHaveShallow) {
			String line = handleShallowUnshallow(shallowCommits, pckIn);
			if (!PacketLineIn.isEnd(line)) {
				throw new PackProtocolException(MessageFormat
						.format(JGitText.get().expectedGot, PACKET_END, line));
			}
		}

		READ_RESULT: while (resultsPending > 0 || multiAck != MultiAck.OFF) {
			final AckNackResult anr = pckIn.readACK(ackId);
			resultsPending--;
			switch (anr) {
			case NAK:
				// A NAK is a response to an end we queued earlier
				// we eat it and look for another ACK/NAK message.
				//
				break;

			case ACK:
				// A solitary ACK at this point means the remote won't
				// speak anymore, but is going to send us a pack now.
				//
				break READ_RESULT;

			case ACK_CONTINUE:
			case ACK_COMMON:
			case ACK_READY:
				// We will expect a normal ACK to break out of the loop.
				//
				multiAck = MultiAck.CONTINUE;
				break;
			}

			if (monitor.isCancelled()) {
				throw new CancelledException();
			}
		}
	}

	/**
	 * Prepares the walker for negotiation: resets it (keeping the
	 * {@link #REACHABLE} and {@link #ADVERTISED} flags), starts from the
	 * reachable commits sorted by descending commit time, and installs a
	 * filter that skips commits already flagged {@link #COMMON}.
	 *
	 * @throws IOException
	 *             on errors
	 */
	private void negotiateBegin() throws IOException {
		walk.resetRetain(REACHABLE, ADVERTISED);
		walk.markStart(reachableCommits);
		walk.sort(RevSort.COMMIT_TIME_DESC);
		walk.setRevFilter(new RevFilter() {
			@Override
			public RevFilter clone() {
				return this;
			}

			@Override
			public boolean include(RevWalk walker, RevCommit c) {
				final boolean remoteKnowsIsCommon = c.has(COMMON);
				if (c.has(ADVERTISED)) {
					// Remote advertised this, and we have it, hence common.
					// Whether or not the remote knows that fact is tested
					// before we added the flag. If the remote doesn't know
					// we have to still send them this object.
					//
					c.add(COMMON);
				}
				return !remoteKnowsIsCommon;
			}

			@Override
			public boolean requiresCommitBody() {
				return false;
			}
		});
	}

	/**
	 * Flags the targets (and peeled targets, if any) of all refs returned by
	 * {@link #getRefs()} as {@link #ADVERTISED}.
	 */
	private void markRefsAdvertised() {
		for (Ref r : getRefs()) {
			markAdvertised(r.getObjectId());
			if (r.getPeeledObjectId() != null) {
				markAdvertised(r.getPeeledObjectId());
			}
		}
	}

	/**
	 * Flags the object with the given id as {@link #ADVERTISED} if it can be
	 * parsed locally.
	 *
	 * @param id
	 *            of the object to flag
	 */
	private void markAdvertised(AnyObjectId id) {
		try {
			walk.parseAny(id).add(ADVERTISED);
		} catch (IOException readError) {
			// We probably just do not have this object locally.
		}
	}

	/**
	 * Flags an object as {@link #COMMON} and, if requested, records it as a
	 * "have" line in {@link #pckState}.
	 *
	 * @param obj
	 *            the object acknowledged by the remote
	 * @param anr
	 *            the kind of acknowledgment received; only
	 *            {@link AckNackResult#ACK_COMMON} objects are written to the
	 *            state
	 * @param useState
	 *            whether to record the object in the state buffer
	 * @throws IOException
	 *             on errors
	 */
	private void markCommon(RevObject obj, AckNackResult anr,
			boolean useState) throws IOException {
		if (useState && anr == AckNackResult.ACK_COMMON
				&& !obj.has(STATE)) {
			pckState.writeString(PACKET_HAVE + obj.name() + '\n');
			obj.add(STATE);
		}
		obj.add(COMMON);
		if (obj instanceof RevCommit) {
			((RevCommit) obj).carry(COMMON);
		}
	}

	/**
	 * Reads the pack from the input stream (through a side-band stream if
	 * side-band is enabled) and parses it into the local repository.
	 *
	 * @param monitor
	 *            for progress reporting
	 * @param outputStream
	 *            to write sideband messages to
	 * @throws IOException
	 *             on errors
	 */
	private void receivePack(final ProgressMonitor monitor,
			OutputStream outputStream) throws IOException {
		onReceivePack();
		InputStream input = in;
		SideBandInputStream sidebandIn = null;
		if (sideband) {
			sidebandIn = new SideBandInputStream(input, monitor,
					getMessageWriter(), outputStream);
			input = sidebandIn;
		}

		try (ObjectInserter ins = local.newObjectInserter()) {
			PackParser parser = ins.newPackParser(input);
			parser.setAllowThin(thinPack);
			parser.setObjectChecker(transport.getObjectChecker());
			parser.setLockMessage(lockMessage);
			packLock = parser.parse(monitor);
			ins.flush();
		} finally {
			if (sidebandIn != null) {
				sidebandIn.drainMessages();
			}
		}
	}

	/**
	 * Tells whether any deepen option (depth, deepen-since or deepen-not) is
	 * set on this connection.
	 *
	 * @return {@code true} if a shallow fetch was requested
	 */
	private boolean isShallowRequested() {
		return depth != null || deepenSince != null
				|| (deepenNots != null && !deepenNots.isEmpty());
	}

	/**
	 * Writes the "shallow" lines for the given commits, followed by the
	 * deepen lines for whichever deepen options are set.
	 *
	 * @param shallowCommits
	 *            the shallow commits of the local repository
	 * @param output
	 *            to write to
	 * @throws IOException
	 *             on errors
	 */
	private void sendShallow(Set<ObjectId> shallowCommits,
			PacketLineOut output) throws IOException {
		for (ObjectId shallowCommit : shallowCommits) {
			output.writeString(PACKET_SHALLOW + shallowCommit.name());
		}

		if (depth != null) {
			output.writeString(PACKET_DEEPEN + depth);
		}

		if (deepenSince != null) {
			output.writeString(
					PACKET_DEEPEN_SINCE + deepenSince.getEpochSecond());
		}

		if (deepenNots != null) {
			for (String deepenNotRef : deepenNots) {
				output.writeString(PACKET_DEEPEN_NOT + deepenNotRef);
			}
		}
	}

	/**
	 * Reads "shallow" and "unshallow" lines up to the next delimiter or end
	 * packet and stores the resulting set of shallow commits in the local
	 * object database.
	 *
	 * @param advertisedShallowCommits
	 *            the shallow commits that were sent to the remote
	 * @param input
	 *            to read from
	 * @return the line that terminated the section
	 * @throws IOException
	 *             on errors, or if the remote unshallows a commit that was not
	 *             sent as shallow
	 */
	private String handleShallowUnshallow(
			Set<ObjectId> advertisedShallowCommits, PacketLineIn input)
			throws IOException {
		String line = input.readString();
		ObjectDatabase objectDatabase = local.getObjectDatabase();
		HashSet<ObjectId> newShallowCommits = new HashSet<>(
				advertisedShallowCommits);
		while (!PacketLineIn.isDelimiter(line) && !PacketLineIn.isEnd(line)) {
			if (line.startsWith(PACKET_SHALLOW)) {
				newShallowCommits.add(ObjectId
						.fromString(line.substring(PACKET_SHALLOW.length())));
			} else if (line.startsWith(PACKET_UNSHALLOW)) {
				ObjectId unshallow = ObjectId
						.fromString(line.substring(PACKET_UNSHALLOW.length()));
				if (!advertisedShallowCommits.contains(unshallow)) {
					throw new PackProtocolException(MessageFormat.format(
							JGitText.get().notShallowedUnshallow,
							unshallow.name()));
				}
				newShallowCommits.remove(unshallow);
			}
			line = input.readString();
		}
		objectDatabase.setShallowCommits(newShallowCommits);
		return line;
	}

	/**
	 * Notification event delivered just before the pack is received from the
	 * network. This event can be used by RPC such as
	 * {@link org.eclipse.jgit.transport.TransportHttp} to disable its request
	 * magic and ensure the pack stream is read correctly.
	 *
	 * @since 2.0
	 */
	protected void onReceivePack() {
		// By default do nothing for TCP based protocols.
	}

	/** Signals that the progress monitor reported a cancellation. */
	private static class CancelledException extends Exception {
		private static final long serialVersionUID = 1L;
	}

	/** Bookkeeping for the "have" negotiation in protocol V2. */
	private static class FetchStateV2 {

		long havesToSend = 32;

		long havesTotal;

		// Set to true if we got at least one ACK in protocol V2.
		boolean hadAcks;

		// Counts haves without ACK. Use as cutoff for negotiation only once
		// hadAcks == true.
		long havesWithoutAck;

		void incHavesToSend(boolean statelessRPC) {
			if (statelessRPC) {
				// Increase this quicker since connection setup costs accumulate
				if (havesToSend < 16384) {
					havesToSend *= 2;
				} else {
					havesToSend = havesToSend * 11 / 10;
				}
			} else {
				havesToSend += 32;
			}
		}
	}
}