diff --git a/benches/payments.rs b/benches/payments.rs index 52769d794..8ded1399e 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -127,6 +127,7 @@ fn payment_benchmark(c: &mut Criterion) { true, false, common::TestStoreType::Sqlite, + common::TestStoreType::Sqlite, ); let runtime = diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c1b97e0e7..d25bba319 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -32,6 +32,28 @@ interface LogWriter { void log(LogRecord record); }; +[Trait, WithForeign] +interface FfiDynStoreTrait { + [Throws=IOError, Async] + bytes read_async(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError, Async] + void write_async(string primary_namespace, string secondary_namespace, string key, bytes buf); + [Throws=IOError, Async] + void remove_async(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError, Async] + sequence list_async(string primary_namespace, string secondary_namespace); + + [Throws=IOError] + bytes read(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError] + void write(string primary_namespace, string secondary_namespace, string key, bytes buf); + [Throws=IOError] + void remove(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError] + sequence list(string primary_namespace, string secondary_namespace); +}; + + interface Builder { constructor(); [Name=from_config] @@ -58,6 +80,8 @@ interface Builder { void set_tor_config(TorConfig tor_config); [Throws=BuildError] void set_node_alias(string node_alias); + void set_backup_store(FfiDynStoreTrait backup_store); + void set_ephemeral_store(FfiDynStoreTrait ephemeral_store); [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? 
role); void set_wallet_recovery_mode(); @@ -73,6 +97,8 @@ interface Builder { Node build_with_vss_store_and_fixed_headers(NodeEntropy node_entropy, string vss_url, string store_id, record fixed_headers); [Throws=BuildError] Node build_with_vss_store_and_header_provider(NodeEntropy node_entropy, string vss_url, string store_id, VssHeaderProvider header_provider); + [Throws=BuildError] + Node build_with_store(NodeEntropy node_entropy, FfiDynStoreTrait store); }; interface Node { @@ -231,6 +257,8 @@ enum NodeError { "InvalidLnurl", }; +typedef enum IOError; + typedef dictionary NodeStatus; [Remote] diff --git a/bindings/python/src/ldk_node/kv_store.py b/bindings/python/src/ldk_node/kv_store.py new file mode 100644 index 000000000..495b427b1 --- /dev/null +++ b/bindings/python/src/ldk_node/kv_store.py @@ -0,0 +1,110 @@ +import threading + +from abc import ABC, abstractmethod +from typing import List + +from ldk_node import IoError + +class AbstractKvStore(ABC): + @abstractmethod + async def read_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "List[int]": + pass + + @abstractmethod + async def write_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "List[int]") -> None: + pass + + @abstractmethod + async def remove_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + async def list_async(self, primary_namespace: "str",secondary_namespace: "str") -> "List[str]": + pass + + @abstractmethod + def read(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "List[int]": + pass + + @abstractmethod + def write(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "List[int]") -> None: + pass + + @abstractmethod + def remove(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + def list(self, primary_namespace: "str",secondary_namespace: 
"str") -> "List[str]": + pass + +class TestKvStore(AbstractKvStore): + def __init__(self, name: str): + self.name = name + # Storage structure: {(primary_ns, secondary_ns): {key: [bytes]}} + self.storage = {} + self._lock = threading.Lock() + + def dump(self): + print(f"\n[{self.name}] Store contents:") + for (primary_ns, secondary_ns), keys_dict in self.storage.items(): + print(f" Namespace: ({primary_ns!r}, {secondary_ns!r})") + for key, data in keys_dict.items(): + print(f" Key: {key!r} -> {len(data)} bytes") + # Optionally show first few bytes + preview = data[:20] if len(data) > 20 else data + print(f" Data preview: {preview}...") + + def read(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + + if namespace_key not in self.storage: + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + raise IoError.NotFound() + + return list(self.storage[namespace_key][key]) + + def write(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + self.storage[namespace_key] = {} + + self.storage[namespace_key][key] = list(buf) + + def remove(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + raise IoError.NotFound() + + del self.storage[namespace_key][key] + + if not self.storage[namespace_key]: + del self.storage[namespace_key] + + def list(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key in self.storage: + return sorted(self.storage[namespace_key].keys()) 
+ return [] + + async def read_async(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + return self.read(primary_namespace, secondary_namespace, key) + + async def write_async(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + self.write(primary_namespace, secondary_namespace, key, buf) + + async def remove_async(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + self.remove(primary_namespace, secondary_namespace, key, lazy) + + async def list_async(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + return self.list(primary_namespace, secondary_namespace) + \ No newline at end of file diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 4f53dbabf..2d796ddda 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -5,13 +5,30 @@ import os import re import requests +import asyncio +import threading +import ldk_node from ldk_node import * +from kv_store import TestKvStore DEFAULT_ESPLORA_SERVER_URL = "http://127.0.0.1:3002" DEFAULT_TEST_NETWORK = Network.REGTEST DEFAULT_BITCOIN_CLI_BIN = "bitcoin-cli" +class NodeSetup: + def __init__(self, node, node_id, tmp_dir, listening_addresses, stores=None): + self.node = node + self.node_id = node_id + self.tmp_dir = tmp_dir + self.listening_addresses = listening_addresses + self.stores = stores # (primary, backup, ephemeral) or None + + def cleanup(self): + self.node.stop() + time.sleep(1) + self.tmp_dir.cleanup() + def bitcoin_cli(cmd): args = [] @@ -95,7 +112,6 @@ def send_to_address(address, amount_sats): print("SEND TX:", res) return res - def setup_node(tmp_dir, esplora_endpoint, listening_addresses): mnemonic = generate_entropy_mnemonic(None) node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) @@ -107,134 +123,260 @@ def setup_node(tmp_dir, esplora_endpoint, 
listening_addresses): builder.set_listening_addresses(listening_addresses) return builder.build(node_entropy) -def get_esplora_endpoint(): - if os.environ.get('ESPLORA_ENDPOINT'): - return str(os.environ['ESPLORA_ENDPOINT']) - return DEFAULT_ESPLORA_SERVER_URL +def setup_two_nodes(esplora_endpoint, port_1=2323, port_2=2324, use_tier_store=False) -> tuple[NodeSetup, NodeSetup]: + # Setup Node 1 + tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") + print("TMP DIR 1:", tmp_dir_1.name) + + listening_addresses_1 = [f"127.0.0.1:{port_1}"] + if use_tier_store: + node_1, stores_1 = setup_node_with_tier_store(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + else: + node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + stores_1 = None + + node_1.start() + node_id_1 = node_1.node_id() + print("Node ID 1:", node_id_1) + + setup_1 = NodeSetup(node_1, node_id_1, tmp_dir_1, listening_addresses_1, stores_1) + + # Setup Node 2 + tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") + print("TMP DIR 2:", tmp_dir_2.name) + + listening_addresses_2 = [f"127.0.0.1:{port_2}"] + if use_tier_store: + node_2, stores_2 = setup_node_with_tier_store(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + else: + node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + stores_2 = None + + node_2.start() + node_id_2 = node_2.node_id() + print("Node ID 2:", node_id_2) + + setup_2 = NodeSetup(node_2, node_id_2, tmp_dir_2, listening_addresses_2, stores_2) + + return setup_1, setup_2 +def setup_node_with_tier_store(tmp_dir, esplora_endpoint, listening_addresses) -> tuple[Node, tuple[TestKvStore, TestKvStore, TestKvStore]]: + mnemonic = generate_entropy_mnemonic(None) + node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) + config = default_config() -def expect_event(node, expected_event_type): - event = node.wait_next_event() - assert isinstance(event, expected_event_type) - print("EVENT:", event) - node.event_handled() - 
return event + primary = TestKvStore("primary") + backup = TestKvStore("backup") + ephemeral = TestKvStore("ephemeral") + builder = Builder.from_config(config) + builder.set_storage_dir_path(tmp_dir) + builder.set_chain_source_esplora(esplora_endpoint, None) + builder.set_network(DEFAULT_TEST_NETWORK) + builder.set_listening_addresses(listening_addresses) + builder.set_backup_store(backup) + builder.set_ephemeral_store(ephemeral) + + return builder.build_with_store(node_entropy, primary), (primary, backup, ephemeral) +def do_channel_full_cycle(setup_1: NodeSetup, setup_2: NodeSetup, esplora_endpoint): + # Fund both nodes + (node_1, node_2) = (setup_1.node, setup_2.node) + address_1 = node_1.onchain_payment().new_address() + txid_1 = send_to_address(address_1, 100000) + address_2 = node_2.onchain_payment().new_address() + txid_2 = send_to_address(address_2, 100000) -class TestLdkNode(unittest.TestCase): - def setUp(self): - bitcoin_cli("createwallet ldk_node_test") - mine(101) - time.sleep(3) - esplora_endpoint = get_esplora_endpoint() - mine_and_wait(esplora_endpoint, 1) + wait_for_tx(esplora_endpoint, txid_1) + wait_for_tx(esplora_endpoint, txid_2) - def test_channel_full_cycle(self): - esplora_endpoint = get_esplora_endpoint() + mine_and_wait(esplora_endpoint, 6) - ## Setup Node 1 - tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") - print("TMP DIR 1:", tmp_dir_1.name) + node_1.sync_wallets() + node_2.sync_wallets() - listening_addresses_1 = ["127.0.0.1:2323"] - node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) - node_1.start() - node_id_1 = node_1.node_id() - print("Node ID 1:", node_id_1) + spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats + spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats + total_balance_1 = node_1.list_balances().total_onchain_balance_sats + total_balance_2 = node_2.list_balances().total_onchain_balance_sats - # Setup Node 2 - tmp_dir_2 = 
tempfile.TemporaryDirectory("_ldk_node_2") - print("TMP DIR 2:", tmp_dir_2.name) + print("SPENDABLE 1:", spendable_balance_1) + assert spendable_balance_1 == 100000 - listening_addresses_2 = ["127.0.0.1:2324"] - node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) - node_2.start() - node_id_2 = node_2.node_id() - print("Node ID 2:", node_id_2) + print("SPENDABLE 2:", spendable_balance_2) + assert spendable_balance_2 == 100000 - address_1 = node_1.onchain_payment().new_address() - txid_1 = send_to_address(address_1, 100000) - address_2 = node_2.onchain_payment().new_address() - txid_2 = send_to_address(address_2, 100000) + print("TOTAL 1:", total_balance_1) + assert total_balance_1 == 100000 - wait_for_tx(esplora_endpoint, txid_1) - wait_for_tx(esplora_endpoint, txid_2) + print("TOTAL 2:", total_balance_2) + assert total_balance_2 == 100000 - mine_and_wait(esplora_endpoint, 6) + (node_id_2, listening_addresses_2) = (setup_2.node_id, setup_2.listening_addresses) + node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) - node_1.sync_wallets() - node_2.sync_wallets() + channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) + channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) + funding_txid = channel_pending_event_1.funding_txo.txid + wait_for_tx(esplora_endpoint, funding_txid) + mine_and_wait(esplora_endpoint, 6) - spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats - spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats - total_balance_1 = node_1.list_balances().total_onchain_balance_sats - total_balance_2 = node_2.list_balances().total_onchain_balance_sats + node_1.sync_wallets() + node_2.sync_wallets() - print("SPENDABLE 1:", spendable_balance_1) - self.assertEqual(spendable_balance_1, 100000) + channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) + print("funding_txo:", funding_txid) - print("SPENDABLE 2:", spendable_balance_2) - 
self.assertEqual(spendable_balance_2, 100000) + channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) - print("TOTAL 1:", total_balance_1) - self.assertEqual(total_balance_1, 100000) + description = Bolt11InvoiceDescription.DIRECT("asdf") + invoice = node_2.bolt11_payment().receive(2500000, description, 9217) + node_1.bolt11_payment().send(invoice, None) - print("TOTAL 2:", total_balance_2) - self.assertEqual(total_balance_2, 100000) + expect_event(node_1, Event.PAYMENT_SUCCESSFUL) - node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) + expect_event(node_2, Event.PAYMENT_RECEIVED) + node_id_1 = setup_1.node_id + time.sleep(1) + node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) - channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) - funding_txid = channel_pending_event_1.funding_txo.txid - wait_for_tx(esplora_endpoint, funding_txid) - mine_and_wait(esplora_endpoint, 6) + # expect channel closed event on both nodes + expect_event(node_1, Event.CHANNEL_CLOSED) - node_1.sync_wallets() - node_2.sync_wallets() + expect_event(node_2, Event.CHANNEL_CLOSED) - channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) - print("funding_txo:", funding_txid) + mine_and_wait(esplora_endpoint, 1) - channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) + node_1.sync_wallets() + node_2.sync_wallets() - description = Bolt11InvoiceDescription.DIRECT("asdf") - invoice = node_2.bolt11_payment().receive(2500000, description, 9217) - node_1.bolt11_payment().send(invoice, None) - - expect_event(node_1, Event.PAYMENT_SUCCESSFUL) + spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_1 > 95000 + assert spendable_balance_after_close_1 < 100000 + spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats + assert 
spendable_balance_after_close_2 == 102500 - expect_event(node_2, Event.PAYMENT_RECEIVED) - +def get_esplora_endpoint(): + if os.environ.get('ESPLORA_ENDPOINT'): + return str(os.environ['ESPLORA_ENDPOINT']) + return DEFAULT_ESPLORA_SERVER_URL + + +def expect_event(node, expected_event_type): + event = node.wait_next_event() + assert isinstance(event, expected_event_type) + print("EVENT:", event) + node.event_handled() + return event + +def wait_until(predicate, timeout=10.0, interval=0.1, message="condition not met before timeout"): + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return + time.sleep(interval) + raise AssertionError(message) - node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - # expect channel closed event on both nodes - expect_event(node_1, Event.CHANNEL_CLOSED) +def stores_match(store_a, store_b): + if set(store_a.storage.keys()) != set(store_b.storage.keys()): + return False - expect_event(node_2, Event.CHANNEL_CLOSED) + for namespace_key in store_a.storage: + keys_a = store_a.storage[namespace_key] + keys_b = store_b.storage[namespace_key] + if set(keys_a.keys()) != set(keys_b.keys()): + return False + + for key in keys_a: + if keys_a[key] != keys_b[key]: + return False + + return True + +class TestLdkNode(unittest.TestCase): + def setUp(self): + bitcoin_cli("createwallet ldk_node_test") + mine(101) + time.sleep(3) + esplora_endpoint = get_esplora_endpoint() mine_and_wait(esplora_endpoint, 1) - node_1.sync_wallets() - node_2.sync_wallets() + def test_channel_full_cycle(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint) + + try: + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + finally: + setup_1.cleanup() + setup_2.cleanup() + - spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats - assert spendable_balance_after_close_1 > 95000 - assert spendable_balance_after_close_1 < 100000 - 
spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats - self.assertEqual(spendable_balance_after_close_2, 102500) + def test_tier_store(self): + # Set an event loop for async Python callbacks invoked from Rust. + # (https://mozilla.github.io/uniffi-rs/0.27/futures.html#python-uniffi_set_event_loop) + loop = asyncio.new_event_loop() - # Stop nodes - node_1.stop() - node_2.stop() + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() - # Cleanup - time.sleep(1) # Wait a sec so our logs can finish writing - tmp_dir_1.cleanup() - tmp_dir_2.cleanup() + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + ldk_node.uniffi_set_event_loop(loop) + + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes( + esplora_endpoint, + port_1=2325, + port_2=2326, + use_tier_store=True, + ) + + try: + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + primary, backup, ephemeral = setup_1.stores + + wait_until( + lambda: len(primary.storage) > 0, + message="primary store did not receive any data", + ) + wait_until( + lambda: len(backup.storage) > 0, + message="backup store did not receive any data", + ) + wait_until( + lambda: stores_match(primary, backup), + timeout=15.0, + message="backup store did not converge to primary contents", + ) + + self.assertTrue(stores_match(primary, backup), "backup should mirror primary contents") + + self.assertGreater(len(ephemeral.storage), 0, "ephemeral should have data") + + ephemeral_keys = [ + key + for namespace in ephemeral.storage.values() + for key in namespace.keys() + ] + has_scorer_or_graph = any( + key in ["scorer", "network_graph"] for key in ephemeral_keys + ) + self.assertTrue( + has_scorer_or_graph, + "ephemeral should contain scorer or network_graph data", + ) + finally: + setup_1.cleanup() + setup_2.cleanup() + loop.call_soon_threadsafe(loop.stop) + loop_thread.join(timeout=5) + loop.close() if __name__ == '__main__': 
unittest.main() diff --git a/src/builder.rs b/src/builder.rs index cd8cc184f..cc7a92747 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -52,8 +52,11 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; +#[cfg(feature = "uniffi")] +use crate::ffi::{FfiDynStore, FfiDynStoreTrait}; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, @@ -151,6 +154,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .finish() + } +} + /// An error encountered during building a [`Node`]. 
/// /// [`Node`]: crate::Node @@ -278,6 +296,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -296,6 +315,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -305,6 +325,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -614,6 +635,36 @@ impl NodeBuilder { self } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives a second durable + /// copy of data written to the primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + #[allow(dead_code)] // Used by subsequent FFI/test integration commits. + pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + #[allow(dead_code)] // Used by subsequent FFI/test integration commits. 
+ pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -762,8 +813,23 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, + ) -> Result { + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + self.build_with_dynstore(node_entropy, primary_store) + } + + fn build_with_dynstore( + &self, node_entropy: NodeEntropy, primary_store: Arc, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -776,6 +842,13 @@ impl NodeBuilder { })?) 
}; + let ts_config = self.tier_store_config.as_ref(); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -790,7 +863,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } @@ -1081,6 +1154,32 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_wallet_recovery_mode(); } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives a second durable + /// copy of data written to the primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + pub fn set_backup_store(&self, backup_store: Arc) { + let store: Arc = Arc::new(FfiDynStore::from_store(backup_store)); + self.inner.write().unwrap().set_backup_store(store); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + let store: Arc = Arc::new(FfiDynStore::from_store(ephemeral_store)); + self.inner.write().unwrap().set_ephemeral_store(store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. 
pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1209,12 +1308,19 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - // Note that the generics here don't actually work for Uniffi, but we don't currently expose - // this so its not needed. - pub fn build_with_store( - &self, node_entropy: Arc, kv_store: S, + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store + pub fn build_with_store( + &self, node_entropy: Arc, kv_store: Arc, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) + let store: Arc = Arc::new(FfiDynStore::from_store(kv_store)); + self.inner.read().unwrap().build_with_dynstore(*node_entropy, store).map(Arc::new) } } diff --git a/src/ffi/io.rs b/src/ffi/io.rs new file mode 100644 index 000000000..eddc4e26b --- /dev/null +++ b/src/ffi/io.rs @@ -0,0 +1,626 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. 
You may not use this file except in + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; + +use crate::io::utils::check_namespace_key_validity; +use crate::types::{DynStoreTrait, DynStoreWrapper, SyncAndAsyncKVStore}; + +#[derive(Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Error))] +pub enum IOError { + NotFound, + PermissionDenied, + ConnectionRefused, + ConnectionReset, + ConnectionAborted, + NotConnected, + AddrInUse, + AddrNotAvailable, + BrokenPipe, + AlreadyExists, + WouldBlock, + InvalidInput, + InvalidData, + TimedOut, + WriteZero, + Interrupted, + UnexpectedEof, + Other, +} + +impl From for IOError { + fn from(error: bitcoin::io::Error) -> Self { + match error.kind() { + bitcoin::io::ErrorKind::NotFound => IOError::NotFound, + bitcoin::io::ErrorKind::PermissionDenied => IOError::PermissionDenied, + bitcoin::io::ErrorKind::ConnectionRefused => IOError::ConnectionRefused, + bitcoin::io::ErrorKind::ConnectionReset => IOError::ConnectionReset, + bitcoin::io::ErrorKind::ConnectionAborted => IOError::ConnectionAborted, + bitcoin::io::ErrorKind::NotConnected => IOError::NotConnected, + bitcoin::io::ErrorKind::AddrInUse => IOError::AddrInUse, + bitcoin::io::ErrorKind::AddrNotAvailable => IOError::AddrNotAvailable, + bitcoin::io::ErrorKind::BrokenPipe => IOError::BrokenPipe, + bitcoin::io::ErrorKind::AlreadyExists => IOError::AlreadyExists, + bitcoin::io::ErrorKind::WouldBlock => IOError::WouldBlock, + bitcoin::io::ErrorKind::InvalidInput => IOError::InvalidInput, + bitcoin::io::ErrorKind::InvalidData => IOError::InvalidData, + bitcoin::io::ErrorKind::TimedOut => IOError::TimedOut, + bitcoin::io::ErrorKind::WriteZero => IOError::WriteZero, + bitcoin::io::ErrorKind::Interrupted => IOError::Interrupted, + bitcoin::io::ErrorKind::UnexpectedEof => IOError::UnexpectedEof, + bitcoin::io::ErrorKind::Other => IOError::Other, + } + } +} + +impl From 
for bitcoin::io::Error { + fn from(error: IOError) -> Self { + match error { + IOError::NotFound => bitcoin::io::ErrorKind::NotFound.into(), + IOError::PermissionDenied => bitcoin::io::ErrorKind::PermissionDenied.into(), + IOError::ConnectionRefused => bitcoin::io::ErrorKind::ConnectionRefused.into(), + IOError::ConnectionReset => bitcoin::io::ErrorKind::ConnectionReset.into(), + IOError::ConnectionAborted => bitcoin::io::ErrorKind::ConnectionAborted.into(), + IOError::NotConnected => bitcoin::io::ErrorKind::NotConnected.into(), + IOError::AddrInUse => bitcoin::io::ErrorKind::AddrInUse.into(), + IOError::AddrNotAvailable => bitcoin::io::ErrorKind::AddrNotAvailable.into(), + IOError::BrokenPipe => bitcoin::io::ErrorKind::BrokenPipe.into(), + IOError::AlreadyExists => bitcoin::io::ErrorKind::AlreadyExists.into(), + IOError::WouldBlock => bitcoin::io::ErrorKind::WouldBlock.into(), + IOError::InvalidInput => bitcoin::io::ErrorKind::InvalidInput.into(), + IOError::InvalidData => bitcoin::io::ErrorKind::InvalidData.into(), + IOError::TimedOut => bitcoin::io::ErrorKind::TimedOut.into(), + IOError::WriteZero => bitcoin::io::ErrorKind::WriteZero.into(), + IOError::Interrupted => bitcoin::io::ErrorKind::Interrupted.into(), + IOError::UnexpectedEof => bitcoin::io::ErrorKind::UnexpectedEof.into(), + IOError::Other => bitcoin::io::ErrorKind::Other.into(), + } + } +} + +impl std::fmt::Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::NotFound => write!(f, "NotFound"), + IOError::PermissionDenied => write!(f, "PermissionDenied"), + IOError::ConnectionRefused => write!(f, "ConnectionRefused"), + IOError::ConnectionReset => write!(f, "ConnectionReset"), + IOError::ConnectionAborted => write!(f, "ConnectionAborted"), + IOError::NotConnected => write!(f, "NotConnected"), + IOError::AddrInUse => write!(f, "AddrInUse"), + IOError::AddrNotAvailable => write!(f, "AddrNotAvailable"), + IOError::BrokenPipe => write!(f, 
"BrokenPipe"), + IOError::AlreadyExists => write!(f, "AlreadyExists"), + IOError::WouldBlock => write!(f, "WouldBlock"), + IOError::InvalidInput => write!(f, "InvalidInput"), + IOError::InvalidData => write!(f, "InvalidData"), + IOError::TimedOut => write!(f, "TimedOut"), + IOError::WriteZero => write!(f, "WriteZero"), + IOError::Interrupted => write!(f, "Interrupted"), + IOError::UnexpectedEof => write!(f, "UnexpectedEof"), + IOError::Other => write!(f, "Other"), + } + } +} + +/// FFI-safe version of [`DynStoreTrait`]. +/// +/// Foreign implementations must provide both synchronous and asynchronous store +/// methods so they can be used across the Rust and language-binding surfaces. +/// +/// Internally, [`FfiDynStore`] treats the asynchronous write/remove path as the +/// canonical implementation for ordered mutations. Its synchronous methods are +/// bridged through that async path to preserve per-key write ordering guarantees +/// across both sync and async callers. +/// +/// As a result, Rust-side synchronous calls routed through [`FfiDynStore`] may +/// invoke the async methods of a foreign implementation rather than its sync +/// methods directly. +/// +/// Note: Foreign implementations are free to make either the sync or async methods +/// their primary implementation and have the other delegate to it. [`FfiDynStore`] +/// does not assume which side is primary, but its Rust-side sync bridge executes +/// through the async interface. 
+/// +/// [`DynStoreTrait`]: crate::types::DynStoreTrait +#[async_trait::async_trait] +pub trait FfiDynStoreTrait: Send + Sync { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; +} + +/// Bridges a foreign [`FfiDynStoreTrait`] implementation into Rust's +/// [`DynStoreTrait`] while enforcing per-key write ordering. +/// +/// Ordered writes/removals are coordinated within [`FfiDynStore`] so the +/// underlying foreign store does not need to provide its own cross-call ordering +/// guarantees. +/// +/// The async mutation path is canonical. Sync operations are executed by running +/// the corresponding async operation on an internal runtime, which preserves the +/// same ordering guarantees for both sync and async callers. 
+#[derive(Clone, uniffi::Object)] +pub(crate) struct FfiDynStore { + inner: Arc, + next_write_version: Arc, +} + +#[uniffi::export] +impl FfiDynStore { + #[uniffi::constructor] + pub fn from_store(store: Arc) -> Self { + let inner = Arc::new(FfiDynStoreInner::new(store)); + + Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) } + } +} + +impl FfiDynStore { + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref( + &self, locking_key: String, + ) -> (Arc>, u64) { + let version = self.next_write_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FfiDynStore version counter overflowed"); + } + + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) + } + + /// Runs a synchronous Rust-side store call via FfiDynStore's canonical async path. + /// + /// This keeps sync callers aligned with the same ordered mutation logic used by + /// async callers and avoids blocking directly on async coordination primitives. + /// + /// If already inside a Tokio runtime, the async operation is run from a + /// dedicated OS thread using a shared process-wide runtime. + fn run_sync_via_async(&self, fut: F) -> Result + where + T: Send + 'static, + F: Future> + Send + 'static, + { + let runtime = ffi_dynstore_runtime(); + + // If we are already inside a Tokio runtime, avoid blocking the current Tokio runtime + // thread directly and run the async operation on a dedicated OS thread. 
+ if tokio::runtime::Handle::try_current().is_ok() { + std::thread::spawn(move || runtime.block_on(fut)).join().unwrap_or_else(|_| { + Err(bitcoin::io::Error::new( + bitcoin::io::ErrorKind::Other, + "FfiDynStore sync bridge thread panicked", + )) + }) + } else { + runtime.block_on(fut) + } + } +} + +impl DynStoreTrait for FfiDynStore { + fn read_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + store + .read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + }) + } + + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + Box::pin(async move { + store + .write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn remove_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = 
self.get_new_version_and_lock_ref(locking_key.clone()); + + Box::pin(async move { + store + .remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn list_async( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + Box::pin(async move { + store + .list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + }) + } + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.run_sync_via_async(async move { + store + .read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.run_sync_via_async(async move { + store + .write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + 
) -> Result<(), bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.run_sync_via_async(async move { + store + .remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + self.run_sync_via_async(async move { + store + .list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + }) + } +} + +struct FfiDynStoreInner { + ffi_store: Arc, + write_version_locks: Mutex>>>, +} + +impl FfiDynStoreInner { + fn new(ffi_store: Arc) -> Self { + Self { ffi_store, write_version_locks: Mutex::new(HashMap::new()) } + } + + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.write_version_locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + async fn read_internal_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store + .read_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + } + + async fn write_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, 
buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + async fn remove_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + async fn list_internal_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store + .list_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + callback: FN, + ) -> Result<(), bitcoin::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.lock().await; + + // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual + // consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. 
+ if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there are no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.write_version_locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FfiDynStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } + } +} + +// We do this to allow native Rust stores to satisfy the UniFFI-facing store trait +// in tests and internal bridge code. This keeps `DynStoreWrapper` as the single +// adapter for Rust `KVStore`/`KVStoreSync` implementations while letting the UniFFI +// builder paths accept `Arc` without requiring a public +// `FfiDynStore` wrapper at call sites. 
+#[async_trait::async_trait] +impl FfiDynStoreTrait for DynStoreWrapper { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read_async(self, &primary_namespace, &secondary_namespace, &key) + .await + .map_err(IOError::from) + } + + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write_async(self, &primary_namespace, &secondary_namespace, &key, buf) + .await + .map_err(IOError::from) + } + + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove_async(self, &primary_namespace, &secondary_namespace, &key, lazy) + .await + .map_err(IOError::from) + } + + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list_async(self, &primary_namespace, &secondary_namespace) + .await + .map_err(IOError::from) + } + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read(self, &primary_namespace, &secondary_namespace, &key) + .map_err(IOError::from) + } + + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write(self, &primary_namespace, &secondary_namespace, &key, buf) + .map_err(IOError::from) + } + + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove(self, &primary_namespace, &secondary_namespace, &key, lazy) + .map_err(IOError::from) + } + + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list(self, &primary_namespace, 
&secondary_namespace).map_err(IOError::from) + } +} + +/// Returns the process-wide runtime used to bridge Rust-side synchronous +/// `FfiDynStore` calls through the canonical async store path. +fn ffi_dynstore_runtime() -> &'static tokio::runtime::Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .max_blocking_threads(2) + .build() + .expect("Failed to build FfiDynStore runtime") + }) +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 32464d044..627667175 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -4,6 +4,10 @@ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license , at your option. You may not use this file except in +#[cfg(feature = "uniffi")] +mod io; +#[cfg(feature = "uniffi")] +pub use io::*; #[cfg(feature = "uniffi")] mod types; diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 6fe95a2b3..2bd207307 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -51,6 +51,14 @@ use vss_client::headers::{ VssHeaderProviderError as VssClientHeaderProviderError, }; +use crate::builder::sanitize_alias; +pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; +pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; +use crate::error::Error; +pub use crate::liquidity::LSPS1OrderStatus; +pub use crate::logger::{LogLevel, LogRecord, LogWriter}; +use crate::{hex_utils, SocketAddress, UserChannelId}; + /// Errors around providing headers for each VSS request. 
#[derive(Debug, uniffi::Error)] pub enum VssHeaderProviderError { @@ -142,14 +150,6 @@ impl VssClientHeaderProvider for VssHeaderProviderAdapter { } } -use crate::builder::sanitize_alias; -pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; -pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; -use crate::error::Error; -pub use crate::liquidity::LSPS1OrderStatus; -pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; - uniffi::custom_type!(PublicKey, String, { remote, try_lift: |val| { diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..bf6366c45 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..70e9eb883 --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,907 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporary warning silencer. Will be removed in later commit. 
+ +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; + +use std::future::Future; +use std::sync::Arc; + +/// A 3-tiered [`KVStoreSync`] implementation that routes data across storage +/// backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, + logger: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner, logger } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. 
+ /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: 
&str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner + .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. 
+ backup_store: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { primary_store, ephemeral_store: None, backup_store: None, logger } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + fn read_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStoreSync::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. 
+ async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + fn list_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list keys in namespace {}/{} from primary store: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn write_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let backup_res = KVStoreSync::write( + backup_store.as_ref(), + primary_namespace, + 
secondary_namespace, + key, + buf, + ); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn remove_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let backup_res = KVStoreSync::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + 
Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn read_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) + } else { + self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + } + + fn write_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(ephemeral_store) = + 
self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::write( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } else { + self.write_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + } + + fn remove_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::remove( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } else { + self.remove_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), 
secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn list_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list( + ephemeral_store.as_ref(), + &primary_namespace, + &secondary_namespace, + ) + } else { + self.list_primary_sync(&primary_namespace, &secondary_namespace) + } + }, + _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), + } + } + + fn ephemeral_store( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Option<&Arc> { + self.ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => 
Err(primary_err), + (Ok(()), Err(backup_err)) => Err(backup_err), + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger); + + 
do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + 
assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[test] + fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + 
+ let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .unwrap(); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} diff --git a/src/lib.rs b/src/lib.rs index dbaffae1c..761471b82 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,6 +141,8 @@ use event::{EventHandler, EventQueue}; use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; #[cfg(feature = "uniffi")] use ffi::*; +#[cfg(feature = "uniffi")] +pub use ffi::{FfiDynStoreTrait, IOError}; use gossip::GossipSource; use graph::NetworkGraph; use io::utils::write_node_metrics; @@ -173,11 +175,13 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, HRNResolver, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, DynStoreWrapper, 
PeerDetails, SyncAndAsyncKVStore, + UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; use crate::scoring::setup_background_pathfinding_scores_sync; diff --git a/src/types.rs b/src/types.rs index a36639808..768272379 100644 --- a/src/types.rs +++ b/src/types.rs @@ -51,29 +51,37 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +/// Object-safe store trait supporting both async and sync operations. +pub trait DynStoreTrait: Send + Sync { + /// Asynchronously reads the value for the given namespace and key. fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + /// Asynchronously writes the value for the given namespace and key. fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Pin> + Send + 'static>>; + /// Asynchronously removes the value for the given namespace and key. fn remove_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Pin> + Send + 'static>>; + /// Asynchronously lists keys under the given namespace. fn list_async( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; - + /// Synchronously reads the value for the given namespace and key. fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result, bitcoin::io::Error>; + /// Synchronously writes the value for the given namespace and key. fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Result<(), bitcoin::io::Error>; + /// Synchronously removes the value for the given namespace and key. fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Result<(), bitcoin::io::Error>; + /// Synchronously lists keys under the given namespace. 
fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, bitcoin::io::Error>; @@ -131,12 +139,13 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +/// Type alias for any store that implements DynStoreTrait. +pub type DynStore = dyn DynStoreTrait; -// Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` -// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by -// returning `Pin>` instead, and this wrapper bridges the two by delegating -// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods. +/// Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` +/// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by +/// returning `Pin>` instead, and this wrapper bridges the two by delegating +/// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods. #[derive(Clone)] pub(crate) struct DynStoreRef(pub(crate) Arc); @@ -166,7 +175,8 @@ impl KVStore for DynStoreRef { } } -pub(crate) struct DynStoreWrapper(pub(crate) T); +/// Wrapper that adapts a concrete `SyncAndAsyncKVStore` to `DynStoreTrait`. 
+pub struct DynStoreWrapper(pub T); impl DynStoreTrait for DynStoreWrapper { fn read_async( diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a44aee174..38115a99b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -326,10 +326,23 @@ pub(crate) enum TestChainSource<'a> { BitcoindRestSync(&'a BitcoinD), } -#[derive(Clone, Copy)] +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStoreTrait; + +#[cfg(feature = "uniffi")] +type TestDynStore = Arc; +#[cfg(not(feature = "uniffi"))] +type TestDynStore = TestSyncStore; + +#[derive(Clone)] pub(crate) enum TestStoreType { TestSyncStore, Sqlite, + TierStore { + primary: TestDynStore, + backup: Option, + ephemeral: Option, + }, } impl Default for TestStoreType { @@ -380,6 +393,27 @@ macro_rules! setup_builder { pub(crate) use setup_builder; +pub(crate) fn create_tier_stores(base_path: PathBuf) -> (TestDynStore, TestDynStore, TestDynStore) { + let primary = TestSyncStore::new(base_path.join("primary")); + let backup = TestSyncStore::new(base_path.join("backup")); + let ephemeral = TestSyncStore::new(base_path.join("ephemeral")); + + #[cfg(feature = "uniffi")] + { + use ldk_node::DynStoreWrapper; + + ( + Arc::new(DynStoreWrapper(primary)) as Arc, + Arc::new(DynStoreWrapper(backup)) as Arc, + Arc::new(DynStoreWrapper(ephemeral)) as Arc, + ) + } + #[cfg(not(feature = "uniffi"))] + { + (primary, backup, ephemeral) + } +} + pub(crate) fn setup_two_nodes( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, @@ -390,21 +424,22 @@ pub(crate) fn setup_two_nodes( anchor_channels, anchors_trusted_no_reserve, TestStoreType::TestSyncStore, + TestStoreType::TestSyncStore, ) } pub(crate) fn setup_two_nodes_with_store( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, - anchors_trusted_no_reserve: bool, store_type: TestStoreType, + anchors_trusted_no_reserve: bool, store_type_a: TestStoreType, store_type_b: TestStoreType, ) -> (TestNode, 
TestNode) { println!("== Node A =="); let mut config_a = random_config(anchor_channels); - config_a.store_type = store_type; + config_a.store_type = store_type_a; let node_a = setup_node(chain_source, config_a); println!("\n== Node B =="); let mut config_b = random_config(anchor_channels); - config_b.store_type = store_type; + config_b.store_type = store_type_b; if allow_0conf { config_b.node_config.trusted_peers_0conf.push(node_a.node_id()); } @@ -484,9 +519,53 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); - builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + #[cfg(feature = "uniffi")] + { + use ldk_node::DynStoreWrapper; + let kv_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + + builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + } + #[cfg(not(feature = "uniffi"))] + { + builder.build_with_store(config.node_entropy, kv_store).unwrap() + } }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), + TestStoreType::TierStore { primary, backup, ephemeral } => { + if let Some(backup) = backup { + #[cfg(feature = "uniffi")] + { + builder.set_backup_store(backup); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + let store: Arc = Arc::new(DynStoreWrapper(backup)); + builder.set_backup_store(store); + } + } + if let Some(ephemeral) = ephemeral { + #[cfg(feature = "uniffi")] + { + builder.set_ephemeral_store(ephemeral); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::{DynStore, DynStoreWrapper}; + let store: Arc = Arc::new(DynStoreWrapper(ephemeral)); + builder.set_ephemeral_store(store); + } + } + #[cfg(feature = "uniffi")] + { + builder.build_with_store(config.node_entropy.into(), primary).unwrap() + } + #[cfg(not(feature = "uniffi"))] + { + 
builder.build_with_store(config.node_entropy, primary).unwrap() + } + }, }; if config.recovery_mode { @@ -1709,3 +1788,36 @@ impl TestSyncStoreInner { self.do_list(primary_namespace, secondary_namespace) } } + +pub fn test_kv_read( + store: &TestDynStore, primary_ns: &str, secondary_ns: &str, key: &str, +) -> Result, bitcoin::io::Error> { + #[cfg(feature = "uniffi")] + { + ldk_node::FfiDynStoreTrait::read( + &**store, + primary_ns.to_string(), + secondary_ns.to_string(), + key.to_string(), + ) + .map_err(Into::into) + } + #[cfg(not(feature = "uniffi"))] + { + KVStoreSync::read(store, primary_ns, secondary_ns, key) + } +} + +pub fn test_kv_list( + store: &TestDynStore, primary_ns: &str, secondary_ns: &str, +) -> Result, bitcoin::io::Error> { + #[cfg(feature = "uniffi")] + { + ldk_node::FfiDynStoreTrait::list(&**store, primary_ns.to_string(), secondary_ns.to_string()) + .map_err(Into::into) + } + #[cfg(not(feature = "uniffi"))] + { + KVStoreSync::list(store, primary_ns, secondary_ns) + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 9d1f99bed..28fbc86bf 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -17,14 +17,15 @@ use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ - bump_fee_and_broadcast, distribute_funds_unconfirmed, do_channel_full_cycle, - expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, - expect_event, expect_payment_claimable_event, expect_payment_received_event, - expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all, - premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, 
splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + bump_fee_and_broadcast, create_tier_stores, distribute_funds_unconfirmed, + do_channel_full_cycle, expect_channel_pending_event, expect_channel_ready_event, + expect_channel_ready_events, expect_event, expect_payment_claimable_event, + expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, + generate_blocks_and_wait, generate_listening_addresses, open_channel, open_channel_push_amt, + open_channel_with_all, premine_and_distribute_funds, premine_blocks, prepare_rbf, + random_chain_source, random_config, random_storage_path, setup_bitcoind_and_electrsd, + setup_builder, setup_node, setup_two_nodes, setup_two_nodes_with_store, splice_in_with_all, + test_kv_list, test_kv_read, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, }; use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; @@ -39,6 +40,11 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -52,6 +58,106 @@ async fn channel_full_cycle() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_tier_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (primary_a, backup_a, ephemeral_a) = create_tier_stores(random_storage_path()); + let (primary_b, backup_b, 
ephemeral_b) = create_tier_stores(random_storage_path()); + + let (node_a, node_b) = setup_two_nodes_with_store( + &chain_source, + false, + true, + false, + TestStoreType::TierStore { + primary: primary_a.clone(), + backup: Some(backup_a.clone()), + ephemeral: Some(ephemeral_a.clone()), + }, + TestStoreType::TierStore { + primary: primary_b, + backup: Some(backup_b), + ephemeral: Some(ephemeral_b), + }, + ); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; + + // Verify primary and backup both contain the same channel manager data. + let primary_channel_manager = test_kv_read( + &(primary_a.clone()), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .expect("Primary should have channel manager data"); + let backup_channel_manager = test_kv_read( + &(backup_a.clone()), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .expect("Backup should have channel manager data"); + assert_eq!( + primary_channel_manager, backup_channel_manager, + "Primary and backup should store identical channel manager data" + ); + + // Verify payment data is persisted in both primary and backup stores. + let primary_payments = + test_kv_list(&(primary_a.clone()), "payments", "").expect("Primary should list payments"); + assert!( + !primary_payments.is_empty(), + "Primary should have payment entries after the full cycle" + ); + + let backup_payments = + test_kv_list(&(backup_a.clone()), "payments", "").expect("Backup should list payments"); + assert!(!backup_payments.is_empty(), "Backup should have payment entries after the full cycle"); + + // Verify ephemeral store does not contain primary-backed critical data. 
+ let ephemeral_channel_manager = test_kv_read( + &(ephemeral_a.clone()), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(ephemeral_channel_manager.is_err(), "Ephemeral should not have channel manager data"); + + let ephemeral_payments = test_kv_list(&(ephemeral_a.clone()), "payments", ""); + assert!( + ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), + "Ephemeral should not have payment data" + ); + + // Verify ephemeral-routed data is stored in the ephemeral store. + let ephemeral_network_graph = test_kv_read( + &(ephemeral_a.clone()), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(ephemeral_network_graph.is_ok(), "Ephemeral should have network graph data"); + + // Verify ephemeral-routed data is not mirrored to primary or backup stores. + let primary_network_graph = test_kv_read( + &(primary_a.clone()), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(primary_network_graph.is_err(), "Primary should not have ephemeral network graph data"); + + let backup_network_graph = test_kv_read( + &(backup_a.clone()), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(backup_network_graph.is_err(), "Backup should not have ephemeral network graph data"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -237,8 +343,20 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = - builder.build_with_store(config.node_entropy.into(), 
test_sync_store.clone()).unwrap(); + let node; + #[cfg(feature = "uniffi")] + { + use ldk_node::{DynStoreWrapper, FfiDynStoreTrait}; + + let test_sync_store: Arc = + Arc::new(DynStoreWrapper(test_sync_store.clone())); + + node = builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + } + #[cfg(not(feature = "uniffi"))] + { + node = builder.build_with_store(config.node_entropy, test_sync_store.clone()).unwrap(); + } node.start().unwrap(); let expected_node_id = node.node_id(); @@ -276,8 +394,21 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = - builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + let reinitialized_node; + #[cfg(feature = "uniffi")] + { + use ldk_node::{DynStoreWrapper, FfiDynStoreTrait}; + + let test_sync_store: Arc = Arc::new(DynStoreWrapper(test_sync_store)); + + reinitialized_node = + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + } + #[cfg(not(feature = "uniffi"))] + { + reinitialized_node = + builder.build_with_store(config.node_entropy, test_sync_store).unwrap(); + } reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id);