Browse Source

Use same reconnect logic for Push as for XHR (#17075)

* No longer queue message separately in PushConnection
* Use XHR for client to server communication when using long polling (or streaming)
* Websocket is used for communication in both directions

Note that using XHR for client to server responses at the same time
as a push connection is open means we must take into account on the
client side that we may receive message in the wrong order. This
will be addressed in the following change.

Change-Id: I97706db3481379593e71dc5bb552727a0486692b
tags/7.6.0.alpha5
Artur Signell 9 years ago
parent
commit
3e6bb5541c

+ 47
- 34
client/src/com/vaadin/client/communication/AtmospherePushConnection.java View File

@@ -16,7 +16,6 @@

package com.vaadin.client.communication;

import java.util.ArrayList;
import java.util.logging.Logger;

import com.google.gwt.core.client.JavaScriptObject;
@@ -116,8 +115,6 @@ public class AtmospherePushConnection implements PushConnection {

private JavaScriptObject socket;

private ArrayList<JsonObject> messageQueue = new ArrayList<JsonObject>();

private State state = State.CONNECT_PENDING;

private AtmosphereConfiguration config;
@@ -220,17 +217,36 @@ public class AtmospherePushConnection implements PushConnection {
}
}

@Override
public boolean isBidirectional() {
if (transport == null) {
return false;
}

if (!transport.equals("websocket")) {
// If we are not using websockets, we want to send XHRs
return false;
}
if (state == State.CONNECT_PENDING) {
// Not sure yet, let's go for using websockets still as still will
// delay the message until a connection is established. When the
// connection is established, bi-directionality will be checked
// again to be sure
}
return true;

};

@Override
public void push(JsonObject message) {
switch (state) {
case CONNECT_PENDING:
assert isActive();
getLogger().info("Queuing push message: " + message.toJson());
messageQueue.add(message);
break;
case CONNECTED:
assert isActive();
getLogger().info("Sending push message: " + message.toJson());
if (!isBidirectional()) {
throw new IllegalStateException(
"This server to client push connection should not be used to send client to server messages");
}
if (state == State.CONNECTED) {
getLogger().info(
"Sending push (" + transport + ") message to server: "
+ message.toJson());

if (transport.equals("websocket")) {
FragmentedMessage fragmented = new FragmentedMessage(
@@ -241,11 +257,15 @@ public class AtmospherePushConnection implements PushConnection {
} else {
doPush(socket, message.toJson());
}
break;
case DISCONNECT_PENDING:
case DISCONNECTED:
throw new IllegalStateException("Can not push after disconnecting");
return;
}

if (state == State.CONNECT_PENDING) {
getCommunicationProblemHandler().pushNotConnected(message);
return;
}

throw new IllegalStateException("Can not push after disconnecting");
}

protected AtmosphereConfiguration getConfig() {
@@ -274,16 +294,11 @@ public class AtmospherePushConnection implements PushConnection {
* @since 7.2
*/
protected void onConnect(AtmosphereResponse response) {

transport = response.getTransport();
getCommunicationProblemHandler().pushOk(this);
switch (state) {
case CONNECT_PENDING:
state = State.CONNECTED;
for (JsonObject message : messageQueue) {
push(message);
}
messageQueue.clear();
getCommunicationProblemHandler().pushOk(this);
break;
case DISCONNECT_PENDING:
// Set state to connected to make disconnect close the connection
@@ -331,11 +346,16 @@ public class AtmospherePushConnection implements PushConnection {

protected void onMessage(AtmosphereResponse response) {
String message = response.getResponseBody();
if (message.startsWith("for(;;);")) {
getLogger().info("Received push message: " + message);
// "for(;;);[{json}]" -> "{json}"
message = message.substring(9, message.length() - 1);
connection.getServerMessageHandler().handleMessage(message);
String json = ServerCommunicationHandler.stripJSONWrapping(message);
if (json == null) {
// Invalid string (not wrapped as expected)
getCommunicationProblemHandler().pushInvalidContent(this, message);
return;
} else {
getLogger().info(
"Received push (" + getTransportType() + ") message: "
+ json);
connection.getServerMessageHandler().handleMessage(json);
}
}

@@ -361,7 +381,6 @@ public class AtmospherePushConnection implements PushConnection {
}

protected void onClose(AtmosphereResponse response) {
getLogger().info("Push connection closed");
state = State.CONNECT_PENDING;
getCommunicationProblemHandler().pushClosed(this);
}
@@ -376,7 +395,6 @@ public class AtmospherePushConnection implements PushConnection {
if (state == State.CONNECTED) {
state = State.CONNECT_PENDING;
}
getLogger().info("Reopening push connection");
getCommunicationProblemHandler().pushReconnectPending(this);
}

@@ -567,11 +585,6 @@ public class AtmospherePushConnection implements PushConnection {
return pushJs;
}

/*
* (non-Javadoc)
*
* @see com.vaadin.client.communication.PushConnection#getTransportType()
*/
@Override
public String getTransportType() {
return transport;

+ 21
- 0
client/src/com/vaadin/client/communication/CommunicationProblemHandler.java View File

@@ -19,6 +19,8 @@ import com.google.gwt.http.client.Request;
import com.google.gwt.http.client.Response;
import com.vaadin.client.ApplicationConnection;

import elemental.json.JsonObject;

/**
* Interface for handling problems which occur during communications with the
* server.
@@ -168,4 +170,23 @@ public interface CommunicationProblemHandler {
*/
void xhrOk();

/**
* Called when a message is to be sent to the server through the push
* channel but the push channel is not connected
*
* @param payload
* The payload to send to the server
*/
void pushNotConnected(JsonObject payload);

/**
* Called when invalid content (not JSON) was pushed from the server through
* the push connection
*
* @param communicationProblemEvent
* An event containing what was being sent to the server and what
* was returned
*/
void pushInvalidContent(PushConnection pushConnection, String message);

}

+ 11
- 0
client/src/com/vaadin/client/communication/DefaultCommunicationProblemHandler.java View File

@@ -25,6 +25,8 @@ import com.google.gwt.user.client.Timer;
import com.vaadin.client.ApplicationConnection;
import com.vaadin.client.WidgetUtil;

import elemental.json.JsonObject;

/**
* Default implementation of the communication problem handler.
*
@@ -168,6 +170,11 @@ public class DefaultCommunicationProblemHandler implements

}

@Override
public void pushInvalidContent(PushConnection pushConnection, String message) {
// Do nothing for now. Should likely do the same as xhrInvalidContent
}

private ServerCommunicationHandler getServerCommunicationHandler() {
return connection.getServerCommunicationHandler();
}
@@ -249,4 +256,8 @@ public class DefaultCommunicationProblemHandler implements
@Override
public void pushOk(PushConnection pushConnection) {
}

@Override
public void pushNotConnected(JsonObject payload) {
}
}

+ 22
- 5
client/src/com/vaadin/client/communication/PushConnection.java View File

@@ -47,11 +47,13 @@ public interface PushConnection {
* Pushes a message to the server. Will throw an exception if the connection
* is not active (see {@link #isActive()}).
* <p>
* Implementation detail: The implementation is responsible for queuing
* messages that are pushed after {@link #init(ApplicationConnection)} has
* been called but before the connection has internally been set up and then
* replay those messages in the original order when the connection has been
* established.
* Implementation detail: If the push connection is not connected and the
* message can thus not be sent, the implementation must call
* {@link CommunicationProblemHandler#pushNotConnected(JsonObject)}, which
* will retry the send later.
* <p>
* This method must not be called if the push connection is not
* bidirectional (if {@link #isBidirectional()} returns false)
*
* @param payload
* the payload to push
@@ -101,4 +103,19 @@ public interface PushConnection {
*/
public String getTransportType();

/**
* Checks whether this push connection should be used for communication in
* both directions or if a normal XHR should be used for client to server
* communication.
*
* A bidirectional push connection must be able to reliably tell when it is
* connected and when it is not.
*
* @since 7.6
* @return true if the push connection should be used for messages in both
* directions, false if it should only be used for server to client
* messages
*/
public boolean isBidirectional();

}

+ 32
- 8
client/src/com/vaadin/client/communication/ReconnectingCommunicationProblemHandler.java View File

@@ -28,16 +28,17 @@ public class ReconnectingCommunicationProblemHandler extends
DefaultCommunicationProblemHandler {

private enum Type {
HEARTBEAT, XHR;
HEARTBEAT, MESSAGE
}

ReconnectDialog reconnectDialog = GWT.create(ReconnectDialog.class);
int reconnectAttempt = 0;
private Type reconnectionCause = null;;
private Type reconnectionCause = null;

@Override
public void xhrException(CommunicationProblemEvent event) {
handleTemporaryError(Type.XHR, event.getPayload());
getLogger().warning("xhrException");
handleTemporaryError(Type.MESSAGE, event.getPayload());
}

@Override
@@ -64,6 +65,8 @@ public class ReconnectingCommunicationProblemHandler extends
}

private void handleTemporaryError(Type type, final JsonObject payload) {
getLogger().warning("handleTemporaryError(" + type + ")");

reconnectAttempt++;
reconnectionCause = type;
if (!reconnectDialog.isAttached()) {
@@ -143,27 +146,36 @@ public class ReconnectingCommunicationProblemHandler extends

@Override
public void xhrInvalidContent(CommunicationProblemEvent event) {
getLogger().warning("xhrInvalidContent");
super.xhrInvalidContent(event);
};

@Override
public void xhrInvalidStatusCode(CommunicationProblemEvent event) {
handleTemporaryError(Type.XHR, event.getPayload());
getLogger().info(
"Server returned " + event.getResponse().getStatusCode()
+ " for xhr request");
getLogger().warning("xhrInvalidStatusCode");
handleTemporaryError(Type.MESSAGE, event.getPayload());
}

@Override
public void xhrOk() {
resolveTemporaryError(Type.XHR, true);
getLogger().warning("xhrOk");
resolveTemporaryError(Type.MESSAGE, true);
}

private void resolveTemporaryError(Type cause, boolean success) {
private void resolveTemporaryError(Type type, boolean success) {
getLogger().warning("resolveTemporaryError(" + type + ")");

if (reconnectionCause == null) {
// Not trying to reconnect
return;
}
if (reconnectionCause != cause) {
if (reconnectionCause == Type.MESSAGE && type == Type.HEARTBEAT) {
// If a heartbeat goes through while we are trying to re-send an
// XHR, we wait for the XHR to go through
// XHR, we wait for the XHR to go through to avoid removing the
// reconnect dialog and then possible showing it again
return;
}

@@ -178,4 +190,16 @@ public class ReconnectingCommunicationProblemHandler extends
}

}

@Override
public void pushOk(PushConnection pushConnection) {
super.pushOk(pushConnection);
resolveTemporaryError(Type.MESSAGE, true);
}

@Override
public void pushNotConnected(JsonObject payload) {
super.pushNotConnected(payload);
handleTemporaryError(Type.MESSAGE, payload);
}
}

+ 26
- 3
client/src/com/vaadin/client/communication/ServerCommunicationHandler.java View File

@@ -163,8 +163,6 @@ public class ServerCommunicationHandler {
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);

getLogger()
.info("Making UIDL Request with params: " + payload.toJson());
if (extraJson != null) {
for (String key : extraJson.keys()) {
payload.put(key, extraJson.get(key));
@@ -185,7 +183,7 @@ public class ServerCommunicationHandler {
* The contents of the request to send
*/
public void send(final JsonObject payload) {
if (push != null) {
if (push != null && push.isBidirectional()) {
push.push(payload);
} else {
xhrConnection.send(payload);
@@ -361,4 +359,29 @@ public class ServerCommunicationHandler {
// Do nothing as they will arrive eventually
}
}

/**
* Strips the JSON wrapping from the given json string with wrapping.
*
* If the given string is not wrapped as expected, returns null
*
* @since
* @param jsonWithWrapping
* the JSON received from the server
* @return an unwrapped JSON string or null if the given string was not
* wrapped
*/
public static String stripJSONWrapping(String jsonWithWrapping) {
if (!jsonWithWrapping
.startsWith(ServerCommunicationHandler.JSON_COMMUNICATION_PREFIX)
|| !jsonWithWrapping
.endsWith(ServerCommunicationHandler.JSON_COMMUNICATION_SUFFIX)) {
return null;
}
return jsonWithWrapping.substring(
ServerCommunicationHandler.JSON_COMMUNICATION_PREFIX.length(),
jsonWithWrapping.length()
- ServerCommunicationHandler.JSON_COMMUNICATION_SUFFIX
.length());
}
}

+ 9
- 8
client/src/com/vaadin/client/communication/ServerMessageHandler.java View File

@@ -233,7 +233,6 @@ public class ServerMessageHandler {
+ " - Original JSON-text:" + jsonText, 200);
return;
}

getLogger().info(
"JSON parsing took " + (new Date().getTime() - start.getTime())
+ "ms");
@@ -286,6 +285,15 @@ public class ServerMessageHandler {
getLogger().info("Handling message from server");
connection.fireEvent(new ResponseHandlingStartedEvent(connection));

// Client id must be updated before server id, as server id update can
// cause a resync (which must use the updated id)
if (json.containsKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) {
int serverNextExpected = json
.getInt(ApplicationConstants.CLIENT_TO_SERVER_ID);
getServerCommunicationHandler().setClientToServerMessageId(
serverNextExpected);
}

final int syncId;
if (json.containsKey(ApplicationConstants.SERVER_SYNC_ID)) {
syncId = json.getInt(ApplicationConstants.SERVER_SYNC_ID);
@@ -324,13 +332,6 @@ public class ServerMessageHandler {
+ "Please verify that the server is up-to-date and that the response data has not been modified in transmission.");
}

if (json.containsKey(ApplicationConstants.CLIENT_TO_SERVER_ID)) {
int serverNextExpected = json
.getInt(ApplicationConstants.CLIENT_TO_SERVER_ID);
getServerCommunicationHandler().setClientToServerMessageId(
serverNextExpected);
}

// Handle redirect
if (json.containsKey("redirect")) {
String url = json.getValueMap("redirect").getString("url");

+ 11
- 16
client/src/com/vaadin/client/communication/XhrConnection.java View File

@@ -136,11 +136,6 @@ public class XhrConnection {

@Override
public void onResponseReceived(Request request, Response response) {
getLogger().info(
"Server visit took "
+ String.valueOf((new Date()).getTime()
- requestStartTime.getTime()) + "ms");

int statusCode = response.getStatusCode();

if (statusCode != 200) {
@@ -153,6 +148,11 @@ public class XhrConnection {
return;
}

getLogger().info(
"Server visit took "
+ String.valueOf((new Date()).getTime()
- requestStartTime.getTime()) + "ms");

String contentType = response.getHeader("Content-Type");
if (contentType == null
|| !contentType.startsWith("application/json")) {
@@ -165,23 +165,18 @@ public class XhrConnection {
// for(;;);["+ realJson +"]"
String responseText = response.getText();

if (!responseText
.startsWith(ServerCommunicationHandler.JSON_COMMUNICATION_PREFIX)) {
final String jsonText = ServerCommunicationHandler
.stripJSONWrapping(responseText);
if (jsonText == null) {
// Invalid string (not wrapped as expected)
getCommunicationProblemHandler().xhrInvalidContent(
new CommunicationProblemEvent(request, payload,
response));
return;
}

final String jsonText = responseText
.substring(
ServerCommunicationHandler.JSON_COMMUNICATION_PREFIX
.length(),
responseText.length()
- ServerCommunicationHandler.JSON_COMMUNICATION_SUFFIX
.length());

getCommunicationProblemHandler().xhrOk();
getLogger().info("Received xhr message: " + jsonText);
getServerMessageHandler().handleMessage(jsonText);
}

@@ -219,7 +214,7 @@ public class XhrConnection {

rb.setCallback(responseHandler);

getLogger().info("Sending request to server");
getLogger().info("Sending xhr message to server: " + payload.toJson());
try {
final Request request = rb.send();


+ 2
- 1
uitest/src/com/vaadin/tests/components/grid/basicfeatures/server/GridSortingTest.java View File

@@ -376,9 +376,10 @@ public class GridSortingTest extends GridBasicFeaturesTest {
}

private void assertLastSortIsUserOriginated(boolean isUserOriginated) {
// Find a message in the log
List<WebElement> userOriginatedMessages = getDriver()
.findElements(
By.xpath("//*[contains(text(),'SortOrderChangeEvent: isUserOriginated')]"));
By.xpath("//div[@id='Log']//*[contains(text(),'SortOrderChangeEvent: isUserOriginated')]"));

Collections.sort(userOriginatedMessages, new Comparator<WebElement>() {
@Override

Loading…
Cancel
Save