Dubbo 压测插件已开源,本文涉及代码详见gatling-dubbo
Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益?;?Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。
插件主要结构
实现 Dubbo 压测插件,需实现以下四部分内容:
Protocol 和 ProtocolBuild
协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类
Action 和 ActionBuild
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类
Check 和 CheckBuild
检查部分,全链路压测中我们都使用Json Path检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类
DSL
Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致
Protocol
协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:
protocol
协议,设置为dubbo
generic
泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为true,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值result_no_change(去掉优化前泛化调用的序列化开销以提升性能)
url
Dubbo 服务的地址:dubbo://IP地址:端口
registryProtocol
Dubbo 注册中心的协议,设置为ETCD3
registryAddress
Dubbo 注册中心的地址
如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。
objectDubboProtocol{valDubboProtocolKey=newProtocolKey{typeProtocol=DubboProtocoltypeComponents=DubboComponentsdefprotocolClass:Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]defdefaultProtocolValue(configuration:GatlingConfiguration):DubboProtocol=thrownewIllegalStateException("Can't provide a default value for DubboProtocol")defnewComponents(system:ActorSystem, coreComponents:CoreComponents):DubboProtocol=>DubboComponents= {? ? ? dubboProtocol =>DubboComponents(dubboProtocol)? ? }? }}caseclassDubboProtocol(protocol:String, //dubbo? ? generic:String, //泛化调用?? ? url:String, //use url or? ? registryProtocol:String,? //use registry? ? registryAddress:String//use registry)extendsProtocol{typeComponents=DubboComponents}
为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:
caseclassDubboComponents(dubboProtocol:DubboProtocol)extendsProtocolComponents{defonStart:Option[Session=>Session] =NonedefonExit:Option[Session=>Unit] =None}
以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。
objectDubboProtocolBuilderBase{defprotocol(protocol:String) =DubboProtocolBuilderGenericStep(protocol)}caseclassDubboProtocolBuilderGenericStep(protocol:String){defgeneric(generic:String) =DubboProtocolBuilderUrlStep(protocol, generic)}caseclassDubboProtocolBuilderUrlStep(protocol:String, generic:String){defurl(url:String) =DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)}caseclassDubboProtocolBuilderRegistryProtocolStep(protocol:String, generic:String, url:String){defregistryProtocol(registryProtocol:String) =DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)}caseclassDubboProtocolBuilderRegistryAddressStep(protocol:String, generic:String, url:String, registryProtocol:String){defregistryAddress(registryAddress:String) =DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)}caseclassDubboProtocolBuilder(protocol:String, generic:String, url:String, registryProtocol:String, registryAddress:String){defbuild=DubboProtocol(? ? protocol = protocol,? ? generic = generic,? ? url = url,? ? registryProtocol = registryProtocol,? ? registryAddress = registryAddress? )}
Action
DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。
DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似${args_types}、${args_values}这样的表达式从数据 Feeder 中解析对应字段的值。
execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。
异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。
为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近
classDubboAction(interface:String,? ? method:String,? ? argTypes:Expression[Array[String]],? ? argValues:Expression[Array[Object]],? ? genericService:GenericService,? ? checks:List[DubboCheck],? ? coreComponents:CoreComponents,? ? throttled:Boolean,? ? val objectMapper:ObjectMapper,? ? val next:Action)extendsExitableActionwithNameGen{overridedefstatsEngine:StatsEngine= coreComponents.statsEngineoverridedefname:String= genName("dubboRequest")overridedefexecute(session:Session):Unit= recover(session) {? ? argTypes(session) flatMap { argTypesArray =>? ? ? argValues(session) map { argValuesArray =>valstartTime =System.currentTimeMillis()valf =Future{try{? ? ? ? ? ? genericService.$invoke(method, argTypes(session).get, argValues(session).get)? ? ? ? ? }finally{? ? ? ? ? }? ? ? ? }? ? ? ? f.onComplete {caseSuccess(result) =>valendTime =System.currentTimeMillis()valresultMap = result.asInstanceOf[JMap[String,Any]]valresultJson = objectMapper.writeValueAsString(resultMap)val(newSession, error) =Check.check(resultJson, session, checks)? ? ? ? ? ? errormatch{caseNone=>? ? ? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("OK"),None,None)? ? ? ? ? ? ? ? throttle(newSession(session))caseSome(Failure(errorMessage)) =>? ? ? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("KO"),None,Some(errorMessage))? ? ? ? ? ? ? ? throttle(newSession(session).markAsFailed)? ? ? ? ? ? }caseFuFailure(e) =>valendTime =System.currentTimeMillis()? ? ? ? ? ? statsEngine.logResponse(session, interface +"."+ method,ResponseTimings(startTime, endTime),Status("KO"),None,Some(e.getMessage))? ? ? ? ? ? throttle(session.markAsFailed)? ? ? ? }? ? ? }? ? }? }privatedefthrottle(s:Session):Unit= {if(throttled) {? ? ? coreComponents.throttler.throttle(s.scenario, () => next ! s)? ? }else{? ? ? next ! s? ? }? }}
DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:
caseclassDubboActionBuilder(interface:String, method:String, argTypes:Expression[Array[String]], argValues:Expression[Array[Object]], checks:List[DubboCheck])extendsActionBuilder{privatedefcomponents(protocolComponentsRegistry:ProtocolComponentsRegistry):DubboComponents=? ? protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)overridedefbuild(ctx:ScenarioContext, next:Action):Action= {importctx._valprotocol = components(protocolComponentsRegistry).dubboProtocol//Dubbo客户端配置valreference =newReferenceConfig[GenericService]valapplication =newApplicationConfigapplication.setName("gatling-dubbo")? ? reference.setApplication(application)? ? reference.setProtocol(protocol.protocol)? ? reference.setGeneric(protocol.generic)if(protocol.url =="") {valregistry =newRegistryConfigregistry.setProtocol(protocol.registryProtocol)? ? ? registry.setAddress(protocol.registryAddress)? ? ? reference.setRegistry(registry)? ? }else{? ? ? reference.setUrl(protocol.url)? ? }? ? reference.setInterface(interface)valcache =ReferenceConfigCache.getCachevalgenericService = cache.get(reference)valobjectMapper:ObjectMapper=newObjectMapper()newDubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)? }}
LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL
caseclassDubboProcessBuilder(interface:String, method:String, argTypes:Expression[Array[String]] = _ =>Success(Array.empty[String]),argValues:Expression[Array[Object]] = _ =>Success(Array.empty[Object]), checks:List[DubboCheck] =Nil)extendsDubboCheckSupport{defargTypes(argTypes:Expression[Array[String]]):DubboProcessBuilder= copy(argTypes = argTypes)defargValues(argValues:Expression[Array[Object]]):DubboProcessBuilder= copy(argValues = argValues)defcheck(dubboChecks:DubboCheck*):DubboProcessBuilder= copy(checks = checks ::: dubboChecks.toList)defbuild():ActionBuilder=DubboActionBuilder(interface, method, argTypes, argValues, checks)}
Check
全链路压测中,我们都使用Json Path校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于Json Path的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:
packageobjectdubbo{typeDubboCheck=Check[String]valDubboStringExtender:Extender[DubboCheck,String] =? ? (check:DubboCheck) => checkvalDubboStringPreparer:Preparer[String,String] =? ? (result:String) =>Success(result)}
基于Json Path的校验逻辑:
traitDubboJsonPathOfType{? self:DubboJsonPathCheckBuilder[String] =>defofType[X:JsonFilter](implicitextractorFactory:JsonPathExtractorFactory) =newDubboJsonPathCheckBuilder[X](path, jsonParsers)}objectDubboJsonPathCheckBuilder{valCharsParsingThreshold=200*1000defpreparer(jsonParsers:JsonParsers):Preparer[String,Any] =? ? response => {if(response.length() >CharsParsingThreshold|| jsonParsers.preferJackson)? ? ? ? jsonParsers.safeParseJackson(response)elsejsonParsers.safeParseBoon(response)? ? }defjsonPath(path:Expression[String])(implicitextractorFactory:JsonPathExtractorFactory, jsonParsers:JsonParsers) =newDubboJsonPathCheckBuilder[String](path, jsonParsers)withDubboJsonPathOfType}classDubboJsonPathCheckBuilder[X:JsonFilter](private[check] val path:Expression[String],? ? private[check] val jsonParsers:JsonParsers)(implicit extractorFactory:JsonPathExtractorFactory)extendsDefaultMultipleFindCheckBuilder[DubboCheck,String,Any,X](DubboStringExtender,DubboJsonPathCheckBuilder.preparer(jsonParsers)? ) {importextractorFactory._deffindExtractor(occurrence:Int) = path.map(newSingleExtractor[X](_, occurrence))deffindAllExtractor= path.map(newMultipleExtractor[X])defcountExtractor= path.map(newCountExtractor)}
DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL
traitDubboCheckSupport{defjsonPath(path:Expression[String])(implicitextractorFactory:JsonPathExtractorFactory, jsonParsers:JsonParsers) =DubboJsonPathCheckBuilder.jsonPath(path)}
Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法*
DSL
traitAwsDsl提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。
此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法
traitDubboDslextendsDubboCheckSupport{valDubbo=DubboProtocolBuilderBasedefdubbo(interface:String, method:String) =DubboProcessBuilder(interface, method)implicitdefdubboProtocolBuilder2DubboProtocol(builder:DubboProtocolBuilder):DubboProtocol= builder.buildimplicitdefdubboProcessBuilder2ActionBuilder(builder:DubboProcessBuilder):ActionBuilder= builder.build()deftransformJsonDubboData(argTypeName:String, argValueName:String, session:Session):Session= {? ? session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))? ? ? .set(argValueName, toArray(session(argValueName).as[JList[Any]]))? }privatedeftoArray[T:ClassTag](value:JList[T]):Array[T] = {? ? value.asScala.toArray? }}
objectPredefextendsDubboDsl
Dubbo 压测脚本和数据 Feeder 示例
压测脚本示例:
importio.gatling.core.Predef._importio.gatling.dubbo.Predef._importscala.concurrent.duration._classDubboTestextendsSimulation{valdubboConfig =Dubbo.protocol("dubbo")? ? .generic("true")//直连某台Dubbo机器,只单独压测一台机器的水位.url("dubbo://IP地址:端口")//或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心.registryProtocol("")? ? .registryAddress("")valjsonFileFeeder = jsonFile("data.json").circular//数据FeedervaldubboScenario = scenario("load test dubbo")? ? .forever("repeated") {? ? ? feed(jsonFileFeeder)? ? ? ? .exec(session => transformJsonDubboData("args_types1","args_values1", session))? ? ? ? .exec(dubbo("com.xxx.xxxService","methodName")? ? ? ? ? .argTypes("${args_types1}")? ? ? ? ? .argValues("${args_values1}")? ? ? ? ? .check(jsonPath("$.code").is("200"))? ? ? ? )? ? }? setUp(? ? dubboScenario.inject(atOnceUsers(10))? ? ? .throttle(? ? ? ? reachRps(10) in (1seconds),? ? ? ? holdFor(30seconds))? ).protocols(dubboConfig)}
data.json 示例:
[? {"args_types1": ["com.xxx.xxxDTO"],"args_values1": [{"field1":"111","field2":"222","field3":"333"}]? }]
Dubbo 压测报告示例
在此我向大家推荐一个架构学习交流群。交流学习群号:938837867 暗号:555 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备