package node

import (
	"bytes"
	"encoding/gob"
	"log"
	"net"
	"os"
	"time"

	"harmony-benchmark/blockchain"
	"harmony-benchmark/common"
	"harmony-benchmark/consensus"
	"harmony-benchmark/p2p"
)

// Node represents a program (machine) participating in the network.
type Node struct {
	// consensus is consulted by NodeHandler to decide between the leader
	// and validator code paths and to process consensus messages.
	consensus *consensus.Consensus
	// BlockChannel receives every block assembled by WaitForConsensusReady.
	BlockChannel chan blockchain.Block
	// pendingTransactions holds transactions received via SEND messages
	// that have not yet been packaged into a block.
	//
	// NOTE(review): this slice is appended to from per-connection handler
	// goroutines and consumed by WaitForConsensusReady without any
	// synchronization visible in this file — confirm whether callers
	// serialize access or whether a lock is needed.
	pendingTransactions []blockchain.Transaction
}

// StartServer listens on the given TCP port and dispatches every accepted
// connection to NodeHandler. It does not return under normal operation.
func (node *Node) StartServer(port string) {
	listenOnPort(port, node.NodeHandler)
}

// listenOnPort opens a tcp4 listener on port and runs handler in a new
// goroutine for each accepted connection. Failing to listen or to accept
// terminates the process via log.Fatal*.
func listenOnPort(port string, handler func(net.Conn)) {
	listen, err := net.Listen("tcp4", ":"+port)
	if err != nil {
		// log.Fatalf exits the process, so nothing below runs.
		log.Fatalf("Socket listen port %s failed,%s", port, err)
	}
	// Only register the Close once we know the listener is non-nil.
	defer listen.Close()

	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Printf("Error listening on port: %s. Exiting.", port)
			log.Fatalln(err)
		}
		go handler(conn)
	}
}

// NodeHandler processes a single incoming connection for both leader and
// validator nodes: it reads one p2p message, splits it into category, type
// and payload, and routes it to the consensus layer or to the node-level
// transaction/control handling. The connection is closed on return.
func (node *Node) NodeHandler(conn net.Conn) {
	defer conn.Close()

	cons := node.consensus

	// logFailure reports a parse failure tagged with this node's role. The
	// role is resolved lazily so cons is only dereferenced when needed.
	logFailure := func(what string, err error) {
		role := "[Slave]"
		if cons.IsLeader {
			role = "[Leader]"
		}
		log.Printf("%s Read %s failed:%s", role, what, err)
	}

	// Read p2p message payload.
	content, err := p2p.ReadMessageContent(conn)
	if err != nil {
		logFailure("p2p data", err)
		return
	}

	msgCategory, err := common.GetMessageCategory(content)
	if err != nil {
		logFailure("node type", err)
		return
	}

	msgType, err := common.GetMessageType(content)
	if err != nil {
		logFailure("action type", err)
		return
	}

	msgPayload, err := common.GetMessagePayload(content)
	if err != nil {
		logFailure("message payload", err)
		return
	}

	switch msgCategory {
	case common.COMMITTEE:
		switch common.CommitteeMessageType(msgType) {
		case common.CONSENSUS:
			if cons.IsLeader {
				cons.ProcessMessageLeader(msgPayload)
			} else {
				cons.ProcessMessageValidator(msgPayload)
			}
		}
	case common.NODE:
		switch common.NodeMessageType(msgType) {
		case common.TRANSACTION:
			node.transactionMessageHandler(msgPayload)
		case common.CONTROL:
			// The first payload byte carries the control type; guard
			// against an empty payload instead of panicking on index 0.
			if len(msgPayload) == 0 {
				log.Println("Ignoring control message with empty payload")
				return
			}
			if ControlMessageType(msgPayload[0]) == STOP {
				log.Println("Stopping Node")
				os.Exit(0)
			}
		}
	}
}

// transactionMessageHandler handles a NODE/TRANSACTION payload. The first
// byte selects the transaction message type; the remainder is its body:
//
//   - SEND: a gob-encoded transaction list that is appended to the pending
//     transactions. A list that fails to decode is logged and dropped.
//   - REQUEST: a concatenation of 32-byte transaction IDs; matching pending
//     transactions are collected (returning them is still a TODO).
//
// An empty payload is logged and ignored.
func (node *Node) transactionMessageHandler(msgPayload []byte) {
	if len(msgPayload) == 0 {
		log.Println("Ignoring transaction message with empty payload")
		return
	}

	body := msgPayload[1:] // skip the message type byte

	switch TransactionMessageType(msgPayload[0]) {
	case SEND:
		var txList []blockchain.Transaction
		if err := gob.NewDecoder(bytes.NewReader(body)).Decode(&txList); err != nil {
			log.Println("Failed deserializing transaction list")
			// Do not queue a partially decoded list.
			return
		}
		node.pendingTransactions = append(node.pendingTransactions, txList...)
	case REQUEST:
		const idSize = 32 // 32 byte hash Id

		// Walk the body in whole idSize chunks; a trailing fragment shorter
		// than one ID is ignored rather than mixed with stale bytes.
		txIds := make(map[[idSize]byte]bool, len(body)/idSize)
		for ; len(body) >= idSize; body = body[idSize:] {
			txIds[getFixedByteTxId(body[:idSize])] = true
		}

		var txToReturn []blockchain.Transaction
		for _, tx := range node.pendingTransactions {
			if txIds[getFixedByteTxId(tx.ID)] {
				txToReturn = append(txToReturn, tx)
			}
		}
		// TODO: return the transaction list to requester
		_ = txToReturn
	}
}

// getFixedByteTxId copies the txId byte slice into a 32 byte array so that a
// map can key on it. Input shorter than 32 bytes is zero-padded; bytes beyond
// the first 32 are ignored.
func getFixedByteTxId(txId []byte) [32]byte {
	var id [32]byte
	copy(id[:], txId)
	return id
}

// WaitForConsensusReady loops forever: each time a value arrives on
// readySignal it waits until at least 10 transactions are pending, packages
// the first 10 into a new block, and sends that block on node.BlockChannel.
func (node *Node) WaitForConsensusReady(readySignal chan int) {
	const txPerBlock = 10

	for {
		// Keep waiting for consensus ready.
		<-readySignal

		// Periodically check whether we have enough transactions to
		// package into a block.
		for len(node.pendingTransactions) < txPerBlock {
			time.Sleep(1 * time.Second)
		}

		log.Println("Creating new block")
		// TODO (Minh): package actual transactions
		// For now, just take out the first txPerBlock transactions.
		batch := node.pendingTransactions[:txPerBlock]
		txList := make([]*blockchain.Transaction, 0, txPerBlock)
		for i := range batch {
			// Copy each element so every pointer refers to a distinct
			// transaction; taking the address of a shared range variable
			// would make all entries alias the last one (pre-Go 1.22).
			tx := batch[i]
			txList = append(txList, &tx)
		}
		node.pendingTransactions = node.pendingTransactions[txPerBlock:]

		newBlock := blockchain.NewBlock(txList, []byte{})
		node.BlockChannel <- *newBlock
	}
}

// NewNode creates a Node bound to the given consensus object, with an
// unbuffered BlockChannel and no pending transactions.
func NewNode(consensus *consensus.Consensus) Node {
	return Node{
		consensus:    consensus,
		BlockChannel: make(chan blockchain.Block),
	}
}