Skip to content

Commit 1fefb5c

Browse files
jasnelltargos
authored andcommitted
events: allow use of AbortController with once
Allows an AbortSignal to be passed in to events.once() to cancel waiting on an event. Signed-off-by: James M Snell <[email protected]> PR-URL: #34911 Backport-PR-URL: #38386 Reviewed-By: Denys Otrishko <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 4b04bb8 commit 1fefb5c

File tree

4 files changed

+179
-4
lines changed

4 files changed

+179
-4
lines changed

doc/api/events.md

+29-1
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ class MyClass extends EventEmitter {
829829
}
830830
```
831831

832-
## `events.once(emitter, name)`
832+
## `events.once(emitter, name[, options])`
833833
<!-- YAML
834834
added:
835835
- v11.13.0
@@ -838,6 +838,9 @@ added:
838838

839839
* `emitter` {EventEmitter}
840840
* `name` {string}
841+
* `options` {Object}
842+
* `signal` {AbortSignal} An {AbortSignal} that may be used to cancel waiting
843+
for the event.
841844
* Returns: {Promise}
842845

843846
Creates a `Promise` that is fulfilled when the `EventEmitter` emits the given
@@ -896,6 +899,31 @@ ee.emit('error', new Error('boom'));
896899
// Prints: ok boom
897900
```
898901

902+
An {AbortSignal} may be used to cancel waiting for the event early:
903+
904+
```js
905+
const { EventEmitter, once } = require('events');
906+
907+
const ee = new EventEmitter();
908+
const ac = new AbortController();
909+
910+
async function foo(emitter, event, signal) {
911+
try {
912+
await once(emitter, event, { signal });
913+
console.log('event emitted!');
914+
} catch (error) {
915+
if (error.name === 'AbortError') {
916+
console.error('Waiting for the event was canceled!');
917+
} else {
918+
console.error('There was an error', error.message);
919+
}
920+
}
921+
}
922+
923+
foo(ee, 'foo', ac.signal);
924+
ac.abort(); // Abort waiting for the event
925+
```
926+
899927
### Awaiting multiple events emitted on `process.nextTick()`
900928

901929
There is an edge case worth noting when using the `events.once()` function

lib/events.js

+52-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const kRejection = SymbolFor('nodejs.rejection');
4646
let spliceOne;
4747

4848
const {
49+
hideStackFrames,
4950
kEnhanceStackBeforeInspector,
5051
codes
5152
} = require('internal/errors');
@@ -59,9 +60,20 @@ const {
5960
inspect
6061
} = require('internal/util/inspect');
6162

63+
const {
64+
validateAbortSignal
65+
} = require('internal/validators');
66+
6267
const kCapture = Symbol('kCapture');
6368
const kErrorMonitor = Symbol('events.errorMonitor');
6469

70+
let DOMException;
71+
const lazyDOMException = hideStackFrames((message, name) => {
72+
if (DOMException === undefined)
73+
DOMException = internalBinding('messaging').DOMException;
74+
return new DOMException(message, name);
75+
});
76+
6577
function EventEmitter(opts) {
6678
EventEmitter.init.call(this, opts);
6779
}
@@ -622,22 +634,61 @@ function unwrapListeners(arr) {
622634
return ret;
623635
}
624636

