GitHub release

DActor

Introduction

DActor框架基于协程思想设计,可同时支持同步和异步代码,简化在线异步代码的开发,用同步代码的思维来开发异步代码,兼顾异步代码的高并发、无阻塞和同步代码的易读性,可维护性。 最大程度的降低阻塞,提高单个线程的处理能力,并可有效的降低线程数。

项目地址

GitHub:https://github.com/allon2/dactor
GitEE:https://gitee.com/handyun/dactor

QQ交流群

783580303

Overview

目前开发过程中的几个常见模型

启动后,可在控制台看到内部调用结果
1.png

Maven dependency

<dependency>
    <groupId>cn.ymotel</groupId>
    <artifactId>dactor</artifactId>
    <version>1.1.1</version>
</dependency>

Gradle dependency

compile group: 'cn.ymotel', name: 'dactor', version:'1.1.1'

代码简单讲解

执行过程为chain->grandfather->parent->Selft。 依次调用执行责任链中逻辑,grandfather中的逻辑,parent的逻辑和自身逻辑。 chain,grandfather,parent都可为空,不设置 在grandfather和parent中的Steps中至少有一个为placeholderActor交易,以调用子逻辑

整个过程中,需要先设置全局占位符

交易中如果未填写beginBeanId或者endBeanId时,系统默认使用全局中配置的beginBeanId或者endBeanId

   <actor id="randomTxt" parent="actorhttpcore" beginBeanId="randomTxtActor">
        <steps>
            <step fromBeanId="randomTxtActor" toBeanId="placeholderActor" conditon=""/>
            <step fromBeanId="placeholderActor" toBeanId="endActor" conditon=""/>
        </steps>
    </actor>

condtion可为空,空字符串,或者是ognl表达式
placeholderActor的作用是在暂存当前环境,并调用子交易,待子交易执行完毕后,再恢复当前环境继续执行
如果在Step中未找到toBeanIdActor,会直接调用endBeanId方法,认为自身交易已执行结束。
交易的请求和流转信息都保存在Message中
如果指定handleException=false或者使用默认设置,直接返回父中执行,如果父中也未捕获,则继续返回上一级执行,
一般来说至少有要有一个actor中指定handleException=true

启动框架接收和执行请求

同步异步写法

同步写法

  public class BeginActor implements Actor {

    /* (non-Javadoc)
     * @see com.ymotel.util.actor.Actor#HandleMessage(com.ymotel.util.actor.Message)
     */
    @Override
    public Object HandleMessage(Message message) throws Exception {
        return message;

    }

}

继承Actor接口,在HandlerMessage返回message方法,框架判断返回不为空的情况下,直接将结果丢给框架进行下一步出来

异步写法

