You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

redis_pool.cxx 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. /*
  2. * Copyright 2023 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "contrib/libev/ev.h"
  18. #include "redis_pool.h"
  19. #include "cfg_file.h"
  20. #include "contrib/hiredis/hiredis.h"
  21. #include "contrib/hiredis/async.h"
  22. #include "contrib/hiredis/adapters/libev.h"
  23. #include "cryptobox.h"
  24. #include "logger.h"
  25. #include "contrib/ankerl/unordered_dense.h"
  26. #include <list>
  27. #include <unordered_map>
  28. namespace rspamd {
/* Forward declarations: redis_pool_connection below stores pointers to both classes */
class redis_pool_elt;
class redis_pool;

/*
 * Debug logging helper of this module.
 *
 * NOTE: the expansion reads `conn->tag`, so a variable named `conn` that
 * points to a connection must be in scope at every call site.
 */
#define msg_debug_rpool(...) rspamd_conditional_debug_fast(NULL, NULL, \
														   rspamd_redis_pool_log_id, "redis_pool", conn->tag, \
														   __FUNCTION__, \
														   __VA_ARGS__)

/*
 * Log module registration; presumably this is what defines
 * rspamd_redis_pool_log_id used by the macro above (confirm in logger.h)
 */
INIT_LOG_MODULE(redis_pool)
/*
 * Lifecycle state of a pooled connection. The state also tells which list of
 * the owning redis_pool_elt currently holds the connection
 * (see redis_pool_elt::release_connection).
 */
enum class rspamd_redis_pool_connection_state : std::uint8_t {
	/* Released by its user and parked for reuse; a cleanup timer is running */
	RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
	/* Handed out to a caller */
	RSPAMD_REDIS_POOL_CONN_ACTIVE,
	/* Idle timer has fired: QUIT was sent and the connection awaits removal */
	RSPAMD_REDIS_POOL_CONN_FINALISING
};
/*
 * A single pooled hiredis async connection together with the bookkeeping
 * required to move it between the lists of its owning redis_pool_elt.
 */
struct redis_pool_connection {
	using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
	using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator;
	/* Underlying hiredis context; reset to nullptr right before it is freed by the destructor */
	struct redisAsyncContext *ctx;
	/* Element (endpoint + credentials bucket) whose lists own this connection */
	redis_pool_elt *elt;
	/* Pool that maps `ctx` back to this object and provides the event loop */
	redis_pool *pool;
	/* Position of this connection inside the list of `elt` that currently holds it */
	conn_iter_t elt_pos;
	/* Idle timer; initialised and started by schedule_timeout() only */
	ev_timer timeout;
	/* Random hex tag generated in the constructor, used by msg_debug_rpool */
	char tag[MEMPOOL_UID_LEN];
	rspamd_redis_pool_connection_state state;

