1、背景
分布式共识算法是指多个参与者针对某件事达成一致,常用于在分布式系统中保证多节点间的数据一致性。主流的分布式共识算法包括:Paxos、Raft、ZAB等。其中Paxos是最基础的算法,本文接下来会介绍该算法的流程,并用简单的示例说明其使用场景,完整示例代码见:https://github.com/jemuelmiao/paxos。
2、算法流程
2.1 概念
提议
- proposal:提议
- proposal id:提议编号
- proposal value:提议值
角色
- proposer:提议者
- acceptor:决策者
- learner:学习者
2.2 时序图
Paxos流程分为三个阶段:prepare、accept、learn。Prepare阶段由proposer向所有acceptor提交提议编号,acceptor根据收到的提议编号作出承诺或拒绝。Accept阶段由proposer向所有acceptor提交提议值,acceptor根据收到的提议作出接收或拒绝。Learn阶段由proposer通知learner决策结果。时序图如下:
2.3 执行流程
在proposer prepare流程,proposer生成提议编号,该编号需要唯一、递增、可比较,一般采用时间戳和服务编号生成。当收到大多数acceptor的承诺时,如果任一承诺中包含已接收的提议值,则选择其中已接收提议编号最大的那个提议值,否则使用当前proposer自己的提议值;当未收到大多数acceptor的承诺时,proposer重新生成提议编号并再次发起prepare。流程如下:
在acceptor promise流程,acceptor将请求的提议编号与自身存储的提议编号进行对比:当请求的提议编号大于存储的提议编号时,将存储的提议编号更新为请求的提议编号,并作出两个承诺:1、不再接收proposal id小于等于当前请求的prepare请求;2、不再接收proposal id小于当前请求的propose请求;否则拒绝该prepare请求。另外,如果acceptor之前已经接收过提议,会将该提议的编号和值一同返回给proposer。流程如下:
在proposer propose流程,proposer使用prepare阶段确定的proposal id和proposal value请求所有acceptor。当超过半数的acceptor接收该提议时,该提议即通过(少数acceptor的拒绝不影响结果);否则proposer重新进入prepare阶段。流程如下:
在acceptor accept流程,acceptor将请求的提议编号与自身存储的提议编号进行对比:当请求的提议编号大于等于存储的提议编号时,将已接收的提议编号、提议值更新为请求的提议编号、提议值;否则拒绝该请求。流程如下:
2.4 示例
2.4.1 proposer示例
package paxos
import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"time"
)
// Proposer drives the prepare and propose phases of Paxos on behalf of a
// single server.
type Proposer struct {
	// ServerId identifies this server; GetProposalId appends it to every
	// proposal id so that ids from different servers never collide.
	ServerId string
}
// NewProposer allocates a Proposer that tags its proposals with serverId.
func NewProposer(serverId string) *Proposer {
	p := new(Proposer)
	p.ServerId = serverId
	return p
}
// lastProposalNanos is the timestamp component of the most recently issued
// proposal id in this process. GetProposalId advances it atomically so ids
// stay strictly increasing even when two calls land in the same clock tick
// or the wall clock steps backwards.
var lastProposalNanos int64

