]> source.dussan.org Git - vaadin-framework.git/commitdiff
Handle push disconnections and reconnections more reliably (#11831, #11922)
authorJohannes Dahlström <johannesd@vaadin.com>
Wed, 12 Jun 2013 11:16:57 +0000 (14:16 +0300)
committerVaadin Code Review <review@vaadin.com>
Wed, 12 Jun 2013 14:51:27 +0000 (14:51 +0000)
Client-side:

* Call onOpen() also after a successful reconnection
* Reliably call onClose() and try to reconnect after disconnection
* Don't try to reconnect if !isApplicationRunning() after push
* Queue messages while trying to reconnect (state CONNECT_PENDING)

Server-side:

* Implement AtmosphereResourceEventListener.onDisconnect()
* Push marked as pending until client reconnects (if ever)

Change-Id: I1783eb72eb7005b07cae786d8ec8371da3903108

WebContent/VAADIN/jquery.atmosphere.js
client/src/com/vaadin/client/communication/AtmospherePushConnection.java
server/src/com/vaadin/server/communication/PushHandler.java

index 2b176668c19586dd4acf2937a6b67a5c1240f3d1..eb57ed6d53222083ae4b360d29afa05b86e24a08 100644 (file)
@@ -341,12 +341,7 @@ jQuery.atmosphere = function() {
                 _request.ctime = jQuery.now();
 
                 if (_request.transport != 'websocket' && _request.transport != 'sse') {
-                    // Gives a chance to the connection to be established before calling the callback
-                    setTimeout(function() {
-                        _open('opening', _request.transport, _request);
-                    }, 500);
                     _executeRequest();
-
                 } else if (_request.transport == 'websocket') {
                     if (!_supportWebsocket()) {
                         _reconnectWithFallbackTransport("Websocket is not supported, using request.fallbackTransport (" + _request.fallbackTransport + ")");
@@ -685,6 +680,8 @@ jQuery.atmosphere = function() {
                     _close();
                 };
 
+                request.closed = false;
+
                 if (_response.error == null) {
                     _response.request = request;
                     var prevState = _response.state;
@@ -1025,10 +1022,6 @@ jQuery.atmosphere = function() {
                     jQuery.atmosphere.debug("Using URL: " + location);
                 }
 
-                if (webSocketOpened) {
-                    _open('re-opening', "websocket", _request);
-                }
-
                 if (webSocketOpened && !_request.reconnect) {
                     if (_websocket != null) {
                         _clearState();
@@ -1068,9 +1061,7 @@ jQuery.atmosphere = function() {
                         jQuery.atmosphere.debug("Websocket successfully opened");
                     }
 
-                    if (!webSocketOpened) {
-                        _open('opening', "websocket", _request);
-                    }
+                    _open('opening', 'websocket', _request);
 
                     webSocketOpened = true;
                     _websocket.webSocketOpened = webSocketOpened;
@@ -1445,10 +1436,14 @@ jQuery.atmosphere = function() {
                         if (_abordingConnection) {
                             return;
                         }
+
                         _response.error = null;
                         var skipCallbackInvocation = false;
                         var update = false;
 
+                        if (rq.transport == 'streaming' && ajaxRequest.readyState == 2) {
+                            _open('opening', rq.transport, rq);
+                        }
 
                         // Opera doesn't call onerror if the server disconnect.
                         if (jQuery.browser.opera
@@ -1483,6 +1478,7 @@ jQuery.atmosphere = function() {
                                 // Prevent onerror callback to be called
                                 _response.errorHandled = true;
                                 _clearState();
+                                _invokeClose(true);
                                 reconnectF();
                                 return;
                             }
@@ -1579,6 +1575,11 @@ jQuery.atmosphere = function() {
                             }
 
                             _verifyStreamingLength(ajaxRequest, rq);
+
+                            if (rq.transport == 'streaming' && rq.readyState == 4) {
+                                _invokeClose(true);
+                                reconnectF();
+                            }
                         }
                     };
                     ajaxRequest.send(rq.data);
@@ -1669,16 +1670,12 @@ jQuery.atmosphere = function() {
                         _response.status = status == 0 ? 204 : status;
                         _response.reason = status == 0 ? "Server resumed the connection or down." : "OK";
 
-                        var reconnectInterval = (request.connectTimeout == -1) ? 0 : request.connectTimeout;
+                        var reconnectInterval = request.reconnectInterval;
 
                         // Reconnect immedialtely
-                        if (!force) {
-                            request.id = setTimeout(function () {
-                                _executeRequest(request);
-                            }, reconnectInterval);
-                        } else {
+                        request.id = setTimeout(function () {
                             _executeRequest(request);
-                        }
+                        }, reconnectInterval);
                     }
                 }
             }
index 506716040da8638567f1ee3463615f3acdef7268..20ccd451733346a7560a0d1846b316b01aca7149 100644 (file)
@@ -292,6 +292,14 @@ public class AtmospherePushConnection implements PushConnection {
             message = message.substring(9, message.length() - 1);
             connection.handlePushMessage(message);
         }
+
+        if (!connection.isApplicationRunning()) {
+            disconnect(new Command() {
+                @Override
+                public void execute() {
+                }
+            });
+        }
     }
 
     /**
@@ -309,9 +317,21 @@ public class AtmospherePushConnection implements PushConnection {
      * the connection until successful.
      * 
      */
-    protected void onError() {
-        VConsole.error("Push connection using " + getConfig().getTransport()
-                + " failed!");
+    protected void onError(AtmosphereResponse response) {
+        state = State.DISCONNECTED;
+        errorHandler.onError("Push connection using "
+                + getConfig().getTransport() + " failed!",
+                response.getStatusCode());
+    }
+
+    protected void onClose(AtmosphereResponse response) {
+        VConsole.log("Push connection closed, awaiting reconnection");
+        state = State.CONNECT_PENDING;
+    }
+
+    protected void onReconnect(JavaScriptObject request,
+            final AtmosphereResponse response) {
+        VConsole.log("Reopening push connection");
     }
 
     public static abstract class AbstractJSO extends JavaScriptObject {
@@ -370,6 +390,10 @@ public class AtmospherePushConnection implements PushConnection {
 
         }
 
+        public final int getStatusCode() {
+            return getIntValue("status");
+        }
+
         public final String getResponseBody() {
             return getStringValue("responseBody");
         }
@@ -394,7 +418,7 @@ public class AtmospherePushConnection implements PushConnection {
             transport: 'websocket',
             fallbackTransport: 'streaming',
             contentType: 'application/json; charset=UTF-8',
-            reconnectInterval: '5000',
+            reconnectInterval: 5000,
             maxReconnectOnClose: 10000000, 
             trackMessageLength: true,
             messageDelimiter: String.fromCharCode(@com.vaadin.shared.communication.PushConstants::MESSAGE_DELIMITER)
@@ -414,11 +438,17 @@ public class AtmospherePushConnection implements PushConnection {
             self.@com.vaadin.client.communication.AtmospherePushConnection::onMessage(*)(response);
         });
         config.onError = $entry(function(response) {
-            self.@com.vaadin.client.communication.AtmospherePushConnection::onError()(response);
+            self.@com.vaadin.client.communication.AtmospherePushConnection::onError(*)(response);
         });
         config.onTransportFailure = $entry(function(reason,request) {
             self.@com.vaadin.client.communication.AtmospherePushConnection::onTransportFailure(*)(reason);
         });
+        config.onClose = $entry(function(response) {
+            self.@com.vaadin.client.communication.AtmospherePushConnection::onClose(*)(response);
+        });
+        config.onReconnect = $entry(function(request, response) {
+            self.@com.vaadin.client.communication.AtmospherePushConnection::onReconnect(*)(request, response);
+        });
 
         return $wnd.jQueryVaadin.atmosphere.subscribe(config);
     }-*/;
index 6b853063a7baac965f2ec7ef496f088868105a92..1c50f79349534074c2bad2c34a2a5a8f03542459 100644 (file)
@@ -28,6 +28,7 @@ import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
 import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter;
 import org.json.JSONException;
 
 import com.vaadin.server.LegacyCommunicationManager.InvalidUIDLSecurityKeyException;
@@ -51,7 +52,8 @@ import com.vaadin.ui.UI;
  * @author Vaadin Ltd
  * @since 7.1
  */
-public class PushHandler implements AtmosphereHandler {
+public class PushHandler extends AtmosphereResourceEventListenerAdapter
+        implements AtmosphereHandler {
 
     /**
      * Callback interface used internally to process an event with the
@@ -67,12 +69,15 @@ public class PushHandler implements AtmosphereHandler {
      * open by calling resource.suspend(). If there is a pending push, send it
      * now.
      */
-    private static PushEventCallback establishCallback = new PushEventCallback() {
+    private final PushEventCallback establishCallback = new PushEventCallback() {
         @Override
         public void run(AtmosphereResource resource, UI ui) throws IOException {
             getLogger().log(Level.FINER,
                     "New push connection with transport {0}",
                     resource.transport());
+
+            resource.addEventListener(PushHandler.this);
+
             resource.getResponse().setContentType("text/plain; charset=UTF-8");
 
             VaadinSession session = ui.getSession();
@@ -122,9 +127,12 @@ public class PushHandler implements AtmosphereHandler {
      * the request and send changed UI state via the push channel (we do not
      * respond to the request directly.)
      */
-    private static PushEventCallback receiveCallback = new PushEventCallback() {
+    private final PushEventCallback receiveCallback = new PushEventCallback() {
         @Override
         public void run(AtmosphereResource resource, UI ui) throws IOException {
+            getLogger().log(Level.FINER, "Received message from resource {0}",
+                    resource.uuid());
+
             AtmosphereRequest req = resource.getRequest();
 
             AtmospherePushConnection connection = getConnectionForUI(ui);
@@ -167,7 +175,7 @@ public class PushHandler implements AtmosphereHandler {
     /**
      * Callback used when a connection is closed by the client.
      */
-    PushEventCallback disconnectCallback = new PushEventCallback() {
+    private final PushEventCallback disconnectCallback = new PushEventCallback() {
         @Override
         public void run(AtmosphereResource resource, UI ui) throws IOException {
             PushMode pushMode = ui.getPushConfiguration().getPushMode();
@@ -187,7 +195,7 @@ public class PushHandler implements AtmosphereHandler {
                      * mode has been set to disabled, just clean up some stuff
                      * and be done with it
                      */
-                    getLogger().log(Level.FINEST,
+                    getLogger().log(Level.FINER,
                             "Connection closed for resource {0}", id);
                 } else {
                     /*
@@ -195,7 +203,7 @@ public class PushHandler implements AtmosphereHandler {
                      * tab.
                      */
                     getLogger()
-                            .log(Level.FINE,
+                            .log(Level.FINER,
                                     "Connection unexpectedly closed for resource {0} with transport {1}",
                                     new Object[] { id, resource.transport() });
                 }
@@ -316,7 +324,8 @@ public class PushHandler implements AtmosphereHandler {
 
         String id = resource.uuid();
         if (event.isCancelled()) {
-            callWithUi(resource, disconnectCallback);
+            // Disconnected for whatever reason, handle in onDisconnect() as
+            // it's more reliable
         } else if (event.isResuming()) {
             // A connection that was suspended earlier was resumed (committed to
             // the client.) Should only happen if the transport is JSONP or
@@ -351,6 +360,13 @@ public class PushHandler implements AtmosphereHandler {
         }
     }
 
+    @Override
+    public void onDisconnect(AtmosphereResourceEvent event) {
+        // Log event on trace level
+        super.onDisconnect(event);
+        callWithUi(event.getResource(), disconnectCallback);
+    }
+
     @Override
     public void destroy() {
     }