博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
motan源码分析五:cluster相关
阅读量:6269 次
发布时间:2019-06-22

本文共 9385 字,大约阅读时间需要 31 分钟。

上一章我们分析了客户端调用服务端相关的源码,但是到了cluster里面的部分我们就没有分析了,本章将深入分析cluster和它的相关支持类。

1.clustersupport的创建过程,上一章的ReferConfig的initRef()方法中调用了相关的创建代码:

for(Iterator iterator = protocols.iterator(); iterator.hasNext();)        {            ProtocolConfig protocol = (ProtocolConfig)iterator.next();            LoggerUtil.info((new StringBuilder("ProtocolConfig's")).append(protocol.getName()).toString());            Map params = new HashMap();            params.put(URLParamType.nodeType.getName(), "referer");            params.put(URLParamType.version.getName(), URLParamType.version.getValue());            params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));            collectConfigParams(params, new AbstractConfig[] {                protocol, basicReferer, extConfig, this            });            collectMethodConfigParams(params, getMethods());            URL refUrl = new URL(protocol.getName(), localIp, 0, interfaceClass.getName(), params);            ClusterSupport clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);//创建clustersupport            clusterSupports.add(clusterSupport);            clusters.add(clusterSupport.getCluster());//获取对应的cluster            proxy = proxy != null ? proxy : refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue());        }    private ClusterSupport createClusterSupport(URL refUrl, ConfigHandler configHandler, List registryUrls)    {        List regUrls = new ArrayList();        if(StringUtils.isNotBlank(directUrl) || "injvm".equals(refUrl.getProtocol()))        {            URL regUrl = new URL("local", "127.0.0.1", 0, com/weibo/api/motan/registry/RegistryService.getName());            if(StringUtils.isNotBlank(directUrl))            {                StringBuilder duBuf = new StringBuilder(128);                String dus[] = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrl);                String as[];                int j = (as = dus).length;                for(int i = 0; i < j; i++)                {                    String du = as[i];                    if(du.contains(":"))                    {                        String hostPort[] = du.split(":");                        URL durl = refUrl.createCopy();                        durl.setHost(hostPort[0].trim());                        durl.setPort(Integer.parseInt(hostPort[1].trim()));                        durl.addParameter(URLParamType.nodeType.getName(), "service");                        duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(",");                    }                }                if(duBuf.length() > 0)                {                    duBuf.deleteCharAt(duBuf.length() - 1);                    regUrl.addParameter(URLParamType.directUrl.getName(), duBuf.toString());                }            }            regUrls.add(regUrl);        } else//走注册中心的方式        {            if(registryUrls == null || registryUrls.isEmpty())                throw new IllegalStateException(String.format("No registry to reference %s on the consumer %s , please config 
in your spring config.", new Object[] { interfaceClass, "127.0.0.1" })); URL url; for(Iterator iterator = registryUrls.iterator(); iterator.hasNext(); regUrls.add(url.createCopy())) url = (URL)iterator.next(); } URL url; for(Iterator iterator1 = regUrls.iterator(); iterator1.hasNext(); url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr()))) url = (URL)iterator1.next(); return configHandler.buildClusterSupport(interfaceClass, regUrls);//调用simpleconfighandler的创建clustersupport方法 } public
ClusterSupport
buildClusterSupport(Class
interfaceClass, List
registryUrls) { ClusterSupport
clusterSupport = new ClusterSupport
(interfaceClass, registryUrls);//创建cluster支持类,将业务接口和注册中心信息传递进去 clusterSupport.init();//初始化 return clusterSupport; }

2.clustersupport的init和prepare方法

public void init() {        prepareCluster();        URL subUrl = toSubscribeUrl(url);        for (URL ru : registryUrls) {//循环注册中心的url            String directUrlStr = ru.getParameter(URLParamType.directUrl.getName());            // 如果有directUrl,直接使用这些directUrls进行初始化,不用到注册中心discover            if (StringUtils.isNotBlank(directUrlStr)) {                List
directUrls = parseDirectUrls(directUrlStr); if (!directUrls.isEmpty()) { notify(ru, directUrls); LoggerUtil.info("Use direct urls, refUrl={}, directUrls={}", url, directUrls); continue; } } // client 注册自己,同时订阅service列表 Registry registry = getRegistry(ru);//获取zookeeper的注册中心 registry.subscribe(subUrl, this);//注册自己并订阅服务 } boolean check = Boolean.parseBoolean(url.getParameter(URLParamType.check.getName(), URLParamType.check.getValue())); if (!CollectionUtil.isEmpty(cluster.getReferers()) || !check) { cluster.init();//初始化集群 if (CollectionUtil.isEmpty(cluster.getReferers()) && !check) { LoggerUtil.warn(String.format("refer:%s", this.url.getPath() + "/" + this.url.getVersion()), "No services"); } return; } throw new MotanFrameworkException(String.format("ClusterSupport No service urls for the refer:%s, registries:%s", this.url.getIdentity(), registryUrls), MotanErrorMsgConstant.SERVICE_UNFOUND); } private void prepareCluster() { String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue());//集群名称 String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue());//负载均衡名称 String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue());//ha高可用名称 cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(clusterName);//获取具体的集群对象 LoadBalance
loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);//获取具体的负载均衡方式,目前motan支持6种负载方式 HaStrategy
ha = ExtensionLoader.getExtensionLoader(HaStrategy.class).getExtension(haStrategyName);//获取高可用的方式,目前支持两种failfast和failover方式 cluster.setLoadBalance(loadBalance); cluster.setHaStrategy(ha); cluster.setUrl(url); }

3.负载均衡,motan支持6种方式,分别是:轮训、随机、hash、本地服务优先、权重可配置、低并发优先,具体代码可见com.weibo.api.motan.cluster.loadbalance目录,本文我们主要看一下轮训的方式:

public class RoundRobinLoadBalance
extends AbstractLoadBalance
{ private AtomicInteger idx = new AtomicInteger(0); @Override protected Referer
doSelect(Request request) { List
> referers = getReferers();//获取所有服务器的引用 int index = idx.incrementAndGet();//自增 for (int i = 0; i < referers.size(); i++) { Referer
ref = referers.get((i + index) % referers.size());//利用自增数去模,达到轮训的目的 if (ref.isAvailable()) { return ref; } } return null; } @Override protected void doSelectToHolder(Request request, List
> refersHolder) { List
> referers = getReferers(); int index = idx.incrementAndGet(); for (int i = 0; i < referers.size(); i++) { Referer
referer = referers.get((i + index) % referers.size()); if (referer.isAvailable()) { refersHolder.add(referer); } } }}

4.motan支持failfast和failover两种方式,failfast只调用一次,如果失败则直接返回失败,failover循环调用若干次,直到成功或循环结束后

public Response call(Request request, LoadBalance
loadBalance) { List
> referers = selectReferers(request, loadBalance);//获取所有的引用 if (referers.isEmpty()) { throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request, loadBalance)); } URL refUrl = referers.get(0).getUrl(); // 先使用method的配置 int tryCount = refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(), URLParamType.retries.getIntValue());//获取重试次数 // 如果有问题,则设置为不重试 if (tryCount < 0) { tryCount = 0; } for (int i = 0; i <= tryCount; i++) { Referer
refer = referers.get(i % referers.size());//循环调用 try { request.setRetries(i); return refer.call(request); } catch (RuntimeException e) { // 对于业务异常,直接抛出 if (ExceptionUtil.isBizException(e)) { throw e;//业务异常退出调用 } else if (i >= tryCount) { throw e; } LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage())); } } throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!"); }

本章知识点总结:

1.一个cluster有一个cluster的支持类,有一个ha,有一个loadbalance;

2.motan支持6种负载均衡方式;

3.motan支持failover的ha方式;

 

转载于:https://www.cnblogs.com/mantu/p/5885996.html

你可能感兴趣的文章
【转】如何实现一个配置中心
查看>>
Docker —— 用于统一开发和部署的轻量级 Linux 容器【转】
查看>>
Threejs 官网 - Three.js 的图形用户界面工具(GUI Tools with Three.js)
查看>>
Atitit.Java exe bat 作为windows系统服务程序运行
查看>>
session的生命周期
查看>>
数据库的本质、概念及其应用实践(二)
查看>>
iOS开发多线程--(NSOperation/Queue)
查看>>
php的ajax简单实例
查看>>
maven常用构建命令
查看>>
C#:关联程序和文件
查看>>
推荐科研软件
查看>>
gradle
查看>>
如何取消未知类型文件默认用记事本打开
查看>>
[Javascript] Immute Object
查看>>
Java 关于finally、static
查看>>
Posix mq和SystemV mq区别
查看>>
P6 EPPM Manual Installation Guide (Oracle Database)
查看>>
XMPP协议、IM、客户端互联详解
查看>>
PHP写文件函数
查看>>
mysql的sql_mode合理设置
查看>>