From ade3854af8720f746cd94046ce7b733388eba349 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Thu, 10 Apr 2025 14:20:12 -0500 Subject: [PATCH] Change stream results to indicate cancellation Resolves #490 --- design/mvp/CanonicalABI.md | 74 ++++++---- design/mvp/canonical-abi/definitions.py | 61 ++++---- design/mvp/canonical-abi/run_tests.py | 181 ++++++++++++++---------- 3 files changed, 188 insertions(+), 128 deletions(-) diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 3f50739b..835917b2 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -354,7 +354,7 @@ values *into* the buffer's memory. Buffers are represented by the following 3 abstract Python classes: ```python class Buffer: - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 t: ValType remain: Callable[[], int] @@ -1055,7 +1055,7 @@ stream.) ```python RevokeBuffer = Callable[[], None] OnPartialCopy = Callable[[RevokeBuffer], None] -OnCopyDone = Callable[[], None] +OnCopyDone = Callable[[Literal['completed','cancelled']], None] class ReadableStream: t: ValType @@ -1068,7 +1068,8 @@ The key operation is `read` which works as follows: * `read` is non-blocking, returning `'blocked'` if it would have blocked. * The `On*` callbacks are only called *after* `read` returns `'blocked'`. * `OnCopyDone` is called to indicate that the caller has regained ownership of - the buffer. + the buffer and whether this was due to the read/write completing or + being cancelled. * `OnPartialCopy` is called to indicate a partial write has been made to the buffer, but there may be further writes made in the future, so the caller has *not* regained ownership of the buffer. @@ -1118,21 +1119,21 @@ If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a `read`. Closing the readable or writable end of a stream or cancelling a `read` or `write` notifies any pending `read` or `write` via its `OnCopyDone` callback, which lets the other side know that ownership of the `Buffer` has -been returned: +been returned and why: ```python - def reset_and_notify_pending(self): + def reset_and_notify_pending(self, why): pending_on_copy_done = self.pending_on_copy_done self.reset_pending() - pending_on_copy_done() + pending_on_copy_done(why) def cancel(self): - self.reset_and_notify_pending() + self.reset_and_notify_pending('cancelled') def close(self): if not self.closed_: self.closed_ = True if self.pending_buffer: - self.reset_and_notify_pending() + self.reset_and_notify_pending('completed') def closed(self): return self.closed_ @@ -1178,7 +1179,7 @@ but in the opposite direction. Both are implemented by a single underlying if self.pending_buffer.remain() > 0: self.pending_on_partial_copy(self.reset_pending) else: - self.reset_and_notify_pending() + self.reset_and_notify_pending('completed') return 'done' ``` Currently, there is a trap when both the `read` and `write` come from the same @@ -1242,10 +1243,10 @@ and closing once a value has been read-from or written-to the given buffer: class FutureEnd(StreamEnd): def close_after_copy(self, copy_op, inst, buffer, on_copy_done): assert(buffer.remain() == 1) - def on_copy_done_wrapper(): + def on_copy_done_wrapper(why): if buffer.remain() == 0: self.stream.close() - on_copy_done() + on_copy_done(why) ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: self.stream.close() @@ -3520,7 +3521,8 @@ multiple partial copies before having to context-switch back. ```python if opts.sync: final_revoke_buffer = None - def on_partial_copy(revoke_buffer): + def on_partial_copy(revoke_buffer, why = 'completed'): + assert(why == 'completed') nonlocal final_revoke_buffer final_revoke_buffer = revoke_buffer if not async_copy.done(): @@ -3531,6 +3533,8 @@ multiple partial copies before having to context-switch back. await task.wait_on(async_copy, sync = True) final_revoke_buffer() ``` +(When non-cooperative threads are added, the assertion that synchronous copies +can only be `completed`, and not `cancelled`, will no longer hold.) In the asynchronous case, the `on_*` callbacks set a pending event on the `Waitable` which will be delivered to core wasm when core wasm calls @@ -3541,36 +3545,46 @@ allowing multiple partial copies to complete in the interim, reducing overall context-switching overhead. ```python else: - def copy_event(revoke_buffer): + def copy_event(why, revoke_buffer): revoke_buffer() e.copying = False - return (event_code, i, pack_copy_result(task, buffer, e)) + return (event_code, i, pack_copy_result(task, e, buffer, why)) def on_partial_copy(revoke_buffer): - e.set_event(partial(copy_event, revoke_buffer)) - def on_copy_done(): - e.set_event(partial(copy_event, revoke_buffer = lambda:())) + e.set_event(partial(copy_event, 'completed', revoke_buffer)) + def on_copy_done(why): + e.set_event(partial(copy_event, why, revoke_buffer = lambda:())) if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True return [BLOCKED] - return [pack_copy_result(task, buffer, e)] + return [pack_copy_result(task, e, buffer, 'completed')] ``` However the copy completes, the results are reported to the caller via `pack_copy_result`: ```python -BLOCKED = 0xffff_ffff -CLOSED = 0x8000_0000 +BLOCKED = 0xffff_ffff +COMPLETED = 0x0 +CLOSED = 0x1 +CANCELLED = 0x2 -def pack_copy_result(task, buffer, e): - if buffer.progress or not e.stream.closed(): - assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED) - assert(not (buffer.progress & CLOSED)) - return buffer.progress +def pack_copy_result(task, e, buffer, why): + if e.stream.closed(): + result = CLOSED + elif why == 'cancelled': + result = CANCELLED else: - return CLOSED -``` -The order of tests here indicates that, if some progress was made and then the -stream was closed, only the progress is reported and the `CLOSED` status is -left to be discovered next time. + assert(why == 'completed') + assert(not isinstance(e, FutureEnd)) + result = COMPLETED + assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28) + packed = result | (buffer.progress << 4) + assert(packed != BLOCKED) + return packed +``` +The `result` indicates whether the stream was closed by the other end, the +copy was cancelled by this end (via `{stream,future}.cancel-{read,write}`) or, +otherwise, completed successfully. In all cases, any number of elements (from +`0` to `n`) may have *first* been copied into or out of the buffer passed to +the `read` or `write` and so this number is packed into the `i32` result. ### 🔀 `canon {stream,future}.cancel-{read,write}` diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 86d56723..50504f15 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -303,7 +303,7 @@ def __init__(self, impl, dtor = None, dtor_sync = True, dtor_callback = None): #### Buffer State class Buffer: - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 t: ValType remain: Callable[[], int] @@ -638,7 +638,7 @@ def drop(self): RevokeBuffer = Callable[[], None] OnPartialCopy = Callable[[RevokeBuffer], None] -OnCopyDone = Callable[[], None] +OnCopyDone = Callable[[Literal['completed','cancelled']], None] class ReadableStream: t: ValType @@ -665,19 +665,19 @@ def reset_pending(self): self.pending_on_partial_copy = None self.pending_on_copy_done = None - def reset_and_notify_pending(self): + def reset_and_notify_pending(self, why): pending_on_copy_done = self.pending_on_copy_done self.reset_pending() - pending_on_copy_done() + pending_on_copy_done(why) def cancel(self): - self.reset_and_notify_pending() + self.reset_and_notify_pending('cancelled') def close(self): if not self.closed_: self.closed_ = True if self.pending_buffer: - self.reset_and_notify_pending() + self.reset_and_notify_pending('completed') def closed(self): return self.closed_ @@ -705,7 +705,7 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst): if self.pending_buffer.remain() > 0: self.pending_on_partial_copy(self.reset_pending) else: - self.reset_and_notify_pending() + self.reset_and_notify_pending('completed') return 'done' class StreamEnd(Waitable): @@ -735,10 +735,10 @@ def copy(self, inst, src, on_partial_copy, on_copy_done): class FutureEnd(StreamEnd): def close_after_copy(self, copy_op, inst, buffer, on_copy_done): assert(buffer.remain() == 1) - def on_copy_done_wrapper(): + def on_copy_done_wrapper(why): if buffer.remain() == 0: self.stream.close() - on_copy_done() + on_copy_done(why) ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: self.stream.close() @@ -2021,7 +2021,8 @@ async def copy(EndT, BufferT, event_code, t, opts, task, i, ptr, n): buffer = BufferT(t, cx, ptr, n) if opts.sync: final_revoke_buffer = None - def on_partial_copy(revoke_buffer): + def on_partial_copy(revoke_buffer, why = 'completed'): + assert(why == 'completed') nonlocal final_revoke_buffer final_revoke_buffer = revoke_buffer if not async_copy.done(): @@ -2032,29 +2033,37 @@ def on_partial_copy(revoke_buffer): await task.wait_on(async_copy, sync = True) final_revoke_buffer() else: - def copy_event(revoke_buffer): + def copy_event(why, revoke_buffer): revoke_buffer() e.copying = False - return (event_code, i, pack_copy_result(task, buffer, e)) + return (event_code, i, pack_copy_result(task, e, buffer, why)) def on_partial_copy(revoke_buffer): - e.set_event(partial(copy_event, revoke_buffer)) - def on_copy_done(): - e.set_event(partial(copy_event, revoke_buffer = lambda:())) + e.set_event(partial(copy_event, 'completed', revoke_buffer)) + def on_copy_done(why): + e.set_event(partial(copy_event, why, revoke_buffer = lambda:())) if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True return [BLOCKED] - return [pack_copy_result(task, buffer, e)] - -BLOCKED = 0xffff_ffff -CLOSED = 0x8000_0000 - -def pack_copy_result(task, buffer, e): - if buffer.progress or not e.stream.closed(): - assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED) - assert(not (buffer.progress & CLOSED)) - return buffer.progress + return [pack_copy_result(task, e, buffer, 'completed')] + +BLOCKED = 0xffff_ffff +COMPLETED = 0x0 +CLOSED = 0x1 +CANCELLED = 0x2 + +def pack_copy_result(task, e, buffer, why): + if e.stream.closed(): + result = CLOSED + elif why == 'cancelled': + result = CANCELLED else: - return CLOSED + assert(why == 'completed') + assert(not isinstance(e, FutureEnd)) + result = COMPLETED + assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28) + packed = result | (buffer.progress << 4) + assert(packed != BLOCKED) + return packed ### 🔀 `canon {stream,future}.cancel-{read,write}` diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index f16b098f..507b2a0f 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -60,7 +60,7 @@ def mk_tup_rec(x): return x return { str(i):mk_tup_rec(v) for i,v in enumerate(a) } -def unpack_lower_result(ret): +def unpack_result(ret): return (ret & 0xf, ret >> 4) def unpack_new_ends(packed): @@ -570,14 +570,14 @@ async def consumer(task, args): u8 = consumer_heap.memory[ptr] assert(u8 == 43) [ret] = await canon_lower(consumer_opts, toggle_ft, toggle_callee, task, []) - state,subi1 = unpack_lower_result(ret) + state,subi1 = unpack_result(ret) assert(subi1 == 1) assert(state == CallState.STARTED) [] = await canon_waitable_join(task, subi1, seti) retp = ptr consumer_heap.memory[retp] = 13 [ret] = await canon_lower(consumer_opts, blocking_ft, blocking_callee, task, [83, retp]) - state,subi2 = unpack_lower_result(ret) + state,subi2 = unpack_result(ret) assert(subi2 == 2) assert(state == CallState.STARTING) assert(consumer_heap.memory[retp] == 13) @@ -618,7 +618,7 @@ async def dtor(task, args): assert(i == 1) assert(dtor_value is None) [ret] = await canon_resource_drop(rt, False, task, 1) - state,dtorsubi = unpack_lower_result(ret) + state,dtorsubi = unpack_result(ret) assert(dtorsubi == 2) assert(state == CallState.STARTED) assert(dtor_value is None) @@ -674,12 +674,12 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(opts, producer_ft, producer1, task, []) - state,subi1 = unpack_lower_result(ret) + state,subi1 = unpack_result(ret) assert(subi1 == 1) assert(state == CallState.STARTED) [ret] = await canon_lower(opts, producer_ft, producer2, task, []) - state,subi2 = unpack_lower_result(ret) + state,subi2 = unpack_result(ret) assert(subi2 == 2) assert(state == CallState.STARTED) @@ -771,12 +771,12 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - state,subi1 = unpack_lower_result(ret) + state,subi1 = unpack_result(ret) assert(subi1 == 1) assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - state,subi2 = unpack_lower_result(ret) + state,subi2 = unpack_result(ret) assert(subi2 == 2) assert(state == CallState.STARTING) @@ -857,12 +857,12 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - state,subi1 = unpack_lower_result(ret) + state,subi1 = unpack_result(ret) assert(subi1 == 1) assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - state,subi2 = unpack_lower_result(ret) + state,subi2 = unpack_result(ret) assert(subi2 == 2) assert(state == CallState.STARTING) @@ -928,11 +928,11 @@ async def core_hostcall_pre(fut, task, args): async def core_func(task, args): [ret] = await canon_lower(lower_opts, ft, hostcall1, task, []) - state,subi1 = unpack_lower_result(ret) + state,subi1 = unpack_result(ret) assert(subi1 == 1) assert(state == CallState.STARTED) [ret] = await canon_lower(lower_opts, ft, hostcall2, task, []) - state,subi2 = unpack_lower_result(ret) + state,subi2 = unpack_result(ret) assert(subi2 == 2) assert(state == CallState.STARTED) @@ -993,10 +993,8 @@ def close(self): self.remaining = [] self.destroy_if_empty = True if self.pending_dst: - self.pending_on_copy_done() - self.pending_dst = None - self.pending_on_partial_copy = None - self.pending_copy_on_done = None + self.pending_on_copy_done('completed') + self.reset_pending() def destroy_once_empty(self): self.destroy_if_empty = True @@ -1029,10 +1027,8 @@ async def async_cancel(): asyncio.create_task(async_cancel()) def actually_cancel(self): - self.pending_on_copy_done() - self.pending_dst = None - self.pending_on_partial_copy = None - self.pending_on_copy_done = None + self.pending_on_copy_done('cancelled') + self.reset_pending() def write(self, vs): assert(vs and not self.closed()) @@ -1042,7 +1038,8 @@ def write(self, vs): if self.pending_dst.remain(): self.pending_on_partial_copy(self.reset_pending) else: - self.actually_cancel() + self.pending_on_copy_done('completed') + self.reset_pending() class HostSink: stream: ReadableStream @@ -1070,7 +1067,7 @@ def on_partial_copy(revoke_buffer): revoke_buffer() if not f.done(): f.set_result(None) - def on_copy_done(): + def on_copy_done(why): if not f.done(): f.set_result(None) if self.stream.read(None, self, on_partial_copy, on_copy_done) != 'done': @@ -1144,7 +1141,8 @@ async def core_func(task, args): rsi2,wsi2 = unpack_new_ends(packed) [] = await canon_task_return(task, [StreamType(U8Type())], opts, [rsi2]) [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) assert(mem[0:4] == b'\x01\x02\x03\x04') [packed] = await canon_stream_new(U8Type(), task) rsi3,wsi3 = unpack_new_ends(packed) @@ -1154,24 +1152,29 @@ async def core_func(task, args): assert(ret == 0) rsi4 = mem[retp] [ret] = await canon_stream_write(U8Type(), opts, task, wsi3, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) await asyncio.sleep(0) [ret] = await canon_stream_read(U8Type(), opts, task, rsi4, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) - assert(ret == 4) - [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) - assert(ret == definitions.CLOSED) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.CLOSED) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(U8Type(), opts, task, wsi3, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) await asyncio.sleep(0) [ret] = await canon_stream_read(U8Type(), opts, task, rsi4, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [] = await canon_stream_close_readable(U8Type(), task, rsi1) [] = await canon_stream_close_readable(U8Type(), task, rsi4) [] = await canon_stream_close_writable(U8Type(), task, wsi2) @@ -1230,10 +1233,12 @@ async def core_func(task, args): retp = 16 [seti] = await canon_waitable_set_new(task) [] = await canon_waitable_join(task, rsi1, seti) - [event] = await canon_waitable_set_wait(False, mem, task, rsi1, retp) + definitions.throw_it = True + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi1) - assert(mem[retp+4] == 4) + result,n = unpack_result(mem[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) assert(mem[0:4] == b'\x01\x02\x03\x04') [packed] = await canon_stream_new(U8Type(), task) rsi3,wsi3 = unpack_new_ends(packed) @@ -1248,9 +1253,11 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) assert(mem[retp+0] == wsi3) - assert(mem[retp+4] == 4) + result,n = unpack_result(mem[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi4, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) assert(ret == definitions.BLOCKED) dst_stream.set_remain(100) @@ -1258,17 +1265,18 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) assert(mem[retp+0] == wsi2) - assert(mem[retp+4] == 4) + result,n = unpack_result(mem[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) src_stream.write([5,6,7,8]) src_stream.destroy_once_empty() [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) - assert(ret == 4) - [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi1, 0, 4) - assert(ret == definitions.CLOSED) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi1) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(U8Type(), opts, task, wsi3, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [] = await canon_stream_close_writable(U8Type(), task, wsi3) [ret] = await canon_stream_read(U8Type(), opts, task, rsi4, 0, 4) assert(ret == definitions.BLOCKED) @@ -1276,12 +1284,14 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi4) - assert(mem[retp+4] == 4) + result,n = unpack_result(mem[retp+4]) + assert(n == 4 and result == definitions.CLOSED) [ret] = await canon_stream_read(U8Type(), opts, task, rsi4, 0, 4) assert(ret == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi4) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi2, 0, 4) - assert(ret == 4) + result,n = unpack_result(ret) + assert(n == 4 and result == definitions.COMPLETED) [] = await canon_stream_close_writable(U8Type(), task, wsi2) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1378,10 +1388,12 @@ async def core_func(task, args): rsi = mem[retp] assert(rsi == 1) [ret] = await canon_stream_read(U8Type(), opts, task, rsi, 0, 4) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) assert(mem[0:2] == b'\x01\x02') [ret] = await canon_stream_read(U8Type(), opts, task, rsi, 0, 4) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) assert(mem[0:2] == b'\x03\x04') [ret] = await canon_stream_read(U8Type(), opts, task, rsi, 0, 4) assert(ret == definitions.BLOCKED) @@ -1392,7 +1404,8 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi) - assert(mem[retp+4] == 2) + result,n = unpack_result(mem[retp+4]) + assert(n == 2 and result == definitions.COMPLETED) [] = await canon_stream_close_readable(U8Type(), task, rsi) [packed] = await canon_stream_new(U8Type(), task) @@ -1403,7 +1416,8 @@ async def core_func(task, args): assert(ret == 0) mem[0:6] = b'\x01\x02\x03\x04\x05\x06' [ret] = await canon_stream_write(U8Type(), opts, task, wsi, 0, 6) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [ret] = await canon_stream_write(U8Type(), opts, task, wsi, 2, 4) assert(ret == definitions.BLOCKED) dst.set_remain(4) @@ -1411,7 +1425,8 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) assert(mem[retp+0] == wsi) - assert(mem[retp+4] == 4) + result,n = unpack_result(mem[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) assert(dst.received == [1,2,3,4,5,6]) [] = await canon_stream_close_writable(U8Type(), task, wsi) [] = await canon_waitable_set_drop(task, seti) @@ -1444,9 +1459,11 @@ async def core_func1(task, args): mem1[0:4] = b'\x01\x02\x03\x04' [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 0, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 2, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) await task.on_block(fut2) @@ -1462,7 +1479,8 @@ async def core_func1(task, args): [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) assert(event == EventCode.STREAM_WRITE) assert(mem1[retp+0] == wsi) - assert(mem1[retp+4] == 4) + result,n = unpack_result(mem1[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) fut4.set_result(None) @@ -1499,7 +1517,8 @@ async def core_func2(task, args): [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem2[retp+0] == rsi) - assert(mem2[retp+4] == 4) + result,n = unpack_result(mem2[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) assert(mem2[0:8] == b'\x01\x02\x03\x04\x00\x00\x00\x00') fut2.set_result(None) @@ -1507,16 +1526,19 @@ async def core_func2(task, args): mem2[0:8] = bytes(8) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) assert(mem2[0:6] == b'\x05\x06\x00\x00\x00\x00') [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 2, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) assert(mem2[0:6] == b'\x05\x06\x07\x08\x00\x00') await task.on_block(fut4) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) - assert(ret == definitions.CLOSED) + result,n = unpack_result(ret) + assert(n == 0 and result == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1540,9 +1562,11 @@ async def core_func1(task, args): await task.on_block(fut1) [ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) await task.on_block(fut2) @@ -1557,7 +1581,8 @@ async def core_func1(task, args): [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) assert(event == EventCode.STREAM_WRITE) assert(mem1[retp+0] == wsi) - assert(mem1[retp+4] == 4) + result,n = unpack_result(mem1[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) fut4.set_result(None) @@ -1593,20 +1618,24 @@ async def core_func2(task, args): [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem2[retp+0] == rsi) - assert(mem2[retp+4] == 4) + result,n = unpack_result(mem2[retp+4]) + assert(n == 4 and result == definitions.COMPLETED) fut2.set_result(None) await task.on_block(fut3) [ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) await task.on_block(fut4) [ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2) - assert(ret == definitions.CLOSED) + result,n = unpack_result(ret) + assert(n == 0 and result == definitions.CLOSED) [] = await canon_stream_close_readable(None, task, rsi) return [] @@ -1649,7 +1678,8 @@ async def core_func(task, args): got = await host_sink.consume(2) assert(got == [0xa, 0xb]) [ret] = await canon_stream_cancel_write(U8Type(), True, task, wsi) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [] = await canon_stream_close_writable(U8Type(), task, wsi) host_sink.set_remain(100) assert(await host_sink.consume(100) is None) @@ -1665,7 +1695,8 @@ async def core_func(task, args): got = await host_sink.consume(2) assert(got == [1, 2]) [ret] = await canon_stream_cancel_write(U8Type(), False, task, wsi) - assert(ret == 2) + result,n = unpack_result(ret) + assert(n == 2 and result == definitions.COMPLETED) [] = await canon_stream_close_writable(U8Type(), task, wsi) host_sink.set_remain(100) assert(await host_sink.consume(100) is None) @@ -1677,7 +1708,8 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), lower_opts, task, rsi, 0, 4) assert(ret == definitions.BLOCKED) [ret] = await canon_stream_cancel_read(U8Type(), True, task, rsi) - assert(ret == 0) + result,n = unpack_result(ret) + assert(n == 0 and result == definitions.CANCELLED) [] = await canon_stream_close_readable(U8Type(), task, rsi) [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) @@ -1695,7 +1727,8 @@ async def core_func(task, args): [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi) - assert(mem[retp+4] == 2) + result,n = unpack_result(mem[retp+4]) + assert(n == 2 and result == definitions.CANCELLED) assert(mem[0:2] == b'\x07\x08') [] = await canon_stream_close_readable(U8Type(), task, rsi) [] = await canon_waitable_set_drop(task, seti) @@ -1758,7 +1791,7 @@ def cancel(self): pending_on_copy_done = self.pending_on_copy_done self.pending_buffer = None self.pending_on_copy_done = None - pending_on_copy_done() + pending_on_copy_done('cancelled') def close(self): self.is_closed = True if self.pending_buffer: @@ -1775,7 +1808,7 @@ async def host_func(task, on_start, on_return, on_block): outgoing = HostFutureSource(U8Type()) on_return([outgoing]) incoming = HostFutureSink() - future.read(None, incoming, lambda:(), lambda:()) + future.read(None, incoming, lambda:(), lambda why:()) await on_block(incoming.has_v.wait()) assert(incoming.v == 42) outgoing.set_result(43) @@ -1797,14 +1830,16 @@ async def core_func(task, args): writep = 8 mem[writep] = 42 [ret] = await canon_future_write(U8Type(), lower_opts, task, wfi, writep) - assert(ret == 1) + result,n = unpack_result(ret) + assert(n == 1 and result == definitions.CLOSED) [seti] = await canon_waitable_set_new(task) [] = await canon_waitable_join(task, rfi, seti) [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.FUTURE_READ) assert(mem[retp+0] == rfi) - assert(mem[retp+4] == 1) + result,n = unpack_result(mem[retp+4]) + assert(n == 1 and result == definitions.CLOSED) assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi) @@ -1824,13 +1859,15 @@ async def core_func(task, args): writep = 8 mem[writep] = 42 [ret] = await canon_future_write(U8Type(), lower_opts, task, wfi, writep) - assert(ret == 1) + result,n = unpack_result(ret) + assert(n == 1 and result == definitions.CLOSED) while not task.inst.waitables.get(rfi).stream.closed(): await task.yield_(sync = False) [ret] = await canon_future_cancel_read(U8Type(), True, task, rfi) - assert(ret == 1) + result,n = unpack_result(ret) + assert(n == 1 and result == definitions.CLOSED) assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi)