Store latest leaf index per destination (#188)

* Initial pass, confused why referencing a Vec<CommittedMessageWithMeta> isn't turning it into a struct

* It compiles, needs cleaning

* CommittedMessageWithMeta -> RawCommittedMessageWithMeta

* cargo fmt

* update_latest_leaf_index_for_destination

* nits

* cargo fmt

* fix tests

* cargo fmt

* Compiling, rm'd message timestamp

* cargo fmt

* Undo changes to abacus-test/src/test_utils.rs
pull/206/head
Trevor Porter 3 years ago committed by GitHub
parent 582e4bd83c
commit 28ffe4bccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      rust/abacus-base/src/home.rs
  2. 57
      rust/abacus-core/src/db/abacus_db.rs

@ -140,8 +140,8 @@ impl HomeEvents for CachingHome {
nonce: u32, nonce: u32,
) -> Result<Option<RawCommittedMessage>, ChainCommunicationError> { ) -> Result<Option<RawCommittedMessage>, ChainCommunicationError> {
loop { loop {
if let Some(update) = self.db.message_by_nonce(destination, nonce)? { if let Some(message) = self.db.message_by_nonce(destination, nonce)? {
return Ok(Some(update)); return Ok(Some(message));
} }
sleep(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
} }
@ -153,8 +153,8 @@ impl HomeEvents for CachingHome {
leaf: H256, leaf: H256,
) -> Result<Option<RawCommittedMessage>, ChainCommunicationError> { ) -> Result<Option<RawCommittedMessage>, ChainCommunicationError> {
loop { loop {
if let Some(update) = self.db.message_by_leaf(leaf)? { if let Some(message) = self.db.message_by_leaf(leaf)? {
return Ok(Some(update)); return Ok(Some(message));
} }
sleep(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
} }

@ -23,6 +23,7 @@ static UPDATE: &str = "update_";
static UPDATE_META: &str = "update_metadata_"; static UPDATE_META: &str = "update_metadata_";
static LATEST_ROOT: &str = "update_latest_root_"; static LATEST_ROOT: &str = "update_latest_root_";
static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_"; static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_";
static LATEST_LEAF_INDEX_FOR_DESTINATION: &str = "latest_known_leaf_index_for_destination_";
static UPDATER_PRODUCED_UPDATE: &str = "updater_produced_update_"; static UPDATER_PRODUCED_UPDATE: &str = "updater_produced_update_";
/// DB handle for storing data tied to a specific home. /// DB handle for storing data tied to a specific home.
@ -81,19 +82,14 @@ impl AbacusDB {
pub fn store_latest_message(&self, message: &RawCommittedMessage) -> Result<()> { pub fn store_latest_message(&self, message: &RawCommittedMessage) -> Result<()> {
// If there is no latest root, or if this update is on the latest root // If there is no latest root, or if this update is on the latest root
// update latest root // update latest root
match self.retrieve_latest_leaf_index()? { if let Some(idx) = self.retrieve_latest_leaf_index()? {
Some(idx) => { if idx != message.leaf_index - 1 {
if idx == message.leaf_index - 1 { debug!(
self.update_latest_leaf_index(message.leaf_index)?; "Attempted to store message not building off latest leaf index. Latest leaf index: {}. Attempted leaf index: {}.",
} else { idx,
debug!( message.leaf_index,
"Attempted to store message not building off latest leaf index. Latest leaf index: {}. Attempted leaf index: {}.", )
idx,
message.leaf_index,
)
}
} }
None => self.update_latest_leaf_index(message.leaf_index)?,
} }
self.store_raw_committed_message(message) self.store_raw_committed_message(message)
@ -108,19 +104,17 @@ impl AbacusDB {
pub fn store_raw_committed_message(&self, message: &RawCommittedMessage) -> Result<()> { pub fn store_raw_committed_message(&self, message: &RawCommittedMessage) -> Result<()> {
let parsed = AbacusMessage::read_from(&mut message.message.clone().as_slice())?; let parsed = AbacusMessage::read_from(&mut message.message.clone().as_slice())?;
let destination_and_nonce = parsed.destination_and_nonce();
let leaf = message.leaf(); let leaf = message.leaf();
debug!( debug!(
leaf = ?leaf, leaf = ?leaf,
destination_and_nonce, destination_and_nonce = parsed.destination_and_nonce(),
destination = parsed.destination, destination = parsed.destination,
nonce = parsed.nonce, nonce = parsed.nonce,
leaf_index = message.leaf_index, leaf_index = message.leaf_index,
"storing raw committed message in db" "storing raw committed message in db"
); );
self.store_leaf(message.leaf_index, destination_and_nonce, leaf)?; self.store_leaf(message.leaf_index, parsed.destination, parsed.nonce, leaf)?;
self.store_keyed_encodable(MESSAGE, &leaf, message)?; self.store_keyed_encodable(MESSAGE, &leaf, message)?;
Ok(()) Ok(())
} }
@ -142,11 +136,36 @@ impl AbacusDB {
self.retrieve_decodable("", LATEST_LEAF_INDEX) self.retrieve_decodable("", LATEST_LEAF_INDEX)
} }
/// Store the latest known leaf_index for a destination
///
/// Key --> value: `destination` --> `leaf_index`
pub fn update_latest_leaf_index_for_destination(
&self,
destination: u32,
leaf_index: u32,
) -> Result<(), DbError> {
if let Ok(Some(idx)) = self.retrieve_latest_leaf_index_for_destination(destination) {
if leaf_index <= idx {
return Ok(());
}
}
self.store_keyed_encodable(LATEST_LEAF_INDEX_FOR_DESTINATION, &destination, &leaf_index)
}
/// Retrieve the highest known leaf_index for a destination
pub fn retrieve_latest_leaf_index_for_destination(
&self,
destination: u32,
) -> Result<Option<u32>, DbError> {
self.retrieve_keyed_decodable(LATEST_LEAF_INDEX_FOR_DESTINATION, &destination)
}
/// Store the leaf keyed by leaf_index /// Store the leaf keyed by leaf_index
fn store_leaf( fn store_leaf(
&self, &self,
leaf_index: u32, leaf_index: u32,
destination_and_nonce: u64, destination: u32,
nonce: u32,
leaf: H256, leaf: H256,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
debug!( debug!(
@ -154,9 +173,11 @@ impl AbacusDB {
leaf = ?leaf, leaf = ?leaf,
"storing leaf hash keyed by index and dest+nonce" "storing leaf hash keyed by index and dest+nonce"
); );
let destination_and_nonce = utils::destination_and_nonce(destination, nonce);
self.store_keyed_encodable(LEAF, &destination_and_nonce, &leaf)?; self.store_keyed_encodable(LEAF, &destination_and_nonce, &leaf)?;
self.store_keyed_encodable(LEAF, &leaf_index, &leaf)?; self.store_keyed_encodable(LEAF, &leaf_index, &leaf)?;
self.update_latest_leaf_index(leaf_index) self.update_latest_leaf_index(leaf_index)?;
self.update_latest_leaf_index_for_destination(destination, leaf_index)
} }
/// Retrieve a raw committed message by its leaf hash /// Retrieve a raw committed message by its leaf hash

Loading…
Cancel
Save