Raft,分布式共识算法,是工程上使用较为广泛的强一致性、去中心化、高可用的分布式协议。如redis-sentinel,etcd等都使用raft协议解决分布式一致性的问题。
nacos注册中心是阿里巴巴贡献的开源项目,兼具服务注册发现、动态配置管理、动态dns等功能。nacos集群间,使用了raft协议,来解决分布式一致性问题。
Raft简介
关于Raft协议,首先,可以看一下动画,初步认识一下这个分布式公式算法的具体步骤。
当然,更好的方式是阅读论文:
https://raft.github.io/raft.pdf
Raft节点有三种状态,follower、candidate、leader,各状态之间的转化如下图:
整个过程大致分为 Leader Election 、Log Replication 两步
Leader Election
①所有节点一开始都是follower状态,一定时间未收到leader的心跳,则进入candidate状态,参与选举;
②选出leader后,leader通过向follower发送心跳来表明存活状态,若leader故障,则整体退回到 ①中的状态;
③每次选举完成后,会产生一个term,term本身是递增的,充当了逻辑时钟的作用;
具体的选举过程:等待beat,若超时未等到,准备选举—->add current term ,转变为candidate状态—->给自己投票,然后给其他节点发送投票请求—->等待选举结果。
具体投票过程有三个约束:
- 在同一任期内,单个节点最多只能投一票;
- 候选人知道的信息不能比自己的少(Log与term);
- first-come-first-served 先来先得。
选举结果有三种情况:
- 收到majority的投票(含自己的一票),则赢得选举,成为leader;
- 被告知别人已当选,那么自行切换到follower;
- 一段时间内没有收到majority投票,则保持candidate状态,重新发出选举。(ps:如果是遇到平票现象,则会增加系统不可用时间,因此,raft中引入了randomized election timeouts,尽量避免出现平票现象的产生)
一旦选举完毕,leader节点会给所有其他节点发消息,避免其他节点触发新的选举。
nacos中的Raft的选举
RaftCore 实现raft算法的核心类,也是raft启动的地方;
RaftPeer 单个raft节点的对象,其成员变量描述了单个节点的信息;
RaftPeerSet 当前节点持有的同伴节点信息;
RaftProxy 用于转发请求给leader;
RaftStore 持久化节点信息到磁盘的类。
首先打开RaftCore.java,这里是raft启动的地方,@PostConstruct 注解使得该方法在servlet init()之前自动执行。
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
long start = System.currentTimeMillis();
raftStore.loadDatums(notifier, datums);
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
while (notifier.tasks.size() > 0) {
Thread.sleep(1000L);
}
initialized = true;
GlobalExecutor.registerMasterElection(new MasterElection());
GlobalExecutor.registerHeartbeat(new HeartBeat());
}
这段init()方法首先通过周期执行任务的线程池提交notifier对象,然后通过loadDatums()读取磁盘中的datum,随后有一个while循环,不断sleep直至跳出。
通过阅读loadDatums()方法和notifier的run()方法(这边就不贴出来了),我们可以知道这里是一个生产者、消费者模型,loadDatums()方法生产新的datums并addTask,notifier则不断消费,通过notifier.tasks.size() > 0条件来控制,直至所有datum被消费完成才继续执行该线程。datums是一个concurrentHashMap,本地缓存了所有的datum信息。这一步是同步本地service与instance的信息。
接下来两个方法,顾名思义,是开启选举和心跳。
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
MasterElection是交给ScheduledThreadPool来定期执行的,每次执行会对leaderDueMs递减(leaderDueMs是一个随机值,为了减小平票的概率),当递减至值小于零,说明follower未接收到心跳,则自动变为candidate,参与选举。
public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JSON.toJSONString(local));
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
candidate给自己投票,并且term+1,通过HttpClient异步发送post请求,给所有其他的server,异步结束后,回调一个函数,根据返回的peer信息,调用decideLeader()。这个就是Raft中定义的RequestVote RPCs。
先看看投票请求发送后如何接收,根据发送地址,找到对应的controller—RaftController的vote方法,最终还是定位到了RaftCore的 receivedVote(RaftPeer remote) 方法:
public RaftPeer receivedVote(RaftPeer remote) {
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
RaftPeer local = peers.get(NetUtils.localServer());
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" +
", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
若接收到的peer,term小于等于自己当前的term,则不为其投票,直接返回local,反之则投票,并把自己的状态修改成follower,并重置leaderDue时间。返回的local信息包含了本节点term是多少、本节点给谁投票等信息。
再回过头看前面的decideLeader():
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
把接收到的节点信息放入hashmap,遍历peers,记录每个节点所投的票,统计出得票最大的节点,若此数值>majorityCount,则相应的节点成为leader。
leader选举成功后,会向其他节点发送心跳((AppendEntries RPCs),来确认其存在的权威性。让我们回到第一段,除了开启MasterElection,init方法同样通过ScheduledThreadPool提交HeartBeat线程,来维持心跳包的发送。下面是HeartBeat中的run()方法:
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
这里的逻辑与选举几乎一样,微小的差别在于heartbeatDueMs<< leaderDueMs 。
由于心跳包还有数据同步的功能,这里只粘了选举相关的部分代码。
public void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
return;
}
local.resetLeaderDue();
JSONObject packet = new JSONObject();
packet.put("peer", local);
JSONArray array = new JSONArray();
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildURL(server, API_BEAT);
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
}
}
public RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
remote.ip = beat.getJSONObject("peer").getString("ip");
remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
remote.state, JSON.toJSONString(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
if (local.term.get() > remote.term.get()) {
Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
, remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
+ ", beat-to-term: " + local.term.get());
}
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
local.resetLeaderDue();
local.resetHeartbeatDue();
peers.makeLeader(remote);
}
可以看到,如果不是leader,sendBeat()方法是直接return的,而receivedBeat()中,也会根据term来判断是否“认”这个leader,如果认了,则改变自己的状态为follower,并且在自己的RaftPeerSet中设置leader。
至此,nacos中raft选举的部分就写完了,因为本篇基本是自己读代码后写的,很可能有些代码理解的不对,欢迎讨论和指正~~☺
转载请注明:SuperIT » Nacos与Raft (一)选举