Skip to content

Commit

Permalink
refactor(blendnet): rename mix to blend (#63)
Browse files Browse the repository at this point in the history
* rename mix to blend

* update ci

* add missing dir
  • Loading branch information
youngjoon-lee authored Dec 13, 2024
1 parent 8ef0adc commit 45a2152
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
CARGO_TERM_COLOR: always

jobs:
mixnet:
simlib:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
4 changes: 2 additions & 2 deletions simlib/blendnet-sims/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] }
netrunner = { path = "../netrunner" }
nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" }
nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" }
nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" }
nomos-blend = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend" }
nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend-message" }
futures = "0.3.31"
rand_chacha = "0.3"
multiaddr = "0.18"
Expand Down
2 changes: 1 addition & 1 deletion simlib/blendnet-sims/config/blendnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@
"max_emission_frequency": 1.0,
"drop_message_probability": 0.0
},
"number_of_mix_layers": 2,
"number_of_blend_layers": 2,
"max_delay_seconds": 10
}
34 changes: 17 additions & 17 deletions simlib/blendnet-sims/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use crate::node::mix::state::{MixnodeRecord, MixnodeState};
use crate::node::mix::{MixMessage, MixnodeSettings};
use crate::node::blend::state::{BlendnodeRecord, BlendnodeState};
use crate::node::blend::{BlendMessage, BlendnodeSettings};
use anyhow::Ok;
use clap::Parser;
use crossbeam::channel;
Expand All @@ -16,8 +16,8 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use nomos_mix::cover_traffic::CoverTrafficSettings;
use nomos_mix::message_blend::{
use nomos_blend::cover_traffic::CoverTrafficSettings;
use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use parking_lot::Mutex;
Expand All @@ -28,7 +28,7 @@ use rand_chacha::ChaCha12Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use crate::node::mix::MixNode;
use crate::node::blend::BlendNode;
use crate::settings::SimSettings;
use netrunner::{runner::SimulationRunner, settings::SimulationSettings};

Expand Down Expand Up @@ -88,19 +88,19 @@ impl SimulationApp {
let regions_data = RegionsData::new(regions, behaviours);

let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed)));
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));

