Skip to content
Snippets Groups Projects

Write `get-file` and commands to get the list of blocks from another node

2 files
+ 83
1
Compare changes
  • Side-by-side
  • Inline

Files

+ 82
1
@@ -776,7 +776,88 @@ where
/// - If it can reconstruct the file, it will close the requests for block info and blocks to all the peers it contacted, construct the file, write it to disk and send the path where the file was written to the user
/// - If it can't reconstruct the file yet, given the block combination it got from block info, it will try to find the combination of blocks that will allow for file reconstruction with a minimal block download (ie using the max number of already downloaded blocks it can)
/// - If even after all that it still can't find a combination of blocks that works, it will exit with an error
async fn get_file(&mut self, file_hash: String) -> Result<String> {
async fn get_file<P>(&mut self, file_hash: String) -> Result<String>
where
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
let (sender, receiver) = oneshot::channel();
self.get_providers(file_hash.clone(), sender).await;
let provider_list = match receiver.await {
Err(e) => {
let err_string = format!(
"The list of providers request for get_file {0} was canceled: {1:?}",
file_hash.clone(),
e
);
error!(err_string);
return Err(anyhow::anyhow!(err_string));
}
Ok(res) => match res {
Err(e) => {
let err_string = format!(
"Could not get the list of providers for file {0}: {1:?}",
file_hash.clone(),
e
);
error!(err_string);
return Err(anyhow::anyhow!(err_string));
}
Ok(peer_ids) => peer_ids,
},
};
let (info_sender, mut info_receiver) = mpsc::unbounded_channel();
let (block_sender, mut block_receiver) = mpsc::unbounded_channel();
//create the list locally since the global storage uses oneshot channel for sender, not mpsc
for peer_id in provider_list {
let request_id = self.swarm.behaviour_mut().request_info.send_request(
&peer_id,
PeerBlockInfoRequest {
file_hash: file_hash.clone(),
},
);
self.pending_request_block_info.insert(request_id, Sender::SenderMPSC(info_sender.clone()));
}
let mut already_request_block = vec![];
loop {
tokio::select! {
Some(response) = info_receiver.recv() => {
//TODO handle errors to keep going even if some peer fail
let response = response.map_err(|e| -> anyhow::Error {
anyhow::anyhow!("Could not retrieve peer block block info: {}", e)
})?;
let PeerBlockInfo { peer_id_base_58, file_hash, block_hashes } = response;
let blocks_to_request: Vec<String> = block_hashes
.into_iter()
.filter(|x| !already_request_block.contains(x)) // do not request the block if it's already requested
.collect();
let bytes = bs58::decode(peer_id_base_58).into_vec().unwrap();
let peer_id = PeerId::from_bytes(&bytes).unwrap();
for block_hash in blocks_to_request {
let request_id = self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
BlockRequest {
file_hash: file_hash.clone(),
block_hash: block_hash.clone(),
},
);
self.pending_request_block.insert(request_id, Sender::SenderMPSC(block_sender.clone()));
already_request_block.push(block_hash);
}
},
Some(block_response) = block_receiver.recv() => {
//TODO check the blocks, save them, potentially make the file
todo!()
}
}
//TODO add a break condition or change the loop to a while with a boolean check
}
//TODO await the response and start requesting the blocks
//TODO when you can reconstruct the file
//TODO return the path where the file was stored
todo!()
}
Loading