平常我们做应用功能的时候,经常会碰到A、B、C等多起事件取数据,进行组装后反馈结果集。
//用户信息
userService.getProfile(userId);
//用户信用信息
accountService.getAccount(userId);
//SNS
snsService.getSNS(userId);
//send email\sms
platformService.sendMessage(userId,message)
综上所述,我们能够观察得到其实上述4次事件完全可以并行处理,现有的方式我们的执行计划是采纳串行的方式
明显能够感觉得到A+B+C+D所消耗的时间才是这个method方法执行所消耗的时间,那如果采纳为并行的方式进行调用呢?
串行计划耗时:1+1+3+1=6
并行计划耗时:3
底层最为重要的一步,通过代理的方式,把类中的method都进行future调用
采用的是阿里的异步调用组件,但是代码有些老我进行了微调优,但是核心的内容主要还是proxy的方式,代码挺丑的,但是蛮有趣的方式进行异步调用。
考虑到并不是每个逻辑处理都需要异步的方式,
spring配置
<bean id="autoProxyCreator"
class="spring.lifecycle.BeanNameAutoProxyCreator">
<property name="beanNames">
<list>
<value>*Service</value>
<value>*DAO</value>
</list>
</property>
<property name="interceptorFilter">
<map>
<entry key="*Service" value="transactionInterceptor,paramterInterceptor,monitorInterceptor"/>
</map>
</property>
</bean>
我在AbstractAutoProxyCreator中新增的一属性值
private Map<String, String> interceptorFilter = new HashMap<String, String>();
然后进行拦截工作,指定不同的Service被拦截不同的拦截器
protected Advisor[] buildAdvisors(String beanName, Object[] specificInterceptors) {
Iterator itor = interceptorFilter.entrySet().iterator();
boolean ismatch = false;
while (itor.hasNext()) {
Map.Entry<String, String> entry = (Map.Entry<String, String>) itor.next();
String key = entry.getKey();
String values = entry.getValue();
if (key.contains("*")) {
key = key.replace("*", "");
Pattern pattern = Pattern.compile(".*(" + key + "$)");
Matcher matcher = pattern.matcher(beanName);
ismatch = matcher.matches();
} else {
ismatch = key.matches(beanName);
}
if (ismatch) {
this.interceptorNames = values.split(",");
}
}
...
前几天阿里的一位问起如何有没有考虑异步校验工作,一开始时候没想到,但是回过头想了下异步校验可能有其特殊的场合需要,正常情况下应该只需要顺序校验field。所以依托于其组件上层,写了一个异步注解的校验方式
核心代码如下
public void validateFieldMethod(final Object o) throws Exception {
Field[] fields = o.getClass().getDeclaredFields();
for (final Field field : fields) {
if (field.getAnnotation(ValidationDriver.class) != null) {
field.setAccessible(true);
Future future = executor.submit(new AsyncLoadCallable() {
public Object call() throws Exception {
try {
return AnnotationValidator.validate(field.get(o));
} catch (Throwable e) {
throw new AsyncLoadException("future invoke error!", e);
}
}
public AsyncLoadConfig getConfig() {
return config;
}
});
}
}
}
public class AsyncLoadEnhanceProxy<T> implements AsyncLoadProxy<T> {
private T service;
private AsyncLoadConfig config;
private AsyncLoadExecutor executor;
private Class<T> targetClass;
public AsyncLoadEnhanceProxy(){
}
public AsyncLoadEnhanceProxy(T service, AsyncLoadExecutor executor){
this(service, new AsyncLoadConfig(), executor);
}
public AsyncLoadEnhanceProxy(T service, AsyncLoadConfig config, AsyncLoadExecutor executor){
this.service = service;
this.config = config;
this.executor = executor;
this.targetClass = (Class<T>) service.getClass();// 默认的代理class对象即为service
}
public T getProxy() {
validate();
return getProxyInternal();
}
/**
* 相应的检查方法
*/
private void validate() {
AsyncLoadUtils.notNull(service, "service should not be null");
AsyncLoadUtils.notNull(config, "config should not be null");
AsyncLoadUtils.notNull(executor, "executor should not be null");
if (Modifier.isFinal(targetClass.getModifiers())) { // 目前暂不支持final类型的处理,以后可以考虑使用jdk
// proxy
throw new AsyncLoadException("Enhance proxy not support final class :" + targetClass.getName());
}
if (!Modifier.isPublic(targetClass.getModifiers())) {
// 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问
throw new AsyncLoadException("Enhance proxy not support private/protected class :" + targetClass.getName());
}
}
/**
* 异步校验
* @param o
* @throws Exception
*/
public void validateFieldMethod(final Object o) throws Exception {
Field[] fields = o.getClass().getDeclaredFields();
for (final Field field : fields) {
if (field.getAnnotation(ValidationDriver.class) != null) {
field.setAccessible(true);
Future future = executor.submit(new AsyncLoadCallable() {
public Object call() throws Exception {
try {
return AnnotationValidator.validate(field.get(o));
} catch (Throwable e) {
throw new AsyncLoadException("future invoke error!", e);
}
}
public AsyncLoadConfig getConfig() {
return config;
}
});
}
}
}
class AsyncLoadCallbackFilter implements CallbackFilter {
public int accept(Method method) {
// 预先进行匹配,直接计算好需要处理的method,避免动态匹配浪费性能
if (AsyncLoadObject.class.isAssignableFrom(method.getDeclaringClass())) {// 判断对应的方法是否属于AsyncLoadObject
return 0; // for AsyncLoadServiceInterceptor
} else {
Map<AsyncLoadMethodMatch, Long> matches = config.getMatches();
Set<AsyncLoadMethodMatch> methodMatchs = matches.keySet();
if (methodMatchs != null && !methodMatchs.isEmpty()) {
for (Iterator<AsyncLoadMethodMatch> methodMatch = methodMatchs.iterator(); methodMatch.hasNext();) {
if (methodMatch.next().matches(method)) {
return 2; // for AsyncLoadInterceptor
}
}
}
return 1; // for AsyncLoadDirect
}
}
}
class AsyncLoadServiceInterceptor implements MethodInterceptor {
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
if ("_getOriginalClass".equals(method.getName())) {
return getOriginalClass();
}
throw new AsyncLoadException("method[" + method.getName() + "] is not support!");
}
private Object getOriginalClass() {
return targetClass;
}
}
class AsyncLoadDirect implements Dispatcher {
public Object loadObject() throws Exception {
return service;
}
}
class AsyncLoadInterceptor implements MethodInterceptor {
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
Long timeout = getMatchTimeout(method);
final Object finObj = service;
final Object[] finArgs = args;
final Method finMethod = method;
Class returnClass = method.getReturnType();
if (Void.TYPE.isAssignableFrom(returnClass)) {// 判断返回值是否为void
// 不处理void的函数调用
return finMethod.invoke(finObj, finArgs);
} else if (!Modifier.isPublic(returnClass.getModifiers())) {
// 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问
return finMethod.invoke(finObj, finArgs);
} else if (Modifier.isFinal(returnClass.getModifiers())) {
// 处理特殊的final类型,目前暂不支持,后续可采用jdk proxy
return finMethod.invoke(finObj, finArgs);
} else if (returnClass.isPrimitive() || returnClass.isArray()) {
// 不处理特殊类型,因为无法使用cglib代理
return finMethod.invoke(finObj, finArgs);
} else if (returnClass == Object.class) {
// 针对返回对象是Object类型,不做代理。没有具体的method,代理没任何意义
return finMethod.invoke(finObj, finArgs);
} else {
Future future = executor.submit(new AsyncLoadCallable() {
public Object call() throws Exception {
try {
return finMethod.invoke(finObj, finArgs);// 需要直接委托对应的finObj(service)进行处理
} catch (Throwable e) {
throw new AsyncLoadException("future invoke error!", e);
}
}
public AsyncLoadConfig getConfig() {
return config;
}
});
// 够造一个返回的AsyncLoadResult
AsyncLoadResult result = new AsyncLoadResult(returnClass, future, timeout);
// 继续返回一个代理对象
AsyncLoadObject asyncProxy = (AsyncLoadObject) result.getProxy();
// 添加到barrier中
if (config.getNeedBarrierSupport()) {
AsyncLoadBarrier.addTask((AsyncLoadObject) asyncProxy);
}
// 返回对象
return asyncProxy;
}
}
/**
* 返回对应的匹配的timeout时间,一定能找到对应的匹配点
*
* @param method
* @return
*/
private Long getMatchTimeout(Method method) {
Map<AsyncLoadMethodMatch, Long> matches = config.getMatches();
Set<Map.Entry<AsyncLoadMethodMatch, Long>> entrys = matches.entrySet();
if (entrys != null && !entrys.isEmpty()) {
for (Iterator<Map.Entry<AsyncLoadMethodMatch, Long>> iter = entrys.iterator(); iter.hasNext();) {
Map.Entry<AsyncLoadMethodMatch, Long> entry = iter.next();
if (entry.getKey().matches(method)) {
return entry.getValue();
}
}
}
return config.getDefaultTimeout();
}
}
// =========================== help mehotd =================================
/**
* 优先从Repository进行获取ProxyClass,创建对应的object
*
* @return
*/
private T getProxyInternal() {
Class proxyClass = AsyncLoadProxyRepository.getProxy(targetClass.getName());
if (proxyClass == null) {
Enhancer enhancer = new Enhancer();
if (targetClass.isInterface()) { // 判断是否为接口,优先进行接口代理可以解决service为final
enhancer.setInterfaces(new Class[] { targetClass });
} else {
enhancer.setSuperclass(targetClass);
}
enhancer.setCallbackTypes(new Class[] { AsyncLoadServiceInterceptor.class, AsyncLoadDirect.class,
AsyncLoadInterceptor.class });
enhancer.setCallbackFilter(new AsyncLoadCallbackFilter());
proxyClass = enhancer.createClass();
// 注册proxyClass
AsyncLoadProxyRepository.registerProxy(targetClass.getName(), proxyClass);
}
Enhancer.registerCallbacks(proxyClass, new Callback[] { new AsyncLoadServiceInterceptor(),
new AsyncLoadDirect(), new AsyncLoadInterceptor() });
try {
return (T) AsyncLoadReflectionHelper.newInstance(proxyClass);
} finally {
// clear thread callbacks to allow them to be gc'd
Enhancer.registerStaticCallbacks(proxyClass, null);
}
}
// ====================== setter / getter ===========================
public void setService(T service) {
this.service = service;
if (targetClass == null) {
this.targetClass = (Class<T>) service.getClass();
}
}
public void setConfig(AsyncLoadConfig config) {
this.config = config;
}
public void setExecutor(AsyncLoadExecutor executor) {
this.executor = executor;
}
public void setTargetClass(Class targetClass) {
this.targetClass = targetClass;
}
}
- 大小: 27.6 KB
- 大小: 9 KB
- 大小: 49.2 KB
分享到:
相关推荐
人工智能-项目实践-异步调度-异步调度IP免费代理池 本项目通过爬虫抓取互联网上免费代理网站的IP,并且进行异步检测是否可用,如果可用就放入数据库。定时对数据库中的代理进行维护,然后通过web api的形式供外部...
针对异步访存调度机制,设计一种细粒度化的调度方案以提升系统性能。该机制引入信号量和自旋锁,由异构核间协作运作,以实现对流级调度的局部加速。通过在一组测试程序集以及在对应平台上进行的实验,评估了引入该...
围绕超算资源的易用性和多类软件的集成以及协作需求,开发了超算环境下的科学工作流应用平台,设计了异步并发的流程执行引擎,采取调度算法和调度器、引擎相分离的设计策略,给出了资源调度方案。提出了局部资源池化...
爬取策略和调度器设计 去重和增量爬取的技术和算法 数据存储与管理 分布式存储系统的选择和设计 数据去重和数据合并 大规模数据存储的优化和扩展 反爬虫和限流策略 反爬虫机制的分类和应对策略 IP代理和User-Agent...
爬取策略和调度器设计 去重和增量爬取的技术和算法 数据存储与管理 分布式存储系统的选择和设计 数据去重和数据合并 大规模数据存储的优化和扩展 反爬虫和限流策略 反爬虫机制的分类和应对策略 IP代理和User-Agent...
详细介绍讲解:数字电路基础知识,边沿检测电路,状态机电路,单双端arm读写电路,同步异步FIFO设计,SP优先级调度器设计
为解决短波侦收中任务执行时间长和侦收资源利用率低等问题,以最大侦收概率为目标,结合约束条件建立短波协同侦收资源调度模型,设计运用改进型蚁群优化算法对模型求解,采用粒子群参数优化技术改进蚁群优化算法;...
一种调度系统中高可靠性实时协议转换网关的设计与实现.pdf 三相SPWM波在TMS320F28335中的实现.pdf 基于DSP与ZigBee的电力监测系统设计.pdf 基于DSP开发系统设计与实现.pdf 基于DSP的激光跟踪仪数据通信及处理模块...
本书首先解释了线程的基本概念,包括异步编程、线程的生命周期和同步机制;然后讨论了一些高级话题,包括属性对象、线程私有数据和实时调度。此外,本书还讨论了调度的问题,并给出了避免错误和提高性能等问题的有...
那么异步消息处理可以用哪呢? 1、用于UI线程当Bitmap加载完成后更新ImageView 2、在图片加载类初始化时,我们会在一个子线程中维护一个Loop实例,当然子线程中也就有了MessageQueue,Looper会一直在那loop停着等待...
简述了异步任务处理在复杂Web应用中的必要性,利用Java中基于线程池的执行框架,分析并设计了相应的任务调度方法,以解决Web应用中大型任务处理时间长与系统要求响应时问短的矛盾,实现了用于处理复杂任务的异步调度,...
进程同步模拟设计--司机和售票员问题 进程调度 同步异步
除了异步IO的支持之外,Swoole为PHP多进程的模式设计了多个并发数据结构和IPC通信机制,可以大大简化多进程并发编程的工作。其中包括了并发原子计数器,并发HashTable,Channel,Lock,进程间通信IPC等丰富的功能...
《JAVA多线程设计模式》PDF 下载 《Java线程 高清晰中文第二版》中文第二版(PDF) 前言 第一章 线程简介 Java术语 线程概述 为什么要使用线程? 总结 第二章 Java线程API 通过Thread类创建线程 使用Runable接口...
进程同步模拟设计--读者和写者问题 进程调度问题 同步异步
本, 书首先解释了线程的基本概念,包括异步编程、线程的生命周期和同步机制;然后讨论了, 一些高级话题,包括属性对象、线程私有数据和实时调度。此外,本书还讨论了调度的问, 题,并给出了避免错误和提高性能等问题...
摘 要 基于RS- 232异步串行通信接口, 根据微机通信特点, 借鉴计算机网络中链路层的 通信规程, 设计并实现适合于微机间数据传输的通信协议, 提出了一种较为完善的接口技术途径。 关键词 通信协议 中断激活 链路...
书首先解释了线程的基本概念,包括异步编程、线程的生命周期和同步机制;然后讨论了 一些高级话题,包括属性对象、线程私有数据和实时调度。此外,本书还讨论了调度的问 题,并给出了避免错误和提高性能等问题的有...
29、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行; 30、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三...