let nodes: Vec<_> = node_ids
.iter()
.copied()
.map(|node_id| {
let mut network = network.lock();
create_boxed_mixnode(
create_boxed_blendnode(
node_id,
&mut network,
settings.simulation_settings.clone(),
no_netcap,
MixnodeSettings {
BlendnodeSettings {
connected_peers: ids
.iter()
.filter(|&id| id != &node_id)
Expand All @@ -115,7 +115,7 @@ impl SimulationApp {
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings {
private_key: node_id.into(),
num_mix_layers: settings.number_of_mix_layers,
num_blend_layers: settings.number_of_blend_layers,
},
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: settings.max_delay_seconds,
Expand All @@ -140,13 +140,13 @@ impl SimulationApp {
}
}

fn create_boxed_mixnode(
fn create_boxed_blendnode(
node_id: NodeId,
network: &mut Network<MixMessage>,
network: &mut Network<BlendMessage>,
simulation_settings: SimulationSettings,
no_netcap: bool,
mixnode_settings: MixnodeSettings,
) -> BoxedNode<MixnodeSettings, MixnodeState> {
blendnode_settings: BlendnodeSettings,
) -> BoxedNode<BlendnodeSettings, BlendnodeState> {
let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step.
Expand Down Expand Up @@ -174,7 +174,7 @@ fn create_boxed_mixnode(
node_message_sender,
network_message_receiver,
);
Box::new(MixNode::new(node_id, mixnode_settings, network_interface))
Box::new(BlendNode::new(node_id, blendnode_settings, network_interface))
}

fn run<M, S, T>(
Expand All @@ -189,7 +189,7 @@ where
T: Serialize + Clone + 'static,
{
let stream_settings = settings.stream_settings.clone();
let runner = SimulationRunner::<_, MixnodeRecord, S, T>::new(
let runner = SimulationRunner::<_, BlendnodeRecord, S, T>::new(
network,
nodes,
Default::default(),
Expand All @@ -199,11 +199,11 @@ where
let handle = match stream_type {
Some(StreamType::Naive) => {
let settings = stream_settings.unwrap_naive();
runner.simulate_and_subscribe::<NaiveSubscriber<MixnodeRecord>>(settings)?
runner.simulate_and_subscribe::<NaiveSubscriber<BlendnodeRecord>>(settings)?
}
Some(StreamType::IO) => {
let settings = stream_settings.unwrap_io();
runner.simulate_and_subscribe::<IOSubscriber<MixnodeRecord>>(settings)?
runner.simulate_and_subscribe::<IOSubscriber<BlendnodeRecord>>(settings)?
}
None => runner.simulate()?,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::node::mix::scheduler::Interval;
use crate::node::blend::scheduler::Interval;
use crossbeam::channel;
use futures::stream::iter;
use futures::{Stream, StreamExt};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod scheduler;
pub mod state;
pub mod stream_wrapper;

use crate::node::mix::consensus_streams::{Epoch, Slot};
use crate::node::blend::consensus_streams::{Epoch, Slot};
use cached::{Cached, TimedCache};
use crossbeam::channel;
use futures::Stream;
Expand All @@ -18,7 +18,7 @@ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
};
use nomos_mix::{
use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership,
message_blend::{
Expand All @@ -27,49 +27,49 @@ use nomos_mix::{
persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
},
MixOutgoingMessage,
BlendOutgoingMessage,
};
use nomos_mix_message::mock::MockMixMessage;
use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use state::MixnodeState;
use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;

#[derive(Debug, Clone)]
pub struct MixMessage(Vec<u8>);
pub struct BlendMessage(Vec<u8>);

impl PayloadSize for MixMessage {
impl PayloadSize for BlendMessage {
fn size_bytes(&self) -> u32 {
2208
}
}

#[derive(Deserialize)]
pub struct MixnodeSettings {
pub struct BlendnodeSettings {
pub connected_peers: Vec<NodeId>,
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
pub seed: u64,
pub epoch_duration: Duration,
pub slot_duration: Duration,
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockMixMessage>,
pub message_blend: MessageBlendSettings<MockBlendMessage>,
pub cover_traffic_settings: CoverTrafficSettings,
pub membership: Vec<<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
pub membership: Vec<<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey>,
}

type Sha256Hash = [u8; 32];

/// This node implementation only used for testing different streaming implementation purposes.
pub struct MixNode {
pub struct BlendNode {
id: NodeId,
state: MixnodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
state: BlendnodeState,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
message_cache: TimedCache<Sha256Hash, ()>,

data_msg_lottery_update_time_sender: channel::Sender<Duration>,
Expand All @@ -81,28 +81,28 @@ pub struct MixNode {
persistent_transmission_messages: PersistentTransmissionStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockMixMessage,
MockBlendMessage,
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockMixMessage>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
blend_sender: channel::Sender<Vec<u8>>,
blend_update_time_sender: channel::Sender<Duration>,
blend_messages: MessageBlendStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockMixMessage,
MockBlendMessage,
TemporalRelease,
>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage>,
cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
}

impl MixNode {
impl BlendNode {
pub fn new(
id: NodeId,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
settings: BlendnodeSettings,
network_interface: InMemoryNetworkInterface<BlendMessage>,
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);

Expand Down Expand Up @@ -137,18 +137,18 @@ impl MixNode {
let (blend_sender, blend_receiver) = channel::unbounded();
let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
let nodes: Vec<
nomos_mix::membership::Node<
<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey,
nomos_blend::membership::Node<
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
>,
> = settings
.membership
.iter()
.map(|&public_key| nomos_mix::membership::Node {
.map(|&public_key| nomos_blend::membership::Node {
address: Multiaddr::empty(),
public_key,
})
.collect();
let membership = Membership::<MockMixMessage>::new(nodes, id.into());
let membership = Membership::<MockBlendMessage>::new(nodes, id.into());
let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(),
membership.clone(),
Expand All @@ -172,7 +172,7 @@ impl MixNode {
// tier 3 cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded();
let cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage> = CoverTraffic::new(
let cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage> = CoverTraffic::new(
settings.cover_traffic_settings,
Epoch::new(settings.epoch_duration, epoch_updater_update_receiver),
Slot::new(
Expand All @@ -189,7 +189,7 @@ impl MixNode {
// We expected that a message will be delivered to most of nodes within 60s.
message_cache: TimedCache::with_lifespan(60),
settings,
state: MixnodeState {
state: BlendnodeState {
node_id: id,
step_id: 0,
num_messages_fully_unwrapped: 0,
Expand All @@ -212,7 +212,7 @@ impl MixNode {

fn forward(
&mut self,
message: MixMessage,
message: BlendMessage,
exclude_node: Option<NodeId>,
log: Option<EmissionLog>,
) {
Expand All @@ -234,7 +234,7 @@ impl MixNode {
self.message_cache.cache_set(Self::sha256(&message.0), ());
}

fn receive(&mut self) -> Vec<NetworkMessage<MixMessage>> {
fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
self.network_interface
.receive_messages()
.into_iter()
Expand Down Expand Up @@ -293,10 +293,10 @@ impl MixNode {
}
}

impl Node for MixNode {
type Settings = MixnodeSettings;
impl Node for BlendNode {
type Settings = BlendnodeSettings;

type State = MixnodeState;
type State = BlendnodeState;

fn id(&self) -> NodeId {
self.id
Expand Down Expand Up @@ -339,10 +339,10 @@ impl Node for MixNode {
// Proceed message blend
if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
match msg {
MixOutgoingMessage::Outbound(msg) => {
BlendOutgoingMessage::Outbound(msg) => {
self.persistent_sender.send(msg).unwrap();
}
MixOutgoingMessage::FullyUnwrapped(payload) => {
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_fully_unwrapped(&payload);
self.state.num_messages_fully_unwrapped += 1;
Expand All @@ -367,7 +367,7 @@ impl Node for MixNode {
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{
self.forward(
MixMessage(msg),
BlendMessage(msg),
None,
Some(self.new_emission_log("FromPersistent")),
);
Expand Down
Loading

0 comments on commit 45a2152

Please sign in to comment.