Skip to content

[sql-28] firewall+sessions: clean up in preparation for SQL backend #1029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 54 additions & 45 deletions firewalldb/kvstores_kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func (db *BoltDB) GetKVStores(rule string, groupID session.ID,
db: db.DB,
wrapTx: func(tx *bbolt.Tx) KVStoreTx {
return &kvStoreTx{
boltTx: tx,
boltTx: tx,
sessions: db.sessionIDIndex,
kvStores: &kvStores{
ruleName: rule,
groupID: groupID,
Expand Down Expand Up @@ -109,18 +110,20 @@ type getBucketFunc func(tx *bbolt.Tx, create bool) (*bbolt.Bucket, error)
type kvStoreTx struct {
boltTx *bbolt.Tx
getBucket getBucketFunc
sessions session.IDToGroupIndex

*kvStores
}

// Global gives the caller access to the global kv store of the rule.
//
// NOTE: this is part of the rules.KVStoreTx interface.
func (tx *kvStoreTx) Global() KVStore {
func (s *kvStoreTx) Global() KVStore {
return &kvStoreTx{
kvStores: tx.kvStores,
boltTx: tx.boltTx,
getBucket: getGlobalRuleBucket(true, tx.ruleName),
kvStores: s.kvStores,
boltTx: s.boltTx,
sessions: s.sessions,
getBucket: getGlobalRuleBucket(true, s.ruleName),
}
}

Expand All @@ -129,17 +132,16 @@ func (tx *kvStoreTx) Global() KVStore {
// how the kv store was initialised.
//
// NOTE: this is part of the KVStoreTx interface.
func (tx *kvStoreTx) Local() KVStore {
fn := getSessionRuleBucket(true, tx.ruleName, tx.groupID)
if tx.featureName != "" {
fn = getSessionFeatureRuleBucket(
true, tx.ruleName, tx.groupID, tx.featureName,
)
func (s *kvStoreTx) Local() KVStore {
fn := s.getSessionRuleBucket(true)
if s.featureName != "" {
fn = s.getSessionFeatureRuleBucket(true)
}

return &kvStoreTx{
kvStores: tx.kvStores,
boltTx: tx.boltTx,
kvStores: s.kvStores,
boltTx: s.boltTx,
sessions: s.sessions,
getBucket: fn,
}
}
Expand All @@ -148,29 +150,29 @@ func (tx *kvStoreTx) Local() KVStore {
// rule.
//
// NOTE: this is part of the KVStoreTx interface.
func (tx *kvStoreTx) GlobalTemp() KVStore {
func (s *kvStoreTx) GlobalTemp() KVStore {
return &kvStoreTx{
kvStores: tx.kvStores,
boltTx: tx.boltTx,
getBucket: getGlobalRuleBucket(false, tx.ruleName),
kvStores: s.kvStores,
boltTx: s.boltTx,
sessions: s.sessions,
getBucket: getGlobalRuleBucket(false, s.ruleName),
}
}

// LocalTemp gives the caller access to the temporary local kv store of the
// rule.
//
// NOTE: this is part of the KVStoreTx interface.
func (tx *kvStoreTx) LocalTemp() KVStore {
fn := getSessionRuleBucket(false, tx.ruleName, tx.groupID)
if tx.featureName != "" {
fn = getSessionFeatureRuleBucket(
false, tx.ruleName, tx.groupID, tx.featureName,
)
func (s *kvStoreTx) LocalTemp() KVStore {
fn := s.getSessionRuleBucket(false)
if s.featureName != "" {
fn = s.getSessionFeatureRuleBucket(false)
}

return &kvStoreTx{
kvStores: tx.kvStores,
boltTx: tx.boltTx,
kvStores: s.kvStores,
boltTx: s.boltTx,
sessions: s.sessions,
getBucket: fn,
}
}
Expand All @@ -179,8 +181,8 @@ func (tx *kvStoreTx) LocalTemp() KVStore {
// If no value is found, nil is returned.
//
// NOTE: this is part of the KVStore interface.
func (tx *kvStoreTx) Get(_ context.Context, key string) ([]byte, error) {
bucket, err := tx.getBucket(tx.boltTx, false)
func (s *kvStoreTx) Get(_ context.Context, key string) ([]byte, error) {
bucket, err := s.getBucket(s.boltTx, false)
if err != nil {
return nil, err
}
Expand All @@ -194,8 +196,8 @@ func (tx *kvStoreTx) Get(_ context.Context, key string) ([]byte, error) {
// Set sets the given key-value pair in the underlying kv store.
//
// NOTE: this is part of the KVStore interface.
func (tx *kvStoreTx) Set(_ context.Context, key string, value []byte) error {
bucket, err := tx.getBucket(tx.boltTx, true)
func (s *kvStoreTx) Set(_ context.Context, key string, value []byte) error {
bucket, err := s.getBucket(s.boltTx, true)
if err != nil {
return err
}
Expand All @@ -206,8 +208,8 @@ func (tx *kvStoreTx) Set(_ context.Context, key string, value []byte) error {
// Del deletes the value under the given key in the underlying kv store.
//
// NOTE: this is part of the .KVStore interface.
func (tx *kvStoreTx) Del(_ context.Context, key string) error {
bucket, err := tx.getBucket(tx.boltTx, false)
func (s *kvStoreTx) Del(_ context.Context, key string) error {
bucket, err := s.getBucket(s.boltTx, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -286,11 +288,9 @@ func getGlobalRuleBucket(perm bool, ruleName string) getBucketFunc {
// bucket under which a kv store for a specific rule-name and group ID is
// stored. The `perm` param determines if the temporary or permanent store is
// used.
func getSessionRuleBucket(perm bool, ruleName string,
groupID session.ID) getBucketFunc {

func (s *kvStoreTx) getSessionRuleBucket(perm bool) getBucketFunc {
return func(tx *bbolt.Tx, create bool) (*bbolt.Bucket, error) {
ruleBucket, err := getRuleBucket(perm, ruleName)(tx, create)
ruleBucket, err := getRuleBucket(perm, s.ruleName)(tx, create)
if err != nil {
return nil, err
}
Expand All @@ -300,35 +300,44 @@ func getSessionRuleBucket(perm bool, ruleName string,
}

if create {
// NOTE: for a bbolt backend, the context is in any case
// dropped behind the GetSessionIDs call. So passing in
// a new context here is not a problem.
ctx := context.Background()

// If create is true, we expect this to be an existing
// session. So we check that now and return an error
// accordingly if the session does not exist.
_, err := s.sessions.GetSessionIDs(ctx, s.groupID)
if err != nil {
return nil, err
}

sessBucket, err := ruleBucket.CreateBucketIfNotExists(
sessKVStoreBucketKey,
)
if err != nil {
return nil, err
}

return sessBucket.CreateBucketIfNotExists(groupID[:])
return sessBucket.CreateBucketIfNotExists(s.groupID[:])
}

sessBucket := ruleBucket.Bucket(sessKVStoreBucketKey)
if sessBucket == nil {
return nil, nil
}
return sessBucket.Bucket(groupID[:]), nil
return sessBucket.Bucket(s.groupID[:]), nil
}
}

// getSessionFeatureRuleBucket returns a function that can be used to fetch the
// bucket under which a kv store for a specific rule-name, group ID and
// feature name is stored. The `perm` param determines if the temporary or
// permanent store is used.
func getSessionFeatureRuleBucket(perm bool, ruleName string,
groupID session.ID, featureName string) getBucketFunc {

func (s *kvStoreTx) getSessionFeatureRuleBucket(perm bool) getBucketFunc {
return func(tx *bbolt.Tx, create bool) (*bbolt.Bucket, error) {
sessBucket, err := getSessionRuleBucket(
perm, ruleName, groupID,
)(tx, create)
sessBucket, err := s.getSessionRuleBucket(perm)(tx, create)
if err != nil {
return nil, err
}
Expand All @@ -346,14 +355,14 @@ func getSessionFeatureRuleBucket(perm bool, ruleName string,
}

return featureBucket.CreateBucketIfNotExists(
[]byte(featureName),
[]byte(s.featureName),
)
}

featureBucket := sessBucket.Bucket(featureKVStoreBucketKey)
if featureBucket == nil {
return nil, nil
}
return featureBucket.Bucket([]byte(featureName)), nil
return featureBucket.Bucket([]byte(s.featureName)), nil
}
}
123 changes: 107 additions & 16 deletions firewalldb/kvstores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com./lightninglabs/lightning-terminal/session"
"github.com./lightningnetwork/lnd/clock"
"github.com./stretchr/testify/require"
)

Expand Down Expand Up @@ -83,15 +85,21 @@ func testTempAndPermStores(t *testing.T, featureSpecificStore bool) {
featureName = "auto-fees"
}

store := NewTestDB(t)
sessions := session.NewTestDB(t, clock.NewDefaultClock())
store := NewTestDBWithSessions(t, sessions)
db := NewDB(store)
require.NoError(t, db.Start(ctx))

kvstores := db.GetKVStores(
"test-rule", [4]byte{1, 1, 1, 1}, featureName,
// Create a session that we can reference.
sess, err := sessions.NewSession(
ctx, "test", session.TypeAutopilot, time.Unix(1000, 0),
"something",
)
require.NoError(t, err)

kvstores := db.GetKVStores("test-rule", sess.GroupID, featureName)

err := kvstores.Update(ctx, func(ctx context.Context,
err = kvstores.Update(ctx, func(ctx context.Context,
tx KVStoreTx) error {

// Set an item in the temp store.
Expand Down Expand Up @@ -137,7 +145,7 @@ func testTempAndPermStores(t *testing.T, featureSpecificStore bool) {
require.NoError(t, db.Stop())
})

kvstores = db.GetKVStores("test-rule", [4]byte{1, 1, 1, 1}, featureName)
kvstores = db.GetKVStores("test-rule", sess.GroupID, featureName)

// The temp store should no longer have the stored value but the perm
// store should .
Expand All @@ -164,23 +172,31 @@ func testTempAndPermStores(t *testing.T, featureSpecificStore bool) {
func TestKVStoreNameSpaces(t *testing.T) {
t.Parallel()
ctx := context.Background()
db := NewTestDB(t)

var (
groupID1 = intToSessionID(1)
groupID2 = intToSessionID(2)
sessions := session.NewTestDB(t, clock.NewDefaultClock())
db := NewTestDBWithSessions(t, sessions)

// Create 2 sessions that we can reference.
sess1, err := sessions.NewSession(
ctx, "test", session.TypeAutopilot, time.Unix(1000, 0), "",
)
require.NoError(t, err)

sess2, err := sessions.NewSession(
ctx, "test1", session.TypeAutopilot, time.Unix(1000, 0), "",
)
require.NoError(t, err)

// Two DBs for same group but different features.
rulesDB1 := db.GetKVStores("test-rule", groupID1, "auto-fees")
rulesDB2 := db.GetKVStores("test-rule", groupID1, "re-balance")
rulesDB1 := db.GetKVStores("test-rule", sess1.GroupID, "auto-fees")
rulesDB2 := db.GetKVStores("test-rule", sess1.GroupID, "re-balance")

// The third DB is for the same rule but a different group. It is
// for the same feature as db 2.
rulesDB3 := db.GetKVStores("test-rule", groupID2, "re-balance")
rulesDB3 := db.GetKVStores("test-rule", sess2.GroupID, "re-balance")

// Test that the three ruleDBs share the same global space.
err := rulesDB1.Update(ctx, func(ctx context.Context,
err = rulesDB1.Update(ctx, func(ctx context.Context,
tx KVStoreTx) error {

return tx.Global().Set(
Expand Down Expand Up @@ -311,9 +327,9 @@ func TestKVStoreNameSpaces(t *testing.T) {
// Test that the group space is shared by the first two dbs but not
// the third. To do this, we re-init the DB's but leave the feature
// names out. This way, we will access the group storage.
rulesDB1 = db.GetKVStores("test-rule", groupID1, "")
rulesDB2 = db.GetKVStores("test-rule", groupID1, "")
rulesDB3 = db.GetKVStores("test-rule", groupID2, "")
rulesDB1 = db.GetKVStores("test-rule", sess1.GroupID, "")
rulesDB2 = db.GetKVStores("test-rule", sess1.GroupID, "")
rulesDB3 = db.GetKVStores("test-rule", sess2.GroupID, "")

err = rulesDB1.Update(ctx, func(ctx context.Context,
tx KVStoreTx) error {
Expand Down Expand Up @@ -376,6 +392,81 @@ func TestKVStoreNameSpaces(t *testing.T) {
require.True(t, bytes.Equal(v, []byte("thing 3")))
}

// TestKVStoreSessionCoupling tests if we attempt to write to a kvstore that
// is namespaced by a session that does not exist, then we should get an error.
func TestKVStoreSessionCoupling(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔥

t.Parallel()
ctx := context.Background()

sessions := session.NewTestDB(t, clock.NewDefaultClock())
db := NewTestDBWithSessions(t, sessions)

// Get a kvstore namespaced by a session ID for a session that does
// not exist.
store := db.GetKVStores("AutoFees", [4]byte{1, 1, 1, 1}, "auto-fees")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to already error here in case the session is not found, or does this prevent some functionality?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just cause of how this works under the hood on db types, it makes more sense to have this check at DB access time. THis call doesnt currently make a DB call. Also i think we want Get/Delete to not necessarily error out if the session does not exist yet - those should just return nil, nil


err := store.Update(ctx, func(ctx context.Context,
tx KVStoreTx) error {

// First, show that any call to the global namespace will not
// error since it is not namespaced by a session.
res, err := tx.Global().Get(ctx, "foo")
require.NoError(t, err)
require.Nil(t, res)

err = tx.Global().Set(ctx, "foo", []byte("bar"))
require.NoError(t, err)

res, err = tx.Global().Get(ctx, "foo")
require.NoError(t, err)
require.Equal(t, []byte("bar"), res)

// Now we switch to the local store. We don't expect the Get
// call to error since it should just return a nil value for
// key that has not been set.
_, err = tx.Local().Get(ctx, "foo")
require.NoError(t, err)

// For Set, we expect an error since the session does not exist.
err = tx.Local().Set(ctx, "foo", []byte("bar"))
require.ErrorIs(t, err, session.ErrUnknownGroup)

// We again don't expect the error for delete since we just
// expect it to return nil if the key is not found.
err = tx.Local().Del(ctx, "foo")
require.NoError(t, err)

return nil
})
require.NoError(t, err)

// Now, go and create a sessions in the session DB.
sess, err := sessions.NewSession(
ctx, "test", session.TypeAutopilot, time.Unix(1000, 0),
"something",
)
require.NoError(t, err)

// Get a kvstore namespaced by a session ID for a session that now
// does exist.
store = db.GetKVStores("AutoFees", sess.GroupID, "auto-fees")

// Now, repeat the "Set" call for this session's kvstore to
// show that it no longer errors.
err = store.Update(ctx, func(ctx context.Context, tx KVStoreTx) error {
// For Set, we expect an error since the session does not exist.
err = tx.Local().Set(ctx, "foo", []byte("bar"))
require.NoError(t, err)

res, err := tx.Local().Get(ctx, "foo")
require.NoError(t, err)
require.Equal(t, []byte("bar"), res)

return nil
})
require.NoError(t, err)
}

func intToSessionID(i uint32) session.ID {
var id session.ID
byteOrder.PutUint32(id[:], i)
Expand Down
Loading
Loading