use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fs::File;
use std::io::Write;
use std::iter;

use blsttc::{SecretKeySet, SignatureShare};
use log::info;
use rand::prelude::{IteratorRandom, StdRng};
use rand::Rng;
use sn_membership::{
    consensus::VoteResponse, Ballot, Decision, Error, Generation, Membership, NodeId, Reconfig,
    Result, SignedVote, Vote,
};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Packet {
    pub source: NodeId,
    pub dest: NodeId,
    pub vote: SignedVote<Reconfig<u8>>,
}

#[derive(Default, Debug)]
pub struct Net {
    pub procs: Vec<Membership<u8>>,
    pub reconfigs_by_gen: BTreeMap<Generation, BTreeSet<Reconfig<u8>>>,
    pub members_at_gen: BTreeMap<Generation, BTreeSet<u8>>,
    pub packets: BTreeMap<NodeId, VecDeque<Packet>>,
    pub delivered_packets: Vec<Packet>,
    pub decisions: BTreeMap<Generation, Decision<Reconfig<u8>>>,
}

impl Net {
    pub fn with_procs(threshold: u8, n: u8, mut rng: &mut StdRng) -> Self {
        let elders_sk = SecretKeySet::random(threshold as usize, &mut rng);
        let procs = Vec::from_iter((1u8..(n + 1)).into_iter().map(|i| {
            Membership::from(
                (i, elders_sk.secret_key_share(i as u64)),
                elders_sk.public_keys(),
                n as usize,
            )
        }));
        Self {
            procs,
            ..Default::default()
        }
    }

    pub fn proc(&self, id: NodeId) -> Option<&Membership<u8>> {
        self.procs.iter().find(|p| p.id() == id)
    }

    pub fn proc_mut(&mut self, id: NodeId) -> Option<&mut Membership<u8>> {
        self.procs.iter_mut().find(|p| p.id() == id)
    }

    /// Pick a random public key from the set of procs
    pub fn pick_id(&self, rng: &mut StdRng) -> NodeId {
        self.procs.iter().choose(rng).unwrap().id()
    }

    /// Generate a randomized ballot
    pub fn gen_ballot(
        &self,
        recursion: u8,
        faulty: &BTreeSet<NodeId>,
        rng: &mut StdRng,
    ) -> Ballot<Reconfig<u8>> {
        match rng.gen() || recursion == 0 {
            true => Ballot::Propose(match rng.gen() {
                true => Reconfig::Join(rng.gen()),
                false => Reconfig::Leave(rng.gen()),
            }),
            false => {
                let n_votes = rng.gen::<usize>() % self.procs.len().pow(2);
                let votes = BTreeSet::from_iter(
                    iter::repeat_with(|| self.gen_faulty_vote(recursion - 1, faulty, rng))
                        .take(n_votes),
                );
                match rng.gen() {
                    true => Ballot::Merge(votes),
                    false => {
                        let n_proposals = rng.gen::<usize>() % (self.procs.len() + 1);
                        let proposals: BTreeMap<Reconfig<u8>, (NodeId, SignatureShare)> =
                            std::iter::repeat_with(|| {
                                let prop = match rng.gen() {
                                    true => Reconfig::Join(rng.gen()),
                                    false => Reconfig::Leave(rng.gen()),
                                };
                                let sig = self
                                    .procs
                                    .iter()
                                    .choose(rng)
                                    .unwrap()
                                    .consensus
                                    .sign(&prop)?;
                                Ok((prop, (self.pick_id(rng), sig)))
                            })
                            .take(n_proposals)
                            .collect::<Result<_>>()
                            .unwrap();
                        Ballot::SuperMajority { votes, proposals }
                    }
                }
            }
        }
    }

    /// Generate a random faulty vote
    pub fn gen_faulty_vote(
        &self,
        recursion: u8,
        faulty_nodes: &BTreeSet<NodeId>,
        rng: &mut StdRng,
    ) -> SignedVote<Reconfig<u8>> {
        let faulty_node = faulty_nodes
            .iter()
            .choose(rng)
            .and_then(|pk| self.proc(*pk))
            .unwrap();

        let vote = Vote {
            gen: rng.gen::<u64>() % 7,
            ballot: self.gen_ballot(recursion, faulty_nodes, rng),
            faults: Default::default(),
        };

        SignedVote {
            voter: self.pick_id(rng),
            ..faulty_node.sign_vote(vote).unwrap()
        }
    }

    /// Generate a faulty random packet
    pub fn gen_faulty_packet(
        &self,
        recursion: u8,
        faulty: &BTreeSet<NodeId>,
        rng: &mut StdRng,
    ) -> Packet {
        Packet {
            source: *faulty.iter().choose(rng).unwrap(),
            dest: self.pick_id(rng),
            vote: self.gen_faulty_vote(recursion, faulty, rng),
        }
    }

