]> source.dussan.org Git - vaadin-framework.git/commitdiff
Refactor PushConnection handling (#13223)
authorJohannes Dahlström <johannesd@vaadin.com>
Wed, 22 Jan 2014 13:29:25 +0000 (15:29 +0200)
committerArtur Signell <artur@vaadin.com>
Fri, 31 Jan 2014 09:21:19 +0000 (09:21 +0000)
UIs now always have a PushConnection object if push is enabled,
and push reconnections do not create and set a new instance.
PushConnection.push can always be called; it internally handles
deferring the push until (re)connection if it is currently
disconnected. This is very desirable when using long polling,
as it reconnects after each push.

Change-Id: I478748cc940da86f34a5f55266f6b325682d4585

server/src/com/vaadin/server/communication/AtmospherePushConnection.java
server/src/com/vaadin/server/communication/PushConnection.java
server/src/com/vaadin/server/communication/PushHandler.java
server/src/com/vaadin/ui/PushConfiguration.java
server/src/com/vaadin/ui/UI.java

index 6a515abc295f937e11eadbe4ff1ad8195a96ed1f..56dd57640323e40713a836902a51c2c1d9163139 100644 (file)
@@ -22,22 +22,16 @@ import java.io.Serializable;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
-import org.json.JSONException;
 
 import com.vaadin.shared.communication.PushConstants;
 import com.vaadin.ui.UI;
 
 /**
- * {@link PushConnection} implementation using the Atmosphere push support that
- * is by default included in Vaadin.
+ * A {@link PushConnection} implementation using the Atmosphere push support
+ * that is by default included in Vaadin.
  * 
  * @author Vaadin Ltd
  * @since 7.1
@@ -92,55 +86,84 @@ public class AtmospherePushConnection implements PushConnection {
         }
     }
 
+    protected enum State {
+        /**
+         * Not connected. Trying to push will set the connection state to
+         * PUSH_PENDING or RESPONSE_PENDING and defer sending the message until
+         * a connection is established.
+         */
+        DISCONNECTED,
+
+        /**
+         * Not connected. An asynchronous push is pending the opening of the
+         * connection.
+         */
+        PUSH_PENDING,
+
+        /**
+         * Not connected. A response to a client request is pending the opening
+         * of the connection.
+         */
+        RESPONSE_PENDING,
+
+        /**
+         * Connected. Messages can be sent through the connection.
+         */
+        CONNECTED;
+    }
+
+    private State state = State.DISCONNECTED;
     private UI ui;
     private AtmosphereResource resource;
-    private Future<Object> outgoingMessage;
     private FragmentedMessage incomingMessage;
 
