commit
94ec304ac1
@ -0,0 +1,98 @@
|
||||
# Spring Import
|
||||
- Author: [HuiFer](https://github.com/huifer)
|
||||
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
|
||||
|
||||
## 分析
|
||||
- org.springframework.context.annotation.Import
|
||||
```java
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface Import {
|
||||
|
||||
/**
|
||||
* {@link Configuration @Configuration}, {@link ImportSelector},
|
||||
* {@link ImportBeanDefinitionRegistrar}, or regular component classes to import.
|
||||
*
|
||||
* 需要导入的类
|
||||
*/
|
||||
Class<?>[] value();
|
||||
}
|
||||
```
|
||||
### ImportBeanDefinitionRegistrar
|
||||
- 注册Import Bean
|
||||
- `org.springframework.context.annotation.ImportBeanDefinitionRegistrar`
|
||||
```java
|
||||
public interface ImportBeanDefinitionRegistrar {
|
||||
|
||||
/**
|
||||
* Register bean definitions as necessary based on the given annotation metadata of
|
||||
* the importing {@code @Configuration} class.
|
||||
* <p>Note that {@link BeanDefinitionRegistryPostProcessor} types may <em>not</em> be
|
||||
* registered here, due to lifecycle constraints related to {@code @Configuration}
|
||||
* class processing.
|
||||
*
|
||||
* 对import value属性的注册
|
||||
* @param importingClassMetadata annotation metadata of the importing class
|
||||
* @param registry current bean definition registry
|
||||
*/
|
||||
void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry);
|
||||
|
||||
}
|
||||
```
|
||||
- 两个实现类
|
||||
1. `org.springframework.context.annotation.AutoProxyRegistrar`
|
||||
2. `org.springframework.context.annotation.AspectJAutoProxyRegistrar`
|
||||
|
||||
#### AutoProxyRegistrar
|
||||
```java
|
||||
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
/**
|
||||
* 注册import bean定义
|
||||
*/
|
||||
@Override
|
||||
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
|
||||
boolean candidateFound = false;
|
||||
// 获取注解
|
||||
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
|
||||
for (String annType : annTypes) {
|
||||
// 注解属性
|
||||
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
|
||||
if (candidate == null) {
|
||||
continue;
|
||||
}
|
||||
// 获取 mode 属性
|
||||
Object mode = candidate.get("mode");
|
||||
// 获取代理对象
|
||||
Object proxyTargetClass = candidate.get("proxyTargetClass");
|
||||
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
|
||||
Boolean.class == proxyTargetClass.getClass()) {
|
||||
candidateFound = true;
|
||||
if (mode == AdviceMode.PROXY) {
|
||||
// 注册
|
||||
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
|
||||
if ((Boolean) proxyTargetClass) {
|
||||
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!candidateFound && logger.isInfoEnabled()) {
|
||||
String name = getClass().getSimpleName();
|
||||
logger.info(String.format("%s was imported but no annotations were found " +
|
||||
"having both 'mode' and 'proxyTargetClass' attributes of type " +
|
||||
"AdviceMode and boolean respectively. This means that auto proxy " +
|
||||
"creator registration and configuration may not have occurred as " +
|
||||
"intended, and components may not be proxied as expected. Check to " +
|
||||
"ensure that %s has been @Import'ed on the same class where these " +
|
||||
"annotations are declared; otherwise remove the import of %s " +
|
||||
"altogether.", name, name, name));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
```
|
@ -0,0 +1,302 @@
|
||||
# Spring 定时任务
|
||||
- Author: [HuiFer](https://github.com/huifer)
|
||||
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
|
||||
|
||||
## EnableScheduling
|
||||
- 首先关注的类为启动定时任务的注解`@EnableScheduling`
|
||||
```java
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Import(SchedulingConfiguration.class)
|
||||
@Documented
|
||||
public @interface EnableScheduling {
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
## SchedulingConfiguration
|
||||
- 注册定时任务相关信息
|
||||
```java
|
||||
@Configuration
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
public class SchedulingConfiguration {
|
||||
|
||||
/**
|
||||
* 开启定时任务
|
||||
* @return
|
||||
*/
|
||||
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
|
||||
// 注册 ScheduledAnnotationBeanPostProcessor
|
||||
return new ScheduledAnnotationBeanPostProcessor();
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
## ScheduledAnnotationBeanPostProcessor
|
||||
- 关注application事件,以及spring生命周期相关的接口实现
|
||||
```java
|
||||
/**
|
||||
* application 事件
|
||||
* @param event the event to respond to
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(ContextRefreshedEvent event) {
|
||||
if (event.getApplicationContext() == this.applicationContext) {
|
||||
// Running in an ApplicationContext -> register tasks this late...
|
||||
// giving other ContextRefreshedEvent listeners a chance to perform
|
||||
// their work at the same time (e.g. Spring Batch's job registration).
|
||||
// 注册定时任务
|
||||
finishRegistration();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) {
|
||||
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
|
||||
bean instanceof ScheduledExecutorService) {
|
||||
// Ignore AOP infrastructure such as scoped proxies.
|
||||
return bean;
|
||||
}
|
||||
|
||||
// 当前类
|
||||
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
|
||||
if (!this.nonAnnotatedClasses.contains(targetClass)) {
|
||||
// 方法扫描,存在 Scheduled、Schedules 注解的全部扫描
|
||||
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
|
||||
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
|
||||
|
||||
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
|
||||
method, Scheduled.class, Schedules.class);
|
||||
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
|
||||
});
|
||||
if (annotatedMethods.isEmpty()) {
|
||||
this.nonAnnotatedClasses.add(targetClass);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Non-empty set of methods
|
||||
annotatedMethods.forEach((method, scheduledMethods) ->
|
||||
// 处理 scheduled 相关信息
|
||||
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
|
||||
"': " + annotatedMethods);
|
||||
}
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
```
|
||||
|
||||
- 处理定时任务注解
|
||||
```java
|
||||
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
|
||||
try {
|
||||
Runnable runnable = createRunnable(bean, method);
|
||||
boolean processedSchedule = false;
|
||||
String errorMessage =
|
||||
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
|
||||
|
||||
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
|
||||
|
||||
// Determine initial delay
|
||||
// 是否延迟执行
|
||||
long initialDelay = scheduled.initialDelay();
|
||||
// 延迟执行时间
|
||||
String initialDelayString = scheduled.initialDelayString();
|
||||
// 是否有延迟执行的时间
|
||||
if (StringUtils.hasText(initialDelayString)) {
|
||||
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
|
||||
if (this.embeddedValueResolver != null) {
|
||||
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
|
||||
}
|
||||
if (StringUtils.hasLength(initialDelayString)) {
|
||||
try {
|
||||
initialDelay = parseDelayAsLong(initialDelayString);
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check cron expression
|
||||
// 获取cron表达式
|
||||
String cron = scheduled.cron();
|
||||
// cron表达式是否存在
|
||||
if (StringUtils.hasText(cron)) {
|
||||
// 获取时区
|
||||
String zone = scheduled.zone();
|
||||
if (this.embeddedValueResolver != null) {
|
||||
// 字符串转换
|
||||
cron = this.embeddedValueResolver.resolveStringValue(cron);
|
||||
zone = this.embeddedValueResolver.resolveStringValue(zone);
|
||||
}
|
||||
if (StringUtils.hasLength(cron)) {
|
||||
// cron 是否延迟
|
||||
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
|
||||
processedSchedule = true;
|
||||
if (!Scheduled.CRON_DISABLED.equals(cron)) {
|
||||
TimeZone timeZone;
|
||||
if (StringUtils.hasText(zone)) {
|
||||
// 时区解析
|
||||
timeZone = StringUtils.parseTimeZoneString(zone);
|
||||
}
|
||||
else {
|
||||
// 默认时区获取
|
||||
timeZone = TimeZone.getDefault();
|
||||
}
|
||||
// 创建任务
|
||||
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point we don't need to differentiate between initial delay set or not anymore
|
||||
if (initialDelay < 0) {
|
||||
initialDelay = 0;
|
||||
}
|
||||
|
||||
// Check fixed delay
|
||||
// 获取间隔调用时间
|
||||
long fixedDelay = scheduled.fixedDelay();
|
||||
// 间隔时间>0
|
||||
if (fixedDelay >= 0) {
|
||||
Assert.isTrue(!processedSchedule, errorMessage);
|
||||
processedSchedule = true;
|
||||
// 创建任务,间隔时间定时任务
|
||||
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
|
||||
}
|
||||
// 延迟时间
|
||||
String fixedDelayString = scheduled.fixedDelayString();
|
||||
if (StringUtils.hasText(fixedDelayString)) {
|
||||
if (this.embeddedValueResolver != null) {
|
||||
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
|
||||
}
|
||||
if (StringUtils.hasLength(fixedDelayString)) {
|
||||
Assert.isTrue(!processedSchedule, errorMessage);
|
||||
processedSchedule = true;
|
||||
try {
|
||||
fixedDelay = parseDelayAsLong(fixedDelayString);
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
|
||||
}
|
||||
// 创建延迟时间任务
|
||||
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
|
||||
}
|
||||
}
|
||||
|
||||
// Check fixed rate
|
||||
// 获取调用频率
|
||||
long fixedRate = scheduled.fixedRate();
|
||||
if (fixedRate >= 0) {
|
||||
Assert.isTrue(!processedSchedule, errorMessage);
|
||||
processedSchedule = true;
|
||||
// 创建调用频率的定时任务
|
||||
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
|
||||
}
|
||||
String fixedRateString = scheduled.fixedRateString();
|
||||
if (StringUtils.hasText(fixedRateString)) {
|
||||
if (this.embeddedValueResolver != null) {
|
||||
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
|
||||
}
|
||||
if (StringUtils.hasLength(fixedRateString)) {
|
||||
Assert.isTrue(!processedSchedule, errorMessage);
|
||||
processedSchedule = true;
|
||||
try {
|
||||
fixedRate = parseDelayAsLong(fixedRateString);
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
|
||||
}
|
||||
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether we had any attribute set
|
||||
Assert.isTrue(processedSchedule, errorMessage);
|
||||
|
||||
// Finally register the scheduled tasks
|
||||
synchronized (this.scheduledTasks) {
|
||||
// 定时任务注册
|
||||
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
|
||||
regTasks.addAll(tasks);
|
||||
}
|
||||
}
|
||||
catch (IllegalArgumentException ex) {
|
||||
throw new IllegalStateException(
|
||||
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## 定时任务
|
||||
- CronTask
|
||||
- cron定时任务
|
||||
- FixedDelayTask
|
||||
- 间隔时间的定时任务
|
||||
- FixedRateTask
|
||||
- 调用频率的定时任务
|
||||
- ScheduledTask
|
||||
- 定时任务对象
|
||||
### cron 表达式解析
|
||||
- `org.springframework.scheduling.support.CronSequenceGenerator.doParse`
|
||||
```java
|
||||
private void doParse(String[] fields) {
|
||||
setNumberHits(this.seconds, fields[0], 0, 60);
|
||||
setNumberHits(this.minutes, fields[1], 0, 60);
|
||||
setNumberHits(this.hours, fields[2], 0, 24);
|
||||
setDaysOfMonth(this.daysOfMonth, fields[3]);
|
||||
setMonths(this.months, fields[4]);
|
||||
setDays(this.daysOfWeek, replaceOrdinals(fields[5], "SUN,MON,TUE,WED,THU,FRI,SAT"), 8);
|
||||
|
||||
if (this.daysOfWeek.get(7)) {
|
||||
// Sunday can be represented as 0 or 7
|
||||
this.daysOfWeek.set(0);
|
||||
this.daysOfWeek.clear(7);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
### 执行定时任务
|
||||
- 这里以 CronTask 任务进行分析,其他定时任务同理
|
||||
- `org.springframework.scheduling.config.ScheduledTaskRegistrar.scheduleCronTask`
|
||||
```java
|
||||
@Nullable
|
||||
public ScheduledTask scheduleCronTask(CronTask task) {
|
||||
// 从未执行的任务列表中删除,并且获取这个任务
|
||||
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
|
||||
boolean newTask = false;
|
||||
// 没有这个任务
|
||||
if (scheduledTask == null) {
|
||||
scheduledTask = new ScheduledTask(task);
|
||||
newTask = true;
|
||||
}
|
||||
// 任务调度器是否为空
|
||||
if (this.taskScheduler != null) {
|
||||
|
||||
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
|
||||
}
|
||||
else {
|
||||
// 添加到cron任务列表
|
||||
addCronTask(task);
|
||||
// 保存到没有执行的任务中
|
||||
this.unresolvedTasks.put(task, scheduledTask);
|
||||
}
|
||||
return (newTask ? scheduledTask : null);
|
||||
}
|
||||
|
||||
```
|
@ -0,0 +1,411 @@
|
||||
# Spring EnableJms 注解
|
||||
- Author: [HuiFer](https://github.com/huifer)
|
||||
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
|
||||
- 源码路径: `org.springframework.jms.annotation.EnableJms`
|
||||
|
||||
## 源码分析
|
||||
```java
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Import(JmsBootstrapConfiguration.class)
|
||||
public @interface EnableJms {
|
||||
}
|
||||
```
|
||||
|
||||
- 该类的切入点在`@Import(JmsBootstrapConfiguration.class)` , 直接看`JmsBootstrapConfiguration`就可以了
|
||||
|
||||
|
||||
```java
|
||||
@Configuration
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
public class JmsBootstrapConfiguration {
|
||||
|
||||
/**
|
||||
* jms 监听注解后处理, 将{@link JmsListener} 注册到{@link JmsListenerContainerFactory}
|
||||
* @return
|
||||
*/
|
||||
@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
|
||||
return new JmsListenerAnnotationBeanPostProcessor();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* JMS 监听注册
|
||||
* @return
|
||||
*/
|
||||
@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
|
||||
public JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
|
||||
return new JmsListenerEndpointRegistry();
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
### JmsListenerAnnotationBeanPostProcessor
|
||||
|
||||
类图
|
||||
|
||||
![image-20200304085303580](/images/springmessage/image-20200304085303580.png)
|
||||
|
||||
|
||||
|
||||
- 主要关注
|
||||
|
||||
1. **afterSingletonsInstantiated**
|
||||
2. **postProcessAfterInitialization**
|
||||
|
||||
#### afterSingletonsInstantiated
|
||||
```JAVA
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
// Remove resolved singleton classes from cache
|
||||
this.nonAnnotatedClasses.clear();
|
||||
|
||||
if (this.beanFactory instanceof ListableBeanFactory) {
|
||||
// Apply JmsListenerConfigurer beans from the BeanFactory, if any
|
||||
// 根据类型获取bean
|
||||
Map<String, JmsListenerConfigurer> beans =
|
||||
((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
|
||||
|
||||
List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
|
||||
// 排序 Order
|
||||
AnnotationAwareOrderComparator.sort(configurers);
|
||||
for (JmsListenerConfigurer configurer : configurers) {
|
||||
// 放入jms监听配置,开发者自定义
|
||||
configurer.configureJmsListeners(this.registrar);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.containerFactoryBeanName != null) {
|
||||
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
|
||||
}
|
||||
|
||||
if (this.registrar.getEndpointRegistry() == null) {
|
||||
// Determine JmsListenerEndpointRegistry bean from the BeanFactory
|
||||
if (this.endpointRegistry == null) {
|
||||
Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
|
||||
this.endpointRegistry = this.beanFactory.getBean(
|
||||
JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
|
||||
}
|
||||
this.registrar.setEndpointRegistry(this.endpointRegistry);
|
||||
}
|
||||
|
||||
|
||||
// Set the custom handler method factory once resolved by the configurer
|
||||
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
|
||||
if (handlerMethodFactory != null) {
|
||||
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
|
||||
}
|
||||
|
||||
// Actually register all listeners
|
||||
this.registrar.afterPropertiesSet();
|
||||
}
|
||||
```
|
||||
|
||||
- 关注最后一行`this.registrar.afterPropertiesSet()`
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
registerAllEndpoints();
|
||||
}
|
||||
|
||||
protected void registerAllEndpoints() {
|
||||
Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
|
||||
synchronized (this.mutex) {
|
||||
for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
|
||||
// 注册监听
|
||||
this.endpointRegistry.registerListenerContainer(
|
||||
descriptor.endpoint, resolveContainerFactory(descriptor));
|
||||
}
|
||||
this.startImmediately = true; // trigger immediate startup
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- 注册监听在下面分析会讲详见下文
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#### postProcessAfterInitialization
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
|
||||
bean instanceof JmsListenerEndpointRegistry) {
|
||||
// Ignore AOP infrastructure such as scoped proxies.
|
||||
return bean;
|
||||
}
|
||||
|
||||
// 获取 bean 的代理对象.class
|
||||
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
|
||||
if (!this.nonAnnotatedClasses.contains(targetClass)) {
|
||||
Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
|
||||
(MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
|
||||
Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
|
||||
method, JmsListener.class, JmsListeners.class);
|
||||
return (!listenerMethods.isEmpty() ? listenerMethods : null);
|
||||
});
|
||||
if (annotatedMethods.isEmpty()) {
|
||||
this.nonAnnotatedClasses.add(targetClass);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("No @JmsListener annotations found on bean type: " + targetClass);
|
||||
}
|
||||
} else {
|
||||
// Non-empty set of methods
|
||||
annotatedMethods.forEach((method, listeners) ->
|
||||
listeners.forEach(listener -> processJmsListener(listener, method, bean)));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
|
||||
"': " + annotatedMethods);
|
||||
}
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
```JAVA
|
||||
protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
|
||||
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
|
||||
|
||||
// 设置 监听方法信息
|
||||
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
|
||||
endpoint.setBean(bean);
|
||||
endpoint.setMethod(invocableMethod);
|
||||
endpoint.setMostSpecificMethod(mostSpecificMethod);
|
||||
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
|
||||
endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
|
||||
endpoint.setBeanFactory(this.beanFactory);
|
||||
endpoint.setId(getEndpointId(jmsListener));
|
||||
endpoint.setDestination(resolve(jmsListener.destination()));
|
||||
if (StringUtils.hasText(jmsListener.selector())) {
|
||||
endpoint.setSelector(resolve(jmsListener.selector()));
|
||||
}
|
||||
if (StringUtils.hasText(jmsListener.subscription())) {
|
||||
endpoint.setSubscription(resolve(jmsListener.subscription()));
|
||||
}
|
||||
if (StringUtils.hasText(jmsListener.concurrency())) {
|
||||
endpoint.setConcurrency(resolve(jmsListener.concurrency()));
|
||||
}
|
||||
|
||||
JmsListenerContainerFactory<?> factory = null;
|
||||
String containerFactoryBeanName = resolve(jmsListener.containerFactory());
|
||||
if (StringUtils.hasText(containerFactoryBeanName)) {
|
||||
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
|
||||
try {
|
||||
factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
|
||||
} catch (NoSuchBeanDefinitionException ex) {
|
||||
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
|
||||
mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
|
||||
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
|
||||
}
|
||||
}
|
||||
|
||||
// 注册监听点 到 JmsListenerContainerFactory
|
||||
this.registrar.registerEndpoint(endpoint, factory);
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
- 将监听点注册的重要方法 **`org.springframework.jms.config.JmsListenerEndpointRegistrar#registerEndpoint(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>)`**
|
||||
|
||||
```java
|
||||
public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {
|
||||
Assert.notNull(endpoint, "Endpoint must not be null");
|
||||
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
|
||||
|
||||
// Factory may be null, we defer the resolution right before actually creating the container
|
||||
// jms 监听点描述
|
||||
JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
|
||||
|
||||
synchronized (this.mutex) {
|
||||
if (this.startImmediately) { // register and start immediately
|
||||
Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
|
||||
// 注册
|
||||
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
|
||||
resolveContainerFactory(descriptor), true);
|
||||
}
|
||||
else {
|
||||
this.endpointDescriptors.add(descriptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
- `org.springframework.jms.config.JmsListenerEndpointRegistry#registerListenerContainer(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>, boolean)`
|
||||
|
||||
```java
|
||||
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
|
||||
boolean startImmediately) {
|
||||
|
||||
Assert.notNull(endpoint, "Endpoint must not be null");
|
||||
Assert.notNull(factory, "Factory must not be null");
|
||||
String id = endpoint.getId();
|
||||
Assert.hasText(id, "Endpoint id must be set");
|
||||
|
||||
synchronized (this.listenerContainers) {
|
||||
if (this.listenerContainers.containsKey(id)) {
|
||||
throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
|
||||
}
|
||||
// 创建消息监听容器
|
||||
MessageListenerContainer container = createListenerContainer(endpoint, factory);
|
||||
this.listenerContainers.put(id, container);
|
||||
if (startImmediately) {
|
||||
// 启动消息监听容器
|
||||
startIfNecessary(container);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
- `org.springframework.jms.config.JmsListenerEndpointRegistry#createListenerContainer`
|
||||
|
||||
```java
|
||||
/**
|
||||
* Create and start a new container using the specified factory.
|
||||
* 创建监听容器
|
||||
*/
|
||||
protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
|
||||
JmsListenerContainerFactory<?> factory) {
|
||||
|
||||
// 创建监听 容器
|
||||
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
|
||||
|
||||
if (listenerContainer instanceof InitializingBean) {
|
||||
try {
|
||||
// 后置方法
|
||||
((InitializingBean) listenerContainer).afterPropertiesSet();
|
||||
} catch (Exception ex) {
|
||||
throw new BeanInitializationException("Failed to initialize message listener container", ex);
|
||||
}
|
||||
}
|
||||
|
||||
int containerPhase = listenerContainer.getPhase();
|
||||
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
|
||||
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
|
||||
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
|
||||
this.phase + " vs " + containerPhase);
|
||||
}
|
||||
this.phase = listenerContainer.getPhase();
|
||||
}
|
||||
|
||||
return listenerContainer;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
- 关键接口`JmsListenerContainerFactory<C extends MessageListenerContainer>`
|
||||
|
||||
```JAVA
|
||||
public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
|
||||
|
||||
/**
|
||||
* Create a {@link MessageListenerContainer} for the given {@link JmsListenerEndpoint}.
|
||||
* 创建肩痛容器
|
||||
* @param endpoint the endpoint to configure
|
||||
* @return the created container
|
||||
*/
|
||||
C createListenerContainer(JmsListenerEndpoint endpoint);
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
![image-20200304092154712](/images/springmessage/image-20200304092154712.png)
|
||||
|
||||
|
||||
|
||||
- 注册完成后是否立即启动
|
||||
|
||||
```java
|
||||
this.listenerContainers.put(id, container);
|
||||
if (startImmediately) {
|
||||
// 启动消息监听容器
|
||||
startIfNecessary(container);
|
||||
}
|
||||
|
||||
private void startIfNecessary(MessageListenerContainer listenerContainer) {
|
||||
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
|
||||
listenerContainer.start();
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
- 具体实现: `org.springframework.jms.listener.AbstractJmsListeningContainer#start`
|
||||
|
||||
- 执行完`start`方法就结束了`processJmsListener`的调用链路, `postProcessAfterInitialization` 也结束了
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
### JmsListenerEndpointRegistry
|
||||
|
||||
- 这个类辅助**JmsListenerAnnotationBeanPostProcessor** 处理
|
||||
|
||||
#### registerListenerContainer
|
||||
|
||||
```java
|
||||
/**
|
||||
* Create a message listener container for the given {@link JmsListenerEndpoint}.
|
||||
* <p>This create the necessary infrastructure to honor that endpoint
|
||||
* with regards to its configuration.
|
||||
* <p>The {@code startImmediately} flag determines if the container should be
|
||||
* started immediately.
|
||||
* <p>
|
||||
* 注册监听容器
|
||||
*
|
||||
* @param endpoint the endpoint to add
|
||||
* 监听点
|
||||
* @param factory the listener factory to use
|
||||
* 监听容器工厂
|
||||
* @param startImmediately start the container immediately if necessary
|
||||
* 是否立即启动容器
|
||||
* @see #getListenerContainers()
|
||||
* @see #getListenerContainer(String)
|
||||
*/
|
||||
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
|
||||
boolean startImmediately) {
|
||||
|
||||
Assert.notNull(endpoint, "Endpoint must not be null");
|
||||
Assert.notNull(factory, "Factory must not be null");
|
||||
String id = endpoint.getId();
|
||||
Assert.hasText(id, "Endpoint id must be set");
|
||||
|
||||
synchronized (this.listenerContainers) {
|
||||
if (this.listenerContainers.containsKey(id)) {
|
||||
throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
|
||||
}
|
||||
// 创建消息监听容器
|
||||
MessageListenerContainer container = createListenerContainer(endpoint, factory);
|
||||
this.listenerContainers.put(id, container);
|
||||
if (startImmediately) {
|
||||
// 启动消息监听容器
|
||||
startIfNecessary(container);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
@ -0,0 +1,201 @@
|
||||
# Spring JmsTemplate
|
||||
- Author: [HuiFer](https://github.com/huifer)
|
||||
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
|
||||
- 源码路径: `org.springframework.jms.core.JmsTemplate`
|
||||
|
||||
|
||||
## 源码分析
|
||||
### send 发送消息
|
||||
|
||||
```java
|
||||
@Override
|
||||
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
|
||||
// 执行.
|
||||
execute(session -> {
|
||||
Destination destination = resolveDestinationName(session, destinationName);
|
||||
doSend(session, destination, messageCreator);
|
||||
return null;
|
||||
}, false);
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
```java
|
||||
@Nullable
|
||||
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
|
||||
Assert.notNull(action, "Callback object must not be null");
|
||||
Connection conToClose = null;
|
||||
Session sessionToClose = null;
|
||||
try {
|
||||
Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
|
||||
obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
|
||||
if (sessionToUse == null) {
|
||||
// 创建链接
|
||||
conToClose = createConnection();
|
||||
// 创建session
|
||||
sessionToClose = createSession(conToClose);
|
||||
if (startConnection) {
|
||||
conToClose.start();
|
||||
}
|
||||
sessionToUse = sessionToClose;
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Executing callback on JMS Session: " + sessionToUse);
|
||||
}
|
||||
/**
|
||||
* sessionCallback 执行
|
||||
* {@link JmsTemplate#doSend(Session, javax.jms.Destination, org.springframework.jms.core.MessageCreator)}
|
||||
*/
|
||||
return action.doInJms(sessionToUse);
|
||||
} catch (JMSException ex) {
|
||||
throw convertJmsAccessException(ex);
|
||||
} finally {
|
||||
// 资源释放
|
||||
JmsUtils.closeSession(sessionToClose);
|
||||
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
- 最后`action.doInJms(sessionToUse)`的操作
|
||||
```java
|
||||
Destination destination = resolveDestinationName(session, destinationName);
|
||||
doSend(session, destination, messageCreator);
|
||||
return null;
|
||||
```
|
||||
|
||||
- `doSend`真正做的发送方法
|
||||
```java
|
||||
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
|
||||
throws JMSException {
|
||||
|
||||
Assert.notNull(messageCreator, "MessageCreator must not be null");
|
||||
|
||||
// 创建消息生产者
|
||||
MessageProducer producer = createProducer(session, destination);
|
||||
try {
|
||||
// 创建消息
|
||||
Message message = messageCreator.createMessage(session);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Sending created message: " + message);
|
||||
}
|
||||
// 发送
|
||||
doSend(producer, message);
|
||||
// Check commit - avoid commit call within a JTA transaction.
|
||||
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
|
||||
// Transacted session created by this template -> commit.
|
||||
JmsUtils.commitIfNecessary(session);
|
||||
}
|
||||
} finally {
|
||||
// 关闭消息生产者
|
||||
JmsUtils.closeMessageProducer(producer);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
1. `createProducer`中通过`javax.jms.Session.createProducer`创建`MessageProducer`,第三方消息中间件独立实现
|
||||
2. `createMessage`
|
||||
```java
|
||||
@Override
|
||||
public javax.jms.Message createMessage(Session session) throws JMSException {
|
||||
try {
|
||||
// 消息转换
|
||||
return this.messageConverter.toMessage(this.message, session);
|
||||
} catch (Exception ex) {
|
||||
throw new MessageConversionException("Could not convert '" + this.message + "'", ex);
|
||||
}
|
||||
}
|
||||
```
|
||||
- 消息转换后续在更新
|
||||
3. `doSend` 这里也是第三方消息中间件实现
|
||||
```java
|
||||
protected void doSend(MessageProducer producer, Message message) throws JMSException {
|
||||
if (this.deliveryDelay >= 0) {
|
||||
producer.setDeliveryDelay(this.deliveryDelay);
|
||||
}
|
||||
if (isExplicitQosEnabled()) {
|
||||
// 发送消息,第三方消息中间件实现
|
||||
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
|
||||
} else {
|
||||
producer.send(message);
|
||||
}
|
||||
}
|
||||
```
|
||||
4. `closeMessageProducer` 这个方法特别,直接关闭
|
||||
```java
|
||||
public static void closeMessageProducer(@Nullable MessageProducer producer) {
|
||||
if (producer != null) {
|
||||
try {
|
||||
producer.close();
|
||||
} catch (JMSException ex) {
|
||||
logger.trace("Could not close JMS MessageProducer", ex);
|
||||
} catch (Throwable ex) {
|
||||
// We don't trust the JMS provider: It might throw RuntimeException or Error.
|
||||
logger.trace("Unexpected exception on closing JMS MessageProducer", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
### receive 接收消息
|
||||
```java
|
||||
@Override
|
||||
@Nullable
|
||||
public Message receive(String destinationName) throws JmsException {
|
||||
return receiveSelected(destinationName, null);
|
||||
}
|
||||
@Override
|
||||
@Nullable
|
||||
public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
|
||||
return execute(session -> {
|
||||
Destination destination = resolveDestinationName(session, destinationName);
|
||||
return doReceive(session, destination, messageSelector);
|
||||
}, true);
|
||||
}
|
||||
@Nullable
|
||||
protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
|
||||
throws JMSException {
|
||||
|
||||
return doReceive(session, createConsumer(session, destination, messageSelector));
|
||||
}
|
||||
@Nullable
|
||||
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
|
||||
try {
|
||||
// Use transaction timeout (if available).
|
||||
long timeout = getReceiveTimeout();
|
||||
// 链接工厂
|
||||
ConnectionFactory connectionFactory = getConnectionFactory();
|
||||
// JMS 资源信息
|
||||
JmsResourceHolder resourceHolder = null;
|
||||
if (connectionFactory != null) {
|
||||
// 从连接对象中获取JMS 资源信息
|
||||
resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
|
||||
}
|
||||
if (resourceHolder != null && resourceHolder.hasTimeout()) {
|
||||
// 超时时间
|
||||
timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
|
||||
}
|
||||
// 具体的消息
|
||||
Message message = receiveFromConsumer(consumer, timeout);
|
||||
if (session.getTransacted()) {
|
||||
// 事务性操作
|
||||
// Commit necessary - but avoid commit call within a JTA transaction.
|
||||
if (isSessionLocallyTransacted(session)) {
|
||||
// Transacted session created by this template -> commit.
|
||||
JmsUtils.commitIfNecessary(session);
|
||||
}
|
||||
} else if (isClientAcknowledge(session)) {
|
||||
// Manually acknowledge message, if any.
|
||||
if (message != null) {
|
||||
message.acknowledge();
|
||||
}
|
||||
}
|
||||
return message;
|
||||
} finally {
|
||||
JmsUtils.closeMessageConsumer(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
@ -0,0 +1,197 @@
|
||||
# Spring MessageConverter
|
||||
- Author: [HuiFer](https://github.com/huifer)
|
||||
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
|
||||
- 源码路径: `org.springframework.messaging.converter.MessageConverter`
|
||||
## MessageConverter
|
||||
- 消息转换接口
|
||||
- 类图如下
|
||||
![image-20200305085013723](/images/springmessage/image-20200305085013723.png)
|
||||
- 两个方法
|
||||
1. fromMessage: 从消息转换到Object
|
||||
```java
|
||||
Object fromMessage(Message<?> message, Class<?> targetClass);
|
||||
```
|
||||
2. toMessage: 从Object转换到消息
|
||||
```java
|
||||
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
| 序号 | class | 作用 |
|
||||
| ---- | ------------------------------- | -------------------- |
|
||||
| 1 | ByteArrayMessageConverter | byte数组消息转换器 |
|
||||
| 2 | MappingJackson2MessageConverter | jackson2的消息转换器 |
|
||||
| 3 | MarshallingMessageConverter | xml的消息转换器 |
|
||||
| 4 | StringMessageConverter | 字符串消息转换器 |
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
## AbstractMessageConverter
|
||||
|
||||
类图:
|
||||
|
||||
![image-20200305085845017](/images/springmessage/image-20200305085845017.png)
|
||||
|
||||
### fromMessage
|
||||
|
||||
```java
|
||||
@Override
|
||||
@Nullable
|
||||
public final Object fromMessage(Message<?> message, Class<?> targetClass) {
|
||||
return fromMessage(message, targetClass, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public final Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
|
||||
if (!canConvertFrom(message, targetClass)) {
|
||||
return null;
|
||||
}
|
||||
return convertFromInternal(message, targetClass, conversionHint);
|
||||
}
|
||||
|
||||
|
||||
// 子类实现
|
||||
@Nullable
|
||||
protected Object convertFromInternal(
|
||||
Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
- 真正的转换过程
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
|
||||
Charset charset = getContentTypeCharset(getMimeType(message.getHeaders()));
|
||||
Object payload = message.getPayload();
|
||||
return (payload instanceof String ? payload : new String((byte[]) payload, charset));
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
### toMessage
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
@Nullable
|
||||
public final Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
|
||||
return toMessage(payload, headers, null);
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
@Nullable
|
||||
public final Message<?> toMessage(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
|
||||
if (!canConvertTo(payload, headers)) {
|
||||
return null;
|
||||
}
|
||||
// 子类实现
|
||||
Object payloadToUse = convertToInternal(payload, headers, conversionHint);
|
||||
if (payloadToUse == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
MimeType mimeType = getDefaultContentType(payloadToUse);
|
||||
if (headers != null) {
|
||||
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
|
||||
if (accessor != null && accessor.isMutable()) {
|
||||
if (mimeType != null) {
|
||||
accessor.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
|
||||
}
|
||||
// 创建信息对象
|
||||
return MessageBuilder.createMessage(payloadToUse, accessor.getMessageHeaders());
|
||||
}
|
||||
}
|
||||
|
||||
MessageBuilder<?> builder = MessageBuilder.withPayload(payloadToUse);
|
||||
if (headers != null) {
|
||||
builder.copyHeaders(headers);
|
||||
}
|
||||
if (mimeType != null) {
|
||||
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
```
|
||||
|
||||
- `org.springframework.messaging.converter.StringMessageConverter#convertToInternal`
|
||||
|
||||
```JAVA
|
||||
@Override
|
||||
@Nullable
|
||||
protected Object convertToInternal(
|
||||
Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
|
||||
|
||||
if (byte[].class == getSerializedPayloadClass()) {
|
||||
// 获取编码
|
||||
Charset charset = getContentTypeCharset(getMimeType(headers));
|
||||
// 获取byte数组
|
||||
payload = ((String) payload).getBytes(charset);
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
- 创建**Message**对象
|
||||
|
||||
```JAVA
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Message<T> createMessage(@Nullable T payload, MessageHeaders messageHeaders) {
|
||||
Assert.notNull(payload, "Payload must not be null");
|
||||
Assert.notNull(messageHeaders, "MessageHeaders must not be null");
|
||||
if (payload instanceof Throwable) {
|
||||
return (Message<T>) new ErrorMessage((Throwable) payload, messageHeaders);
|
||||
}
|
||||
else {
|
||||
return new GenericMessage<>(payload, messageHeaders);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
```JAVA
|
||||
@SuppressWarnings("unchecked")
|
||||
public Message<T> build() {
|
||||
if (this.originalMessage != null && !this.headerAccessor.isModified()) {
|
||||
return this.originalMessage;
|
||||
}
|
||||
MessageHeaders headersToUse = this.headerAccessor.toMessageHeaders();
|
||||
if (this.payload instanceof Throwable) {
|
||||
return (Message<T>) new ErrorMessage((Throwable) this.payload, headersToUse);
|
||||
}
|
||||
else {
|
||||
return new GenericMessage<>(this.payload, headersToUse);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
- 两种创建方式基本相同,如果出现异常组装异常消息对象`ErrorMessage`,成功创建`GenericMessage`
|
||||
|
||||
![image-20200305090846313](/images/springmessage/image-20200305090846313.png)
|
||||
|
||||
从类图上看`ErrorMessage`是`GenericMessage`的子类
|
After Width: | Height: | Size: 13 KiB |
After Width: | Height: | Size: 20 KiB |
After Width: | Height: | Size: 20 KiB |
After Width: | Height: | Size: 9.7 KiB |
After Width: | Height: | Size: 7.1 KiB |
Loading…
Reference in new issue