OmniPaxos

集群中的每个服务器都应该有一个OmniPaxos结构体的本地实例。OmniPaxos维护分布式日志的本地状态,处理传入消息并生成传出消息,用户必须使用其网络实现来获取和发送这些消息。用户还需通过OmniPaxos访问日志。为了本教程的方便,我们将使用一些开箱即用的宏和存储实现。这需要我们修改Cargo.toml:

[dependencies]
omnipaxos = { version = "LATEST_VERSION", features = ["macros"] }
omnipaxos_storage = "LATEST_VERSION"

示例: 键值存储

作为本教程的指南,我们将使用OmniPaxos来实现分布式日志,以构建一致的键值存储。 我们首先定义我们希望日志项包含的类型:

#[derive(Clone, Debug)] // Clone and Debug are required traits.
pub struct KeyValue {
    pub key: String,
    pub value: u64,
}

新建一个节点

定义了日志项和存储的结构后,我们现在可以继续创建OmniPaxos副本实例。假设我们希望在三台服务器上复制键值存储。在节点2上,我们将执行以下操作:

use omnipaxos::{OmniPaxos, OmniPaxosConfig};
use omnipaxos_storage::{
    memory_storage::MemoryStorage,
};

// configuration with id 1 and the following cluster
let configuration_id = 1;
let cluster = vec![1, 2, 3];

// create the replica 2 in this cluster (other replica instances are created similarly with pid 1 and 3 on the other nodes)
let my_pid = 2;
let my_peers = vec![1, 3];

let omnipaxos_config = OmniPaxosConfig {
    configuration_id,
    pid: my_pid,
    peers: my_peers,
    ..Default::default()
}

let storage = MemoryStorage::<KeyValue, ()>::default();
let mut omni_paxos = omnipaxos_config.build(storage);

为了方便起见,OmniPaxosConfig 还提供了一个构造函数OmniPaxosConfig::with_toml(),该构造函数使用TOML, 用户可以将参数保存在文件config/node1.toml中。

configuration_id = 1
pid = 2
peers = [1, 3]
logger_file_path = "/omnipaxos/logs"

这些参数被加载后可以用来构建OmniPaxosConfig:

let config_file_path = "config/node1.toml";
let omnipaxos_config = OmniPaxosConfig::with_toml(config_file_path);

故障恢复

为了支持故障恢复,我们必须确保存储的实现能够持久化日志项和存储状态。恢复后,我们必须确保我们的OmniPaxos将从之前的持久状态开始。为此,我们首先使用与上一个实例相同的存储路径重新创建存储。然后我们创建一个OmniPaxos实例,但使用持久状态作为storage参数。最后,我们调用fail_recovery()来正确初始化易变状态。我们使用PersistentStorage进行示例。

/* Re-creating our node after a crash... */

// Configuration from previous storage
let my_path = "/my_path_before_crash/";
let my_log_opts = LogOptions::new(my_path);
let persist_conf = PersistentStorageConfig::default();

persist_conf.set_path(my_path); // set the path to the persistent storage
my_config.set_commitlog_options(my_logopts);

// Re-create storage with previous state, then create `OmniPaxos`
let recovered_storage = PersistentStorage::open(persist_conf);
let mut recovered_paxos = omnipaxos_config.build(recovered_storage);
recovered_paxos.fail_recovery();