Dubbo压测插件的实现——基于Gatling

Dubbo 压测插件已开源,本文涉及代码详见gatling-dubbo

Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益?;?Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。

插件主要结构

实现 Dubbo 压测插件,需实现以下四部分内容:

Protocol 和 ProtocolBuild

协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类

Action 和 ActionBuild

执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类

Check 和 CheckBuild

检查部分,全链路压测中我们都使用Json Path检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类

DSL

Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致

Protocol

协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:

protocol

协议,设置为dubbo

generic

泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为true,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值result_no_change(去掉优化前泛化调用的序列化开销以提升性能)

url

Dubbo 服务的地址:dubbo://IP地址:端口

registryProtocol

Dubbo 注册中心的协议,设置为ETCD3

registryAddress

Dubbo 注册中心的地址

如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。

objectDubboProtocol{valDubboProtocolKey=newProtocolKey{typeProtocol=DubboProtocoltypeComponents=DubboComponentsdefprotocolClass:Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]defdefaultProtocolValue(configuration:GatlingConfiguration):DubboProtocol=thrownewIllegalStateException("Can't provide a default value for DubboProtocol")defnewComponents(system:ActorSystem, coreComponents:CoreComponents):DubboProtocol=>DubboComponents= {? ? ? dubboProtocol =>DubboComponents(dubboProtocol)? ? }? }}caseclassDubboProtocol(protocol:String, //dubbo? ? generic:String, //泛化调用?? ? url:String, //use url or? ? registryProtocol:String,? //use registry? ? registryAddress:String//use registry)extendsProtocol{typeComponents=DubboComponents}

为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:

caseclassDubboComponents(dubboProtocol:DubboProtocol)extendsProtocolComponents{defonStart:Option[Session=>Session] =NonedefonExit:Option[Session=>Unit] =None}

以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。

objectDubboProtocolBuilderBase{defprotocol(protocol:String) =DubboProtocolBuilderGenericStep(protocol)}caseclassDubboProtocolBuilderGenericStep(protocol:String){defgeneric(generic:String) =DubboProtocolBuilderUrlStep(protocol, generic)}caseclassDubboProtocolBuilderUrlStep(protocol:String, generic:String){defurl(url:String) =DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)}caseclassDubboProtocolBuilderRegistryProtocolStep(protocol:String, generic:String, url:String){defregistryProtocol(registryProtocol:String) =DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)}caseclassDubboProtocolBuilderRegistryAddressStep(protocol:String, generic:String, url:String, registryProtocol:String){defregistryAddress(registryAddress:String) =DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)}caseclassDubboProtocolBuilder(protocol:String, generic:String, url:String, registryProtocol:String, registryAddress:String){defbuild=DubboProtocol(? ? protocol = protocol,? ? generic = generic,? ? url = url,? ? registryProtocol = registryProtocol,? ? registryAddress = registryAddress? )}

Action

DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。

DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似${args_types}、${args_values}这样的表达式从数据 Feeder 中解析对应字段的值。

execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。

异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。

为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近

classDubboAction(interface:String,? ? method:String,? ? argTypes:Expression[Array[String]],? ? argValues:Expression[Array[Object]],? ? genericService:GenericService,? ? checks:List[DubboCheck],? ? coreComponents:CoreComponents,? ? throttled:Boolean,? ? val objectMapper:ObjectMapper,? ? val next:Action)extendsExitableActionwithNameGen{overridedefstatsEngine:StatsEngine= coreComponents.statsEngineoverridedefname:String= genName("dubboRequest")overridedefexecute(session:Session):Unit= recover(session) {? ? argTypes(session) flatMap { argTypesArray =>? ? ? argValues(session) map { argValuesArray =>valstartTime =System.currentTimeMillis()valf =Future{try{? ? ? ? ? ? genericService.$invoke(method, argTypes(session).get, argValues(session).get)? ? ? ? ? }finally{? ? ? ? ? }? ? ? ? }? ? ? ? f.onComplete {caseSuccess(result) =>valendTime =System.currentTimeMillis()valresultMap = result.asInstanceOf[JMap[String,Any]]valresultJson = objectMapper.writeValueAsString(resultMap)val(newSession, error) =Check.check(resultJson, session, checks)? ? ? ? ? ? errormatch{caseNone=>? ? ? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("OK"),None,None)? ? ? ? ? ? ? ? throttle(newSession(session))caseSome(Failure(errorMessage)) =>? ? ? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("KO"),None,Some(errorMessage))? ? ? ? ? ? ? ? throttle(newSession(session).markAsFailed)? ? ? ? ? ? }caseFuFailure(e) =>valendTime =System.currentTimeMillis()? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("KO"),None,Some(e.getMessage))? ? ? ? ? ? throttle(session.markAsFailed)? ? ? ? }? ? ? }? ? }? }privatedefthrottle(s:Session):Unit= {if(throttled) {? ? ? coreComponents.throttler.throttle(s.scenario, () => next ! s)? ? }else{? ? ? next ! s? ? }? }}

DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:

