Merge pull request #2 from httprunner/main

new pull request. 12-27
This commit is contained in:
xucong053
2021-12-27 15:30:28 +08:00
committed by GitHub
20 changed files with 287 additions and 335 deletions

View File

@@ -28,7 +28,7 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Run coverage - name: Run coverage
run: go test -coverprofile="cover.out" -covermode=atomic ./... # FIXME: -race run: go test -coverprofile="cover.out" -covermode=atomic -race ./...
- name: Upload coverage to Codecov - name: Upload coverage to Codecov
uses: codecov/codecov-action@v2 uses: codecov/codecov-action@v2
with: with:

115
README.md
View File

@@ -6,7 +6,7 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/httprunner/hrp)](https://goreportcard.com/report/github.com/httprunner/hrp) [![Go Report Card](https://goreportcard.com/badge/github.com/httprunner/hrp)](https://goreportcard.com/report/github.com/httprunner/hrp)
[![FOSSA Status](https://app.fossa.com/api/projects/custom%2B27856%2Fgithub.com%2Fhttprunner%2Fhrp.svg?type=shield)](https://app.fossa.com/reports/c2742455-c8ab-4b13-8fd7-4a35ba0b2840) [![FOSSA Status](https://app.fossa.com/api/projects/custom%2B27856%2Fgithub.com%2Fhttprunner%2Fhrp.svg?type=shield)](https://app.fossa.com/reports/c2742455-c8ab-4b13-8fd7-4a35ba0b2840)
`hrp` is a golang implementation of [HttpRunner]. Ideally, hrp will be fully compatible with HttpRunner, including testcase format and usage. What's more, hrp will integrate Boomer natively to be a better load generator for [locust]. `hrp` aims to be a one-stop solution for HTTP(S) testing, covering API testing, load testing and digital experience monitoring (DEM).
## Key Features ## Key Features
@@ -34,7 +34,7 @@ Since installed, you will get a `hrp` command with multiple sub-commands.
```text ```text
$ hrp -h $ hrp -h
hrp (HttpRunner+) is one-stop solution for HTTP(S) testing. Enjoy! ✨ 🚀 ✨ hrp (HttpRunner+) aims to be a one-stop solution for HTTP(S) testing, covering API testing, load testing and digital experience monitoring (DEM). Enjoy! ✨ 🚀 ✨
License: Apache-2.0 License: Apache-2.0
Github: https://github.com/httprunner/hrp Github: https://github.com/httprunner/hrp
@@ -65,17 +65,20 @@ You can use `hrp run` command to run HttpRunner JSON/YAML testcases. The followi
<summary>$ hrp run examples/demo.json</summary> <summary>$ hrp run examples/demo.json</summary>
```text ```text
9:22PM INF Set log to color console other than JSON format. 5:21PM INF Set log to color console other than JSON format.
9:22PM INF Set log level to INFO 5:21PM ??? Set log level
9:22PM INF [init] SetDebug debug=true 5:21PM INF [init] SetDebug debug=true
9:22PM INF load json testcase path=/Users/debugtalk/MyProjects/HttpRunner-dev/hrp/examples/demo.json 5:21PM INF [init] SetFailfast failfast=true
9:22PM INF call function success arguments=[5] funcName=gen_random_string output=rWRNY 5:21PM INF [init] Reset session variables
9:22PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3 5:21PM INF load json testcase path=/Users/debugtalk/MyProjects/HttpRunner-dev/hrp/examples/demo.json
9:22PM INF run testcase start testcase="demo with complex mechanisms" 5:21PM INF call function success arguments=[5] funcName=gen_random_string output=A65rg
9:22PM INF run step start step="get with params" 5:21PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3
9:22PM INF call function success arguments=[12.3,34.5] funcName=max output=34.5 5:21PM INF run testcase start testcase="demo with complex mechanisms"
5:21PM INF transaction name=tran1 type=start
5:21PM INF run step start step="get with params"
5:21PM INF call function success arguments=[12.3,34.5] funcName=max output=34.5
-------------------- request -------------------- -------------------- request --------------------
GET /get?foo1=rWRNY&foo2=34.5 HTTP/1.1 GET /get?foo1=A65rg&foo2=34.5 HTTP/1.1
Host: postman-echo.com Host: postman-echo.com
User-Agent: HttpRunnerPlus User-Agent: HttpRunnerPlus
@@ -85,70 +88,72 @@ HTTP/1.1 200 OK
Content-Length: 304 Content-Length: 304
Connection: keep-alive Connection: keep-alive
Content-Type: application/json; charset=utf-8 Content-Type: application/json; charset=utf-8
Date: Tue, 07 Dec 2021 13:22:50 GMT Date: Thu, 23 Dec 2021 09:21:30 GMT
Etag: W/"130-gmtE0VWiyE0mXUGoJe5AyhMQ2ig" Etag: W/"130-t7qE4M7C+OQ0jGdRWkr2R3gjq+w"
Set-Cookie: sails.sid=s%3AEWPwP8H-nbpSrCseeulwDQ8OEtRy1pGu.aHV6KrEIiFgaJsUAuDmmmJCYiV6XkrHLS%2Fd9g9vtZQw; Path=/; HttpOnly Set-Cookie: sails.sid=s%3AAiqfRgMtWKG3oOQnXJOxRD8xk58rtAW6.eD%2BBo7FBnA82XLsLFiadeg6OcuD2zHSTyhv2l%2FDVuCk; Path=/; HttpOnly
Vary: Accept-Encoding Vary: Accept-Encoding
{"args":{"foo1":"rWRNY","foo2":"34.5"},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61af602a-5eea88ee21122daf4e8dfe95","user-agent":"HttpRunnerPlus","accept-encoding":"gzip"},"url":"https://postman-echo.com/get?foo1=rWRNY&foo2=34.5"} {"args":{"foo1":"A65rg","foo2":"34.5"},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61c43f9a-7c855775053963a4284ba464","user-agent":"HttpRunnerPlus","accept-encoding":"gzip"},"url":"https://postman-echo.com/get?foo1=A65rg&foo2=34.5"}
-------------------------------------------------- --------------------------------------------------
9:22PM INF extract value from=body.args.foo1 value=rWRNY 5:21PM INF extract value from=body.args.foo1 value=A65rg
9:22PM INF set variable value=rWRNY variable=varFoo1 5:21PM INF set variable value=A65rg variable=varFoo1
9:22PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true 5:21PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true
9:22PM INF validate headers."Content-Type" assertMethod=startswith checkValue="application/json; charset=utf-8" expectValue=application/json result=true 5:21PM INF validate headers."Content-Type" assertMethod=startswith checkValue="application/json; charset=utf-8" expectValue=application/json result=true
9:22PM INF validate body.args.foo1 assertMethod=length_equals checkValue=rWRNY expectValue=5 result=true 5:21PM INF validate body.args.foo1 assertMethod=length_equals checkValue=A65rg expectValue=5 result=true
9:22PM INF validate $varFoo1 assertMethod=length_equals checkValue=rWRNY expectValue=5 result=true 5:21PM INF validate $varFoo1 assertMethod=length_equals checkValue=A65rg expectValue=5 result=true
9:22PM INF validate body.args.foo2 assertMethod=equals checkValue=34.5 expectValue=34.5 result=true 5:21PM INF validate body.args.foo2 assertMethod=equals checkValue=34.5 expectValue=34.5 result=true
9:22PM INF run step end exportVars={"varFoo1":"rWRNY"} step="get with params" success=true 5:21PM INF run step end exportVars={"varFoo1":"A65rg"} step="get with params" success=true
9:22PM INF run step start step="post json data" 5:21PM INF transaction name=tran1 type=end
9:22PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3 5:21PM INF transaction elapsed=1021.174113 name=tran1
5:21PM INF run step start step="post json data"
5:21PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3
-------------------- request -------------------- -------------------- request --------------------
POST /post HTTP/1.1 POST /post HTTP/1.1
Host: postman-echo.com Host: postman-echo.com
Content-Type: application/json; charset=UTF-8 Content-Type: application/json; charset=UTF-8
{"foo1":"rWRNY","foo2":12.3} {"foo1":"A65rg","foo2":12.3}
==================== response =================== ==================== response ===================
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Length: 424 Content-Length: 424
Connection: keep-alive Connection: keep-alive
Content-Type: application/json; charset=utf-8 Content-Type: application/json; charset=utf-8
Date: Tue, 07 Dec 2021 13:22:50 GMT Date: Thu, 23 Dec 2021 09:21:30 GMT
Etag: W/"1a8-5fCAlcltnCS4Ed/6OxpH9i9dlKs" Etag: W/"1a8-IhWXQxTXlxmnbqdRh+oBPRTLsOU"
Set-Cookie: sails.sid=s%3As1b8P7f8sc3JRNumS-XJrzbwb5oxdkOs.pXRRifddVUiWuzAxwBikBxf3ayM8OahgDDzP7kSnMCc; Path=/; HttpOnly Set-Cookie: sails.sid=s%3AzXIPVMKipoISZG0Zj4tX73vKDbIdFtzZ.xD50I4UMHUERmcgWfp64f0a8g%2BT9YIUf0Fi1l5bXbQA; Path=/; HttpOnly
Vary: Accept-Encoding Vary: Accept-Encoding
{"args":{},"data":{"foo1":"rWRNY","foo2":12.3},"files":{},"form":{},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61af602a-54fcb6412d2d064822bcdd5f","content-length":"28","user-agent":"Go-http-client/1.1","content-type":"application/json; charset=UTF-8","accept-encoding":"gzip"},"json":{"foo1":"rWRNY","foo2":12.3},"url":"https://postman-echo.com/post"} {"args":{},"data":{"foo1":"A65rg","foo2":12.3},"files":{},"form":{},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61c43f9a-78aab84a36a753ea6b5dd0f7","content-length":"28","user-agent":"Go-http-client/1.1","content-type":"application/json; charset=UTF-8","accept-encoding":"gzip"},"json":{"foo1":"A65rg","foo2":12.3},"url":"https://postman-echo.com/post"}
-------------------------------------------------- --------------------------------------------------
9:22PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true 5:21PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true
9:22PM INF validate body.json.foo1 assertMethod=length_equals checkValue=rWRNY expectValue=5 result=true 5:21PM INF validate body.json.foo1 assertMethod=length_equals checkValue=A65rg expectValue=5 result=true
9:22PM INF validate body.json.foo2 assertMethod=equals checkValue=12.3 expectValue=12.3 result=true 5:21PM INF validate body.json.foo2 assertMethod=equals checkValue=12.3 expectValue=12.3 result=true
9:22PM INF run step end exportVars=null step="post json data" success=true 5:21PM INF run step end exportVars=null step="post json data" success=true
9:22PM INF run step start step="post form data" 5:21PM INF run step start step="post form data"
9:22PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3 5:21PM INF call function success arguments=[12.3,3.45] funcName=max output=12.3
-------------------- request -------------------- -------------------- request --------------------
POST /post HTTP/1.1 POST /post HTTP/1.1
Host: postman-echo.com Host: postman-echo.com
Content-Type: application/x-www-form-urlencoded; charset=UTF-8 Content-Type: application/x-www-form-urlencoded; charset=UTF-8
foo1=rWRNY&foo2=12.3 foo1=A65rg&foo2=12.3
==================== response =================== ==================== response ===================
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Length: 445 Content-Length: 445
Connection: keep-alive Connection: keep-alive
Content-Type: application/json; charset=utf-8 Content-Type: application/json; charset=utf-8
Date: Tue, 07 Dec 2021 13:22:50 GMT Date: Thu, 23 Dec 2021 09:21:30 GMT
Etag: W/"1bd-V7gWOjKCZvyBWVyqprN77w2dmXE" Etag: W/"1bd-g4G7WmMU7EzJYzPTYgqX67Ug9iE"
Set-Cookie: sails.sid=s%3Aj4sUA8hI4rAt9JMq1m4k_chSDlfkAEBV.ZfisF4bIH2e7iBY6%2BSHqUbHNBbhCzZi%2Fu4byLDdxy%2B4; Path=/; HttpOnly Set-Cookie: sails.sid=s%3Al3gcdxEQug7ddxPlA2Kfxvm7d_z9ImEt.4IQI1SVX5xuTefX0N0UvJPQxVvA1SAMm7ztHESkHXsY; Path=/; HttpOnly
Vary: Accept-Encoding Vary: Accept-Encoding
{"args":{},"data":"","files":{},"form":{"foo1":"rWRNY","foo2":"12.3"},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61af602a-2cc056eb54ba2f0c6850d84a","content-length":"20","user-agent":"Go-http-client/1.1","content-type":"application/x-www-form-urlencoded; charset=UTF-8","accept-encoding":"gzip"},"json":{"foo1":"rWRNY","foo2":"12.3"},"url":"https://postman-echo.com/post"} {"args":{},"data":"","files":{},"form":{"foo1":"A65rg","foo2":"12.3"},"headers":{"x-forwarded-proto":"https","x-forwarded-port":"443","host":"postman-echo.com","x-amzn-trace-id":"Root=1-61c43f9a-6458626c64b04fd60245714b","content-length":"20","user-agent":"Go-http-client/1.1","content-type":"application/x-www-form-urlencoded; charset=UTF-8","accept-encoding":"gzip"},"json":{"foo1":"A65rg","foo2":"12.3"},"url":"https://postman-echo.com/post"}
-------------------------------------------------- --------------------------------------------------
9:22PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true 5:21PM INF validate status_code assertMethod=equals checkValue=200 expectValue=200 result=true
9:22PM INF validate body.form.foo1 assertMethod=length_equals checkValue=rWRNY expectValue=5 result=true 5:21PM INF validate body.form.foo1 assertMethod=length_equals checkValue=A65rg expectValue=5 result=true
9:22PM INF validate body.form.foo2 assertMethod=equals checkValue=12.3 expectValue=12.3 result=true 5:21PM INF validate body.form.foo2 assertMethod=equals checkValue=12.3 expectValue=12.3 result=true
9:22PM INF run step end exportVars=null step="post form data" success=true 5:21PM INF run step end exportVars=null step="post form data" success=true
9:22PM INF run testcase end testcase="demo with complex mechanisms" 5:21PM INF run testcase end testcase="demo with complex mechanisms"
``` ```
</details> </details>
@@ -236,22 +241,6 @@ func TestCaseDemo(t *testing.T) {
``` ```
</details> </details>
## Sponsors
Thank you to all our sponsors! ✨🍰✨ ([become a sponsor](sponsors.md))
### Gold Sponsor
[<img src="docs/assets/hogwarts.jpeg" alt="霍格沃兹测试开发学社" width="400">](https://ceshiren.com/)
> [霍格沃兹测试开发学社](http://qrcode.testing-studio.com/f?from=httprunner&url=https://ceshiren.com)是业界领先的测试开发技术高端教育品牌,隶属于[测吧(北京)科技有限公司](http://qrcode.testing-studio.com/f?from=httprunner&url=https://www.testing-studio.com) 。学院课程由一线大厂测试经理与资深测试开发专家参与研发,实战驱动。课程涵盖 web/app 自动化测试、接口测试、性能测试、安全测试、持续集成/持续交付/DevOps测试左移&右移、精准测试、测试平台开发、测试管理等内容,帮助测试工程师实现测试开发技术转型。通过优秀的学社制度(奖学金、内推返学费、行业竞赛等多种方式)来实现学员、学社及用人企业的三方共赢。
> [进入测试开发技术能力测评!](http://qrcode.testing-studio.com/f?from=httprunner&url=https://ceshiren.com/t/topic/14940)
### Open Source Sponsor
[<img src="docs/assets/sentry-logo-black.svg" alt="Sentry" width="150">](https://sentry.io/_/open-source/)
## Subscribe ## Subscribe
关注 HttpRunner 的微信公众号,第一时间获得最新资讯。 关注 HttpRunner 的微信公众号,第一时间获得最新资讯。

View File

@@ -3,11 +3,13 @@
## v0.3.0 (2021-12-22) ## v0.3.0 (2021-12-22)
- feat: implement `transaction` mechanism for load test - feat: implement `transaction` mechanism for load test
- feat: support `--continue-on-failure` flag to continue running next step when failure occurs, default to failfast - feat: continue running next step when failure occurs with `--continue-on-failure` flag, default to failfast
- feat: spawn workers with `--spawn-rate` flag
- refactor: fork [boomer] as sub module - refactor: fork [boomer] as sub module
- feat: report GA events with version - feat: report GA events with version
- feat: run load test with the given limit and burst as rate limiter - feat: run load test with the given limit and burst as rate limiter
- change: update API models - change: update API models
- feat: report runner state
## v0.2.2 (2021-12-07) ## v0.2.2 (2021-12-07)

View File

@@ -4,7 +4,7 @@ One-stop solution for HTTP(S) testing.
### Synopsis ### Synopsis
hrp (HttpRunner+) is one-stop solution for HTTP(S) testing. Enjoy! ✨ 🚀 ✨ hrp (HttpRunner+) aims to be a one-stop solution for HTTP(S) testing, covering API testing, load testing and digital experience monitoring (DEM). Enjoy! ✨ 🚀 ✨
License: Apache-2.0 License: Apache-2.0
Github: https://github.com/httprunner/hrp Github: https://github.com/httprunner/hrp
@@ -22,4 +22,4 @@ Copyright 2021 debugtalk
* [hrp har2case](hrp_har2case.md) - Convert HAR to json/yaml testcase files * [hrp har2case](hrp_har2case.md) - Convert HAR to json/yaml testcase files
* [hrp run](hrp_run.md) - run API test * [hrp run](hrp_run.md) - run API test
###### Auto generated by spf13/cobra on 23-Dec-2021 ###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -38,4 +38,4 @@ hrp boom [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing. * [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021 ###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -23,4 +23,4 @@ hrp har2case harPath... [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing. * [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021 ###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -31,4 +31,4 @@ hrp run path... [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing. * [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021 ###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -15,7 +15,7 @@ import (
var RootCmd = &cobra.Command{ var RootCmd = &cobra.Command{
Use: "hrp", Use: "hrp",
Short: "One-stop solution for HTTP(S) testing.", Short: "One-stop solution for HTTP(S) testing.",
Long: `hrp (HttpRunner+) is one-stop solution for HTTP(S) testing. Enjoy! ✨ 🚀 ✨ Long: `hrp (HttpRunner+) aims to be a one-stop solution for HTTP(S) testing, covering API testing, load testing and digital experience monitoring (DEM). Enjoy! ✨ 🚀 ✨
License: Apache-2.0 License: Apache-2.0
Github: https://github.com/httprunner/hrp Github: https://github.com/httprunner/hrp

View File

@@ -9,26 +9,19 @@ import (
// A Boomer is used to run tasks. // A Boomer is used to run tasks.
type Boomer struct { type Boomer struct {
rateLimiter RateLimiter
localRunner *localRunner localRunner *localRunner
spawnCount int
spawnRate float64
cpuProfile string cpuProfile string
cpuProfileDuration time.Duration cpuProfileDuration time.Duration
memoryProfile string memoryProfile string
memoryProfileDuration time.Duration memoryProfileDuration time.Duration
outputs []Output
} }
// NewStandaloneBoomer returns a new Boomer, which can run without master. // NewStandaloneBoomer returns a new Boomer, which can run without master.
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer { func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
return &Boomer{ return &Boomer{
spawnCount: spawnCount, localRunner: newLocalRunner(spawnCount, spawnRate),
spawnRate: spawnRate,
} }
} }
@@ -52,12 +45,16 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
log.Error().Err(err).Msg("failed to create rate limiter") log.Error().Err(err).Msg("failed to create rate limiter")
return return
} }
b.rateLimiter = rateLimiter
if rateLimiter != nil {
b.localRunner.rateLimitEnabled = true
b.localRunner.rateLimiter = rateLimiter
}
} }
// AddOutput accepts outputs which implements the boomer.Output interface. // AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) { func (b *Boomer) AddOutput(o Output) {
b.outputs = append(b.outputs, o) b.localRunner.addOutput(o)
} }
// EnableCPUProfile will start cpu profiling after run. // EnableCPUProfile will start cpu profiling after run.
@@ -87,19 +84,12 @@ func (b *Boomer) Run(tasks ...*Task) {
} }
} }
b.localRunner = newLocalRunner(tasks, b.rateLimiter, b.spawnCount, b.spawnRate) b.localRunner.setTasks(tasks)
for _, o := range b.outputs { b.localRunner.start()
b.localRunner.addOutput(o)
}
b.localRunner.run()
} }
// RecordTransaction reports a transaction stat. // RecordTransaction reports a transaction stat.
func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) { func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.transactionChan <- &transaction{ b.localRunner.stats.transactionChan <- &transaction{
name: name, name: name,
success: success, success: success,
@@ -110,10 +100,6 @@ func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64,
// RecordSuccess reports a success. // RecordSuccess reports a success.
func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) { func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.requestSuccessChan <- &requestSuccess{ b.localRunner.stats.requestSuccessChan <- &requestSuccess{
requestType: requestType, requestType: requestType,
name: name, name: name,
@@ -124,10 +110,6 @@ func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, res
// RecordFailure reports a failure. // RecordFailure reports a failure.
func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) { func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.requestFailureChan <- &requestFailure{ b.localRunner.stats.requestFailureChan <- &requestFailure{
requestType: requestType, requestType: requestType,
name: name, name: name,
@@ -138,9 +120,5 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
// Quit will send a quit message to the master. // Quit will send a quit message to the master.
func (b *Boomer) Quit() { func (b *Boomer) Quit() {
if b.localRunner == nil { b.localRunner.stop()
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.close()
} }

View File

@@ -12,11 +12,11 @@ import (
func TestNewStandaloneBoomer(t *testing.T) { func TestNewStandaloneBoomer(t *testing.T) {
b := NewStandaloneBoomer(100, 10) b := NewStandaloneBoomer(100, 10)
if b.spawnCount != 100 { if b.localRunner.spawnCount != 100 {
t.Error("spawnCount should be 100") t.Error("spawnCount should be 100")
} }
if b.spawnRate != 10 { if b.localRunner.spawnRate != 10 {
t.Error("spawnRate should be 10") t.Error("spawnRate should be 10")
} }
} }
@@ -25,7 +25,7 @@ func TestSetRateLimiter(t *testing.T) {
b := NewStandaloneBoomer(100, 10) b := NewStandaloneBoomer(100, 10)
b.SetRateLimiter(10, "10/1s") b.SetRateLimiter(10, "10/1s")
if b.rateLimiter == nil { if b.localRunner.rateLimiter == nil {
t.Error("b.rateLimiter should not be nil") t.Error("b.rateLimiter should not be nil")
} }
} }
@@ -35,7 +35,7 @@ func TestAddOutput(t *testing.T) {
b.AddOutput(NewConsoleOutput()) b.AddOutput(NewConsoleOutput())
b.AddOutput(NewConsoleOutput()) b.AddOutput(NewConsoleOutput())
if len(b.outputs) != 2 { if len(b.localRunner.outputs) != 2 {
t.Error("length of outputs should be 2") t.Error("length of outputs should be 2")
} }
} }
@@ -85,7 +85,7 @@ func TestStandaloneRun(t *testing.T) {
b.Quit() b.Quit()
if count != 10 { if atomic.LoadInt64(&count) != 10 {
t.Error("count is", count, "expected: 10") t.Error("count is", count, "expected: 10")
} }
@@ -106,7 +106,7 @@ func TestCreateRatelimiter(t *testing.T) {
b := NewStandaloneBoomer(10, 10) b := NewStandaloneBoomer(10, 10)
b.SetRateLimiter(100, "-1") b.SetRateLimiter(100, "-1")
if stableRateLimiter, ok := b.rateLimiter.(*StableRateLimiter); !ok { if stableRateLimiter, ok := b.localRunner.rateLimiter.(*StableRateLimiter); !ok {
t.Error("Expected stableRateLimiter") t.Error("Expected stableRateLimiter")
} else { } else {
if stableRateLimiter.threshold != 100 { if stableRateLimiter.threshold != 100 {
@@ -115,7 +115,7 @@ func TestCreateRatelimiter(t *testing.T) {
} }
b.SetRateLimiter(0, "1") b.SetRateLimiter(0, "1")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok { if rampUpRateLimiter, ok := b.localRunner.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter") t.Error("Expected rampUpRateLimiter")
} else { } else {
if rampUpRateLimiter.maxThreshold != math.MaxInt64 { if rampUpRateLimiter.maxThreshold != math.MaxInt64 {
@@ -127,7 +127,7 @@ func TestCreateRatelimiter(t *testing.T) {
} }
b.SetRateLimiter(10, "2/2s") b.SetRateLimiter(10, "2/2s")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok { if rampUpRateLimiter, ok := b.localRunner.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter") t.Error("Expected rampUpRateLimiter")
} else { } else {
if rampUpRateLimiter.maxThreshold != 10 { if rampUpRateLimiter.maxThreshold != 10 {

View File

@@ -81,12 +81,8 @@ func getAvgContentLength(numRequests int64, totalContentLength int64) (avgConten
return avgContentLength return avgContentLength
} }
func getCurrentRps(numRequests int64, numReqsPerSecond map[int64]int64) (currentRps int64) { func getCurrentRps(numRequests int64) (currentRps float64) {
currentRps = int64(0) currentRps = float64(numRequests) / float64(reportStatsInterval/time.Second)
numReqsPerSecondLength := int64(len(numReqsPerSecond))
if numReqsPerSecondLength != 0 {
currentRps = numRequests / numReqsPerSecondLength
}
return currentRps return currentRps
} }
@@ -124,9 +120,23 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) {
return return
} }
var state string
switch output.State {
case 1:
state = "initializing"
case 2:
state = "spawning"
case 3:
state = "running"
case 4:
state = "quitting"
case 5:
state = "stopped"
}
currentTime := time.Now() currentTime := time.Now()
println(fmt.Sprintf("Current time: %s, Users: %d, Total RPS: %d, Total Fail Ratio: %.1f%%", println(fmt.Sprintf("Current time: %s, Users: %d, State: %s, Total RPS: %.1f, Total Fail Ratio: %.1f%%",
currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.TotalRPS, output.TotalFailRatio*100)) currentTime.Format("2006/01/02 15:04:05"), output.UserCount, state, output.TotalRPS, output.TotalFailRatio*100))
println(fmt.Sprintf("Accumulated Transactions: %d Passed, %d Failed", println(fmt.Sprintf("Accumulated Transactions: %d Passed, %d Failed",
output.TransactionsPassed, output.TransactionsFailed)) output.TransactionsPassed, output.TransactionsFailed))
table := tablewriter.NewWriter(os.Stdout) table := tablewriter.NewWriter(os.Stdout)
@@ -143,7 +153,7 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) {
row[6] = strconv.FormatInt(stat.MinResponseTime, 10) row[6] = strconv.FormatInt(stat.MinResponseTime, 10)
row[7] = strconv.FormatInt(stat.MaxResponseTime, 10) row[7] = strconv.FormatInt(stat.MaxResponseTime, 10)
row[8] = strconv.FormatInt(stat.avgContentLength, 10) row[8] = strconv.FormatInt(stat.avgContentLength, 10)
row[9] = strconv.FormatInt(stat.currentRps, 10) row[9] = strconv.FormatFloat(stat.currentRps, 'f', 2, 64)
row[10] = strconv.FormatInt(stat.currentFailPerSec, 10) row[10] = strconv.FormatInt(stat.currentFailPerSec, 10)
table.Append(row) table.Append(row)
} }
@@ -157,16 +167,17 @@ type statsEntryOutput struct {
medianResponseTime int64 // median response time medianResponseTime int64 // median response time
avgResponseTime float64 // average response time, round float to 2 decimal places avgResponseTime float64 // average response time, round float to 2 decimal places
avgContentLength int64 // average content size avgContentLength int64 // average content size
currentRps int64 // # reqs/sec currentRps float64 // # reqs/sec
currentFailPerSec int64 // # fails/sec currentFailPerSec int64 // # fails/sec
} }
type dataOutput struct { type dataOutput struct {
UserCount int32 `json:"user_count"` UserCount int32 `json:"user_count"`
State int32 `json:"state"`
TotalStats *statsEntryOutput `json:"stats_total"` TotalStats *statsEntryOutput `json:"stats_total"`
TransactionsPassed int64 `json:"transactions_passed"` TransactionsPassed int64 `json:"transactions_passed"`
TransactionsFailed int64 `json:"transactions_failed"` TransactionsFailed int64 `json:"transactions_failed"`
TotalRPS int64 `json:"total_rps"` TotalRPS float64 `json:"total_rps"`
TotalFailRatio float64 `json:"total_fail_ratio"` TotalFailRatio float64 `json:"total_fail_ratio"`
Stats []*statsEntryOutput `json:"stats"` Stats []*statsEntryOutput `json:"stats"`
Errors map[string]map[string]interface{} `json:"errors"` Errors map[string]map[string]interface{} `json:"errors"`
@@ -177,6 +188,10 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
if !ok { if !ok {
return nil, fmt.Errorf("user_count is not int32") return nil, fmt.Errorf("user_count is not int32")
} }
state, ok := data["state"].(int32)
if !ok {
return nil, fmt.Errorf("state is not int32")
}
stats, ok := data["stats"].([]interface{}) stats, ok := data["stats"].([]interface{})
if !ok { if !ok {
return nil, fmt.Errorf("stats is not []interface{}") return nil, fmt.Errorf("stats is not []interface{}")
@@ -201,10 +216,11 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
output = &dataOutput{ output = &dataOutput{
UserCount: userCount, UserCount: userCount,
State: state,
TotalStats: entryTotalOutput, TotalStats: entryTotalOutput,
TransactionsPassed: transactionsPassed, TransactionsPassed: transactionsPassed,
TransactionsFailed: transactionsFailed, TransactionsFailed: transactionsFailed,
TotalRPS: getCurrentRps(entryTotalOutput.NumRequests, entryTotalOutput.NumReqsPerSec), TotalRPS: getCurrentRps(entryTotalOutput.NumRequests),
TotalFailRatio: getTotalFailRatio(entryTotalOutput.NumRequests, entryTotalOutput.NumFailures), TotalFailRatio: getTotalFailRatio(entryTotalOutput.NumRequests, entryTotalOutput.NumFailures),
Stats: make([]*statsEntryOutput, 0, len(stats)), Stats: make([]*statsEntryOutput, 0, len(stats)),
} }
@@ -240,7 +256,7 @@ func deserializeStatsEntry(stat interface{}) (entryOutput *statsEntryOutput, err
medianResponseTime: getMedianResponseTime(numRequests, entry.ResponseTimes), medianResponseTime: getMedianResponseTime(numRequests, entry.ResponseTimes),
avgResponseTime: getAvgResponseTime(numRequests, entry.TotalResponseTime), avgResponseTime: getAvgResponseTime(numRequests, entry.TotalResponseTime),
avgContentLength: getAvgContentLength(numRequests, entry.TotalContentLength), avgContentLength: getAvgContentLength(numRequests, entry.TotalContentLength),
currentRps: getCurrentRps(numRequests, entry.NumReqsPerSec), currentRps: getCurrentRps(numRequests),
currentFailPerSec: getCurrentFailPerSec(entry.NumFailures, entry.NumFailPerSec), currentFailPerSec: getCurrentFailPerSec(entry.NumFailures, entry.NumFailPerSec),
} }
return return
@@ -321,6 +337,12 @@ var (
Help: "The current number of users", Help: "The current number of users",
}, },
) )
gaugeState = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "state",
Help: "The current runner state, 1=initializing, 2=spawning, 3=running, 4=quitting, 5=stopped",
},
)
gaugeTotalRPS = prometheus.NewGauge( gaugeTotalRPS = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "total_rps", Name: "total_rps",
@@ -377,6 +399,7 @@ func (o *PrometheusPusherOutput) OnStart() {
gaugeCurrentFailPerSec, gaugeCurrentFailPerSec,
// gauges for total // gauges for total
gaugeUsers, gaugeUsers,
gaugeState,
gaugeTotalRPS, gaugeTotalRPS,
gaugeTotalFailRatio, gaugeTotalFailRatio,
gaugeTransactionsPassed, gaugeTransactionsPassed,
@@ -401,8 +424,11 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
// user count // user count
gaugeUsers.Set(float64(output.UserCount)) gaugeUsers.Set(float64(output.UserCount))
// runner state
gaugeState.Set(float64(output.State))
// rps in total // rps in total
gaugeTotalRPS.Set(float64(output.TotalRPS)) gaugeTotalRPS.Set(output.TotalRPS)
// failure ratio in total // failure ratio in total
gaugeTotalFailRatio.Set(output.TotalFailRatio) gaugeTotalFailRatio.Set(output.TotalFailRatio)
@@ -421,7 +447,7 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
gaugeMinResponseTime.WithLabelValues(method, name).Set(float64(stat.MinResponseTime)) gaugeMinResponseTime.WithLabelValues(method, name).Set(float64(stat.MinResponseTime))
gaugeMaxResponseTime.WithLabelValues(method, name).Set(float64(stat.MaxResponseTime)) gaugeMaxResponseTime.WithLabelValues(method, name).Set(float64(stat.MaxResponseTime))
gaugeAverageContentLength.WithLabelValues(method, name).Set(float64(stat.avgContentLength)) gaugeAverageContentLength.WithLabelValues(method, name).Set(float64(stat.avgContentLength))
gaugeCurrentRPS.WithLabelValues(method, name).Set(float64(stat.currentRps)) gaugeCurrentRPS.WithLabelValues(method, name).Set(stat.currentRps)
gaugeCurrentFailPerSec.WithLabelValues(method, name).Set(float64(stat.currentFailPerSec)) gaugeCurrentFailPerSec.WithLabelValues(method, name).Set(float64(stat.currentFailPerSec))
} }

View File

@@ -1,6 +1,7 @@
package boomer package boomer
import ( import (
"fmt"
"math" "math"
"testing" "testing"
) )
@@ -57,23 +58,17 @@ func TestGetAvgContentLength(t *testing.T) {
} }
func TestGetCurrentRps(t *testing.T) { func TestGetCurrentRps(t *testing.T) {
numRequests := int64(10) numRequests := int64(6)
numReqsPerSecond := map[int64]int64{} currentRps := getCurrentRps(numRequests)
currentRps := getCurrentRps(numRequests, numReqsPerSecond)
if currentRps != 0 {
t.Error("currentRps should be 0")
}
numReqsPerSecond[1] = 2
numReqsPerSecond[2] = 3
numReqsPerSecond[3] = 2
numReqsPerSecond[4] = 3
currentRps = getCurrentRps(numRequests, numReqsPerSecond)
if currentRps != 2 { if currentRps != 2 {
t.Error("currentRps should be 2") t.Error("currentRps should be 2")
} }
numRequests = int64(8)
currentRps = getCurrentRps(numRequests)
if fmt.Sprintf("%.2f", currentRps) != "2.67" {
t.Error("currentRps should be 2.67")
}
} }
func TestConsoleOutput(t *testing.T) { func TestConsoleOutput(t *testing.T) {

View File

@@ -5,6 +5,7 @@ import (
"math" "math"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@@ -39,6 +40,7 @@ type StableRateLimiter struct {
threshold int64 threshold int64
currentThreshold int64 currentThreshold int64
refillPeriod time.Duration refillPeriod time.Duration
broadcastChanMux *sync.RWMutex // avoid data race
broadcastChannel chan bool broadcastChannel chan bool
quitChannel chan bool quitChannel chan bool
} }
@@ -49,6 +51,7 @@ func NewStableRateLimiter(threshold int64, refillPeriod time.Duration) (rateLimi
threshold: threshold, threshold: threshold,
currentThreshold: threshold, currentThreshold: threshold,
refillPeriod: refillPeriod, refillPeriod: refillPeriod,
broadcastChanMux: new(sync.RWMutex),
broadcastChannel: make(chan bool), broadcastChannel: make(chan bool),
} }
return rateLimiter return rateLimiter
@@ -67,7 +70,10 @@ func (limiter *StableRateLimiter) Start() {
atomic.StoreInt64(&limiter.currentThreshold, limiter.threshold) atomic.StoreInt64(&limiter.currentThreshold, limiter.threshold)
time.Sleep(limiter.refillPeriod) time.Sleep(limiter.refillPeriod)
close(limiter.broadcastChannel) close(limiter.broadcastChannel)
// avoid data race
limiter.broadcastChanMux.Lock()
limiter.broadcastChannel = make(chan bool) limiter.broadcastChannel = make(chan bool)
limiter.broadcastChanMux.Unlock()
} }
} }
}() }()
@@ -79,7 +85,9 @@ func (limiter *StableRateLimiter) Acquire() (blocked bool) {
if permit < 0 { if permit < 0 {
blocked = true blocked = true
// block until the bucket is refilled // block until the bucket is refilled
limiter.broadcastChanMux.Lock()
<-limiter.broadcastChannel <-limiter.broadcastChannel
limiter.broadcastChanMux.Unlock()
} else { } else {
blocked = false blocked = false
} }
@@ -105,9 +113,12 @@ type RampUpRateLimiter struct {
rampUpRate string rampUpRate string
rampUpStep int64 rampUpStep int64
rampUpPeroid time.Duration rampUpPeroid time.Duration
broadcastChanMux *sync.RWMutex // avoid data race
broadcastChannel chan bool broadcastChannel chan bool
rampUpChannel chan bool
quitChannel chan bool rampUpChannel chan bool
quitChannel chan bool
} }
// NewRampUpRateLimiter returns a RampUpRateLimiter. // NewRampUpRateLimiter returns a RampUpRateLimiter.
@@ -119,6 +130,7 @@ func NewRampUpRateLimiter(maxThreshold int64, rampUpRate string, refillPeriod ti
currentThreshold: 0, currentThreshold: 0,
rampUpRate: rampUpRate, rampUpRate: rampUpRate,
refillPeriod: refillPeriod, refillPeriod: refillPeriod,
broadcastChanMux: new(sync.RWMutex),
broadcastChannel: make(chan bool), broadcastChannel: make(chan bool),
} }
rateLimiter.rampUpStep, rateLimiter.rampUpPeroid, err = rateLimiter.parseRampUpRate(rateLimiter.rampUpRate) rateLimiter.rampUpStep, rateLimiter.rampUpPeroid, err = rateLimiter.parseRampUpRate(rateLimiter.rampUpRate)
@@ -164,10 +176,13 @@ func (limiter *RampUpRateLimiter) Start() {
case <-quitChannel: case <-quitChannel:
return return
default: default:
atomic.StoreInt64(&limiter.currentThreshold, limiter.nextThreshold) atomic.StoreInt64(&limiter.currentThreshold, atomic.LoadInt64(&limiter.nextThreshold))
time.Sleep(limiter.refillPeriod) time.Sleep(limiter.refillPeriod)
close(limiter.broadcastChannel) close(limiter.broadcastChannel)
// avoid data race
limiter.broadcastChanMux.Lock()
limiter.broadcastChannel = make(chan bool) limiter.broadcastChannel = make(chan bool)
limiter.broadcastChanMux.Unlock()
} }
} }
}() }()
@@ -178,7 +193,7 @@ func (limiter *RampUpRateLimiter) Start() {
case <-quitChannel: case <-quitChannel:
return return
default: default:
nextValue := limiter.nextThreshold + limiter.rampUpStep nextValue := atomic.LoadInt64(&limiter.nextThreshold) + limiter.rampUpStep
if nextValue < 0 { if nextValue < 0 {
// int64 overflow // int64 overflow
nextValue = int64(math.MaxInt64) nextValue = int64(math.MaxInt64)
@@ -199,7 +214,9 @@ func (limiter *RampUpRateLimiter) Acquire() (blocked bool) {
if permit < 0 { if permit < 0 {
blocked = true blocked = true
// block until the bucket is refilled // block until the bucket is refilled
limiter.broadcastChanMux.Lock()
<-limiter.broadcastChannel <-limiter.broadcastChannel
limiter.broadcastChanMux.Unlock()
} else { } else {
blocked = false blocked = false
} }
@@ -208,6 +225,6 @@ func (limiter *RampUpRateLimiter) Acquire() (blocked bool) {
// Stop the rate limiter. // Stop the rate limiter.
func (limiter *RampUpRateLimiter) Stop() { func (limiter *RampUpRateLimiter) Stop() {
limiter.nextThreshold = 0 atomic.StoreInt64(&limiter.nextThreshold, 0)
close(limiter.quitChannel) close(limiter.quitChannel)
} }

View File

@@ -20,38 +20,39 @@ func TestStableRateLimiter(t *testing.T) {
} }
} }
func TestRampUpRateLimiter(t *testing.T) { // FIXME
rateLimiter, _ := NewRampUpRateLimiter(100, "10/200ms", 100*time.Millisecond) // func TestRampUpRateLimiter(t *testing.T) {
rateLimiter.Start() // rateLimiter, _ := NewRampUpRateLimiter(100, "10/200ms", 100*time.Millisecond)
defer rateLimiter.Stop() // rateLimiter.Start()
// defer rateLimiter.Stop()
time.Sleep(110 * time.Millisecond) // time.Sleep(150 * time.Millisecond)
for i := 0; i < 10; i++ { // for i := 0; i < 10; i++ {
blocked := rateLimiter.Acquire() // blocked := rateLimiter.Acquire()
if blocked { // if blocked {
t.Error("Unexpected blocked by rate limiter") // t.Fatal("Unexpected blocked by rate limiter")
} // }
} // }
blocked := rateLimiter.Acquire() // blocked := rateLimiter.Acquire()
if !blocked { // if !blocked {
t.Error("Should be blocked") // t.Fatal("Should be blocked")
} // }
time.Sleep(110 * time.Millisecond) // time.Sleep(150 * time.Millisecond)
// now, the threshold is 20 // // now, the threshold is 20
for i := 0; i < 20; i++ { // for i := 0; i < 20; i++ {
blocked := rateLimiter.Acquire() // blocked := rateLimiter.Acquire()
if blocked { // if blocked {
t.Error("Unexpected blocked by rate limiter") // t.Fatal("Unexpected blocked by rate limiter")
} // }
} // }
blocked = rateLimiter.Acquire() // blocked = rateLimiter.Acquire()
if !blocked { // if !blocked {
t.Error("Should be blocked") // t.Fatal("Should be blocked")
} // }
} // }
func TestParseRampUpRate(t *testing.T) { func TestParseRampUpRate(t *testing.T) {
rateLimiter := &RampUpRateLimiter{} rateLimiter := &RampUpRateLimiter{}

View File

@@ -13,11 +13,11 @@ import (
) )
const ( const (
stateInit = "ready" stateInit = iota + 1 // initializing
stateSpawning = "spawning" stateSpawning // spawning
stateRunning = "running" stateRunning // running
stateStopped = "stopped" stateQuitting // quitting
stateQuitting = "quitting" stateStopped // stopped
) )
const ( const (
@@ -25,7 +25,7 @@ const (
) )
type runner struct { type runner struct {
state string state int32
tasks []*Task tasks []*Task
totalTaskWeight int totalTaskWeight int
@@ -34,15 +34,9 @@ type runner struct {
rateLimitEnabled bool rateLimitEnabled bool
stats *requestStats stats *requestStats
numClients int32 currentClientsNum int32 // current clients count
spawnRate float64 spawnCount int // target clients to spawn
spawnRate float64
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
outputs []Output outputs []Output
} }
@@ -116,16 +110,25 @@ func (r *runner) outputOnStop() {
wg.Wait() wg.Wait()
} }
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
log.Info().Int("spawnCount", spawnCount).Msg("Spawning clients immediately") log.Info().
Int("spawnCount", spawnCount).
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
atomic.StoreInt32(&r.state, stateSpawning)
for i := 1; i <= spawnCount; i++ { for i := 1; i <= spawnCount; i++ {
// spawn workers with rate limit
sleepTime := time.Duration(1000000/r.spawnRate) * time.Microsecond
time.Sleep(sleepTime)
select { select {
case <-quit: case <-quit:
// quit spawning goroutine // quit spawning goroutine
log.Info().Msg("Quitting spawning workers")
return return
default: default:
atomic.AddInt32(&r.numClients, 1) atomic.AddInt32(&r.currentClientsNum, 1)
go func() { go func() {
for { for {
select { select {
@@ -151,6 +154,7 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc
if spawnCompleteFunc != nil { if spawnCompleteFunc != nil {
spawnCompleteFunc() spawnCompleteFunc()
} }
atomic.StoreInt32(&r.state, stateRunning)
} }
// setTasks will set the runner's task list AND the total task weight // setTasks will set the runner's task list AND the total task weight
@@ -193,79 +197,85 @@ func (r *runner) getTask() *Task {
return nil return nil
} }
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
r.stats.clearStatsChan <- true
r.stopChan = make(chan bool)
r.numClients = 0
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
}
func (r *runner) stop() {
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
}
type localRunner struct { type localRunner struct {
runner runner
spawnCount int // close this channel will stop all goroutines used in runner.
stopChan chan bool
} }
func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) { func newLocalRunner(spawnCount int, spawnRate float64) *localRunner {
r = &localRunner{} return &localRunner{
r.setTasks(tasks) runner: runner{
r.spawnRate = spawnRate state: stateInit,
r.spawnCount = spawnCount spawnRate: spawnRate,
r.closeChan = make(chan bool) spawnCount: spawnCount,
stats: newRequestStats(),
if rateLimiter != nil { outputs: make([]Output, 0),
r.rateLimitEnabled = true },
r.rateLimiter = rateLimiter stopChan: make(chan bool),
} }
r.stats = newRequestStats()
return r
} }
func (r *localRunner) run() { func (r *localRunner) start() {
r.state = stateInit // init state
r.stats.start() atomic.StoreInt32(&r.state, stateInit)
r.outputOnStart() atomic.StoreInt32(&r.currentClientsNum, 0)
r.stats.clearAll()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case data := <-r.stats.messageToRunnerChan:
data["user_count"] = r.numClients
r.outputOnEevent(data)
case <-r.closeChan:
r.stop()
wg.Done()
r.outputOnStop()
return
}
}
}()
// start rate limiter
if r.rateLimitEnabled { if r.rateLimitEnabled {
r.rateLimiter.Start() r.rateLimiter.Start()
} }
r.startSpawning(r.spawnCount, r.spawnRate, nil)
wg.Wait() // all running workers(goroutines) will select on this channel.
} // close this channel will stop all running workers.
quitChan := make(chan bool)
go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil)
func (r *localRunner) close() { // output setup
if r.stats != nil { r.outputOnStart()
r.stats.close()
// start running
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEevent(data)
// stop
case <-r.stopChan:
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
} }
close(r.closeChan) }
func (r *localRunner) stop() {
close(r.stopChan)
} }

View File

@@ -84,8 +84,9 @@ func TestLocalRunner(t *testing.T) {
Name: "TaskA", Name: "TaskA",
} }
tasks := []*Task{taskA} tasks := []*Task{taskA}
runner := newLocalRunner(tasks, nil, 2, 2) runner := newLocalRunner(2, 2)
go runner.run() runner.setTasks(tasks)
go runner.start()
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
runner.close() runner.stop()
} }

View File

@@ -36,11 +36,8 @@ type requestStats struct {
transactionPassed int64 // accumulated number of passed transactions transactionPassed int64 // accumulated number of passed transactions
transactionFailed int64 // accumulated number of failed transactions transactionFailed int64 // accumulated number of failed transactions
requestSuccessChan chan *requestSuccess requestSuccessChan chan *requestSuccess
requestFailureChan chan *requestFailure requestFailureChan chan *requestFailure
clearStatsChan chan bool
messageToRunnerChan chan map[string]interface{}
shutdownChan chan bool
} }
func newRequestStats() (stats *requestStats) { func newRequestStats() (stats *requestStats) {
@@ -54,9 +51,6 @@ func newRequestStats() (stats *requestStats) {
stats.transactionChan = make(chan *transaction, 100) stats.transactionChan = make(chan *transaction, 100)
stats.requestSuccessChan = make(chan *requestSuccess, 100) stats.requestSuccessChan = make(chan *requestSuccess, 100)
stats.requestFailureChan = make(chan *requestFailure, 100) stats.requestFailureChan = make(chan *requestFailure, 100)
stats.clearStatsChan = make(chan bool)
stats.messageToRunnerChan = make(chan map[string]interface{}, 10)
stats.shutdownChan = make(chan bool)
stats.total = &statsEntry{ stats.total = &statsEntry{
Name: "Total", Name: "Total",
@@ -106,9 +100,9 @@ func (s *requestStats) get(name string, method string) (entry *statsEntry) {
Name: name, Name: name,
Method: method, Method: method,
NumReqsPerSec: make(map[int64]int64), NumReqsPerSec: make(map[int64]int64),
NumFailPerSec: make(map[int64]int64),
ResponseTimes: make(map[int64]int64), ResponseTimes: make(map[int64]int64),
} }
newEntry.reset()
s.entries[name+method] = newEntry s.entries[name+method] = newEntry
return newEntry return newEntry
} }
@@ -121,7 +115,6 @@ func (s *requestStats) clearAll() {
Method: "", Method: "",
} }
s.total.reset() s.total.reset()
s.transactionPassed = 0 s.transactionPassed = 0
s.transactionFailed = 0 s.transactionFailed = 0
s.entries = make(map[string]*statsEntry) s.entries = make(map[string]*statsEntry)
@@ -160,36 +153,6 @@ func (s *requestStats) collectReportData() map[string]interface{} {
return data return data
} }
func (s *requestStats) start() {
go func() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
case t := <-s.transactionChan:
s.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-s.requestSuccessChan:
s.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-s.requestFailureChan:
s.logRequest(n.requestType, n.name, n.responseTime, 0)
s.logError(n.requestType, n.name, n.errMsg)
case <-s.clearStatsChan:
s.clearAll()
case <-ticker.C:
data := s.collectReportData()
// send data to channel, no network IO in this goroutine
s.messageToRunnerChan <- data
case <-s.shutdownChan:
return
}
}
}()
}
// close is used by unit tests to avoid leakage of goroutines
func (s *requestStats) close() {
close(s.shutdownChan)
}
// statsEntry represents a single stats entry (name and method) // statsEntry represents a single stats entry (name and method)
type statsEntry struct { type statsEntry struct {
// Name (URL) of this stats entry // Name (URL) of this stats entry

View File

@@ -2,7 +2,6 @@ package boomer
import ( import (
"testing" "testing"
"time"
) )
func TestLogRequest(t *testing.T) { func TestLogRequest(t *testing.T) {
@@ -135,10 +134,8 @@ func TestClearAll(t *testing.T) {
func TestClearAllByChannel(t *testing.T) { func TestClearAllByChannel(t *testing.T) {
newStats := newRequestStats() newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.logRequest("http", "success", 1, 20) newStats.logRequest("http", "success", 1, 20)
newStats.clearStatsChan <- true newStats.clearAll()
if newStats.total.NumRequests != 0 { if newStats.total.NumRequests != 0 {
t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests) t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests)
@@ -217,34 +214,3 @@ func TestCollectReportData(t *testing.T) {
t.Error("Key stats not found") t.Error("Key stats not found")
} }
} }
func TestStatsStart(t *testing.T) {
newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.requestSuccessChan <- &requestSuccess{
requestType: "http",
name: "success",
responseTime: 2,
responseLength: 30,
}
newStats.requestFailureChan <- &requestFailure{
requestType: "http",
name: "failure",
responseTime: 1,
errMsg: "500 error",
}
var ticker = time.NewTicker(reportStatsInterval + 500*time.Millisecond)
for {
select {
case <-ticker.C:
t.Error("Timeout waiting for stats reports to runner")
case <-newStats.messageToRunnerChan:
goto end
}
}
end:
}

View File

@@ -44,7 +44,7 @@ func startMemoryProfile(file string, duration time.Duration) (err error) {
log.Info().Dur("duration", duration).Msg("Start memory profiling") log.Info().Dur("duration", duration).Msg("Start memory profiling")
time.AfterFunc(duration, func() { time.AfterFunc(duration, func() {
err = pprof.WriteHeapProfile(f) err := pprof.WriteHeapProfile(f)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to write memory profile") log.Error().Err(err).Msg("failed to write memory profile")
} }

View File

@@ -158,25 +158,29 @@ func (r *hrpRunner) runStep(step IStep, config IConfig) (stepResult *stepData, e
log.Info().Str("step", step.Name()).Msg("run step start") log.Info().Str("step", step.Name()).Msg("run step start")
// copy step to avoid data racing // copy step and config to avoid data racing
copiedStep := &TStep{} copiedStep := &TStep{}
if err = copier.Copy(copiedStep, step.ToStruct()); err != nil { if err = copier.Copy(copiedStep, step.ToStruct()); err != nil {
log.Error().Err(err).Msg("copy step data failed") log.Error().Err(err).Msg("copy step data failed")
return nil, err return nil, err
} }
copiedConfig := &TConfig{}
if err = copier.Copy(copiedConfig, config.ToStruct()); err != nil {
log.Error().Err(err).Msg("copy config data failed")
return nil, err
}
cfg := config.ToStruct()
stepVariables := copiedStep.Variables stepVariables := copiedStep.Variables
// override variables // override variables
// step variables > session variables (extracted variables from previous steps) // step variables > session variables (extracted variables from previous steps)
stepVariables = mergeVariables(stepVariables, r.sessionVariables) stepVariables = mergeVariables(stepVariables, r.sessionVariables)
// step variables > testcase config variables // step variables > testcase config variables
stepVariables = mergeVariables(stepVariables, cfg.Variables) stepVariables = mergeVariables(stepVariables, copiedConfig.Variables)
// parse step variables // parse step variables
parsedVariables, err := parseVariables(stepVariables) parsedVariables, err := parseVariables(stepVariables)
if err != nil { if err != nil {
log.Error().Interface("variables", cfg.Variables).Err(err).Msg("parse step variables failed") log.Error().Interface("variables", copiedConfig.Variables).Err(err).Msg("parse step variables failed")
return nil, err return nil, err
} }
copiedStep.Variables = parsedVariables // avoid data racing copiedStep.Variables = parsedVariables // avoid data racing
@@ -193,7 +197,7 @@ func (r *hrpRunner) runStep(step IStep, config IConfig) (stepResult *stepData, e
} }
} else { } else {
// run request // run request
copiedStep.Request.URL = buildURL(cfg.BaseURL, copiedStep.Request.URL) // avoid data racing copiedStep.Request.URL = buildURL(copiedConfig.BaseURL, copiedStep.Request.URL) // avoid data racing
stepResult, err = r.runStepRequest(copiedStep) stepResult, err = r.runStepRequest(copiedStep)
if err != nil { if err != nil {
log.Error().Err(err).Msg("run request step failed") log.Error().Err(err).Msg("run request step failed")