0%

Dubbo源码分析之Registry注册中心

源码版本是 2.7.8

什么是注册中心?

服务治理框架中大致分为服务通信和服务管理两部分,服务管理可以分为服务注册、服务发现以及服务被热加工介入,服务提供者Provider会往注册中心注册服务,而消费者Consumer会从注册中心订阅相关服务,并不会订阅全部的服务。

dubbo-registry 模块

在dubbo中,注册中心相关的代码在dubbo-registry模块下,子模块dubbo-registry-api中定义了注册中心相关的基础代码,而在dubbo-registry-xxx模块中则定义了具体的注册中心类型实现代码,例如dubbo-registry-zookeeper模块则存放了zookeeper注册中心的实现代码。

类关系图:

dubbo-registry-api 相关实现

通过Registry的实现管理,分析下面各个接口类:

RegistryService

注册中心模块的服务接口,提供注册、取消注册、订阅、取消订阅、查询符合条件的已注册数据。

下面的注释,是官方的解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public interface RegistryService {
/**
*注册服务
* 注册需处理契约:<br>
* 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
* 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
* 3. 当URL设置了category=overrides时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
* 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
* 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
*
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);

/**
* 取消注册服务.
*
* 取消注册需处理契约:<br>
* 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
* 2. 按全URL匹配取消注册。<br>
*
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);

/**
* 订阅服务.
*
* 订阅需处理契约:<br>
* 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
* 2. 当URL设置了category=overrides,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
* 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
* 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
* 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
* 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
* 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
*
* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void subscribe(URL url, NotifyListener listener);

/**
* 取消订阅服务.
*
* 取消订阅需处理契约:<br>
* 1. 如果没有订阅,直接忽略。<br>
* 2. 按全URL匹配取消订阅。<br>
*
* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void unsubscribe(URL url, NotifyListener listener);

/**
* 查询注册列表,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
*
* @see org.apache.dubbo.registry.NotifyListener#notify(List)
* @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 已注册信息列表,可能为空,含义同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
*/
List<URL> lookup(URL url);
}

Node

Node 接口不是registry中,而是在common模块中的一个接口。Node 接口中主要声明了一些节点的操作方法,获取节点Url、是否可用、销毁节点。

Registry

Registry 接口主要是继承了 Node 接口和 RegistryService 接口,将这两个接口的内容都统一在一起。同时 Registry 接口也提供了两个方法。

1
2
3
4
5
6
7
8
9
10
public interface Registry extends Node, RegistryService {
/**再次暴露注册*/
default void reExportRegister(URL url) {
register(url);
}
/**再次取消注册*/
default void reExportUnregister(URL url) {
unregister(url);
}
}

AbstractRegistry

AbstractRegistry 实现 Registry 接口,实现了接口中定义的注册、订阅等方法。

抽象类的属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// URL 地址分隔符,用于文件缓存,服务提供成功url分隔
private static final char URL_SEPARATOR = ' ';
// URL 地址分隔的正则表达式,用于分析文件缓存中的服务提供程序URL列表
private static final String URL_SPLIT = "\\s+";
// Max times to retry to save properties to local cache file
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3;
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 本地磁盘缓存,其中的特殊key为registies是记录注册表中心列表,其他是已通知服务提供商的列表
private final Properties properties = new Properties();
// 文件缓存写入执行器 提供一个线程的线程池
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件标识
private boolean syncSaveFile;
// 缓存的版本号
private final AtomicLong lastCacheChanged = new AtomicLong();
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
// 已经注册的URL集合,不仅仅服务提供者,也注册服务消费者
private final Set<URL> registered = new ConcurrentHashSet<>();
// 已订阅 URL,value是监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
// 消费者或者服务治理服务获取注册信息后的缓存对象。内存中服务器缓存的notified对象是ConcurrentHashMap里面嵌套了一个Map,
// 外层Map的Key是消费者的URL,内层的Map的key是分类,包括provider,consumer,routes,configurators四种,value则对应服务列表,没有服务提供者提供服务的URL,会以一个特别的empty://前缀开头
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
// 注册中心的URL
private URL registryUrl;
// 本地磁盘缓存文件保存的是注册中心的数据
private File file;
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public AbstractRegistry(URL url) {
// 这只注册中心的地址URL
setUrl(url);
// 当file.cache参数为true时,即开启本地文件缓存,加载本地文件中的参数
if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
String filename = url.getParameter(FILE_KEY, defaultFilename);
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// 加载文件中的参数放到Properties,Properties继承HashTable。
loadProperties();
// 通知监视器URL变化
notify(url.getBackupUrls());
}
}
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry cache file " + file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
lookup

