You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

AtmospherePushConnection.java 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*
  2. * Copyright 2000-2016 Vaadin Ltd.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy of
  6. * the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. * License for the specific language governing permissions and limitations under
  14. * the License.
  15. */
  16. package com.vaadin.server.communication;
  17. import java.io.IOException;
  18. import java.io.ObjectInputStream;
  19. import java.io.Reader;
  20. import java.io.Serializable;
  21. import java.io.StringReader;
  22. import java.io.StringWriter;
  23. import java.io.Writer;
  24. import java.util.concurrent.Future;
  25. import java.util.concurrent.TimeUnit;
  26. import java.util.concurrent.TimeoutException;
  27. import java.util.logging.ConsoleHandler;
  28. import java.util.logging.Level;
  29. import java.util.logging.LogRecord;
  30. import java.util.logging.Logger;
  31. import org.atmosphere.cpr.AtmosphereResource;
  32. import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
  33. import org.atmosphere.util.Version;
  34. import com.vaadin.shared.communication.PushConstants;
  35. import com.vaadin.ui.UI;
  36. /**
  37. * A {@link PushConnection} implementation using the Atmosphere push support
  38. * that is by default included in Vaadin.
  39. *
  40. * @author Vaadin Ltd
  41. * @since 7.1
  42. */
  43. public class AtmospherePushConnection implements PushConnection {
  44. public static String getAtmosphereVersion() {
  45. try {
  46. String v = Version.getRawVersion();
  47. assert v != null;
  48. return v;
  49. } catch (NoClassDefFoundError e) {
  50. return null;
  51. }
  52. }
  53. /**
  54. * Represents a message that can arrive as multiple fragments.
  55. */
  56. protected static class FragmentedMessage implements Serializable {
  57. private final StringBuilder message = new StringBuilder();
  58. private final int messageLength;
  59. public FragmentedMessage(Reader reader) throws IOException {
  60. // Messages are prefixed by the total message length plus a
  61. // delimiter
  62. String length = "";
  63. int c;
  64. while ((c = reader.read()) != -1
  65. && c != PushConstants.MESSAGE_DELIMITER) {
  66. length += (char) c;
  67. }
  68. try {
  69. messageLength = Integer.parseInt(length);
  70. } catch (NumberFormatException e) {
  71. throw new IOException("Invalid message length " + length, e);
  72. }
  73. }
  74. /**
  75. * Appends all the data from the given Reader to this message and
  76. * returns whether the message was completed.
  77. *
  78. * @param reader
  79. * The Reader from which to read.
  80. * @return true if this message is complete, false otherwise.
  81. * @throws IOException
  82. */
  83. public boolean append(Reader reader) throws IOException {
  84. char[] buffer = new char[PushConstants.WEBSOCKET_BUFFER_SIZE];
  85. int read;
  86. while ((read = reader.read(buffer)) != -1) {
  87. message.append(buffer, 0, read);
  88. assert message.length() <= messageLength : "Received message "
  89. + message.length() + "chars, expected " + messageLength;
  90. }
  91. return message.length() == messageLength;
  92. }
  93. public Reader getReader() {
  94. return new StringReader(message.toString());
  95. }
  96. }
  97. protected enum State {
  98. /**
  99. * Not connected. Trying to push will set the connection state to
  100. * PUSH_PENDING or RESPONSE_PENDING and defer sending the message until
  101. * a connection is established.
  102. */
  103. DISCONNECTED,
  104. /**
  105. * Not connected. An asynchronous push is pending the opening of the
  106. * connection.
  107. */
  108. PUSH_PENDING,
  109. /**
  110. * Not connected. A response to a client request is pending the opening
  111. * of the connection.
  112. */
  113. RESPONSE_PENDING,
  114. /**
  115. * Connected. Messages can be sent through the connection.
  116. */
  117. CONNECTED;
  118. }
  119. private final UI ui;
  120. private transient State state = State.DISCONNECTED;
  121. private transient AtmosphereResource resource;
  122. private transient FragmentedMessage incomingMessage;
  123. private transient Future<Object> outgoingMessage;
  124. public AtmospherePushConnection(UI ui) {
  125. this.ui = ui;
  126. }
  127. @Override
  128. public void push() {
  129. push(true);
  130. }
  131. /**
  132. * Pushes pending state changes and client RPC calls to the client. If
  133. * {@code isConnected()} is false, defers the push until a connection is
  134. * established.
  135. *
  136. * @param async
  137. * True if this push asynchronously originates from the server,
  138. * false if it is a response to a client request.
  139. */
  140. public void push(boolean async) {
  141. if (!isConnected()) {
  142. if (async && state != State.RESPONSE_PENDING) {
  143. state = State.PUSH_PENDING;
  144. } else {
  145. state = State.RESPONSE_PENDING;
  146. }
  147. } else {
  148. try {
  149. Writer writer = new StringWriter();
  150. new UidlWriter().write(getUI(), writer, async);
  151. sendMessage("for(;;);[{" + writer + "}]");
  152. } catch (Exception e) {
  153. throw new RuntimeException("Push failed", e);
  154. }
  155. }
  156. }
  157. /**
  158. * Sends the given message to the current client. Cannot be called if
  159. * {@isConnected()} is false.
  160. *
  161. * @param message
  162. * The message to send
  163. */
  164. protected void sendMessage(String message) {
  165. assert (isConnected());
  166. // "Broadcast" the changes to the single client only
  167. outgoingMessage = getResource().getBroadcaster().broadcast(message,
  168. getResource());
  169. }
  170. /**
  171. * Reads and buffers a (possibly partial) message. If a complete message was
  172. * received, or if the call resulted in the completion of a partially
  173. * received message, returns a {@link Reader} yielding the complete message.
  174. * Otherwise, returns null.
  175. *
  176. * @param reader
  177. * A Reader from which to read the (partial) message
  178. * @return A Reader yielding a complete message or null if the message is
  179. * not yet complete.
  180. * @throws IOException
  181. */
  182. protected Reader receiveMessage(Reader reader) throws IOException {
  183. if (resource == null || resource.transport() != TRANSPORT.WEBSOCKET) {
  184. return reader;
  185. }
  186. if (incomingMessage == null) {
  187. // No existing partially received message
  188. incomingMessage = new FragmentedMessage(reader);
  189. }
  190. if (incomingMessage.append(reader)) {
  191. // Message is complete
  192. Reader completeReader = incomingMessage.getReader();
  193. incomingMessage = null;
  194. return completeReader;
  195. } else {
  196. // Only received a partial message
  197. return null;
  198. }
  199. }
  200. @Override
  201. public boolean isConnected() {
  202. assert state != null;
  203. assert (state == State.CONNECTED) ^ (resource == null);
  204. return state == State.CONNECTED;
  205. }
  206. /**
  207. * Associates this {@link AtmospherePushConnection} with the given
  208. * {@link AtmosphereResource} representing an established push connection.
  209. * If already connected, calls {@link #disconnect()} first. If there is a
  210. * deferred push, carries it out via the new connection.
  211. *
  212. * @since 7.2
  213. */
  214. public void connect(AtmosphereResource resource) {
  215. assert resource != null;
  216. assert resource != this.resource;
  217. if (isConnected()) {
  218. disconnect();
  219. }
  220. this.resource = resource;
  221. State oldState = state;
  222. state = State.CONNECTED;
  223. if (oldState == State.PUSH_PENDING
  224. || oldState == State.RESPONSE_PENDING) {
  225. // Sending a "response" message (async=false) also takes care of a
  226. // pending push, but not vice versa
  227. push(oldState == State.PUSH_PENDING);
  228. }
  229. }
  230. /**
  231. * Gets the UI this push connection is associated with.
  232. *
  233. * @return the UI associated with this connection
  234. */
  235. public UI getUI() {
  236. return ui;
  237. }
  238. /**
  239. * Gets the atmosphere resource associated with this connection.
  240. *
  241. * @return The AtmosphereResource associated with this connection or
  242. * <code>null</code> if the connection is not open.
  243. */
  244. public AtmosphereResource getResource() {
  245. return resource;
  246. }
  247. @Override
  248. public void disconnect() {
  249. assert isConnected();
  250. if (resource == null) {
  251. // Already disconnected. Should not happen but if it does, we don't
  252. // want to cause NPEs
  253. getLogger().fine(
  254. "AtmospherePushConnection.disconnect() called twice, this should not happen");
  255. return;
  256. }
  257. if (resource.isResumed()) {
  258. // This can happen for long polling because of
  259. // http://dev.vaadin.com/ticket/16919
  260. // Once that is fixed, this should never happen
  261. connectionLost();
  262. return;
  263. }
  264. if (outgoingMessage != null) {
  265. // Wait for the last message to be sent before closing the
  266. // connection (assumes that futures are completed in order)
  267. try {
  268. outgoingMessage.get(1000, TimeUnit.MILLISECONDS);
  269. } catch (TimeoutException e) {
  270. getLogger().log(Level.INFO,
  271. "Timeout waiting for messages to be sent to client before disconnect");
  272. } catch (Exception e) {
  273. getLogger().log(Level.INFO,
  274. "Error waiting for messages to be sent to client before disconnect");
  275. }
  276. outgoingMessage = null;
  277. }
  278. try {
  279. resource.close();
  280. } catch (IOException e) {
  281. getLogger().log(Level.INFO, "Error when closing push connection",
  282. e);
  283. }
  284. connectionLost();
  285. }
  286. /**
  287. * Called when the connection to the client has been lost.
  288. *
  289. * @since 7.4.1
  290. */
  291. public void connectionLost() {
  292. resource = null;
  293. if (state == State.CONNECTED) {
  294. // Guard against connectionLost being (incorrectly) called when
  295. // state is PUSH_PENDING or RESPONSE_PENDING
  296. // (http://dev.vaadin.com/ticket/16919)
  297. state = State.DISCONNECTED;
  298. }
  299. }
  300. /**
  301. * Returns the state of this connection.
  302. */
  303. protected State getState() {
  304. return state;
  305. }
  306. /**
  307. * Reinitializes this PushConnection after deserialization. The connection
  308. * is initially in disconnected state; the client will handle the
  309. * reconnecting.
  310. */
  311. private void readObject(ObjectInputStream stream)
  312. throws IOException, ClassNotFoundException {
  313. stream.defaultReadObject();
  314. state = State.DISCONNECTED;
  315. }
  316. private static Logger getLogger() {
  317. return Logger.getLogger(AtmospherePushConnection.class.getName());
  318. }
  319. /**
  320. * Internal method used for reconfiguring loggers to show all Atmosphere log
  321. * messages in the console.
  322. *
  323. * @since 7.6
  324. */
  325. public static void enableAtmosphereDebugLogging() {
  326. Level level = Level.FINEST;
  327. Logger atmosphereLogger = Logger.getLogger("org.atmosphere");
  328. if (atmosphereLogger.getLevel() == level) {
  329. // Already enabled
  330. return;
  331. }
  332. atmosphereLogger.setLevel(level);
  333. // Without this logging, we will have a ClassCircularityError
  334. LogRecord record = new LogRecord(Level.INFO,
  335. "Enabling Atmosphere debug logging");
  336. atmosphereLogger.log(record);
  337. ConsoleHandler ch = new ConsoleHandler();
  338. ch.setLevel(Level.ALL);
  339. atmosphereLogger.addHandler(ch);
  340. }
  341. }