caseclassDubboActionBuilder(interface:String, method:String, argTypes:Expression[Array[String]], argValues:Expression[Array[Object]], checks:List[DubboCheck])extendsActionBuilder{privatedefcomponents(protocolComponentsRegistry:ProtocolComponentsRegistry):DubboComponents=? ? protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)overridedefbuild(ctx:ScenarioContext, next:Action):Action= {importctx._valprotocol = components(protocolComponentsRegistry).dubboProtocol//Dubbo客户端配置valreference =newReferenceConfig[GenericService]valapplication =newApplicationConfigapplication.setName("gatling-dubbo")? ? reference.setApplication(application)? ? reference.setProtocol(protocol.protocol)? ? reference.setGeneric(protocol.generic)if(protocol.url =="") {valregistry =newRegistryConfigregistry.setProtocol(protocol.registryProtocol)? ? ? registry.setAddress(protocol.registryAddress)? ? ? reference.setRegistry(registry)? ? }else{? ? ? reference.setUrl(protocol.url)? ? }? ? reference.setInterface(interface)valcache =ReferenceConfigCache.getCachevalgenericService = cache.get(reference)valobjectMapper:ObjectMapper=newObjectMapper()newDubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)? }}

LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL

caseclassDubboProcessBuilder(interface:String, method:String, argTypes:Expression[Array[String]] = _ =>Success(Array.empty[String]),argValues:Expression[Array[Object]] = _ =>Success(Array.empty[Object]), checks:List[DubboCheck] =Nil)extendsDubboCheckSupport{defargTypes(argTypes:Expression[Array[String]]):DubboProcessBuilder= copy(argTypes = argTypes)defargValues(argValues:Expression[Array[Object]]):DubboProcessBuilder= copy(argValues = argValues)defcheck(dubboChecks:DubboCheck*):DubboProcessBuilder= copy(checks = checks ::: dubboChecks.toList)defbuild():ActionBuilder=DubboActionBuilder(interface, method, argTypes, argValues, checks)}

Check

全链路压测中,我们都使用Json Path校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于Json Path的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:

packageobjectdubbo{typeDubboCheck=Check[String]valDubboStringExtender:Extender[DubboCheck,String] =? ? (check:DubboCheck) => checkvalDubboStringPreparer:Preparer[String,String] =? ? (result:String) =>Success(result)}

基于Json Path的校验逻辑:

traitDubboJsonPathOfType{? self:DubboJsonPathCheckBuilder[String] =>defofType[X:JsonFilter](implicitextractorFactory:JsonPathExtractorFactory) =newDubboJsonPathCheckBuilder[X](path, jsonParsers)}objectDubboJsonPathCheckBuilder{valCharsParsingThreshold=200*1000defpreparer(jsonParsers:JsonParsers):Preparer[String,Any] =? ? response => {if(response.length() >CharsParsingThreshold|| jsonParsers.preferJackson)? ? ? ? jsonParsers.safeParseJackson(response)elsejsonParsers.safeParseBoon(response)? ? }defjsonPath(path:Expression[String])(implicitextractorFactory:JsonPathExtractorFactory, jsonParsers:JsonParsers) =newDubboJsonPathCheckBuilder[String](path, jsonParsers)withDubboJsonPathOfType}classDubboJsonPathCheckBuilder[X:JsonFilter](private[check] val path:Expression[String],? ? private[check] val jsonParsers:JsonParsers)(implicit extractorFactory:JsonPathExtractorFactory)extendsDefaultMultipleFindCheckBuilder[DubboCheck,String,Any,X](DubboStringExtender,DubboJsonPathCheckBuilder.preparer(jsonParsers)? ) {importextractorFactory._deffindExtractor(occurrence:Int) = path.map(newSingleExtractor[X](_, occurrence))deffindAllExtractor= path.map(newMultipleExtractor[X])defcountExtractor= path.map(newCountExtractor)}

DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL

traitDubboCheckSupport{defjsonPath(path:Expression[String])(implicitextractorFactory:JsonPathExtractorFactory, jsonParsers:JsonParsers) =DubboJsonPathCheckBuilder.jsonPath(path)}

Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法*

DSL

traitAwsDsl提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。

此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法

traitDubboDslextendsDubboCheckSupport{valDubbo=DubboProtocolBuilderBasedefdubbo(interface:String, method:String) =DubboProcessBuilder(interface, method)implicitdefdubboProtocolBuilder2DubboProtocol(builder:DubboProtocolBuilder):DubboProtocol= builder.buildimplicitdefdubboProcessBuilder2ActionBuilder(builder:DubboProcessBuilder):ActionBuilder= builder.build()deftransformJsonDubboData(argTypeName:String, argValueName:String, session:Session):Session= {? ? session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))? ? ? .set(argValueName, toArray(session(argValueName).as[JList[Any]]))? }privatedeftoArray[T:ClassTag](value:JList[T]):Array[T] = {? ? value.asScala.toArray? }}

objectPredefextendsDubboDsl

Dubbo 压测脚本和数据 Feeder 示例

压测脚本示例:

importio.gatling.core.Predef._importio.gatling.dubbo.Predef._importscala.concurrent.duration._classDubboTestextendsSimulation{valdubboConfig =Dubbo.protocol("dubbo")? ? .generic("true")//直连某台Dubbo机器,只单独压测一台机器的水位.url("dubbo://IP地址:端口")//或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心.registryProtocol("")? ? .registryAddress("")valjsonFileFeeder = jsonFile("data.json").circular//数据FeedervaldubboScenario = scenario("load test dubbo")? ? .forever("repeated") {? ? ? feed(jsonFileFeeder)? ? ? ? .exec(session => transformJsonDubboData("args_types1","args_values1", session))? ? ? ? .exec(dubbo("com.xxx.xxxService","methodName")? ? ? ? ? .argTypes("${args_types1}")? ? ? ? ? .argValues("${args_values1}")? ? ? ? ? .check(jsonPath("$.code").is("200"))? ? ? ? )? ? }? setUp(? ? dubboScenario.inject(atOnceUsers(10))? ? ? .throttle(? ? ? ? reachRps(10) in (1seconds),? ? ? ? holdFor(30seconds))? ).protocols(dubboConfig)}

data.json 示例:

[? {"args_types1": ["com.xxx.xxxDTO"],"args_values1": [{"field1":"111","field2":"222","field3":"333"}]? }]

Dubbo 压测报告示例

在此我向大家推荐一个架构学习交流群。交流学习群号:938837867 暗号:555 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容