Commit f5d74c85 authored by DISSOUBRAY Nathan

Write the send_block_to protocol

parent aae772ff
1 merge request: !14 Write the send_block_to protocol
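As a quick orientation, here is a minimal usage sketch of the two new endpoints through the Nushell CLI added below; the node addresses, peer id and hashes are placeholders, and the import path assumes the layout used by the test scripts:

use ../cli/app.nu

# how much storage the target node still has for blocks pushed to it through send requests
let available = app get-available-storage --node "127.0.0.1:3001"

# push one block of an already-encoded file to the target peer;
# this maps to GET /send-block-to/<peer_id_base_58>/<file_hash>/<block_hash> on the sender node
let ok = app send-block-to --node "127.0.0.1:3000" $peer_id_1 $file_hash $block_hash
if not $ok { error make {msg: "the receiver refused the block"} }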
......@@ -30,6 +30,9 @@ bs58 = "0.5.1"
rs_merkle = "1.4.2"
resolve-path = "0.1.0"
async-stream = "0.3.5"
libp2p-stream = "0.1.0-alpha.1"
strum = { version = "0.26", features = ["derive"] }
chrono = "0.4.38"
[dependencies.libp2p]
default-features = false
......
......@@ -15,10 +15,7 @@ clippy:
cargo clippy --workspace --all-targets -- -D warnings
test:
nu tests/test_network_builder.nu
nu tests/message_exchange_block_per_block.nu
nu tests/get_file_2_peers.nu
nu tests/get_file_4_peers_min_blocks.nu
nu tests/help_func/execute_all_tests.nu
show:
rustup --version
......
......@@ -236,5 +236,22 @@ export def node-info [
--node: string = $DEFAULT_IP,
] nothing -> any {
log debug $"Getting the info from node ($node)"
$"node-info" | run-command $node
"node-info" | run-command $node
}
export def send-block-to [
peer_id_base_58: string,
file_hash: string,
block_hash: string,
--node: string = $DEFAULT_IP
] nothing -> any {
log debug $"Sending block ($block_hash) part of file ($file_hash) to ($peer_id_base_58)"
$"send-block-to/($peer_id_base_58)/($file_hash)/($block_hash)" | run-command $node
}
export def get-available-storage [
--node: string = $DEFAULT_IP
] nothing -> any {
log debug $"Getting the size left available for sending blocks from ($node)"
$"get-available-storage" | run-command $node
}
......@@ -2,6 +2,7 @@ use std log
const NAME = "dragoonfly"
const LOG_DIR = ($nu.temp-path | path join $NAME)
const POWERS_PATH = "setup/powers/powers_test_Fr_155kB"
# create a swarm table
export def "swarm create" [n: int]: nothing -> table {
......@@ -35,7 +36,7 @@ export def "swarm run" [
log info $"launching node ($node.seed) \(($node.ip_port)\)"
^bash -c (
$"cargo run --features '($features | str join ',')' "
+ $"-- ($node.ip_port) ($node.seed) "
+ $"-- ($POWERS_PATH) ($node.ip_port) ($node.seed) "
+ $"1> ($log_dir)/($node.seed).log 2> /dev/null &"
)
}
......
......@@ -92,9 +92,12 @@ pub(crate) enum DragoonCommand {
encoding_method: EncodingMethod,
encode_mat_k: usize,
encode_mat_n: usize,
powers_path: String,
powers_path: PathBuf,
sender: Sender<(String, String)>,
},
GetAvailableStorage {
sender: Sender<usize>,
},
GetBlockDir {
file_hash: String,
sender: Sender<PathBuf>,
......@@ -120,7 +123,7 @@ pub(crate) enum DragoonCommand {
GetFile {
file_hash: String,
output_filename: String,
powers_path: String,
powers_path: PathBuf,
sender: Sender<PathBuf>,
},
GetFileDir {
......@@ -147,10 +150,21 @@ pub(crate) enum DragoonCommand {
NodeInfo {
sender: Sender<PeerId>,
},
RemoveEntryFromSendBlockToSet {
peer_id: PeerId,
block_hash: String,
sender: Sender<()>,
},
RemoveListener {
listener_id: u64,
sender: Sender<bool>,
},
SendBlockTo {
peer_id: PeerId,
file_hash: String,
block_hash: String,
sender: Sender<bool>,
},
StartProvide {
key: String,
sender: Sender<()>,
......@@ -165,9 +179,8 @@ impl std::fmt::Display for DragoonCommand {
DragoonCommand::DecodeBlocks { .. } => write!(f, "decode-blocks"),
DragoonCommand::DialMultiple { .. } => write!(f, "dial-multiple"),
DragoonCommand::DialSingle { .. } => write!(f, "dial-single"),
// DragoonCommand::DragoonPeers { .. } => write!(f, "dragoon-peers"),
// DragoonCommand::DragoonSend { .. } => write!(f, "dragoon-send"),
DragoonCommand::EncodeFile { .. } => write!(f, "encode-file"),
DragoonCommand::GetAvailableStorage { .. } => write!(f, "get-available-storage"),
DragoonCommand::GetBlockDir { .. } => write!(f, "get-block-dir"),
DragoonCommand::GetBlockFrom { .. } => write!(f, "get-block-from"),
DragoonCommand::GetBlocksInfoFrom { .. } => write!(f, "get-blocks-info-from"),
......@@ -181,7 +194,11 @@ impl std::fmt::Display for DragoonCommand {
DragoonCommand::GetProviders { .. } => write!(f, "get-providers"),
DragoonCommand::Listen { .. } => write!(f, "listen"),
DragoonCommand::NodeInfo { .. } => write!(f, "node-info"),
DragoonCommand::RemoveEntryFromSendBlockToSet { .. } => {
write!(f, "remove-entry-from-send-block-to-set")
}
DragoonCommand::RemoveListener { .. } => write!(f, "remove-listener"),
DragoonCommand::SendBlockTo { .. } => write!(f, "send-block-to"),
DragoonCommand::StartProvide { .. } => write!(f, "start-provide"),
}
}
......@@ -301,7 +318,7 @@ pub(crate) async fn create_cmd_dial_single(
// }
pub(crate) async fn create_cmd_encode_file(
Path((file_path, replace_blocks, encoding_method, encode_mat_k, encode_mat_n, powers_path)): Path<(String, bool, EncodingMethod, usize, usize, String)>,
Path((file_path, replace_blocks, encoding_method, encode_mat_k, encode_mat_n, powers_path)): Path<(String, bool, EncodingMethod, usize, usize, PathBuf)>,
State(state): State<Arc<AppState>>,
) -> Response {
info!("running command `encode_file`");
......@@ -317,6 +334,13 @@ pub(crate) async fn create_cmd_encode_file(
)
}
pub(crate) async fn create_cmd_get_available_storage(
State(state): State<Arc<AppState>>,
) -> Response {
info!("running command `get_available_storage`");
dragoon_command!(state, GetAvailableStorage)
}
pub(crate) async fn create_cmd_get_block_from(
Path((peer_id_base_58, file_hash, block_hash)): Path<(String, String, String)>,
State(state): State<Arc<AppState>>,
......@@ -351,7 +375,7 @@ pub(crate) async fn create_cmd_get_connected_peers(State(state): State<Arc<AppSt
}
pub(crate) async fn create_cmd_get_file(
Path((file_hash, output_filename, powers_path)): Path<(String, String, String)>,
Path((file_hash, output_filename, powers_path)): Path<(String, String, PathBuf)>,
State(state): State<Arc<AppState>>,
) -> Response {
info!("running command get_file");
......@@ -431,6 +455,16 @@ pub(crate) async fn create_cmd_remove_listener(
dragoon_command!(state, RemoveListener, listener_id)
}
pub(crate) async fn create_cmd_send_block_to(
Path((peer_id_base_58, file_hash, block_hash)): Path<(String, String, String)>,
State(state): State<Arc<AppState>>,
) -> Response {
info!("running command `send_block_to`");
let bytes = bs58::decode(peer_id_base_58).into_vec().unwrap();
let peer_id = PeerId::from_bytes(&bytes).unwrap();
dragoon_command!(state, SendBlockTo, peer_id, block_hash, file_hash)
}
pub(crate) async fn create_cmd_start_provide(
Path(key): Path<String>,
State(state): State<Arc<AppState>>,
......
This diff is collapsed.
......@@ -3,14 +3,15 @@ mod commands;
mod dragoon_network;
mod error;
mod peer_block_info;
mod send_block_to;
mod to_serialize;
use axum::routing::get;
use axum::Router;
use libp2p::identity;
use libp2p::identity::Keypair;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{net::SocketAddr, path::PathBuf};
use tokio::signal;
use tokio::sync::mpsc;
use tracing::info;
......@@ -73,11 +74,21 @@ pub(crate) async fn main() -> Result<()> {
.route("/get-file/:file_hash/:output_filename/:powers_path", get(commands::create_cmd_get_file))
.route("/get-block-list/:file_hash", get(commands::create_cmd_get_block_list))
.route("/get-blocks-info-from/:peer_id_base_58/:file_hash", get(commands::create_cmd_get_blocks_info_from))
.route("/node-info", get(commands::create_cmd_node_info));
.route("/node-info", get(commands::create_cmd_node_info))
.route("/send-block-to/:peer_id_base_58/:file_hash/:block_hash", get(commands::create_cmd_send_block_to))
.route("/get-available-storage", get(commands::create_cmd_get_available_storage));
let router = router.with_state(Arc::new(app::AppState::new(cmd_sender.clone())));
let ip_port: SocketAddr = if let Some(ip_port) = std::env::args().nth(1) {
let powers_path: PathBuf = if let Some(powers_path) = std::env::args().nth(1) {
powers_path
} else {
panic!("No path has been provided for the powers")
}
.parse()
.unwrap();
let ip_port: SocketAddr = if let Some(ip_port) = std::env::args().nth(2) {
ip_port
} else {
"127.0.0.1:3000".to_string()
......@@ -85,7 +96,7 @@ pub(crate) async fn main() -> Result<()> {
.parse()
.unwrap();
let id = if let Some(id) = std::env::args().nth(2) {
let id = if let Some(id) = std::env::args().nth(3) {
id.parse::<u8>().unwrap()
} else {
0
......@@ -100,7 +111,17 @@ pub(crate) async fn main() -> Result<()> {
info!("Peer ID: {} ({})", peer_id, id);
let swarm = dragoon_network::create_swarm(kp).await?;
let network = DragoonNetwork::new(swarm, cmd_receiver, cmd_sender, peer_id, true);
let total_available_storage_for_send = 20usize * 10usize.pow(9); // 20 GB for storing send blocks
let network = DragoonNetwork::new(
swarm,
cmd_receiver,
cmd_sender,
powers_path,
total_available_storage_for_send,
peer_id,
false,
);
tokio::spawn(network.run::<Fr, G1Projective, DensePolynomial<Fr>>());
let shutdown = signal::ctrl_c();
......
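With this change a node must now be started with the powers file as its first positional argument, before the listen address and the seed. A minimal launch sketch matching what `swarm run` now does (feature flags omitted; the address and seed fall back to 127.0.0.1:3000 and 0 when not given):

cargo run -- setup/powers/powers_test_Fr_155kB 127.0.0.1:3000 0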
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct PeerBlockInfo {
pub(crate) peer_id_base_58: String,
pub(crate) file_hash: String,
pub(crate) block_hashes: Vec<String>,
pub(crate) block_sizes: Option<Vec<usize>>,
}
mod protocol;
use std::fs as sfs;
use std::io::{BufRead, Write};
use std::{
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use anyhow::Result;
use ark_ec::CurveGroup;
use ark_ff::PrimeField;
use ark_poly::DenseUVPolynomial;
use ark_std::ops::Div;
use chrono::Utc;
use futures::StreamExt;
use libp2p_stream::IncomingStreams;
use tokio::sync::{
mpsc::{self, Receiver},
Semaphore,
};
use tracing::{debug, error};
use crate::dragoon_network;
pub(crate) use protocol::handle_send_block_exchange_sender_side as send_block_to;
#[derive(Clone)]
pub(crate) struct SendBlockHandler {}
/// An async handler to spawn on a node to automatically manage blocks received through send requests
impl SendBlockHandler {
pub(crate) fn run<F, G, P>(
mut incoming_streams: IncomingStreams,
powers_path: PathBuf,
file_dir: PathBuf,
current_available_storage: Arc<AtomicUsize>,
total_block_size_on_disk: Arc<AtomicUsize>,
) -> Result<()>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
tokio::spawn(async move {
// allow at most 10 send requests to be handled at once
let max_send_request = 10;
let semaphore = Arc::new(Semaphore::new(max_send_request));
let (write_to_file_sender, write_to_file_recv) = mpsc::channel(max_send_request);
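// the bookkeeping below uses std::fs and blocking_recv, so it is run on the
// blocking thread pool instead of inside an async task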
tokio::task::spawn_blocking(move || {
Self::add_new_block_info_to_send_file(write_to_file_recv, total_block_size_on_disk)
});
loop {
let permit = semaphore.clone().acquire_owned().await.unwrap();
if let Some((peer, stream)) = incoming_streams.next().await {
let p_path = powers_path.clone();
let f_dir = file_dir.clone();
let new_current_available_storage = current_available_storage.clone();
let new_write_to_file_sender = write_to_file_sender.clone();
tokio::spawn(async move {
match protocol::handle_send_block_exchange_recv_side::<F, G, P>(stream, p_path, f_dir, new_current_available_storage, new_write_to_file_sender).await {
Ok(_) => {debug!("Finished getting block from peer {} without issue", peer)},
Err(e) => error!("The stream with the peer {} for receiving a block due to a send request has been dropped due to a handling error: {}", peer, e)
}
drop(permit);
});
} else {
debug!("We are done with the streams for the send");
return Ok::<(), anyhow::Error>(());
}
}
});
Ok(())
}
/// Used to synchronously modify the file that lists all the blocks received through send requests
fn add_new_block_info_to_send_file(
mut receiver: Receiver<(PathBuf, usize, String, String, String)>,
total_block_size_on_disk: Arc<AtomicUsize>,
) {
while let Some((file_dir, size_of_block, file_hash, block_hash, peer_id_base_58)) =
receiver.blocking_recv()
{
match Self::add_send_file_inner(
file_dir,
total_block_size_on_disk.clone(),
size_of_block,
file_hash,
block_hash,
peer_id_base_58,
) {
Ok(_) => {}
Err(e) => error!("{}", e),
}
}
}
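// The send block list file starts with a "Total: <bytes>" line, followed by one
// "Size: .. | Timestamp: .. | file_hash: .. | block_hash: .. | peer_id: .." line per received block.
// It is rewritten through a temporary ".new.txt" file that is then renamed over the old one,
// so the old list is only replaced once the new one has been fully written.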
fn add_send_file_inner(
file_dir: PathBuf,
total_block_size_on_disk: Arc<AtomicUsize>,
size_of_block: usize,
file_hash: String,
block_hash: String,
peer_id_base_58: String,
) -> Result<()> {
total_block_size_on_disk.fetch_add(size_of_block, Ordering::SeqCst);
let old_send_file_path: PathBuf = [
file_dir,
PathBuf::from(dragoon_network::SEND_BLOCK_FILE_NAME),
]
.iter()
.collect();
let mut new_send_file_path = old_send_file_path.clone();
new_send_file_path.set_extension("new.txt");
//TODO remove the created file if we return on an error
let mut new_send_file = sfs::File::options()
.read(true)
.append(true)
.create_new(true)
.open(&new_send_file_path)?;
new_send_file.write_all(
format!(
"Total: {}\n",
total_block_size_on_disk.load(Ordering::Relaxed)
)
.as_bytes(),
)?;
let old_file = sfs::File::open(&old_send_file_path)?;
let mut old_file = std::io::BufReader::new(old_file);
// skip the first line (which is the old total)
old_file.read_line(&mut String::new())?;
// the new file is in append mode, so the old file's content (minus its first line) is copied in after the new total
std::io::copy(&mut old_file, &mut new_send_file)?;
// now append the information about the new block
new_send_file.write_all(
format!(
"Size: {} | Timestamp: {} | file_hash: {} | block_hash: {} | peer_id: {}\n",
size_of_block,
Utc::now(),
file_hash,
block_hash,
peer_id_base_58,
)
.as_bytes(),
)?;
// move the new file on the name of the old file
sfs::rename(new_send_file_path, old_send_file_path)?;
Ok(())
}
}
use anyhow::{format_err, Result};
use ark_ec::CurveGroup;
use ark_ff::PrimeField;
use ark_poly::DenseUVPolynomial;
use ark_serialize::{CanonicalDeserialize, Compress, Validate};
use ark_std::ops::Div;
use futures::{AsyncReadExt, AsyncWriteExt};
use komodo::{verify, Block};
use libp2p::{PeerId, Stream};
use std::path::PathBuf;
use std::{
mem::size_of,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use strum::FromRepr;
use tokio::fs::{self, File};
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, info, warn};
use komodo::zk::Powers;
use crate::{
dragoon_network::{get_block_dir, get_powers},
peer_block_info::PeerBlockInfo,
};
const MAX_PBI_SIZE: usize = 1024; // max size in bytes for a peer block info
#[derive(Debug, Clone, Copy, FromRepr)]
#[repr(u8)]
enum ExchangeCode {
AcceptBlockSend,
RejectBlockSend,
BlockIsCorrect,
BlockIsIncorrect,
}
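// Exchange overview (each side closes the stream once it is done):
// 1. sender -> receiver: the PeerBlockInfo size as a big-endian usize, then the JSON-encoded PeerBlockInfo
// 2. receiver -> sender: one byte, AcceptBlockSend or RejectBlockSend, chosen from the available storage
// 3. sender -> receiver: the serialized block bytes (only if the send was accepted)
// 4. receiver -> sender: one byte, BlockIsCorrect or BlockIsIncorrect, after verifying the block against the powers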
// -------------------- SENDER -------------------- //
/// Build the information regarding the block to be sent, including the block hash, file hash, and size of the block
async fn build_peer_block_info(
peer_id: PeerId,
block_hash: String,
file_hash: String,
file_dir: PathBuf,
) -> Result<PeerBlockInfo> {
let block_dir = get_block_dir(&file_dir, file_hash.clone());
let block_path: PathBuf = [block_dir, PathBuf::from(block_hash.clone())]
.iter()
.collect();
let block_file = File::open(block_path).await?;
let block_size = block_file.metadata().await?.len();
Ok(PeerBlockInfo {
peer_id_base_58: peer_id.to_base58(),
file_hash,
block_hashes: vec![block_hash],
block_sizes: Some(vec![block_size as usize]),
})
}
/// Send the peer block info to the other end of the stream
async fn send_peer_block_info(
stream: &mut Stream,
own_peer_id: PeerId,
block_hash: String,
file_hash: String,
file_dir: PathBuf,
) -> Result<()> {
let peer_block_info =
build_peer_block_info(own_peer_id, block_hash, file_hash, file_dir).await?;
let ser_peer_block_info = serde_json::to_vec(&peer_block_info)?;
let size_of_pbi = ser_peer_block_info.len();
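// length-prefix framing: the JSON size is sent as a big-endian usize so the receiver
// knows how many bytes to read (and can reject anything larger than MAX_PBI_SIZE)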
stream.write_all(&usize::to_be_bytes(size_of_pbi)).await?;
stream.write_all(&ser_peer_block_info).await?;
Ok(())
}
/// Send the block to the other end of the stream
async fn send_block(
stream: &mut Stream,
block_hash: String,
file_hash: String,
file_dir: PathBuf,
) -> Result<()> {
let block_dir = get_block_dir(&file_dir, file_hash.clone());
let block_path: PathBuf = [block_dir, PathBuf::from(block_hash.clone())]
.iter()
.collect();
let ser_block = fs::read(block_path).await?;
stream.write_all(&ser_block).await?;
Ok(())
}
/// Main function for the sender side; attempts to send the block and can fail if the other end refuses to take it.
/// This is a one-shot attempt: there is no fallback logic to look for another peer to take the block.
pub(crate) async fn handle_send_block_exchange_sender_side(
mut stream: Stream, //TODO give a &mut stream instead so the caller can close the stream on all errors
own_peer_id: PeerId,
block_hash: String,
file_hash: String,
file_dir: PathBuf,
) -> Result<bool> {
send_peer_block_info(
&mut stream,
own_peer_id,
block_hash.clone(),
file_hash.clone(),
file_dir.clone(),
)
.await?;
let mut ser_answer = [0u8; 1];
stream.read_exact(&mut ser_answer).await?;
if let Some(answer) = ExchangeCode::from_repr(ser_answer[0]) {
match answer {
ExchangeCode::AcceptBlockSend => {}
ExchangeCode::RejectBlockSend => {
stream.close().await?;
return Ok(false);
}
a => {
let err_string = format!("Unexpected ExchangeCode variant for answer {:?}", a);
warn!(err_string);
stream.close().await?;
return Err(format_err!(err_string));
}
}
} else {
let err_string = format!(
"Unknown ExchangeCode variant discriminant for answer {}",
ser_answer[0]
);
warn!(err_string);
stream.close().await?;
return Err(format_err!(err_string));
}
// block got accepted, we send it
send_block(&mut stream, block_hash, file_hash, file_dir).await?;
let mut ser_block_status = [0u8; 1];
stream.read_exact(&mut ser_block_status).await?;
stream.close().await?;
debug!("ser block status: {:?}", ser_block_status);
if let Some(block_status) = ExchangeCode::from_repr(ser_block_status[0]) {
match block_status {
ExchangeCode::BlockIsCorrect => Ok(true),
ExchangeCode::BlockIsIncorrect => Ok(false),
a => {
let err_string = format!("Unexpected ExchangeCode variant for block status{:?}", a);
warn!(err_string);
stream.close().await?;
Err(format_err!(err_string))
}
}
} else {
let err_string = format!(
"Unknown ExchangeCode variant discriminant for block status {}",
ser_block_status[0]
);
warn!(err_string);
stream.close().await?;
Err(format_err!(err_string))
}
}
// -------------------- RECEIVER -------------------- //
/// Choose whether or not to accept the send request.
/// When the block is accepted, its size is subtracted from the available storage; the function returns the accept/reject choice and the amount by which the available storage was changed.
/// Returning that change makes it possible to revert it later if we end up rejecting the block for other reasons.
async fn choose_response_to_send_request(
peer_block_info: &PeerBlockInfo,
current_available_storage: Arc<AtomicUsize>,
) -> (ExchangeCode, usize) {
if let Some(block_size_vec) = peer_block_info.block_sizes.as_ref() {
if let Some(size) = block_size_vec.first() {
let available_storage = current_available_storage.load(Ordering::Relaxed);
if &available_storage > size {
// store the new available storage space since we decided to accept the block
current_available_storage.store(available_storage - size, Ordering::Relaxed);
info!("New available storage space: {}", available_storage - size);
(ExchangeCode::AcceptBlockSend, *size)
} else {
(ExchangeCode::RejectBlockSend, 0)
}
} else {
warn!("No size was provided for the block to be received by a send request");
(ExchangeCode::RejectBlockSend, 0)
}
} else {
warn!("No size was provided for the block to be received by a send request");
(ExchangeCode::RejectBlockSend, 0)
}
}
/// Send back the response to the send request
async fn respond_to_send_request(stream: &mut Stream, answer: ExchangeCode) -> Result<()> {
let ser_answer = [answer as u8];
stream.write_all(&ser_answer).await?;
Ok(())
}
/// Send back the block status to tell the sender if the block they sent was valid and stored, or invalid and thus not stored
async fn send_block_status(stream: &mut Stream, block_status: ExchangeCode) -> Result<()> {
let ser_status = [block_status as u8];
stream.write_all(&ser_status).await?;
Ok(())
}
/// Handles receiving the block itself and deserializing it
async fn receive_block<F, G>(
stream: &mut Stream,
peer_block_info: &PeerBlockInfo,
) -> Result<(Vec<u8>, Block<F, G>)>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
{
let PeerBlockInfo { block_sizes, .. } = peer_block_info;
if let Some(vec_size) = block_sizes {
if let Some(size) = vec_size.first() {
let mut ser_block = vec![0u8; *size];
stream.read_exact(&mut ser_block[..]).await?;
let block = Block::deserialize_with_mode(&ser_block[..], Compress::Yes, Validate::Yes)?;
Ok((ser_block, block))
} else {
Err(format_err!("A size vector was provided to read the block that was sent, but the vector was empty"))
}
} else {
Err(format_err!(
"No size vector was provided to read the block that was sent"
))
}
}
/// Handles the entire transaction for the receiver side of the block send
pub(super) async fn handle_send_block_exchange_recv_side<F, G, P>(
mut stream: Stream,
powers_path: PathBuf,
file_dir: PathBuf,
current_available_storage: Arc<AtomicUsize>,
write_to_file_sender: Sender<(PathBuf, usize, String, String, String)>,
) -> Result<()>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
// receive the size of the peer block info
let mut ser_peer_block_info_size = [0u8; size_of::<usize>()];
stream.read_exact(&mut ser_peer_block_info_size).await?;
let peer_block_info_size = usize::from_be_bytes(ser_peer_block_info_size);
if peer_block_info_size > MAX_PBI_SIZE {
stream.close().await?;
return Err(format_err!(
"The peer block info's size of {} was bigger than the maximum peer block size of {}",
peer_block_info_size,
MAX_PBI_SIZE,
));
}
// receive the peer block info
let mut ser_peer_block_info = vec![0u8; peer_block_info_size];
stream.read_exact(&mut ser_peer_block_info[..]).await?;
let peer_block_info: PeerBlockInfo = serde_json::de::from_slice(&ser_peer_block_info)?;
let (answer, size_change) =
choose_response_to_send_request(&peer_block_info, current_available_storage.clone()).await;
match send_block_recv_wrapper::<F, G, P>(
&mut stream,
answer,
powers_path,
&file_dir,
peer_block_info,
)
.await
{
Ok((file_hash, block_hash, peer_id_base_58)) => {
match write_to_file_sender
.send((
file_dir,
size_change,
file_hash,
block_hash,
peer_id_base_58,
))
.await
{
Ok(_) => {}
Err(_) => {
stream.close().await.map_err(|e| -> anyhow::Error {format_err!("Got two errors: the call to write to the send block list file failed and closing the stream also failed: {:?}", e)})?;
return Err(format_err!(
"The call to write to the list of send block file failed"
));
}
}
} //TODO change the available size in the send block file and add information about the block by sending the information through a sender
Err(e) => {
current_available_storage.fetch_add(size_change, Ordering::Relaxed);
stream.close().await?;
return Err(e);
}
}
Ok(())
}
/// A wrapper around the part that runs after we choose to accept or reject the block.
/// It is used to catch errors before they are returned so the change to the available storage can be reverted (freeing the space we previously said we would use).
async fn send_block_recv_wrapper<F, G, P>(
stream: &mut Stream,
answer: ExchangeCode,
powers_path: PathBuf,
file_dir: &PathBuf,
peer_block_info: PeerBlockInfo,
) -> Result<(String, String, String)>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
respond_to_send_request(stream, answer).await?;
match answer {
ExchangeCode::AcceptBlockSend => {}
ExchangeCode::RejectBlockSend => {
stream.close().await?;
return Ok(Default::default());
}
a => {
let err_msg = format!(
"Wrong enum variant provided by `choose_response_to_send_request`: {:?}",
a
);
error!(err_msg);
return Err(format_err!(err_msg));
}
}
// receive the block
let (ser_block, block) = receive_block::<F, G>(stream, &peer_block_info).await?;
let PeerBlockInfo {
peer_id_base_58,
file_hash,
block_hashes,
..
} = peer_block_info;
let block_hash = if let Some(block_hash) = block_hashes.first() {
block_hash
} else {
let err_msg = format!(
"No block hash has been provided for the block to be sent by {}",
peer_id_base_58
);
error!(err_msg);
return Err(format_err!(err_msg));
};
// at this point we have the block deserialized, but we don't know if it's correct or not
let powers: Powers<F, G> = get_powers(powers_path).await?;
// check that the block is correct
if verify(&block, &powers)? {
let block_dir = get_block_dir(file_dir, file_hash.clone());
tokio::fs::create_dir_all(&block_dir).await?;
let block_path: PathBuf = [block_dir, PathBuf::from(block_hash.clone())]
.iter()
.collect();
debug!("Will write the received block to {:?}", block_path);
tokio::fs::write(block_path, ser_block).await?;
send_block_status(stream, ExchangeCode::BlockIsCorrect).await?;
} else {
send_block_status(stream, ExchangeCode::BlockIsIncorrect).await?;
}
stream.close().await?;
Ok((file_hash, block_hash.clone(), peer_id_base_58))
}
......@@ -30,7 +30,7 @@ macro_rules! impl_Convert {
}
// impl convert for all the types that are already Serialize and thus just return themselves
impl_Convert!(for u64, String, bool, &str, Vec<Multiaddr>, Vec<u8>, (String, String), PeerBlockInfo, BlockResponse, PathBuf);
impl_Convert!(for u64, String, bool, &str, Vec<Multiaddr>, Vec<u8>, (String, String), PeerBlockInfo, BlockResponse, PathBuf, usize);
impl ConvertSer for PeerId {
fn convert_ser(&self) -> impl Serialize {
......
......@@ -7,7 +7,9 @@ use help_func/exit_func.nu exit_on_error
# define variables
let test_file: path = "tests/assets/dragoon_32/dragoon_32x32.png"
let res_filename = "reconstructed_file.png"
let dragoonfly_root = "~/.share/dragoonfly"
print $"Removing ($dragoonfly_root) if it was there from a previous test\n"
try { rm -r "~/.share/dragoonfly" }
# create the nodes
......
for test in (ls tests/*.nu | where type == file) {
nu ($test | get name)
}
\ No newline at end of file
......@@ -8,6 +8,10 @@ use help_func/exit_func.nu exit_on_error
let output_dir: path = "/tmp/dragoon_test/received_blocks"
let test_file: path = "tests/assets/dragoon_32/dragoon_32x32.png"
let res_filename = "reconstructed_file.png"
let dragoonfly_root = "~/.share/dragoonfly"
print $"Removing ($dragoonfly_root) if it was there from a previous test\n"
try { rm -r "~/.share/dragoonfly" }
const connection_list = [
[1],
......
use ../cli/swarm.nu *
use ../cli/app.nu
use ../cli/network_builder.nu *
use std assert
use help_func/exit_func.nu exit_on_error
# define variables
let test_file: path = "tests/assets/dragoon_32/dragoon_32x32.png"
let res_filename = "reconstructed_file.png"
let dragoonfly_root = "~/.share/dragoonfly"
print $"Removing ($dragoonfly_root) if it was there from a previous test\n"
try { rm -r "~/.share/dragoonfly" }
# create the nodes
const connection_list = [
[1],
[0],
]
# create the network topology
let SWARM = build_network --no-shell $connection_list
try {
# Encode the file into blocks, put them to a directory named blocks next to the file
print "Node 0 encodes the file into blocks"
let encode_res = app encode-file --node $SWARM.0.ip_port $test_file
let block_hashes = $encode_res.1 | from json #! This is a string not a list, need to convert
let file_hash = $encode_res.0
print $"The file got cut into blocks, block hashes are"
print $block_hashes
print $"The hash of the file is: ($file_hash)"
print "\nGetting the peer id of the nodes"
let peer_id_0 = app node-info --node $SWARM.0.ip_port
let peer_id_1 = app node-info --node $SWARM.1.ip_port
let number_of_fails = 5
print "\nNode 0 sends the block 0 to node 1, 3 times at once"
let result_list = 0..$number_of_fails | par-each { |index|
print $"Send index ($index)..."
try {
let res = app send-block-to --node $SWARM.0.ip_port $peer_id_1 $file_hash ($block_hashes | get 0)
if not $res {
error make {msg: $"Failed sending block ($index): ($block_hashes | get $index)"}
}
return 0
} catch { |e|
assert equal $e.msg $"unexpected error from Dragoon: Got error from command `send-block-to`: The send block to PeerId\(\"($peer_id_1)\"\) for block ($block_hashes.0) is already being handled \(500\)"
return 1
}
}
print "Node 0 finished sending blocks to node 1\n"
print "Killing the swarm"
swarm kill --no-shell
print $"Checking we failed to send the block exactly ($number_of_fails) times as it was already being sent"
assert equal ($result_list | math sum) $number_of_fails
print "\nChecking the block 0 that was sent against the original"
let original_block_path = $"($dragoonfly_root)/($peer_id_0)/files/($file_hash)/blocks/($block_hashes | get 0)"
let sent_block_path = $"($dragoonfly_root)/($peer_id_1)/files/($file_hash)/blocks/($block_hashes | get 0)"
let difference = {diff ($original_block_path | path expand) ($sent_block_path | path expand)} | exit_on_error | get stdout
if $difference != "" {
print $"test failed, there was a difference between the blocks on index 0: ($block_hashes | get 0)"
error make {msg: "Exit to catch"}
}
print $"(ansi light_green_reverse) TEST SUCCESSFUL !(ansi reset)\n"
} catch { |e|
print "Killing the swarm"
swarm kill --no-shell
error make --unspanned {msg: $"Test failed: ($e.msg)"}
}
use ../cli/swarm.nu *
use ../cli/app.nu
use ../cli/network_builder.nu *
use std assert
use help_func/exit_func.nu exit_on_error
# define variables
let test_file: path = "tests/assets/dragoon_32/dragoon_32x32.png"
let res_filename = "reconstructed_file.png"
let dragoonfly_root = "~/.share/dragoonfly"
print $"Removing ($dragoonfly_root) if it was there from a previous test\n"
try { rm -r "~/.share/dragoonfly" }
# create the nodes
const connection_list = [
[1],
[0],
]
# create the network topology
let SWARM = build_network --no-shell $connection_list
try {
# Encode the file into blocks, put them to a directory named blocks next to the file
print "Node 0 encodes the file into blocks"
let encode_res = app encode-file --node $SWARM.0.ip_port $test_file
let block_hashes = $encode_res.1 | from json #! This is a string not a list, need to convert
let file_hash = $encode_res.0
print $"The file got cut into blocks, block hashes are"
print $block_hashes
print $"The hash of the file is: ($file_hash)"
print "\nGetting the peer id of the nodes"
let peer_id_0 = app node-info --node $SWARM.0.ip_port
let peer_id_1 = app node-info --node $SWARM.1.ip_port
print "\nGetting available storage size"
let original_storage_space = app get-available-storage --node $SWARM.1.ip_port
print "\nNode 0 sends the blocks to node 1"
0..(($block_hashes | length) - 1) | par-each { |index|
print $"Sending block ($index)..."
let res = app send-block-to --node $SWARM.0.ip_port $peer_id_1 $file_hash ($block_hashes | get $index)
if not $res {
error make {msg: $"Failed sending block ($index): ($block_hashes | get $index)"}
}
}
print "Node 0 finished sending blocks to node 1\n"
print "Checking that the reported available size makes sense with respect to the size of the blocks that were sent"
let new_storage_space = app get-available-storage --node $SWARM.1.ip_port
let size_of_all_sent_blocks = ls $"($dragoonfly_root)/($peer_id_0)/files/($file_hash)/blocks/" | get size | math sum | into int
assert equal ($original_storage_space - $new_storage_space) $size_of_all_sent_blocks
print "Killing the swarm"
swarm kill --no-shell
print "\nChecking all the blocks that were sent against the original"
for index in 0..(($block_hashes | length) - 1) {
let original_block_path = $"($dragoonfly_root)/($peer_id_0)/files/($file_hash)/blocks/($block_hashes | get $index)"
let sent_block_path = $"($dragoonfly_root)/($peer_id_1)/files/($file_hash)/blocks/($block_hashes | get $index)"
let difference = {diff ($original_block_path | path expand) ($sent_block_path | path expand)} | exit_on_error | get stdout
if $difference != "" {
print $"test failed, there was a difference between the blocks on index ($index): ($block_hashes | get $index)"
error make {msg: "Exit to catch"}
}
}
print $"(ansi light_green_reverse) TEST SUCCESSFUL !(ansi reset)\n"
} catch { |e|
print "Killing the swarm"
swarm kill --no-shell
error make --unspanned {msg: $"Test failed: ($e.msg)"}
}