获取消费者URL订阅的服务URL列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<>();
// 获取注册信息中分类服务列表信息
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// 如果该服务订阅了服务
if (CollectionUtils.isNotEmptyMap(notifiedUrls)) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
// 如果没有订阅服务,获取注册中心的最新的注册服务URL
final AtomicReference<List<URL>> reference = new AtomicReference<>();
// 通知监听器,当收到服务变更时触发
NotifyListener listener = reference::set;
// 添加服务的监听器
subscribe(url, listener);
List<URL> urls = reference.get();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
register and unregister

URL 注册和取消注册,代码的主要逻辑就是从registered的内存缓存中添加或者删除URL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}

@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
// 遍历已订阅URL匹配当前URL
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}

Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// 对每个服务提供URL进行分类
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 通知监听器
listener.notify(categoryList);
// 保存一份到文件缓存中
saveProperties(url);
}
}

FailbackRegistry

FailbackRegistry 是继承AbstractRegistry,增加了失败重试的机制作为抽象能力,后面不同的注册中心具体实现继承了这个类,就可以直接使用这个能力。

抽象类的属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 注册失败集合
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
// 取消注册失败集合
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
// 订阅失败集合
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
// 取消订阅集合
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

/**
* The time in milliseconds the retryExecutor will wait
*/
private final int retryPeriod;

// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final HashedWheelTimer retryTimer;
构造方法
1
2
3
4
5
6
7
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

// since the retry task will not be very much. 128 ticks is enough.
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 这里调用具体的注册中心,比如Zookeeper
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// 保存注册失败的URL
addFailedRegistered(url);
}
}

另外的几个方法,unregister、subscribe、unsubscribe都类似,里面会调用一个doxxx方法,底层就是不同的注册中心具体实现方法。

notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 调用父类的notify方法
super.notify(url, listener, urls);
}

RegistryFactory

RegistryFactory 接口定义只有一个getRegistry方法。URL 为dubbo封装的统一资源定位符,其中定义了协议protocol、用户名username、密码password、host主机、path路径等等属性。

RegistryFactory 是一个工厂方法,根据具体的注册协议,比如zookeeper,获取具体的注册中心实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SPI("dubbo")
public interface RegistryFactory {
/**
* 连接注册中心.
*
* 连接注册中心需处理契约:<br>
* 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
* 2. 支持URL上的username:password权限认证。<br>
* 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
* 4. 支持file=registry.cache本地磁盘文件缓存。<br>
* 5. 支持timeout=1000请求超时设置。<br>
* 6. 支持session=60000会话超时或过期设置。<br>
*
* @param url 注册中心地址,不允许为空
* @return 注册中心引用,总不返回空
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);

}

NotifyListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface NotifyListener {

/**
* 当收到服务变更通知时触发。
*
* 通知需处理契约:<br>
* 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
* 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
* 3. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routes, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
* 4. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
* 5. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
*
* @param urls 已注册信息列表,总不为空,含义同{@link org.apache.dubbo.registry.RegistryService#lookup(URL)}的返回值。
*/
void notify(List<URL> urls);

default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
}
}

dubbo-registry-zookeeper

对于dubbo的注册中心,这里只介绍下zookeeper,这个也是dubbo默认的注册中心。

ZookeeperRegistry

属性
1
2
3
4
5
6
7
8
9
10
// dubbo 在zookeeper注册中默认的根节点
private final static String DEFAULT_ROOT = "dubbo";
// 组的名称 或者说是 根节点的值
private final String root;
// 服务集合
private final Set<String> anyServices = new ConcurrentHashSet<>();
// zk 的监听器,<URL,Map<监听器,>>
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
// zk 客户端
private final ZookeeperClient zkClient;
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 调用父类FailbackRegistry的构造函数
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名称并赋值给root
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 连接 Zookeeper
zkClient = zookeeperTransporter.connect(url);
//添加连接状态监听器
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {

} else if (state == StateListener.CONNECTED) {

}
});
}
服务注册发布和下线取消注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void doRegister(URL url) {
try {
// zk生成一个节点
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

public void doUnregister(URL url) {
try {
// 删除zk节点
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

Reference

客官,赏一杯coffee嘛~~~~