1、背景
分布式共识算法是指多个参与者就某个值达成一致,常用于分布式系统中保证多节点间的数据一致性。主流的分布式共识算法包括:Paxos、Raft、ZAB等。其中Paxos是最基础的算法,本文接下来将介绍该算法的流程,并通过简单的示例说明其使用场景,完整示例代码见:https://github.com/jemuelmiao/paxos。
2、算法流程
2.1 概念
提议
- proposal:提议
- proposal id:提议编号
- proposal value:提议值
角色
- proposer:提议者
- acceptor:决策者
- learner:学习者
2.2 时序图
Paxos流程分为三个阶段:prepare、accept、learn。Prepare阶段由proposer向所有acceptor提交提议编号,acceptor根据收到的提议编号作出承诺或拒绝。Accept阶段由proposer向所有acceptor提交提议编号和提议值,acceptor根据收到的提议作出接收或拒绝。Learn阶段由proposer通知learner决策结果。时序图如下:
2.3 执行流程
在proposer prepare流程,proposer生成提议编号,该编号需要唯一、递增、可比较,一般采用时间戳和服务编号生成。当收到大多数acceptor的承诺时,如果任一承诺中带有已接收的提议值,则选用其中已接收提议编号最大的那个提议值,否则使用当前proposer自己的提议值;当未收到大多数acceptor的承诺时,proposer重新生成提议编号并再次发起prepare。流程如下:
在acceptor promise流程,acceptor根据自身存储的提议编号和请求的提议编号进行对比,当请求的提议编号大于存储的提议编号时,将存储的提议编号更新为请求的提议编号,并作出两个承诺:1、不再接收proposal id小于等于当前请求的prepare请求;2、不再接收proposal id小于当前请求的propose请求。另外,如果acceptor之前已经接收过提议,会将已接收的提议编号和提议值一同返回给proposer。流程如下:
在proposer propose流程,proposer会使用prepare阶段确定的proposal id、proposal value请求所有acceptor。当收到大多数acceptor回复且没有拒绝时,该提议通过,否则proposer重新进入prepare阶段。流程如下:
在acceptor accept流程,acceptor根据自身存储的提议编号和请求的提议编号进行对比,当请求的提议编号大于等于存储的提议编号时,将已接收提议编号、提议值更新为请求的提议编号、提议值。流程如下:
2.4 示例
2.4.1 proposer示例
package paxos import ( "encoding/json" "errors" "fmt" "sync/atomic" "time" ) type Proposer struct { ServerId string //服务id } func NewProposer(serverId string) *Proposer { return &Proposer{ServerId: serverId} } // 保证不重复 func (proposer *Proposer) GetProposalId() string { return fmt.Sprintf("%v_%v", time.Now().UnixNano(), proposer.ServerId) } // proposalId, proposalValue, error func (proposer *Proposer) Prepare(proposalValue string) (string, string, error) { proposalId := proposer.GetProposalId() header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } data := map[string]interface{}{ "proposal_id": proposalId, } type PrepareRsp struct { Code int `json:"code"` Msg string `json:"msg"` Data struct { AcceptedProposalId string `json:"accepted_proposal_id"` //已接收的提案号 AcceptedValue string `json:"accepted_value"` //已接收的提案值 } `json:"data"` } var receive [][2]string var reject int32 for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/promise", h) status, body, err := HttpGet(url, header, data) if err != nil || status >= 400 { fmt.Printf("request prepare fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp PrepareRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } //acceptor已有更大提案号,拒绝 if rsp.Code != 0 { fmt.Printf("prepare fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg) atomic.AddInt32(&reject, 1) return } receive = append(receive, [2]string{rsp.Data.AcceptedProposalId, rsp.Data.AcceptedValue}) }(host) } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if len(receive) > len(DefaultHosts)/2 { //收到超半数回复promise var maxAcceptedProposalId string var maxAcceptedValue string for _, reply := range receive { if reply[0] != "" && reply[0] > maxAcceptedProposalId { maxAcceptedProposalId = reply[0] maxAcceptedValue = reply[1] } } //acceptor没有接收提案号,使用自己的值 if 
maxAcceptedProposalId == "" { return proposalId, proposalValue, nil } //acceptor有提案值,取最大提案号的值 return proposalId, maxAcceptedValue, nil } else if atomic.LoadInt32(&reject) > int32(len(DefaultHosts)/2) { //收到超半数拒绝 return "", "", errors.New("receive more than half of acceptor reject") } case <-timeout.C: //未收到超半数回复promise return "", "", errors.New("not receive more than half of acceptor promise") } } } func (proposer *Proposer) Propose(proposalId, proposalValue string) error { header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } data := map[string]interface{}{ "proposal_id": proposalId, "proposal_value": proposalValue, } type ProposeRsp struct { Code int `json:"code"` Msg string `json:"msg"` } var replyCount int32 var reject int32 for _, host := range DefaultHosts { go func(h string) { if atomic.LoadInt32(&reject) > 0 { return } url := fmt.Sprintf("http://%v/paxos/accept", h) status, body, err := HttpGet(url, header, data) if err != nil || status >= 400 { fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp ProposeRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } //acceptor已有更大提案号,拒绝 if rsp.Code != 0 { fmt.Printf("propose fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg) atomic.AddInt32(&reject, 1) return } atomic.AddInt32(&replyCount, 1) }(host) } //有拒绝 if atomic.LoadInt32(&reject) > 0 { return errors.New("some acceptor reject") } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if atomic.LoadInt32(&replyCount) > int32(len(DefaultHosts)/2) { //收到超半数回复,通过 //重置所有acceptor状态为新一轮 for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/reset", h) HttpGet(url, header, nil) }(host) } return nil } case <-timeout.C: //未收到超半数回复 return errors.New("not receive more than half of acceptor response") } } }
2.4.2 acceptor示例
package paxos import "errors" type Acceptor struct { state string //状态,updating、active MinProposalId string //能够同意的最小提案号 AcceptedProposalId string //已接收的提案号 AcceptedValue string //已接收的提案值 } func NewAcceptor() *Acceptor { return &Acceptor{ state: "active", } } // acceptedProposalId, acceptedValue, error func (acceptor *Acceptor) Promise(proposalId string) (string, string, error) { if proposalId > acceptor.MinProposalId { //这里根据状态判断是返回acceptor.AcceptedProposalId还是"" acceptor.MinProposalId = proposalId if acceptor.state == "updating" { return acceptor.AcceptedProposalId, acceptor.AcceptedValue, nil } else { //未接收提案 acceptor.state = "updating" return "", "", nil } } else { return "", "", errors.New("reject") } } func (acceptor *Acceptor) Accept(proposalId, proposalValue string) error { if proposalId >= acceptor.MinProposalId { acceptor.AcceptedProposalId = proposalId acceptor.AcceptedValue = proposalValue return nil } else { return errors.New("reject") } } func (acceptor *Acceptor) GetValue() string { return acceptor.AcceptedValue } func (acceptor *Acceptor) ResetState() { acceptor.state = "active" }
3、Paxos应用
3.1 选举leader
package paxos import ( "encoding/json" "fmt" "net/http" "time" ) func HandleElect(w http.ResponseWriter, r *http.Request) { var proposalId string var proposalValue string var err error tk := time.NewTicker(50 * time.Millisecond) to := time.NewTimer(5 * time.Second) for { select { case <-tk.C: proposalId, proposalValue, err = DefaultProposer.Prepare(DefaultProposer.ServerId) if err != nil { break } err = DefaultProposer.Propose(proposalId, proposalValue) if err != nil { break } WriteRsp(w, 0, "elect success", proposalValue) return case <-to.C: WriteRsp(w, -1, "elect time out", nil) return } } } func HandleLeader(w http.ResponseWriter, r *http.Request) { header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } type GetValueRsp struct { Code int `json:"code"` Msg string `json:"msg"` Data string `json:"data"` } var values []string for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/value", h) status, body, err := HttpGet(url, header, nil) if err != nil || status >= 400 { fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp GetValueRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } values = append(values, rsp.Data) }(host) } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if len(values) > len(DefaultHosts)/2 { //收到超半数回复 countMap := make(map[string]int) for _, value := range values { if _, ok := countMap[value]; !ok { countMap[value] = 0 } countMap[value] += 1 } for value, count := range countMap { if count > len(DefaultHosts)/2 { WriteRsp(w, 0, "", value) return } } WriteRsp(w, -1, "has no leader", nil) } case <-timeout.C: //未收到超半数回复 WriteRsp(w, -1, "get leader time out", nil) return } } }
3.2 分布式锁
package paxos import ( "encoding/json" "fmt" "net/http" "time" ) func HandleEnsureLock(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } type GetValueRsp struct { Code int `json:"code"` Msg string `json:"msg"` Data string `json:"data"` } var values []string for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/value", h) status, body, err := HttpGet(url, header, nil) if err != nil || status >= 400 { fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp GetValueRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } values = append(values, rsp.Data) }(host) } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if len(values) > len(DefaultHosts)/2 { //收到超半数回复 //判断是否有锁 countMap := make(map[string]int) for _, value := range values { if _, ok := countMap[value]; !ok { countMap[value] = 0 } countMap[value] += 1 } for value, count := range countMap { if count > len(DefaultHosts)/2 && value != "" { WriteRsp(w, -1, "has already lock", value) return } } //加锁 var proposalId string var proposalValue string var err error tk := time.NewTicker(50 * time.Millisecond) to := time.NewTimer(5 * time.Second) for { select { case <-tk.C: proposalId, proposalValue, err = DefaultProposer.Prepare(key) if err != nil { break } err = DefaultProposer.Propose(proposalId, proposalValue) if err != nil { break } if proposalValue == key { WriteRsp(w, 0, "ensure lock success", proposalValue) } else { WriteRsp(w, -1, "ensure lock fail", proposalValue) } return case <-to.C: WriteRsp(w, -1, "ensure lock time out", nil) return } } } case <-timeout.C: //未收到超半数回复 WriteRsp(w, -1, "ensure lock time out", nil) return } } } func HandleReleaseLock(w http.ResponseWriter, r 
*http.Request) { key := r.URL.Query().Get("key") header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } type GetValueRsp struct { Code int `json:"code"` Msg string `json:"msg"` Data string `json:"data"` } var values []string for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/value", h) status, body, err := HttpGet(url, header, nil) if err != nil || status >= 400 { fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp GetValueRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } values = append(values, rsp.Data) }(host) } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if len(values) > len(DefaultHosts)/2 { //收到超半数回复 //判断是否有锁 countMap := make(map[string]int) for _, value := range values { if _, ok := countMap[value]; !ok { countMap[value] = 0 } countMap[value] += 1 } for value, count := range countMap { if count > len(DefaultHosts)/2 { if value == key { //释放锁 var proposalId string var proposalValue string var err error tk := time.NewTicker(50 * time.Millisecond) to := time.NewTimer(5 * time.Second) for { select { case <-tk.C: proposalId, proposalValue, err = DefaultProposer.Prepare("") if err != nil { break } err = DefaultProposer.Propose(proposalId, proposalValue) if err != nil { break } if proposalValue == "" { WriteRsp(w, 0, "release lock success", proposalValue) } else { WriteRsp(w, -1, "release lock fail", proposalValue) } //WriteRsp(w, 0, "release lock success", proposalValue) return case <-to.C: WriteRsp(w, -1, "release lock time out", nil) return } } } else { WriteRsp(w, -1, "can not release other lock", value) return } } } } if len(values) == len(DefaultHosts) { WriteRsp(w, -1, "has no lock", nil) return } case <-timeout.C: //未收到超半数回复 WriteRsp(w, -1, "ensure lock time out", nil) return } } }
3.3 分布式存储
package paxos import ( "encoding/json" "fmt" "net/http" "time" ) func HandlePutStorage(w http.ResponseWriter, r *http.Request) { value := r.URL.Query().Get("value") var proposalId string var proposalValue string var err error tk := time.NewTicker(50 * time.Millisecond) to := time.NewTimer(5 * time.Second) for { select { case <-tk.C: proposalId, proposalValue, err = DefaultProposer.Prepare(value) if err != nil { break } err = DefaultProposer.Propose(proposalId, proposalValue) if err != nil { break } if proposalValue == value { WriteRsp(w, 0, "put storage success", proposalValue) } else { WriteRsp(w, -1, "put storage fail", proposalValue) } return case <-to.C: WriteRsp(w, -1, "put storage time out", nil) return } } } func HandleGetStorage(w http.ResponseWriter, r *http.Request) { header := map[string]string{ "Content-Type": "application/json; charset=utf-8", } type GetValueRsp struct { Code int `json:"code"` Msg string `json:"msg"` Data string `json:"data"` } var values []string for _, host := range DefaultHosts { go func(h string) { url := fmt.Sprintf("http://%v/paxos/value", h) status, body, err := HttpGet(url, header, nil) if err != nil || status >= 400 { fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status) return } var rsp GetValueRsp if err := json.Unmarshal(body, &rsp); err != nil { fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err) return } values = append(values, rsp.Data) }(host) } ticker := time.NewTicker(50 * time.Millisecond) timeout := time.NewTimer(5 * time.Second) for { select { case <-ticker.C: if len(values) > len(DefaultHosts)/2 { //收到超半数回复 //判断是否有值 countMap := make(map[string]int) for _, value := range values { if _, ok := countMap[value]; !ok { countMap[value] = 0 } countMap[value] += 1 } for value, count := range countMap { if count > len(DefaultHosts)/2 { WriteRsp(w, 0, "", value) return } } } if len(values) == len(DefaultHosts) { WriteRsp(w, -1, "has no value", nil) return } case <-timeout.C: 
//未收到超半数回复 WriteRsp(w, -1, "get storage time out", nil) return } } }