当前位置: 首页 > news >正文

保定专门做网站的公司/网推是什么

保定专门做网站的公司,网推是什么,复制别人网站的源码做网站模板,网站开发加盟商怎么做0 前言 对于Java WEB应用来说,Spring的Filter可以拦截WEB接口调用,但对于Dubbo接口,Spring的Filter就不起作用了。 Dubbo中的Filter实现是 专门为服务提供方和服务消费方调用过程进行拦截,Dubbo本身的大多功能均基于此扩展点实现&…

0 前言

对于Java WEB应用来说,Spring的Filter可以拦截WEB接口调用,但对于Dubbo接口,Spring的Filter就不起作用了。

Dubbo中的Filter实现是 专门为服务提供方和服务消费方调用过程进行拦截,Dubbo本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,但请注意其对性能的影响。

所以,在实际业务开发中,使用最多的可能就是对Filter接口进行扩展,在服务调用链路中嵌入我们自身的处理逻辑,如日志打印、调用耗时统计等。

Dubbo官方针对Filter做了很多的原生支持,目前大致有20来个吧,包括我们熟知的RpcContext,accesslog功能都是通过filter来实现了,下面一起详细看一下Filter的实现。

1 构造Filter链

Dubbo的Filter实现入口是 在ProtocolFilterWrapper,因为ProtocolFilterWrapper是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(理解这里的前提是 理解Dubbo的SPI机制 ),该封装器实现了Protocol接口,并提供了一个参数类型为Protocol的构造方法。Dubbo依据这个构造方法识别出封装器,并将该封装器作为其他Protocol接口实现的代理

接下来,我们看一下ProtocolFilterWrapper中是如何构造Filter链:

public class ProtocolFilterWrapper implements Protocol {private final Protocol protocol;// 带参数构造器,ExtensionLoad通过该构造器识别封装器public ProtocolFilterWrapper(Protocol protocol){if (protocol == null) {throw new IllegalArgumentException("protocol == null");}this.protocol = protocol;}public int getDefaultPort() {return protocol.getDefaultPort();}// 对提供方服务暴露进行封装,组装filter调用链public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {// 向注册中心发布服务的时候并不会进行filter调用链if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}// 对消费方服务引用进行封装,组装filter调用链public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 向注册中心引用服务的时候并不会进行filter调用链if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {return protocol.refer(type, url);}return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);}public void destroy() {protocol.destroy();}// 构造filter调用链private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;// 获得所有激活的Filter(已经排好序的)List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (filters.size() > 0) {for (int i = filters.size() - 1; i >= 0; i --) {final Filter filter = filters.get(i);// 复制引用,构建filter调用链final Invoker<T> next = last;// 这里只是构造一个最简化的Invoker作为调用链的载体Invokerlast = new Invoker<T>() {public Class<T> getInterface() {return invoker.getInterface();}public URL getUrl() {return invoker.getUrl();}public boolean isAvailable() {return invoker.isAvailable();}// 关键代码,单向链表指针传递public Result invoke(Invocation invocation) throws RpcException {return filter.invoke(next, invocation);}public void destroy() {invoker.destroy();}@Overridepublic String toString() {return invoker.toString();}};}}return last;}
}

 

这里的关键代码在buildInvokerChain方法,参数invoker为实际的服务( 对于消费方而言,就是服务的动态代理 )。从ExtensionLoader获取到已经过排序的Filter列表(排序规则可参见ActivateComparator),然后开始倒序组装。

这里是个典型的装饰器模式,不过装饰器链条上的每个节点都是一个匿名内部类Invoker实例

  1. 每个节点invoker持有一个Filter引用,一个下级invoker节点引用以及实际调用的invoker实例(虽然持有但并不实际调用,仅仅是提供获取实际invoker相关参数的功能,如getInterface,getUrl等方法);
  2. 通过invoke方法,invoker节点将下级节点传递给当前的filter进行调用
  3. filter在执行invoke方法时,就会触发下级节点invoker调用其invoke方法,实现调用的向下传递;
  4. 当到达最后一级invoker节点,即实际服务invoker,即可执行真实业务逻辑

