diff options
5 files changed, 171 insertions, 13 deletions
diff --git a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java index f6b704ed9b..e42f91ea67 100644 --- a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java +++ b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java @@ -62,6 +62,45 @@ public class AtmospherePushConnection implements PushConnection { DISCONNECTED; } + /** + * Represents a message that should be sent as multiple fragments. + */ + protected static class FragmentedMessage { + + // Jetty requires length less than buffer size + private int FRAGMENT_LENGTH = ApplicationConstants.WEBSOCKET_BUFFER_SIZE - 1; + + private String message; + private int index = 0; + + public FragmentedMessage(String message) { + this.message = message; + } + + public boolean hasNextFragment() { + return index < message.length(); + } + + public String getNextFragment() { + String result; + if (index == 0) { + String header = "" + message.length() + + ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER; + int fragmentLen = FRAGMENT_LENGTH - header.length(); + result = header + getFragment(0, fragmentLen); + index += fragmentLen; + } else { + result = getFragment(index, index + FRAGMENT_LENGTH); + index += FRAGMENT_LENGTH; + } + return result; + } + + private String getFragment(int begin, int end) { + return message.substring(begin, Math.min(message.length(), end)); + } + } + private ApplicationConnection connection; private JavaScriptObject socket; @@ -74,6 +113,8 @@ public class AtmospherePushConnection implements PushConnection { private String uri; + private String transport; + /** * Keeps track of the disconnect confirmation command for cases where * pending messages should be pushed before actually disconnecting. @@ -151,7 +192,15 @@ public class AtmospherePushConnection implements PushConnection { case CONNECTED: assert isActive(); VConsole.log("Sending push message: " + message); - doPush(socket, message); + + if (transport.equals("websocket")) { + FragmentedMessage fragmented = new FragmentedMessage(message); + while (fragmented.hasNextFragment()) { + doPush(socket, fragmented.getNextFragment()); + } + } else { + doPush(socket, message); + } break; case DISCONNECT_PENDING: case DISCONNECTED: @@ -167,10 +216,12 @@ public class AtmospherePushConnection implements PushConnection { } protected void onOpen(AtmosphereResponse response) { - VConsole.log("Push connection established using " - + response.getTransport()); + transport = response.getTransport(); + + VConsole.log("Push connection established using " + transport); + for (String message : messageQueue) { - doPush(socket, message); + push(message); } messageQueue.clear(); diff --git a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java index bb1461b18e..0bba65ff1d 100644 --- a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java +++ b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java @@ -17,7 +17,9 @@ package com.vaadin.server.communication; import java.io.IOException; +import java.io.Reader; import java.io.Serializable; +import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; import java.util.concurrent.Future; @@ -27,8 +29,10 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResource.TRANSPORT; import org.json.JSONException; +import com.vaadin.shared.ApplicationConstants; import com.vaadin.ui.UI; /** @@ -40,9 +44,57 @@ import com.vaadin.ui.UI; */ public class AtmospherePushConnection implements Serializable, PushConnection { + /** + * Represents a message that can arrive as multiple fragments. + */ + protected static class FragmentedMessage { + private final StringBuilder message = new StringBuilder(); + private final int messageLength; + + public FragmentedMessage(Reader reader) throws IOException { + // Messages are prefixed by the total message length plus '|' + String length = ""; + int c; + while ((c = reader.read()) != -1 + && c != ApplicationConstants.WEBSOCKET_MESSAGE_DELIMITER) { + length += (char) c; + } + try { + messageLength = Integer.parseInt(length); + } catch (NumberFormatException e) { + throw new IOException("Invalid message length " + length, e); + } + } + + /** + * Appends all the data from the given Reader to this message and + * returns whether the message was completed. + * + * @param reader + * The Reader from which to read. + * @return true if this message is complete, false otherwise. + * @throws IOException + */ + public boolean append(Reader reader) throws IOException { + char[] buffer = new char[ApplicationConstants.WEBSOCKET_BUFFER_SIZE]; + int read; + while ((read = reader.read(buffer)) != -1) { + message.append(buffer, 0, read); + assert message.length() <= messageLength : "Received message " + + message.length() + "chars, expected " + messageLength; + } + return message.length() == messageLength; + } + + public Reader getReader() { + return new StringReader(message.toString()); + } + } + private UI ui; private transient AtmosphereResource resource; - private Future<String> lastMessage; + private transient Future<String> outgoingMessage; + private transient FragmentedMessage incomingMessage; public AtmospherePushConnection(UI ui) { this.ui = ui; @@ -85,11 +137,45 @@ public class AtmospherePushConnection implements Serializable, PushConnection { */ void sendMessage(String message) { // "Broadcast" the changes to the single client only - lastMessage = getResource().getBroadcaster().broadcast(message, + outgoingMessage = getResource().getBroadcaster().broadcast(message, getResource()); } /** + * Reads and buffers a (possibly partial) message. If a complete message was + * received, or if the call resulted in the completion of a partially + * received message, returns a {@link Reader} yielding the complete message. + * Otherwise, returns null. + * + * @param reader + * A Reader from which to read the (partial) message + * @return A Reader yielding a complete message or null if the message is + * not yet complete. + * @throws IOException + */ + protected Reader receiveMessage(Reader reader) throws IOException { + + if (resource.transport() != TRANSPORT.WEBSOCKET) { + return reader; + } + + if (incomingMessage == null) { + // No existing partially received message + incomingMessage = new FragmentedMessage(reader); + } + + if (incomingMessage.append(reader)) { + // Message is complete + Reader completeReader = incomingMessage.getReader(); + incomingMessage = null; + return completeReader; + } else { + // Only received a partial message + return null; + } + } + + /** * Associates this connection with the given AtmosphereResource. If there is * a push pending, commits it. * @@ -128,11 +214,11 @@ public class AtmospherePushConnection implements Serializable, PushConnection { @Override public void disconnect() { - if (lastMessage != null) { + if (outgoingMessage != null) { // Wait for the last message to be sent before closing the // connection (assumes that futures are completed in order) try { - lastMessage.get(1000, TimeUnit.MILLISECONDS); + outgoingMessage.get(1000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { getLogger() .log(Level.INFO, @@ -142,7 +228,7 @@ public class AtmospherePushConnection implements Serializable, PushConnection { .log(Level.INFO, "Error waiting for messages to be sent to client before disconnect"); } - lastMessage = null; + outgoingMessage = null; } resource.resume(); diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java index c2b0f36e2a..e740db410d 100644 --- a/server/src/com/vaadin/server/communication/PushHandler.java +++ b/server/src/com/vaadin/server/communication/PushHandler.java @@ -17,6 +17,7 @@ package com.vaadin.server.communication; import java.io.IOException; +import java.io.Reader; import java.io.Writer; import java.util.Arrays; import java.util.logging.Level; @@ -129,13 +130,18 @@ public class PushHandler implements AtmosphereHandler { + "connection is kept open or if the UI has a " + "connection of unexpected type."; + Reader reader = connection.receiveMessage(req.getReader()); + if (reader == null) { + // The whole message was not yet received + return; + } + // Should be set up by caller VaadinRequest vaadinRequest = VaadinService.getCurrentRequest(); assert vaadinRequest != null; try { - new ServerRpcHandler().handleRpc(ui, req.getReader(), - vaadinRequest); + new ServerRpcHandler().handleRpc(ui, reader, vaadinRequest); connection.push(false); } catch (JSONException e) { getLogger().log(Level.SEVERE, "Error writing JSON to response", diff --git a/server/src/com/vaadin/server/communication/PushRequestHandler.java b/server/src/com/vaadin/server/communication/PushRequestHandler.java index 575ec4a86f..8360e08af9 100644 --- a/server/src/com/vaadin/server/communication/PushRequestHandler.java +++ b/server/src/com/vaadin/server/communication/PushRequestHandler.java @@ -21,6 +21,7 @@ import java.io.IOException; import javax.servlet.ServletException; import org.atmosphere.client.TrackMessageSizeInterceptor; +import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResponse; @@ -35,6 +36,7 @@ import com.vaadin.server.VaadinServletRequest; import com.vaadin.server.VaadinServletResponse; import com.vaadin.server.VaadinServletService; import com.vaadin.server.VaadinSession; +import com.vaadin.shared.ApplicationConstants; /** * Handles requests to open a push (bidirectional) communication channel between @@ -57,8 +59,17 @@ public class PushRequestHandler implements RequestHandler, pushHandler = new PushHandler(service); atmosphere.addAtmosphereHandler("/*", pushHandler); - atmosphere - .addInitParameter("org.atmosphere.cpr.sessionSupport", "true"); + atmosphere.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, + "true"); + + final String bufferSize = String + .valueOf(ApplicationConstants.WEBSOCKET_BUFFER_SIZE); + atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_BUFFER_SIZE, + bufferSize); + atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXTEXTSIZE, + bufferSize); + atmosphere.addInitParameter(ApplicationConfig.WEBSOCKET_MAXBINARYSIZE, + bufferSize); // Disable Atmosphere's message about commercial support atmosphere.addInitParameter("org.atmosphere.cpr.showSupportMessage", diff --git a/shared/src/com/vaadin/shared/ApplicationConstants.java b/shared/src/com/vaadin/shared/ApplicationConstants.java index 04cba79c0c..fc4abd1988 100644 --- a/shared/src/com/vaadin/shared/ApplicationConstants.java +++ b/shared/src/com/vaadin/shared/ApplicationConstants.java @@ -83,4 +83,8 @@ public class ApplicationConstants implements Serializable { * Name of the parameter used to transmit the CSRF token. */ public static final String CSRF_TOKEN_PARAMETER = "v-csrfToken"; + + public static final int WEBSOCKET_BUFFER_SIZE = 65536; + + public static final char WEBSOCKET_MESSAGE_DELIMITER = '|'; } |