Mastering the JMeter Kafka Backend Listener

When it comes to JMeter backend listeners, the one most people know is the InfluxDB listener (InfluxdbBackendListenerClient), which pushes test results to InfluxDB in real time for display in Grafana. Under heavy concurrency, however, the write throughput can exceed what InfluxDB itself can handle (InfluxDB crashes are not uncommon), so a Kafka listener becomes very useful: as message-queue middleware, Kafka acts as a buffer between JMeter and the storage backend.

This article covers the following six topics:

1. How the Kafka backend listener works

See the blog post "Backend Listeners in JMeter" by HenryXiao8080

2. Download and usage

Where to find the listener's source code and release jars

3. Collecting test results via the backend listener

How to collect the Kafka result data with Telegraf

4. Displaying results in Grafana

How to adjust the Telegraf configuration to produce data that Grafana can display

5. Extending the Kafka listener plugin

How to monitor additional performance metrics through secondary development

6. Notes on InfluxDB 2.x

How the Kafka listener can push data on to InfluxDB 2

1. How the Kafka Backend Listener Works

jmeter-backend-listener-kafka simply extends JMeter's AbstractBackendListenerClient: it takes the asynchronously delivered SampleResult batches, processes them (the same principle as JMeter's built-in InfluxDB and Graphite backend listeners), and publishes the resulting metadata to Kafka, so you can receive the test results asynchronously by consuming the Kafka topic:

The data is processed and published to Kafka in the plugin's implementation of the handleSampleResults method:
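To make the principle concrete, here is a minimal sketch of such a client, assuming the standard JMeter 5.x backend listener API and the kafka-clients library. The class name, parameter names, and the hand-rolled JSON are illustrative only, not the plugin's actual code:

import java.util.List;
import java.util.Properties;

import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.visualizers.backend.AbstractBackendListenerClient;
import org.apache.jmeter.visualizers.backend.BackendListenerContext;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleKafkaBackendClient extends AbstractBackendListenerClient {

  private KafkaProducer<String, String> producer;
  private String topic;

  @Override
  public Arguments getDefaultParameters() {
    // These show up as editable fields in the Backend Listener GUI.
    Arguments args = new Arguments();
    args.addArgument("kafka.bootstrap.servers", "localhost:9092");
    args.addArgument("kafka.topic", "JMETER_METRICS");
    return args;
  }

  @Override
  public void setupTest(BackendListenerContext context) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", context.getParameter("kafka.bootstrap.servers"));
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
    this.topic = context.getParameter("kafka.topic");
    super.setupTest(context);
  }

  @Override
  public void handleSampleResults(List<SampleResult> results, BackendListenerContext context) {
    // JMeter hands results over asynchronously in batches; serialize each one
    // (here as a crude JSON string) and publish it to the topic.
    for (SampleResult sr : results) {
      String json = String.format(
          "{\"label\":\"%s\",\"elapsed\":%d,\"success\":%b}",
          sr.getSampleLabel(), sr.getTime(), sr.isSuccessful());
      producer.send(new ProducerRecord<>(topic, sr.getSampleLabel(), json));
    }
  }

  @Override
  public void teardownTest(BackendListenerContext context) throws Exception {
    producer.close();
    super.teardownTest(context);
  }
}

The real plugin follows the same shape, with a richer MetricsRow serialization and a batching publisher, as the sections below show.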

2. Downloading and Using the Kafka Backend Listener

The source code and release jars can be downloaded from two places:

Original open-source repository: https://github.com/rahulsinghai/jmeter-backend-listener-kafka

MeterSphere fork: https://github.com/metersphere/jmeter-backend-listener-kafka

Drop the jar (jmeter.backendlistener.kafka-1.0.4.jar) into JMeter's lib/ext directory and restart JMeter to enable it:

To stand up a verification environment quickly, I used Docker this time to deploy a Kafka and ZooKeeper cluster plus InfluxDB and Grafana; the Kafka address and port are as labelled in the figure above. Before testing, make sure the Kafka service and port are reachable.

3. Collecting Test Results via the Backend Listener

Configure the backend listener as shown above and run the JMeter test; connecting to Kafka with Offset Explorer then shows the report data the listener has collected:

Since the stored messages are in an encoded key-value format, we can consume them with Telegraf and write them into InfluxDB to see what is actually being received (other approaches work too).
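If you would rather check the topic with a few lines of code than with a GUI tool, a plain Kafka consumer is enough. A minimal sketch, assuming the kafka-clients library, with the broker address and topic taken from the setup above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PeekJMeterMetrics {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "172.17.2.43:9092");
    props.put("group.id", "peek-jmeter-metrics");
    props.put("auto.offset.reset", "earliest"); // read the topic from the beginning
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("JMETER_METRICS"));
      // Loop forever, printing whatever the listener publishes; Ctrl+C to stop.
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
      }
    }
  }
}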

The Telegraf configuration is as follows.

First, configure the output (mainly the InfluxDB URL and database):

###############################################################################
#                              OUTPUT PLUGINS                                 #
###############################################################################

# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
  urls = ["http://172.17.2.130:8086"]

  ## The target database for metrics; will be created as needed.
  ## For UDP url endpoint database needs to be configured on server side.
  database = "kafka"

Then configure the input (for clarity, configure only Kafka and comment out the default CPU, disk, and other inputs so they don't interfere):

# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
  ## kafka servers
  brokers = ["172.17.2.43:9092"]
  ## topic(s) to consume
  topics = ["JMETER_METRICS"]
  ## Add topic as tag if topic_tag is not empty
  topic_tag = "JMETER_METRICS"
  ## the name of the consumer group
  consumer_group = "telegraf_metrics_consumers"
  ## Offset (must be either "oldest" or "newest")
  offset = "oldest"
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "value"
  data_type = "string"

Start Telegraf and check the log to confirm that data is syncing normally:

2022-08-04T00:29:48Z I! Starting Telegraf 1.10.2
2022-08-04T00:29:48Z I! Loaded inputs: kafka_consumer
2022-08-04T00:29:48Z I! Loaded aggregators:
2022-08-04T00:29:48Z I! Loaded processors:
2022-08-04T00:29:48Z I! Loaded outputs: influxdb
2022-08-04T00:29:48Z I! Tags enabled: host=172.17.2.43
2022-08-04T00:29:48Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"172.17.2.43", Flush Interval:10s
2022-08-04T00:29:48Z I! Started the kafka consumer service, brokers: [172.17.2.43:9092], topics: [JMETER_METRICS]

With the sync working, connect to InfluxDB with InfluxDB Studio to look at the collected data:

Inspecting the value field shows the collected test result content, for example:

{
  \"ContentType\":\"text/html; charset\\u003dUTF-8\",
  \"IdleTime\":0,
  \"ElapsedTime\":\"2022-08-04T00:00:01.000+0800\",
  \"ErrorCount\":0,
  \"Timestamp\":\"2022-08-04T10:01:22.259+0800\",
  \"URL\":\"https://mp.weixin.qq.com/s/dWBD8ZNYnzuao5ca3gMi3Q\",
  \"SampleStartTime\":\"2022-08-04T10:01:22.259+0800\",
  \"Success\":true,
  \"Bytes\":64438,
  \"SentBytes\":689,
  \"AllThreads\":1,
  \"TestElement.name\":\"Thread-11\",
  \"DataType\":\"text\",
  \"ResponseTime\":396,
  \"SampleCount\":1,
  \"FailureMessage\":\"\",
  \"ConnectTime\":176,
  \"ResponseCode\":\"200\",
  \"TestStartTime\":1659578481614,
  \"AssertionResults\":[
  ],
  \"Latency\":342,
  \"InjectorHostname\":\"ZGH-PC\",
  \"GrpThreads\":1,
  \"SampleEndTime\":\"2022-08-04T10:01:22.655+0800\",
  \"BodySize\":61665,
  \"ThreadName\":\"threadGroup 1-1\",
  \"SampleLabel\":\"chrome-192.168.1.246\"
}

In fact, the Kafka listener's source file /jmeter/backendlistener/model/MetricsRow.java also reveals the format of the collected result data:

public Map<String, Object> getRowAsMap(BackendListenerContext context, String servicePrefixName)
    throws UnknownHostException {
  SimpleDateFormat sdf = new SimpleDateFormat(this.kafkaTimestamp);

  // add all the default SampleResult parameters
  addFilteredMetricToMetricsMap("AllThreads", this.sampleResult.getAllThreads());
  addFilteredMetricToMetricsMap("BodySize", this.sampleResult.getBodySizeAsLong());
  addFilteredMetricToMetricsMap("Bytes", this.sampleResult.getBytesAsLong());
  addFilteredMetricToMetricsMap("SentBytes", this.sampleResult.getSentBytes());
  addFilteredMetricToMetricsMap("ConnectTime", this.sampleResult.getConnectTime());
  addFilteredMetricToMetricsMap("ContentType", this.sampleResult.getContentType());
  addFilteredMetricToMetricsMap("DataType", this.sampleResult.getDataType());
  addFilteredMetricToMetricsMap("ErrorCount", this.sampleResult.getErrorCount());
  addFilteredMetricToMetricsMap("GrpThreads", this.sampleResult.getGroupThreads());
  addFilteredMetricToMetricsMap("IdleTime", this.sampleResult.getIdleTime());
  addFilteredMetricToMetricsMap("Latency", this.sampleResult.getLatency());
  addFilteredMetricToMetricsMap("ResponseTime", this.sampleResult.getTime());
  addFilteredMetricToMetricsMap("SampleCount", this.sampleResult.getSampleCount());
  addFilteredMetricToMetricsMap("SampleLabel", this.sampleResult.getSampleLabel());
  addFilteredMetricToMetricsMap("ThreadName", this.sampleResult.getThreadName());
  addFilteredMetricToMetricsMap("URL", this.sampleResult.getURL());
  addFilteredMetricToMetricsMap("ResponseCode", this.sampleResult.getResponseCode());
  addFilteredMetricToMetricsMap("TestStartTime", JMeterContextService.getTestStartTime());
  addFilteredMetricToMetricsMap(
      "SampleStartTime", sdf.format(new Date(this.sampleResult.getStartTime())));
  addFilteredMetricToMetricsMap(
      "SampleEndTime", sdf.format(new Date(this.sampleResult.getEndTime())));
  addFilteredMetricToMetricsMap(
      "Timestamp", sdf.format(new Date(this.sampleResult.getTimeStamp())));
  addFilteredMetricToMetricsMap("InjectorHostname", InetAddress.getLocalHost().getHostName());

  // Add the details according to the mode that is set
  switch (this.kafkaTestMode) {
    case "debug":
    case "error":
      addDetails();
      break;
    case "info":
      if (!this.sampleResult.isSuccessful()) {
        addDetails();
      }
      break;
    default:
      break;
  }
  addAssertions();
  addElapsedTime(sdf);
  addCustomFields(context, servicePrefixName);
  parseHeadersAsJsonProps(this.allReqHeaders, this.allResHeaders);
  return this.metricsMap;
}

Looking at these fields, you can see that a JMeter test report could be generated from them with some computation: there are thread counts, response times, sample labels and counts, success flags, bytes, and so on. What is missing are metrics such as TPS and the 90th percentile response time; those can be added by extending the listener and recomputing, using the InfluxDB listener as a reference.
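As a sketch of what that recomputation involves (this is not the plugin's code; the class and method names are made up for illustration), TPS and a nearest-rank percentile can be derived from the per-sample timestamps and response times collected above:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MetricsAggregator {
  private final List<Long> timestamps = new ArrayList<>();    // sample end times, ms
  private final List<Long> responseTimes = new ArrayList<>(); // elapsed times, ms

  public void add(long timestampMs, long responseTimeMs) {
    timestamps.add(timestampMs);
    responseTimes.add(responseTimeMs);
  }

  /** Transactions per second over the observed window (requires at least one sample). */
  public double tps() {
    long start = Collections.min(timestamps);
    long end = Collections.max(timestamps);
    double seconds = Math.max((end - start) / 1000.0, 1.0); // avoid dividing by zero
    return timestamps.size() / seconds;
  }

  /** Nearest-rank percentile of response time, e.g. p = 90 for the 90th percentile. */
  public long percentile(double p) {
    List<Long> sorted = new ArrayList<>(responseTimes);
    Collections.sort(sorted);
    int rank = (int) Math.ceil(p / 100.0 * sorted.size());
    return sorted.get(Math.max(rank - 1, 0));
  }
}

An external consumer could feed each parsed Kafka message into add(), keyed by SampleLabel, and periodically flush tps() and percentile(90) to the reporting store.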

4. Displaying Results in Grafana

The key-value format above is awkward to display in Grafana, so we can change the data format consumed by Telegraf to JSON:

# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
  ## kafka servers
  brokers = ["172.17.2.43:9092"]
  ## topic(s) to consume (topics for multiple test projects can be added)
  topics = ["JMETER_METRICS"]
  ## Add topic as tag if topic_tag is not empty
  topic_tag = "JMETER_METRICS"
  ## the name of the consumer group
  consumer_group = "telegraf_metrics_consumers"
  ## Offset (must be either "oldest" or "newest")
  offset = "oldest"
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"

Run the test again; this time InfluxDB shows the metric values broken out by field:

Some fields are still missing, though. The configuration notes on the Telegraf website show that the displayed fields can be extended:

Modify telegraf.conf by adding the missing fields below the data_format setting, and register SampleLabel as a tag key (more tag keys can be added as needed):

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  tag_keys = ["SampleLabel"]
  json_string_fields = ["Success", "ThreadName", "SampleLabel", "Timestamp", "URL", "FailureMessage", "ResponseCode", "AssertionResults", "InjectorHostname", "SampleStartTime", "SampleEndTime"]

Test again and check the synced result data in InfluxDB; more fields are now visible:

With these fields in place, we can build dashboards in Grafana:

5. Extending the Kafka Listener Plugin

The demonstration above shows that, unlike the InfluxDB backend listener, we cannot yet collect common performance metrics such as hits. We can remedy this by reworking the Kafka listener with the InfluxDB backend listener's source as a reference. From the listener principle described earlier, we know that the listener processes data and publishes it to Kafka (or InfluxDB) through the handleSampleResults method, so that is where we start; the method lives in the KafkaBackendClient.java class:

io.github.rahulsinghai.jmeter.backendlistener.kafka.KafkaBackendClient

We modify handleSampleResults as follows (the added lines are marked with comments):

@Override
public void handleSampleResults(List<SampleResult> results, BackendListenerContext context) {
  for (SampleResult sr : results) {
    String sampleLabel = sr.getSampleLabel(); // added
    SamplerMetric samplerMetric = getSamplerMetric(sampleLabel); // added
    /* In JMeter 5.1.1 and later, SamplerMetric supports addCumulated:
    Pattern samplersToFilter;
    if (samplersToFilter.matcher(sampleLabel).find()) {
      samplerMetric.add(sr);
    }
    samplerMetric = getSamplerMetric("all");
    samplerMetric.addCumulated(sr);
    */
    samplerMetric.add(sr); // added
    MetricsRow row =
        new MetricsRow(
            sr,
            context.getParameter(KAFKA_TEST_MODE),
            context.getParameter(KAFKA_TIMESTAMP),
            this.buildNumber,
            context.getBooleanParameter(KAFKA_PARSE_REQ_HEADERS, false),
            context.getBooleanParameter(KAFKA_PARSE_RES_HEADERS, false),
            fields,
            samplerMetric); // added parameter samplerMetric
    if (validateSample(context, sr)) {
      try {
        // Prefix to skip from adding service specific parameters to the metrics row
        String servicePrefixName = "kafka.";
        this.publisher.addToList(new Gson().toJson(row.getRowAsMap(context, servicePrefixName)));
      } catch (Exception e) {
        logger.error(
            "The Kafka Backend Listener was unable to add sampler to the list of samplers to send... More info in JMeter's console.");
        e.printStackTrace();
      }
    }
  }
}

In this method we added the SamplerMetric calls (the lines marked "added" above). Looking at the SamplerMetric class, it already computes the metrics we need, and they can be fetched with its getters:

public int getTotal() {
  return successes + failures;
}

public int getSuccesses() {
  return successes;
}

public int getFailures() {
  return failures;
}

public double getOkMaxTime() {
  return okResponsesStats.getMax();
}

public double getOkMinTime() {
  return okResponsesStats.getMin();
}

public double getOkMean() {
  return okResponsesStats.getMean();
}

public double getOkPercentile(double percentile) {
  return okResponsesStats.getPercentile(percentile);
}

public double getKoMaxTime() {
  return koResponsesStats.getMax();
}

public double getKoMinTime() {
  return koResponsesStats.getMin();
}

public double getKoMean() {
  return koResponsesStats.getMean();
}

public double getKoPercentile(double percentile) {
  return koResponsesStats.getPercentile(percentile);
}

public double getAllMaxTime() {
  return allResponsesStats.getMax();
}

public double getAllMinTime() {
  return allResponsesStats.getMin();
}

public double getAllMean() {
  return allResponsesStats.getMean();
}

public double getAllPercentile(double percentile) {
  return pctResponseStats.getPercentile(percentile);
}

/**
 * Returns hits to server
 * @return the hits
 */
public int getHits() {
  return hits;
}

public Map<ErrorMetric, Integer> getErrors() {
  return errors;
}

public long getSentBytes() {
  return sentBytes;
}

public long getReceivedBytes() {
  return receivedBytes;
}

Since we pass an extra argument when constructing MetricsRow, the MetricsRow constructor has to change accordingly (the class also needs a matching samplerMetric field for the assignment below to compile):

public MetricsRow(
    SampleResult sr,
    String testMode,
    String timeStamp,
    int buildNumber,
    boolean parseReqHeaders,
    boolean parseResHeaders,
    Set<String> fields,
    SamplerMetric samplerMetric) { // added parameter samplerMetric
  this.sampleResult = sr;
  this.kafkaTestMode = testMode.trim();
  this.kafkaTimestamp = timeStamp.trim();
  this.ciBuildNumber = buildNumber;
  this.metricsMap = new HashMap<>();
  this.allReqHeaders = parseReqHeaders;
  this.allResHeaders = parseResHeaders;
  this.fields = fields;
  this.samplerMetric = samplerMetric;
}

Then, in MetricsRow's getRowAsMap method, we can add the metrics provided by the SamplerMetric class; only three of them are listed here:

addFilteredMetricToMetricsMap("Hits", this.samplerMetric.getHits());
addFilteredMetricToMetricsMap("TotalRequest", this.samplerMetric.getTotal());
addFilteredMetricToMetricsMap("AllMaxTime", this.samplerMetric.getAllMaxTime());

Rebuild the jmeter-backend-listener-kafka source, produce a new jar, replace the old one in JMeter, and rerun the test; the metrics collected in the database now include Hits:

That accomplishes the goal of adding a metric. If further metrics are needed, they can be added to getRowAsMap in MetricsRow the same way. None of this is hard to follow: with a little Java, an understanding of how the listener works, and the InfluxDB listener's source as a reference, reworking the Kafka listener is straightforward. And if you understand how the performance metrics are calculated, you can even extend it with custom metrics of your own.

Of course, we don't have to modify jmeter-backend-listener-kafka at all. An external processor can recompute the collected base sampler metrics, the same way JMeter's HTML report generation does, to arrive at the desired performance report. Another option is to do what MeterSphere does: add a data-streaming component that reads the Kafka data, recomputes it, writes the results to MySQL, and serves the report from there (its result processing likely borrows from JMeter's own code as well).

6. Notes on InfluxDB 2.x

Everything above is based on InfluxDB 1.x, but InfluxDB 2.x is now available; the next article, "Using the InfluxDB 2.x Backend Listener with JMeter", will cover the InfluxDB 2 listener. As for the Kafka listener, Telegraf can also deliver data in the InfluxDB 2 format, since it already supports writing to InfluxDB 2:

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "$INFLUX_TOKEN"
  organization = "example-org"
  bucket = "example-bucket"

See the official InfluxDB documentation: Manually configure Telegraf for InfluxDB v2.0 | InfluxDB OSS 2.0 Documentation.

The data sent to InfluxDB 2 can be queried in the InfluxDB UI as well:

A Hits chart can be displayed with InfluxDB 2.x's Flux syntax:

from(bucket: "kafka")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kafka_consumer")
  |> filter(fn: (r) => r["JMETER_METRICS"] == "JMETER_METRICS")
  |> filter(fn: (r) => r["_field"] == "Hits")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

For now, the Grafana website offers few JMeter dashboard templates in the InfluxDB 2 format; hopefully more will appear, because Grafana dashboards built on InfluxDB 2.x look better than those built on 1.x.

Tags: java kafka JMeter

Reposted from: https://blog.csdn.net/smooth00/article/details/126153100
Copyright belongs to the original author, smooth-z. In case of infringement, please contact us for removal.