这条调用链的每个节点都为真实的invoker增加了自定义的功能,在整个链条上不断丰富功能,是典型的装饰器模式。

看到上面的内容,我们大致能明白实现是这样子的,通过获取所有可以被激活的Filter链,然后根据一定顺序构造出一个Filter的调用链,最后的调用链大致是这样子:Filter1->Filter2->Filter3->......->Invoker,这个构造Filter链的逻辑非常简单,重点就在于如何获取被激活的Filter链

// 将key在url中对应的配置值切换成字符串信息数组
public List<T> getActivateExtension(URL url, String key, String group) {String value = url.getParameter(key);return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}public List<T> getActivateExtension(URL url, String[] values, String group) {List<T> exts = new ArrayList<T>();// 所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names就指的配置激活的filter信息)List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);// 如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filterif (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {// 加载extension信息
        getExtensionClasses();for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {// name指的是SPI读取的配置文件的keyString name = entry.getKey();Activate activate = entry.getValue();// group主要是区分是在provider端生效还是consumer端生效if (isMatchGroup(group, activate.group())) {T ext = getExtension(name);// 这里以Filter为例:三个判断条件的含义依次是:// 1. 用户配置的filter列表中不包含当前ext// 2. 用户配置的filter列表中不包含当前ext的加-的key// 3. 如果用户的配置信息(url中体现)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用if (! names.contains(name)&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) && isActive(activate, url)) {exts.add(ext);}}}// 根据Activate注解上的order排序
        Collections.sort(exts, ActivateComparator.COMPARATOR);}// 进行到此步骤的时候Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的FilterList<T> usrs = new ArrayList<T>();for (int i = 0; i < names.size(); i ++) {String name = names.get(i);// 如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种的可以,这个if是进不去的)if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {// 可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序if (Constants.DEFAULT_KEY.equals(name)) {if (usrs.size() > 0) {exts.addAll(0, usrs);usrs.clear();}} else {// 加入用户自己定义的扩展FilterT ext = getExtension(name);usrs.add(ext);}}}if (usrs.size() > 0) {exts.addAll(usrs);}return exts;
}

 

基本上到这里就能看到Filter链是如何被加载进来的,这里设计的非常灵活,忍不住要感叹一下:通过简单的配置‘-’可以手动剔除Dubbo原生的一定加载Filter,通过default来代替Dubbo原生的一定会加载的Filter从而来控制顺序。这些设计虽然都是很小的功能点,但是总体的感觉是十分灵活,考虑的比较周到。

所以,从上面源码分析得知:

默认filter链,先执行原生filter,再依次执行自定义filter,继而回溯到原点

知道了Filter构造的过程之后,我们就详细看几个比较重要的Filter信息。首先,看一下com.alibaba.dubbo.rpc.Filter接口的源码,如下:

@SPI
public interface Filter {Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}

 

Dubbo原生的filter定义在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter文件中,具体如下:

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter

 

Dubbo自带超时过滤器TimeoutFilter实现如下:

@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {long start = System.currentTimeMillis();Result result = invoker.invoke(invocation);long elapsed = System.currentTimeMillis() - start;if (invoker.getUrl() != null&& elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),"timeout", Integer.MAX_VALUE)) {if (logger.isWarnEnabled()) {logger.warn("invoke time out. method: " + invocation.getMethodName()+ "arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "+ invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");}}return result;}
}

 

  1. 注解@Activate是否是Dubbo Filter必须的,其上的group和order分别扮演什么样的角色?

    对于Dubbo原生自带的filter,注解@Activate是必须,其group用于provider/consumer的站队,而order值是filter顺序的依据。但是对于自定义filter而言,注解@Activate没被用到,其分组和顺序,完全由用户手工配置指定。如果自定义filter添加了@Activate注解,并指定了group了,则这些自定义filter将升级为原生filter组

  2. Filter的顺序是否可以调整, 如何实现?

    可以调整,通过'-'符号可以去除某些filter,而default代表默认激活的原生filter子链,通过重排default和自定义filter的顺序,达到实现顺序控制的目的

让我们来构建几个case,来看看如何配置能满足。假定自定义filter的对象为filter1,filter2:

case 1: 其执行顺序为, 原生filter子链->filter1->filter2

<dubbo:reference filter="filter1,filter2"/>

 

case 2: 其执行顺序为, filter1->filter2->原生filter子链

<dubbo:reference filter="filter1,filter2,default"/>

 

case 3: 其执行顺序为, filter1->原生filter子链->filter2, 同时去掉原生的TokenFilter(token)

<dubbo:service filter="filter1,default,filter2,-token"/>

 

Filter在作用端区分的话主要是区分为consumer和provider,下面我们就以这个为区分来分别介绍一些重点的Filter。

2 Consumer

2.1 ConsumerContextFilter

package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;/*** ConsumerContextInvokerFilter(默认触发)*/
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 在当前的RpcContext中记录本地调用的一次状态信息
        RpcContext.getContext().setInvoker(invoker).setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0).setRemoteAddress(invoker.getUrl().getHost(),invoker.getUrl().getPort());if (invocation instanceof RpcInvocation) {((RpcInvocation) invocation).setInvoker(invoker);}try {return invoker.invoke(invocation);} finally {RpcContext.getContext().clearAttachments();}}}

 

其实简单来看这个Filter的话是十分简单,它又是怎么将客户端设置的隐式参数传递给服务端呢

载体就是Invocation对象,在客户端调用Invoker.invoke方法时候,会去取当前状态记录器RpcContext中的attachments属性,然后设置到RpcInvocation对象中,在RpcInvocation传递到provider的时候会通过另外一个过滤器ContextFilter将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数

这就是为什么RpcContext只能记录一次请求的状态信息,因为在第二次调用的时候参数已经被新的RpcInvocation覆盖掉,第一次的请求信息对于第二次执行是不可见的。

2.2 ActiveLimitFilter

package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;/*** LimitInvokerFilter(当配置了actives并且值不为0的时候触发)*/
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();String methodName = invocation.getMethodName();int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);// 主要记录每台机器针对某个方法的并发数量RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());if (max > 0) {long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);long start = System.currentTimeMillis();long remain = timeout;int active = count.getActive();if (active >= max) {synchronized (count) {// 这个while循环是必要的,因为在一次wait结束后,可能线程调用已经结束了,腾出来consumer的空间while ((active = count.getActive()) >= max) {try {count.wait(remain);} catch (InterruptedException e) {}// 如果wait方法被中断的话,remain这时候有可能大于0// 如果其中一个线程运行结束后自动调用notify方法的话,也有可能remain大于0long elapsed = System.currentTimeMillis() - start;remain = timeout - elapsed;if (remain <= 0) {throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "+ invoker.getInterface().getName() + ", method: "+ invocation.getMethodName() + ", elapsed: " + elapsed+ ", timeout: " + timeout + ". concurrent invokes: " + active+ ". max concurrent invoke limit: " + max);}}}}}try {// 调用开始和结束后增减并发数量long begin = System.currentTimeMillis();RpcStatus.beginCount(url, methodName);try {Result result = invoker.invoke(invocation);RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);return result;} catch (RuntimeException t) {RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);throw t;}} finally {if (max > 0) {// 这里很关键,因为一个调用完成后要通知正在等待执行的队列synchronized (count) {count.notify();}}}}}

 

ActiveLimitFilter主要用于 限制同一个客户端对于一个服务端方法的并发调用量(客户端限流)。

2.3 FutureFilter

Future主要是处理事件信息,主要有以下几个事件:

  1. oninvoke 在方法调用前触发(如果调用出现异常则会直接触发onthrow方法)
  2. onreturn 在方法返回会触发(如果调用出现异常则会直接触发onthrow方法)
  3. onthrow 调用出现异常时候触发
