You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

FdInStream.cxx 7.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. /* Copyright (C) 2002-2003 RealVNC Ltd. All Rights Reserved.
  2. *
  3. * This is free software; you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation; either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This software is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this software; if not, write to the Free Software
  15. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
  16. * USA.
  17. */
  18. #include <stdio.h>
  19. #include <string.h>
  20. #ifdef _WIN32
  21. #include <winsock2.h>
  22. #ifndef _WIN32_WCE
  23. #include <sys/timeb.h>
  24. #endif
  25. #define read(s,b,l) recv(s,(char*)b,l,0)
  26. #define close closesocket
  27. #undef errno
  28. #define errno WSAGetLastError()
  29. #undef EINTR
  30. #define EINTR WSAEINTR
  31. #else
  32. #include <sys/types.h>
  33. #include <errno.h>
  34. #include <unistd.h>
  35. #include <sys/time.h>
  36. #endif
  37. // XXX should use autoconf HAVE_SYS_SELECT_H
  38. #ifdef _AIX
  39. #include <sys/select.h>
  40. #endif
  41. #include <rdr/FdInStream.h>
  42. #include <rdr/Exception.h>
  43. using namespace rdr;
  // File-local tuning constants.
  enum {
    // Size in bytes of the internal buffer when the caller passes a buffer
    // size of zero to a constructor.
    DEFAULT_BUF_SIZE = 8192,
    // readBytes() requests shorter than this many bytes go through the
    // internal buffer; longer ones are read straight into the caller's memory.
    MIN_BULK_SIZE = 1024
  };
  46. FdInStream::FdInStream(int fd_, int timeoutms_, int bufSize_,
  47. bool closeWhenDone_)
  48. : fd(fd_), closeWhenDone(closeWhenDone_),
  49. timeoutms(timeoutms_), blockCallback(0),
  50. timing(false), timeWaitedIn100us(5), timedKbits(0),
  51. bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
  52. {
  53. ptr = end = start = new U8[bufSize];
  54. }
  55. FdInStream::FdInStream(int fd_, FdInStreamBlockCallback* blockCallback_,
  56. int bufSize_)
  57. : fd(fd_), timeoutms(0), blockCallback(blockCallback_),
  58. timing(false), timeWaitedIn100us(5), timedKbits(0),
  59. bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
  60. {
  61. ptr = end = start = new U8[bufSize];
  62. }
  63. FdInStream::~FdInStream()
  64. {
  65. delete [] start;
  66. if (closeWhenDone) close(fd);
  67. }
  68. void FdInStream::setTimeout(int timeoutms_) {
  69. timeoutms = timeoutms_;
  70. }
  71. void FdInStream::setBlockCallback(FdInStreamBlockCallback* blockCallback_)
  72. {
  73. blockCallback = blockCallback_;
  74. timeoutms = 0;
  75. }
  76. int FdInStream::pos()
  77. {
  78. return offset + ptr - start;
  79. }
  80. void FdInStream::readBytes(void* data, int length)
  81. {
  82. if (length < MIN_BULK_SIZE) {
  83. InStream::readBytes(data, length);
  84. return;
  85. }
  86. U8* dataPtr = (U8*)data;
  87. int n = end - ptr;
  88. if (n > length) n = length;
  89. memcpy(dataPtr, ptr, n);
  90. dataPtr += n;
  91. length -= n;
  92. ptr += n;
  93. while (length > 0) {
  94. n = readWithTimeoutOrCallback(dataPtr, length);
  95. dataPtr += n;
  96. length -= n;
  97. offset += n;
  98. }
  99. }
  100. int FdInStream::overrun(int itemSize, int nItems, bool wait)
  101. {
  102. if (itemSize > bufSize)
  103. throw Exception("FdInStream overrun: max itemSize exceeded");
  104. if (end - ptr != 0)
  105. memmove(start, ptr, end - ptr);
  106. offset += ptr - start;
  107. end -= ptr - start;
  108. ptr = start;
  109. while (end < start + itemSize) {
  110. int n = readWithTimeoutOrCallback((U8*)end, start + bufSize - end, wait);
  111. if (n == 0) return 0;
  112. end += n;
  113. }
  114. if (itemSize * nItems > end - ptr)
  115. nItems = (end - ptr) / itemSize;
  116. return nItems;
  117. }
  #ifdef _WIN32
  // Stand-in for gettimeofday() on Windows; the second argument is ignored.
  //
  // The time is taken from the performance counter and converted to
  // microseconds, so on that path the value is counter time rather than
  // calendar time. If the counter cannot be queried, the function falls back
  // to ftime() (millisecond resolution), or on Windows CE throws a
  // SystemException carrying GetLastError().
  static void gettimeofday(struct timeval* tv, void*)
  {
    // Microseconds per counter tick; computed on first successful use.
    static double usecPerCount = 0.0;
    LARGE_INTEGER counts;

    if (!QueryPerformanceCounter(&counts)) {
  #ifndef _WIN32_WCE
      struct timeb tb;
      ftime(&tb);
      tv->tv_sec = tb.time;
      tv->tv_usec = tb.millitm * 1000;
      return;
  #else
      throw SystemException("QueryPerformanceCounter", GetLastError());
  #endif
    }

    if (usecPerCount == 0.0) {
      LARGE_INTEGER countsPerSec;
      QueryPerformanceFrequency(&countsPerSec);
      usecPerCount = 1000000.0 / countsPerSec.QuadPart;
    }

    LONGLONG usecs = (LONGLONG)(counts.QuadPart * usecPerCount);
    tv->tv_sec = (long)(usecs / 1000000);
    tv->tv_usec = (long)(usecs % 1000000);
  }
  #endif
  143. //
  144. // readWithTimeoutOrCallback() reads up to the given length in bytes from the
  145. // file descriptor into a buffer. If the wait argument is false, then zero is
  146. // returned if no bytes can be read without blocking. Otherwise if a
  147. // blockCallback is set, it will be called (repeatedly) instead of blocking.
  148. // If alternatively there is a timeout set and that timeout expires, it throws
  149. // a TimedOut exception. Otherwise it returns the number of bytes read. It
  150. // never attempts to read() unless select() indicates that the fd is readable -
  151. // this means it can be used on an fd which has been set non-blocking. It also
  152. // has to cope with the annoying possibility of both select() and read()
  153. // returning EINTR.
  154. //
  155. int FdInStream::readWithTimeoutOrCallback(void* buf, int len, bool wait)
  156. {
  157. struct timeval before, after;
  158. if (timing)
  159. gettimeofday(&before, 0);
  160. int n;
  161. while (true) {
  162. do {
  163. fd_set fds;
  164. struct timeval tv;
  165. struct timeval* tvp = &tv;
  166. if (!wait) {
  167. tv.tv_sec = tv.tv_usec = 0;
  168. } else if (timeoutms != -1) {
  169. tv.tv_sec = timeoutms / 1000;
  170. tv.tv_usec = (timeoutms % 1000) * 1000;
  171. } else {
  172. tvp = 0;
  173. }
  174. FD_ZERO(&fds);
  175. FD_SET(fd, &fds);
  176. n = select(fd+1, &fds, 0, 0, tvp);
  177. } while (n < 0 && errno == EINTR);
  178. if (n > 0) break;
  179. if (n < 0) throw SystemException("select",errno);
  180. if (!wait) return 0;
  181. if (!blockCallback) throw TimedOut();
  182. blockCallback->blockCallback();
  183. }
  184. do {
  185. n = ::read(fd, buf, len);
  186. } while (n < 0 && errno == EINTR);
  187. if (n < 0) throw SystemException("read",errno);
  188. if (n == 0) throw EndOfStream();
  189. if (timing) {
  190. gettimeofday(&after, 0);
  191. // fprintf(stderr,"%d.%06d\n",(after.tv_sec - before.tv_sec),
  192. // (after.tv_usec - before.tv_usec));
  193. int newTimeWaited = ((after.tv_sec - before.tv_sec) * 10000 +
  194. (after.tv_usec - before.tv_usec) / 100);
  195. int newKbits = n * 8 / 1000;
  196. // if (newTimeWaited == 0) {
  197. // fprintf(stderr,"new kbps infinite t %d k %d\n",
  198. // newTimeWaited, newKbits);
  199. // } else {
  200. // fprintf(stderr,"new kbps %d t %d k %d\n",
  201. // newKbits * 10000 / newTimeWaited, newTimeWaited, newKbits);
  202. // }
  203. // limit rate to between 10kbit/s and 40Mbit/s
  204. if (newTimeWaited > newKbits*1000) newTimeWaited = newKbits*1000;
  205. if (newTimeWaited < newKbits/4) newTimeWaited = newKbits/4;
  206. timeWaitedIn100us += newTimeWaited;
  207. timedKbits += newKbits;
  208. }
  209. return n;
  210. }
  211. void FdInStream::startTiming()
  212. {
  213. timing = true;
  214. // Carry over up to 1s worth of previous rate for smoothing.
  215. if (timeWaitedIn100us > 10000) {
  216. timedKbits = timedKbits * 10000 / timeWaitedIn100us;
  217. timeWaitedIn100us = 10000;
  218. }
  219. }
  220. void FdInStream::stopTiming()
  221. {
  222. timing = false;
  223. if (timeWaitedIn100us < timedKbits/2)
  224. timeWaitedIn100us = timedKbits/2; // upper limit 20Mbit/s
  225. }
  226. unsigned int FdInStream::kbitsPerSecond()
  227. {
  228. // The following calculation will overflow 32-bit arithmetic if we have
  229. // received more than about 50Mbytes (400Mbits) since we started timing, so
  230. // it should be OK for a single RFB update.
  231. return timedKbits * 10000 / timeWaitedIn100us;
  232. }