From 3c5644c4a3e0484b7e65a2852d9cedeeab167419 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Johannes=20Dahlstr=C3=B6m?= Date: Wed, 22 Jan 2014 15:29:25 +0200 Subject: [PATCH] Refactor PushConnection handling (#13223) UIs now always have a PushConnection object if push is enabled, and push reconnections do not create and set a new instance. PushConnection.push can always be called; it internally handles deferring the push until (re)connection if it is currently disconnected. This is very desirable when using long polling, as it reconnects after each push. Change-Id: I478748cc940da86f34a5f55266f6b325682d4585 --- .../AtmospherePushConnection.java | 146 +++++++++++------- .../server/communication/PushConnection.java | 14 +- .../server/communication/PushHandler.java | 34 ++-- .../src/com/vaadin/ui/PushConfiguration.java | 42 +++-- server/src/com/vaadin/ui/UI.java | 83 +++++----- 5 files changed, 176 insertions(+), 143 deletions(-) diff --git a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java index 6a515abc29..56dd576403 100644 --- a/server/src/com/vaadin/server/communication/AtmospherePushConnection.java +++ b/server/src/com/vaadin/server/communication/AtmospherePushConnection.java @@ -22,22 +22,16 @@ import java.io.Serializable; import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResource.TRANSPORT; -import org.json.JSONException; import com.vaadin.shared.communication.PushConstants; import com.vaadin.ui.UI; /** - * {@link PushConnection} implementation using the Atmosphere push support that - * is by default included in Vaadin. + * A {@link PushConnection} implementation using the Atmosphere push support + * that is by default included in Vaadin. * * @author Vaadin Ltd * @since 7.1 @@ -92,55 +86,84 @@ public class AtmospherePushConnection implements PushConnection { } } + protected enum State { + /** + * Not connected. Trying to push will set the connection state to + * PUSH_PENDING or RESPONSE_PENDING and defer sending the message until + * a connection is established. + */ + DISCONNECTED, + + /** + * Not connected. An asynchronous push is pending the opening of the + * connection. + */ + PUSH_PENDING, + + /** + * Not connected. A response to a client request is pending the opening + * of the connection. + */ + RESPONSE_PENDING, + + /** + * Connected. Messages can be sent through the connection. + */ + CONNECTED; + } + + private State state = State.DISCONNECTED; private UI ui; private AtmosphereResource resource; - private Future outgoingMessage; private FragmentedMessage incomingMessage; - public AtmospherePushConnection(UI ui, AtmosphereResource resource) { + public AtmospherePushConnection(UI ui) { this.ui = ui; - this.resource = resource; } @Override public void push() { - assert isConnected(); - try { - push(true); - } catch (IOException e) { - // TODO Error handling - throw new RuntimeException("Push failed", e); - } + push(true); } /** - * Pushes pending state changes and client RPC calls to the client. + * Pushes pending state changes and client RPC calls to the client. If + * {@code isConnected()} is false, defers the push until a connection is + * established. * * @param async * True if this push asynchronously originates from the server, * false if it is a response to a client request. - * @throws IOException */ - protected void push(boolean async) throws IOException { - Writer writer = new StringWriter(); - try { - new UidlWriter().write(getUI(), writer, false, async); - } catch (JSONException e) { - throw new IOException("Error writing UIDL", e); + public void push(boolean async) { + if (!isConnected()) { + if (async && state != State.RESPONSE_PENDING) { + state = State.PUSH_PENDING; + } else { + state = State.RESPONSE_PENDING; + } + } else { + try { + Writer writer = new StringWriter(); + new UidlWriter().write(getUI(), writer, false, async); + sendMessage("for(;;);[{" + writer.toString() + "}]"); + } catch (Exception e) { + throw new RuntimeException("Push failed", e); + } } - sendMessage("for(;;);[{" + writer.toString() + "}]"); } /** - * Sends the given message to the current client. + * Sends the given message to the current client. Cannot be called if + * {@isConnected()} is false. * * @param message * The message to send */ void sendMessage(String message) { + assert (isConnected()); // "Broadcast" the changes to the single client only - outgoingMessage = getResource().getBroadcaster().broadcast(message, - getResource()); + getResource().getBroadcaster().broadcast(message, getResource()); } /** @@ -157,7 +180,7 @@ public class AtmospherePushConnection implements PushConnection { */ protected Reader receiveMessage(Reader reader) throws IOException { - if (resource.transport() != TRANSPORT.WEBSOCKET) { + if (resource == null || resource.transport() != TRANSPORT.WEBSOCKET) { return reader; } @@ -179,9 +202,37 @@ public class AtmospherePushConnection implements PushConnection { @Override public boolean isConnected() { - return resource != null - && resource.getBroadcaster().getAtmosphereResources() - .contains(resource); + assert (state == State.CONNECTED) ^ (resource == null); + return state == State.CONNECTED; + } + + /** + * Associates this {@code AtmospherePushConnection} with the given + * {@AtmosphereResource} representing an established + * push connection. If already connected, calls {@link #disconnect()} first. + * If there is a deferred push, carries it out via the new connection. + * + * @since 7.2 + */ + public void connect(AtmosphereResource resource) { + + assert resource != null; + assert resource != this.resource; + + if (isConnected()) { + disconnect(); + } + + this.resource = resource; + State oldState = state; + state = State.CONNECTED; + + if (oldState == State.PUSH_PENDING + || oldState == State.RESPONSE_PENDING) { + // Sending a "response" message (async=false) also takes care of a + // pending push, but not vice versa + push(oldState == State.PUSH_PENDING); + } } /** @@ -202,33 +253,8 @@ public class AtmospherePushConnection implements PushConnection { @Override public void disconnect() { assert isConnected(); - - if (outgoingMessage != null) { - // Wait for the last message to be sent before closing the - // connection (assumes that futures are completed in order) - try { - outgoingMessage.get(1000, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - getLogger() - .log(Level.INFO, - "Timeout waiting for messages to be sent to client before disconnect"); - } catch (Exception e) { - getLogger() - .log(Level.INFO, - "Error waiting for messages to be sent to client before disconnect"); - } - outgoingMessage = null; - } - resource.resume(); resource = null; - } - - /** - * @since - * @return - */ - private static Logger getLogger() { - return Logger.getLogger(AtmospherePushConnection.class.getName()); + state = State.DISCONNECTED; } } diff --git a/server/src/com/vaadin/server/communication/PushConnection.java b/server/src/com/vaadin/server/communication/PushConnection.java index 7f78d1d48e..cab3c94824 100644 --- a/server/src/com/vaadin/server/communication/PushConnection.java +++ b/server/src/com/vaadin/server/communication/PushConnection.java @@ -20,7 +20,12 @@ import com.vaadin.ui.UI; /** * Represents a bidirectional ("push") connection between a single UI and its - * client-side. + * client-side. A single {@code PushConnection} instance is bound to a UI as + * long as push is enabled in that UI, even if the actual connection is + * momentarily dropped either due to a network failure or as a normal part of + * the transport mechanism. + *

+ * This interface is an internal API, only meant to be used by the framework. * * @author Vaadin Ltd * @since 7.1 @@ -28,9 +33,10 @@ import com.vaadin.ui.UI; public interface PushConnection { /** - * Pushes pending state changes and client RPC calls to the client. Cannot - * be called if {@link #isConnected()} is false. It is NOT safe to invoke - * this method if not holding the session lock. + * Pushes pending state changes and client RPC calls to the client. Can be + * called even if {@link #isConnected()} is false; the push will be deferred + * until a connection is available. It is NOT safe to invoke this method if + * not holding the session lock. *

* This is internal API; please use {@link UI#push()} instead. */ diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java index eaa0de6027..99aff3780f 100644 --- a/server/src/com/vaadin/server/communication/PushHandler.java +++ b/server/src/com/vaadin/server/communication/PushHandler.java @@ -73,8 +73,8 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter @Override public void run(AtmosphereResource resource, UI ui) throws IOException { getLogger().log(Level.FINER, - "New push connection with transport {0}", - resource.transport()); + "New push connection for resource {0} with transport {1}", + new Object[] { resource.uuid(), resource.transport() }); resource.addEventListener(PushHandler.this); @@ -105,10 +105,9 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter resource.suspend(); - AtmospherePushConnection connection = new AtmospherePushConnection( - ui, resource); - - ui.setPushConnection(connection); + AtmospherePushConnection connection = getConnectionForUI(ui); + assert (connection != null); + connection.connect(resource); } }; @@ -172,11 +171,11 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter @Override public void run(AtmosphereResource resource, UI ui) throws IOException { PushMode pushMode = ui.getPushConfiguration().getPushMode(); - AtmospherePushConnection pushConnection = getConnectionForUI(ui); + AtmospherePushConnection connection = getConnectionForUI(ui); String id = resource.uuid(); - if (pushConnection == null) { + if (connection == null) { getLogger() .log(Level.WARNING, "Could not find push connection to close: {0} with transport {1}", @@ -199,7 +198,7 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter "Connection unexpectedly closed for resource {0} with transport {1}", new Object[] { id, resource.transport() }); } - ui.setPushConnection(null); + connection.disconnect(); } } }; @@ -333,10 +332,10 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter private static AtmospherePushConnection getConnectionForUI(UI ui) { PushConnection pushConnection = ui.getPushConnection(); if (pushConnection instanceof AtmospherePushConnection) { - assert pushConnection.isConnected(); return (AtmospherePushConnection) pushConnection; + } else { + return null; } - return null; } @Override @@ -373,7 +372,7 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter break; case JSONP: case LONG_POLLING: - resource.resume(); + disconnect(event); break; default: getLogger().log(Level.SEVERE, "Unknown transport {0}", @@ -396,13 +395,6 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter disconnect(event); } - @Override - public void onResume(AtmosphereResourceEvent event) { - // Log event on trace level - super.onResume(event); - disconnect(event); - } - @Override public void destroy() { } @@ -426,8 +418,8 @@ public class PushHandler extends AtmosphereResourceEventListenerAdapter */ private static void sendRefreshAndDisconnect(AtmosphereResource resource) throws IOException { - AtmospherePushConnection connection = new AtmospherePushConnection( - null, resource); + AtmospherePushConnection connection = new AtmospherePushConnection(null); + connection.connect(resource); try { connection.sendMessage(VaadinService .createCriticalNotificationJSON(null, null, null, null)); diff --git a/server/src/com/vaadin/ui/PushConfiguration.java b/server/src/com/vaadin/ui/PushConfiguration.java index a592b39bef..49738c5aff 100644 --- a/server/src/com/vaadin/ui/PushConfiguration.java +++ b/server/src/com/vaadin/ui/PushConfiguration.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import com.vaadin.server.VaadinSession; +import com.vaadin.server.communication.AtmospherePushConnection; import com.vaadin.shared.communication.PushMode; import com.vaadin.shared.ui.ui.Transport; import com.vaadin.shared.ui.ui.UIState.PushConfigurationState; @@ -170,20 +171,32 @@ class PushConfigurationImpl implements PushConfiguration { throw new IllegalArgumentException("Push mode cannot be null"); } - if (pushMode.isEnabled()) { - VaadinSession session = ui.getSession(); - if (session != null && !session.getService().ensurePushAvailable()) { - throw new IllegalStateException( - "Push is not available. See previous log messages for more information."); - } + VaadinSession session = ui.getSession(); + + if (session == null) { + throw new UIDetachedException( + "Cannot set the push mode for a detached UI"); + } + + assert session.hasLock(); + + if (pushMode.isEnabled() && !session.getService().ensurePushAvailable()) { + throw new IllegalStateException( + "Push is not available. See previous log messages for more information."); } - /* - * Client-side will open a new connection or disconnect the old - * connection, so there's nothing more to do on the server at this - * point. - */ - getState().mode = pushMode; + PushMode oldMode = getState().mode; + if (oldMode != pushMode) { + getState().mode = pushMode; + + if (!oldMode.isEnabled() && pushMode.isEnabled()) { + // The push connection is initially in a disconnected state; + // the client will establish the connection + ui.setPushConnection(new AtmospherePushConnection(ui)); + } + // Nothing to do here if disabling push; + // the client will close the connection + } } /* @@ -274,9 +287,8 @@ class PushConfigurationImpl implements PushConfiguration { @Override public Collection getParameterNames() { - return Collections - .unmodifiableCollection(ui.getState(false).pushConfiguration.parameters - .keySet()); + return Collections.unmodifiableCollection(getState(false).parameters + .keySet()); } } diff --git a/server/src/com/vaadin/ui/UI.java b/server/src/com/vaadin/ui/UI.java index a292e6b829..e688c06061 100644 --- a/server/src/com/vaadin/ui/UI.java +++ b/server/src/com/vaadin/ui/UI.java @@ -55,6 +55,7 @@ import com.vaadin.server.communication.PushConnection; import com.vaadin.shared.Connector; import com.vaadin.shared.EventId; import com.vaadin.shared.MouseEventDetails; +import com.vaadin.shared.communication.PushMode; import com.vaadin.shared.ui.ui.DebugWindowClientRpc; import com.vaadin.shared.ui.ui.DebugWindowServerRpc; import com.vaadin.shared.ui.ui.ScrollClientRpc; @@ -422,8 +423,9 @@ public abstract class UI extends AbstractSingleComponentContainer implements } else { if (session == null) { detach(); - // Close the push connection when UI is detached. Otherwise the + // Disable push when the UI is detached. Otherwise the // push connection and possibly VaadinSession will live on. + getPushConfiguration().setPushMode(PushMode.DISABLED); setPushConnection(null); } this.session = session; @@ -550,8 +552,6 @@ public abstract class UI extends AbstractSingleComponentContainer implements private transient PushConnection pushConnection = null; - private boolean hasPendingPush = false; - private LocaleService localeService = new LocaleService(this, getState(false).localeServiceState); @@ -1368,6 +1368,9 @@ public abstract class UI extends AbstractSingleComponentContainer implements * Pushes the pending changes and client RPC invocations of this UI to the * client-side. *

+ * If push is enabled, but the push connection is not currently open, the + * push will be done when the connection is established. + *

* As with all UI methods, the session must be locked when calling this * method. It is also recommended that {@link UI#getCurrent()} is set up to * return this UI since writing the response may invoke logic in any @@ -1385,79 +1388,73 @@ public abstract class UI extends AbstractSingleComponentContainer implements */ public void push() { VaadinSession session = getSession(); - if (session != null) { - assert session.hasLock(); - - /* - * Purge the pending access queue as it might mark a connector as - * dirty when the push would otherwise be ignored because there are - * no changes to push. - */ - session.getService().runPendingAccessTasks(session); - - if (!getConnectorTracker().hasDirtyConnectors()) { - // Do not push if there is nothing to push - return; - } - if (!getPushConfiguration().getPushMode().isEnabled()) { - throw new IllegalStateException("Push not enabled"); - } + if (session == null) { + throw new UIDetachedException("Cannot push a detached UI"); + } + assert session.hasLock(); - if (pushConnection == null) { - hasPendingPush = true; - } else { - pushConnection.push(); - } - } else { - throw new UIDetachedException("Trying to push a detached UI"); + if (!getPushConfiguration().getPushMode().isEnabled()) { + throw new IllegalStateException("Push not enabled"); + } + assert pushConnection != null; + + /* + * Purge the pending access queue as it might mark a connector as dirty + * when the push would otherwise be ignored because there are no changes + * to push. + */ + session.getService().runPendingAccessTasks(session); + + if (!getConnectorTracker().hasDirtyConnectors()) { + // Do not push if there is nothing to push + return; } + + pushConnection.push(); } /** * Returns the internal push connection object used by this UI. This method - * should only be called by the framework. If the returned PushConnection is - * not null, it is guaranteed to have {@code isConnected() == true}. + * should only be called by the framework. *

* This method is not intended to be overridden. If it is overridden, care * should be taken since this method might be called in situations where * {@link UI#getCurrent()} does not return this UI. * - * @return the push connection used by this UI, null if there - * is no active push connection. + * @return the push connection used by this UI, or {@code null} if push is + * not available. */ public PushConnection getPushConnection() { - assert (pushConnection == null || pushConnection.isConnected()); + assert !(getPushConfiguration().getPushMode().isEnabled() && pushConnection == null); return pushConnection; } /** * Sets the internal push connection object used by this UI. This method - * should only be called by the framework. If {@pushConnection} is not null, - * its {@code isConnected()} must be true. + * should only be called by the framework. + *

+ * The {@code pushConnection} argument must be non-null if and only if + * {@code getPushConfiguration().getPushMode().isEnabled()}. * * @param pushConnection * the push connection to use for this UI */ public void setPushConnection(PushConnection pushConnection) { - // If pushMode is disabled then there should never be a pushConnection - assert (pushConnection == null || getPushConfiguration().getPushMode() - .isEnabled()); - assert (pushConnection == null || pushConnection.isConnected()); + // If pushMode is disabled then there should never be a pushConnection; + // if enabled there should always be + assert (pushConnection == null) + ^ getPushConfiguration().getPushMode().isEnabled(); if (pushConnection == this.pushConnection) { return; } - if (this.pushConnection != null) { + if (this.pushConnection != null && this.pushConnection.isConnected()) { this.pushConnection.disconnect(); } this.pushConnection = pushConnection; - if (pushConnection != null && hasPendingPush) { - hasPendingPush = false; - pushConnection.push(); - } } /** -- 2.39.5