// GetProposalId returns a new proposal id of the form "<unix-nanos>_<ServerId>".
//
// Ids are unique and strictly increasing within this process. Acceptors and
// Prepare compare ids as plain strings, which matches numeric order only
// while the timestamp part has a fixed width (19 decimal digits, i.e. for
// dates between 2001 and 2286).
func (proposer *Proposer) GetProposalId() string {
	for {
		last := atomic.LoadInt64(&lastProposalNanos)
		next := time.Now().UnixNano()
		if next <= last {
			// Same tick as (or earlier than) the previous id: bump past it.
			next = last + 1
		}
		if atomic.CompareAndSwapInt64(&lastProposalNanos, last, next) {
			return fmt.Sprintf("%v_%v", next, proposer.ServerId)
		}
	}
}
// Prepare runs phase 1 of Paxos. It sends a fresh proposal id to every
// acceptor in DefaultHosts (GET /paxos/promise) and waits for a majority.
//
// It returns (proposalId, value, nil) once more than half of the acceptors
// have promised. value is the accepted value carried by the promise with
// the highest accepted proposal id. If no promise carries one, value is the
// caller's proposalValue.
//
// It returns an error when more than half of the acceptors reject, or when
// neither majority is reached within 5 seconds.
func (proposer *Proposer) Prepare(proposalValue string) (string, string, error) {
	proposalId := proposer.GetProposalId()
	header := map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
	data := map[string]interface{}{
		"proposal_id": proposalId,
	}
	type PrepareRsp struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Data struct {
			AcceptedProposalId string `json:"accepted_proposal_id"` // id of the proposal already accepted
			AcceptedValue      string `json:"accepted_value"`       // value of the proposal already accepted
		} `json:"data"`
	}
	// promiseReply is one acceptor's answer. Transport and decoding failures
	// produce no reply at all, so they never count towards either majority.
	type promiseReply struct {
		rejected           bool
		acceptedProposalId string
		acceptedValue      string
	}
	// Workers report over a channel instead of appending to a shared slice,
	// which would be a data race with the loop below. The buffer lets every
	// worker deliver its reply and exit even after Prepare has returned.
	replies := make(chan promiseReply, len(DefaultHosts))
	for _, host := range DefaultHosts {
		go func(h string) {
			url := fmt.Sprintf("http://%v/paxos/promise", h)
			status, body, err := HttpGet(url, header, data)
			if err != nil || status >= 400 {
				fmt.Printf("request prepare fail, host:%v, err:%v, status:%v\n", h, err, status)
				return
			}
			var rsp PrepareRsp
			if err := json.Unmarshal(body, &rsp); err != nil {
				fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
				return
			}
			// The acceptor has already promised a larger proposal id.
			if rsp.Code != 0 {
				fmt.Printf("prepare fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg)
				replies <- promiseReply{rejected: true}
				return
			}
			replies <- promiseReply{
				acceptedProposalId: rsp.Data.AcceptedProposalId,
				acceptedValue:      rsp.Data.AcceptedValue,
			}
		}(host)
	}

	quorum := len(DefaultHosts)/2 + 1
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()
	var promised, rejected int
	var maxAcceptedProposalId, maxAcceptedValue string
	for {
		select {
		case reply := <-replies:
			if reply.rejected {
				rejected++
				if rejected >= quorum {
					return "", "", errors.New("receive more than half of acceptor reject")
				}
				continue
			}
			promised++
			// Track the value accepted under the highest proposal id seen so far.
			if reply.acceptedProposalId != "" && reply.acceptedProposalId > maxAcceptedProposalId {
				maxAcceptedProposalId = reply.acceptedProposalId
				maxAcceptedValue = reply.acceptedValue
			}
			if promised >= quorum {
				if maxAcceptedProposalId == "" {
					// No acceptor has accepted anything: propose our own value.
					return proposalId, proposalValue, nil
				}
				// Paxos requires re-proposing the highest-numbered accepted value.
				return proposalId, maxAcceptedValue, nil
			}
		case <-timeout.C:
			return "", "", errors.New("not receive more than half of acceptor promise")
		}
	}
}
// Propose runs phase 2 of Paxos. It sends proposalId/proposalValue to every
// acceptor in DefaultHosts (GET /paxos/accept).
//
// It returns nil once more than half of the acceptors have accepted. Before
// returning, it asks every acceptor asynchronously (GET /paxos/reset) to
// start a new round. A rejection from a minority does not prevent success.
//
// It returns an error as soon as more than half of the acceptors reject,
// because a majority of accepts is then impossible. It also returns an
// error when no majority of accepts arrives within 5 seconds.
func (proposer *Proposer) Propose(proposalId, proposalValue string) error {
	header := map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
	data := map[string]interface{}{
		"proposal_id":    proposalId,
		"proposal_value": proposalValue,
	}
	type ProposeRsp struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
	}
	// Counters are shared with the worker goroutines, hence atomic access.
	var accepted, rejected int32
	for _, host := range DefaultHosts {
		go func(h string) {
			url := fmt.Sprintf("http://%v/paxos/accept", h)
			status, body, err := HttpGet(url, header, data)
			if err != nil || status >= 400 {
				fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
				return
			}
			var rsp ProposeRsp
			if err := json.Unmarshal(body, &rsp); err != nil {
				fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
				return
			}
			// The acceptor has already promised a larger proposal id.
			if rsp.Code != 0 {
				fmt.Printf("propose fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg)
				atomic.AddInt32(&rejected, 1)
				return
			}
			atomic.AddInt32(&accepted, 1)
		}(host)
	}

	half := int32(len(DefaultHosts) / 2)
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()
	for {
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&accepted) > half {
				// Value chosen: move every acceptor to a new round. This is
				// best effort, so reset failures are deliberately ignored.
				for _, host := range DefaultHosts {
					go func(h string) {
						url := fmt.Sprintf("http://%v/paxos/reset", h)
						HttpGet(url, header, nil)
					}(host)
				}
				return nil
			}
			if atomic.LoadInt32(&rejected) > half {
				return errors.New("some acceptor reject")
			}
		case <-timeout.C:
			return errors.New("not receive more than half of acceptor response")
		}
	}
}
2.4.2 acceptor示例
package paxos
import "errors"
// Acceptor is the state one Paxos acceptor keeps between requests.
// Proposal ids are compared as plain strings.
type Acceptor struct {
	// state is "active" at the start of a round and "updating" once a
	// promise has been made in that round.
	state string

	// MinProposalId is the highest proposal id promised so far. Promise only
	// agrees to strictly larger ids; Accept only takes ids at least this large.
	MinProposalId string

	// AcceptedProposalId is the id of the most recently accepted proposal.
	AcceptedProposalId string

	// AcceptedValue is the value of the most recently accepted proposal.
	AcceptedValue string
}
// NewAcceptor returns an Acceptor in the "active" state that has neither
// promised nor accepted anything yet.
func NewAcceptor() *Acceptor {
	a := new(Acceptor)
	a.state = "active"
	return a
}
// Promise handles a prepare request and returns
// (acceptedProposalId, acceptedValue, error).
//
// A proposalId that is not strictly greater than MinProposalId is rejected
// with an error, and nothing changes. Otherwise MinProposalId is raised to
// proposalId, and then:
//   - if a promise was already made in the current round (state "updating"),
//     the last accepted proposal id and value are returned;
//   - otherwise the acceptor enters the "updating" state and returns two
//     empty strings, meaning nothing was accepted in this round.
//
// NOTE(review): the check-and-update is not synchronized; if this is called
// from concurrent HTTP handlers it needs a mutex — confirm with the caller.
func (acceptor *Acceptor) Promise(proposalId string) (string, string, error) {
	if proposalId <= acceptor.MinProposalId {
		return "", "", errors.New("reject")
	}
	acceptor.MinProposalId = proposalId
	if acceptor.state != "updating" {
		// First promise of this round: report that nothing has been accepted.
		acceptor.state = "updating"
		return "", "", nil
	}
	return acceptor.AcceptedProposalId, acceptor.AcceptedValue, nil
}
// Accept handles a propose request. The proposal is recorded as accepted
// when proposalId is at least MinProposalId. Otherwise it is rejected with
// an error and the acceptor is left untouched. MinProposalId itself is never
// modified here.
func (acceptor *Acceptor) Accept(proposalId, proposalValue string) error {
	if proposalId < acceptor.MinProposalId {
		return errors.New("reject")
	}
	acceptor.AcceptedProposalId, acceptor.AcceptedValue = proposalId, proposalValue
	return nil
}
// GetValue reports the value of the most recently accepted proposal, or ""
// when nothing has been accepted.
func (acceptor *Acceptor) GetValue() string {
	value := acceptor.AcceptedValue
	return value
}
// ResetState starts a new round by putting the acceptor back into the
// "active" state. MinProposalId and the accepted proposal are kept as-is.
func (acceptor *Acceptor) ResetState() {
	const roundStart = "active"
	acceptor.state = roundStart
}
3、Paxos应用
3.1 选举leader
package paxos
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
// HandleElect tries to elect this server as leader. It runs Paxos with
// DefaultProposer.ServerId as the proposed value.
//
// A new round is attempted every 50ms until one completes. It then responds
// with code 0 and the chosen value. That value may be another server's id if
// a value had already been accepted, because Prepare re-proposes it.
//
// If no round completes within 5 seconds it responds with code -1. The
// deadline is only observed between rounds, so a slow round can extend the
// total time.
func HandleElect(w http.ResponseWriter, r *http.Request) {
	retry := time.NewTicker(50 * time.Millisecond)
	// Stopping the ticker releases its runtime timer once the handler returns.
	defer retry.Stop()
	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()
	for {
		select {
		case <-retry.C:
			proposalId, proposalValue, err := DefaultProposer.Prepare(DefaultProposer.ServerId)
			if err != nil {
				continue // retry on the next tick
			}
			if err := DefaultProposer.Propose(proposalId, proposalValue); err != nil {
				continue // retry on the next tick
			}
			WriteRsp(w, 0, "elect success", proposalValue)
			return
		case <-deadline.C:
			WriteRsp(w, -1, "elect time out", nil)
			return
		}
	}
}
// HandleLeader reports the current leader. It reads the value held by every
// acceptor in DefaultHosts (GET /paxos/value) and answers with the value
// reported by more than half of them.
//
// Exactly one response is written:
//   - code 0 with the majority value, as soon as some value is reported by
//     more than half of the acceptors;
//   - code -1 "has no leader", when every acceptor answered but no value has
//     a majority;
//   - code -1 "get leader time out", if neither happens within 5 seconds.
func HandleLeader(w http.ResponseWriter, r *http.Request) {
	header := map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
	type GetValueRsp struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Data string `json:"data"`
	}
	// Workers hand their values over a channel; appending to a shared slice
	// from several goroutines would be a data race. The buffer lets late
	// workers finish after the handler has returned.
	values := make(chan string, len(DefaultHosts))
	for _, host := range DefaultHosts {
		go func(h string) {
			url := fmt.Sprintf("http://%v/paxos/value", h)
			status, body, err := HttpGet(url, header, nil)
			if err != nil || status >= 400 {
				fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
				return
			}
			var rsp GetValueRsp
			if err := json.Unmarshal(body, &rsp); err != nil {
				fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
				return
			}
			values <- rsp.Data
		}(host)
	}

	half := len(DefaultHosts) / 2
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()
	counts := make(map[string]int, len(DefaultHosts))
	for received := 0; received < len(DefaultHosts); received++ {
		select {
		case value := <-values:
			counts[value]++
			if counts[value] > half {
				WriteRsp(w, 0, "", value)
				return
			}
		case <-timeout.C:
			// No majority agreed within the deadline.
			WriteRsp(w, -1, "get leader time out", nil)
			return
		}
	}
	// Every acceptor answered, yet no value is held by a majority.
	WriteRsp(w, -1, "has no leader", nil)
}
3.2 分布式锁
package paxos
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
func HandleEnsureLock(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
header := map[string]string{
"Content-Type": "application/json; charset=utf-8",
}
type GetValueRsp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
var values []string
for _, host := range DefaultHosts {
go func(h string) {
url := fmt.Sprintf("http://%v/paxos/value", h)
status, body, err := HttpGet(url, header, nil)
if err != nil || status >= 400 {
fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
return
}
var rsp GetValueRsp
if err := json.Unmarshal(body, &rsp); err != nil {
fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
return
}
values = append(values, rsp.Data)
}(host)
}
ticker := time.NewTicker(50 * time.Millisecond)
timeout := time.NewTimer(5 * time.Second)
for {
select {
case <-ticker.C:
if len(values) > len(DefaultHosts)/2 {
//收到超半数回复
//判断是否有锁
countMap := make(map[string]int)
for _, value := range values {
if _, ok := countMap[value]; !ok {
countMap[value] = 0
}
countMap[value] += 1
}
for value, count := range countMap {
if count > len(DefaultHosts)/2 && value != "" {
WriteRsp(w, -1, "has already lock", value)
return
}
}
//加锁
var proposalId string
var proposalValue string
var err error
tk := time.NewTicker(50 * time.Millisecond)
to := time.NewTimer(5 * time.Second)
for {
select {
case <-tk.C:
proposalId, proposalValue, err = DefaultProposer.Prepare(key)
if err != nil {
break
}
err = DefaultProposer.Propose(proposalId, proposalValue)
if err != nil {
break
}
if proposalValue == key {
WriteRsp(w, 0, "ensure lock success", proposalValue)
} else {
WriteRsp(w, -1, "ensure lock fail", proposalValue)
}
return
case <-to.C:
WriteRsp(w, -1, "ensure lock time out", nil)
return
}
}
}
case <-timeout.C:
//未收到超半数回复
WriteRsp(w, -1, "ensure lock time out", nil)
return
}
}
}
// HandleReleaseLock releases the distributed lock held by the "key" query
// parameter.
//
// It reads the current value from the acceptors in DefaultHosts
// (GET /paxos/value), then responds as follows:
//   - "has no lock" when the majority value is "" (unlocked), or when every
//     acceptor answered without any value reaching a majority;
//   - "can not release other lock" when a majority reports a different holder;
//   - otherwise it tries, every 50ms for up to 5 seconds, to get "" chosen
//     through Paxos, and reports whether the release succeeded.
//
// Reading the current value is itself bounded by 5 seconds.
func HandleReleaseLock(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	header := map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
	type GetValueRsp struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Data string `json:"data"`
	}
	// Workers hand their values over a channel; appending to a shared slice
	// from several goroutines would be a data race. The buffer lets late
	// workers finish after the handler has returned.
	values := make(chan string, len(DefaultHosts))
	for _, host := range DefaultHosts {
		go func(h string) {
			url := fmt.Sprintf("http://%v/paxos/value", h)
			status, body, err := HttpGet(url, header, nil)
			if err != nil || status >= 400 {
				fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
				return
			}
			var rsp GetValueRsp
			if err := json.Unmarshal(body, &rsp); err != nil {
				fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
				return
			}
			values <- rsp.Data
		}(host)
	}

	// Phase 1: find the value held by a majority, if any.
	half := len(DefaultHosts) / 2
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()
	counts := make(map[string]int, len(DefaultHosts))
	holder, hasMajority := "", false
