summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--client/src/com/vaadin/client/communication/AtmospherePushConnection.java59
-rw-r--r--server/src/com/vaadin/server/communication/AtmospherePushConnection.java96
-rw-r--r--server/src/com/vaadin/server/communication/PushHandler.java10
-rw-r--r--server/src/com/vaadin/server/communication/PushRequestHandler.java15
-rw-r--r--shared/src/com/vaadin/shared/ApplicationConstants.java4
5 files changed, 171 insertions, 13 deletions
diff --git a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
index f6b704ed9b..e42f91ea67 100644
--- a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
+++ b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java
@@ -62,6 +62,45 @@ public class AtmospherePushConnection implements PushConnection {
DISCONNECTED;
}
+ /**
+ * Represents a message that should be sent as multiple fragments.
+ */
+ protected static class FragmentedMessage {
+
+ // Jetty requires length less than buffer size
+ private int FRAGMENT_LENGTH = ApplicationConstants.WEBSOCKET_BUFFER_SIZE - 1;
+
+ private String message;
+ private int index = 0;
+
+ public FragmentedMessage(String message) {
+ this.message = message;
+ }
+
+ public boolean hasNextFragment() {
+ return index < message.length();
+ }
+
+ public String getNextFragment() {
+ String result;
+ if (index == 0) {
+ String header = "" + message.length()
+ + ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER;
+ int fragmentLen = FRAGMENT_LENGTH - header.length();
+ result = header + getFragment(0, fragmentLen);
+ index += fragmentLen;
+ } else {
+ result = getFragment(index, index + FRAGMENT_LENGTH);
+ index += FRAGMENT_LENGTH;
+ }
+ return result;
+ }
+
+ private String getFragment(int begin, int end) {
+ return message.substring(begin, Math.min(message.length(), end));
+ }
+ }
+
private ApplicationConnection connection;
private JavaScriptObject socket;
@@ -74,6 +113,8 @@ public class AtmospherePushConnection implements PushConnection {
private String uri;
+ private String transport;
+
/**
* Keeps track of the disconnect confirmation command for cases where
* pending messages should be pushed before actually disconnecting.
@@ -151,7 +192,15 @@ public class AtmospherePushConnection implements PushConnection {
case CONNECTED:
assert isActive();
VConsole.log("Sending push message: " + message);
- doPush(socket, message);
+
+ if (transport.equals("websocket")) {
+ FragmentedMessage fragmented = new FragmentedMessage(message);
+ while (fragmented.hasNextFragment()) {
+ doPush(socket, fragmented.getNextFragment());
+ }
+ } else {
+ doPush(socket, message);
+ }
break;
case DISCONNECT_PENDING:
case DISCONNECTED:
@@ -167,10 +216,12 @@ public class AtmospherePushConnection implements PushConnection {
}
protected void onOpen(AtmosphereResponse response) {
- VConsole.log("Push connection established using "
- + response.getTransport());
+ transport = response.getTransport();
+
+ VConsole.log("Push connection established using " + transport);
+
for (String message : messageQueue) {
- doPush(socket, message);
+ push(message);
}
messageQueue.clear();
diff --git a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
index bb1461b18e..0bba65ff1d 100644
--- a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
+++ b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java
@@ -17,7 +17,9 @@
package com.vaadin.server.communication;
import java.io.IOException;
+import java.io.Reader;
import java.io.Serializable;
+import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.Future;
@@ -27,8 +29,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
import org.json.JSONException;
+import com.vaadin.shared.ApplicationConstants;
import com.vaadin.ui.UI;
/**
@@ -40,9 +44,57 @@ import com.vaadin.ui.UI;
*/
public class AtmospherePushConnection implements Serializable, PushConnection {
+ /**
+ * Represents a message that can arrive as multiple fragments.
+ */
+ protected static class FragmentedMessage {
+ private final StringBuilder message = new StringBuilder();
+ private final int messageLength;
+
+ public FragmentedMessage(Reader reader) throws IOException {
+ // Messages are prefixed by the total message length plus '|'
+ String length = "";
+ int c;
+ while ((c = reader.read()) != -1
+ && c != ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER) {
+ length += (char) c;
+ }
+ try {
+ messageLength = Integer.parseInt(length);
+ } catch (NumberFormatException e) {
+ throw new IOException("Invalid message length " + length, e);
+ }
+ }
+
+ /**
+ * Appends all the data from the given Reader to this message and
+ * returns whether the message was completed.
+ *
+ * @param reader
+ * The Reader from which to read.
+ * @return true if this message is complete, false otherwise.
+ * @throws IOException
+ */
+ public boolean append(Reader reader) throws IOException {
+ char[] buffer = new char[ApplicationConstants.WEBSOCKET_BUFFER_SIZE];
+ int read;
+ while ((read = reader.read(buffer)) != -1) {
+ message.append(buffer, 0, read);
+ assert message.length() <= messageLength : "Received message "
+ + message.length() + "chars, expected " + messageLength;
+ }
+ return message.length() == messageLength;
+ }
+
+ public Reader getReader() {
+ return new StringReader(message.toString());
+ }
+ }
+
private UI ui;
private transient AtmosphereResource resource;
- private Future<String> lastMessage;
+ private transient Future<String> outgoingMessage;
+ private transient FragmentedMessage incomingMessage;
public AtmospherePushConnection(UI ui) {
this.ui = ui;
@@ -85,11 +137,45 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
*/
void sendMessage(String message) {
// "Broadcast" the changes to the single client only
- lastMessage = getResource().getBroadcaster().broadcast(message,
+ outgoingMessage = getResource().getBroadcaster().broadcast(message,
getResource());
}
/**
+ * Reads and buffers a (possibly partial) message. If a complete message was
+ * received, or if the call resulted in the completion of a partially
+ * received message, returns a {@link Reader} yielding the complete message.
+ * Otherwise, returns null.
+ *
+ * @param reader
+ * A Reader from which to read the (partial) message
+ * @return A Reader yielding a complete message or null if the message is
+ * not yet complete.
+ * @throws IOException
+ */
+ protected Reader receiveMessage(Reader reader) throws IOException {
+
+ if (resource.transport() != TRANSPORT.WEBSOCKET) {
+ return reader;
+ }
+
+ if (incomingMessage == null) {
+ // No existing partially received message
+ incomingMessage = new FragmentedMessage(reader);
+ }
+
+ if (incomingMessage.append(reader)) {
+ // Message is complete
+ Reader completeReader = incomingMessage.getReader();
+ incomingMessage = null;
+ return completeReader;
+ } else {
+ // Only received a partial message
+ return null;
+ }
+ }
+
+ /**
* Associates this connection with the given AtmosphereResource. If there is
* a push pending, commits it.
*
@@ -128,11 +214,11 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
@Override
public void disconnect() {
- if (lastMessage != null) {
+ if (outgoingMessage != null) {
// Wait for the last message to be sent before closing the
// connection (assumes that futures are completed in order)
try {
- lastMessage.get(1000, TimeUnit.MILLISECONDS);
+ outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
getLogger()
.log(Level.INFO,
@@ -142,7 +228,7 @@ public class AtmospherePushConnection implements Serializable, PushConnection {
.log(Level.INFO,
"Error waiting for messages to be sent to client before disconnect");
}
- lastMessage = null;
+ outgoingMessage = null;
}
resource.resume();
diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java
index c2b0f36e2a..e740db410d 100644
--- a/server/src/com/vaadin/server/communication/PushHandler.java
+++ b/server/src/com/vaadin/server/communication/PushHandler.java
@@ -17,6 +17,7 @@
package com.vaadin.server.communication;
import java.io.IOException;
+import java.io.Reader;
import java.io.Writer;
import java.util.Arrays;
import java.util.logging.Level;
@@ -129,13 +130,18 @@ public class PushHandler implements AtmosphereHandler {
+ "connection is kept open or if the UI has a "
+ "connection of unexpected type.";
+ Reader reader = connection.receiveMessage(req.getReader());
+ if (reader == null) {
+ // The whole message was not yet received
+ return;
+ }
+
// Should be set up by caller
VaadinRequest vaadinRequest = VaadinService.getCurrentRequest();
assert vaadinRequest != null;
try {
- new ServerRpcHandler().handleRpc(ui, req.getReader(),
- vaadinRequest);
+ new ServerRpcHandler().handleRpc(ui, reader, vaadinRequest);
connection.push(false);
} catch (JSONException e) {
getLogger().log(Level.SEVERE, "Error writing JSON to response",
diff --git a/server/src/com/vaadin/server/communication/PushRequestHandler.java b/server/src/com/vaadin/server/communication/PushRequestHandler.java
index 575ec4a86f..8360e08af9 100644
--- a/server/src/com/vaadin/server/communication/PushRequestHandler.java
+++ b/server/src/com/vaadin/server/communication/PushRequestHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.atmosphere.client.TrackMessageSizeInterceptor;
+import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
@@ -35,6 +36,7 @@ import com.vaadin.server.VaadinServletRequest;
import com.vaadin.server.VaadinServletResponse;
import com.vaadin.server.VaadinServletService;
import com.vaadin.server.VaadinSession;
+import com.vaadin.shared.ApplicationConstants;
/**
* Handles requests to open a push (bidirectional) communication channel between
@@ -57,8 +59,17 @@ public class PushRequestHandler implements RequestHandler,
pushHandler = new PushHandler(service);
atmosphere.addAtmosphereHandler("/*", pushHandler);
- atmosphere
- .addInitParameter("org.atmosphere.cpr.sessionSupport", "true");
+ atmosphere.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT,
+ "true");
+
+ final String bufferSize = String
+ .valueOf(ApplicationConstants.WEBSOCKET_BUFFER_SIZE);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_BUFFER_SIZE,
+ bufferSize);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXTEXTSIZE,
+ bufferSize);
+ atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXBINARYSIZE,
+ bufferSize);
// Disable Atmosphere's message about commercial support
atmosphere.addInitParameter("org.atmosphere.cpr.showSupportMessage",
diff --git a/shared/src/com/vaadin/shared/ApplicationConstants.java b/shared/src/com/vaadin/shared/ApplicationConstants.java
index 04cba79c0c..fc4abd1988 100644
--- a/shared/src/com/vaadin/shared/ApplicationConstants.java
+++ b/shared/src/com/vaadin/shared/ApplicationConstants.java
@@ -83,4 +83,8 @@ public class ApplicationConstants implements Serializable {
* Name of the parameter used to transmit the CSRF token.
*/
public static final String CSRF_TOKEN_PARAMETER = "v-csrfToken";
+
+ public static final int WEBSOCKET_BUFFER_SIZE = 65536;
+
+ public static final char WEBSOCKET_MESSAGE_DELIMITER = '|';
}