package com.alibaba.dubbo.rpc.protocol.dubbo.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.StaticContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;/*** EventFilter*/
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);// 这里主要处理回调逻辑,主要区分三个时间:oninvoke:调用前触发,onreturn:调用后触发 onthrow:出现异常情况时候触发
        fireInvokeCallback(invoker, invocation);// 需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.Result result = invoker.invoke(invocation);if (isAsync) {asyncCallback(invoker, invocation);} else {syncCallback(invoker, invocation, result);}return result;}private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {if (result.hasException()) {fireThrowCallback(invoker, invocation, result.getException());} else {fireReturnCallback(invoker, invocation, result.getValue());}}/*** 同步异步的主要处理区别:* 1. 同步调用的话,事件触发是直接调用的,没有任何逻辑;* 2. 异步的话就是首先获取到调用产生的Future对象,然后复写Future的done()方法,*    将fireThrowCallback和fireReturnCallback逻辑引入即可。*/private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {Future<?> f = RpcContext.getContext().getFuture();if (f instanceof FutureAdapter) {ResponseFuture future = ((FutureAdapter<?>) f).getFuture();future.setCallback(new ResponseCallback() {public void done(Object rpcResult) {if (rpcResult == null) {logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));return;}///must be rpcResultif (!(rpcResult instanceof Result)) {logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));return;}Result result = (Result) rpcResult;if (result.hasException()) {fireThrowCallback(invoker, invocation, result.getException());} else {fireReturnCallback(invoker, invocation, result.getValue());}}public void caught(Throwable exception) {fireThrowCallback(invoker, invocation, exception);}});}}private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));if (onInvokeMethod == null && onInvokeInst == null) {return;}if (onInvokeMethod == null || onInvokeInst == null) {throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());}// 由于JDK的安全检查耗时较多.所以通过setAccessible(true)的方式关闭安全检查就可以达到提升反射速度的目的if (!onInvokeMethod.isAccessible()) {onInvokeMethod.setAccessible(true);}// 从下面代码可以看出oninvoke的方法参数要与调用的方法参数一致Object[] params = invocation.getArguments();try {onInvokeMethod.invoke(onInvokeInst, params);} catch (InvocationTargetException e) {fireThrowCallback(invoker, invocation, e.getTargetException());} catch (Throwable e) {fireThrowCallback(invoker, invocation, e);}}// fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));//not set onreturn callbackif (onReturnMethod == null && onReturnInst == null) {return;}if (onReturnMethod == null || onReturnInst == null) {throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());}if (!onReturnMethod.isAccessible()) {onReturnMethod.setAccessible(true);}Object[] args = invocation.getArguments();Object[] params;Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();if (rParaTypes.length > 1) {if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {params = new Object[2];params[0] = result;params[1] = args;} else {params = new Object[args.length + 1];params[0] = result;System.arraycopy(args, 0, params, 1, args.length);}} else {params = new Object[]{result};}try {onReturnMethod.invoke(onReturnInst, params);} catch (InvocationTargetException e) {fireThrowCallback(invoker, invocation, e.getTargetException());} catch (Throwable e) {fireThrowCallback(invoker, invocation, e);}}// fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));//onthrow callback not configuredif (onthrowMethod == null && onthrowInst == null) {return;}if (onthrowMethod == null || onthrowInst == null) {throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());}if (!onthrowMethod.isAccessible()) {onthrowMethod.setAccessible(true);}Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();if (rParaTypes[0].isAssignableFrom(exception.getClass())) {try {// 因为onthrow方法的参数第一个值必须为异常信息,所以这里需要构造参数列表Object[] args = invocation.getArguments();Object[] params;if (rParaTypes.length > 1) {// 回调方法只有一个参数而且这个参数是数组(单独拎出来计算的好处是这样可以少复制一个数组)if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {params = new Object[2];params[0] = exception;params[1] = args;} else {// 回调方法有多于一个参数params = new Object[args.length + 1];params[0] = exception;System.arraycopy(args, 0, params, 1, args.length);}} else {// 回调方法没有参数params = new Object[]{exception};}onthrowMethod.invoke(onthrowInst, params);} catch (Throwable e) {logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);}} else {logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);}}
}

 