以HttpClientActor为例,在HandlerMessage中最后返回null,框架收到请求后等待下一步处理,释放线程,在HttpClient中的CallBack中调用
message.getControlMessage().getMessageDispatcher().sendMessage(message);框架收到后进行下一步处理。

    public Object HandleMessage(final Message message) throws Exception {
        Map context = message.getContext();

//		
        HttpUriRequest request = getHttpBuild(message.getContext()).build();

        if (referer != null) {
            request.addHeader("Referer", referer);
        }


        /**
         * 通过此处可共用会话,进行类似登录后交易
         */
        HttpClientContext tmplocalContext = null;
        if (context.containsKey(HTTPCLIENT_CONTEXT)) {
            tmplocalContext = (HttpClientContext) context.get(HTTPCLIENT_CONTEXT);
        } else {
            tmplocalContext = HttpClientContext.create();
            CookieStore cookieStore = new BasicCookieStore();
            tmplocalContext.setCookieStore(cookieStore);
            context.put(HTTPCLIENT_CONTEXT, tmplocalContext);
        }

        final HttpClientContext localContext = tmplocalContext;
//        CookieStore cookieStore = new BasicCookieStore();
//        localContext.setCookieStore(cookieStore);
        if (logger.isInfoEnabled()) {
            logger.info("HandleMessage(Message) - httpclient----" + request); //$NON-NLS-1$
        }
//		 final HttpGet httpget = new HttpGet(uri);

        final String tmpcontent = content;
        final String tmpcharset = charset != null ? charset : (String) context.get(CHARSET);


        httpClientHelper.getHttpclient().execute(request, localContext, new FutureCallback<HttpResponse>() {

            public void completed(final HttpResponse response) {
                try {
                    /**
                     * 完成后及时清除
                     */
                    message.getContext().remove(tmpcontent);

                    actorHttpClientResponse.handleResponse(response, localContext, tmpcharset, message);
//					String responseString=HandleResponse((String)message.getContext().get(CHARSET),response);
//					message.getContext().put(RESPONSE, responseString);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    if (logger.isErrorEnabled()) {
                        logger.error("$FutureCallback<HttpResponse>.completed(HttpResponse)", e); //$NON-NLS-1$
                    }
                    message.setException(e);
                    message.getControlMessage().getMessageDispatcher().sendMessage(message);
                }
//                  System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
            }

            public void failed(final Exception ex) {
                if (logger.isErrorEnabled()) {
                    logger.error("$FutureCallback<HttpResponse>.failed(Exception)", ex); //$NON-NLS-1$
                }
                message.setException(ex);
                message.getControlMessage().getMessageDispatcher().sendMessage(message);

//                  System.out.println(httpget.getRequestLine() + "->" + ex);
            }

            public void cancelled() {
                Exception exception = new Exception("已取消");
                message.setException(exception);
                message.getControlMessage().getMessageDispatcher().sendMessage(message);

//                  System.out.println(httpget.getRequestLine() + " cancelled");
            }

        });
        return null;
    }

配置和API说明

配置说明

通过在xml中的Step实现内部Actor之间的流程跳转 在配置文件中包含 Actor、chain、和global配置 。 程序整个执行顺序为根据交易码找到对应的Actor,然后执行按照chain->parent->selft的顺序进行执行。
chain执行到placeholder处,调用parent交易继续执行,在parent交易中执行到placeholder交易后,调用selft自身交易继续执行。 自身交易执行完毕,弹出parent的placeholder处交易继续执行.parent执行完毕,弹出chain中代码继续执行。 global配置如下

  <actor:global id="actorglobal">
         <actor:param name="beginBeanId" value="beginActor"/>
         <actor:param name="endBeanId" value="endActor"/>
   </actor:global>

beginBeanId为默认的开始Actor,value中的值是在Spring中对应的beanName,程序初始化时将会取得此值,对未指定beginBeanId或者endBeanId的Actor初始化全局配置。
beginActor和endActor都需要继承Actor接口。 actor配置如下

    <actor:actor id="actorhttpcore"   parent="chainparent" chain="unLoginChain"  handleException="true"  endBeanId="FinishActor" >

        <actor:steps>
            <actor:step xpoint="" ypont="" fromBeanId="beginActor"  conditon="" toBeanId="placeholderActor"/>
             <actor:step xpoint="" ypont="" fromBeanId="beginActor"  conditon="" async="true"  toBeanId="placeholderActor"/>
           <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="context._SUFFIX=='json'"  toBeanId="JsonViewResolverActor"/>
            <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="exception==null" toBeanId="ViewResolveActor"/>
            <actor:step xpoint="" ypont=""   fromBeanId="placeholderActor" conditon="exception!=null"  toBeanId="ErrorViewResolveActor"/>
        </actor:steps>
             <results>
                    <result name="success">htmlstream:</result>
             </results>
    </actor:actor>

属性handleException如果不设置的话,遇到异常,程序将会认为子类中已经执行完毕,跳到parent中PlaceHolder处执行。设置为true,将不会直接跳转到parent中,由子类进行自我处理。
parent和chain为调用具体交易前需要调用的公共交易,由于大部分交易都有通用的前置交易和统一的后置交易。通过设置parent或者chain,可提高代码复用度。
fromBeanId和toBeanId配置的是Actor或者实现Actor接口的beanId。 parent和chain中的ref都需要是Actor.
results中可定义返回的state和需要处理的viewActor
async标记是否是旁路交易,默认值为false,为true值时,会将上下文内容设置复制一份,重新生成一份Message,进行执行,不影响主流程。 chain配置

   <actor:chain id="isLoginChain">
          <list>
              <ref bean="actorhttpcore"></ref>
              <ref bean="isLoginActor1"></ref>
  
          </list>
      </actor:chain>

