Storage
You are free to use any storage implementation with OmniPaxos
. The only requirement is that it implements the Storage
trait. OmniPaxos includes the package omnipaxos_storage
which provides two types of storage implementation that work out of the box: MemoryStorage
and PersistentStorage
.
Importing omnipaxos_storage
To use the provided storage implementations, we need to add omnipaxos_storage
to the dependencies in the cargo file. You can find the latest version on crates.
[dependencies]
omnipaxos_storage = { version = "LATEST_VERSION", default-features = true }
If you do decide to implement your own storage, we recommend taking a look at MemoryStorage
as a reference for implementing the functions required by Storage
.
Upon receiving a StorageResult::Error(_)
from the storage implementation, Omnipaxos tries to roll back incomplete changes, to enable crash-recovery, and then panicks.
MemoryStorage
MemoryStorage
is an in-memory storage implementation and it will be used in our examples. For simplicity, we leave out some parts of the implementation for now (such as Snapshots).
// from the module omnipaxos_storage::memory_storage
#[derive(Clone)]
pub struct MemoryStorage<T>
where
T: Entry,
{
/// Vector which contains all the replicated entries in-memory.
log: Vec<T>,
/// Last promised round.
n_prom: Ballot,
/// Last accepted round.
acc_round: Ballot,
/// Length of the decided log.
ld: usize,
/// Garbage collected index.
trimmed_idx: usize,
...
}
impl<T> Storage<T> for MemoryStorage<T>
where
T: Entry,
{
fn write_atomically(&mut self, ops: Vec<StorageOp<T>>) -> StorageResult<()> {
for op in ops {
match op {
StorageOp::AppendEntry(entry) => self.append_entry(entry)?,
StorageOp::AppendEntries(entries) => self.append_entries(entries)?,
StorageOp::AppendOnPrefix(from_idx, entries) => {
self.append_on_prefix(from_idx, entries)?
}
StorageOp::SetPromise(bal) => self.set_promise(bal)?,
StorageOp::SetDecidedIndex(idx) => self.set_decided_idx(idx)?,
StorageOp::SetAcceptedRound(bal) => self.set_accepted_round(bal)?,
...
}
}
Ok(())
}
fn append_entry(&mut self, entry: T) -> StorageResult<()> {
self.log.push(entry);
Ok(())
}
fn append_entries(&mut self, entries: Vec<T>) -> StorageResult<()> {
let mut e = entries;
self.log.append(&mut e);
Ok(())
}
fn append_on_prefix(&mut self, from_idx: usize, entries: Vec<T>) -> StorageResult<()> {
self.log.truncate(from_idx - self.trimmed_idx);
self.append_entries(entries)
}
fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()> {
self.n_prom = Some(n_prom);
Ok(())
}
fn set_decided_idx(&mut self, ld: usize) -> StorageResult<()> {
self.ld = ld;
Ok(())
}
fn get_decided_idx(&self) -> StorageResult<usize> {
Ok(self.ld)
}
fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()> {
self.acc_round = Some(na);
Ok(())
}
fn get_accepted_round(&self) -> StorageResult<Option<Ballot>> {
Ok(self.acc_round)
}
fn get_entries(&self, from: usize, to: usize) -> StorageResult<Vec<T>> {
let from = from - self.trimmed_idx;
let to = to - self.trimmed_idx;
Ok(self.log.get(from..to).unwrap_or(&[]).to_vec())
}
fn get_log_len(&self) -> StorageResult<usize> {
Ok(self.log.len())
}
fn get_suffix(&self, from: usize) -> StorageResult<Vec<T>> {
Ok(match self.log.get((from - self.trimmed_idx)..) {
Some(s) => s.to_vec(),
None => vec![],
})
}
fn get_promise(&self) -> StorageResult<Option<Ballot>> {
Ok(self.n_prom)
}
fn trim(&mut self, trimmed_idx: usize) -> StorageResult<()> {
let to_trim = (trimmed_idx - self.trimmed_idx).min(self.log.len());
self.log.drain(0..to_trim);
self.trimmed_idx = trimmed_idx;
Ok(())
}
...
}
PersistentStorage
PersistentStorage
is a persistent storage implementation, built on top of RocksDB, that stores the replicated log and the state of OmniPaxos. It can be enabled with the “persistent_storage” feature flag. Users can configure the path to log entries and OmniPaxos state, and storage-related options through PersistentStorageConfig
. The configuration struct features a default()
constructor for generating default configuration, and the constructor with()
that takes the storage path and options as arguments.
[dependencies]
omnipaxos_storage = { version = "LATEST_VERSION", features=["persistent_storage"] }
The persistent storage implementation must first be enabled via the “persistent_storage” feature flag.
use omnipaxos_storage::{
persistent_storage::{PersistentStorage, PersistentStorageConfig},
};
use rocksdb;
// user-defined configuration
let my_path = "my_storage";
let log_store_options = rocksdb::Options::default();
let state_store_options = rocksdb::Options::default();
state_store_options.create_missing_column_families(true); // required
state_store_options.create_if_missing(true); // required
// generate default configuration and set user-defined options
let mut my_config = PersistentStorageConfig::default();
my_config.set_path(my_path.to_string());
my_config.set_database_options(state_store_options);
my_config.set_log_options(log_store_options);
Batching
OmniPaxos supports batching to reduce the number of IO operations to storage. It is enabled by specifying the batch_size
in OmniPaxosConfig
.
let omnipaxos_config = OmniPaxosConfig {
server_config: ServerConfig {
batch_size: 100, // `batch_size = 1` by default
..Default::default()
},
cluster_config: ClusterConfig {
configuration_id: 1,
nodes: vec![1, 2, 3],
..Default::default()
},
};
// build omnipaxos instance
Note OmniPaxos will wait until the batch size is reached before the entries get decided. A larger batch size may therefore incur higher latency before an append operation is decided.