-    public AtmospherePushConnection(UI ui, AtmosphereResource resource) {
+    public AtmospherePushConnection(UI ui) {
         this.ui = ui;
-        this.resource = resource;
     }
 
     @Override
     public void push() {
-        assert isConnected();
-        try {
-            push(true);
-        } catch (IOException e) {
-            // TODO Error handling
-            throw new RuntimeException("Push failed", e);
-        }
+        push(true);
     }
 
     /**
-     * Pushes pending state changes and client RPC calls to the client.
+     * Pushes pending state changes and client RPC calls to the client. If
+     * {@code isConnected()} is false, defers the push until a connection is
+     * established.
      * 
      * @param async
      *            True if this push asynchronously originates from the server,
      *            false if it is a response to a client request.
-     * @throws IOException
      */
-    protected void push(boolean async) throws IOException {
-        Writer writer = new StringWriter();
-        try {
-            new UidlWriter().write(getUI(), writer, false, async);
-        } catch (JSONException e) {
-            throw new IOException("Error writing UIDL", e);
+    public void push(boolean async) {
+        if (!isConnected()) {
+            if (async && state != State.RESPONSE_PENDING) {
+                state = State.PUSH_PENDING;
+            } else {
+                state = State.RESPONSE_PENDING;
+            }
+        } else {
+            try {
+                Writer writer = new StringWriter();
+                new UidlWriter().write(getUI(), writer, false, async);
+                sendMessage("for(;;);[{" + writer.toString() + "}]");
+            } catch (Exception e) {
+                throw new RuntimeException("Push failed", e);
+            }
         }
-        sendMessage("for(;;);[{" + writer.toString() + "}]");
     }
 
     /**
-     * Sends the given message to the current client.
+     * Sends the given message to the current client. Cannot be called if
+     * {@isConnected()} is false.
      * 
      * @param message
      *            The message to send
      */
     void sendMessage(String message) {
+        assert (isConnected());
         // "Broadcast" the changes to the single client only
-        outgoingMessage = getResource().getBroadcaster().broadcast(message,
-                getResource());
+        getResource().getBroadcaster().broadcast(message, getResource());
     }
 
     /**
@@ -157,7 +180,7 @@ public class AtmospherePushConnection implements PushConnection {
      */
     protected Reader receiveMessage(Reader reader) throws IOException {
 
-        if (resource.transport() != TRANSPORT.WEBSOCKET) {
+        if (resource == null || resource.transport() != TRANSPORT.WEBSOCKET) {
             return reader;
         }
 
@@ -179,9 +202,37 @@ public class AtmospherePushConnection implements PushConnection {
 
     @Override
     public boolean isConnected() {
-        return resource != null
-                && resource.getBroadcaster().getAtmosphereResources()
-                        .contains(resource);
+        assert (state == State.CONNECTED) ^ (resource == null);
+        return state == State.CONNECTED;
+    }
+
+    /**
+     * Associates this {@code AtmospherePushConnection} with the given
+     * {@AtmosphereResource} representing an established
+     * push connection. If already connected, calls {@link #disconnect()} first.
+     * If there is a deferred push, carries it out via the new connection.
+     * 
+     * @since 7.2
+     */
+    public void connect(AtmosphereResource resource) {
+
+        assert resource != null;
+        assert resource != this.resource;
+
+        if (isConnected()) {
+            disconnect();
+        }
+
+        this.resource = resource;
+        State oldState = state;
+        state = State.CONNECTED;
+
+        if (oldState == State.PUSH_PENDING
+                || oldState == State.RESPONSE_PENDING) {
+            // Sending a "response" message (async=false) also takes care of a
+            // pending push, but not vice versa
+            push(oldState == State.PUSH_PENDING);
+        }
     }
 
     /**
@@ -202,33 +253,8 @@ public class AtmospherePushConnection implements PushConnection {
     @Override
     public void disconnect() {
         assert isConnected();
-
-        if (outgoingMessage != null) {
-            // Wait for the last message to be sent before closing the
-            // connection (assumes that futures are completed in order)
-            try {
-                outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
-            } catch (TimeoutException e) {
-                getLogger()
-                        .log(Level.INFO,
-                                "Timeout waiting for messages to be sent to client before disconnect");
-            } catch (Exception e) {
-                getLogger()
-                        .log(Level.INFO,
-                                "Error waiting for messages to be sent to client before disconnect");
-            }
-            outgoingMessage = null;
-        }
-
         resource.resume();
         resource = null;
-    }
-
-    /**
-     * @since
-     * @return
-     */
-    private static Logger getLogger() {
-        return Logger.getLogger(AtmospherePushConnection.class.getName());
+        state = State.DISCONNECTED;
     }
 }
index 7f78d1d48e977772891251f2b43c2204c50e9943..cab3c948247fc2f326348c93d91e01eb8165d3e7 100644 (file)
@@ -20,7 +20,12 @@ import com.vaadin.ui.UI;
 
 /**
  * Represents a bidirectional ("push") connection between a single UI and its
- * client-side.
+ * client-side. A single {@code PushConnection} instance is bound to a UI as
+ * long as push is enabled in that UI, even if the actual connection is
+ * momentarily dropped either due to a network failure or as a normal part of
+ * the transport mechanism.
+ * <p>
+ * This interface is an internal API, only meant to be used by the framework.
  * 
  * @author Vaadin Ltd
  * @since 7.1
@@ -28,9 +33,10 @@ import com.vaadin.ui.UI;
 public interface PushConnection {
 
     /**
-     * Pushes pending state changes and client RPC calls to the client. Cannot
-     * be called if {@link #isConnected()} is false. It is NOT safe to invoke
-     * this method if not holding the session lock.
+     * Pushes pending state changes and client RPC calls to the client. Can be
+     * called even if {@link #isConnected()} is false; the push will be deferred
+     * until a connection is available. It is NOT safe to invoke this method if
+     * not holding the session lock.
      * <p>
      * This is internal API; please use {@link UI#push()} instead.
      */
index eaa0de60270529508a81026db0beae7b37872f3b..99aff3780fd52a6b630b54b36518625b3e72e05a 100644 (file)
@@ -73,8 +73,8 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
         @Override
         public void run(AtmosphereResource resource, UI ui) throws IOException {
             getLogger().log(Level.FINER,
-                    "New push connection with transport {0}",
-                    resource.transport());
+                    "New push connection for resource {0} with transport {1}",
+                    new Object[] { resource.uuid(), resource.transport() });
 
             resource.addEventListener(PushHandler.this);
 
@@ -105,10 +105,9 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
 
             resource.suspend();
 
-            AtmospherePushConnection connection = new AtmospherePushConnection(
-                    ui, resource);
-
-            ui.setPushConnection(connection);
+            AtmospherePushConnection connection = getConnectionForUI(ui);
+            assert (connection != null);
+            connection.connect(resource);
         }
     };
 
@@ -172,11 +171,11 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
         @Override
         public void run(AtmosphereResource resource, UI ui) throws IOException {
             PushMode pushMode = ui.getPushConfiguration().getPushMode();
-            AtmospherePushConnection pushConnection = getConnectionForUI(ui);
+            AtmospherePushConnection connection = getConnectionForUI(ui);
 
             String id = resource.uuid();
 
-            if (pushConnection == null) {
+            if (connection == null) {
                 getLogger()
                         .log(Level.WARNING,
                                 "Could not find push connection to close: {0} with transport {1}",
@@ -199,7 +198,7 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
                                     "Connection unexpectedly closed for resource {0} with transport {1}",
                                     new Object[] { id, resource.transport() });
                 }
-                ui.setPushConnection(null);
+                connection.disconnect();
             }
         }
     };
@@ -333,10 +332,10 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
     private static AtmospherePushConnection getConnectionForUI(UI ui) {
         PushConnection pushConnection = ui.getPushConnection();
         if (pushConnection instanceof AtmospherePushConnection) {
-            assert pushConnection.isConnected();
             return (AtmospherePushConnection) pushConnection;
+        } else {
+            return null;
         }
-        return null;
     }
 
     @Override
@@ -373,7 +372,7 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
                 break;
             case JSONP:
             case LONG_POLLING:
-                resource.resume();
+                disconnect(event);
                 break;
             default:
                 getLogger().log(Level.SEVERE, "Unknown transport {0}",
@@ -396,13 +395,6 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
         disconnect(event);
     }
 
-    @Override
-    public void onResume(AtmosphereResourceEvent event) {
-        // Log event on trace level
-        super.onResume(event);
-        disconnect(event);
-    }
-
     @Override
     public void destroy() {
     }
@@ -426,8 +418,8 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter
      */
     private static void sendRefreshAndDisconnect(AtmosphereResource resource)
             throws IOException {
-        AtmospherePushConnection connection = new AtmospherePushConnection(
-                null, resource);
+        AtmospherePushConnection connection = new AtmospherePushConnection(null);
+        connection.connect(resource);
         try {
             connection.sendMessage(VaadinService
                     .createCriticalNotificationJSON(null, null, null, null));
index a592b39bef4bb64302e99b5bbed2b886b1222240..49738c5aff6359238b7a9cec0bb65e85968b3055 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 
 import com.vaadin.server.VaadinSession;
+import com.vaadin.server.communication.AtmospherePushConnection;
 import com.vaadin.shared.communication.PushMode;
 import com.vaadin.shared.ui.ui.Transport;
 import com.vaadin.shared.ui.ui.UIState.PushConfigurationState;
@@ -170,20 +171,32 @@ class PushConfigurationImpl implements PushConfiguration {
             throw new IllegalArgumentException("Push mode cannot be null");
         }
 
-        if (pushMode.isEnabled()) {
-            VaadinSession session = ui.getSession();
-            if (session != null && !session.getService().ensurePushAvailable()) {
-                throw new IllegalStateException(
-                        "Push is not available. See previous log messages for more information.");
-            }
+        VaadinSession session = ui.getSession();
+
+        if (session == null) {
+            throw new UIDetachedException(
+                    "Cannot set the push mode for a detached UI");
+        }
+
+        assert session.hasLock();
+
+        if (pushMode.isEnabled() && !session.getService().ensurePushAvailable()) {
+            throw new IllegalStateException(
+                    "Push is not available. See previous log messages for more information.");
         }
 
-        /*
-         * Client-side will open a new connection or disconnect the old
-         * connection, so there's nothing more to do on the server at this
-         * point.
-         */
-        getState().mode = pushMode;
+        PushMode oldMode = getState().mode;
+        if (oldMode != pushMode) {
+            getState().mode = pushMode;
+
+            if (!oldMode.isEnabled() && pushMode.isEnabled()) {
+                // The push connection is initially in a disconnected state;
+                // the client will establish the connection
+                ui.setPushConnection(new AtmospherePushConnection(ui));
+            }
+            // Nothing to do here if disabling push;
+            // the client will close the connection
+        }
     }
 
     /*
@@ -274,9 +287,8 @@ class PushConfigurationImpl implements PushConfiguration {
 
     @Override
     public Collection<String> getParameterNames() {
-        return Collections
-                .unmodifiableCollection(ui.getState(false).pushConfiguration.parameters
-                        .keySet());
+        return Collections.unmodifiableCollection(getState(false).parameters
+                .keySet());
     }
 
 }
index a292e6b829876e94910df4705ab918d5c6f384fb..e688c0606170a8ba6933b74ff9ea5648b629e955 100644 (file)
@@ -55,6 +55,7 @@ import com.vaadin.server.communication.PushConnection;
 import com.vaadin.shared.Connector;
 import com.vaadin.shared.EventId;
 import com.vaadin.shared.MouseEventDetails;
+import com.vaadin.shared.communication.PushMode;
 import com.vaadin.shared.ui.ui.DebugWindowClientRpc;
 import com.vaadin.shared.ui.ui.DebugWindowServerRpc;
 import com.vaadin.shared.ui.ui.ScrollClientRpc;
@@ -422,8 +423,9 @@ public abstract class UI extends AbstractSingleComponentContainer implements
         } else {
             if (session == null) {
                 detach();
-                // Close the push connection when UI is detached. Otherwise the
+                // Disable push when the UI is detached. Otherwise the
                 // push connection and possibly VaadinSession will live on.
+                getPushConfiguration().setPushMode(PushMode.DISABLED);
                 setPushConnection(null);
             }
             this.session = session;
@@ -550,8 +552,6 @@ public abstract class UI extends AbstractSingleComponentContainer implements
 
     private transient PushConnection pushConnection = null;
 
-    private boolean hasPendingPush = false;
-
     private LocaleService localeService = new LocaleService(this,
             getState(false).localeServiceState);
 
@@ -1368,6 +1368,9 @@ public abstract class UI extends AbstractSingleComponentContainer implements
      * Pushes the pending changes and client RPC invocations of this UI to the
      * client-side.
      * <p>
+     * If push is enabled, but the push connection is not currently open, the
+     * push will be done when the connection is established.
+     * <p>
      * As with all UI methods, the session must be locked when calling this
      * method. It is also recommended that {@link UI#getCurrent()} is set up to
      * return this UI since writing the response may invoke logic in any
@@ -1385,79 +1388,73 @@ public abstract class UI extends AbstractSingleComponentContainer implements
      */
     public void push() {
         VaadinSession session = getSession();
-        if (session != null) {
-            assert session.hasLock();
-
-            /*
-             * Purge the pending access queue as it might mark a connector as
-             * dirty when the push would otherwise be ignored because there are
-             * no changes to push.
-             */
-            session.getService().runPendingAccessTasks(session);
-
-            if (!getConnectorTracker().hasDirtyConnectors()) {
-                // Do not push if there is nothing to push
-                return;
-            }
 
-            if (!getPushConfiguration().getPushMode().isEnabled()) {
-                throw new IllegalStateException("Push not enabled");
-            }
+        if (session == null) {
+            throw new UIDetachedException("Cannot push a detached UI");
+        }
+        assert session.hasLock();
 
-            if (pushConnection == null) {
-                hasPendingPush = true;
-            } else {
-                pushConnection.push();
-            }
-        } else {
-            throw new UIDetachedException("Trying to push a detached UI");
+        if (!getPushConfiguration().getPushMode().isEnabled()) {
+            throw new IllegalStateException("Push not enabled");
+        }
+        assert pushConnection != null;
+
+        /*
+         * Purge the pending access queue as it might mark a connector as dirty
+         * when the push would otherwise be ignored because there are no changes
+         * to push.
+         */
+        session.getService().runPendingAccessTasks(session);
+
+        if (!getConnectorTracker().hasDirtyConnectors()) {
+            // Do not push if there is nothing to push
+            return;
         }
+
+        pushConnection.push();
     }
 
     /**
      * Returns the internal push connection object used by this UI. This method
-     * should only be called by the framework. If the returned PushConnection is
-     * not null, it is guaranteed to have {@code isConnected() == true}.
+     * should only be called by the framework.
      * <p>
      * This method is not intended to be overridden. If it is overridden, care
      * should be taken since this method might be called in situations where
      * {@link UI#getCurrent()} does not return this UI.
      * 
-     * @return the push connection used by this UI, <code>null</code> if there
-     *         is no active push connection.
+     * @return the push connection used by this UI, or {@code null} if push is
+     *         not available.
      */
     public PushConnection getPushConnection() {
-        assert (pushConnection == null || pushConnection.isConnected());
+        assert !(getPushConfiguration().getPushMode().isEnabled() && pushConnection == null);
         return pushConnection;
     }
 
     /**
      * Sets the internal push connection object used by this UI. This method
-     * should only be called by the framework. If {@pushConnection} is not null,
-     * its {@code isConnected()} must be true.
+     * should only be called by the framework.
+     * <p>
+     * The {@code pushConnection} argument must be non-null if and only if
+     * {@code getPushConfiguration().getPushMode().isEnabled()}.
      * 
      * @param pushConnection
      *            the push connection to use for this UI
      */
     public void setPushConnection(PushConnection pushConnection) {
-        // If pushMode is disabled then there should never be a pushConnection
-        assert (pushConnection == null || getPushConfiguration().getPushMode()
-                .isEnabled());
-        assert (pushConnection == null || pushConnection.isConnected());
+        // If pushMode is disabled then there should never be a pushConnection;
+        // if enabled there should always be
+        assert (pushConnection == null)
+                ^ getPushConfiguration().getPushMode().isEnabled();
 
         if (pushConnection == this.pushConnection) {
             return;
         }
 
-        if (this.pushConnection != null) {
+        if (this.pushConnection != null && this.pushConnection.isConnected()) {
             this.pushConnection.disconnect();
         }
 
         this.pushConnection = pushConnection;
-        if (pushConnection != null && hasPendingPush) {
-            hasPendingPush = false;
-            pushConnection.push();
-        }
     }
 
     /**