lightning-liquidity: Introduce EventQueue notifier and wake BP for message processing #3509
Conversation
Force-pushed from 90256c6 to f24e843
Force-pushed from ec27a7a to d24eca4
Rebased after #3533 landed; should be ready for review (cc @TheBlueMatt).
Force-pushed from 90a1b34 to cfce460
Why did you end up going with a direct function callback instead of the …?
Mhh, that might be an alternative approach, yes. I wonder if it would make sense to add this more generally to the …
Force-pushed from cfce460 to cbd4754
Force-pushed from cbd4754 to 03eb649
Now switched to having the BP woken to trigger message processing. Had to rebase on top of #3546 to avoid silent rebase conflicts while adding a … Should be generally good for review, but I'm currently still fighting some issues from …
Changed the title from "lightning-liquidity: Introduce MessageQueue and EventQueue notifier types" to "lightning-liquidity: Introduce EventQueue notifier and wake BP for message processing"
Force-pushed from ea35484 to 0845ea1
Codecov Report — Attention: Patch coverage is …

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #3509      +/-   ##
==========================================
- Coverage   89.12%   89.07%   -0.05%
==========================================
  Files         156      157       +1
  Lines      123514   123528      +14
  Branches   123514   123528      +14
==========================================
- Hits       110086   110037      -49
- Misses      10749    10803      +54
- Partials     2679     2688       +9
Force-pushed from ca96d1f to 4af5fc3
Would be nice to get some additional context in some of the commit messages, especially "Relax Sync + Send bounds on BackgroundProcessor where possible", which really needs an explainer of why we think it's a good idea to do so.
let (expected_payment_size_msat, mpp_mode) =
	if let Some(payment_size_msat) = payment_size_msat {
		(*payment_size_msat, true)
	} else {
		debug_assert_eq!(num_htlcs, 1);
		if num_htlcs != 1 {
			// Revert the queue before error'ing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the point of this logic here: we've pushed a new HTLC onto payment_queue, but we're erroring here, which causes the HTLC to be failed back. The comment seems to imply we want to revert to the old queue state (which I think would be right?), but that isn't what we're doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, maybe the comment could be more specific, but the idea here is to preserve the previous behavior, which keeps all HTLCs in the queue until we get HTLCHandlingFailed, at which point we need to clean up the queue, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, so two things:
(a) I think there's an existing bug here: LSPS2ServiceHandler::htlc_intercepted calls through to here, and when it gets an Err it calls channelmanager....fail_intercepted_htlc, but the code here keeps the HTLC listed as pending.
(b) Doing the take at the top looks pretty brittle, and isn't required. E.g., the following patch compiles:
$ git diff -U2
diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs
index 69acc5ba6..f6ba6aec7 100644
--- a/lightning-liquidity/src/lsps2/service.rs
+++ b/lightning-liquidity/src/lsps2/service.rs
@@ -142,6 +142,5 @@ impl OutboundJITChannelState {
let new_state;
let res = match self {
- OutboundJITChannelState::PendingInitialPayment { payment_queue: old_payment_queue } => {
- let mut payment_queue = core::mem::take(old_payment_queue);
+ OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
let (total_expected_outbound_amount_msat, num_htlcs) = payment_queue.add_htlc(htlc);
@@ -152,6 +151,4 @@ impl OutboundJITChannelState {
debug_assert_eq!(num_htlcs, 1);
if num_htlcs != 1 {
- // Revert the queue before error'ing
- core::mem::swap(old_payment_queue, &mut payment_queue);
return Err(ChannelStateError(
"Paying via multiple HTLCs is disallowed in \"no-MPP+var-invoice\" mode.".to_string()
@@ -164,6 +161,4 @@ impl OutboundJITChannelState {
|| expected_payment_size_msat > opening_fee_params.max_payment_size_msat
{
- // Revert the queue before error'ing
- core::mem::swap(old_payment_queue, &mut payment_queue);
return Err(ChannelStateError(
format!("Payment size violates our limits: expected_payment_size_msat = {}, min_payment_size_msat = {}, max_payment_size_msat = {}",
@@ -181,6 +176,4 @@ impl OutboundJITChannelState {
opening_fee
} else {
- // Revert the queue before error'ing
- core::mem::swap(old_payment_queue, &mut payment_queue);
return Err(ChannelStateError(
format!("Could not compute valid opening fee with min_fee_msat = {}, proportional = {}, and expected_payment_size_msat = {}",
@@ -199,5 +192,5 @@ impl OutboundJITChannelState {
{
new_state = OutboundJITChannelState::PendingChannelOpen {
- payment_queue,
+ payment_queue: core::mem::take(payment_queue),
opening_fee_msat,
};
@@ -210,9 +203,7 @@ impl OutboundJITChannelState {
if mpp_mode {
new_state =
- OutboundJITChannelState::PendingInitialPayment { payment_queue };
+ OutboundJITChannelState::PendingInitialPayment { payment_queue: core::mem::take(payment_queue) };
Ok(None)
} else {
- // Revert the queue before error'ing
- core::mem::swap(old_payment_queue, &mut payment_queue);
return Err(ChannelStateError(
"Intercepted HTLC is too small to pay opening fee".to_string(),
> (a) I think there's an existing bug here: LSPS2ServiceHandler::htlc_intercepted calls through to here, and when it gets an Err it calls channelmanager....fail_intercepted_htlc, but the code here keeps the HTLC listed as pending.

Well, we def. need to keep the payment as pending; the question is when we'd remove the HTLC. My thinking was that it would always happen as part of handling HTLCHandlingFailed, but IIUC, we need #3551 for this to work reliably in all cases.

> (b) Doing the take at the top looks pretty brittle, and isn't required. E.g., the following patch compiles:

I think I considered this but somehow arrived at the opinion it wouldn't work (maybe something to do with persistence, as this commit came from that draft branch). However, I can't recall why I thought that, and indeed the above approach seems to work just as well while being cleaner. Now added in a fixup.
> Well, we def. need to keep the payment as pending; the question is when we'd remove the HTLC. My thinking was that it would always happen as part of handling HTLCHandlingFailed, but IIUC, we need #3551 for this to work reliably in all cases.

Ah, okay, I wasn't considering that fail_intercepted_htlc generates an HTLCHandlingFailed and we handle it that way... that said, I'm not really convinced that's the right approach: it leaves a race where our state doesn't match the HTLC state, and we might think we've received enough HTLCs if another one comes in before we process the event.
Do you plan on changing the approach here?
> Do you plan on changing the approach here?

No, I think I'd prefer to tackle this pre-existing issue in a follow-up, also since we might want to work out a common pattern in conjunction with/after #3712.
@@ -166,6 +163,8 @@ impl OutboundJITChannelState {
	if expected_payment_size_msat < opening_fee_params.min_payment_size_msat
		|| expected_payment_size_msat > opening_fee_params.max_payment_size_msat
	{
		// Revert the queue before error'ing
		core::mem::swap(old_payment_queue, &mut payment_queue);
Might be easier to just push the HTLC right before we move to the success block, instead of pushing it and then trying to revert before returning an Err... as it stands, we'll end up adding a new error return in the future and forgetting to revert.
Yeah, although note that all following calculations are made based on the state with the HTLC already added. Pushing only in the success case would essentially mean refactoring the logic to run the same calculations twice.
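For illustration, here is a minimal sketch of the two orderings being weighed; the types and the `peek_totals_with` helper are hypothetical stand-ins, not the actual lightning-liquidity code:

```rust
// Hypothetical sketch only — not the real lightning-liquidity `PaymentQueue`.
struct Htlc {
	amount_msat: u64,
}

#[derive(Default)]
struct PaymentQueue {
	htlcs: Vec<Htlc>,
}

impl PaymentQueue {
	// Mutating variant (as in the current code): push first, so every error path
	// afterwards must remember to revert the queue.
	fn add_htlc(&mut self, htlc: Htlc) -> (u64, usize) {
		self.htlcs.push(htlc);
		(self.htlcs.iter().map(|h| h.amount_msat).sum(), self.htlcs.len())
	}

	// Non-mutating variant: compute the would-be totals without touching the queue,
	// so the push could happen once, in the success block, with no revert step —
	// at the cost of computing the totals separately from the push.
	fn peek_totals_with(&self, htlc: &Htlc) -> (u64, usize) {
		let sum: u64 = self.htlcs.iter().map(|h| h.amount_msat).sum();
		(sum + htlc.amount_msat, self.htlcs.len() + 1)
	}
}
```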
Force-pushed from 4af5fc3 to 30e5fab
Alright, now added some rationale there.
Force-pushed from 30e5fab to eb3ea83
Force-pushed from b4dc220 to fd7f65a
Somehow this is now failing …
Looks like we just need …
Force-pushed from fd7f65a to a69b7e7
Lol, thanks for pointing that out. Hope it was indeed just that, even though it's an embarrassing oversight on my part.
Oops, now missing:

$ git diff
diff --git a/lightning-invoice/Cargo.toml b/lightning-invoice/Cargo.toml
index 7c49b2177..8e0c7587f 100644
--- a/lightning-invoice/Cargo.toml
+++ b/lightning-invoice/Cargo.toml
@@ -20,7 +20,7 @@ std = []
[dependencies]
bech32 = { version = "0.11.0", default-features = false }
lightning-types = { version = "0.3.0", path = "../lightning-types", default-features = false }
-serde = { version = "1.0.118", optional = true }
+serde = { version = "1.0", optional = true, default-features = false, features = ["alloc"] }
bitcoin = { version = "0.32.2", default-features = false, features = ["secp-recovery"] }
[dev-dependencies]
Force-pushed from a69b7e7 to 2c2d3a8
Basically LGTM, then.
@@ -30,17 +30,18 @@ use core::pin::Pin;
 use core::task::{Context, Poll};

 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
-pub(crate) struct Notifier {
+pub struct Notifier {
This needs drastically better documentation if we're gonna make it public. Discuss use-cases, describe that notification coalescing happens, and detail when a notification is considered "received" (either a future poll completes Ready, or a callback returns (or is just called?)).
Now added some more docs, let me know if you want to add more specific phrasings somewhere.

Btw, I wonder if this line in the fn poll implementation should actually be a debug_assert(true), as we should have always made the callbacks at this point, no?

state.callbacks_made = true;
Force-pushed from 2c2d3a8 to 278a877
Feel free to squash (once there's better …)
The previous transition pattern of `OutboundJITChannelState` was never great: we'd take `&mut self`, only to also return `Self`, requiring the state to be updated externally to the state transition methods. In addition, we previously wrapped `PaymentQueue` in an `Arc<Mutex<..>>` to avoid cloning it during state transitions. Here, we clean up all of this, having the state transition methods update the state in-place and merely return an `action` in the method's `Result`s. We also use `core::mem::take` to move the `payment_queue` to the new states without reallocation.
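A rough sketch of that in-place transition pattern, assuming a heavily simplified two-state machine (not the actual `OutboundJITChannelState` implementation):

```rust
// Simplified stand-in types for illustration only.
#[derive(Default)]
struct PaymentQueue {
	htlcs: Vec<u64>,
}

enum JitChannelState {
	PendingInitialPayment { payment_queue: PaymentQueue },
	PendingChannelOpen { payment_queue: PaymentQueue, opening_fee_msat: u64 },
}

impl JitChannelState {
	// Takes `&mut self` and updates the state in place, returning only a result
	// (an "action" in the real code) rather than a new `Self`.
	fn htlc_intercepted(&mut self, amount_msat: u64) -> Result<(), String> {
		let new_state;
		match self {
			JitChannelState::PendingInitialPayment { payment_queue } => {
				payment_queue.htlcs.push(amount_msat);
				if payment_queue.htlcs.iter().sum::<u64>() < 1_000 {
					// Not enough yet: stay in the same state, nothing to revert.
					return Ok(());
				}
				// Move the queue into the next state without cloning or reallocating.
				new_state = JitChannelState::PendingChannelOpen {
					payment_queue: core::mem::take(payment_queue),
					opening_fee_msat: 100,
				};
			},
			JitChannelState::PendingChannelOpen { .. } => {
				return Err("unexpected HTLC".to_string());
			},
		}
		*self = new_state;
		Ok(())
	}
}
```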
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while still holding some locks ourselves, e.g., on the peer state. Here, we instead introduce an `EventQueueNotifierGuard` type that will notify about pending events if necessary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before acquiring any locks.
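Conceptually, the guard works roughly like the following sketch, built on plain `std` primitives rather than the real `EventQueue` internals:

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};

// Conceptual stand-in; not the actual lightning-liquidity EventQueue.
struct EventQueue<E> {
	queue: Mutex<VecDeque<E>>,
	condvar: Condvar,
}

// RAII guard: enqueuing only records that a wake-up is needed; the actual
// notification happens on drop, i.e. after the caller has released its own locks.
struct EventQueueNotifierGuard<'a, E> {
	event_queue: &'a EventQueue<E>,
	needs_notify: AtomicBool,
}

impl<E> EventQueue<E> {
	fn new() -> Self {
		Self { queue: Mutex::new(VecDeque::new()), condvar: Condvar::new() }
	}

	// Callers obtain the guard *before* taking any other locks, then enqueue through it.
	fn notifier(&self) -> EventQueueNotifierGuard<'_, E> {
		EventQueueNotifierGuard { event_queue: self, needs_notify: AtomicBool::new(false) }
	}
}

impl<'a, E> EventQueueNotifierGuard<'a, E> {
	fn enqueue(&self, event: E) {
		self.event_queue.queue.lock().unwrap().push_back(event);
		self.needs_notify.store(true, Ordering::Release);
	}
}

impl<'a, E> Drop for EventQueueNotifierGuard<'a, E> {
	fn drop(&mut self) {
		// Wake a waiter only if something was enqueued, and only once the caller's
		// other locks have been released.
		if self.needs_notify.load(Ordering::Acquire) {
			self.event_queue.condvar.notify_one();
		}
	}
}
```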
.. in order to make handling generics easier, just as we do with `AChannelManager`, `AOnionMessenger`, etc.
Instead of doing the callback dance, we have `lightning-background-processor` take `lightning-liquidity` as a dependency and wake the BP whenever we enqueue new messages to the `MessageQueue`.
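A hypothetical sketch of that wiring, with stand-in names (`pending_msgs_future`, `process_events`) rather than the exact lightning-background-processor / lightning-liquidity API:

```rust
use std::sync::Arc;
use tokio::sync::Notify;

// Illustrative stub types only — not the real LiquidityManager or PeerManager.
struct LiquidityManagerStub {
	msgs_pending: Notify,
}

struct PeerManagerStub;

impl LiquidityManagerStub {
	// Called whenever new messages are enqueued to the message queue.
	fn wake(&self) {
		self.msgs_pending.notify_one();
	}

	async fn pending_msgs_future(&self) {
		self.msgs_pending.notified().await;
	}
}

impl PeerManagerStub {
	fn process_events(&self) {
		// In the real background processor this flushes queued lightning-liquidity
		// messages out to the connected peers.
	}
}

async fn background_loop(lm: Arc<LiquidityManagerStub>, pm: Arc<PeerManagerStub>) {
	loop {
		// Sleep until the liquidity manager signals pending messages (the BP's other
		// wake-up sources are elided here), then drive message processing.
		lm.pending_msgs_future().await;
		pm.process_events();
	}
}
```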
It seems that a lot of the generics on `BackgroundProcessor` don't actually require the `Sync + Send` bounds. Here, we therefore drop them where possible, as the unnecessary bounds could result in the compiler disallowing the use of certain types that aren't `Sync + Send`, even if run in threadless environments (i.e., some `no_std` environments).
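The motivation can be illustrated with a small, self-contained example (names are illustrative only): an unnecessary `Send + Sync` bound rejects `Rc`-backed types even when nothing ever crosses a thread boundary.

```rust
use std::rc::Rc;

// Overly strict: rejects `Rc`-backed implementations even in single-threaded use.
fn process_strict<L: Fn(&str) + Send + Sync>(logger: L) {
	logger("processing");
}

// Relaxed: only requires what is actually used.
fn process_relaxed<L: Fn(&str)>(logger: L) {
	logger("processing");
}

fn main() {
	let log = Rc::new(std::cell::RefCell::new(Vec::new()));
	let log_clone = Rc::clone(&log);
	let closure = move |msg: &str| log_clone.borrow_mut().push(msg.to_string());

	// process_strict(closure); // would not compile: `Rc` is neither `Send` nor `Sync`
	process_relaxed(closure);
	assert_eq!(log.borrow().len(), 1);
}
```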
Force-pushed from 278a877 to 1251c96
Squashed fixups and included:

> git diff-tree -U2 278a8773c 1251c962f
diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs
index 6cd31c774..ce504f632 100644
--- a/lightning/src/util/wakers.rs
+++ b/lightning/src/util/wakers.rs
@@ -31,4 +31,10 @@ use core::task::{Context, Poll};
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
+///
+/// This is usually used by LDK objects such as [`ChannelManager`] or [`PeerManager`] to signal to
+/// the background processor that it should wake up and process pending events.
+///
+/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
+/// [`PeerManager`]: crate::ln::peer_handler::PeerManager
pub struct Notifier {
notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
@@ -42,4 +48,9 @@ impl Notifier {
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
+ ///
+ /// We deem the notification successful either directly after any callbacks were made, or after
+ /// the user [`poll`]ed a previously-completed future.
+ ///
+ /// [`poll`]: core::future::Future::poll
pub fn notify(&self) {
let mut lock = self.notify_pending.lock().unwrap();
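For readers skimming the new docs, the coalescing behavior they describe can be illustrated with a small conceptual sketch; this mimics the idea only and is not the actual `lightning::util::wakers::Notifier` implementation:

```rust
use std::sync::{Condvar, Mutex};

// Conceptual illustration: any number of `notify()` calls made while nobody is
// waiting collapse into a single pending wake, which the next waiter consumes.
struct CoalescingNotifier {
	pending: Mutex<bool>,
	condvar: Condvar,
}

impl CoalescingNotifier {
	fn new() -> Self {
		Self { pending: Mutex::new(false), condvar: Condvar::new() }
	}

	fn notify(&self) {
		// Repeated calls just keep the flag set; they don't queue up.
		*self.pending.lock().unwrap() = true;
		self.condvar.notify_one();
	}

	fn wait(&self) {
		let guard = self.pending.lock().unwrap();
		let mut guard = self.condvar.wait_while(guard, |pending| !*pending).unwrap();
		// Consuming the wake is what marks the notification as "received".
		*guard = false;
	}
}
```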
I spent some time reviewing the PR and trying to understand every piece. I understand the overall concept and have no objections to the implementation.

I do have a question regarding a design choice of the event queue (this was not introduced in this PR, but it's still relevant). Currently, the event queue only stores a single waker. If two tasks call next_event_async().await concurrently, they will race to overwrite that waker slot. When the notifier is dropped, it only wakes the last stored waker, leaving the first task permanently stuck in Pending.

Here's a test case that reproduces this behavior:

use super::*;
use crate::lsps0::event::LSPS0ClientEvent;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};
#[tokio::test]
#[cfg(feature = "std")]
async fn single_slot_waker_overwrite_blocks_one_task() {
let secp = Secp256k1::new();
let counterparty_node_id =
PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[42; 32]).unwrap());
// build a dummy event
let evt = LiquidityEvent::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
counterparty_node_id,
protocols: Vec::new(),
});
let q = Arc::new(EventQueue::new());
// spawn two consumers
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let q1 = Arc::clone(&q);
tokio::spawn(async move {
let e = q1.next_event_async().await;
let _ = tx1.send(e);
});
let q2 = Arc::clone(&q);
tokio::spawn(async move {
let e = q2.next_event_async().await;
let _ = tx2.send(e);
});
// give them time to register their wakers
tokio::time::sleep(Duration::from_millis(10)).await;
{
let guard = q.notifier();
guard.enqueue(evt.clone());
} // drop → wake only the last stored waker
// wait on both, but assert exactly one succeeds
let res1 = timeout(Duration::from_millis(100), rx1).await;
let res2 = timeout(Duration::from_millis(100), rx2).await;
// exactly one channel must have received our event
let got1 = res1.ok().and_then(|r| r.ok());
let got2 = res2.ok().and_then(|r| r.ok());
// XOR
assert!(
got1.is_some() ^ got2.is_some(),
"expected exactly one receiver to get the event (got1={:?}, got2={:?})",
got1,
got2
);
// verify the one that did get it is correct
if let Some(got_evt) = got1.or(got2) {
assert_eq!(got_evt, evt);
}
}

In this test, only one of the two listeners receives the event. The other is left hanging indefinitely. Would it make sense to store a Vec of wakers instead of a single waker? I'm not sure if I'm missing something obvious or if this is simply not needed.
Yes, it is the intended behavior to only wake up one task, as additional tasks would be blocking on acquiring the …
As discussed/mentioned in #3436, this is a prefactor to upcoming persistence/LSPS1 service/LSPS5 work.
Here, we introduce an EventQueueNotifierGuard type, reducing the potential for lock contention arising from calling EventQueue::enqueue while holding peer state locks.

We furthermore wake the background processor to trigger message processing after we enqueue new messages.