	/* Arms the idle timer and re-installs the disconnect callback on `ctx` */
	auto schedule_timeout() -> void;
	/* Stops the timer (non-active states), unregisters and frees `ctx` */
	~redis_pool_connection();
	/*
	 * Registers `_ctx` in `_pool`, attaches it to the pool event loop and
	 * queues AUTH/SELECT commands for non-empty credentials/database.
	 */
	explicit redis_pool_connection(redis_pool *_pool,
								   redis_pool_elt *_elt,
								   const std::string &db,
								   const std::string &username,
								   const std::string &password,
								   struct redisAsyncContext *_ctx);

private:
	/* libev callback of `timeout` */
	static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void;
	/* hiredis reply callback for the QUIT command sent by the timer callback */
	static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void;
	/* hiredis disconnect callback; return type is deduced from the definition */
	static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto;
};
/* Key of a pool element: the 64-bit hash produced by redis_pool_elt::make_key */
using redis_pool_key_t = std::uint64_t;
/* Forward declaration: redis_pool_elt keeps a back pointer to its pool */
class redis_pool;
  66. class redis_pool_elt {
  67. using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
  68. redis_pool *pool;
  69. /*
  70. * These lists owns connections, so if an element is removed from both
  71. * lists, it is destructed
  72. */
  73. std::list<redis_pool_connection_ptr> active;
  74. std::list<redis_pool_connection_ptr> inactive;
  75. std::list<redis_pool_connection_ptr> terminating;
  76. std::string ip;
  77. std::string db;
  78. std::string username;
  79. std::string password;
  80. int port;
  81. redis_pool_key_t key;
  82. bool is_unix;
  83. public:
  84. /* Disable copy */
  85. redis_pool_elt() = delete;
  86. redis_pool_elt(const redis_pool_elt &) = delete;
  87. /* Enable move */
  88. redis_pool_elt(redis_pool_elt &&other) = default;
  89. explicit redis_pool_elt(redis_pool *_pool,
  90. const char *_db, const char *_username,
  91. const char *_password,
  92. const char *_ip, int _port)
  93. : pool(_pool), ip(_ip), port(_port),
  94. key(redis_pool_elt::make_key(_db, _username, _password, _ip, _port))
  95. {
  96. is_unix = ip[0] == '.' || ip[0] == '/';
  97. if (_db) {
  98. db = _db;
  99. }
  100. if (_username) {
  101. username = _username;
  102. }
  103. if (_password) {
  104. password = _password;
  105. }
  106. }
  107. auto new_connection() -> redisAsyncContext *;
  108. auto release_connection(const redis_pool_connection *conn) -> void
  109. {
  110. switch (conn->state) {
  111. case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE:
  112. active.erase(conn->elt_pos);
  113. break;
  114. case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE:
  115. inactive.erase(conn->elt_pos);
  116. break;
  117. case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_FINALISING:
  118. terminating.erase(conn->elt_pos);
  119. break;
  120. }
  121. }
  122. auto move_to_inactive(redis_pool_connection *conn) -> void
  123. {
  124. inactive.splice(std::end(inactive), active, conn->elt_pos);
  125. conn->elt_pos = std::prev(std::end(inactive));
  126. }
  127. auto move_to_terminating(redis_pool_connection *conn) -> void
  128. {
  129. terminating.splice(std::end(terminating), inactive, conn->elt_pos);
  130. conn->elt_pos = std::prev(std::end(terminating));
  131. }
  132. inline static auto make_key(const char *db, const char *username,
  133. const char *password, const char *ip, int port) -> redis_pool_key_t
  134. {
  135. rspamd_cryptobox_fast_hash_state_t st;
  136. rspamd_cryptobox_fast_hash_init(&st, rspamd_hash_seed());
  137. if (db) {
  138. rspamd_cryptobox_fast_hash_update(&st, db, strlen(db));
  139. }
  140. if (username) {
  141. rspamd_cryptobox_fast_hash_update(&st, username, strlen(username));
  142. }
  143. if (password) {
  144. rspamd_cryptobox_fast_hash_update(&st, password, strlen(password));
  145. }
  146. rspamd_cryptobox_fast_hash_update(&st, ip, strlen(ip));
  147. rspamd_cryptobox_fast_hash_update(&st, &port, sizeof(port));
  148. return rspamd_cryptobox_fast_hash_final(&st);
  149. }
  150. auto num_active() const -> auto
  151. {
  152. return active.size();
  153. }
  154. ~redis_pool_elt()
  155. {
  156. rspamd_explicit_memzero(password.data(), password.size());
  157. }
  158. private:
  159. auto redis_async_new() -> redisAsyncContext *
  160. {
  161. struct redisAsyncContext *ctx;
  162. if (is_unix) {
  163. ctx = redisAsyncConnectUnix(ip.c_str());
  164. }
  165. else {
  166. ctx = redisAsyncConnect(ip.c_str(), port);
  167. }
  168. if (ctx && ctx->err != REDIS_OK) {
  169. msg_err("cannot connect to redis %s (port %d): %s", ip.c_str(), port,
  170. ctx->errstr);
  171. redisAsyncFree(ctx);
  172. return nullptr;
  173. }
  174. return ctx;
  175. }
  176. };
  177. class redis_pool final {
  178. static constexpr const double default_timeout = 10.0;
  179. static constexpr const unsigned default_max_conns = 100;
  180. /* We want to have references integrity */
  181. ankerl::unordered_dense::map<redisAsyncContext *,
  182. redis_pool_connection *>
  183. conns_by_ctx;
  184. /*
  185. * We store a pointer to the element in each connection, so this has to be
  186. * a buckets map with pointers/references stability guarantees.
  187. */
  188. std::unordered_map<redis_pool_key_t, redis_pool_elt> elts_by_key;
  189. bool wanna_die = false; /* Hiredis is 'clever' so we can call ourselves from destructor */
  190. public:
  191. double timeout = default_timeout;
  192. unsigned max_conns = default_max_conns;
  193. struct ev_loop *event_loop;
  194. struct rspamd_config *cfg;
  195. public:
  196. explicit redis_pool()
  197. : event_loop(nullptr), cfg(nullptr)
  198. {
  199. conns_by_ctx.reserve(max_conns);
  200. }
  201. /* Legacy stuff */
  202. auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void
  203. {
  204. event_loop = _loop;
  205. cfg = _cfg;
  206. }
  207. auto new_connection(const char *db, const char *username,
  208. const char *password, const char *ip, int port) -> redisAsyncContext *;
  209. auto release_connection(redisAsyncContext *ctx,
  210. enum rspamd_redis_pool_release_type how) -> void;
  211. auto unregister_context(redisAsyncContext *ctx) -> void
  212. {
  213. conns_by_ctx.erase(ctx);
  214. }
  215. auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn)
  216. {
  217. conns_by_ctx.emplace(ctx, conn);
  218. }
  219. /* Hack to prevent Redis callbacks to be executed */
  220. auto prepare_to_die() -> void
  221. {
  222. wanna_die = true;
  223. }
  224. ~redis_pool()
  225. {
  226. }
  227. };
  228. redis_pool_connection::~redis_pool_connection()
  229. {
  230. const auto *conn = this; /* For debug */
  231. if (state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE) {
  232. msg_debug_rpool("active connection destructed: %p", ctx);
  233. if (ctx) {
  234. pool->unregister_context(ctx);
  235. if (!(ctx->c.flags & REDIS_FREEING)) {
  236. auto *ac = ctx;
  237. ctx = nullptr;
  238. ac->onDisconnect = nullptr;
  239. redisAsyncFree(ac);
  240. }
  241. }
  242. }
  243. else {
  244. msg_debug_rpool("inactive connection destructed: %p", ctx);
  245. ev_timer_stop(pool->event_loop, &timeout);
  246. if (ctx) {
  247. pool->unregister_context(ctx);
  248. if (!(ctx->c.flags & REDIS_FREEING)) {
  249. auto *ac = ctx;
  250. /* To prevent on_disconnect here */
  251. ctx = nullptr;
  252. ac->onDisconnect = nullptr;
  253. redisAsyncFree(ac);
  254. }
  255. }
  256. }
  257. }
  258. auto redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void
  259. {
  260. struct redis_pool_connection *conn =
  261. (struct redis_pool_connection *) priv;
  262. msg_debug_rpool("quit command reply for the connection %p",
  263. conn->ctx);
  264. /*
  265. * The connection will be freed by hiredis itself as we are here merely after
  266. * quit command has succeeded and we have timer being set already.
  267. * The problem is that when this callback is called, our connection is likely
  268. * dead, so probably even on_disconnect callback has been already called...
  269. *
  270. * Hence, the connection might already be freed, so even (conn) pointer may be
  271. * inaccessible.
  272. *
  273. * TODO: Use refcounts to prevent this stuff to happen, the problem is how
  274. * to handle Redis timeout on `quit` command in fact... The good thing is that
  275. * it will not likely happen.
  276. */
  277. }
  278. /*
  279. * Called for inactive connections that due to be removed
  280. */
  281. auto redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void
  282. {
  283. auto *conn = (struct redis_pool_connection *) w->data;
  284. g_assert(conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
  285. if (conn->state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE) {
  286. msg_debug_rpool("scheduled soft removal of connection %p",
  287. conn->ctx);
  288. conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_FINALISING;
  289. ev_timer_again(EV_A_ w);
  290. redisAsyncCommand(conn->ctx, redis_pool_connection::redis_quit_cb, conn, "QUIT");
  291. conn->elt->move_to_terminating(conn);
  292. }
  293. else {
  294. /* Finalising by timeout */
  295. ev_timer_stop(EV_A_ w);
  296. msg_debug_rpool("final removal of connection %p, refcount: %d",
  297. conn->ctx);
  298. /* Erasure of shared pointer will cause it to be removed */
  299. conn->elt->release_connection(conn);
  300. }
  301. }
  302. auto redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto
  303. {
  304. auto *conn = (struct redis_pool_connection *) ac->data;
  305. /*
  306. * Here, we know that redis itself will free this connection
  307. * so, we need to do something very clever about it
  308. */
  309. if (conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE) {
  310. /* Do nothing for active connections as it is already handled somewhere */
  311. if (conn->ctx) {
  312. msg_debug_rpool("inactive connection terminated: %s",
  313. conn->ctx->errstr);
  314. }
  315. /* Erasure of shared pointer will cause it to be removed */
  316. conn->elt->release_connection(conn);
  317. }
  318. }
  319. auto redis_pool_connection::schedule_timeout() -> void
  320. {
  321. const auto *conn = this; /* For debug */
  322. double real_timeout;
  323. auto active_elts = elt->num_active();
  324. if (active_elts > pool->max_conns) {
  325. real_timeout = pool->timeout / 2.0;
  326. real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0);
  327. }
  328. else {
  329. real_timeout = pool->timeout;
  330. real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0);
  331. }
  332. msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds",
  333. ctx, real_timeout);
  334. timeout.data = this;
  335. /* Restore in case if these fields have been modified externally */
  336. ctx->data = this;
  337. redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
  338. ev_timer_init(&timeout,
  339. redis_pool_connection::redis_conn_timeout_cb,
  340. real_timeout, real_timeout / 2.0);
  341. ev_timer_start(pool->event_loop, &timeout);
  342. }
  343. redis_pool_connection::redis_pool_connection(redis_pool *_pool,
  344. redis_pool_elt *_elt,
  345. const std::string &db,
  346. const std::string &username,
  347. const std::string &password,
  348. struct redisAsyncContext *_ctx)
  349. : ctx(_ctx), elt(_elt), pool(_pool)
  350. {
  351. state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE;
  352. pool->register_context(ctx, this);
  353. ctx->data = this;
  354. memset(tag, 0, sizeof(tag));
  355. rspamd_random_hex(tag, sizeof(tag) - 1);
  356. redisLibevAttach(pool->event_loop, ctx);
  357. redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
  358. if (!username.empty()) {
  359. if (!password.empty()) {
  360. redisAsyncCommand(ctx, nullptr, nullptr,
  361. "AUTH %s %s", username.c_str(), password.c_str());
  362. }
  363. else {
  364. msg_warn("Redis requires a password when username is supplied");
  365. }
  366. }
  367. else if (!password.empty()) {
  368. redisAsyncCommand(ctx, nullptr, nullptr,
  369. "AUTH %s", password.c_str());
  370. }
  371. if (!db.empty()) {
  372. redisAsyncCommand(ctx, nullptr, nullptr,
  373. "SELECT %s", db.c_str());
  374. }
  375. }
  376. auto redis_pool_elt::new_connection() -> redisAsyncContext *
  377. {
  378. if (!inactive.empty()) {
  379. decltype(inactive)::value_type conn;
  380. conn.swap(inactive.back());
  381. inactive.pop_back();
  382. g_assert(conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
  383. if (conn->ctx->err == REDIS_OK) {
  384. /* Also check SO_ERROR */
  385. int err;
  386. socklen_t len = sizeof(int);
  387. if (getsockopt(conn->ctx->c.fd, SOL_SOCKET, SO_ERROR,
  388. (void *) &err, &len) == -1) {
  389. err = errno;
  390. }
  391. if (err != 0) {
  392. /*
  393. * We cannot reuse connection, so we just recursively call
  394. * this function one more time
  395. */
  396. return new_connection();
  397. }
  398. else {
  399. /* Reuse connection */
  400. ev_timer_stop(pool->event_loop, &conn->timeout);
  401. conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE;
  402. msg_debug_rpool("reused existing connection to %s:%d: %p",
  403. ip.c_str(), port, conn->ctx);
  404. active.emplace_front(std::move(conn));
  405. active.front()->elt_pos = active.begin();
  406. return active.front()->ctx;
  407. }
  408. }
  409. else {
  410. auto *nctx = redis_async_new();
  411. if (nctx) {
  412. active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
  413. db.c_str(), username.c_str(), password.c_str(), nctx));
  414. active.front()->elt_pos = active.begin();
  415. }
  416. return nctx;
  417. }
  418. }
  419. else {
  420. auto *nctx = redis_async_new();
  421. if (nctx) {
  422. active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
  423. db.c_str(), username.c_str(), password.c_str(), nctx));
  424. active.front()->elt_pos = active.begin();
  425. }
  426. return nctx;
  427. }
  428. RSPAMD_UNREACHABLE;
  429. }
  430. auto redis_pool::new_connection(const char *db, const char *username,
  431. const char *password, const char *ip, int port) -> redisAsyncContext *
  432. {
  433. if (!wanna_die) {
  434. auto key = redis_pool_elt::make_key(db, username, password, ip, port);
  435. auto found_elt = elts_by_key.find(key);
  436. if (found_elt != elts_by_key.end()) {
  437. auto &elt = found_elt->second;
  438. return elt.new_connection();
  439. }
  440. else {
  441. /* Need to create a pool */
  442. auto nelt = elts_by_key.try_emplace(key,
  443. this, db, username, password, ip, port);
  444. return nelt.first->second.new_connection();
  445. }
  446. }
  447. return nullptr;
  448. }
  449. auto redis_pool::release_connection(redisAsyncContext *ctx,
  450. enum rspamd_redis_pool_release_type how) -> void
  451. {
  452. if (!wanna_die) {
  453. auto conn_it = conns_by_ctx.find(ctx);
  454. if (conn_it != conns_by_ctx.end()) {
  455. auto *conn = conn_it->second;
  456. g_assert(conn->state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
  457. if (ctx->err != REDIS_OK) {
  458. /* We need to terminate connection forcefully */
  459. msg_debug_rpool("closed connection %p due to an error", conn->ctx);
  460. }
  461. else {
  462. if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
  463. /* Ensure that there are no callbacks attached to this conn */
  464. if (ctx->replies.head == nullptr && (ctx->c.flags & REDIS_CONNECTED)) {
  465. /* Just move it to the inactive queue */
  466. conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE;
  467. conn->elt->move_to_inactive(conn);
  468. conn->schedule_timeout();
  469. msg_debug_rpool("mark connection %p inactive", conn->ctx);
  470. return;
  471. }
  472. else {
  473. msg_debug_rpool("closed connection %p due to callbacks left",
  474. conn->ctx);
  475. }
  476. }
  477. else {
  478. if (how == RSPAMD_REDIS_RELEASE_FATAL) {
  479. msg_debug_rpool("closed connection %p due to an fatal termination",
  480. conn->ctx);
  481. }
  482. else {
  483. msg_debug_rpool("closed connection %p due to explicit termination",
  484. conn->ctx);
  485. }
  486. }
  487. }
  488. conn->elt->release_connection(conn);
  489. }
  490. else {
  491. msg_err("fatal internal error, connection with ctx %p is not found in the Redis pool",
  492. ctx);
  493. RSPAMD_UNREACHABLE;
  494. }
  495. }
  496. }
  497. }// namespace rspamd
  498. void *
  499. rspamd_redis_pool_init(void)
  500. {
  501. return new rspamd::redis_pool{};
  502. }
  503. void rspamd_redis_pool_config(void *p,
  504. struct rspamd_config *cfg,
  505. struct ev_loop *ev_base)
  506. {
  507. g_assert(p != NULL);
  508. auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
  509. pool->do_config(ev_base, cfg);
  510. }
  511. struct redisAsyncContext *
  512. rspamd_redis_pool_connect(void *p,
  513. const char *db, const char *username,
  514. const char *password, const char *ip, int port)
  515. {
  516. g_assert(p != NULL);
  517. auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
  518. return pool->new_connection(db, username, password, ip, port);
  519. }
  520. void rspamd_redis_pool_release_connection(void *p,
  521. struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
  522. {
  523. g_assert(p != NULL);
  524. g_assert(ctx != NULL);
  525. auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
  526. pool->release_connection(ctx, how);
  527. }
  528. void rspamd_redis_pool_destroy(void *p)
  529. {
  530. auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
  531. pool->prepare_to_die();
  532. delete pool;
  533. }
  534. const char *
  535. rspamd_redis_type_to_string(int type)
  536. {
  537. const char *ret = "unknown";
  538. switch (type) {
  539. case REDIS_REPLY_STRING:
  540. ret = "string";
  541. break;
  542. case REDIS_REPLY_ARRAY:
  543. ret = "array";
  544. break;
  545. case REDIS_REPLY_INTEGER:
  546. ret = "int";
  547. break;
  548. case REDIS_REPLY_STATUS:
  549. ret = "status";
  550. break;
  551. case REDIS_REPLY_NIL:
  552. ret = "nil";
  553. break;
  554. case REDIS_REPLY_ERROR:
  555. ret = "error";
  556. break;
  557. default:
  558. break;
  559. }
  560. return ret;
  561. }