From a430180f6fee1073ac7952e5f0c6bd7a01ed6606 Mon Sep 17 00:00:00 2001
From: Johannes Dahlström <johannesd@vaadin.com>
Date: Wed, 24 Apr 2013 19:38:39 +0300
Subject: Split client-to-server messages when websockets are used (#11648)

* First part of a fragmented message prefixed by total message length plus '|'
* Atmosphere websocket buffer size set to 65536
* Fragment size is 65535 characters (Jetty requires less than buffer size)

Change-Id: I8bf872bbb03b86386070fdc99c14ea805dd2ef3a
---
 .../communication/AtmospherePushConnection.java    | 59 ++++++++++++-
 .../communication/AtmospherePushConnection.java    | 96 ++++++++++++++++++++--
 .../vaadin/server/communication/PushHandler.java   | 10 ++-
 .../server/communication/PushRequestHandler.java   | 15 +++-
 .../com/vaadin/shared/ApplicationConstants.java    |  4 +
 5 files changed, 171 insertions(+), 13 deletions(-)

diff --git a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
index f6b704ed9b..e42f91ea67 100644
--- a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
+++ b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
@@ -62,6 +62,45 @@ public class AtmospherePushConnection implements PushConnection {
         DISCONNECTED;
     }
 
+    /**
+     * Represents a message that should be sent as multiple fragments.
+     */
+    protected static class FragmentedMessage {
+
+        // Jetty requires length less than buffer size
+        private int FRAGMENT_LENGTH = ApplicationConstants.WEBSOCKET_BUFFER_SIZE - 1;
+
+        private String message;
+        private int index = 0;
+
+        public FragmentedMessage(String message) {
+            this.message = message;
+        }
+
+        public boolean hasNextFragment() {
+            return index < message.length();
+        }
+
+        public String getNextFragment() {
+            String result;
+            if (index == 0) {
+                String header = "" + message.length()
+                        + ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER;
+                int fragmentLen = FRAGMENT_LENGTH - header.length();
+                result = header + getFragment(0, fragmentLen);
+                index += fragmentLen;
+            } else {
+                result = getFragment(index, index + FRAGMENT_LENGTH);
+                index += FRAGMENT_LENGTH;
+            }
+            return result;
+        }
+
+        private String getFragment(int begin, int end) {
+            return message.substring(begin, Math.min(message.length(), end));
+        }
+    }
+
     private ApplicationConnection connection;
 
     private JavaScriptObject socket;
@@ -74,6 +113,8 @@ public class AtmospherePushConnection implements PushConnection {
 
     private String uri;
 
+    private String transport;
+
     /**
      * Keeps track of the disconnect confirmation command for cases where
      * pending messages should be pushed before actually disconnecting.
@@ -151,7 +192,15 @@ public class AtmospherePushConnection implements PushConnection {
         case CONNECTED:
             assert isActive();
             VConsole.log("Sending push message: " + message);
-            doPush(socket, message);
+
+            if (transport.equals("websocket")) {
+                FragmentedMessage fragmented = new FragmentedMessage(message);
+                while (fragmented.hasNextFragment()) {
+                    doPush(socket, fragmented.getNextFragment());
+                }
+            } else {
+                doPush(socket, message);
+            }
             break;
         case DISCONNECT_PENDING:
         case DISCONNECTED:
@@ -167,10 +216,12 @@ public class AtmospherePushConnection implements PushConnection {
     }
 
     protected void onOpen(AtmosphereResponse response) {
-        VConsole.log("Push connection established using "
-                + response.getTransport());
+        transport = response.getTransport();
+
+        VConsole.log("Push connection established using " + transport);
+
         for (String message : messageQueue) {
-            doPush(socket, message);
+            push(message);
         }
         messageQueue.clear();
 
diff --git a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
index bb1461b18e..0bba65ff1d 100644
--- a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
+++ b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
@@ -17,7 +17,9 @@
 package com.vaadin.server.communication;
 
 import java.io.IOException;
+import java.io.Reader;
 import java.io.Serializable;
+import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.util.concurrent.Future;
@@ -27,8 +29,10 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
 import org.json.JSONException;
 
+import com.vaadin.shared.ApplicationConstants;
 import com.vaadin.ui.UI;
 
 /**
@@ -40,9 +44,57 @@ import com.vaadin.ui.UI;
  */
 public class AtmospherePushConnection implements Serializable, PushConnection {
 
+    /**
+     * Represents a message that can arrive as multiple fragments.
+     */
+    protected static class FragmentedMessage {
+        private final StringBuilder message = new StringBuilder();
+        private final int messageLength;
+
+        public FragmentedMessage(Reader reader) throws IOException {
+            // Messages are prefixed by the total message length plus '|'
+            String length = "";
+            int c;
+            while ((c = reader.read()) != -1
+                    && c != ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER) {
+                length += (char) c;
+            }
+            try {
+                messageLength = Integer.parseInt(length);
+            } catch (NumberFormatException e) {
+                throw new IOException("Invalid message length " + length, e);
+            }
+        }
+
+        /**
+         * Appends all the data from the given Reader to this message and
+         * returns whether the message was completed.
+         * 
+         * @param reader
+         *            The Reader from which to read.
+         * @return true if this message is complete, false otherwise.
+         * @throws IOException
+         */
+        public boolean append(Reader reader) throws IOException {
+            char[] buffer = new char[ApplicationConstants.WEBSOCKET_BUFFER_SIZE];
+            int read;
+            while ((read = reader.read(buffer)) != -1) {
+                message.append(buffer, 0, read);
+                assert message.length() <= messageLength : "Received message "
+                        + message.length() + "chars, expected " + messageLength;
+            }
+            return message.length() == messageLength;
+        }
+
+        public Reader getReader() {
+            return new StringReader(message.toString());
+        }
+    }
+
     private UI ui;
     private transient AtmosphereResource resource;
-    private Future<String> lastMessage;
+    private transient Future<String> outgoingMessage;
+    private transient FragmentedMessage incomingMessage;
 
     public AtmospherePushConnection(UI ui) {
         this.ui = ui;
@@ -85,10 +137,44 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
      */
     void sendMessage(String message) {
         // "Broadcast" the changes to the single client only
-        lastMessage = getResource().getBroadcaster().broadcast(message,
+        outgoingMessage = getResource().getBroadcaster().broadcast(message,
                 getResource());
     }
 
+    /**
+     * Reads and buffers a (possibly partial) message. If a complete message was
+     * received, or if the call resulted in the completion of a partially
+     * received message, returns a {@link Reader} yielding the complete message.
+     * Otherwise, returns null.
+     * 
+     * @param reader
+     *            A Reader from which to read the (partial) message
+     * @return A Reader yielding a complete message or null if the message is
+     *         not yet complete.
+     * @throws IOException
+     */
+    protected Reader receiveMessage(Reader reader) throws IOException {
+
+        if (resource.transport() != TRANSPORT.WEBSOCKET) {
+            return reader;
+        }
+
+        if (incomingMessage == null) {
+            // No existing partially received message
+            incomingMessage = new FragmentedMessage(reader);
+        }
+
+        if (incomingMessage.append(reader)) {
+            // Message is complete
+            Reader completeReader = incomingMessage.getReader();
+            incomingMessage = null;
+            return completeReader;
+        } else {
+            // Only received a partial message
+            return null;
+        }
+    }
+
     /**
      * Associates this connection with the given AtmosphereResource. If there is
      * a push pending, commits it.
@@ -128,11 +214,11 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
 
     @Override
     public void disconnect() {
-        if (lastMessage != null) {
+        if (outgoingMessage != null) {
             // Wait for the last message to be sent before closing the
             // connection (assumes that futures are completed in order)
             try {
-                lastMessage.get(1000, TimeUnit.MILLISECONDS);
+                outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
                 getLogger()
                         .log(Level.INFO,
@@ -142,7 +228,7 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
                         .log(Level.INFO,
                                 "Error waiting for messages to be sent to client before disconnect");
             }
-            lastMessage = null;
+            outgoingMessage = null;
         }
 
         resource.resume();
diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java
index c2b0f36e2a..e740db410d 100644
--- a/server/src/com/vaadin/server/communication/PushHandler.java
+++ b/server/src/com/vaadin/server/communication/PushHandler.java
@@ -17,6 +17,7 @@
 package com.vaadin.server.communication;
 
 import java.io.IOException;
+import java.io.Reader;
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.logging.Level;
@@ -129,13 +130,18 @@ public class PushHandler implements AtmosphereHandler {
                     + "connection is kept open or if the UI has a "
                     + "connection of unexpected type.";
 
+            Reader reader = connection.receiveMessage(req.getReader());
+            if (reader == null) {
+                // The whole message was not yet received
+                return;
+            }
+
             // Should be set up by caller
             VaadinRequest vaadinRequest = VaadinService.getCurrentRequest();
             assert vaadinRequest != null;
 
             try {
-                new ServerRpcHandler().handleRpc(ui, req.getReader(),
-                        vaadinRequest);
+                new ServerRpcHandler().handleRpc(ui, reader, vaadinRequest);
                 connection.push(false);
             } catch (JSONException e) {
                 getLogger().log(Level.SEVERE, "Error writing JSON to response",
diff --git a/server/src/com/vaadin/server/communication/PushRequestHandler.java b/server/src/com/vaadin/server/communication/PushRequestHandler.java
index 575ec4a86f..8360e08af9 100644
--- a/server/src/com/vaadin/server/communication/PushRequestHandler.java
+++ b/server/src/com/vaadin/server/communication/PushRequestHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import javax.servlet.ServletException;
 
 import org.atmosphere.client.TrackMessageSizeInterceptor;
+import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResponse;
@@ -35,6 +36,7 @@ import com.vaadin.server.VaadinServletRequest;
 import com.vaadin.server.VaadinServletResponse;
 import com.vaadin.server.VaadinServletService;
 import com.vaadin.server.VaadinSession;
+import com.vaadin.shared.ApplicationConstants;
 
 /**
  * Handles requests to open a push (bidirectional) communication channel between
@@ -57,8 +59,17 @@ public class PushRequestHandler implements RequestHandler,
 
         pushHandler = new PushHandler(service);
         atmosphere.addAtmosphereHandler("/*", pushHandler);
-        atmosphere
-                .addInitParameter("org.atmosphere.cpr.sessionSupport", "true");
+        atmosphere.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT,
+                "true");
+
+        final String bufferSize = String
+                .valueOf(ApplicationConstants.WEBSOCKET_BUFFER_SIZE);
+        atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_BUFFER_SIZE,
+                bufferSize);
+        atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXTEXTSIZE,
+                bufferSize);
+        atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXBINARYSIZE,
+                bufferSize);
 
         // Disable Atmosphere's message about commercial support
         atmosphere.addInitParameter("org.atmosphere.cpr.showSupportMessage",
diff --git a/shared/src/com/vaadin/shared/ApplicationConstants.java b/shared/src/com/vaadin/shared/ApplicationConstants.java
index 04cba79c0c..fc4abd1988 100644
--- a/shared/src/com/vaadin/shared/ApplicationConstants.java
+++ b/shared/src/com/vaadin/shared/ApplicationConstants.java
@@ -83,4 +83,8 @@ public class ApplicationConstants implements Serializable {
      * Name of the parameter used to transmit the CSRF token.
      */
     public static final String CSRF_TOKEN_PARAMETER = "v-csrfToken";
+
+    public static final int WEBSOCKET_BUFFER_SIZE = 65536;
+
+    public static final char WEBSOCKET_MESSAGE_DELIMITER = '|';
 }
-- 
cgit v1.2.3