From bd531eb998e233b8be27ef9944cae2c1148db5b6 Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Tue, 15 Feb 2011 14:46:30 -0800 Subject: [PATCH] smart-http: Support progress in ReceivePack As PackParser supports a progress meter for the "Resolving deltas" phase of its work, we should export this to smart HTTP clients so they know the server is still working on their (large) upload. However this isn't as simple as just dropping in a binding for the SmartOutputStream to flush when its told to. We want to avoid spurious flushes triggered by the use of sideband, or the status report formatting in the send-pack/receive-pack protocol. Change-Id: Ibd88022a298c5fed0edb23dfaf2e90278807ba8b Signed-off-by: Shawn O. Pearce --- .../jgit/http/server/ReceivePackServlet.java | 7 ++- .../eclipse/jgit/transport/PacketLineOut.java | 17 +++++- .../eclipse/jgit/transport/ReceivePack.java | 58 +++++++++++-------- .../jgit/transport/SideBandOutputStream.java | 8 ++- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/org.eclipse.jgit.http.server/src/org/eclipse/jgit/http/server/ReceivePackServlet.java b/org.eclipse.jgit.http.server/src/org/eclipse/jgit/http/server/ReceivePackServlet.java index a85e84b210..bb4ae19e78 100644 --- a/org.eclipse.jgit.http.server/src/org/eclipse/jgit/http/server/ReceivePackServlet.java +++ b/org.eclipse.jgit.http.server/src/org/eclipse/jgit/http/server/ReceivePackServlet.java @@ -112,7 +112,12 @@ class ReceivePackServlet extends HttpServlet { rp.setBiDirectionalPipe(false); rsp.setContentType(RSP_TYPE); - final SmartOutputStream out = new SmartOutputStream(req, rsp); + final SmartOutputStream out = new SmartOutputStream(req, rsp) { + @Override + public void flush() throws IOException { + doFlush(); + } + }; rp.receive(getInputStream(req), out, null); out.close(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java index 51506b20aa..bb83dce48d 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java @@ -65,6 +65,8 @@ public class PacketLineOut { private final byte[] lenbuffer; + private boolean flushOnEnd; + /** * Create a new packet line writer. * @@ -74,6 +76,18 @@ public class PacketLineOut { public PacketLineOut(final OutputStream outputStream) { out = outputStream; lenbuffer = new byte[5]; + flushOnEnd = true; + } + + /** + * Set the flush behavior during {@link #end()}. + * + * @param flushOnEnd + * if true, a flush-pkt written during {@link #end()} also + * flushes the underlying stream. + */ + public void setFlushOnEnd(boolean flushOnEnd) { + this.flushOnEnd = flushOnEnd; } /** @@ -121,7 +135,8 @@ public class PacketLineOut { public void end() throws IOException { formatLength(0); out.write(lenbuffer, 0, 4); - flush(); + if (flushOnEnd) + flush(); } /** diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java index 5ea8501890..324e5604d6 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java @@ -55,8 +55,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; @@ -70,7 +68,6 @@ import org.eclipse.jgit.errors.MissingObjectException; import org.eclipse.jgit.errors.PackProtocolException; import org.eclipse.jgit.errors.UnpackException; import org.eclipse.jgit.lib.Config; -import org.eclipse.jgit.lib.Config.SectionParser; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.NullProgressMonitor; import org.eclipse.jgit.lib.ObjectId; @@ -81,6 +78,7 @@ import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.RefUpdate; import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.Config.SectionParser; import org.eclipse.jgit.revwalk.ObjectWalk; import org.eclipse.jgit.revwalk.RevBlob; import org.eclipse.jgit.revwalk.RevCommit; @@ -157,14 +155,12 @@ public class ReceivePack { private OutputStream rawOut; + private OutputStream msgOut; + private PacketLineIn pckIn; private PacketLineOut pckOut; - private Writer msgs; - - private SideBandOutputStream msgOut; - private PackParser parser; /** The refs we advertised as existing at the start of the connection. */ @@ -508,8 +504,8 @@ public class ReceivePack { advertiseError.append(what).append('\n'); } else { try { - if (msgs != null) - msgs.write("error: " + what + "\n"); + if (msgOut != null) + msgOut.write(Constants.encode("error: " + what + "\n")); } catch (IOException e) { // Ignore write failures. } @@ -528,8 +524,8 @@ public class ReceivePack { */ public void sendMessage(final String what) { try { - if (msgs != null) - msgs.write(what + "\n"); + if (msgOut != null) + msgOut.write(Constants.encode(what + "\n")); } catch (IOException e) { // Ignore write failures. } @@ -558,6 +554,7 @@ public class ReceivePack { try { rawIn = input; rawOut = output; + msgOut = messages; if (timeout > 0) { final Thread caller = Thread.currentThread(); @@ -572,8 +569,7 @@ public class ReceivePack { pckIn = new PacketLineIn(rawIn); pckOut = new PacketLineOut(rawOut); - if (messages != null) - msgs = new OutputStreamWriter(messages, Constants.CHARSET); + pckOut.setFlushOnEnd(false); enabledCapablities = new HashSet(); commands = new ArrayList(); @@ -582,11 +578,6 @@ public class ReceivePack { } finally { walk.release(); try { - if (pckOut != null) - pckOut.flush(); - if (msgs != null) - msgs.flush(); - if (sideBand) { // If we are using side band, we need to send a final // flush-pkt to tell the remote peer the side band is @@ -594,16 +585,31 @@ public class ReceivePack { // use the original output stream as rawOut is now the // side band data channel. // - new PacketLineOut(output).end(); + ((SideBandOutputStream) msgOut).flushBuffer(); + ((SideBandOutputStream) rawOut).flushBuffer(); + + PacketLineOut plo = new PacketLineOut(output); + plo.setFlushOnEnd(false); + plo.end(); + } + + if (biDirectionalPipe) { + // If this was a native git connection, flush the pipe for + // the caller. For smart HTTP we don't do this flush and + // instead let the higher level HTTP servlet code do it. + // + if (!sideBand && msgOut != null) + msgOut.flush(); + rawOut.flush(); } } finally { unlockPack(); timeoutIn = null; rawIn = null; rawOut = null; + msgOut = null; pckIn = null; pckOut = null; - msgs = null; refs = null; enabledCapablities = null; commands = null; @@ -619,9 +625,10 @@ public class ReceivePack { } private void service() throws IOException { - if (biDirectionalPipe) + if (biDirectionalPipe) { sendAdvertisedRefs(new PacketLineOutRefAdvertiser(pckOut)); - else + pckOut.flush(); + } else refs = refFilter.filter(db.getAllRefs()); if (advertiseError != null) return; @@ -658,10 +665,10 @@ public class ReceivePack { } }); pckOut.end(); - } else if (msgs != null) { + } else if (msgOut != null) { sendStatusReport(false, new Reporter() { void sendString(final String s) throws IOException { - msgs.write(s + "\n"); + msgOut.write(Constants.encode(s + "\n")); } }); } @@ -761,8 +768,9 @@ public class ReceivePack { rawOut = new SideBandOutputStream(CH_DATA, MAX_BUF, out); msgOut = new SideBandOutputStream(CH_PROGRESS, MAX_BUF, out); + pckOut = new PacketLineOut(rawOut); - msgs = new OutputStreamWriter(msgOut, Constants.CHARSET); + pckOut.setFlushOnEnd(false); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java index 31acdcde5b..8d9b3e005b 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java @@ -109,10 +109,14 @@ class SideBandOutputStream extends OutputStream { cnt = HDR_SIZE; } - @Override - public void flush() throws IOException { + void flushBuffer() throws IOException { if (HDR_SIZE < cnt) writeBuffer(); + } + + @Override + public void flush() throws IOException { + flushBuffer(); out.flush(); } -- 2.39.5