collect:
	for received := 0; received < len(DefaultHosts); received++ {
		select {
		case value := <-values:
			counts[value]++
			if counts[value] > half {
				holder, hasMajority = value, true
				break collect
			}
		case <-timeout.C:
			WriteRsp(w, -1, "release lock time out", nil)
			return
		}
	}
	if !hasMajority || holder == "" {
		WriteRsp(w, -1, "has no lock", nil)
		return
	}
	if holder != key {
		WriteRsp(w, -1, "can not release other lock", holder)
		return
	}

	// Phase 2: get "" (unlocked) chosen through Paxos, retrying on every tick.
	retry := time.NewTicker(50 * time.Millisecond)
	defer retry.Stop()
	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()
	for {
		select {
		case <-retry.C:
			proposalId, proposalValue, err := DefaultProposer.Prepare("")
			if err != nil {
				continue
			}
			if err := DefaultProposer.Propose(proposalId, proposalValue); err != nil {
				continue
			}
			if proposalValue == "" {
				WriteRsp(w, 0, "release lock success", proposalValue)
			} else {
				// A previously accepted value had to be re-proposed instead of "".
				WriteRsp(w, -1, "release lock fail", proposalValue)
			}
			return
		case <-deadline.C:
			WriteRsp(w, -1, "release lock time out", nil)
			return
		}
	}
}
3.3 分布式存储
package paxos
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
// HandlePutStorage stores the "value" query parameter by getting it chosen
// through Paxos.
//
// A new round is attempted every 50ms until one completes.
//   - "put storage success": value itself was chosen.
//   - "put storage fail": a previously accepted value was chosen instead,
//     because Prepare re-proposes it. The response carries that value.
//   - "put storage time out": no round completed within 5 seconds. The
//     deadline is only observed between rounds.
func HandlePutStorage(w http.ResponseWriter, r *http.Request) {
	value := r.URL.Query().Get("value")
	retry := time.NewTicker(50 * time.Millisecond)
	// Stopping the ticker releases its runtime timer once the handler returns.
	defer retry.Stop()
	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()
	for {
		select {
		case <-retry.C:
			proposalId, proposalValue, err := DefaultProposer.Prepare(value)
			if err != nil {
				continue // retry on the next tick
			}
			if err := DefaultProposer.Propose(proposalId, proposalValue); err != nil {
				continue // retry on the next tick
			}
			if proposalValue == value {
				WriteRsp(w, 0, "put storage success", proposalValue)
			} else {
				WriteRsp(w, -1, "put storage fail", proposalValue)
			}
			return
		case <-deadline.C:
			WriteRsp(w, -1, "put storage time out", nil)
			return
		}
	}
}
// HandleGetStorage reads the stored value from every acceptor in
// DefaultHosts (GET /paxos/value) and answers with the value reported by
// more than half of them.
//
// Exactly one response is written:
//   - code 0 with the majority value, as soon as one exists;
//   - code -1 "has no value", when every acceptor answered without a majority;
//   - code -1 "get storage time out", if neither happens within 5 seconds.
func HandleGetStorage(w http.ResponseWriter, r *http.Request) {
	header := map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
	type GetValueRsp struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Data string `json:"data"`
	}
	// Workers hand their values over a channel; appending to a shared slice
	// from several goroutines would be a data race. The buffer lets late
	// workers finish after the handler has returned.
	values := make(chan string, len(DefaultHosts))
	for _, host := range DefaultHosts {
		go func(h string) {
			url := fmt.Sprintf("http://%v/paxos/value", h)
			status, body, err := HttpGet(url, header, nil)
			if err != nil || status >= 400 {
				fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
				return
			}
			var rsp GetValueRsp
			if err := json.Unmarshal(body, &rsp); err != nil {
				fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
				return
			}
			values <- rsp.Data
		}(host)
	}

	half := len(DefaultHosts) / 2
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()
	counts := make(map[string]int, len(DefaultHosts))
	for received := 0; received < len(DefaultHosts); received++ {
		select {
		case value := <-values:
			counts[value]++
			if counts[value] > half {
				WriteRsp(w, 0, "", value)
				return
			}
		case <-timeout.C:
			// No majority agreed within the deadline.
			WriteRsp(w, -1, "get storage time out", nil)
			return
		}
	}
	// Every acceptor answered, yet no value is held by a majority.
	WriteRsp(w, -1, "has no value", nil)
}




