summaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorJohannes Dahlström <johannesd@vaadin.com>2013-04-24 19:38:39 +0300
committerVaadin Code Review <review@vaadin.com>2013-04-29 15:56:47 +0000
commita430180f6fee1073ac7952e5f0c6bd7a01ed6606 (patch)
tree66a8623cadeb4869f703850deaf50c8a021289da /server
parent1f08e16589924841341c2145df6a1a135afb63d6 (diff)
downloadvaadin-framework-a430180f6fee1073ac7952e5f0c6bd7a01ed6606.tar.gz
vaadin-framework-a430180f6fee1073ac7952e5f0c6bd7a01ed6606.zip
Split client-to-server messages when websockets are used (#11648)
* First part of a fragmented message prefixed by total message length plus '|' * Atmosphere websocket buffer size set to 65536 * Fragment size is 65535 characters (Jetty requires less than buffer size) Change-Id: I8bf872bbb03b86386070fdc99c14ea805dd2ef3a
Diffstat (limited to 'server')
-rw-r--r--server/src/com/vaadin/server/communication/AtmospherePushConnection.java96
-rw-r--r--server/src/com/vaadin/server/communication/PushHandler.java10
-rw-r--r--server/src/com/vaadin/server/communication/PushRequestHandler.java15
3 files changed, 112 insertions, 9 deletions
diff --git a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
index bb1461b18e..0bba65ff1d 100644
--- a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
+++ b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
@@ -17,7 +17,9 @@
package com.vaadin.server.communication;
import java.io.IOException;
+import java.io.Reader;
import java.io.Serializable;
+import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.Future;
@@ -27,8 +29,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
import org.json.JSONException;
+import com.vaadin.shared.ApplicationConstants;
import com.vaadin.ui.UI;
/**
@@ -40,9 +44,57 @@ import com.vaadin.ui.UI;
*/
public class AtmospherePushConnection implements Serializable, PushConnection {
+ /**
+ * Represents a message that can arrive as multiple fragments.
+ */
+ protected static class FragmentedMessage {
+ private final StringBuilder message = new StringBuilder();
+ private final int messageLength;
+
+ public FragmentedMessage(Reader reader) throws IOException {
+ // Messages are prefixed by the total message length plus '|'
+ String length = "";
+ int c;
+ while ((c = reader.read()) != -1
+ && c != ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER) {
+ length += (char) c;
+ }
+ try {
+ messageLength = Integer.parseInt(length);
+ } catch (NumberFormatException e) {
+ throw new IOException("Invalid message length " + length, e);
+ }
+ }
+
+ /**
+ * Appends all the data from the given Reader to this message and
+ * returns whether the message was completed.
+ *
+ * @param reader
+ * The Reader from which to read.
+ * @return true if this message is complete, false otherwise.
+ * @throws IOException
+ */
+ public boolean append(Reader reader) throws IOException {
+ char[] buffer = new char[ApplicationConstants.WEBSOCKET_BUFFER_SIZE];
+ int read;
+ while ((read = reader.read(buffer)) != -1) {
+ message.append(buffer, 0, read);
+ assert message.length() <= messageLength : "Received message "
+ + message.length() + "chars, expected " + messageLength;
+ }
+ return message.length() == messageLength;
+ }
+
+ public Reader getReader() {
+ return new StringReader(message.toString());
+ }
+ }
+
private UI ui;
private transient AtmosphereResource resource;
- private Future<String> lastMessage;
+ private transient Future<String> outgoingMessage;
+ private transient FragmentedMessage incomingMessage;
public AtmospherePushConnection(UI ui) {
this.ui = ui;
@@ -85,11 +137,45 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
*/
void sendMessage(String message) {
// "Broadcast" the changes to the single client only
- lastMessage = getResource().getBroadcaster().broadcast(message,
+ outgoingMessage = getResource().getBroadcaster().broadcast(message,
getResource());
}
/**
+ * Reads and buffers a (possibly partial) message. If a complete message was
+ * received, or if the call resulted in the completion of a partially
+ * received message, returns a {@link Reader} yielding the complete message.
+ * Otherwise, returns null.
+ *
+ * @param reader
+ * A Reader from which to read the (partial) message
+ * @return A Reader yielding a complete message or null if the message is
+ * not yet complete.
+ * @throws IOException
+ */
+ protected Reader receiveMessage(Reader reader) throws IOException {
+
+ if (resource.transport() != TRANSPORT.WEBSOCKET) {
+ return reader;
+ }
+
+ if (incomingMessage == null) {
+ // No existing partially received message
+ incomingMessage = new FragmentedMessage(reader);
+ }
+
+ if (incomingMessage.append(reader)) {
+ // Message is complete
+ Reader completeReader = incomingMessage.getReader();
+ incomingMessage = null;
+ return completeReader;
+ } else {
+ // Only received a partial message
+ return null;
+ }
+ }
+
+ /**
* Associates this connection with the given AtmosphereResource. If there is
* a push pending, commits it.
*
@@ -128,11 +214,11 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
@Override
public void disconnect() {
- if (lastMessage != null) {
+ if (outgoingMessage != null) {
// Wait for the last message to be sent before closing the
// connection (assumes that futures are completed in order)
try {
- lastMessage.get(1000, TimeUnit.MILLISECONDS);
+ outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
getLogger()
.log(Level.INFO,
@@ -142,7 +228,7 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
.log(Level.INFO,
"Error waiting for messages to be sent to client before disconnect");
}
- lastMessage = null;
+ outgoingMessage = null;
}
resource.resume();
diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java
index c2b0f36e2a..e740db410d 100644
--- a/server/src/com/vaadin/server/communication/PushHandler.java
+++ b/server/src/com/vaadin/server/communication/PushHandler.java
@@ -17,6 +17,7 @@
package com.vaadin.server.communication;
import java.io.IOException;
+import java.io.Reader;
import java.io.Writer;
import java.util.Arrays;
import java.util.logging.Level;
@@ -129,13 +130,18 @@ public class PushHandler implements AtmosphereHandler {
+ "connection is kept open or if the UI has a "
+ "connection of unexpected type.";
+ Reader reader = connection.receiveMessage(req.getReader());
+ if (reader == null) {
+ // The whole message was not yet received
+ return;
+ }
+
// Should be set up by caller
VaadinRequest vaadinRequest = VaadinService.getCurrentRequest();
assert vaadinRequest != null;
try {
- new ServerRpcHandler().handleRpc(ui, req.getReader(),
- vaadinRequest);
+ new ServerRpcHandler().handleRpc(ui, reader, vaadinRequest);
connection.push(false);
} catch (JSONException e) {
getLogger().log(Level.SEVERE, "Error writing JSON to response",
diff --git a/server/src/com/vaadin/server/communication/PushRequestHandler.java b/server/src/com/vaadin/server/communication/PushRequestHandler.java
index 575ec4a86f..8360e08af9 100644
--- a/server/src/com/vaadin/server/communication/PushRequestHandler.java
+++ b/server/src/com/vaadin/server/communication/PushRequestHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.atmosphere.client.TrackMessageSizeInterceptor;
+import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
@@ -35,6 +36,7 @@ import com.vaadin.server.VaadinServletRequest;
import com.vaadin.server.VaadinServletResponse;
import com.vaadin.server.VaadinServletService;
import com.vaadin.server.VaadinSession;
+import com.vaadin.shared.ApplicationConstants;
/**
* Handles requests to open a push (bidirectional) communication channel between
@@ -57,8 +59,17 @@ public class PushRequestHandler implements RequestHandler,
pushHandler = new PushHandler(service);
atmosphere.addAtmosphereHandler("/*", pushHandler);
- atmosphere
- .addInitParameter("org.atmosphere.cpr.sessionSupport", "true");
+ atmosphere.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT,
+ "true");
+
+ final String bufferSize = String
+ .valueOf(ApplicationConstants.WEBSOCKET_BUFFER_SIZE);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_BUFFER_SIZE,
+ bufferSize);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXTEXTSIZE,
+ bufferSize);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXBINARYSIZE,
+ bufferSize);
// Disable Atmosphere's message about commercial support
atmosphere.addInitParameter("org.atmosphere.cpr.showSupportMessage",