import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
-import org.json.JSONException;
import com.vaadin.shared.communication.PushConstants;
import com.vaadin.ui.UI;
/**
- * {@link PushConnection} implementation using the Atmosphere push support that
- * is by default included in Vaadin.
+ * A {@link PushConnection} implementation using the Atmosphere push support
+ * that is by default included in Vaadin.
*
* @author Vaadin Ltd
* @since 7.1
}
}
+ protected enum State {
+ /**
+ * Not connected. Trying to push will set the connection state to
+ * PUSH_PENDING or RESPONSE_PENDING and defer sending the message until
+ * a connection is established.
+ */
+ DISCONNECTED,
+
+ /**
+ * Not connected. An asynchronous push is pending the opening of the
+ * connection.
+ */
+ PUSH_PENDING,
+
+ /**
+ * Not connected. A response to a client request is pending the opening
+ * of the connection.
+ */
+ RESPONSE_PENDING,
+
+ /**
+ * Connected. Messages can be sent through the connection.
+ */
+ CONNECTED;
+ }
+
+ private State state = State.DISCONNECTED;
private UI ui;
private AtmosphereResource resource;
- private Future<Object> outgoingMessage;
private FragmentedMessage incomingMessage;
- public AtmospherePushConnection(UI ui, AtmosphereResource resource) {
+ public AtmospherePushConnection(UI ui) {
this.ui = ui;
- this.resource = resource;
}
@Override
public void push() {
- assert isConnected();
- try {
- push(true);
- } catch (IOException e) {
- // TODO Error handling
- throw new RuntimeException("Push failed", e);
- }
+ push(true);
}
/**
- * Pushes pending state changes and client RPC calls to the client.
+ * Pushes pending state changes and client RPC calls to the client. If
+ * {@code isConnected()} is false, defers the push until a connection is
+ * established.
*
* @param async
* True if this push asynchronously originates from the server,
* false if it is a response to a client request.
- * @throws IOException
*/
- protected void push(boolean async) throws IOException {
- Writer writer = new StringWriter();
- try {
- new UidlWriter().write(getUI(), writer, false, async);
- } catch (JSONException e) {
- throw new IOException("Error writing UIDL", e);
+ public void push(boolean async) {
+ if (!isConnected()) {
+ if (async && state != State.RESPONSE_PENDING) {
+ state = State.PUSH_PENDING;
+ } else {
+ state = State.RESPONSE_PENDING;
+ }
+ } else {
+ try {
+ Writer writer = new StringWriter();
+ new UidlWriter().write(getUI(), writer, false, async);
+ sendMessage("for(;;);[{" + writer.toString() + "}]");
+ } catch (Exception e) {
+ throw new RuntimeException("Push failed", e);
+ }
}
- sendMessage("for(;;);[{" + writer.toString() + "}]");
}
/**
- * Sends the given message to the current client.
+ * Sends the given message to the current client. Cannot be called if
+ * {@isConnected()} is false.
*
* @param message
* The message to send
*/
void sendMessage(String message) {
+ assert (isConnected());
// "Broadcast" the changes to the single client only
- outgoingMessage = getResource().getBroadcaster().broadcast(message,
- getResource());
+ getResource().getBroadcaster().broadcast(message, getResource());
}
/**
*/
protected Reader receiveMessage(Reader reader) throws IOException {
- if (resource.transport() != TRANSPORT.WEBSOCKET) {
+ if (resource == null || resource.transport() != TRANSPORT.WEBSOCKET) {
return reader;
}
@Override
public boolean isConnected() {
- return resource != null
- && resource.getBroadcaster().getAtmosphereResources()
- .contains(resource);
+ assert (state == State.CONNECTED) ^ (resource == null);
+ return state == State.CONNECTED;
+ }
+
+ /**
+ * Associates this {@code AtmospherePushConnection} with the given
+ * {@AtmosphereResource} representing an established
+ * push connection. If already connected, calls {@link #disconnect()} first.
+ * If there is a deferred push, carries it out via the new connection.
+ *
+ * @since 7.2
+ */
+ public void connect(AtmosphereResource resource) {
+
+ assert resource != null;
+ assert resource != this.resource;
+
+ if (isConnected()) {
+ disconnect();
+ }
+
+ this.resource = resource;
+ State oldState = state;
+ state = State.CONNECTED;
+
+ if (oldState == State.PUSH_PENDING
+ || oldState == State.RESPONSE_PENDING) {
+ // Sending a "response" message (async=false) also takes care of a
+ // pending push, but not vice versa
+ push(oldState == State.PUSH_PENDING);
+ }
}
/**
@Override
public void disconnect() {
assert isConnected();
-
- if (outgoingMessage != null) {
- // Wait for the last message to be sent before closing the
- // connection (assumes that futures are completed in order)
- try {
- outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- getLogger()
- .log(Level.INFO,
- "Timeout waiting for messages to be sent to client before disconnect");
- } catch (Exception e) {
- getLogger()
- .log(Level.INFO,
- "Error waiting for messages to be sent to client before disconnect");
- }
- outgoingMessage = null;
- }
-
resource.resume();
resource = null;
- }
-
- /**
- * @since
- * @return
- */
- private static Logger getLogger() {
- return Logger.getLogger(AtmospherePushConnection.class.getName());
+ state = State.DISCONNECTED;
}
}
/**
* Represents a bidirectional ("push") connection between a single UI and its
- * client-side.
+ * client-side. A single {@code PushConnection} instance is bound to a UI as
+ * long as push is enabled in that UI, even if the actual connection is
+ * momentarily dropped either due to a network failure or as a normal part of
+ * the transport mechanism.
+ * <p>
+ * This interface is an internal API, only meant to be used by the framework.
*
* @author Vaadin Ltd
* @since 7.1
public interface PushConnection {
/**
- * Pushes pending state changes and client RPC calls to the client. Cannot
- * be called if {@link #isConnected()} is false. It is NOT safe to invoke
- * this method if not holding the session lock.
+ * Pushes pending state changes and client RPC calls to the client. Can be
+ * called even if {@link #isConnected()} is false; the push will be deferred
+ * until a connection is available. It is NOT safe to invoke this method if
+ * not holding the session lock.
* <p>
* This is internal API; please use {@link UI#push()} instead.
*/
@Override
public void run(AtmosphereResource resource, UI ui) throws IOException {
getLogger().log(Level.FINER,
- "New push connection with transport {0}",
- resource.transport());
+ "New push connection for resource {0} with transport {1}",
+ new Object[] { resource.uuid(), resource.transport() });
resource.addEventListener(PushHandler.this);
resource.suspend();
- AtmospherePushConnection connection = new AtmospherePushConnection(
- ui, resource);
-
- ui.setPushConnection(connection);
+ AtmospherePushConnection connection = getConnectionForUI(ui);
+ assert (connection != null);
+ connection.connect(resource);
}
};
@Override
public void run(AtmosphereResource resource, UI ui) throws IOException {
PushMode pushMode = ui.getPushConfiguration().getPushMode();
- AtmospherePushConnection pushConnection = getConnectionForUI(ui);
+ AtmospherePushConnection connection = getConnectionForUI(ui);
String id = resource.uuid();
- if (pushConnection == null) {
+ if (connection == null) {
getLogger()
.log(Level.WARNING,
"Could not find push connection to close: {0} with transport {1}",
"Connection unexpectedly closed for resource {0} with transport {1}",
new Object[] { id, resource.transport() });
}
- ui.setPushConnection(null);
+ connection.disconnect();
}
}
};
private static AtmospherePushConnection getConnectionForUI(UI ui) {
PushConnection pushConnection = ui.getPushConnection();
if (pushConnection instanceof AtmospherePushConnection) {
- assert pushConnection.isConnected();
return (AtmospherePushConnection) pushConnection;
+ } else {
+ return null;
}
- return null;
}
@Override
break;
case JSONP:
case LONG_POLLING:
- resource.resume();
+ disconnect(event);
break;
default:
getLogger().log(Level.SEVERE, "Unknown transport {0}",
disconnect(event);
}
- @Override
- public void onResume(AtmosphereResourceEvent event) {
- // Log event on trace level
- super.onResume(event);
- disconnect(event);
- }
-
@Override
public void destroy() {
}
*/
private static void sendRefreshAndDisconnect(AtmosphereResource resource)
throws IOException {
- AtmospherePushConnection connection = new AtmospherePushConnection(
- null, resource);
+ AtmospherePushConnection connection = new AtmospherePushConnection(null);
+ connection.connect(resource);
try {
connection.sendMessage(VaadinService
.createCriticalNotificationJSON(null, null, null, null));
import java.util.Collections;
import com.vaadin.server.VaadinSession;
+import com.vaadin.server.communication.AtmospherePushConnection;
import com.vaadin.shared.communication.PushMode;
import com.vaadin.shared.ui.ui.Transport;
import com.vaadin.shared.ui.ui.UIState.PushConfigurationState;
throw new IllegalArgumentException("Push mode cannot be null");
}
- if (pushMode.isEnabled()) {
- VaadinSession session = ui.getSession();
- if (session != null && !session.getService().ensurePushAvailable()) {
- throw new IllegalStateException(
- "Push is not available. See previous log messages for more information.");
- }
+ VaadinSession session = ui.getSession();
+
+ if (session == null) {
+ throw new UIDetachedException(
+ "Cannot set the push mode for a detached UI");
+ }
+
+ assert session.hasLock();
+
+ if (pushMode.isEnabled() && !session.getService().ensurePushAvailable()) {
+ throw new IllegalStateException(
+ "Push is not available. See previous log messages for more information.");
}
- /*
- * Client-side will open a new connection or disconnect the old
- * connection, so there's nothing more to do on the server at this
- * point.
- */
- getState().mode = pushMode;
+ PushMode oldMode = getState().mode;
+ if (oldMode != pushMode) {
+ getState().mode = pushMode;
+
+ if (!oldMode.isEnabled() && pushMode.isEnabled()) {
+ // The push connection is initially in a disconnected state;
+ // the client will establish the connection
+ ui.setPushConnection(new AtmospherePushConnection(ui));
+ }
+ // Nothing to do here if disabling push;
+ // the client will close the connection
+ }
}
/*
@Override
public Collection<String> getParameterNames() {
- return Collections
- .unmodifiableCollection(ui.getState(false).pushConfiguration.parameters
- .keySet());
+ return Collections.unmodifiableCollection(getState(false).parameters
+ .keySet());
}
}
import com.vaadin.shared.Connector;
import com.vaadin.shared.EventId;
import com.vaadin.shared.MouseEventDetails;
+import com.vaadin.shared.communication.PushMode;
import com.vaadin.shared.ui.ui.DebugWindowClientRpc;
import com.vaadin.shared.ui.ui.DebugWindowServerRpc;
import com.vaadin.shared.ui.ui.ScrollClientRpc;
} else {
if (session == null) {
detach();
- // Close the push connection when UI is detached. Otherwise the
+ // Disable push when the UI is detached. Otherwise the
// push connection and possibly VaadinSession will live on.
+ getPushConfiguration().setPushMode(PushMode.DISABLED);
setPushConnection(null);
}
this.session = session;
private transient PushConnection pushConnection = null;
- private boolean hasPendingPush = false;
-
private LocaleService localeService = new LocaleService(this,
getState(false).localeServiceState);
* Pushes the pending changes and client RPC invocations of this UI to the
* client-side.
* <p>
+ * If push is enabled, but the push connection is not currently open, the
+ * push will be done when the connection is established.
+ * <p>
* As with all UI methods, the session must be locked when calling this
* method. It is also recommended that {@link UI#getCurrent()} is set up to
* return this UI since writing the response may invoke logic in any
*/
public void push() {
VaadinSession session = getSession();
- if (session != null) {
- assert session.hasLock();
-
- /*
- * Purge the pending access queue as it might mark a connector as
- * dirty when the push would otherwise be ignored because there are
- * no changes to push.
- */
- session.getService().runPendingAccessTasks(session);
-
- if (!getConnectorTracker().hasDirtyConnectors()) {
- // Do not push if there is nothing to push
- return;
- }
- if (!getPushConfiguration().getPushMode().isEnabled()) {
- throw new IllegalStateException("Push not enabled");
- }
+ if (session == null) {
+ throw new UIDetachedException("Cannot push a detached UI");
+ }
+ assert session.hasLock();
- if (pushConnection == null) {
- hasPendingPush = true;
- } else {
- pushConnection.push();
- }
- } else {
- throw new UIDetachedException("Trying to push a detached UI");
+ if (!getPushConfiguration().getPushMode().isEnabled()) {
+ throw new IllegalStateException("Push not enabled");
+ }
+ assert pushConnection != null;
+
+ /*
+ * Purge the pending access queue as it might mark a connector as dirty
+ * when the push would otherwise be ignored because there are no changes
+ * to push.
+ */
+ session.getService().runPendingAccessTasks(session);
+
+ if (!getConnectorTracker().hasDirtyConnectors()) {
+ // Do not push if there is nothing to push
+ return;
}
+
+ pushConnection.push();
}
/**
* Returns the internal push connection object used by this UI. This method
- * should only be called by the framework. If the returned PushConnection is
- * not null, it is guaranteed to have {@code isConnected() == true}.
+ * should only be called by the framework.
* <p>
* This method is not intended to be overridden. If it is overridden, care
* should be taken since this method might be called in situations where
* {@link UI#getCurrent()} does not return this UI.
*
- * @return the push connection used by this UI, <code>null</code> if there
- * is no active push connection.
+ * @return the push connection used by this UI, or {@code null} if push is
+ * not available.
*/
public PushConnection getPushConnection() {
- assert (pushConnection == null || pushConnection.isConnected());
+ assert !(getPushConfiguration().getPushMode().isEnabled() && pushConnection == null);
return pushConnection;
}
/**
* Sets the internal push connection object used by this UI. This method
- * should only be called by the framework. If {@pushConnection} is not null,
- * its {@code isConnected()} must be true.
+ * should only be called by the framework.
+ * <p>
+ * The {@code pushConnection} argument must be non-null if and only if
+ * {@code getPushConfiguration().getPushMode().isEnabled()}.
*
* @param pushConnection
* the push connection to use for this UI
*/
public void setPushConnection(PushConnection pushConnection) {
- // If pushMode is disabled then there should never be a pushConnection
- assert (pushConnection == null || getPushConfiguration().getPushMode()
- .isEnabled());
- assert (pushConnection == null || pushConnection.isConnected());
+ // If pushMode is disabled then there should never be a pushConnection;
+ // if enabled there should always be
+ assert (pushConnection == null)
+ ^ getPushConfiguration().getPushMode().isEnabled();
if (pushConnection == this.pushConnection) {
return;
}
- if (this.pushConnection != null) {
+ if (this.pushConnection != null && this.pushConnection.isConnected()) {
this.pushConnection.disconnect();
}
this.pushConnection = pushConnection;
- if (pushConnection != null && hasPendingPush) {
- hasPendingPush = false;
- pushConnection.push();
- }
}
/**