625-
function once(emitter, name) {
637+
async function once(emitter, name, options = {}) {
638+
const signal = options ? options.signal : undefined;
639+
validateAbortSignal(signal, 'options.signal');
640+
if (signal && signal.aborted)
641+
throw lazyDOMException('The operation was aborted', 'AbortError');
626642
return new Promise((resolve, reject) => {
627643
const errorListener = (err) => {
628644
emitter.removeListener(name, resolver);
645+
if (signal != null) {
646+
eventTargetAgnosticRemoveListener(
647+
signal,
648+
'abort',
649+
abortListener,
650+
{ once: true });
651+
}
629652
reject(err);
630653
};
631654
const resolver = (...args) => {
632655
if (typeof emitter.removeListener === 'function') {
633656
emitter.removeListener('error', errorListener);
634657
}
658+
if (signal != null) {
659+
eventTargetAgnosticRemoveListener(
660+
signal,
661+
'abort',
662+
abortListener,
663+
{ once: true });
664+
}
635665
resolve(args);
636666
};
637667
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
638668
if (name !== 'error') {
639669
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
640670
}
671+
function abortListener() {
672+
if (typeof emitter.removeListener === 'function') {
673+
emitter.removeListener(name, resolver);
674+
emitter.removeListener('error', errorListener);
675+
} else {
676+
eventTargetAgnosticRemoveListener(
677+
emitter,
678+
name,
679+
resolver,
680+
{ once: true });
681+
eventTargetAgnosticRemoveListener(
682+
emitter,
683+
'error',
684+
errorListener,
685+
{ once: true });
686+
}
687+
reject(lazyDOMException('The operation was aborted', 'AbortError'));
688+
}
689+
if (signal != null) {
690+
signal.addEventListener('abort', abortListener, { once: true });
691+
}
641692
});
642693
}
643694

lib/internal/validators.js

+10
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,15 @@ const validateCallback = hideStackFrames((callback) => {
218218
throw new ERR_INVALID_CALLBACK(callback);
219219
});
220220

221+
const validateAbortSignal = hideStackFrames((signal, name) => {
222+
if (signal !== undefined &&
223+
(signal === null ||
224+
typeof signal !== 'object' ||
225+
!('aborted' in signal))) {
226+
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
227+
}
228+
});
229+
221230
module.exports = {
222231
isInt32,
223232
isUint32,
@@ -236,4 +245,5 @@ module.exports = {
236245
validateString,
237246
validateUint32,
238247
validateCallback,
248+
validateAbortSignal,
239249
};

test/parallel/test-events-once.js

+88-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
'use strict';
2-
// Flags: --expose-internals
2+
// Flags: --expose-internals --no-warnings --experimental-abortcontroller
33

44
const common = require('../common');
55
const { once, EventEmitter } = require('events');
6-
const { strictEqual, deepStrictEqual, fail } = require('assert');
6+
const {
7+
strictEqual,
8+
deepStrictEqual,
9+
fail,
10+
rejects,
11+
} = require('assert');
712
const { EventTarget, Event } = require('internal/event_target');
813

914
async function onceAnEvent() {
@@ -114,6 +119,81 @@ async function prioritizesEventEmitter() {
114119
process.nextTick(() => ee.emit('foo'));
115120
await once(ee, 'foo');
116121
}
122+
123+
async function abortSignalBefore() {
124+
const ee = new EventEmitter();
125+
const ac = new AbortController();
126+
ee.on('error', common.mustNotCall());
127+
ac.abort();
128+
129+
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
130+
return rejects(once(ee, 'foo', { signal }), {
131+
code: 'ERR_INVALID_ARG_TYPE'
132+
});
133+
}));
134+
135+
return rejects(once(ee, 'foo', { signal: ac.signal }), {
136+
name: 'AbortError'
137+
});
138+
}
139+
140+
async function abortSignalAfter() {
141+
const ee = new EventEmitter();
142+
const ac = new AbortController();
143+
ee.on('error', common.mustNotCall());
144+
const r = rejects(once(ee, 'foo', { signal: ac.signal }), {
145+
name: 'AbortError'
146+
});
147+
process.nextTick(() => ac.abort());
148+
return r;
149+
}
150+
151+
async function abortSignalAfterEvent() {
152+
const ee = new EventEmitter();
153+
const ac = new AbortController();
154+
process.nextTick(() => {
155+
ee.emit('foo');
156+
ac.abort();
157+
});
158+
await once(ee, 'foo', { signal: ac.signal });
159+
}
160+
161+
async function eventTargetAbortSignalBefore() {
162+
const et = new EventTarget();
163+
const ac = new AbortController();
164+
ac.abort();
165+
166+
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
167+
return rejects(once(et, 'foo', { signal }), {
168+
code: 'ERR_INVALID_ARG_TYPE'
169+
});
170+
}));
171+
172+
return rejects(once(et, 'foo', { signal: ac.signal }), {
173+
name: 'AbortError'
174+
});
175+
}
176+
177+
async function eventTargetAbortSignalAfter() {
178+
const et = new EventTarget();
179+
const ac = new AbortController();
180+
const r = rejects(once(et, 'foo', { signal: ac.signal }), {
181+
name: 'AbortError'
182+
});
183+
process.nextTick(() => ac.abort());
184+
return r;
185+
}
186+
187+
async function eventTargetAbortSignalAfterEvent() {
188+
const et = new EventTarget();
189+
const ac = new AbortController();
190+
process.nextTick(() => {
191+
et.dispatchEvent(new Event('foo'));
192+
ac.abort();
193+
});
194+
await once(et, 'foo', { signal: ac.signal });
195+
}
196+
117197
Promise.all([
118198
onceAnEvent(),
119199
onceAnEventWithTwoArgs(),
@@ -123,4 +203,10 @@ Promise.all([
123203
onceWithEventTarget(),
124204
onceWithEventTargetError(),
125205
prioritizesEventEmitter(),
206+
abortSignalBefore(),
207+
abortSignalAfter(),
208+
abortSignalAfterEvent(),
209+
eventTargetAbortSignalBefore(),
210+
eventTargetAbortSignalAfter(),
211+
eventTargetAbortSignalAfterEvent(),
126212
]).then(common.mustCall());

0 commit comments

Comments
 (0)