不积跬步,无以至千里

Dubbo远程过程调用


最近在考虑写一个小工具,输入是zk注册地址、dubbo接口的类名、方法名、请求参数,返回dubbo接口的调用结果。其中,zk注册地址和请求参数为手工输入,其他输入可根据zk注册地址上的注册服务做选择。

要实现这么一个功能,需要两个基本步骤:

  1. 获取指定zk地址的所有Provider服务
  2. 根据指定的Provider方法,发起远程调用

同时,由于目前的开发测试环境并未与线上环境做隔离,我希望这个工具能够做一个防控,目前考虑的方案是只寻址注册在本地zk的服务,这样当服务打成jar包后,只会部署在开发和测试机上,也就不会对线上服务产生影响。不过这样做依旧会存在源码泄露遭修改的风险,后续有其他更好的方案的话再考虑安全校验的问题。

获取指定zk地址的服务

要做Dubbo相关的工具,首先需要对Dubbo源码有足够了解才行。在Dubbo-Admin项目中,存在类似的获取Dubbo服务的相关功能,可以做一个参考。

这部分功能集中在RegistryContainer类中,需要说明的是,在Dubbo中,URL类中基本包含了Provider、Consumer所有的信息,包括注册协议、注册地址、注册的类信息、方法信息等等。

查看初始化类的DubboNamespaceHandler,可以看到Service相关的信息保存在ServiceBean类中。

这张错综复杂的架构图可以帮助理下Dubbo中各种类的功能,其中Registry通知NotifyListener注册服务的变化,通过RegisterDirector可以根据URL获得指定服务的Invoker。

public void start() {
    String url = ConfigUtils.getProperty(REGISTRY_ADDRESS);
    if (url == null || url.length() == 0) {
        throw new IllegalArgumentException("Please set java start argument: -D" + REGISTRY_ADDRESS + "=zookeeper://127.0.0.1:2181");
    }
    registry = (RegistryService) SpringContainer.getContext().getBean("registryService");
    URL subscribeUrl = new URL(Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
            Constants.INTERFACE_KEY, Constants.ANY_VALUE,
            Constants.GROUP_KEY, Constants.ANY_VALUE,
            Constants.VERSION_KEY, Constants.ANY_VALUE,
            Constants.CLASSIFIER_KEY, Constants.ANY_VALUE,
            Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + ","
            + Constants.CONSUMERS_CATEGORY,
            Constants.CHECK_KEY, String.valueOf(false));
    registry.subscribe(subscribeUrl, new NotifyListener() {
        public void notify(List<URL> urls) {
            if (urls == null || urls.size() == 0) {
                return;
            }
            Map<String, List<URL>> proivderMap = new HashMap<String, List<URL>>();
            Map<String, List<URL>> consumerMap = new HashMap<String, List<URL>>();
            for (URL url : urls) {
                String application = url.getParameter(Constants.APPLICATION_KEY);
                if (application != null && application.length() > 0) {
                    applications.add(application);
                }
                String service = url.getServiceInterface();
                services.add(service);
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
                        serviceProviders.remove(service);
                    } else {
                        List<URL> list = proivderMap.get(service);
                        if (list == null) {
                            list = new ArrayList<URL>();
                            proivderMap.put(service, list);
                        }
                        list.add(url);
                        if (application != null && application.length() > 0) {
                            Set<String> serviceApplications = providerServiceApplications.get(service);
                            if (serviceApplications == null) {
                                providerServiceApplications.put(service, new ConcurrentHashSet<String>());
                                serviceApplications = providerServiceApplications.get(service);
                            }
                            serviceApplications.add(application);

                            Set<String> applicationServices = providerApplicationServices.get(application);
                            if (applicationServices == null) {
                                providerApplicationServices.put(application, new ConcurrentHashSet<String>());
                                applicationServices = providerApplicationServices.get(application);
                            }
                            applicationServices.add(service);
                        }
                    }
                } else if (Constants.CONSUMERS_CATEGORY.equals(category)) {
                    if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
                        serviceConsumers.remove(service);
                    } else {
                        List<URL> list = consumerMap.get(service);
                        if (list == null) {
                            list = new ArrayList<URL>();
                            consumerMap.put(service, list);
                        }
                        list.add(url);
                        if (application != null && application.length() > 0) {
                            Set<String> serviceApplications = consumerServiceApplications.get(service);
                            if (serviceApplications == null) {
                                consumerServiceApplications.put(service, new ConcurrentHashSet<String>());
                                serviceApplications = consumerServiceApplications.get(service);
                            }
                            serviceApplications.add(application);

                            Set<String> applicationServices = consumerApplicationServices.get(application);
                            if (applicationServices == null) {
                                consumerApplicationServices.put(application, new ConcurrentHashSet<String>());
                                applicationServices = consumerApplicationServices.get(application);
                            }
                            applicationServices.add(service);
                        }

                    }
                }
            }
            if (proivderMap != null && proivderMap.size() > 0) {
                serviceProviders.putAll(proivderMap);
            }
            if (consumerMap != null && consumerMap.size() > 0) {
                serviceConsumers.putAll(consumerMap);
            }
        }
    });
}

` RegistryService.subscribe(URL url, NotifyListener listener);`方法对注册在zk地址和订阅了url地址的服务做了监听,当节点数据发生变更时,可以接收到通知,并更新相关信息。RegisterFactory根据SPI扩展在配置文件中指定,从而使得在服务启动时,可根据Dubbo配置获取到对应的RegisterService示例。如此完成了启动时对配置的Dubbo相关服务的注册和订阅过程。

以上可以看到服务订阅过程为:RegisterFactory->RegistryService->NotifyListener.在结构图中,Invoke和Directory是相互关联的,按照这个调用链,可以继续理下远程过程调用的过程。

远程过程调用

通过Debug跟踪Consumer对Provider的调用过程,

InvokerInvocationHandler.invoke//jM=dk的动态代理

RpcInvocation

MockClusterInvoker.invoke

AbstractClusterInvoker.invoke

FailoverClusterInvoker.doInvoke

DubboInvoker.doInvoke

MonitorFilter.invoke

ProtocolFilterWrapper.buildInvokerChain