0%

Dubbo源码分析之负载均衡

源码版本是 2.7.8

Dubbo 提供一下几种负载均衡

  • RandomLoadBalance:加权随机算法
  • RoundRobinLoadBalance:加权轮询负载均衡
  • LeastActiveLoadBalance:最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。
  • ConsistentHashLoadBalance:一致性Hash负载均衡
  • ShortestResponseLoadBalance:最短响应负载均衡

Dubbo 的负载均衡代码位于 dubbo-cluster 目录下。抽象类 AbstractLoadBalance 实现了 LoadBalance,然后dubbo提供的几种负载均衡方法,实现了AbstractLoadBalance#doSelect

负载均衡策略

1
2
3
4
5
6
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

负载均衡主要是从服务提供者列表中,选择一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {//当只有一个提供者时,直接返回
return invokers.get(0);
}
//选择一个服务
return doSelect(invokers, url, invocation);
}
//具体的负载均衡策略由子类来实现
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

//权重方法
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight >= 0 ? weight : 0;
}

}

RandomLoadBalance (随机选择算法)

随机,按权重设置随机概率。在一个节点上碰撞的概率高,但是条用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者的权重。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 随机返回一个权重
return invokers.get(random.nextInt(length));
// 获取调用者数
int length = invokers.size();
// 判断所有服务提供者地址权重是否一致
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
//数组保存服务提供者的权重,方便后续使用
weights[i] = weight;
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
//判断总权重是否大于0以及是否所有权重一致
if (totalWeight > 0 && !sameWeight) {
//根据总权重生成一个随机数
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
//随机数减去权重,当随机数小于0,返回当前权重
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// 随机返回一个权重
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

总结:

  • 通过累加计算总的权重,并判断所有的权重是否相等
  • 如果总权重是0,或者所有的权重相等,则随机生成一个随机数,返回提供者
  • 如果权重大于0且权重不相同,则在总权重范围内,随机生成一个随机数,循环减去之前保存的每个服务权重,当随机数小于0时,返回当前的服务提供者。

RoundRobinLoadBalance (加权轮询策略)

轮询,按公约后的权重设置轮询比例。所谓的轮询就是将请求轮流分配给每台服务器。轮询是一种无状态的负载均衡。通过对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。

轮询存在慢的提供者积累请求的问题,比如:第二台机器很慢,但是没挂掉,当请求调到第二台时,就卡在哪里,久而久之,所以请求都卡在调用的第二台上。
轮询,按公约后的权重设置轮询比例。存在慢的提供者积累请求的问题,比如:第二台机器很慢,但是没挂掉,当请求调到第二台时,就卡在哪里,久而久之,所以请求都卡在调用的第二台上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";

private static final int RECYCLE_PERIOD = 60000;

protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;

public int getWeight() {
return weight;
}

public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}

public long increaseCurrent() {
return current.addAndGet(weight);
}

public void sel(int total) {
current.addAndGet(-1 * total);
}

public long getLastUpdate() {
return lastUpdate;
}

public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}

private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//value->map{key:invoker的唯一标志 value:轮询权重相关类WeightedRoundRobin}
//methodWeightMap的value就是不同的invokers,如果为空则创建一个空的map
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0; //总权重
long maxCurrent = Long.MIN_VALUE; //当期最大权重
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;//被选中的调用者
WeightedRoundRobin selectedWRR = null;//被选中调用者所对应的权值类
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
int weight = getWeight(invoker, invocation);
// 如果换成中没有则进行创建
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
// 如果url权重更新,则更新缓存中的权重值
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
//当前权值=当前权值+设置的权值
//目的是获取最大的当前权值的提供者 作为调用使用
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
//获取全部中invoker中有效权值最大的那个提供者
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
//计算总权重
totalWeight += weight;
}
//一段时间内没有访问,则去掉缓存中的记录重新计算
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}

总结:

  • 循环遍历invokers,初始化对应invoker的缓存,更新weight和current
  • 每次 current += weight,然后判断哪个invoker的current的权重最大
  • 在 selectedInvoker不为null的情况下,最大权重的current减去总权重,返回selectedInvoker

LeastActiveLoadBalance(最少活跃调用数)

活跃调用数越小,表明该服务提供者效率高,单位时间内可以处理更多的请求。在具体实现中,每个服务提供对应一个活跃数 active,初始情况下,所以服务提供者活跃为0,每收到一个请求,活跃数+1,完成请求+1。在运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int leastActive = -1;//最小活跃数
// 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
int leastCount = 0;
// leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
int[] leastIndexes = new int[length];
int[] weights = new int[length];//权重数组
int totalWeight = 0;//总权重
// 第一个最小活跃数的权重,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
// 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
int firstWeight = 0;
boolean sameWeight = true;// 所以最小活跃数的权重是否一样

