微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Nacos与Raft (一)选举

go aide_941 11℃

Raft,分布式共识算法,是工程上使用较为广泛的强一致性、去中心化、高可用的分布式协议。如redis-sentinel,etcd等都使用raft协议解决分布式一致性的问题。

nacos注册中心是阿里巴巴贡献的开源项目,兼具服务注册发现、动态配置管理、动态dns等功能。nacos集群间,使用了raft协议,来解决分布式一致性问题。

Raft简介

关于Raft协议,首先,可以看一下动画,初步认识一下这个分布式公式算法的具体步骤。

Raft

当然,更好的方式是阅读论文:

raft.github.io/raft.pdf

Raft节点有三种状态,follower、candidate、leader,各状态之间的转化如下图:

图片来源于网络

整个过程大致分为 Leader Election 、Log Replication 两步

Leader Election

①所有节点一开始都是follower状态,一定时间未收到leader的心跳,则进入candidate状态,参与选举;

②选出leader后,leader通过向follower发送心跳来表明存活状态,若leader故障,则整体退回到 ①中的状态;

③每次选举完成后,会产生一个term,term本身是递增的,充当了逻辑时钟的作用;

具体的选举过程:等待beat,若超时未等到,准备选举—->add current term ,转变为candidate状态—->给自己投票,然后给其他节点发送投票请求—->等待选举结果。

具体投票过程有三个约束:

  1. 在同一任期内,单个节点最多只能投一票;
  2. 候选人知道的信息不能比自己的少(Log与term);
  3. first-come-first-served 先来先得。

选举结果有三种情况:

  1. 收到majority的投票(含自己的一票),则赢得选举,成为leader
  2. 被告知别人已当选,那么自行切换到follower
  3. 一段时间内没有收到majority投票,则保持candidate状态,重新发出选举。(ps:如果是遇到平票现象,则会增加系统不可用时间,因此,raft中引入了randomized election timeouts,尽量避免出现平票现象的产生

一旦选举完毕,leader节点会给所有其他节点发消息,避免其他节点触发新的选举。

nacos中的Raft的选举

raft包中的类

RaftCore 实现raft算法的核心类,也是raft启动的地方;

RaftPeer 单个raft节点的对象,其成员变量描述了单个节点的信息;

RaftPeerSet 当前节点持有的同伴节点信息;

RaftProxy 用于转发请求给leader;

RaftStore 持久化节点信息到磁盘的类。

首先打开RaftCore.java,这里是raft启动的地方,@PostConstruct 注解使得该方法在servlet init()之前自动执行。

@PostConstruct
    public void init() throws Exception {
        Loggers.RAFT.info("initializing Raft sub-system");
        executor.submit(notifier);
        long start = System.currentTimeMillis();
        raftStore.loadDatums(notifier, datums);
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        while (notifier.tasks.size() > 0) {
            Thread.sleep(1000L);
        }
        initialized = true;
        GlobalExecutor.registerMasterElection(new MasterElection());
        GlobalExecutor.registerHeartbeat(new HeartBeat());
    } 

这段init()方法首先通过周期执行任务的线程池提交notifier对象,然后通过loadDatums()读取磁盘中的datum,随后有一个while循环,不断sleep直至跳出。

通过阅读loadDatums()方法和notifier的run()方法(这边就不贴出来了),我们可以知道这里是一个生产者、消费者模型,loadDatums()方法生产新的datums并addTask,notifier则不断消费,通过notifier.tasks.size() > 0条件来控制,直至所有datum被消费完成才继续执行该线程。datums是一个concurrentHashMap,本地缓存了所有的datum信息。这一步是同步本地service与instance的信息

接下来两个方法,顾名思义,是开启选举和心跳。

 public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                        return;
                }
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0) {
                    return;
                }
                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
        }

MasterElection是交给ScheduledThreadPool来定期执行的,每次执行会对leaderDueMs递减(leaderDueMs是一个随机值,为了减小平票的概率),当递减至值小于零,说明follower未接收到心跳,则自动变为candidate,参与选举

public void sendVote() {
            RaftPeer local = peers.get(NetUtils.localServer());
            peers.reset();
            local.term.incrementAndGet();
            local.voteFor = local.ip;
            local.state = RaftPeer.State.CANDIDATE;
            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JSON.toJSONString(local));
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server, API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                               return 1;
                            }
                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                            peers.decideLeader(peer);
                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }

candidate给自己投票,并且term+1,通过HttpClient异步发送post请求,给所有其他的server,异步结束后,回调一个函数,根据返回的peer信息,调用decideLeader()。这个就是Raft中定义的RequestVote RPCs。

先看看投票请求发送后如何接收,根据发送地址,找到对应的controller—RaftController的vote方法,最终还是定位到了RaftCore的 receivedVote(RaftPeer remote) 方法:

public RaftPeer receivedVote(RaftPeer remote) {
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        RaftPeer local = peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) {
            String msg = "received illegitimate vote" +
                ", voter-term:" + remote.term + ", votee-term:" + local.term;
            Loggers.RAFT.info(msg);
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip;
            }
            return local;
        }
        local.resetLeaderDue();
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
        return local;
    }

若接收到的peer,term小于等于自己当前的term,则不为其投票,直接返回local,反之则投票,并把自己的状态修改成follower,并重置leaderDue时间。返回的local信息包含了本节点term是多少、本节点给谁投票等信息。

再回过头看前面的decideLeader():

public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }
            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;
            if (!Objects.equals(leader, peer)) {
                leader = peer;
                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }
        return leader;
    }

把接收到的节点信息放入hashmap,遍历peers,记录每个节点所投的票,统计出得票最大的节点,若此数值>majorityCount,则相应的节点成为leader。

leader选举成功后,会向其他节点发送心跳((AppendEntries RPCs),来确认其存在的权威性。让我们回到第一段,除了开启MasterElection,init方法同样通过ScheduledThreadPool提交HeartBeat线程,来维持心跳包的发送。下面是HeartBeat中的run()方法:

public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
                local.resetHeartbeatDue();
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }

        }

这里的逻辑与选举几乎一样,微小的差别在于heartbeatDueMs<< leaderDueMs 。

由于心跳包还有数据同步的功能,这里只粘了选举相关的部分代码。

public void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
                return;
            }
            local.resetLeaderDue();
            JSONObject packet = new JSONObject();
            packet.put("peer", local);
            JSONArray array = new JSONArray();
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildURL(server, API_BEAT);
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        }
        }

public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
        }
        if (local.state != RaftPeer.State.FOLLOWER) {
            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }
        final JSONArray beatDatums = beat.getJSONArray("datums");
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        peers.makeLeader(remote);
}

可以看到,如果不是leader,sendBeat()方法是直接return的,而receivedBeat()中,也会根据term来判断是否“认”这个leader,如果认了,则改变自己的状态为follower,并且在自己的RaftPeerSet中设置leader。

至此,nacos中raft选举的部分就写完了,因为本篇基本是自己读代码后写的,很可能有些代码理解的不对,欢迎讨论和指正~~☺

转载请注明:SuperIT » Nacos与Raft (一)选举

喜欢 (0)or分享 (0)