源码版本是 2.7.8
Dubbo 提供一下几种负载均衡
RandomLoadBalance:加权随机算法
RoundRobinLoadBalance:加权轮询负载均衡
LeastActiveLoadBalance:最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。
ConsistentHashLoadBalance:一致性Hash负载均衡
ShortestResponseLoadBalance:最短响应负载均衡
Dubbo 的负载均衡代码位于 dubbo-cluster
目录下。抽象类 AbstractLoadBalance
实现了 LoadBalance
,然后dubbo提供的几种负载均衡方法,实现了AbstractLoadBalance#doSelect
负载均衡策略 1 2 3 4 5 6 @SPI(RandomLoadBalance.NAME) public interface LoadBalance { @Adaptive("loadbalance") <T> Invoker<T> select (List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException ; }
负载均衡主要是从服务提供者列表中,选择一个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public abstract class AbstractLoadBalance implements LoadBalance { static int calculateWarmupWeight (int uptime, int warmup, int weight) { int ww = (int ) ((float ) uptime / ((float ) warmup / (float ) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); } @Override public <T> Invoker<T> select (List<Invoker<T>> invokers, URL url, Invocation invocation) { if (CollectionUtils.isEmpty(invokers)) { return null ; } if (invokers.size() == 1 ) { return invokers.get(0 ); } return doSelect(invokers, url, invocation); } protected abstract <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) ; protected int getWeight (Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0 ) { long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L ); if (timestamp > 0L ) { int uptime = (int ) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight >= 0 ? weight : 0 ; } }
RandomLoadBalance (随机选择算法) 随机,按权重设置随机概率。在一个节点上碰撞的概率高,但是条用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者的权重。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) { return invokers.get(random.nextInt(length)); int length = invokers.size(); boolean sameWeight = true ; int [] weights = new int [length]; int firstWeight = getWeight(invokers.get(0 ), invocation); weights[0 ] = firstWeight; int totalWeight = firstWeight; for (int i = 1 ; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false ; } } if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0 ; i < length; i++) { offset -= weights[i]; if (offset < 0 ) { return invokers.get(i); } } } return invokers.get(ThreadLocalRandom.current().nextInt(length)); }
总结:
通过累加计算总的权重,并判断所有的权重是否相等
如果总权重是0,或者所有的权重相等,则随机生成一个随机数,返回提供者
如果权重大于0且权重不相同,则在总权重范围内,随机生成一个随机数,循环减去之前保存的每个服务权重,当随机数小于0时,返回当前的服务提供者。
RoundRobinLoadBalance (加权轮询策略) 轮询,按公约后的权重设置轮询比例。所谓的轮询就是将请求轮流分配给每台服务器。轮询是一种无状态的负载均衡。通过对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。
轮询存在慢的提供者积累请求的问题,比如:第二台机器很慢,但是没挂掉,当请求调到第二台时,就卡在哪里,久而久之,所以请求都卡在调用的第二台上。 轮询,按公约后的权重设置轮询比例。存在慢的提供者积累请求的问题,比如:第二台机器很慢,但是没挂掉,当请求调到第二台时,就卡在哪里,久而久之,所以请求都卡在调用的第二台上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin" ; private static final int RECYCLE_PERIOD = 60000 ; protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0 ); private long lastUpdate; public int getWeight () { return weight; } public void setWeight (int weight) { this .weight = weight; current.set(0 ); } public long increaseCurrent () { return current.addAndGet(weight); } public void sel (int total) { current.addAndGet(-1 * total); } public long getLastUpdate () { return lastUpdate; } public void setLastUpdate (long lastUpdate) { this .lastUpdate = lastUpdate; } } private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); protected <T> Collection<String> getInvokerAddrList (List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0 ).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null ) { return map.keySet(); } return null ; } @Override protected <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0 ).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0 ; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null ; WeightedRoundRobin selectedWRR = null ; for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); int weight = getWeight(invoker, invocation); WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null ) { selectedWRR.sel(totalWeight); return selectedInvoker; } return invokers.get(0 ); } }
总结:
循环遍历invokers,初始化对应invoker的缓存,更新weight和current
每次 current += weight,然后判断哪个invoker的current的权重最大
在 selectedInvoker不为null的情况下,最大权重的current减去总权重,返回selectedInvoker
LeastActiveLoadBalance(最少活跃调用数) 活跃调用数越小,表明该服务提供者效率高,单位时间内可以处理更多的请求。在具体实现中,每个服务提供对应一个活跃数 active,初始情况下,所以服务提供者活跃为0,每收到一个请求,活跃数+1,完成请求+1。在运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive" ; @Override protected <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); int leastActive = -1 ; int leastCount = 0 ; int [] leastIndexes = new int [length]; int [] weights = new int [length]; int totalWeight = 0 ; int firstWeight = 0 ; boolean sameWeight = true ; for (int i = 0 ; i < length; i++) { Invoker<T> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1 ; leastIndexes[0 ] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true ; } else if (active == leastActive) { leastIndexes[leastCount++] = i; totalWeight += afterWarmup; if (sameWeight && afterWarmup != firstWeight) { sameWeight = false ; } } } if (leastCount == 1 ) { return invokers.get(leastIndexes[0 ]); } if (!sameWeight && totalWeight > 0 ) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0 ; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0 ) { return invokers.get(leastIndex); } } } return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
总结:
循环遍历 invokers 列表,寻找活跃数最小的 Invoker
如果多个 Invoker 具有相同的最小活跃数,此时记录下这些Invoker在invokers中的下标,并累加他们的权重,比较它们的权重值是否相同
如果只有一个Invoker具有最小活跃数,直接返回这个Invoker
如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
ConsistentHashLoadBalance (一致性Hash算法) 相同参数请求总是落在同一台机器上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash" ; private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0 ).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector <T > { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int [] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this .virtualInvokers = new TreeMap<Long, Invoker<T>>(); this .identityHashCode = identityHashCode; URL url = invokers.get(0 ).getUrl(); this .replicaNumber = url.getMethodParameter(methodName, "hash.nodes" , 160 ); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments" , "0" )); argumentIndex = new int [index.length]; for (int i = 0 ; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0 ; i < replicaNumber / 4 ; i++) { byte [] digest = md5(address + i); for (int h = 0 ; h < 4 ; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select (Invocation invocation) { String key = toKey(invocation.getArguments()); byte [] digest = md5(key); return selectForKey(hash(digest, 0 )); } private String toKey (Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker<T> selectForKey (long hash) { Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null ) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash (byte [] digest, int number) { return (((long ) (digest[3 + number * 4 ] & 0xFF ) << 24 ) | ((long ) (digest[2 + number * 4 ] & 0xFF ) << 16 ) | ((long ) (digest[1 + number * 4 ] & 0xFF ) << 8 ) | (digest[number * 4 ] & 0xFF )) & 0xFFFFFFFFL ; } private byte [] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5" ); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte [] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); } } }
ShortestResponseLoadBalance (最短响应负载均衡) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class ShortestResponseLoadBalance extends AbstractLoadBalance { public static final String NAME = "shortestresponse" ; @Override protected <T> Invoker<T> doSelect (List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); long shortestResponse = Long.MAX_VALUE; int shortestCount = 0 ; int [] shortestIndexes = new int [length]; int [] weights = new int [length]; int totalWeight = 0 ; int firstWeight = 0 ; boolean sameWeight = true ; for (int i = 0 ; i < length; i++) { Invoker<T> invoker = invokers.get(i); RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); int active = rpcStatus.getActive(); long estimateResponse = succeededAverageElapsed * active; int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; if (estimateResponse < shortestResponse) { shortestResponse = estimateResponse; shortestCount = 1 ; shortestIndexes[0 ] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true ; } else if (estimateResponse == shortestResponse) { shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false ; } } } if (shortestCount == 1 ) { return invokers.get(shortestIndexes[0 ]); } if (!sameWeight && totalWeight > 0 ) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0 ; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0 ) { return invokers.get(shortestIndex); } } } return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); } }
Reference