编程技术分享

  • 首页
  1. 首页
  2. 分布式
  3. 正文

分布式共识算法paxos

2023年8月8日 2284点热度 1人点赞 0条评论

1、背景

分布式共识算法是指多个参与者针对某件事达成一致,常用于分布式系统中保证多节点间数据一致性,主流的分布式共识算法包括:paxos、raft、zab等。其中paxos是最基础的算法,本文接下来会介绍该算法的流程,以及会用简单的示例说明使用场景,完整示例代码见:https://github.com/jemuelmiao/paxos。

2、算法流程

2.1 概念

提议

  • proposal:提议
  • proposal id:提议编号
  • proposal value:提议值

角色

  • proposer:提议者
  • acceptor:决策者
  • learner:学习者

2.2 时序图

Paxos流程分为三个阶段:prepare、accept、learn。Prepare阶段由proposer向所有acceptor提交提议编号,acceptor根据收到的提议编号作出承诺或拒绝。Accept阶段由proposer向所有acceptor提交提议值,acceptor根据收到的提议作出接收或拒绝。Learn阶段由proposer通知learner决策结果。时序图如下:

2.3 执行流程

在proposer prepare流程,proposer生成提议编号,该编号需要唯一、递增、可比较,一般采用时间戳和服务编号生成。当收到大多数acceptor的承诺时,如果任一承诺中包含提议值,选择提议编号最大的提议值,否则使用当前proposer的提议值;当未收到大多数acceptor的承诺时,proposer重新生成提议。流程如下:

在accept promise流程,acceptor根据自身存储的提议编号和请求的提议编号进行对比,当请求的提议编号大于存储的提议编号时,将存储的提议编号更新为请求的提议编号,并作出两个承诺:1、不再接收proposal id小于等于当前请求的prepare请求;2、不再接收proposal id小于当前请求的propose请求。另外如果acceptor之前已经接收过提议编号、提议值,会将该提议编号、提议值一同返回proposer。流程如下:

在proposer propose流程,proposer会使用prepare阶段确定的proposal id、proposal value请求所有acceptor。当收到大多数acceptor回复且没有拒绝时,该提议通过,否则proposer重新进入prepare阶段。流程如下:

在acceptor accept流程,acceptor根据自身存储的提议编号和请求的提议编号进行对比,当请求的提议编号大于等于存储的提议编号时,将已接收提议编号、提议值更新为请求的提议编号、提议值。流程如下:

2.4 示例

2.4.1 proposer示例

package paxos

