You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1249 lines
56 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

## JDK 的 SPI 思想
SPI即 Service Provider Interface。在面向对象的设计里面模块之间推荐基于接口编程而不是对实现类进行硬编码这样做也是为了模块设计的可插拔原则。
比较典型的应用,如 JDBCJava 定义了一套 JDBC 的接口,但是 Java 本身并不提供对 JDBC 的实现类,而是开发者根据项目实际使用的数据库来选择驱动程序 jar 包,比如 mysql你就将 mysql-jdbc-connector.jar 引入进来oracle你就将 oracle-jdbc-connector.jar 引入进来。在系统跑的时候,碰到你使用 jdbc 的接口,他会在底层使用你引入的那个 jar 中提供的实现类。
## Dubbo 的 SPI 扩展机制原理
dubbo 自己实现了一套 SPI 机制,并对 JDK 的 SPI 进行了改进。
1. JDK 标准的 SPI 只能通过遍历来查找扩展点和实例化有可能导致一次性加载所有的扩展点如果不是所有的扩展点都被用到就会导致资源的浪费。dubbo 每个扩展点都有多种实现例如com.alibaba.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol 等实现,如果只是用到其中一个实现,可是加载了全部的实现,会导致资源的浪费。
2. 对配置文件中扩展实现的格式的修改例如META-INF/dubbo/com.xxx.Protocol 里的 com.foo.XxxProtocol 格式 改为了 xxx = com.foo.XxxProtocol 这种以键值对的形式这样做的目的是为了让我们更容易的定位到问题。比如由于第三方库不存在无法初始化导致无法加载扩展点“A”当用户配置使用 A 时dubbo 就会报无法加载扩展点的错误,而不是报哪些扩展点的实现加载失败以及错误原因,**这是因为原来的配置格式没有记录扩展名的 id导致 dubbo 无法抛出较为精准的异常,这会加大排查问题的难度**。所以改成 key-value 的形式来进行配置。
3. dubbo 的 SPI 机制增加了对 IOC、AOP 的支持,一个扩展点可以直接通过 setter 注入到其他扩展点。
下面我们看一下 Dubbo 的 SPI 扩展机制实现的结构目录。
![avatar](https://fastly.jsdelivr.net/gh/doocs/source-code-hunter@main/images/Dubbo/SPI组件目录结构.png)
### SPI 注解
首先看一下 SPI 注解。在某个接口上加上 @SPI 注解后,表明该接口为可扩展接口。比如,协议扩展接口 Protocol如果使用者在 <dubbo:protocol />、<dubbo:service />、<dubbo:reference /> 都没有指定 protocol 属性 的话,那么就默认使用 DubboProtocol 作为接口 Protocol 的实现,因为在 Protocol 上有 @SPI("dubbo")注解。而这个 protocol 属性值 或者默认值会被当作该接口的实现类中的一个 keydubbo 会去 META-INF.dubbo.internal 下的 com.alibaba.dubbo.rpc.Protocol 文件中找该 key 对应的 value源码如下。
```java
/**
* 协议接口
* Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
*/
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*/
int getDefaultPort();
/**
* 暴露远程服务:
* 1. 协议在接收请求时应记录请求来源方地址信息RpcContext.getContext().setRemoteAddress();<br>
* 2. export() 必须是幂等的,也就是暴露同一个 URL 的 Invoker 两次,和暴露一次没有区别。<br>
* 3. export() 传入的 Invoker 由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
* 1. 当用户调用 refer() 所返回的 Invoker 对象的 invoke() 方法时,协议需相应执行同 URL 远端 export() 传入的 Invoker 对象的 invoke() 方法。<br>
* 2. refer() 返回的 Invoker 由协议实现,协议通常需要在此 Invoker 中发送远程请求。<br>
* 3. 当 url 中有设置 check=false 时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}
/**
* 扩展点接口的标识。
* 扩展点声明配置文件,格式修改。
* 以Protocol示例配置文件META-INF/dubbo/com.xxx.Protocol内容
* 由
* com.foo.XxxProtocol
* com.foo.YyyProtocol
* 改成使用KV格式
* xxx=com.foo.XxxProtocol
* yyy=com.foo.YyyProtocol
*
* 原因:
* 当扩展点的static字段或方法签名上引用了三方库
* 如果三方库不存在,会导致类初始化失败,
* Extension标识Dubbo就拿不到了异常信息就和配置信息对应不起来。
*
* 比如:
* Extension("mina")加载失败,
* 当用户配置使用mina时就会报找不到扩展点mina
* 而不是报加载扩展点失败,等难以定位具体问题的错误。
*
* @author william.liangf
* @author ding.lid
* @export
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
/**
* default extension name
*
* 默认拓展名
*/
String value() default "";
}
// 配置文件 com.alibaba.dubbo.rpc.Protocol 中的内容
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
```
value 就是该 Protocol 接口 的实现类 DubboProtocol这样就做到了 SPI 扩展。
### ExtensionLoader
ExtensionLoader 扩展加载器,这是 dubbo 实现 SPI 扩展机制 的核心,几乎所有实现的逻辑都被封装在 ExtensionLoader 中,其源码如下。
```java
/**
* 拓展加载器Dubbo使用的扩展点获取
* <ul>
* <li>自动注入关联扩展点。</li>
* <li>自动Wrap上扩展点的Wrap类。</li>
* <li>缺省获得的的扩展点是一个Adaptive Instance。</li>
* </ul>
*
* 另外,该类同时是 ExtensionLoader 的管理容器,例如 {@link #EXTENSION_INSTANCES} 、{@link #EXTENSION_INSTANCES} 属性。
*/
@SuppressWarnings("deprecation")
public class ExtensionLoader<T> {
private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
// ============================== 静态属性 ==============================
/**
* 拓展加载器集合
*
* key拓展接口
*/
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
/**
* 拓展实现类集合
*
* key拓展实现类
* value拓展对象。
*
* 例如key 为 Class<AccessLogFilter>
* value 为 AccessLogFilter 对象
*/
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
// ============================== 对象属性 ==============================
/**
* 拓展接口。
* 例如Protocol
*/
private final Class<?> type;
/**
* 对象工厂
*
* 用于调用 {@link #injectExtension(Object)} 方法,向拓展对象注入依赖的属性。
*
* 例如StubProxyFactoryWrapper 中有 `Protocol protocol` 属性。
*/
private final ExtensionFactory objectFactory;
/**
* 缓存的拓展名与拓展类的映射。
*
* 和 {@link #cachedClasses} 的 KV 对调。
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
/**
* 缓存的拓展实现类集合。
*
* 不包含如下两种类型:
* 1. 自适应拓展实现类。例如 AdaptiveExtensionFactory
* 2. 带唯一参数为拓展接口的构造方法的实现类,或者说拓展 Wrapper 实现类。例如ProtocolFilterWrapper 。
* 拓展 Wrapper 实现类,会添加到 {@link #cachedWrapperClasses} 中
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
/**
* 拓展名与 @Activate 的映射
*
* 例如AccessLogFilter。
*
* 用于 {@link #getActivateExtension(URL, String)}
*/
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
/**
* 缓存的拓展对象集合
*
* key拓展名
* value拓展对象
*
* 例如Protocol 拓展
* keydubbo valueDubboProtocol
* keyinjvm valueInjvmProtocol
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
/**
* 缓存的自适应( Adaptive )拓展对象
*/
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
/**
* 缓存的自适应拓展对象的类
*
* {@link #getAdaptiveExtensionClass()}
*/
private volatile Class<?> cachedAdaptiveClass = null;
/**
* 缓存的默认拓展名
*
* 通过 {@link SPI} 注解获得
*/
private String cachedDefaultName;
/**
* 创建 {@link #cachedAdaptiveInstance} 时发生的异常。
*
* 发生异常后,不再创建,参见 {@link #createAdaptiveExtension()}
*/
private volatile Throwable createAdaptiveInstanceError;
/**
* 拓展 Wrapper 实现类集合
*
* 带唯一参数为拓展接口的构造方法的实现类
*
* 通过 {@link #loadExtensionClasses} 加载
*/
private Set<Class<?>> cachedWrapperClasses;
/**
* 拓展名 与 加载对应拓展类发生的异常 的 映射
*
* key拓展名
* value异常
*
* 在 {@link #loadFile(Map, String)} 时,记录
*/
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
/**
* 是否包含 @SPI 注解
*
* @param type 类
* @param <T> 泛型
* @return 是否包含
*/
private static <T> boolean withExtensionAnnotation(Class<T> type) {
return type.isAnnotationPresent(SPI.class);
}
/**
* 根据拓展点的接口,获得拓展加载器
*
* @param type 接口
* @param <T> 泛型
* @return 加载器
*/
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
// 必须是接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
// 必须包含 @SPI 注解
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
// 获得接口对应的拓展点加载器
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
private static ClassLoader findClassLoader() {
return ExtensionLoader.class.getClassLoader();
}
public String getExtensionName(T extensionInstance) {
return getExtensionName(extensionInstance.getClass());
}
public String getExtensionName(Class<?> extensionClass) {
return cachedNames.get(extensionClass);
}
public List<T> getActivateExtension(URL url, String key) {
return getActivateExtension(url, key, null);
}
public List<T> getActivateExtension(URL url, String[] values) {
return getActivateExtension(url, values, null);
}
/**
* 获得符合自动激活条件的拓展对象数组
*/
public List<T> getActivateExtension(URL url, String key, String group) {
// 从 Dubbo URL 获得参数值
String value = url.getParameter(key);
// 获得符合自动激活条件的拓展对象数组
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
/**
* 获得符合自动激活条件的拓展对象数组
*/
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 处理自动激活的拓展对象们
// 判断不存在配置 `"-name"` 。例如,<dubbo:service filter="-default" /> ,代表移除所有默认过滤器。
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
// 获得拓展实现类数组
getExtensionClasses();
// 循环
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) { // 匹配分组
// 获得拓展对象
T ext = getExtension(name);
if (!names.contains(name) // 不包含在自定义配置里。如果包含,会在下面的代码处理。
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name) // 判断是否配置移除。例如 <dubbo:service filter="-monitor" />,则 MonitorFilter 会被移除
&& isActive(activate, url)) { // 判断是否激活
exts.add(ext);
}
}
}
// 排序
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 处理自定义配置的拓展对象们。例如在 <dubbo:service filter="demo" /> ,代表需要加入 DemoFilter (这个是笔者自定义的)。
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX) && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) { // 判断非移除的
// 将配置的自定义在自动激活的拓展对象们前面。例如,<dubbo:service filter="demo,default,demo2" /> ,则 DemoFilter 就会放在默认的过滤器前面。
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
// 获得拓展对象
T ext = getExtension(name);
usrs.add(ext);
}
}
}
// 添加到结果集
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
/**
* 匹配分组
*
* @param group 过滤的分组条件。若为空,无需过滤
* @param groups 配置的分组
* @return 是否匹配
*/
private boolean isMatchGroup(String group, String[] groups) {
// 为空,无需过滤
if (group == null || group.length() == 0) {
return true;
}
// 匹配
if (groups != null && groups.length > 0) {
for (String g : groups) {
if (group.equals(g)) {
return true;
}
}
}
return false;
}
/**
* 是否激活,通过 Dubbo URL 中是否存在参数名为 `@Activate.value` ,并且参数值非空。
*
* @param activate 自动激活注解
* @param url Dubbo URL
* @return 是否
*/
private boolean isActive(Activate activate, URL url) {
String[] keys = activate.value();
if (keys.length == 0) {
return true;
}
for (String key : keys) {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if ((k.equals(key) || k.endsWith("." + key))
&& ConfigUtils.isNotEmpty(v)) {
return true;
}
}
}
return false;
}
/**
* 返回扩展点实例,如果没有指定的扩展点或是还没加载(即实例化)则返回<code>null</code>。
* 注意:此方法不会触发扩展点的加载。
* 一般应该调用{@link #getExtension(String)}方法获得扩展,这个方法会触发扩展点加载。
*/
@SuppressWarnings("unchecked")
public T getLoadedExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
return (T) holder.get();
}
/**
* 返回已经加载的扩展点的名字。
* 一般应该调用 getSupportedExtensions() 方法获得扩展,这个方法会返回所有的扩展点。
*/
public Set<String> getLoadedExtensions() {
return Collections.unmodifiableSet(new TreeSet<String>(cachedInstances.keySet()));
}
/**
* 返回指定名字的扩展对象。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
*
* @param name 拓展名
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
// 查找 默认的 拓展对象
if ("true".equals(name)) {
return getDefaultExtension();
}
// 从 缓存中 获得对应的拓展对象
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
// 从 缓存中 未获取到,进行创建缓存对象。
if (instance == null) {
instance = createExtension(name);
// 设置创建对象到缓存中
holder.set(instance);
}
}
}
return (T) instance;
}
/**
* 返回缺省的扩展,如果没有设置则返回<code>null</code>。
*/
public T getDefaultExtension() {
getExtensionClasses();
// 如果为 true ,不能继续调用 `#getExtension(true)` 方法,会形成死循环。
if (null == cachedDefaultName || cachedDefaultName.length() == 0
|| "true".equals(cachedDefaultName)) {
return null;
}
return getExtension(cachedDefaultName);
}
public boolean hasExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
try {
return getExtensionClass(name) != null;
} catch (Throwable t) {
return false;
}
}
public Set<String> getSupportedExtensions() {
Map<String, Class<?>> clazzes = getExtensionClasses();
return Collections.unmodifiableSet(new TreeSet<String>(clazzes.keySet()));
}
/**
* 返回缺省的扩展点名,如果没有设置缺省则返回<code>null</code>。
*/
public String getDefaultExtensionName() {
getExtensionClasses();
return cachedDefaultName;
}
/**
* 编程方式添加新扩展点。
*
* @param name 扩展点名
* @param clazz 扩展点类
* @throws IllegalStateException 要添加扩展点名已经存在。
*/
public void addExtension(String name, Class<?> clazz) {
getExtensionClasses(); // load classes
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Input type " +
clazz + "not implement Extension " + type);
}
if (clazz.isInterface()) {
throw new IllegalStateException("Input type " +
clazz + "can not be interface!");
}
if (!clazz.isAnnotationPresent(Adaptive.class)) {
if (StringUtils.isBlank(name)) {
throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
}
if (cachedClasses.get().containsKey(name)) {
throw new IllegalStateException("Extension name " +
name + " already existed(Extension " + type + ")!");
}
cachedNames.put(clazz, name);
cachedClasses.get().put(name, clazz);
} else {
if (cachedAdaptiveClass != null) {
throw new IllegalStateException("Adaptive Extension already existed(Extension " + type + ")!");
}
cachedAdaptiveClass = clazz;
}
}
/**
* 编程方式添加替换已有扩展点。
*
* @param name 扩展点名
* @param clazz 扩展点类
* @throws IllegalStateException 要添加扩展点名已经存在。
* @deprecated 不推荐应用使用,一般只在测试时可以使用
*/
@Deprecated
public void replaceExtension(String name, Class<?> clazz) {
getExtensionClasses(); // load classes
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Input type " +
clazz + "not implement Extension " + type);
}
if (clazz.isInterface()) {
throw new IllegalStateException("Input type " +
clazz + "can not be interface!");
}
if (!clazz.isAnnotationPresent(Adaptive.class)) {
if (StringUtils.isBlank(name)) {
throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
}
if (!cachedClasses.get().containsKey(name)) {
throw new IllegalStateException("Extension name " +
name + " not existed(Extension " + type + ")!");
}
cachedNames.put(clazz, name);
cachedClasses.get().put(name, clazz);
cachedInstances.remove(name);
} else {
if (cachedAdaptiveClass == null) {
throw new IllegalStateException("Adaptive Extension not existed(Extension " + type + ")!");
}
cachedAdaptiveClass = clazz;
cachedAdaptiveInstance.set(null);
}
}
/**
* 获得自适应拓展对象
*
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
// 从缓存中,获得自适应拓展对象
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
// 若之前未创建报错,
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应拓展对象
instance = createAdaptiveExtension();
// 设置到缓存
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
// 记录异常
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
// 若之前创建报错,则抛出异常 IllegalStateException
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
/**
* 获得拓展名不存在时的异常
*
* @param name 拓展名
* @return 异常
*/
private IllegalStateException findException(String name) {
// 在 `#loadFile(...)` 方法中,加载时,发生异常
for (Map.Entry<String, IllegalStateException> entry : exceptions.entrySet()) {
if (entry.getKey().toLowerCase().contains(name.toLowerCase())) {
return entry.getValue();
}
}
// 生成不存在该拓展类实现的异常。
StringBuilder buf = new StringBuilder("No such extension " + type.getName() + " by name " + name);
int i = 1;
for (Map.Entry<String, IllegalStateException> entry : exceptions.entrySet()) {
if (i == 1) {
buf.append(", possible causes: ");
}
buf.append("\r\n(");
buf.append(i++);
buf.append(") ");
buf.append(entry.getKey());
buf.append(":\r\n");
buf.append(StringUtils.toString(entry.getValue()));
}
return new IllegalStateException(buf.toString());
}
/**
* 创建拓展名的拓展对象,并缓存。
*
* @param name 拓展名
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
private T createExtension(String name) {
// 获得拓展名对应的拓展实现类
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name); // 抛出异常
}
try {
// 从缓存中,获得拓展对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 当缓存不存在时,创建拓展对象,并添加到缓存中。
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入依赖的属性
injectExtension(instance);
// 创建 Wrapper 拓展对象
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
/**
* 注入依赖的属性
*
* @param instance 拓展对象
* @return 拓展对象
*/
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) { // setting && public 方法
// 获得属性的类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获得属性
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
// 获得属性值
Object object = objectFactory.getExtension(pt, property);
// 设置属性值
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
private Class<?> getExtensionClass(String name) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (name == null)
throw new IllegalArgumentException("Extension name == null");
// 获得拓展实现类
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null)
throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
return clazz;
}
/**
* 获得拓展实现类数组
*
* @return 拓展实现类数组
*/
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中,获得拓展实现类数组
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 从配置文件中,加载拓展实现类数组
classes = loadExtensionClasses();
// 设置到缓存中
cachedClasses.set(classes);
}
}
}
return classes;
}
/**
* 加载拓展实现类数组
*
* @return 拓展实现类数组
*/
private Map<String, Class<?>> loadExtensionClasses() {
// 通过 @SPI 注解,获得默认的拓展实现类名
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
// 从配置文件中,加载拓展实现类数组
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
/**
* 从一个配置文件中,加载拓展实现类数组。
*
* @param extensionClasses 拓展类名数组
* @param dir 文件名
*/
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
// 完整的文件名
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
// 获得文件名对应的所有文件数组
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
// 遍历文件数组
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
// 跳过当前被注释掉的情况,例如 #spring=xxxxxxxxx
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
// 拆分key=value 的配置格式
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 判断拓展实现,是否实现拓展接口
Class<?> clazz = Class.forName(line, true, classLoader);
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
// 缓存自适应拓展对象的类到 `cachedAdaptiveClass`
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else {
// 缓存拓展 Wrapper 实现类到 `cachedWrapperClasses`
try {
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
// 缓存拓展实现类到 `extensionClasses`
} catch (NoSuchMethodException e) {
clazz.getConstructor();
// 未配置拓展名自动生成。例如DemoFilter 为 demo 。主要用于兼容 Java SPI 的配置。
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
// 获得拓展名,可以是数组,有多个拓展名。
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
// 缓存 @Activate 到 `cachedActivates` 。
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
// 缓存到 `cachedNames`
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
// 缓存拓展实现类到 `extensionClasses`
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
// 发生异常,记录到异常集合
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
@SuppressWarnings("deprecation")
private String findAnnotationName(Class<?> clazz) {
com.alibaba.dubbo.common.Extension extension = clazz.getAnnotation(com.alibaba.dubbo.common.Extension.class);
if (extension == null) {
String name = clazz.getSimpleName();
if (name.endsWith(type.getSimpleName())) {
name = name.substring(0, name.length() - type.getSimpleName().length());
}
return name.toLowerCase();
}
return extension.value();
}
/**
* 创建自适应拓展对象
*
* @return 拓展对象
*/
@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
/**
* @return 自适应拓展类
*/
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
/**
* 自动生成自适应拓展的代码实现,并编译后返回该类。
*
* @return 类
*/
private Class<?> createAdaptiveExtensionClass() {
// 自动生成自适应拓展的代码实现的字符串
String code = createAdaptiveExtensionClassCode();
// 编译代码,并返回该类
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
/**
* 自动生成自适应拓展的代码实现的字符串
*
* @return 代码字符串
*/
private String createAdaptiveExtensionClassCode() {
StringBuilder codeBuidler = new StringBuilder();
// 遍历方法数组,判断有 @Adaptive 注解
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for (Method m : methods) {
if (m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// no need to generate adaptive class since there's no adaptive method found.
// 完全没有Adaptive方法则不需要生成Adaptive类
if (!hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
// 生成代码package 和 import
codeBuidler.append("package " + type.getPackage().getName() + ";");
codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
// 生成代码:类名
codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adaptive" + " implements " + type.getCanonicalName() + " {");
// 循环方法
for (Method method : methods) {
Class<?> rt = method.getReturnType(); // 返回类型
Class<?>[] pts = method.getParameterTypes(); // 参数类型数组
Class<?>[] ets = method.getExceptionTypes(); // 异常类型数组
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512); // 方法体的代码
// 非 @Adaptive 注解,生成代码:生成的方法为直接抛出异常。因为,非自适应的接口不应该被调用。
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
// @Adaptive 注解,生成方法体的代码
} else {
// 寻找 Dubbo URL 参数的位置
int urlTypeIndex = -1;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
// found parameter in URL type
// 有类型为URL的参数生成代码生成校验 URL 非空的代码
if (urlTypeIndex != -1) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);
s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
// did not find parameter in URL type
// 参数没有URL类型
else {
String attribMethod = null;
// find URL getter method
// 找到参数的URL属性 。例如Invoker 有 `#getURL()` 方法。
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) { // pubic && getting 方法
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
// 未找到,抛出异常。
if (attribMethod == null) {
throw new IllegalStateException("fail to create adaptive class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}
// 生成代码:校验 URL 非空
// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);
// 生成 `URL url = arg%d.%s();` 的代码
s = String.format("%s url = arg%d.%s();", URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}
String[] value = adaptiveAnnotation.value();
// value is not set, use the value generated from class name as the key
// 没有设置Key则使用“扩展点接口名的点分隔 作为Key
if (value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if (Character.isUpperCase(charArray[i])) {
if (i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
} else {
sb.append(charArray[i]);
}
}
value = new String[]{sb.toString()};
}
// 判断是否有 Invocation 参数
boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// 生成代码:校验 Invocation 非空
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
// 生成代码:获得方法名
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
// 标记有 Invocation 参数
hasInvocation = true;
break;
}
}
// 默认拓展名
String defaultExtName = cachedDefaultName;
// 获得最终拓展名的代码字符串,例如:
// 【简单】1. url.getParameter("proxy", "javassist")
// 【复杂】2. url.getParameter(key1, url.getParameter(key2, defaultExtName))
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) { // 倒序的原因因为是顺序获取参数参见【复杂】2. 的例子
if (i == value.length - 1) {
if (null != defaultExtName) {
if (!"protocol".equals(value[i]))
if (hasInvocation) // 当【有】 Invocation 参数时,使用 `URL#getMethodParameter()` 方法。
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else // 当【非】 Invocation 参数时,使用 `URL#getParameter()` 方法。
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else // 当属性名是 "protocol" ,使用 `URL#getProtocl()` 方法获取。
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName); // 此处的 defaultExtName ,可以去掉的。
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
getNameCode = "url.getProtocol()";
}
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
// 生成代码获取参数的代码。例如String extName = url.getParameter("proxy", "javassist");
code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);
// 生成代码:拓展对象,调用方法。例如
// `com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
// .getExtension(extName);` 。
s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);
// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}
s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");
}
// 生成方法
codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
for (int i = 0; i < pts.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
codeBuidler.append(" ");
codeBuidler.append("arg" + i);
}
codeBuidler.append(")");
if (ets.length > 0) {
codeBuidler.append(" throws "); // 异常
for (int i = 0; i < ets.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(ets[i].getCanonicalName());
}
}
codeBuidler.append(" {");
codeBuidler.append(code.toString());
codeBuidler.append("\n}");
}
// 生成类末尾的 `}`
codeBuidler.append("\n}");
// 调试,打印生成的代码
if (logger.isDebugEnabled()) {
logger.debug(codeBuidler.toString());
}
return codeBuidler.toString();
}
@Override
public String toString() {
return this.getClass().getName() + "[" + type.getName() + "]";
}
}
```