Skip to content
Snippets Groups Projects

Write `get-file` and commands to get the list of blocks from another node

1 file
+ 35
35
Compare changes
  • Side-by-side
  • Inline
+ 35
35
@@ -48,7 +48,7 @@ use rs_merkle::{algorithms::Sha256, Hasher};
use ark_ec::CurveGroup;
use ark_ff::PrimeField;
use ark_poly::DenseUVPolynomial;
use ark_serialize::{CanonicalSerialize, CanonicalDeserialize, Compress, Validate};
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize, Compress, Validate};
use ark_std::ops::Div;
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -72,8 +72,7 @@ pub(crate) struct PeerBlockInfoResponse(PeerBlockInfo);
pub(crate) async fn create_swarm(
id_keys: Keypair,
) -> Result<Swarm<DragoonBehaviour>, Box<dyn Error>>
{
) -> Result<Swarm<DragoonBehaviour>, Box<dyn Error>> {
let peer_id = id_keys.public().to_peer_id();
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
@@ -119,16 +118,14 @@ pub(crate) async fn create_swarm(
}
#[derive(NetworkBehaviour)]
pub(crate) struct DragoonBehaviour
{
pub(crate) struct DragoonBehaviour {
request_block: request_response::cbor::Behaviour<BlockRequest, BlockResponse>,
request_info: request_response::cbor::Behaviour<PeerBlockInfoRequest, PeerBlockInfoResponse>,
identify: identify::Behaviour,
kademlia: kad::Behaviour<kad::store::MemoryStore>,
}
pub(crate) struct DragoonNetwork
{
pub(crate) struct DragoonNetwork {
swarm: Swarm<DragoonBehaviour>,
command_receiver: mpsc::UnboundedReceiver<DragoonCommand>,
listeners: HashMap<u64, ListenerId>,
@@ -139,8 +136,7 @@ pub(crate) struct DragoonNetwork
pending_request_block: HashMap<OutboundRequestId, Sender<BlockResponse>>,
}
impl DragoonNetwork
{
impl DragoonNetwork {
pub fn new(
swarm: Swarm<DragoonBehaviour>,
command_receiver: mpsc::UnboundedReceiver<DragoonCommand>,
@@ -175,7 +171,7 @@ impl DragoonNetwork
Ok(base_path)
}
pub async fn run<F,G,P>(mut self)
pub async fn run<F, G, P>(mut self)
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
@@ -259,11 +255,11 @@ impl DragoonNetwork
}
}
async fn handle_event<F,G>(&mut self, event: SwarmEvent<DragoonBehaviourEvent>)
async fn handle_event<F, G>(&mut self, event: SwarmEvent<DragoonBehaviourEvent>)
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
{
{
debug!("[event] {:?}", event);
match event {
SwarmEvent::Behaviour(DragoonBehaviourEvent::Kademlia(
@@ -315,7 +311,7 @@ impl DragoonNetwork
Message::Request {
request, channel, ..
} => {
if let Err(e) = self.message_request::<F,G>(request, channel).await {
if let Err(e) = self.message_request::<F, G>(request, channel).await {
error!("{}", e)
}
}
@@ -396,27 +392,31 @@ impl DragoonNetwork
.iter()
.collect()
}
fn read_block_from_disk<F,G>(block_hash: String, block_dir: PathBuf) -> Result<Vec<u8>>
fn read_block_from_disk<F, G>(block_hash: String, block_dir: PathBuf) -> Result<Vec<u8>>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
{
let block =
match fs::read_blocks::<F, G>(&[block_hash], &block_dir, Compress::Yes, Validate::Yes) {
Ok(vec) => vec[0].clone().1,
Err(e) => return Err(e), // ? would it be better to return the error
};
let block = match fs::read_blocks::<F, G>(
&[block_hash],
&block_dir,
Compress::Yes,
Validate::Yes,
) {
Ok(vec) => vec[0].clone().1,
Err(e) => return Err(e), // ? would it be better to return the error
};
let mut buf = vec![0; block.serialized_size(Compress::Yes)];
block.serialize_with_mode(&mut buf[..], Compress::Yes)?;
Ok(buf)
}
async fn message_request<F,G>(
async fn message_request<F, G>(
&mut self,
request: BlockRequest,
channel: ResponseChannel<BlockResponse>,
) -> Result<()>
) -> Result<()>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
@@ -431,7 +431,7 @@ impl DragoonNetwork
file_hash.clone(),
block_dir
);
let ser_block = Self::read_block_from_disk::<F,G>(block_hash.clone(), block_dir)?;
let ser_block = Self::read_block_from_disk::<F, G>(block_hash.clone(), block_dir)?;
debug!(
"Read block {0} for file {1}, got: {2:?}",
block_hash, file_hash, ser_block
@@ -474,7 +474,7 @@ impl DragoonNetwork
.map_err(|_| CouldNotSendInfoResponse(file_hash, channel_info).into())
}
async fn handle_command<F,G,P>(&mut self, cmd: DragoonCommand)
async fn handle_command<F, G, P>(&mut self, cmd: DragoonCommand)
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
@@ -552,7 +552,7 @@ impl DragoonNetwork
info!("Starting to get the file {}", file_hash);
if sender
.send(
self.get_file::<F,G,P>(file_hash.clone(), output_filename, powers_path)
self.get_file::<F, G, P>(file_hash.clone(), output_filename, powers_path)
.await
.map_err(|err| err.into()),
)
@@ -660,7 +660,7 @@ impl DragoonNetwork
} => {
if sender
.send(
Self::decode_blocks::<F,G>(
Self::decode_blocks::<F, G>(
PathBuf::from(block_dir),
&block_hashes,
output_filename,
@@ -684,7 +684,7 @@ impl DragoonNetwork
} => {
if sender
.send(
self.encode_file::<F,G,P>(
self.encode_file::<F, G, P>(
file_path,
replace_blocks,
encoding_method,
@@ -786,7 +786,7 @@ impl DragoonNetwork
/// - If it can reconstruct the file, it will close the requests for block info and blocks to all the peers it contacted, construct the file, write it to disk and send the path where the file was written to the user
/// - If it can't reconstruct the file yet, given the block combination it got from block info, it will try to find the combination of blocks that will allow for file reconstruction with a minimal block download (ie using the max number of already downloaded blocks it can)
/// - If even after all that it still can't find a combination of blocks that works, it will exit with an error
async fn get_file<F,G,P>(
async fn get_file<F, G, P>(
&mut self,
file_hash: String,
output_filename: String,
@@ -887,7 +887,7 @@ impl DragoonNetwork
}
}
let _ = Self::decode_blocks::<F,G>(
let _ = Self::decode_blocks::<F, G>(
block_dir.clone(),
&block_hashes_on_disk,
output_filename.clone(),
@@ -1005,15 +1005,15 @@ impl DragoonNetwork
Ok(block_names)
}
async fn decode_blocks<F,G>(
async fn decode_blocks<F, G>(
block_dir: PathBuf,
block_hashes: &[String],
output_filename: String,
) -> Result<()>
) -> Result<()>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
{
{
let blocks =
fs::read_blocks::<F, G>(block_hashes, &block_dir, Compress::Yes, Validate::Yes)?;
let shards: Vec<Shard<F>> = blocks.into_iter().map(|b| b.1.shard).collect();
@@ -1033,7 +1033,7 @@ impl DragoonNetwork
Ok(())
}
async fn encode_file<F,G,P>(
async fn encode_file<F, G, P>(
&mut self,
file_path: String,
replace_blocks: bool,
@@ -1091,11 +1091,11 @@ impl DragoonNetwork
Ok((file_hash, formatted_output))
}
async fn get_powers<F,G>(powers_path: String) -> Result<Powers<F, G>>
async fn get_powers<F, G>(powers_path: String) -> Result<Powers<F, G>>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
{
{
info!("Getting the powers from {:?}", powers_path);
let serialized = tokio::fs::read(powers_path).await?;
Ok(Powers::<F, G>::deserialize_with_mode(
Loading