import (
    "encoding/json"
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

type Proposer struct {
    ServerId string //服务id
}

func NewProposer(serverId string) *Proposer {
    return &Proposer{ServerId: serverId}
}

// 保证不重复
func (proposer *Proposer) GetProposalId() string {
    return fmt.Sprintf("%v_%v", time.Now().UnixNano(), proposer.ServerId)
}

// proposalId, proposalValue, error
func (proposer *Proposer) Prepare(proposalValue string) (string, string, error) {
    proposalId := proposer.GetProposalId()

    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    data := map[string]interface{}{
        "proposal_id": proposalId,
    }
    type PrepareRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data struct {
            AcceptedProposalId string `json:"accepted_proposal_id"` //已接收的提案号
            AcceptedValue      string `json:"accepted_value"`       //已接收的提案值
        } `json:"data"`
    }

    var receive [][2]string
    var reject int32
    for _, host := range DefaultHosts {
        go func(h string) {
            url := fmt.Sprintf("http://%v/paxos/promise", h)
            status, body, err := HttpGet(url, header, data)
            if err != nil || status >= 400 {
                fmt.Printf("request prepare fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp PrepareRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            //acceptor已有更大提案号,拒绝
            if rsp.Code != 0 {
                fmt.Printf("prepare fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg)
                atomic.AddInt32(&reject, 1)
                return
            }
            receive = append(receive, [2]string{rsp.Data.AcceptedProposalId, rsp.Data.AcceptedValue})
        }(host)
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if len(receive) > len(DefaultHosts)/2 {
                //收到超半数回复promise
                var maxAcceptedProposalId string
                var maxAcceptedValue string
                for _, reply := range receive {
                    if reply[0] != "" && reply[0] > maxAcceptedProposalId {
                        maxAcceptedProposalId = reply[0]
                        maxAcceptedValue = reply[1]
                    }
                }
                //acceptor没有接收提案号,使用自己的值
                if maxAcceptedProposalId == "" {
                    return proposalId, proposalValue, nil
                }
                //acceptor有提案值,取最大提案号的值
                return proposalId, maxAcceptedValue, nil
            } else if atomic.LoadInt32(&reject) > int32(len(DefaultHosts)/2) {
                //收到超半数拒绝
                return "", "", errors.New("receive more than half of acceptor reject")
            }
        case <-timeout.C:
            //未收到超半数回复promise
            return "", "", errors.New("not receive more than half of acceptor promise")
        }
    }
}

func (proposer *Proposer) Propose(proposalId, proposalValue string) error {
    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    data := map[string]interface{}{
        "proposal_id":    proposalId,
        "proposal_value": proposalValue,
    }
    type ProposeRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
    }

    var replyCount int32
    var reject int32
    for _, host := range DefaultHosts {
        go func(h string) {
            if atomic.LoadInt32(&reject) > 0 {
                return
            }
            url := fmt.Sprintf("http://%v/paxos/accept", h)
            status, body, err := HttpGet(url, header, data)
            if err != nil || status >= 400 {
                fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp ProposeRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            //acceptor已有更大提案号,拒绝
            if rsp.Code != 0 {
                fmt.Printf("propose fail, code:%v, msg:%v\n", rsp.Code, rsp.Msg)
                atomic.AddInt32(&reject, 1)
                return
            }
            atomic.AddInt32(&replyCount, 1)
        }(host)
    }
    //有拒绝
    if atomic.LoadInt32(&reject) > 0 {
        return errors.New("some acceptor reject")
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if atomic.LoadInt32(&replyCount) > int32(len(DefaultHosts)/2) {
                //收到超半数回复,通过
                //重置所有acceptor状态为新一轮
                for _, host := range DefaultHosts {
                    go func(h string) {
                        url := fmt.Sprintf("http://%v/paxos/reset", h)
                        HttpGet(url, header, nil)
                    }(host)
                }
                return nil
            }
        case <-timeout.C:
            //未收到超半数回复
            return errors.New("not receive more than half of acceptor response")
        }
    }
}

2.4.2 acceptor示例

package paxos

import "errors"

type Acceptor struct {
    state              string //状态,updating、active
    MinProposalId      string //能够同意的最小提案号
    AcceptedProposalId string //已接收的提案号
    AcceptedValue      string //已接收的提案值
}

func NewAcceptor() *Acceptor {
    return &Acceptor{
        state: "active",
    }
}

// acceptedProposalId, acceptedValue, error
func (acceptor *Acceptor) Promise(proposalId string) (string, string, error) {
    if proposalId > acceptor.MinProposalId {
        //这里根据状态判断是返回acceptor.AcceptedProposalId还是""
        acceptor.MinProposalId = proposalId
        if acceptor.state == "updating" {
            return acceptor.AcceptedProposalId, acceptor.AcceptedValue, nil
        } else {
            //未接收提案
            acceptor.state = "updating"
            return "", "", nil
        }
    } else {
        return "", "", errors.New("reject")
    }
}

func (acceptor *Acceptor) Accept(proposalId, proposalValue string) error {
    if proposalId >= acceptor.MinProposalId {
        acceptor.AcceptedProposalId = proposalId
        acceptor.AcceptedValue = proposalValue
        return nil
    } else {
        return errors.New("reject")
    }
}

func (acceptor *Acceptor) GetValue() string {
    return acceptor.AcceptedValue
}

func (acceptor *Acceptor) ResetState() {
    acceptor.state = "active"
}

3、Paxos应用

3.1 选举leader

package paxos

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

func HandleElect(w http.ResponseWriter, r *http.Request) {
    var proposalId string
    var proposalValue string
    var err error
    tk := time.NewTicker(50 * time.Millisecond)
    to := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-tk.C:
            proposalId, proposalValue, err = DefaultProposer.Prepare(DefaultProposer.ServerId)
            if err != nil {
                break
            }
            err = DefaultProposer.Propose(proposalId, proposalValue)
            if err != nil {
                break
            }
            WriteRsp(w, 0, "elect success", proposalValue)
            return
        case <-to.C:
            WriteRsp(w, -1, "elect time out", nil)
            return
        }
    }
}

