最近在考虑写一个小工具,输入是zk注册地址、dubbo接口的类名、方法名、请求参数,返回dubbo接口的调用结果。其中,zk注册地址和请求参数为手工输入,其他输入可根据zk注册地址上的注册服务做选择。
要实现这么一个功能,需要两个基本步骤:
- 获取指定zk地址的所有Provider服务
- 根据指定的Provider方法,发起远程调用
同时,由于目前的开发测试环境并未与线上环境做隔离,我希望这个工具能够做一个防控,目前考虑的方案是只寻址注册在本地zk的服务,这样当服务打成jar包后,只会部署在开发和测试机上,也就不会对线上服务产生影响。不过这样做依旧会存在源码泄露遭修改的风险,后续有其他更好的方案的话再考虑安全校验的问题。
获取指定zk地址的服务
要做Dubbo相关的工具,首先需要对Dubbo源码有足够了解才行。在Dubbo-Admin项目中,存在类似的获取Dubbo服务的相关功能,可以做一个参考。
这部分功能集中在RegistryContainer类中,需要说明的是,在Dubbo中,URL类中基本包含了Provider、Consumer所有的信息,包括注册协议、注册地址、注册的类信息、方法信息等等。
查看初始化类的DubboNamespaceHandler,可以看到Service相关的信息保存在ServiceBean类中。

这张错综复杂的架构图可以帮助理下Dubbo中各种类的功能,其中Registry通知NotifyListener注册服务的变化,通过RegisterDirector可以根据URL获得指定服务的Invoker。
public void start() {
String url = ConfigUtils.getProperty(REGISTRY_ADDRESS);
if (url == null || url.length() == 0) {
throw new IllegalArgumentException("Please set java start argument: -D" + REGISTRY_ADDRESS + "=zookeeper://127.0.0.1:2181");
}
registry = (RegistryService) SpringContainer.getContext().getBean("registryService");
URL subscribeUrl = new URL(Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
Constants.INTERFACE_KEY, Constants.ANY_VALUE,
Constants.GROUP_KEY, Constants.ANY_VALUE,
Constants.VERSION_KEY, Constants.ANY_VALUE,
Constants.CLASSIFIER_KEY, Constants.ANY_VALUE,
Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + ","
+ Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false));
registry.subscribe(subscribeUrl, new NotifyListener() {
public void notify(List<URL> urls) {
if (urls == null || urls.size() == 0) {
return;
}
Map<String, List<URL>> proivderMap = new HashMap<String, List<URL>>();
Map<String, List<URL>> consumerMap = new HashMap<String, List<URL>>();
for (URL url : urls) {
String application = url.getParameter(Constants.APPLICATION_KEY);
if (application != null && application.length() > 0) {
applications.add(application);
}
String service = url.getServiceInterface();
services.add(service);
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.PROVIDERS_CATEGORY.equals(category)) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
serviceProviders.remove(service);
} else {
List<URL> list = proivderMap.get(service);
if (list == null) {
list = new ArrayList<URL>();
proivderMap.put(service, list);
}
list.add(url);
if (application != null && application.length() > 0) {
Set<String> serviceApplications = providerServiceApplications.get(service);
if (serviceApplications == null) {
providerServiceApplications.put(service, new ConcurrentHashSet<String>());
serviceApplications = providerServiceApplications.get(service);
}
serviceApplications.add(application);
Set<String> applicationServices = providerApplicationServices.get(application);
if (applicationServices == null) {
providerApplicationServices.put(application, new ConcurrentHashSet<String>());
applicationServices = providerApplicationServices.get(application);
}
applicationServices.add(service);
}
}
} else if (Constants.CONSUMERS_CATEGORY.equals(category)) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
serviceConsumers.remove(service);
} else {
List<URL> list = consumerMap.get(service);
if (list == null) {
list = new ArrayList<URL>();
consumerMap.put(service, list);
}
list.add(url);
if (application != null && application.length() > 0) {
Set<String> serviceApplications = consumerServiceApplications.get(service);
if (serviceApplications == null) {
consumerServiceApplications.put(service, new ConcurrentHashSet<String>());
serviceApplications = consumerServiceApplications.get(service);
}
serviceApplications.add(application);
Set<String> applicationServices = consumerApplicationServices.get(application);
if (applicationServices == null) {
consumerApplicationServices.put(application, new ConcurrentHashSet<String>());
applicationServices = consumerApplicationServices.get(application);
}
applicationServices.add(service);
}
}
}
}
if (proivderMap != null && proivderMap.size() > 0) {
serviceProviders.putAll(proivderMap);
}
if (consumerMap != null && consumerMap.size() > 0) {
serviceConsumers.putAll(consumerMap);
}
}
});
}
` RegistryService.subscribe(URL url, NotifyListener listener);`方法对注册在zk地址和订阅了url地址的服务做了监听,当节点数据发生变更时,可以接收到通知,并更新相关信息。RegisterFactory根据SPI扩展在配置文件中指定,从而使得在服务启动时,可根据Dubbo配置获取到对应的RegisterService示例。如此完成了启动时对配置的Dubbo相关服务的注册和订阅过程。
以上可以看到服务订阅过程为:RegisterFactory->RegistryService->NotifyListener.在结构图中,Invoke和Directory是相互关联的,按照这个调用链,可以继续理下远程过程调用的过程。
远程过程调用
通过Debug跟踪Consumer对Provider的调用过程,
InvokerInvocationHandler.invoke//jM=dk的动态代理
RpcInvocation
MockClusterInvoker.invoke
AbstractClusterInvoker.invoke
FailoverClusterInvoker.doInvoke
DubboInvoker.doInvoke
MonitorFilter.invoke
ProtocolFilterWrapper.buildInvokerChain