From 54df59222299c11d0ab3a6d251799bf8e9d4adcf Mon Sep 17 00:00:00 2001 From: luolianxin Date: Fri, 28 Feb 2025 17:06:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=9C=8D=E5=8A=A1=E7=AB=AFco?= =?UTF-8?q?nfig=E4=B8=8Eevent=E7=9B=B8=E5=85=B3=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lawenforcement/config/AsyncConfig.java | 53 ++++++++ .../config/SyncApiProperties.java | 34 +++++ .../config/ThirdPartyProperties.java | 52 ++++++++ .../event/DataChangeAction.java | 17 +++ .../lawenforcement/event/DataChangeEvent.java | 39 ++++++ .../event/aop/DataChangeAspect.java | 123 ++++++++++++++++++ .../event/aop/PublishDataChange.java | 24 ++++ .../event/listener/DataSyncListener.java | 79 +++++++++++ 8 files changed, 421 insertions(+) create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/config/AsyncConfig.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/config/SyncApiProperties.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/config/ThirdPartyProperties.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeAction.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeEvent.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/event/aop/DataChangeAspect.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/event/aop/PublishDataChange.java create mode 100644 server/src/main/java/com/aisino/iles/lawenforcement/event/listener/DataSyncListener.java diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/config/AsyncConfig.java b/server/src/main/java/com/aisino/iles/lawenforcement/config/AsyncConfig.java new file mode 100644 index 0000000..d818cf9 --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/config/AsyncConfig.java @@ -0,0 +1,53 @@ +package com.aisino.iles.lawenforcement.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@EnableAsync +public class AsyncConfig { + + /** + * 定义一个专用于@Async任务的线程池Bean。 + * Spring Boot会自动检测到这个Bean,并用它来执行所有@Async方法。 + */ + @Bean + public Executor asyncTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:线程池中保持的最小线程数,即使它们处于空闲状态。 + // 通常设置为CPU核心数,以充分利用CPU资源。 + int corePoolSize = Runtime.getRuntime().availableProcessors(); + executor.setCorePoolSize(corePoolSize); + + // 最大线程数:线程池允许创建的最大线程数。 + // 对于I/O密集型任务(如我们的数据同步),可以设置得比核心数大很多。 + executor.setMaxPoolSize(corePoolSize * 2); + + // 队列容量:当所有核心线程都在忙时,新任务会进入这个队列等待。 + // 设置一个合理的队列容量可以应对突发流量。 + executor.setQueueCapacity(200); + + // 线程名称前缀:为线程池中的线程设置一个有意义的前缀。 + // 这对于日志分析和问题排查至关重要。 + executor.setThreadNamePrefix("async-task-"); + + // 拒绝策略:当线程池和队列都满了之后,如何处理新来的任务。 + // CallerRunsPolicy:由提交任务的线程自己来执行这个任务。这是一种有效的"反压"策略, + // 可以减慢任务提交方的速度,防止系统被压垮。 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 空闲线程存活时间:当线程数超过核心数时,多余的空闲线程在终止前等待新任务的最长时间。 + executor.setKeepAliveSeconds(60); + + // 初始化线程池 + executor.initialize(); + + return executor; + } +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/config/SyncApiProperties.java b/server/src/main/java/com/aisino/iles/lawenforcement/config/SyncApiProperties.java new file mode 100644 index 0000000..94cacde --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/config/SyncApiProperties.java @@ -0,0 +1,34 @@ +package com.aisino.iles.lawenforcement.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 数据同步API配置属性。 + *

+ * 用于从 application.yml 文件中加载与第三方系统同步数据相关的API端点配置。 + *

+ */ +@Data +@Component +@ConfigurationProperties(prefix = "iles.sync.api") +public class SyncApiProperties { + + /** + * API基础URL。 + */ + private String baseUrl; + + /** + * API端点映射。 + *

+ * Key是完全限定的DTO类名 (e.g., "com.aisino.iles.lawenforcement.model.dto.DjbZfajxxRequestDto"), + * Value是对应的相对API路径 (e.g., "/djbZfajxx/save"). + *

+ */ + private Map endpoints; + private Map dtoConverters; +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/config/ThirdPartyProperties.java b/server/src/main/java/com/aisino/iles/lawenforcement/config/ThirdPartyProperties.java new file mode 100644 index 0000000..9fc02fd --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/config/ThirdPartyProperties.java @@ -0,0 +1,52 @@ +package com.aisino.iles.lawenforcement.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; + +/** + * @author Cascade + * @description 用于映射所有 third-party.* 配置的统一属性类 + * @date 2025/07/03 + */ +@Component +@ConfigurationProperties(prefix = "third-party") +@Data +public class ThirdPartyProperties { + + private Person person = new Person(); + private Enterprise enterprise = new Enterprise(); + + /** + * 人员相关配置 + */ + @Data + public static class Person { + /** + * 人员查询URL + */ + private String query = ""; + + /** + * 人员角色列表 + */ + private List roles = Collections.emptyList(); + + } + + /** + * 企业相关配置 + */ + @Data + public static class Enterprise { + /** + * 用于获取令牌的移动电话号码 + */ + private String yddh; + + } + +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeAction.java b/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeAction.java new file mode 100644 index 0000000..d9d1c88 --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeAction.java @@ -0,0 +1,17 @@ +package com.aisino.iles.lawenforcement.event; + +/** + * 数据变更操作类型枚举 + * @author hx + */ +public enum DataChangeAction { + + /** + * 保存 + */ + SAVE, + /** + * 删除 + */ + DELETE +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeEvent.java b/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeEvent.java new file mode 100644 index 0000000..5202c22 --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/event/DataChangeEvent.java @@ -0,0 +1,39 @@ +package com.aisino.iles.lawenforcement.event; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +/** + * 通用数据变更事件 + * @param 变更的数据实体类型 + * @author hx + */ +@Getter +public class DataChangeEvent extends ApplicationEvent { + + private final T data; + private final DataChangeAction action; + private final Class entityType; + + /** + * 创建一个新的 DataChangeEvent. + * + * @param source 事件源对象 + * @param data 变更的数据实体 + * @param action 数据变更的动作类型 (CREATE, UPDATE, DELETE) + */ + public DataChangeEvent(Object source, T data, DataChangeAction action) { + super(source); + this.data = data; + this.action = action; + entityType = null; + } + + public DataChangeEvent(Object source, T data, DataChangeAction action, Class entityType) { + super(source); + this.data = data; + this.action = action; + this.entityType = entityType; + } + +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/DataChangeAspect.java b/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/DataChangeAspect.java new file mode 100644 index 0000000..2d66275 --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/DataChangeAspect.java @@ -0,0 +1,123 @@ +package com.aisino.iles.lawenforcement.event.aop; + +import com.aisino.iles.lawenforcement.config.SyncApiProperties; +import com.aisino.iles.lawenforcement.model.dto.DataDtoConverter; +import com.aisino.iles.lawenforcement.event.DataChangeEvent; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.AfterReturning; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; + +import java.util.Collection; + +/** + * 数据变更事件发布的AOP切面。 + * 拦截所有被 @PublishDataChange 注解标记的方法,并在方法成功返回后发布事件。 + * @author hx + */ +@Slf4j +@Aspect +@Component +@RequiredArgsConstructor +public class DataChangeAspect { + + private final ApplicationEventPublisher eventPublisher; + private final SyncApiProperties syncApiProperties; + + /** + * 在被 @PublishDataChange 注解标记的方法成功返回后执行。 + * + * @param joinPoint 连接点,包含方法信息 + * @param annotation 注解实例,用于获取action类型 + * @param returnedValue 方法的返回值,即被操作的数据实体 + */ + @SneakyThrows + @AfterReturning(pointcut = "@annotation(annotation)", returning = "returnedValue") + public void publishEventOnSuccess(JoinPoint joinPoint, PublishDataChange annotation, Object returnedValue) { + if (returnedValue == null) { + log.warn("AOP切面: 方法 {} 返回值为null,无法发布数据变更事件。", joinPoint.getSignature().toShortString()); + return; + } + + log.info("AOP切面: 拦截到方法 {} 成功返回, 准备发布 {} 事件。", joinPoint.getSignature().toShortString(), annotation.action()); + + if (returnedValue instanceof Collection) { + @SuppressWarnings("unchecked") // Safe cast after instanceof check + Collection returnedCollection = (Collection) returnedValue; + if (returnedCollection.isEmpty()) { + log.warn("AOP切面: 方法 {} 返回的集合为空,不发布事件。", joinPoint.getSignature().toShortString()); + return; + } + log.info("AOP切面: 方法 {} 返回集合,将为集合中 {} 个元素分别发布事件。", joinPoint.getSignature().toShortString(), returnedCollection.size()); + for (Object item : returnedCollection) { + if (item != null) { + try { + String converterClassName = syncApiProperties.getDtoConverters().get(item.getClass().getName()); + if (converterClassName == null || converterClassName.isEmpty()) { + log.warn("AOP切面: 未找到类型 {} 的转换器配置。", item.getClass().getName()); + continue; + } + + Class converterClass = Class.forName(converterClassName); + if (!DataDtoConverter.class.isAssignableFrom(converterClass)) { + continue; + } + + @SuppressWarnings("unchecked") + DataDtoConverter converter = (DataDtoConverter) converterClass.getConstructor().newInstance(); + Object convertedItem = converter.convert(item); + + if (convertedItem == null) { + log.warn("AOP切面: 转换后的对象为null。原始对象: {}", item); + return; + } + + DataChangeEvent event = new DataChangeEvent<>(joinPoint.getTarget(), convertedItem, annotation.action()); + eventPublisher.publishEvent(event); + log.info("AOP切面: {} 事件已成功发布 (来自集合), 数据: {}", annotation.action(), item); + } catch (ClassNotFoundException e) { + log.error("AOP切面: 找不到转换器类。错误: {}", e.getMessage()); + } catch (Exception e) { + log.error("AOP切面: 创建或使用转换器时发生错误。错误: {}", e.getMessage()); + } + } else { + log.warn("AOP切面: 集合中的一个元素为null,跳过该元素的事件发布。方法: {}", joinPoint.getSignature().toShortString()); + } + } + } else { + try { + String converterClassName = syncApiProperties.getDtoConverters().get(returnedValue.getClass().getName()); + if (converterClassName == null || converterClassName.isEmpty()) { + log.warn("AOP切面: 未找到类型 {} 的转换器配置。", returnedValue.getClass().getName()); + return; + } + + Class converterClass = Class.forName(converterClassName); + if (!DataDtoConverter.class.isAssignableFrom(converterClass)) { + return; + } + + @SuppressWarnings("unchecked") + DataDtoConverter converter = (DataDtoConverter) converterClass.getConstructor().newInstance(); + Object convertedItem = converter.convert(returnedValue); + + if (convertedItem == null) { + log.warn("AOP切面: 转换后的对象为null。原始对象: {}", returnedValue); + return; + } + + DataChangeEvent event = new DataChangeEvent<>(joinPoint.getTarget(), convertedItem, annotation.action()); + eventPublisher.publishEvent(event); + log.info("AOP切面: {} 事件已成功发布, 数据: {}", annotation.action(), returnedValue); + } catch (ClassNotFoundException e) { + log.error("AOP切面: 找不到转换器类。错误: {}", e.getMessage()); + } catch (Exception e) { + log.error("AOP切面: 创建或使用转换器时发生错误。错误: {}", e.getMessage()); + } + } + } +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/PublishDataChange.java b/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/PublishDataChange.java new file mode 100644 index 0000000..1c69a9e --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/event/aop/PublishDataChange.java @@ -0,0 +1,24 @@ +package com.aisino.iles.lawenforcement.event.aop; + +import com.aisino.iles.lawenforcement.event.DataChangeAction; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 自定义注解,用于标记需要发布数据变更事件的方法。 + * 当方法成功返回时,AOP切面将自动发布一个DataChangeEvent。 + * @author hx + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface PublishDataChange { + + /** + * 定义数据变更的操作类型 (CREATE, UPDATE, etc.) + * @return 数据变更操作类型 + */ + DataChangeAction action(); +} diff --git a/server/src/main/java/com/aisino/iles/lawenforcement/event/listener/DataSyncListener.java b/server/src/main/java/com/aisino/iles/lawenforcement/event/listener/DataSyncListener.java new file mode 100644 index 0000000..b840534 --- /dev/null +++ b/server/src/main/java/com/aisino/iles/lawenforcement/event/listener/DataSyncListener.java @@ -0,0 +1,79 @@ +package com.aisino.iles.lawenforcement.event.listener; + +import com.aisino.iles.lawenforcement.event.DataChangeEvent; +import com.aisino.iles.lawenforcement.service.DataSyncService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * 数据变更事件监听器,负责将数据同步到第三方应用。 + * @author hx + */ +@Slf4j +@Component +public class DataSyncListener { + + private final DataSyncService dataSyncService; + + @Autowired + public DataSyncListener(DataSyncService dataSyncService) { + this.dataSyncService = dataSyncService; + } + + /** + * 异步监听数据变更事件。 + * + * @param event 数据变更事件 + * @param 数据类型 + */ + @Async + @EventListener + public void onDataChangeEvent(DataChangeEvent event) { + T data = event.getData(); + if (data == null) { + log.warn("接收到数据为空的事件,已忽略。"); + return; + } + + Class dataType = determineDataType(event); + if (dataType != null) { + log.info("接收到数据变更事件,类型: {}", dataType.getSimpleName()); + try { + dataSyncService.sendData(data, dataType); + } catch (Exception e) { + // 异常已在DataSyncService的恢复方法中记录,此处无需重复处理 + log.trace("数据类型 {} 的同步在重试后最终失败,此为预期行为。", dataType.getSimpleName()); + } + } else { + log.warn("无法确定事件中的数据类型,已忽略。事件: {}", event); + } + } + + /** + * 从数据变更事件中确定关键的数据类型。 + */ + private Class determineDataType(DataChangeEvent event) { + T data = event.getData(); + if (data == null) return null; + + // 优先使用事件中明确指定的实体类型 + if (event.getEntityType() != null) { + return event.getEntityType(); + } + + // 如果是集合类型,尝试获取第一个元素的类型 + if (data instanceof Iterable) { + Iterable iterable = (Iterable) data; + if (iterable.iterator().hasNext()) { + return iterable.iterator().next().getClass(); + } + return null; // 空集合无法确定类型 + } + + // 对于单个对象,直接返回其类型 + return data.getClass(); + } +}