func HandleLeader(w http.ResponseWriter, r *http.Request) {
    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    type GetValueRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data string `json:"data"`
    }
    var values []string
    for _, host := range DefaultHosts {
        go func(h string) {
            url := fmt.Sprintf("http://%v/paxos/value", h)
            status, body, err := HttpGet(url, header, nil)
            if err != nil || status >= 400 {
                fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp GetValueRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            values = append(values, rsp.Data)
        }(host)
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if len(values) > len(DefaultHosts)/2 {
                //收到超半数回复
                countMap := make(map[string]int)
                for _, value := range values {
                    if _, ok := countMap[value]; !ok {
                        countMap[value] = 0
                    }
                    countMap[value] += 1
                }
                for value, count := range countMap {
                    if count > len(DefaultHosts)/2 {
                        WriteRsp(w, 0, "", value)
                        return
                    }
                }
                WriteRsp(w, -1, "has no leader", nil)
            }
        case <-timeout.C:
            //未收到超半数回复
            WriteRsp(w, -1, "get leader time out", nil)
            return
        }
    }
}

3.2 分布式锁

package paxos

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

func HandleEnsureLock(w http.ResponseWriter, r *http.Request) {
    key := r.URL.Query().Get("key")

    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    type GetValueRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data string `json:"data"`
    }
    var values []string
    for _, host := range DefaultHosts {
        go func(h string) {
            url := fmt.Sprintf("http://%v/paxos/value", h)
            status, body, err := HttpGet(url, header, nil)
            if err != nil || status >= 400 {
                fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp GetValueRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            values = append(values, rsp.Data)
        }(host)
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if len(values) > len(DefaultHosts)/2 {
                //收到超半数回复
                //判断是否有锁
                countMap := make(map[string]int)
                for _, value := range values {
                    if _, ok := countMap[value]; !ok {
                        countMap[value] = 0
                    }
                    countMap[value] += 1
                }
                for value, count := range countMap {
                    if count > len(DefaultHosts)/2 && value != "" {
                        WriteRsp(w, -1, "has already lock", value)
                        return
                    }
                }
                //加锁
                var proposalId string
                var proposalValue string
                var err error
                tk := time.NewTicker(50 * time.Millisecond)
                to := time.NewTimer(5 * time.Second)
                for {
                    select {
                    case <-tk.C:
                        proposalId, proposalValue, err = DefaultProposer.Prepare(key)
                        if err != nil {
                            break
                        }
                        err = DefaultProposer.Propose(proposalId, proposalValue)
                        if err != nil {
                            break
                        }
                        if proposalValue == key {
                            WriteRsp(w, 0, "ensure lock success", proposalValue)
                        } else {
                            WriteRsp(w, -1, "ensure lock fail", proposalValue)
                        }
                        return
                    case <-to.C:
                        WriteRsp(w, -1, "ensure lock time out", nil)
                        return
                    }
                }
            }
        case <-timeout.C:
            //未收到超半数回复
            WriteRsp(w, -1, "ensure lock time out", nil)
            return
        }
    }
}

func HandleReleaseLock(w http.ResponseWriter, r *http.Request) {
    key := r.URL.Query().Get("key")

    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    type GetValueRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data string `json:"data"`
    }
    var values []string
    for _, host := range DefaultHosts {
        go func(h string) {
            url := fmt.Sprintf("http://%v/paxos/value", h)
            status, body, err := HttpGet(url, header, nil)
            if err != nil || status >= 400 {
                fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp GetValueRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            values = append(values, rsp.Data)
        }(host)
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if len(values) > len(DefaultHosts)/2 {
                //收到超半数回复
                //判断是否有锁
                countMap := make(map[string]int)
                for _, value := range values {
                    if _, ok := countMap[value]; !ok {
                        countMap[value] = 0
                    }
                    countMap[value] += 1
                }
                for value, count := range countMap {
                    if count > len(DefaultHosts)/2 {
                        if value == key {
                            //释放锁
                            var proposalId string
                            var proposalValue string
                            var err error
                            tk := time.NewTicker(50 * time.Millisecond)
                            to := time.NewTimer(5 * time.Second)
                            for {
                                select {
                                case <-tk.C:
                                    proposalId, proposalValue, err = DefaultProposer.Prepare("")
                                    if err != nil {
                                        break
                                    }
                                    err = DefaultProposer.Propose(proposalId, proposalValue)
                                    if err != nil {
                                        break
                                    }
                                    if proposalValue == "" {
                                        WriteRsp(w, 0, "release lock success", proposalValue)
                                    } else {
                                        WriteRsp(w, -1, "release lock fail", proposalValue)
                                    }
                                    //WriteRsp(w, 0, "release lock success", proposalValue)
                                    return
                                case <-to.C:
                                    WriteRsp(w, -1, "release lock time out", nil)
                                    return
                                }
                            }
                        } else {
                            WriteRsp(w, -1, "can not release other lock", value)
                            return
                        }
                    }
                }
            }
            if len(values) == len(DefaultHosts) {
                WriteRsp(w, -1, "has no lock", nil)
                return
            }
        case <-timeout.C:
            //未收到超半数回复
            WriteRsp(w, -1, "ensure lock time out", nil)
            return
        }
    }
}