chain可直观展现Actor调用顺序.
在chain中可顺序并列多个parent类。每个parent中的Step都需要有placeHolderActor,以调用子类。
依次执行list中的交易,再执行自身交易。自身交易执行完毕,再依次回溯责任链中的每个交易,直到无可用交易。 命名空间

 <actor:actors xmlns="http://www.ymotel.cn/schema/dactor"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:aop="http://www.springframework.org/schema/aop"
               xmlns:actor="http://www.ymotel.cn/schema/dactor"
               xmlns:beans="http://www.springframework.org/schema/beans"
               xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 	http://www.ymotel.cn/schema/dactor http://www.ymotel.cn/schema/dactor.xsd"  namespace="np">
     <!-- parent 和beginBeanId为全局name,randomTxt2在Spring中的全程是np.randomTxt2-->
     <actor id="randomTxt2" parent="actorhttpcore" beginBeanId="randomTxtActor">
     </actor>
 </actor:actors>

在actor中可增加命名空间,简化代码开发。在actor中配置namespace=np,则实例中的actor的id会自动拼装为np.randomTxt2
http://localhost:8080/example/np.randomTxt2.json为使用命名空间的例子,相关配置在conf/namespace.xml中。

重要类方法说明

cn.ymotel.dactor.core.MessageDispatcher是交易流转的核心接口类
public void startMessage(Message message, ActorTransactionCfg actorcfg, boolean blocked) throws Exception
方法,用于开始整个流程,其中message需要在执行前进行构造,actorcfg可通过spring的getBean方法得到为Actor对象,如下

<actor id="randomTxt1" parent="randomTxt" beginBeanId="randomTxtActor">
    </actor>
通过getBean('randomTxt1')即可得到ActorTransactionCfg对象。  
blocked为是否阻塞,一般在交易初次放入队列是为false,表示如果队列满,则直接扔给客户端进行处理。为true则一般为内部交易,必须提交给队列进行处理。   sendMessage方法内部调用,用于将处理完毕的Message重新放入队列,继续下一步流程。  cn.ymotel.dactor.core.disruptor.MessageRingBufferDispatcher是MessageDispatcher的接口实现类。,在启动Spring是需要在配置中加上  ```
 MessageRingBufferDispatcher的strategy、bufferSize、threadNumber为三个可设置属性.正常情况下使用默认设置即可。  
 strategy默认使用ringBuffer的BlockingWaitStrategy策略进行调度,如果交易量比较大,可调整此策略。  
 bufferSize默认使用1024。   
 threadNumber默认使用CPU个数的线程数。   
 ## 其他默认Actor说明
 cn.ymotel.dactor.message.Message.Actor,所有需要在执行的交易都必须继承此接口。   
 public Object HandleMessage(Message message) throws Exception;程序通过调用HandleMessage对象,如果返回的不是message对象或者为NULL,则认为此交易是异步执行,不再自行调度。由异步交易在收到请求后,自己调用将Message再此放入队列中。  
 cn.ymotel.dactor.action.PlaceholderActor 交易为特殊交易,用来将当前队列暂存,并调用子交易。  
 cn.ymotel.dactor.action.BeginActor 为Actor中step的默认开始交易。  
 cn.ymotel.dactor.action.EndActor 为Actor中step的默认结束交易。
 cn.ymotel.dactor.action.JsonViewResolverActor为需要返回Json的J2EE view
  cn.ymotel.dactor.action.ViewResolveActor为需要返回J2EE view的统一处理Actor  
  cn.ymotel.dactor.action.httpclient.HttpClientActor 提供的异步调用httpClient的Actor  
cn.ymotel.dactor.action.netty.aysnsocket.TcpClientActor 提供的异步调用netty的Actor  
 ## 交易流程举例说明

``` 以上交易的交易流程图如下 randomTxt2.png 以上的完整例子都可在example中得到