From 8f1e6c12d9568e53cc4fb86ba689205c9f42548f Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 20 Jul 2021 10:20:11 +0100 Subject: [PATCH] fix: await pubsub unsubscribe to fix flaky test We manage pubsub subscriptions locally in the http client, when we unsubscribe from a given topic, we abort the long-lived connection we're holding open but do not wait for it to actually close. In our tests we unsubscribe then check the subscription list - if we do this before the subscription is torn down it can report that we are still subscribed to the topic we just unsubscribed from. The fix here is to wait for the connection to be torn down before returning from the unsubscribe call, by which point we should no longer have the topic in our subs list. --- packages/ipfs-http-client/package.json | 2 +- packages/ipfs-http-client/src/pubsub/subscribe.js | 8 +++++++- .../src/pubsub/subscription-tracker.js | 12 +++++++++--- packages/ipfs-http-client/src/pubsub/unsubscribe.js | 2 +- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/ipfs-http-client/package.json b/packages/ipfs-http-client/package.json index 00ccbac4e5..31fe7d8b76 100644 --- a/packages/ipfs-http-client/package.json +++ b/packages/ipfs-http-client/package.json @@ -64,6 +64,7 @@ "multiformats": "^9.4.1", "nanoid": "^3.1.12", "native-abort-controller": "^1.0.3", + "p-defer": "^3.0.0", "parse-duration": "^1.0.0", "stream-to-it": "^0.2.2", "uint8arrays": "^2.1.6" @@ -77,7 +78,6 @@ "it-concat": "^2.0.0", "it-first": "^1.0.4", "nock": "^13.0.2", - "p-defer": "^3.0.0", "rimraf": "^3.0.2" }, "engines": { diff --git a/packages/ipfs-http-client/src/pubsub/subscribe.js b/packages/ipfs-http-client/src/pubsub/subscribe.js index db99bf128f..d265ea6a6b 100644 --- a/packages/ipfs-http-client/src/pubsub/subscribe.js +++ b/packages/ipfs-http-client/src/pubsub/subscribe.js @@ -5,6 +5,7 @@ const uint8ArrayToString = require('uint8arrays/to-string') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const configure = require('../lib/configure') const toUrlSearchParams = require('../lib/to-url-search-params') +const defer = require('p-defer') /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -24,7 +25,9 @@ module.exports = (options, subsTracker) => { * @type {PubsubAPI["subscribe"]} */ async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await - options.signal = subsTracker.subscribe(topic, handler, options.signal) + const end = defer() + + options.signal = subsTracker.subscribe(topic, handler, end.promise, options.signal) /** @type {(value?: any) => void} */ let done @@ -73,6 +76,9 @@ module.exports = (options, subsTracker) => { done() }) + .finally(() => { + end.resolve() + }) }, 0) return result diff --git a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js index faf15ad6c4..fc5777d069 100644 --- a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js +++ b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js @@ -8,6 +8,7 @@ const { AbortController } = require('native-abort-controller') * @typedef {Object} Subscription * @property {MessageHandlerFn} handler * @property {AbortController} controller + * @property {Promise} end */ class SubscriptionTracker { @@ -19,9 +20,10 @@ class SubscriptionTracker { /** * @param {string} topic * @param {MessageHandlerFn} handler + * @param {Promise} end * @param {AbortSignal} [signal] */ - subscribe (topic, handler, signal) { + subscribe (topic, handler, end, signal) { const topicSubs = this._subs.get(topic) || [] if (topicSubs.find(s => s.handler === handler)) { @@ -31,7 +33,7 @@ class SubscriptionTracker { // Create controller so a call to unsubscribe can cancel the request const controller = new AbortController() - this._subs.set(topic, [{ handler, controller }].concat(topicSubs)) + this._subs.set(topic, [{ handler, controller, end }].concat(topicSubs)) // If there is an external signal, forward the abort event if (signal) { @@ -45,7 +47,7 @@ class SubscriptionTracker { * @param {string} topic * @param {MessageHandlerFn} [handler] */ - unsubscribe (topic, handler) { + async unsubscribe (topic, handler) { const subs = this._subs.get(topic) || [] let unsubs @@ -62,6 +64,10 @@ class SubscriptionTracker { } unsubs.forEach(s => s.controller.abort()) + + await Promise.all( + unsubs.map(sub => sub.end) + ) } } diff --git a/packages/ipfs-http-client/src/pubsub/unsubscribe.js b/packages/ipfs-http-client/src/pubsub/unsubscribe.js index 79c40eb3ba..1b00f88117 100644 --- a/packages/ipfs-http-client/src/pubsub/unsubscribe.js +++ b/packages/ipfs-http-client/src/pubsub/unsubscribe.js @@ -15,7 +15,7 @@ module.exports = (options, subsTracker) => { * @type {PubsubAPI["unsubscribe"]} */ async function unsubscribe (topic, handler) { - subsTracker.unsubscribe(topic, handler) + await subsTracker.unsubscribe(topic, handler) } return unsubscribe }