Browse Source

Handle out of order messages (#11733,#17075)

Change-Id: I1958a84be59068caa377256d43e868e13ed69597
tags/7.6.0.alpha5
Artur Signell 9 years ago
parent
commit
40815a4375

+ 9
- 1
client/src/com/vaadin/client/communication/ServerCommunicationHandler.java View File

@@ -331,12 +331,20 @@ public class ServerCommunicationHandler {
*
* @param clientToServerMessageId
* the new client id to set
* @param force
* true if the id must be updated, false otherwise
*/
public void setClientToServerMessageId(int nextExpectedId) {
public void setClientToServerMessageId(int nextExpectedId, boolean force) {
if (nextExpectedId == clientToServerMessageId) {
// No op as everything matches they way it should
return;
}
if (force) {
getLogger().info(
"Forced update of clientId to " + clientToServerMessageId);
clientToServerMessageId = nextExpectedId;
return;
}

if (nextExpectedId > clientToServerMessageId) {
if (clientToServerMessageId == 0) {

+ 177
- 64
client/src/com/vaadin/client/communication/ServerMessageHandler.java View File

@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
@@ -227,8 +228,14 @@ public class ServerMessageHandler {
try {
json = parseJSONResponse(jsonText);
} catch (final Exception e) {
// FIXME
// Should not call endRequest for a asynchronous push message
// but there is currently no way of knowing if this is an async
// message if we get invalid JSON.

// TODO Move parsing out from this method and handle the error the
// same way as if we do not receive the expected prefix and suffix
getServerCommunicationHandler().endRequest();

connection.showCommunicationError(e.getMessage()
+ " - Original JSON-text:" + jsonText, 200);
return;
@@ -236,6 +243,13 @@ public class ServerMessageHandler {
getLogger().info(
"JSON parsing took " + (new Date().getTime() - start.getTime())
+ "ms");

if (getServerId(json) == -1) {
getLogger()
.severe("Response didn't contain a server id. "
+ "Please verify that the server is up-to-date and that the response data has not been modified in transmission.");
}

if (connection.getState() == State.RUNNING) {
handleJSON(json);
} else if (connection.getState() == State.INITIALIZING) {
@@ -261,11 +275,51 @@ public class ServerMessageHandler {
}-*/;

protected void handleJSON(final ValueMap json) {
if (!responseHandlingLocks.isEmpty()) {
// Some component is doing something that can't be interrupted
// (e.g. animation that should be smooth). Enqueue the UIDL
// message for later processing.
getLogger().info("Postponing UIDL handling due to lock...");
final int serverId = getServerId(json);

if (isResynchronize(json) && !isNextExpectedMessage(serverId)) {
// Resynchronize request. We must remove any old pending
// messages and ensure this is handled next. Otherwise we
// would keep waiting for an older message forever (if this
// is triggered by forceHandleMessage)
getLogger().info(
"Received resync message with id " + serverId
+ " while waiting for " + getExpectedServerId());
lastSeenServerSyncId = serverId - 1;
removeOldPendingMessages();
}

boolean locked = !responseHandlingLocks.isEmpty();

if (locked || !isNextExpectedMessage(serverId)) {
// Cannot or should not handle this message right now, either
// because of locks or because it's an out-of-order message

if (locked) {
// Some component is doing something that can't be interrupted
// (e.g. animation that should be smooth). Enqueue the UIDL
// message for later processing.
getLogger().info("Postponing UIDL handling due to lock...");
} else {
// Unexpected server id
if (serverId <= lastSeenServerSyncId) {
// Why is the server re-sending an old package? Ignore it
getLogger().warning(
"Received message with server id " + serverId
+ " but have already seen "
+ lastSeenServerSyncId + ". Ignoring it");
endRequestIfResponse(json);
return;
}

// We are waiting for an earlier message...
getLogger()
.info("Received message with server id "
+ serverId
+ " but expected "
+ getExpectedServerId()
+ ". Postponing handling until the missing message(s) have been received");
}
pendingUIDLMessages.add(new PendingUIDLMessage(json));
if (!forceHandleMessage.isRunning()) {
forceHandleMessage.schedule(MAX_SUSPENDED_TIMEOUT);
@@ -291,45 +345,15 @@ public class ServerMessageHandler {
int serverNextExpected = json
.getInt(ApplicationConstants.CLIENT_TO_SERVER_ID);
getServerCommunicationHandler().setClientToServerMessageId(
serverNextExpected);
serverNextExpected, isResynchronize(json));
}

final int syncId;
if (json.containsKey(ApplicationConstants.SERVER_SYNC_ID)) {
syncId = json.getInt(ApplicationConstants.SERVER_SYNC_ID);

if (serverId != -1) {
/*
* Use sync id unless explicitly set as undefined, as is done by
* e.g. critical server-side notifications
*/
if (syncId != -1) {
if (lastSeenServerSyncId == UNDEFINED_SYNC_ID
|| syncId == (lastSeenServerSyncId + 1)) {
lastSeenServerSyncId = syncId;
} else {
getLogger().warning(
"Expected sync id: " + (lastSeenServerSyncId + 1)
+ ", received: " + syncId
+ ". Resynchronizing from server.");
lastSeenServerSyncId = syncId;
// Copied from below...
ValueMap meta = json.getValueMap("meta");
if (meta == null || !meta.containsKey("async")) {
// End the request if the received message was a
// response, not sent asynchronously
getServerCommunicationHandler().endRequest();
}

resumeResponseHandling(lock);
getServerCommunicationHandler().resynchronize();
return;
}
}
} else {
syncId = -1;
getLogger()
.severe("Server response didn't contain a sync id. "
+ "Please verify that the server is up-to-date and that the response data has not been modified in transmission.");
lastSeenServerSyncId = serverId;
}

// Handle redirect
@@ -403,7 +427,7 @@ public class ServerMessageHandler {

@Override
public void execute() {
assert syncId == -1 || syncId == lastSeenServerSyncId;
assert serverId == -1 || serverId == lastSeenServerSyncId;

handleUIDLDuration.logDuration(" * Loading widgets completed",
10);
@@ -550,13 +574,7 @@ public class ServerMessageHandler {
getLogger().info(
"Referenced paintables: " + getConnectorMap().size());

if (meta == null || !meta.containsKey("async")) {
// End the request if the received message was a response,
// not sent asynchronously
// FIXME
getServerCommunicationHandler().endRequest();
}

endRequestIfResponse(json);
resumeResponseHandling(lock);

if (Profiler.isEnabled()) {
@@ -1437,6 +1455,57 @@ public class ServerMessageHandler {
ApplicationConfiguration.runWhenDependenciesLoaded(c);
}

private void endRequestIfResponse(ValueMap json) {
if (isResponse(json)) {
// End the request if the received message was a
// response, not sent asynchronously
getServerCommunicationHandler().endRequest();
}
}

private boolean isResynchronize(ValueMap json) {
return json.containsKey(ApplicationConstants.RESYNCHRONIZE_ID);
}

private boolean isResponse(ValueMap json) {
ValueMap meta = json.getValueMap("meta");
if (meta == null || !meta.containsKey("async")) {
return true;
}
return false;
}

/**
* Checks if the given serverId is the one we are currently waiting for from
* the server
*/
private boolean isNextExpectedMessage(int serverId) {
if (serverId == -1) {
return true;
}
if (serverId == getExpectedServerId()) {
return true;
}
if (lastSeenServerSyncId == UNDEFINED_SYNC_ID) {
// First message is always ok
return true;
}
return false;

}

private int getServerId(ValueMap json) {
if (json.containsKey(ApplicationConstants.SERVER_SYNC_ID)) {
return json.getInt(ApplicationConstants.SERVER_SYNC_ID);
} else {
return -1;
}
}

private int getExpectedServerId() {
return lastSeenServerSyncId + 1;
}

/**
* Timer used to make sure that no misbehaving components can delay response
* handling forever.
@@ -1444,11 +1513,30 @@ public class ServerMessageHandler {
Timer forceHandleMessage = new Timer() {
@Override
public void run() {
getLogger()
.warning(
"WARNING: reponse handling was never resumed, forcibly removing locks...");
responseHandlingLocks.clear();
handlePendingMessages();
if (!responseHandlingLocks.isEmpty()) {
// Lock which was never release -> bug in locker or things just
// too slow
getLogger()
.warning(
"WARNING: reponse handling was never resumed, forcibly removing locks...");
responseHandlingLocks.clear();
} else {
// Waited for out-of-order message which never arrived
// Do one final check and resynchronize if the message is not
// there. The final check is only a precaution as this timer
// should have been cancelled if the message has arrived
getLogger().warning(
"Gave up waiting for message " + getExpectedServerId()
+ " from the server");

}
if (!handlePendingMessages() && !pendingUIDLMessages.isEmpty()) {
// There are messages but the next id was not found, likely it
// has been lost
// Drop pending messages and resynchronize
pendingUIDLMessages.clear();
getServerCommunicationHandler().resynchronize();
}
}
};

@@ -1492,20 +1580,45 @@ public class ServerMessageHandler {
}-*/;

/**
* Handles all pending UIDL messages queued while response handling was
* suspended.
* Finds the next pending UIDL message and handles it (next pending is
* decided based on the server id)
*
* @return true if a message was handled, false otherwise
*/
private void handlePendingMessages() {
if (!pendingUIDLMessages.isEmpty()) {
/*
* Clear the list before processing enqueued messages to support
* reentrancy
*/
List<PendingUIDLMessage> pendingMessages = pendingUIDLMessages;
pendingUIDLMessages = new ArrayList<PendingUIDLMessage>();
private boolean handlePendingMessages() {
if (pendingUIDLMessages.isEmpty()) {
return false;
}

// Try to find the next expected message
PendingUIDLMessage toHandle = null;
for (PendingUIDLMessage message : pendingUIDLMessages) {
if (isNextExpectedMessage(getServerId(message.json))) {
toHandle = message;
break;
}
}

if (toHandle != null) {
pendingUIDLMessages.remove(toHandle);
handleJSON(toHandle.getJson());
// Any remaining messages will be handled when this is called
// again at the end of handleJSON
return true;
} else {
return false;
}

}

for (PendingUIDLMessage pending : pendingMessages) {
handleJSON(pending.getJson());
private void removeOldPendingMessages() {
Iterator<PendingUIDLMessage> i = pendingUIDLMessages.iterator();
while (i.hasNext()) {
PendingUIDLMessage m = i.next();
int serverId = getServerId(m.json);
if (serverId != -1 && serverId < getExpectedServerId()) {
getLogger().info("Removing old message with id " + serverId);
i.remove();
}
}
}

+ 4
- 0
server/src/com/vaadin/server/communication/UidlWriter.java View File

@@ -129,6 +129,10 @@ public class UidlWriter implements Serializable {
.getCurrentSyncId() : -1;
writer.write("\"" + ApplicationConstants.SERVER_SYNC_ID + "\": "
+ syncId + ", ");
if (repaintAll) {
writer.write("\"" + ApplicationConstants.RESYNCHRONIZE_ID
+ "\": true, ");
}
int nextClientToServerMessageId = ui
.getLastProcessedClientToServerId() + 1;
writer.write("\"" + ApplicationConstants.CLIENT_TO_SERVER_ID

Loading…
Cancel
Save