3 Provider

3.1 ContextFilter

ContextFilter和ConsumerContextFilter是结合使用的,之前的介绍中已经看了ConsumerContextFilter,下面再简单看一下ContextFilter,来验证上面讲到的逻辑。

package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;import java.util.HashMap;
import java.util.Map;/*** ContextInvokerFilter*/
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {Map<String, String> attachments = invocation.getAttachments();if (attachments != null) {// 隐式参数重剔除一些核心消息attachments = new HashMap<String, String>(attachments);attachments.remove(Constants.PATH_KEY);attachments.remove(Constants.GROUP_KEY);attachments.remove(Constants.VERSION_KEY);attachments.remove(Constants.DUBBO_VERSION_KEY);attachments.remove(Constants.TOKEN_KEY);attachments.remove(Constants.TIMEOUT_KEY);attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),invoker.getUrl().getPort());// mreged from dubbox// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)if (attachments != null) {// 这里又重新将invocation和attachments信息设置到RpcContext,// 这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了if (RpcContext.getContext().getAttachments() != null) {RpcContext.getContext().getAttachments().putAll(attachments);} else {RpcContext.getContext().setAttachments(attachments);}}if (invocation instanceof RpcInvocation) {((RpcInvocation) invocation).setInvoker(invoker);}try {return invoker.invoke(invocation);} finally {RpcContext.removeContext();}}
}

 

3.2 EchoFilter

回响测试主要用来检测服务是否正常(网络状态),单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可。

package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;/*** EchoInvokerFilter*/
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)return new RpcResult(inv.getArguments()[0]);return invoker.invoke(inv);}}

 

3.3 ExecuteLimitFilter

服务端接口限制限流的具体执行逻辑就是在ExecuteLimitFilter中,因为服务端不需要考虑重试等待逻辑,一旦当前执行的线程数量大于指定数量,就直接返回失败了,所以实现逻辑相对于ActiveLimitFilter倒是简便了不少。

