Skip to content
Snippets Groups Projects

Refactor commands in handle_command for less redundancy in error handling

Merged DISSOUBRAY Nathan requested to merge handle_command_error_refactor into master
1 file
+ 152
153
Compare changes
  • Side-by-side
  • Inline
+ 152
153
@@ -372,51 +372,11 @@ where
debug!("[cmd] {:?}", cmd);
match cmd {
DragoonCommand::Listen { multiaddr, sender } => {
if let Ok(addr) = multiaddr.parse() {
match self.swarm.listen_on(addr) {
Ok(listener_id) => {
info!("Listening on {}", multiaddr);
let id = regex::Regex::new(r"ListenerId\((\d+)\)")
.unwrap()
.captures(&format!("{:?}", listener_id))
.unwrap()
.get(1)
.unwrap()
.as_str()
.parse::<u64>()
.unwrap();
self.listeners.insert(id, listener_id);
debug!("sending id {}", id);
if sender.send(Ok(id)).is_err() {
error!("Could not send listener ID");
}
}
Err(te) => {
let err_msg = match te {
TransportError::Other(e) => e.to_string(),
TransportError::MultiaddrNotSupported(addr) => {
format!("multiaddr {} not supported", addr)
}
};
error!("{}", err_msg);
debug!("sending error {}", err_msg);
if sender.send(Err(Box::new(BadListener(err_msg)))).is_err() {
error!("Could not send result");
}
}
}
} else {
error!("Could not parse addr {}", multiaddr);
let err = BadListener(format!("Could not parse {}", multiaddr));
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
if sender
.send(self.listen(multiaddr).await.map_err(|err| err.into()))
.is_err()
{
error!("Could not send the result of the listen operation")
}
}
DragoonCommand::GetListeners { sender } => {
@@ -447,21 +407,15 @@ where
listener_id,
sender,
} => {
if let Some(listener) = self.listeners.get(&listener_id) {
let res = self.swarm.remove_listener(*listener);
debug!("sending result {}", res);
if sender.send(Ok(res)).is_err() {
error!("Could not send remove listener");
}
} else {
error!("Listener {} not found", listener_id);
let err = BadListener(format!("Listener {} not found", listener_id));
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
if sender
.send(
self.remove_listener(listener_id)
.await
.map_err(|err| err.into()),
)
.is_err()
{
error!("Could not send the result of the remove_listener operation")
}
}
DragoonCommand::GetConnectedPeers { sender } => {
@@ -478,61 +432,19 @@ where
}
}
DragoonCommand::Dial { multiaddr, sender } => {
if let Ok(addr) = multiaddr.parse::<Multiaddr>() {
match self.swarm.dial(addr) {
Ok(()) => {
debug!("sending empty response");
if sender.send(Ok(())).is_err() {
error!("Could not send result");
}
}
Err(de) => {
error!("error: {}", de);
let err = DialError(de.to_string());
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
}
}
} else {
error!("Could not parse addr {}", multiaddr);
let err = BadListener(format!("Could not parse {}", multiaddr));
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
if sender
.send(self.dial(multiaddr).await.map_err(|err| err.into()))
.is_err()
{
error!("Could not send the result of the dial operation")
}
}
DragoonCommand::AddPeer { multiaddr, sender } => {
if let Ok(addr) = multiaddr.parse::<Multiaddr>() {
if let Some(Protocol::P2p(hash)) = addr.iter().last() {
self.swarm.behaviour_mut().kademlia.add_address(&hash, addr);
debug!("sending empty response");
if sender.send(Ok(())).is_err() {
error!("Could not send result");
}
} else {
error!("could no isolate P2P component in {}", addr);
let err =
BadListener(format!("could no isolate P2P component in {}", addr));
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
}
} else {
error!("Cannot parse addr {}", multiaddr);
let err = BadListener(format!("Could not parse {}", multiaddr));
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
if sender
.send(self.add_peer(multiaddr).await.map_err(|err| err.into()))
.is_err()
{
error!("Could not send the result of the add_peer operation")
}
}
DragoonCommand::StartProvide { key, sender } => {
@@ -562,21 +474,11 @@ where
self.pending_get_providers.insert(query_id, sender);
}
DragoonCommand::Bootstrap { sender } => {
match self.swarm.behaviour_mut().kademlia.bootstrap() {
Ok(_) => {
if sender.send(Ok(())).is_err() {
error!("Could not send result");
}
}
Err(nkp) => {
error!("error: {}", nkp);
let err = BootstrapError(nkp.to_string());
debug!("sending error {}", err);
if sender.send(Err(Box::new(err))).is_err() {
error!("Could not send result");
}
}
if sender
.send(self.bootstrap().await.map_err(|err| err.into()))
.is_err()
{
error!("Could not send the result of the bootstrap operation")
}
}
DragoonCommand::GetBlockFrom {
@@ -607,32 +509,16 @@ where
block_path,
peerid,
sender,
} =>
// TODO go search the block on the disk
{
match PeerId::from_str(peerid.as_str()) {
Ok(peer) => {
if self
.swarm
.behaviour_mut()
.dragoon
.send_data_to_peer(block_hash, block_path, peer)
{
if sender.send(Ok(())).is_err() {
error!("could not send result");
}
} else {
let err = PeerNotFound;
if sender.send(Err(Box::new(err))).is_err() {
error!("Cannot send result");
}
}
}
Err(err) => {
if sender.send(Err(Box::new(err))).is_err() {
error!("Cannot send result");
}
}
} => {
if sender
.send(
self.dragoon_send(block_hash, block_path, peerid)
.await
.map_err(|err| err.into()),
)
.is_err()
{
error!("Could not send the result of the dragoon_send operation")
}
}
DragoonCommand::DecodeBlocks {
@@ -710,6 +596,119 @@ where
// Ok(id)
// }
async fn listen(&mut self, multiaddr: String) -> Result<u64> {
if let Ok(addr) = multiaddr.parse() {
match self.swarm.listen_on(addr) {
Ok(listener_id) => {
info!("Listening on {}", multiaddr);
let id = regex::Regex::new(r"ListenerId\((\d+)\)")
.unwrap()
.captures(&format!("{:?}", listener_id))
.unwrap()
.get(1)
.unwrap()
.as_str()
.parse::<u64>()
.unwrap();
self.listeners.insert(id, listener_id);
Ok(id)
}
Err(te) => {
let err_msg = match te {
TransportError::Other(e) => e.to_string(),
TransportError::MultiaddrNotSupported(addr) => {
format!("multiaddr {} not supported", addr)
}
};
error!(err_msg);
Err(BadListener(err_msg).into())
}
}
} else {
let err_msg = format!("Could not parse {}", multiaddr);
error!(err_msg);
Err(BadListener(err_msg).into())
}
}
async fn remove_listener(&mut self, listener_id: u64) -> Result<bool> {
if let Some(listener) = self.listeners.get(&listener_id) {
Ok(self.swarm.remove_listener(*listener))
} else {
let err_msg = format!("Listener {} not found", listener_id);
error!(err_msg);
Err(BadListener(err_msg).into())
}
}
async fn dial(&mut self, multiaddr: String) -> Result<()> {
if let Ok(addr) = multiaddr.parse::<Multiaddr>() {
match self.swarm.dial(addr) {
Ok(()) => Ok(()),
Err(de) => {
let err_msg = format!("Could not dial {0}: {1}", multiaddr, de);
error!(err_msg);
Err(DialError(err_msg).into())
}
}
} else {
let err_msg = format!("Could not parse {}", multiaddr);
error!(err_msg);
Err(BadListener(err_msg).into())
}
}
async fn add_peer(&mut self, multiaddr: String) -> Result<()> {
if let Ok(addr) = multiaddr.parse::<Multiaddr>() {
if let Some(Protocol::P2p(hash)) = addr.iter().last() {
self.swarm.behaviour_mut().kademlia.add_address(&hash, addr);
Ok(())
} else {
let err_msg = format!("could no isolate P2P component in {}", addr);
error!(err_msg);
Err(BadListener(err_msg).into())
}
} else {
let err_msg = format!("Could not parse {}", multiaddr);
error!(err_msg);
Err(BadListener(err_msg).into())
}
}
async fn bootstrap(&mut self) -> Result<()> {
match self.swarm.behaviour_mut().kademlia.bootstrap() {
Ok(_) => Ok(()),
Err(nkp) => {
error!("Bootstrap: no known peers");
Err(BootstrapError(nkp.to_string()).into())
}
}
}
async fn dragoon_send(
&mut self,
block_hash: String,
block_path: String,
peerid: String,
) -> Result<()> {
// TODO go search the block on the disk
let peer = PeerId::from_str(&peerid)?;
if self
.swarm
.behaviour_mut()
.dragoon
.send_data_to_peer(block_hash, block_path, peer)
{
Ok(())
} else {
error!("Dragoon send: peer {} not found", peer);
Err(PeerNotFound.into())
}
}
async fn decode_blocks(
block_dir: String,
block_hashes: Vec<String>,
Loading