    pub fn deliver_packet_from_source(&mut self, source: NodeId) -> Result<()> {
        let packet = match self.packets.get_mut(&source).map(|ps| ps.pop_front()) {
            Some(Some(p)) => p,
            _ => return Ok(()), // nothing to do
        };
        self.purge_empty_queues();

        // println!("delivering {:?}->{:?} {:?}", packet.source, dest, packet);

        self.delivered_packets.push(packet.clone());

        let source_elders = self.proc(source).unwrap().consensus.elders.clone();
        let dest_proc = match self.procs.iter_mut().find(|p| p.id() == packet.dest) {
            Some(proc) => proc,
            None => {
                return Ok(());
            }
        };

        info!("{} handling: {:?}", packet.dest, packet.vote);
        let packet_gen = packet.vote.vote.gen;
        let resp = dest_proc.handle_signed_vote(packet.vote);
        info!("resp from {}: {:?}", packet.dest, resp);
        match resp {
            Ok(VoteResponse::Broadcast(vote)) => {
                self.broadcast(packet.dest, vote);
            }
            Ok(VoteResponse::WaitingForMoreVotes) => {}
            Err(Error::NotElder) => {
                assert_ne!(dest_proc.consensus.elders, source_elders);
            }
            Err(Error::BadGeneration { requested_gen, gen }) => {
                assert!(requested_gen == 0 || requested_gen > gen + 1);
                assert_eq!(dest_proc.gen, gen);
            }
            Err(err) => return Err(err),
        }

        match self.proc(packet.dest) {
            Some(proc) => {
                let network_decision = self.decisions.get(&packet_gen);

                let proc_decision = proc
                    .consensus_at_gen(packet_gen)
                    .ok()
                    .and_then(|c| c.decision.clone());

                match (network_decision, proc_decision) {
                    (Some(net_d), Some(proc_d)) => {
                        assert_eq!(net_d.proposals, proc_d.proposals);
                    }
                    (None, Some(proc_d)) => {
                        self.decisions.insert(packet_gen, proc_d);
                    }
                    (None | Some(_), None) => (),
                }

                Ok(())
            }
            _ => Ok(()),
        }
    }

    pub fn enqueue_packets(&mut self, packets: impl IntoIterator<Item = Packet>) {
        for packet in packets {
            self.packets
                .entry(packet.source)
                .or_default()
                .push_back(packet);
        }
    }

    pub fn broadcast_packets(
        &self,
        source: NodeId,
        vote: &SignedVote<Reconfig<u8>>,
    ) -> Vec<Packet> {
        Vec::from_iter(self.procs.iter().map(Membership::id).map(|dest| Packet {
            source,
            dest,
            vote: vote.clone(),
        }))
    }

    pub fn broadcast(&mut self, source: NodeId, vote: SignedVote<Reconfig<u8>>) {
        let packets = self.broadcast_packets(source, &vote);
        self.enqueue_packets(packets);
    }

    pub fn drain_queued_packets(&mut self) -> Result<()> {
        while let Some(source) = self.packets.keys().next().cloned() {
            self.deliver_packet_from_source(source)?;
            self.purge_empty_queues();
        }
        Ok(())
    }

    pub fn purge_empty_queues(&mut self) {
        self.packets = core::mem::take(&mut self.packets)
            .into_iter()
            .filter(|(_, queue)| !queue.is_empty())
            .collect();
    }

    pub fn enqueue_anti_entropy(&mut self, i: usize, j: usize) {
        let dest_generation = self.procs[i].gen;
        let dest = self.procs[i].id();
        let source = self.procs[j].id();

        self.enqueue_packets(
            self.procs[j]
                .anti_entropy(dest_generation)
                .unwrap()
                .into_iter()
                .map(|vote| Packet { source, dest, vote }),
        );
    }

    pub fn generate_msc(&self, name: &str) -> Result<()> {
        // See: http://www.mcternan.me.uk/mscgen/
        let mut msc = String::from(
            "
msc {\n
  hscale = \"2\";\n
",
        );
        let procs = self
            .procs
            .iter()
            .map(|p| p.id())
            .collect::<BTreeSet<_>>() // sort by actor id
            .into_iter()
            .map(|id| format!("{}", id))
            .collect::<Vec<_>>()
            .join(",");
        msc.push_str(&procs);
        msc.push_str(";\n");
        for packet in self.delivered_packets.iter() {
            msc.push_str(&format!(
                "{:?} -> {:?} [ label=\"{:?}\"];\n",
                packet.source, packet.dest, packet.vote
            ));
        }
        msc.push_str("}\n");

        let mut msc_file = File::create(name)?;
        msc_file.write_all(msc.as_bytes())?;
        Ok(())
    }
}