Skip to content
Snippets Groups Projects

Write `get-file` and commands to get the list of blocks from another node

1 file
+ 11
2
Compare changes
  • Side-by-side
  • Inline
+ 11
2
@@ -798,6 +798,7 @@ where
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
info!("Get file: getting providers of file {}", file_hash);
let (sender, receiver) = oneshot::channel();
self.get_providers(file_hash.clone(), sender).await;
let provider_list = match receiver.await {
@@ -826,7 +827,7 @@ where
let (info_sender, mut info_receiver) = mpsc::unbounded_channel();
let (block_sender, mut block_receiver) = mpsc::unbounded_channel();
//create the list locally since the global storage uses oneshot channel for sender, not mpsc
debug!("Got provider list for file {} : {:?} | Now requesting the list of blocks from those peers", file_hash, provider_list);
for peer_id in provider_list {
let request_id = self.swarm.behaviour_mut().request_info.send_request(
&peer_id,
@@ -843,6 +844,7 @@ where
let powers = Self::get_powers(powers_path).await?;
let mut number_of_blocks_written: u32 = 0;
let block_dir = self.get_block_dir(file_hash.clone());
debug!("Finished requesting block list for file {}", file_hash);
'download_first_k_blocks: loop {
tokio::select! {
Some(response) = info_receiver.recv() => {
@@ -852,10 +854,12 @@ where
anyhow::anyhow!("Could not retrieve peer block block info: {}", e)
})?;
let PeerBlockInfo { peer_id_base_58, file_hash, block_hashes } = response;
debug!("Got block list from {} for file {} : {:?}", peer_id_base_58, file_hash, block_hashes);
let blocks_to_request: Vec<String> = block_hashes
.into_iter()
.filter(|x| !already_request_block.contains(x)) // do not request the block if it's already requested
.collect();
debug!("Requesting the following blocks from {} for file {} : {:?}", peer_id_base_58, file_hash, blocks_to_request);
let bytes = bs58::decode(peer_id_base_58).into_vec().unwrap();
let peer_id = PeerId::from_bytes(&bytes).unwrap();
for block_hash in blocks_to_request {
@@ -874,12 +878,17 @@ where
//TODO change this unwrap
let block_response = block_response.unwrap();
let block: Block<F,G> = Block::deserialize_with_mode(&block_response.block_data[..], Compress::Yes, Validate::Yes)?;
debug!("Got a block for the file {} : {} ", file_hash, block_response.block_hash);
let number_of_blocks_to_reconstruct_file = block.shard.k;
debug!("Number of blocks to reconstruct file {} : {}", file_hash, number_of_blocks_to_reconstruct_file);
if verify::<F,G,P>(&block, &powers)? {
debug!("Block {} for file {} was verified successfully; Now dumping to disk", block_response.block_hash, file_hash);
let _ = fs::dump(&block, &block_dir, None, Compress::Yes)?;
number_of_blocks_written += 1;
block_hashes_on_disk.push(block_response.block_hash);
if number_of_blocks_written >= number_of_blocks_to_reconstruct_file {
debug!("Received exactly {} blocks, pausing block download and trying to reconstruct the file {}", number_of_blocks_to_reconstruct_file, file_hash);
//TODO properly stop downloads ? drop/close receiver ?
break 'download_first_k_blocks;
}
}
@@ -892,7 +901,7 @@ where
}
}
Self::decode_blocks(block_dir.clone(), &block_hashes_on_disk, output_filename.clone()).await;
let _ = Self::decode_blocks(block_dir.clone(), &block_hashes_on_disk, output_filename.clone()).await;
//TODO if it fails, keep requesting block info, try to check which matrix is invertible taking k-1 blocks already on disk and one more that isn't
//TODO if it fails, do the same with k-2, etc...
Loading