// 注册失败集合 privatefinal ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>(); // 取消注册失败集合 privatefinal ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>(); // 订阅失败集合 privatefinal ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>(); // 取消订阅集合 privatefinal ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
/** * The time in milliseconds the retryExecutor will wait */ privatefinalint retryPeriod;
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry privatefinal HashedWheelTimer retryTimer;
// since the retry task will not be very much. 128 ticks is enough. retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); }
@Override publicvoidregister(URL url){ if (!acceptable(url)) { logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); return; } super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // 这里调用具体的注册中心,比如Zookeeper doRegister(url); } catch (Exception e) { Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } thrownew IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); }
publicZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter){ // 调用父类FailbackRegistry的构造函数 super(url); if (url.isAnyHost()) { thrownew IllegalStateException("registry address == null"); } // 获取组名称并赋值给root String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; // 连接 Zookeeper zkClient = zookeeperTransporter.connect(url); //添加连接状态监听器 zkClient.addStateListener((state) -> { if (state == StateListener.RECONNECTED) { logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" + " Since ephemeral ZNode will not get deleted for a connection lose, " + "there's no need to re-register url of this instance."); ZookeeperRegistry.this.fetchLatestAddresses(); } elseif (state == StateListener.NEW_SESSION_CREATED) { logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry..."); try { ZookeeperRegistry.this.recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } elseif (state == StateListener.SESSION_LOST) { logger.warn("Url of this instance will be deleted from registry soon. " + "Dubbo client will try to re-register once a new session is created."); } elseif (state == StateListener.SUSPENDED) {