// 遍历 invokers 列表
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取 Invoker 对应的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取invoke配置的权重值,默认100
int afterWarmup = getWeight(invoker, invocation);
// 数组保存权重,方便后面随机选择权重
weights[i] = afterWarmup;
// 发现最小活跃数,如果最小活跃数-1(第一个服务提供者)或者当前最小活跃小于最小活跃数
if (leastActive == -1 || active < leastActive) {
// 重置最小活跃数,设置为当前 Invoker 的最小活跃数
leastActive = active;
// 更新 leastCount 为 1
leastCount = 1;
// 记录当前下标值到 leastIndexs 中
leastIndexes[0] = i;
// 更新总权重
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;

// 当 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
} else if (active == leastActive) {
// 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
leastIndexes[leastCount++] = i;
// 累加权重
totalWeight += afterWarmup;
// 检测当前 Invoker 的权重与 firstWeight 是否相等,
// 不相等则将 sameWeight 置为 false
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
// 有多个Invoker具有相同的最小活跃数,但它们的权重不同
if (!sameWeight && totalWeight > 0) {
// 根据总权重随机生成一个[0, totalWeight)的数字
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// 循环让随机数减去具有最小活跃数的 Invoker,当offset小于0,返回相应的 Invoker
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// 如果权重相同或者权重为0,随机返回一个 Invoker
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}

总结:

  • 循环遍历 invokers 列表,寻找活跃数最小的 Invoker
  • 如果多个 Invoker 具有相同的最小活跃数,此时记录下这些Invoker在invokers中的下标,并累加他们的权重,比较它们的权重值是否相同
  • 如果只有一个Invoker具有最小活跃数,直接返回这个Invoker
  • 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
  • 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可

ConsistentHashLoadBalance (一致性Hash算法)

相同参数请求总是落在同一台机器上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
//一个方法对应一个一个hash选择器
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//获取调用方法名,一致性Hash选择器的key
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
//基于invokers集合,根据对象内存地址来定义hash值
int identityHashCode = System.identityHashCode(invokers);
//获取 ConsistentHashSelector
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
//初始化创建再缓存
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
//hash选择器
private static final class ConsistentHashSelector<T> {

private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;//每个invoker对应的虚拟节点数
private final int identityHashCode; //定义hash值
private final int[] argumentIndex;//参数位置数组(缺省是第一个参数)

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
//获取配置的节点数,默认160
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));//获取需要进行hash的参数数组索引,默认对第一个参数进行hash
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
//创建虚拟节点,对每个invoker生成replicaNumber个虚拟节点,并放入到TreeMap中
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
//根据MD5算法为每4个节点生成一个消息摘要,长为16字节128位
byte[] digest = md5(address + i);
//随机讲128位分成4部分,0-31,32-63,64-95,95-128,并生成4个32位数,存于long中,long的高32位都为0
// 并作为虚拟结点的key。
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
//选择节点
public Invoker<T> select(Invocation invocation) {
//根据调用参数来生成key
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);//根据key生成消息摘要
//调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
//调用sekectForKey方法选择结点
return selectForKey(hash(digest, 0));
}

private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
//根据hashCode选择节点
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}

private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}

private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}

ShortestResponseLoadBalance (最短响应负载均衡)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ShortestResponseLoadBalance extends AbstractLoadBalance {
public static final String NAME = "shortestresponse";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Estimated shortest response time of all invokers
long shortestResponse = Long.MAX_VALUE;
// The number of invokers having the same estimated shortest response time
int shortestCount = 0;
// The index of invokers having the same estimated shortest response time
int[] shortestIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the shortest response invokers
int totalWeight = 0;
// The weight of the first shortest response invokers
int firstWeight = 0;
// Every shortest response invoker has the same weight value?
boolean sameWeight = true;

// Filter out all the shortest response invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
int active = rpcStatus.getActive();
long estimateResponse = succeededAverageElapsed * active;
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// Same as LeastActiveLoadBalance
if (estimateResponse < shortestResponse) {
shortestResponse = estimateResponse;
shortestCount = 1;
shortestIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (estimateResponse == shortestResponse) {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < shortestCount; i++) {
int shortestIndex = shortestIndexes[i];
offsetWeight -= weights[shortestIndex];
if (offsetWeight < 0) {
return invokers.get(shortestIndex);
}
}
}
return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
}
}

Reference

客官,赏一杯coffee嘛~~~~