3.3 分布式存储

package paxos

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

func HandlePutStorage(w http.ResponseWriter, r *http.Request) {
    value := r.URL.Query().Get("value")

    var proposalId string
    var proposalValue string
    var err error
    tk := time.NewTicker(50 * time.Millisecond)
    to := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-tk.C:
            proposalId, proposalValue, err = DefaultProposer.Prepare(value)
            if err != nil {
                break
            }
            err = DefaultProposer.Propose(proposalId, proposalValue)
            if err != nil {
                break
            }
            if proposalValue == value {
                WriteRsp(w, 0, "put storage success", proposalValue)
            } else {
                WriteRsp(w, -1, "put storage fail", proposalValue)
            }
            return
        case <-to.C:
            WriteRsp(w, -1, "put storage time out", nil)
            return
        }
    }
}

func HandleGetStorage(w http.ResponseWriter, r *http.Request) {
    header := map[string]string{
        "Content-Type": "application/json; charset=utf-8",
    }
    type GetValueRsp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data string `json:"data"`
    }
    var values []string
    for _, host := range DefaultHosts {
        go func(h string) {
            url := fmt.Sprintf("http://%v/paxos/value", h)
            status, body, err := HttpGet(url, header, nil)
            if err != nil || status >= 400 {
                fmt.Printf("request propose fail, host:%v, err:%v, status:%v\n", h, err, status)
                return
            }
            var rsp GetValueRsp
            if err := json.Unmarshal(body, &rsp); err != nil {
                fmt.Printf("unmarshal fail, body:%v, err:%v\n", string(body), err)
                return
            }
            values = append(values, rsp.Data)
        }(host)
    }

    ticker := time.NewTicker(50 * time.Millisecond)
    timeout := time.NewTimer(5 * time.Second)
    for {
        select {
        case <-ticker.C:
            if len(values) > len(DefaultHosts)/2 {
                //收到超半数回复
                //判断是否有值
                countMap := make(map[string]int)
                for _, value := range values {
                    if _, ok := countMap[value]; !ok {
                        countMap[value] = 0
                    }
                    countMap[value] += 1
                }
                for value, count := range countMap {
                    if count > len(DefaultHosts)/2 {
                        WriteRsp(w, 0, "", value)
                        return
                    }
                }
            }
            if len(values) == len(DefaultHosts) {
                WriteRsp(w, -1, "has no value", nil)
                return
            }
        case <-timeout.C:
            //未收到超半数回复
            WriteRsp(w, -1, "get storage time out", nil)
            return
        }
    }
}

 

标签: 暂无
最后更新:2023年8月9日

jemuel

这个人很懒,什么都没留下

点赞
下一篇 >
文章目录
  • 1、背景
  • 2、算法流程
    • 2.1 概念
    • 2.2 时序图
    • 2.3 执行流程
    • 2.4 示例
      • 2.4.1 proposer示例
      • 2.4.2 acceptor示例
  • 3、Paxos应用
    • 3.1 选举leader
    • 3.2 分布式锁
    • 3.3 分布式存储
最新 热点 随机
最新 热点 随机
K8S源码分析系列2—远程调试K8S组件 Volcano源码分析系列—调度篇 K8S源码分析系列1—搭建K8S调试集群 K8S Controller开发 6.5840 Lab 1: MapReduce MongoDB源码分析系列1——编译环境搭建
K8S源码分析系列2—远程调试K8S组件
Java热更新 Go调度模型 Golang优先级调度 MySQL源码分析系列3——登录协议解析 MySQL源码分析系列5——ibd解析 Go内存管理

COPYRIGHT © 2021 www.miaozhouguang.com. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS

粤ICP备2022006024号

粤公网安备 44030602006568号