package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;import java.util.concurrent.Semaphore;/*** ThreadLimitInvokerFilter*/
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();String methodName = invocation.getMethodName();Semaphore executesLimit = null;boolean acquireResult = false;int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);if (max > 0) {RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {/*** http://manzhizhen.iteye.com/blog/2386408* use semaphore for concurrency control (to limit thread number)*/executesLimit = count.getSemaphore(max);if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");}}long begin = System.currentTimeMillis();boolean isSuccess = true;RpcStatus.beginCount(url, methodName);try {Result result = invoker.invoke(invocation);return result;} catch (Throwable t) {isSuccess = false;if (t instanceof RuntimeException) {throw (RuntimeException) t;} else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);}} finally {RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);if(acquireResult) {executesLimit.release();}}}
}

 

3.4 ExceptionFilter

Dubbo 对于异常的处理有自己的一套规则:

  1. 如果是 checked异常 则直接抛出;
  2. 如果是unchecked异常 但是在接口上有声明,也会直接抛出;
  3. 如果异常类和接口类在同一jar包里,直接抛出;
  4. 如果是 JDK自带的异常 ,直接抛出;
  5. 如果是 Dubbo的异常 ,直接抛出;
  6. 其余的都包装成RuntimeException然后抛出(避免异常在Client不能反序列化问题)
package com.alibaba.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericService;import java.lang.reflect.Method;/*** ExceptionInvokerFilter* <p>* Functions:* <ol>* <li>unexpected exception will be logged in ERROR level on provider side. Unexpected exception are unchecked* exception not declared on the interface</li>* <li>Wrap the exception not introduced in API package into RuntimeException. Framework will serialize the outer exception but stringnize its cause in order to avoid of possible serialization problem on client side</li>* </ol>*/
@Activate(group = Constants.PROVIDER)
public class ExceptionFilter implements Filter {private final Logger logger;public ExceptionFilter() {this(LoggerFactory.getLogger(ExceptionFilter.class));}public ExceptionFilter(Logger logger) {this.logger = logger;}public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {try {Result result = invoker.invoke(invocation);if (result.hasException() && GenericService.class != invoker.getInterface()) {try {Throwable exception = result.getException();// directly throw if it's checked exceptionif (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {return result;}// directly throw if the exception appears in the signaturetry {Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());Class<?>[] exceptionClassses = method.getExceptionTypes();for (Class<?> exceptionClass : exceptionClassses) {if (exception.getClass().equals(exceptionClass)) {return result;}}} catch (NoSuchMethodException e) {return result;}// for the exception not found in method's signature, print ERROR message in server's log.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);// directly throw if exception class and interface class are in the same jar file.String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {return result;}// directly throw if it's JDK exceptionString className = exception.getClass().getName();if (className.startsWith("java.") || className.startsWith("javax.")) {return result;}// directly throw if it's dubbo exceptionif (exception instanceof RpcException) {return result;}// otherwise, wrap with RuntimeException and throw back to the clientreturn new RpcResult(new RuntimeException(StringUtils.toString(exception)));} catch (Throwable e) {logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);return result;}}return result;} catch (RuntimeException e) {logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);throw e;}}
}

 

到这,Dubbo中的几个核心Filter已经讲完,Filter其实没有那么复杂,在开发过程中,也可以参考此思路实现自己的Filter链。


转载自:
作者:猿码道
链接:https://www.jianshu.com/p/6dd76ce7338f

转载于:https://www.cnblogs.com/technologykai/articles/10951345.html

http://www.lbrq.cn/news/1429237.html

相关文章:

  • 菏泽网站建设哪好/百度推广网址
  • dj网站建设/长沙seo网络公司
  • 网站域名是不是网址/seo教学视频教程
  • 日本做爰动漫网站/武汉竞价托管公司
  • 烟台广告公司南网站建设评价/seo优化咨询
  • 网络营销课程教案/宜昌网站seo收费
  • 传媒网站给行业做宣传/晚上免费b站软件
  • 网站做js跳转/深圳百度seo怎么做
  • 免费做产品宣传的网站/网络营销环境
  • 学校做网站的软件/上海优化公司
  • 和网站开发公司如何签合同/企业宣传
  • 建设工程人力资源网查询平台/青岛seo关键字排名
  • 深圳网站建设ejaket/站长全网指数查询
  • 佳木斯市郊区建设局网站/seo高级教程
  • 西安疫情紧急通告公告/最优化方法
  • html网站制作答辩问题/整合营销案例举例说明
  • 怎么做网页游戏代理/seo搜索引擎优化是通过优化答案
  • 惠东做网站/2345网址导航电脑版
  • 如何上传模板到网站/搜索引擎排名优化方法
  • 织梦网站模板怎么做/最近的重要新闻
  • 网站速度对seo的影响/网络推广的基本方法
  • 金华网站制作建设/杭州seo百度关键词排名推广
  • dreamweaver动态网页制作/深圳网站seo外包公司哪家好
  • 互力互通网站建设/官网优化包括什么内容
  • 成都网站建设 冠辰/2021年经典营销案例
  • 网站建设销售销售流程/商丘 峰少 seo博客
  • 怎样给公司做网站/网络营销的渠道有哪些
  • 农家乐网站建设营销方案/百度电脑端网页版入口
  • 手机网站绑定域名是什么意思/快速排名官网
  • 大数据比赛网站建设/台州seo网站排名优化
  • 【Canvas与玻璃光】铝圈蓝底玻璃光按钮
  • Vue2与Vue3生命周期函数全面解析:从入门到精通
  • 一个集成多源威胁情报的聚合平台,提供实时威胁情报查询和播报服务、主动拦截威胁IP,集成AI等多项常用安全类工具
  • python的美食交流社区系统
  • Scikit-learn (sklearn) 库详细介绍
  • STM32F103C8T6学习——直接存储器访问(DMA)标准库实战3(ADC数据采集+DMA回传)