From 15b217d26278471ee713f46340981c862e3839d2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Johannes=20Dahlstr=C3=B6m?= Date: Wed, 12 Jun 2013 14:16:57 +0300 Subject: [PATCH] Handle push disconnections and reconnections more reliably (#11831, #11922) Client-side: * Call onOpen() also after a successful reconnection * Reliably call onClose() and try to reconnect after disconnection * Don't try to reconnect if !isApplicationRunning() after push * Queue messages while trying to reconnect (state CONNECT_PENDING) Server-side: * Implement AtmosphereResourceEventListener.onDisconnect() * Push marked as pending until client reconnects (if ever) Change-Id: I1783eb72eb7005b07cae786d8ec8371da3903108 --- WebContent/VAADIN/jquery.atmosphere.js | 35 ++++++++-------- .../AtmospherePushConnection.java | 40 ++++++++++++++++--- .../server/communication/PushHandler.java | 30 ++++++++++---- 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/WebContent/VAADIN/jquery.atmosphere.js b/WebContent/VAADIN/jquery.atmosphere.js index 2b176668c1..eb57ed6d53 100644 --- a/WebContent/VAADIN/jquery.atmosphere.js +++ b/WebContent/VAADIN/jquery.atmosphere.js @@ -341,12 +341,7 @@ jQuery.atmosphere = function() { _request.ctime = jQuery.now(); if (_request.transport != 'websocket' && _request.transport != 'sse') { - // Gives a chance to the connection to be established before calling the callback - setTimeout(function() { - _open('opening', _request.transport, _request); - }, 500); _executeRequest(); - } else if (_request.transport == 'websocket') { if (!_supportWebsocket()) { _reconnectWithFallbackTransport("Websocket is not supported, using request.fallbackTransport (" + _request.fallbackTransport + ")"); @@ -685,6 +680,8 @@ jQuery.atmosphere = function() { _close(); }; + request.closed = false; + if (_response.error == null) { _response.request = request; var prevState = _response.state; @@ -1025,10 +1022,6 @@ jQuery.atmosphere = function() { jQuery.atmosphere.debug("Using URL: " + location); } - if (webSocketOpened) { - _open('re-opening', "websocket", _request); - } - if (webSocketOpened && !_request.reconnect) { if (_websocket != null) { _clearState(); @@ -1068,9 +1061,7 @@ jQuery.atmosphere = function() { jQuery.atmosphere.debug("Websocket successfully opened"); } - if (!webSocketOpened) { - _open('opening', "websocket", _request); - } + _open('opening', 'websocket', _request); webSocketOpened = true; _websocket.webSocketOpened = webSocketOpened; @@ -1445,10 +1436,14 @@ jQuery.atmosphere = function() { if (_abordingConnection) { return; } + _response.error = null; var skipCallbackInvocation = false; var update = false; + if (rq.transport == 'streaming' && ajaxRequest.readyState == 2) { + _open('opening', rq.transport, rq); + } // Opera doesn't call onerror if the server disconnect. if (jQuery.browser.opera @@ -1483,6 +1478,7 @@ jQuery.atmosphere = function() { // Prevent onerror callback to be called _response.errorHandled = true; _clearState(); + _invokeClose(true); reconnectF(); return; } @@ -1579,6 +1575,11 @@ jQuery.atmosphere = function() { } _verifyStreamingLength(ajaxRequest, rq); + + if (rq.transport == 'streaming' && rq.readyState == 4) { + _invokeClose(true); + reconnectF(); + } } }; ajaxRequest.send(rq.data); @@ -1669,16 +1670,12 @@ jQuery.atmosphere = function() { _response.status = status == 0 ? 204 : status; _response.reason = status == 0 ? "Server resumed the connection or down." : "OK"; - var reconnectInterval = (request.connectTimeout == -1) ? 0 : request.connectTimeout; + var reconnectInterval = request.reconnectInterval; // Reconnect immedialtely - if (!force) { - request.id = setTimeout(function () { - _executeRequest(request); - }, reconnectInterval); - } else { + request.id = setTimeout(function () { _executeRequest(request); - } + }, reconnectInterval); } } } diff --git a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java index 506716040d..20ccd45173 100644 --- a/client/src/com/vaadin/client/communication/AtmospherePushConnection.java +++ b/client/src/com/vaadin/client/communication/AtmospherePushConnection.java @@ -292,6 +292,14 @@ public class AtmospherePushConnection implements PushConnection { message = message.substring(9, message.length() - 1); connection.handlePushMessage(message); } + + if (!connection.isApplicationRunning()) { + disconnect(new Command() { + @Override + public void execute() { + } + }); + } } /** @@ -309,9 +317,21 @@ public class AtmospherePushConnection implements PushConnection { * the connection until successful. * */ - protected void onError() { - VConsole.error("Push connection using " + getConfig().getTransport() - + " failed!"); + protected void onError(AtmosphereResponse response) { + state = State.DISCONNECTED; + errorHandler.onError("Push connection using " + + getConfig().getTransport() + " failed!", + response.getStatusCode()); + } + + protected void onClose(AtmosphereResponse response) { + VConsole.log("Push connection closed, awaiting reconnection"); + state = State.CONNECT_PENDING; + } + + protected void onReconnect(JavaScriptObject request, + final AtmosphereResponse response) { + VConsole.log("Reopening push connection"); } public static abstract class AbstractJSO extends JavaScriptObject { @@ -370,6 +390,10 @@ public class AtmospherePushConnection implements PushConnection { } + public final int getStatusCode() { + return getIntValue("status"); + } + public final String getResponseBody() { return getStringValue("responseBody"); } @@ -394,7 +418,7 @@ public class AtmospherePushConnection implements PushConnection { transport: 'websocket', fallbackTransport: 'streaming', contentType: 'application/json; charset=UTF-8', - reconnectInterval: '5000', + reconnectInterval: 5000, maxReconnectOnClose: 10000000, trackMessageLength: true, messageDelimiter: String.fromCharCode(@com.vaadin.shared.communication.PushConstants::MESSAGE_DELIMITER) @@ -414,11 +438,17 @@ public class AtmospherePushConnection implements PushConnection { self.@com.vaadin.client.communication.AtmospherePushConnection::onMessage(*)(response); }); config.onError = $entry(function(response) { - self.@com.vaadin.client.communication.AtmospherePushConnection::onError()(response); + self.@com.vaadin.client.communication.AtmospherePushConnection::onError(*)(response); }); config.onTransportFailure = $entry(function(reason,request) { self.@com.vaadin.client.communication.AtmospherePushConnection::onTransportFailure(*)(reason); }); + config.onClose = $entry(function(response) { + self.@com.vaadin.client.communication.AtmospherePushConnection::onClose(*)(response); + }); + config.onReconnect = $entry(function(request, response) { + self.@com.vaadin.client.communication.AtmospherePushConnection::onReconnect(*)(request, response); + }); return $wnd.jQueryVaadin.atmosphere.subscribe(config); }-*/; diff --git a/server/src/com/vaadin/server/communication/PushHandler.java b/server/src/com/vaadin/server/communication/PushHandler.java index 6b853063a7..1c50f79349 100644 --- a/server/src/com/vaadin/server/communication/PushHandler.java +++ b/server/src/com/vaadin/server/communication/PushHandler.java @@ -28,6 +28,7 @@ import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResource.TRANSPORT; import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter; import org.json.JSONException; import com.vaadin.server.LegacyCommunicationManager.InvalidUIDLSecurityKeyException; @@ -51,7 +52,8 @@ import com.vaadin.ui.UI; * @author Vaadin Ltd * @since 7.1 */ -public class PushHandler implements AtmosphereHandler { +public class PushHandler extends AtmosphereResourceEventListenerAdapter + implements AtmosphereHandler { /** * Callback interface used internally to process an event with the @@ -67,12 +69,15 @@ public class PushHandler implements AtmosphereHandler { * open by calling resource.suspend(). If there is a pending push, send it * now. */ - private static PushEventCallback establishCallback = new PushEventCallback() { + private final PushEventCallback establishCallback = new PushEventCallback() { @Override public void run(AtmosphereResource resource, UI ui) throws IOException { getLogger().log(Level.FINER, "New push connection with transport {0}", resource.transport()); + + resource.addEventListener(PushHandler.this); + resource.getResponse().setContentType("text/plain; charset=UTF-8"); VaadinSession session = ui.getSession(); @@ -122,9 +127,12 @@ public class PushHandler implements AtmosphereHandler { * the request and send changed UI state via the push channel (we do not * respond to the request directly.) */ - private static PushEventCallback receiveCallback = new PushEventCallback() { + private final PushEventCallback receiveCallback = new PushEventCallback() { @Override public void run(AtmosphereResource resource, UI ui) throws IOException { + getLogger().log(Level.FINER, "Received message from resource {0}", + resource.uuid()); + AtmosphereRequest req = resource.getRequest(); AtmospherePushConnection connection = getConnectionForUI(ui); @@ -167,7 +175,7 @@ public class PushHandler implements AtmosphereHandler { /** * Callback used when a connection is closed by the client. */ - PushEventCallback disconnectCallback = new PushEventCallback() { + private final PushEventCallback disconnectCallback = new PushEventCallback() { @Override public void run(AtmosphereResource resource, UI ui) throws IOException { PushMode pushMode = ui.getPushConfiguration().getPushMode(); @@ -187,7 +195,7 @@ public class PushHandler implements AtmosphereHandler { * mode has been set to disabled, just clean up some stuff * and be done with it */ - getLogger().log(Level.FINEST, + getLogger().log(Level.FINER, "Connection closed for resource {0}", id); } else { /* @@ -195,7 +203,7 @@ public class PushHandler implements AtmosphereHandler { * tab. */ getLogger() - .log(Level.FINE, + .log(Level.FINER, "Connection unexpectedly closed for resource {0} with transport {1}", new Object[] { id, resource.transport() }); } @@ -316,7 +324,8 @@ public class PushHandler implements AtmosphereHandler { String id = resource.uuid(); if (event.isCancelled()) { - callWithUi(resource, disconnectCallback); + // Disconnected for whatever reason, handle in onDisconnect() as + // it's more reliable } else if (event.isResuming()) { // A connection that was suspended earlier was resumed (committed to // the client.) Should only happen if the transport is JSONP or @@ -351,6 +360,13 @@ public class PushHandler implements AtmosphereHandler { } } + @Override + public void onDisconnect(AtmosphereResourceEvent event) { + // Log event on trace level + super.onDisconnect(event); + callWithUi(event.getResource(), disconnectCallback); + } + @Override public void destroy() { } -- 2.39.5