Node-Red configuration
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cluster-adapter.js 26KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. "use strict";
  2. var __rest = (this && this.__rest) || function (s, e) {
  3. var t = {};
  4. for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p) && e.indexOf(p) < 0)
  5. t[p] = s[p];
  6. if (s != null && typeof Object.getOwnPropertySymbols === "function")
  7. for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) {
  8. if (e.indexOf(p[i]) < 0 && Object.prototype.propertyIsEnumerable.call(s, p[i]))
  9. t[p[i]] = s[p[i]];
  10. }
  11. return t;
  12. };
  13. Object.defineProperty(exports, "__esModule", { value: true });
  14. exports.ClusterAdapterWithHeartbeat = exports.ClusterAdapter = exports.MessageType = void 0;
  15. const in_memory_adapter_1 = require("./in-memory-adapter");
  16. const debug_1 = require("debug");
  17. const crypto_1 = require("crypto");
  18. const debug = (0, debug_1.debug)("socket.io-adapter");
  19. const EMITTER_UID = "emitter";
  20. const DEFAULT_TIMEOUT = 5000;
  21. function randomId() {
  22. return (0, crypto_1.randomBytes)(8).toString("hex");
  23. }
  24. var MessageType;
  25. (function (MessageType) {
  26. MessageType[MessageType["INITIAL_HEARTBEAT"] = 1] = "INITIAL_HEARTBEAT";
  27. MessageType[MessageType["HEARTBEAT"] = 2] = "HEARTBEAT";
  28. MessageType[MessageType["BROADCAST"] = 3] = "BROADCAST";
  29. MessageType[MessageType["SOCKETS_JOIN"] = 4] = "SOCKETS_JOIN";
  30. MessageType[MessageType["SOCKETS_LEAVE"] = 5] = "SOCKETS_LEAVE";
  31. MessageType[MessageType["DISCONNECT_SOCKETS"] = 6] = "DISCONNECT_SOCKETS";
  32. MessageType[MessageType["FETCH_SOCKETS"] = 7] = "FETCH_SOCKETS";
  33. MessageType[MessageType["FETCH_SOCKETS_RESPONSE"] = 8] = "FETCH_SOCKETS_RESPONSE";
  34. MessageType[MessageType["SERVER_SIDE_EMIT"] = 9] = "SERVER_SIDE_EMIT";
  35. MessageType[MessageType["SERVER_SIDE_EMIT_RESPONSE"] = 10] = "SERVER_SIDE_EMIT_RESPONSE";
  36. MessageType[MessageType["BROADCAST_CLIENT_COUNT"] = 11] = "BROADCAST_CLIENT_COUNT";
  37. MessageType[MessageType["BROADCAST_ACK"] = 12] = "BROADCAST_ACK";
  38. MessageType[MessageType["ADAPTER_CLOSE"] = 13] = "ADAPTER_CLOSE";
  39. })(MessageType = exports.MessageType || (exports.MessageType = {}));
  40. function encodeOptions(opts) {
  41. return {
  42. rooms: [...opts.rooms],
  43. except: [...opts.except],
  44. flags: opts.flags,
  45. };
  46. }
  47. function decodeOptions(opts) {
  48. return {
  49. rooms: new Set(opts.rooms),
  50. except: new Set(opts.except),
  51. flags: opts.flags,
  52. };
  53. }
  54. /**
  55. * A cluster-ready adapter. Any extending class must:
  56. *
  57. * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
  58. * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
  59. */
  60. class ClusterAdapter extends in_memory_adapter_1.Adapter {
  61. constructor(nsp) {
  62. super(nsp);
  63. this.requests = new Map();
  64. this.ackRequests = new Map();
  65. this.uid = randomId();
  66. }
  67. /**
  68. * Called when receiving a message from another member of the cluster.
  69. *
  70. * @param message
  71. * @param offset
  72. * @protected
  73. */
  74. onMessage(message, offset) {
  75. if (message.uid === this.uid) {
  76. return debug("[%s] ignore message from self", this.uid);
  77. }
  78. debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid);
  79. switch (message.type) {
  80. case MessageType.BROADCAST: {
  81. const withAck = message.data.requestId !== undefined;
  82. if (withAck) {
  83. super.broadcastWithAck(message.data.packet, decodeOptions(message.data.opts), (clientCount) => {
  84. debug("[%s] waiting for %d client acknowledgements", this.uid, clientCount);
  85. this.publishResponse(message.uid, {
  86. type: MessageType.BROADCAST_CLIENT_COUNT,
  87. data: {
  88. requestId: message.data.requestId,
  89. clientCount,
  90. },
  91. });
  92. }, (arg) => {
  93. debug("[%s] received acknowledgement with value %j", this.uid, arg);
  94. this.publishResponse(message.uid, {
  95. type: MessageType.BROADCAST_ACK,
  96. data: {
  97. requestId: message.data.requestId,
  98. packet: arg,
  99. },
  100. });
  101. });
  102. }
  103. else {
  104. const packet = message.data.packet;
  105. const opts = decodeOptions(message.data.opts);
  106. this.addOffsetIfNecessary(packet, opts, offset);
  107. super.broadcast(packet, opts);
  108. }
  109. break;
  110. }
  111. case MessageType.SOCKETS_JOIN:
  112. super.addSockets(decodeOptions(message.data.opts), message.data.rooms);
  113. break;
  114. case MessageType.SOCKETS_LEAVE:
  115. super.delSockets(decodeOptions(message.data.opts), message.data.rooms);
  116. break;
  117. case MessageType.DISCONNECT_SOCKETS:
  118. super.disconnectSockets(decodeOptions(message.data.opts), message.data.close);
  119. break;
  120. case MessageType.FETCH_SOCKETS: {
  121. debug("[%s] calling fetchSockets with opts %j", this.uid, message.data.opts);
  122. super
  123. .fetchSockets(decodeOptions(message.data.opts))
  124. .then((localSockets) => {
  125. this.publishResponse(message.uid, {
  126. type: MessageType.FETCH_SOCKETS_RESPONSE,
  127. data: {
  128. requestId: message.data.requestId,
  129. sockets: localSockets.map((socket) => {
  130. // remove sessionStore from handshake, as it may contain circular references
  131. const _a = socket.handshake, { sessionStore } = _a, handshake = __rest(_a, ["sessionStore"]);
  132. return {
  133. id: socket.id,
  134. handshake,
  135. rooms: [...socket.rooms],
  136. data: socket.data,
  137. };
  138. }),
  139. },
  140. });
  141. });
  142. break;
  143. }
  144. case MessageType.SERVER_SIDE_EMIT: {
  145. const packet = message.data.packet;
  146. const withAck = message.data.requestId !== undefined;
  147. if (!withAck) {
  148. this.nsp._onServerSideEmit(packet);
  149. return;
  150. }
  151. let called = false;
  152. const callback = (arg) => {
  153. // only one argument is expected
  154. if (called) {
  155. return;
  156. }
  157. called = true;
  158. debug("[%s] calling acknowledgement with %j", this.uid, arg);
  159. this.publishResponse(message.uid, {
  160. type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
  161. data: {
  162. requestId: message.data.requestId,
  163. packet: arg,
  164. },
  165. });
  166. };
  167. this.nsp._onServerSideEmit([...packet, callback]);
  168. break;
  169. }
  170. // @ts-ignore
  171. case MessageType.BROADCAST_CLIENT_COUNT:
  172. // @ts-ignore
  173. case MessageType.BROADCAST_ACK:
  174. // @ts-ignore
  175. case MessageType.FETCH_SOCKETS_RESPONSE:
  176. // @ts-ignore
  177. case MessageType.SERVER_SIDE_EMIT_RESPONSE:
  178. // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may
  179. // always call the onMessage() method
  180. this.onResponse(message);
  181. break;
  182. default:
  183. debug("[%s] unknown message type: %s", this.uid, message.type);
  184. }
  185. }
  186. /**
  187. * Called when receiving a response from another member of the cluster.
  188. *
  189. * @param response
  190. * @protected
  191. */
  192. onResponse(response) {
  193. var _a, _b;
  194. const requestId = response.data.requestId;
  195. debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
  196. switch (response.type) {
  197. case MessageType.BROADCAST_CLIENT_COUNT: {
  198. (_a = this.ackRequests
  199. .get(requestId)) === null || _a === void 0 ? void 0 : _a.clientCountCallback(response.data.clientCount);
  200. break;
  201. }
  202. case MessageType.BROADCAST_ACK: {
  203. (_b = this.ackRequests.get(requestId)) === null || _b === void 0 ? void 0 : _b.ack(response.data.packet);
  204. break;
  205. }
  206. case MessageType.FETCH_SOCKETS_RESPONSE: {
  207. const request = this.requests.get(requestId);
  208. if (!request) {
  209. return;
  210. }
  211. request.current++;
  212. response.data.sockets.forEach((socket) => request.responses.push(socket));
  213. if (request.current === request.expected) {
  214. clearTimeout(request.timeout);
  215. request.resolve(request.responses);
  216. this.requests.delete(requestId);
  217. }
  218. break;
  219. }
  220. case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
  221. const request = this.requests.get(requestId);
  222. if (!request) {
  223. return;
  224. }
  225. request.current++;
  226. request.responses.push(response.data.packet);
  227. if (request.current === request.expected) {
  228. clearTimeout(request.timeout);
  229. request.resolve(null, request.responses);
  230. this.requests.delete(requestId);
  231. }
  232. break;
  233. }
  234. default:
  235. // @ts-ignore
  236. debug("[%s] unknown response type: %s", this.uid, response.type);
  237. }
  238. }
  239. async broadcast(packet, opts) {
  240. var _a;
  241. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  242. if (!onlyLocal) {
  243. try {
  244. const offset = await this.publishAndReturnOffset({
  245. type: MessageType.BROADCAST,
  246. data: {
  247. packet,
  248. opts: encodeOptions(opts),
  249. },
  250. });
  251. this.addOffsetIfNecessary(packet, opts, offset);
  252. }
  253. catch (e) {
  254. return debug("[%s] error while broadcasting message: %s", this.uid, e.message);
  255. }
  256. }
  257. super.broadcast(packet, opts);
  258. }
  259. /**
  260. * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it
  261. * reconnects after a temporary disconnection.
  262. *
  263. * @param packet
  264. * @param opts
  265. * @param offset
  266. * @private
  267. */
  268. addOffsetIfNecessary(packet, opts, offset) {
  269. var _a;
  270. if (!this.nsp.server.opts.connectionStateRecovery) {
  271. return;
  272. }
  273. const isEventPacket = packet.type === 2;
  274. // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
  275. // restored on another server upon reconnection
  276. const withoutAcknowledgement = packet.id === undefined;
  277. const notVolatile = ((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.volatile) === undefined;
  278. if (isEventPacket && withoutAcknowledgement && notVolatile) {
  279. packet.data.push(offset);
  280. }
  281. }
  282. broadcastWithAck(packet, opts, clientCountCallback, ack) {
  283. var _a;
  284. const onlyLocal = (_a = opts === null || opts === void 0 ? void 0 : opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  285. if (!onlyLocal) {
  286. const requestId = randomId();
  287. this.ackRequests.set(requestId, {
  288. clientCountCallback,
  289. ack,
  290. });
  291. this.publish({
  292. type: MessageType.BROADCAST,
  293. data: {
  294. packet,
  295. requestId,
  296. opts: encodeOptions(opts),
  297. },
  298. });
  299. // we have no way to know at this level whether the server has received an acknowledgement from each client, so we
  300. // will simply clean up the ackRequests map after the given delay
  301. setTimeout(() => {
  302. this.ackRequests.delete(requestId);
  303. }, opts.flags.timeout);
  304. }
  305. super.broadcastWithAck(packet, opts, clientCountCallback, ack);
  306. }
  307. async addSockets(opts, rooms) {
  308. var _a;
  309. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  310. if (!onlyLocal) {
  311. try {
  312. await this.publishAndReturnOffset({
  313. type: MessageType.SOCKETS_JOIN,
  314. data: {
  315. opts: encodeOptions(opts),
  316. rooms,
  317. },
  318. });
  319. }
  320. catch (e) {
  321. debug("[%s] error while publishing message: %s", this.uid, e.message);
  322. }
  323. }
  324. super.addSockets(opts, rooms);
  325. }
  326. async delSockets(opts, rooms) {
  327. var _a;
  328. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  329. if (!onlyLocal) {
  330. try {
  331. await this.publishAndReturnOffset({
  332. type: MessageType.SOCKETS_LEAVE,
  333. data: {
  334. opts: encodeOptions(opts),
  335. rooms,
  336. },
  337. });
  338. }
  339. catch (e) {
  340. debug("[%s] error while publishing message: %s", this.uid, e.message);
  341. }
  342. }
  343. super.delSockets(opts, rooms);
  344. }
  345. async disconnectSockets(opts, close) {
  346. var _a;
  347. const onlyLocal = (_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local;
  348. if (!onlyLocal) {
  349. try {
  350. await this.publishAndReturnOffset({
  351. type: MessageType.DISCONNECT_SOCKETS,
  352. data: {
  353. opts: encodeOptions(opts),
  354. close,
  355. },
  356. });
  357. }
  358. catch (e) {
  359. debug("[%s] error while publishing message: %s", this.uid, e.message);
  360. }
  361. }
  362. super.disconnectSockets(opts, close);
  363. }
  364. async fetchSockets(opts) {
  365. var _a;
  366. const [localSockets, serverCount] = await Promise.all([
  367. super.fetchSockets(opts),
  368. this.serverCount(),
  369. ]);
  370. const expectedResponseCount = serverCount - 1;
  371. if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) {
  372. return localSockets;
  373. }
  374. const requestId = randomId();
  375. return new Promise((resolve, reject) => {
  376. const timeout = setTimeout(() => {
  377. const storedRequest = this.requests.get(requestId);
  378. if (storedRequest) {
  379. reject(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`));
  380. this.requests.delete(requestId);
  381. }
  382. }, opts.flags.timeout || DEFAULT_TIMEOUT);
  383. const storedRequest = {
  384. type: MessageType.FETCH_SOCKETS,
  385. resolve,
  386. timeout,
  387. current: 0,
  388. expected: expectedResponseCount,
  389. responses: localSockets,
  390. };
  391. this.requests.set(requestId, storedRequest);
  392. this.publish({
  393. type: MessageType.FETCH_SOCKETS,
  394. data: {
  395. opts: encodeOptions(opts),
  396. requestId,
  397. },
  398. });
  399. });
  400. }
  401. async serverSideEmit(packet) {
  402. const withAck = typeof packet[packet.length - 1] === "function";
  403. if (!withAck) {
  404. return this.publish({
  405. type: MessageType.SERVER_SIDE_EMIT,
  406. data: {
  407. packet,
  408. },
  409. });
  410. }
  411. const ack = packet.pop();
  412. const expectedResponseCount = (await this.serverCount()) - 1;
  413. debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
  414. if (expectedResponseCount <= 0) {
  415. return ack(null, []);
  416. }
  417. const requestId = randomId();
  418. const timeout = setTimeout(() => {
  419. const storedRequest = this.requests.get(requestId);
  420. if (storedRequest) {
  421. ack(new Error(`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`), storedRequest.responses);
  422. this.requests.delete(requestId);
  423. }
  424. }, DEFAULT_TIMEOUT);
  425. const storedRequest = {
  426. type: MessageType.SERVER_SIDE_EMIT,
  427. resolve: ack,
  428. timeout,
  429. current: 0,
  430. expected: expectedResponseCount,
  431. responses: [],
  432. };
  433. this.requests.set(requestId, storedRequest);
  434. this.publish({
  435. type: MessageType.SERVER_SIDE_EMIT,
  436. data: {
  437. requestId,
  438. packet,
  439. },
  440. });
  441. }
  442. publish(message) {
  443. this.publishAndReturnOffset(message).catch((err) => {
  444. debug("[%s] error while publishing message: %s", this.uid, err);
  445. });
  446. }
  447. publishAndReturnOffset(message) {
  448. message.uid = this.uid;
  449. message.nsp = this.nsp.name;
  450. return this.doPublish(message);
  451. }
  452. publishResponse(requesterUid, response) {
  453. response.uid = this.uid;
  454. response.nsp = this.nsp.name;
  455. this.doPublishResponse(requesterUid, response).catch((err) => {
  456. debug("[%s] error while publishing response: %s", this.uid, err);
  457. });
  458. }
  459. }
  460. exports.ClusterAdapter = ClusterAdapter;
  461. class ClusterAdapterWithHeartbeat extends ClusterAdapter {
  462. constructor(nsp, opts) {
  463. super(nsp);
  464. this.nodesMap = new Map(); // uid => timestamp of last message
  465. this.customRequests = new Map();
  466. this._opts = Object.assign({
  467. heartbeatInterval: 5000,
  468. heartbeatTimeout: 10000,
  469. }, opts);
  470. this.cleanupTimer = setInterval(() => {
  471. const now = Date.now();
  472. this.nodesMap.forEach((lastSeen, uid) => {
  473. const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
  474. if (nodeSeemsDown) {
  475. debug("[%s] node %s seems down", this.uid, uid);
  476. this.removeNode(uid);
  477. }
  478. });
  479. }, 1000);
  480. }
  481. init() {
  482. this.publish({
  483. type: MessageType.INITIAL_HEARTBEAT,
  484. });
  485. }
  486. scheduleHeartbeat() {
  487. if (this.heartbeatTimer) {
  488. this.heartbeatTimer.refresh();
  489. }
  490. else {
  491. this.heartbeatTimer = setTimeout(() => {
  492. this.publish({
  493. type: MessageType.HEARTBEAT,
  494. });
  495. }, this._opts.heartbeatInterval);
  496. }
  497. }
  498. close() {
  499. this.publish({
  500. type: MessageType.ADAPTER_CLOSE,
  501. });
  502. clearTimeout(this.heartbeatTimer);
  503. if (this.cleanupTimer) {
  504. clearInterval(this.cleanupTimer);
  505. }
  506. }
  507. onMessage(message, offset) {
  508. if (message.uid === this.uid) {
  509. return debug("[%s] ignore message from self", this.uid);
  510. }
  511. if (message.uid && message.uid !== EMITTER_UID) {
  512. // we track the UID of each sender, in order to know how many servers there are in the cluster
  513. this.nodesMap.set(message.uid, Date.now());
  514. }
  515. debug("[%s] new event of type %d from %s", this.uid, message.type, message.uid);
  516. switch (message.type) {
  517. case MessageType.INITIAL_HEARTBEAT:
  518. this.publish({
  519. type: MessageType.HEARTBEAT,
  520. });
  521. break;
  522. case MessageType.HEARTBEAT:
  523. // nothing to do
  524. break;
  525. case MessageType.ADAPTER_CLOSE:
  526. this.removeNode(message.uid);
  527. break;
  528. default:
  529. super.onMessage(message, offset);
  530. }
  531. }
  532. serverCount() {
  533. return Promise.resolve(1 + this.nodesMap.size);
  534. }
  535. publish(message) {
  536. this.scheduleHeartbeat();
  537. return super.publish(message);
  538. }
  539. async serverSideEmit(packet) {
  540. const withAck = typeof packet[packet.length - 1] === "function";
  541. if (!withAck) {
  542. return this.publish({
  543. type: MessageType.SERVER_SIDE_EMIT,
  544. data: {
  545. packet,
  546. },
  547. });
  548. }
  549. const ack = packet.pop();
  550. const expectedResponseCount = this.nodesMap.size;
  551. debug('[%s] waiting for %d responses to "serverSideEmit" request', this.uid, expectedResponseCount);
  552. if (expectedResponseCount <= 0) {
  553. return ack(null, []);
  554. }
  555. const requestId = randomId();
  556. const timeout = setTimeout(() => {
  557. const storedRequest = this.customRequests.get(requestId);
  558. if (storedRequest) {
  559. ack(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`), storedRequest.responses);
  560. this.customRequests.delete(requestId);
  561. }
  562. }, DEFAULT_TIMEOUT);
  563. const storedRequest = {
  564. type: MessageType.SERVER_SIDE_EMIT,
  565. resolve: ack,
  566. timeout,
  567. missingUids: new Set([...this.nodesMap.keys()]),
  568. responses: [],
  569. };
  570. this.customRequests.set(requestId, storedRequest);
  571. this.publish({
  572. type: MessageType.SERVER_SIDE_EMIT,
  573. data: {
  574. requestId,
  575. packet,
  576. },
  577. });
  578. }
  579. async fetchSockets(opts) {
  580. var _a;
  581. const [localSockets, serverCount] = await Promise.all([
  582. super.fetchSockets({
  583. rooms: opts.rooms,
  584. except: opts.except,
  585. flags: {
  586. local: true,
  587. },
  588. }),
  589. this.serverCount(),
  590. ]);
  591. const expectedResponseCount = serverCount - 1;
  592. if (((_a = opts.flags) === null || _a === void 0 ? void 0 : _a.local) || expectedResponseCount <= 0) {
  593. return localSockets;
  594. }
  595. const requestId = randomId();
  596. return new Promise((resolve, reject) => {
  597. const timeout = setTimeout(() => {
  598. const storedRequest = this.customRequests.get(requestId);
  599. if (storedRequest) {
  600. reject(new Error(`timeout reached: missing ${storedRequest.missingUids.size} responses`));
  601. this.customRequests.delete(requestId);
  602. }
  603. }, opts.flags.timeout || DEFAULT_TIMEOUT);
  604. const storedRequest = {
  605. type: MessageType.FETCH_SOCKETS,
  606. resolve,
  607. timeout,
  608. missingUids: new Set([...this.nodesMap.keys()]),
  609. responses: localSockets,
  610. };
  611. this.customRequests.set(requestId, storedRequest);
  612. this.publish({
  613. type: MessageType.FETCH_SOCKETS,
  614. data: {
  615. opts: encodeOptions(opts),
  616. requestId,
  617. },
  618. });
  619. });
  620. }
  621. onResponse(response) {
  622. const requestId = response.data.requestId;
  623. debug("[%s] received response %s to request %s", this.uid, response.type, requestId);
  624. switch (response.type) {
  625. case MessageType.FETCH_SOCKETS_RESPONSE: {
  626. const request = this.customRequests.get(requestId);
  627. if (!request) {
  628. return;
  629. }
  630. response.data.sockets.forEach((socket) => request.responses.push(socket));
  631. request.missingUids.delete(response.uid);
  632. if (request.missingUids.size === 0) {
  633. clearTimeout(request.timeout);
  634. request.resolve(request.responses);
  635. this.customRequests.delete(requestId);
  636. }
  637. break;
  638. }
  639. case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
  640. const request = this.customRequests.get(requestId);
  641. if (!request) {
  642. return;
  643. }
  644. request.responses.push(response.data.packet);
  645. request.missingUids.delete(response.uid);
  646. if (request.missingUids.size === 0) {
  647. clearTimeout(request.timeout);
  648. request.resolve(null, request.responses);
  649. this.customRequests.delete(requestId);
  650. }
  651. break;
  652. }
  653. default:
  654. super.onResponse(response);
  655. }
  656. }
  657. removeNode(uid) {
  658. this.customRequests.forEach((request, requestId) => {
  659. request.missingUids.delete(uid);
  660. if (request.missingUids.size === 0) {
  661. clearTimeout(request.timeout);
  662. if (request.type === MessageType.FETCH_SOCKETS) {
  663. request.resolve(request.responses);
  664. }
  665. else if (request.type === MessageType.SERVER_SIDE_EMIT) {
  666. request.resolve(null, request.responses);
  667. }
  668. this.customRequests.delete(requestId);
  669. }
  670. });
  671. this.nodesMap.delete(uid);
  672. }
  673. }
  674. exports.ClusterAdapterWithHeartbeat = ClusterAdapterWithHeartbeat;