Merge remote-tracking branch 'origin/master'

This commit is contained in:
machongwei
2022-08-18 20:09:59 +08:00
25 changed files with 1153 additions and 225 deletions

View File

@@ -64,6 +64,18 @@ type Profile struct {
DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty" mapstructure:"disable-keepalive,omitempty"`
}
func NewProfile() *Profile {
return &Profile{
SpawnCount: 1,
SpawnRate: 1,
MaxRPS: -1,
LoopCount: -1,
RequestIncreaseRate: "-1",
CPUProfileDuration: 30 * time.Second,
MemoryProfileDuration: 30 * time.Second,
}
}
func (b *Boomer) GetProfile() *Profile {
switch b.mode {
case DistributedMasterMode:
@@ -159,7 +171,18 @@ func (b *Boomer) RunWorker() {
// TestCaseBytesChan gets test case bytes chan
func (b *Boomer) TestCaseBytesChan() chan []byte {
return b.masterRunner.testCaseBytes
return b.masterRunner.testCaseBytesChan
}
func (b *Boomer) GetTestCaseBytes() []byte {
switch b.mode {
case DistributedMasterMode:
return b.masterRunner.testCasesBytes
case DistributedWorkerMode:
return b.workerRunner.testCasesBytes
default:
return nil
}
}
func ProfileToBytes(profile *Profile) []byte {
@@ -193,7 +216,7 @@ func (b *Boomer) GetTasksChan() chan *task {
func (b *Boomer) GetRebalanceChan() chan bool {
switch b.mode {
case DistributedWorkerMode:
return b.workerRunner.rebalance
return b.workerRunner.controller.getRebalanceChan()
default:
return nil
}
@@ -482,6 +505,9 @@ func (b *Boomer) Start(Args *Profile) error {
if b.masterRunner.isStopping() {
return errors.New("Please wait for all workers to finish")
}
if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
return errors.New("spawn count should be greater than available worker count")
}
b.SetSpawnCount(Args.SpawnCount)
b.SetSpawnRate(Args.SpawnRate)
b.SetRunTime(Args.RunTime)
@@ -495,6 +521,9 @@ func (b *Boomer) ReBalance(Args *Profile) error {
if !b.masterRunner.isStarting() {
return errors.New("no start")
}
if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
return errors.New("spawn count should be greater than available worker count")
}
b.SetSpawnCount(Args.SpawnCount)
b.SetSpawnRate(Args.SpawnRate)
b.SetRunTime(Args.RunTime)
@@ -520,8 +549,9 @@ func (b *Boomer) GetWorkersInfo() []WorkerNode {
func (b *Boomer) GetMasterInfo() map[string]interface{} {
masterInfo := make(map[string]interface{})
masterInfo["state"] = b.masterRunner.getState()
masterInfo["workers"] = b.masterRunner.server.getClientsLength()
masterInfo["workers"] = b.masterRunner.server.getAvailableClientsLength()
masterInfo["target_users"] = b.masterRunner.getSpawnCount()
masterInfo["current_users"] = b.masterRunner.server.getCurrentUsers()
return masterInfo
}

View File

@@ -313,11 +313,12 @@ func (c *grpcClient) sendMessage(msg *genericMessage) {
return
}
err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID})
switch err {
case nil:
if err == nil {
atomic.StoreInt32(&c.failCount, 0)
default:
//log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
return
}
//log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
if msg.Type == "heartbeat" {
atomic.AddInt32(&c.failCount, 1)
}
}

View File

@@ -19,8 +19,8 @@ type genericMessage struct {
}
type task struct {
Profile *Profile `json:"profile,omitempty"`
TestCases []byte `json:"testcases,omitempty"`
Profile *Profile `json:"profile,omitempty"`
TestCasesBytes []byte `json:"testcases,omitempty"`
}
func newGenericMessage(t string, data map[string][]byte, nodeID string) (msg *genericMessage) {

View File

@@ -6,6 +6,7 @@ import (
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/google/uuid"
@@ -454,8 +455,8 @@ var (
)
var (
minResponseTimeMap = map[string]float64{}
maxResponseTimeMap = map[string]float64{}
minResponseTimeMap = sync.Map{}
maxResponseTimeMap = sync.Map{}
)
// NewPrometheusPusherOutput returns a PrometheusPusherOutput.
@@ -577,19 +578,19 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
}
// every stat in total
key := fmt.Sprintf("%v_%v", method, name)
if _, ok := minResponseTimeMap[key]; !ok {
minResponseTimeMap[key] = float64(stat.MinResponseTime)
} else {
minResponseTimeMap[key] = math.Min(float64(stat.MinResponseTime), minResponseTimeMap[key])
minResponseTime, loaded := minResponseTimeMap.LoadOrStore(key, float64(stat.MinResponseTime))
if loaded {
minResponseTime = math.Min(minResponseTime.(float64), float64(stat.MinResponseTime))
minResponseTimeMap.Store(key, minResponseTime)
}
gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTimeMap[key])
gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTime.(float64))
if _, ok := maxResponseTimeMap[key]; !ok {
maxResponseTimeMap[key] = float64(stat.MaxResponseTime)
} else {
maxResponseTimeMap[key] = math.Max(float64(stat.MaxResponseTime), maxResponseTimeMap[key])
maxResponseTime, loaded := maxResponseTimeMap.LoadOrStore(key, float64(stat.MaxResponseTime))
if loaded {
maxResponseTime = math.Max(maxResponseTime.(float64), float64(stat.MaxResponseTime))
maxResponseTimeMap.Store(key, maxResponseTime)
}
gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTimeMap[key])
gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTime.(float64))
counterTotalNumRequests.WithLabelValues(method, name).Add(float64(stat.NumRequests))
counterTotalNumFailures.WithLabelValues(method, name).Add(float64(stat.NumFailures))
@@ -639,4 +640,7 @@ func resetPrometheusMetrics() {
gaugeTotalFailPerSec.Set(0)
gaugeTransactionsPassed.Set(0)
gaugeTransactionsFailed.Set(0)
minResponseTimeMap = sync.Map{}
maxResponseTimeMap = sync.Map{}
}

View File

@@ -49,10 +49,10 @@ func getStateName(state int32) (stateName string) {
}
const (
reportStatsInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
heartbeatLiveness = 3 * time.Second
reconnectInterval = 3 * time.Second
reportStatsInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
heartbeatLiveness = 3 * time.Second
stateMachineInterval = 1 * time.Second
)
type Loop struct {
@@ -86,6 +86,7 @@ type Controller struct {
currentClientsNum int64 // current clients count
spawnCount int64 // target clients to spawn
spawnRate float64
rebalance chan bool // dynamically balance boomer running parameters
spawnDone chan struct{}
tasks []*Task
}
@@ -143,6 +144,12 @@ func (c *Controller) spawnCompete() {
close(c.spawnDone)
}
func (c *Controller) getRebalanceChan() chan bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.rebalance
}
func (c *Controller) isFinished() bool {
// return true when workers acquired
return atomic.LoadInt64(&c.currentClientsNum) == atomic.LoadInt64(&c.spawnCount)
@@ -178,6 +185,7 @@ func (c *Controller) reset() {
c.spawnRate = 0
atomic.StoreInt64(&c.currentClientsNum, 0)
c.spawnDone = make(chan struct{})
c.rebalance = make(chan bool)
c.tasks = []*Task{}
c.once = sync.Once{}
}
@@ -200,9 +208,6 @@ type runner struct {
controller *Controller
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
// dynamically balance boomer running parameters
rebalance chan bool
// stop signals the run goroutine should shutdown.
stopChan chan bool
// all running workers(goroutines) will select on this channel.
@@ -367,7 +372,6 @@ func (r *runner) reportTestResult() {
func (r *runner) reset() {
r.controller.reset()
r.stats.clearAll()
r.rebalance = make(chan bool)
r.stoppingChan = make(chan bool)
r.doneChan = make(chan bool)
r.reportedChan = make(chan bool)
@@ -459,16 +463,18 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
continue
}
r.controller.once.Do(func() {
// spawning compete
r.controller.spawnCompete()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
r.updateState(StateRunning)
})
r.controller.once.Do(
func() {
// spawning compete
r.controller.spawnCompete()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
r.updateState(StateRunning)
},
)
<-r.rebalance
<-r.controller.getRebalanceChan()
if r.isStarting() {
// rebalance spawn count
r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate())
@@ -660,7 +666,7 @@ func (r *localRunner) start() {
r.wgMu.Lock()
r.updateState(StateStopping)
close(r.stoppingChan)
close(r.rebalance)
close(r.controller.rebalance)
r.wgMu.Unlock()
// wait for goroutines before closing
@@ -699,7 +705,8 @@ type workerRunner struct {
masterPort int
client *grpcClient
profile *Profile
profile *Profile
testCasesBytes []byte
tasksChan chan *task
@@ -745,10 +752,10 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
log.Error().Msg("miss tasks")
}
r.tasksChan <- &task{
Profile: profile,
TestCases: msg.Tasks,
Profile: profile,
TestCasesBytes: msg.Tasks,
}
log.Info().Msg("on spawn message successful")
log.Info().Msg("on spawn message successfully")
}
func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
@@ -762,7 +769,7 @@ func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
r.tasksChan <- &task{
Profile: profile,
}
log.Info().Msg("on rebalance message successful")
log.Info().Msg("on rebalance message successfully")
}
// Runner acts as a state machine.
@@ -862,6 +869,9 @@ func (r *workerRunner) run() {
// wait for goroutines before closing
r.wg.Wait()
// notify master that worker is quitting
r.onQuiting()
var ticker = time.NewTicker(1 * time.Second)
if r.client != nil {
// waitting for quit message is sent to master
@@ -910,6 +920,7 @@ func (r *workerRunner) run() {
if !r.isStarting() && !r.isStopping() {
r.updateState(StateMissing)
}
continue
}
CPUUsage := GetCurrentCPUPercent()
MemoryUsage := GetCurrentMemoryPercent()
@@ -951,13 +962,18 @@ func (r *workerRunner) start() {
// block concurrent waitgroup adds in GoAttach while stopping
r.wgMu.Lock()
r.updateState(StateStopping)
close(r.controller.rebalance)
close(r.stoppingChan)
close(r.rebalance)
r.wgMu.Unlock()
// wait for goroutines before closing
r.wg.Wait()
// reset loop
if r.loop != nil {
r.loop = nil
}
close(r.doneChan)
// wait until all stats are reported successfully
@@ -984,7 +1000,6 @@ func (r *workerRunner) stop() {
}
func (r *workerRunner) close() {
r.onQuiting()
close(r.closeChan)
}
@@ -1003,8 +1018,8 @@ type masterRunner struct {
profile *Profile
parseTestCasesChan chan bool
testCaseBytes chan []byte
tcb []byte
testCaseBytesChan chan []byte
testCasesBytes []byte
}
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
@@ -1021,7 +1036,7 @@ func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
masterBindPort: masterBindPort,
server: newServer(masterBindHost, masterBindPort),
parseTestCasesChan: make(chan bool),
testCaseBytes: make(chan []byte),
testCaseBytesChan: make(chan []byte),
}
}
@@ -1044,23 +1059,15 @@ func (r *masterRunner) heartbeatWorker() {
if !ok {
log.Error().Msg("failed to get worker information")
}
if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
if workerInfo.getState() == StateQuitting {
return true
}
if workerInfo.getState() != StateMissing {
workerInfo.setState(StateMissing)
}
if r.isStopping() {
// all running workers missed, setting state to stopped
if r.server.getClientsLength() <= 0 {
r.updateState(StateStopped)
go func() {
if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
if workerInfo.getState() != StateMissing {
workerInfo.setState(StateMissing)
}
return true
} else {
atomic.AddInt32(&workerInfo.Heartbeat, -1)
}
} else {
atomic.AddInt32(&workerInfo.Heartbeat, -1)
}
}()
return true
})
case <-reportTicker.C:
@@ -1084,72 +1091,85 @@ func (r *masterRunner) clientListener() {
if !ok {
continue
}
switch msg.Type {
case typeClientReady:
workerInfo.setState(StateInit)
if r.getState() == StateRunning {
log.Warn().Str("worker id", workerInfo.ID).Msg("worker joined, ready to rebalance the load of each worker")
go func() {
switch msg.Type {
case typeClientReady:
workerInfo.setState(StateInit)
case typeClientStopped:
workerInfo.setState(StateStopped)
case typeHeartbeat:
if workerInfo.getState() == StateMissing {
workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
}
workerInfo.updateHeartbeat(3)
currentCPUUsage, ok := msg.Data["current_cpu_usage"]
if ok {
workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
}
currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
if ok {
workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
}
currentMemoryUsage, ok := msg.Data["current_memory_usage"]
if ok {
workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
}
currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
if ok {
workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
}
currentUsers, ok := msg.Data["current_users"]
if ok {
workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
}
case typeSpawning:
workerInfo.setState(StateSpawning)
case typeSpawningComplete:
workerInfo.setState(StateRunning)
case typeQuit:
if workerInfo.getState() == StateQuitting {
break
}
workerInfo.setState(StateQuitting)
case typeException:
// Todo
default:
}
}()
}
}
}
func (r *masterRunner) stateMachine() {
ticker := time.NewTicker(stateMachineInterval)
for {
select {
case <-r.closeChan:
return
case <-ticker.C:
switch r.getState() {
case StateSpawning:
if r.server.getCurrentUsers() == int(r.getSpawnCount()) {
log.Warn().Msg("all workers spawn done, setting state as running")
r.updateState(StateRunning)
}
case StateRunning:
if r.server.getStartingClientsLength() == 0 {
r.updateState(StateStopped)
continue
}
if r.server.getWorkersLengthByState(StateInit) != 0 {
err := r.rebalance()
if err != nil {
log.Error().Err(err).Msg("failed to rebalance")
}
}
case typeClientStopped:
workerInfo.setState(StateStopped)
if r.server.getWorkersLengthByState(StateStopped)+r.server.getWorkersLengthByState(StateInit) == r.server.getClientsLength() {
case StateStopping:
if r.server.getReadyClientsLength() == r.server.getAvailableClientsLength() {
r.updateState(StateStopped)
}
case typeHeartbeat:
if workerInfo.getState() != int32(builtin.BytesToInt64(msg.Data["state"])) {
workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
}
workerInfo.updateHeartbeat(3)
currentCPUUsage, ok := msg.Data["current_cpu_usage"]
if ok {
workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
}
currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
if ok {
workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
}
currentMemoryUsage, ok := msg.Data["current_memory_usage"]
if ok {
workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
}
currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
if ok {
workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
}
currentUsers, ok := msg.Data["current_users"]
if ok {
workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
}
case typeSpawning:
workerInfo.setState(StateSpawning)
case typeSpawningComplete:
workerInfo.setState(StateRunning)
if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() {
log.Warn().Msg("all workers spawn done, setting state as running")
r.updateState(StateRunning)
}
case typeQuit:
if workerInfo.getState() == StateQuitting {
break
}
workerInfo.setState(StateQuitting)
if r.isStarting() {
if r.server.getClientsLength() > 0 {
log.Warn().Str("worker id", workerInfo.ID).Msg("worker quited, ready to rebalance the load of each worker")
err := r.rebalance()
if err != nil {
log.Error().Err(err).Msg("failed to rebalance")
}
}
}
case typeException:
// Todo
default:
}
}
}
}
@@ -1179,7 +1199,7 @@ func (r *masterRunner) run() {
case <-r.closeChan:
return
case <-ticker.C:
c := r.server.getClientsLength()
c := r.server.getAvailableClientsLength()
log.Info().Msg(fmt.Sprintf("expected worker number: %v, current worker count: %v", r.expectWorkers, c))
if c >= r.expectWorkers {
err = r.start()
@@ -1198,6 +1218,9 @@ func (r *masterRunner) run() {
}()
}
// master state machine
r.goAttach(r.stateMachine)
// listen and deal message from worker
r.goAttach(r.clientListener)
@@ -1207,13 +1230,13 @@ func (r *masterRunner) run() {
}
func (r *masterRunner) start() error {
numWorkers := r.server.getClientsLength()
numWorkers := r.server.getAvailableClientsLength()
if numWorkers == 0 {
return errors.New("current available workers: 0")
}
// fetching testcase
testcase, err := r.fetchTestCase()
// fetching testcases
testCasesBytes, err := r.fetchTestCases()
if err != nil {
return err
}
@@ -1256,19 +1279,19 @@ func (r *masterRunner) start() error {
Type: "spawn",
Profile: ProfileToBytes(workerProfile),
NodeID: workerInfo.ID,
Tasks: testcase,
Tasks: testCasesBytes,
}
cur++
}
return true
})
log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successful")
log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successfully")
return nil
}
func (r *masterRunner) rebalance() error {
numWorkers := r.server.getClientsLength()
numWorkers := r.server.getAvailableClientsLength()
if numWorkers == 0 {
return errors.New("current available workers: 0")
}
@@ -1309,7 +1332,7 @@ func (r *masterRunner) rebalance() error {
Type: "spawn",
Profile: ProfileToBytes(workerProfile),
NodeID: workerInfo.ID,
Tasks: r.tcb,
Tasks: r.testCasesBytes,
}
} else {
workerInfo.getStream() <- &messager.StreamResponse{
@@ -1323,22 +1346,22 @@ func (r *masterRunner) rebalance() error {
return true
})
log.Warn().Msg("send rebalance data to worker successful")
log.Warn().Msg("send rebalance data to worker successfully")
return nil
}
func (r *masterRunner) fetchTestCase() ([]byte, error) {
func (r *masterRunner) fetchTestCases() ([]byte, error) {
ticker := time.NewTicker(30 * time.Second)
if len(r.testCaseBytes) > 0 {
<-r.testCaseBytes
if len(r.testCaseBytesChan) > 0 {
<-r.testCaseBytesChan
}
r.parseTestCasesChan <- true
select {
case <-ticker.C:
return nil, errors.New("parse testcases timeout")
case tcb := <-r.testCaseBytes:
r.tcb = tcb
return tcb, nil
case testCasesBytes := <-r.testCaseBytesChan:
r.testCasesBytes = testCasesBytes
return testCasesBytes, nil
}
}
@@ -1369,22 +1392,23 @@ func (r *masterRunner) close() {
func (r *masterRunner) reportStats() {
currentTime := time.Now()
println()
println("========================= HttpRunner Master for Distributed Load Testing ========================= ")
println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v",
currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount()))
println("==================================== HttpRunner Master for Distributed Load Testing ==================================== ")
println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v, Current Users: %v",
currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getAvailableClientsLength(), r.getSpawnCount(), r.server.getCurrentUsers()))
table := tablewriter.NewWriter(os.Stdout)
table.SetColMinWidth(0, 20)
table.SetColMinWidth(0, 40)
table.SetColMinWidth(1, 10)
table.SetColMinWidth(2, 10)
table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage (%)", "Memory Usage (%)"})
for _, worker := range r.server.getAllWorkers() {
row := make([]string, 6)
row[0] = worker.ID
row[1] = worker.IP
row[2] = fmt.Sprintf("%v", getStateName(worker.getState()))
row[3] = fmt.Sprintf("%v", worker.getUserCount())
row[4] = fmt.Sprintf("%.2f", worker.getCPUUsage())
row[5] = fmt.Sprintf("%.2f", worker.getMemoryUsage())
row[2] = fmt.Sprintf("%v", getStateName(worker.State))
row[3] = fmt.Sprintf("%v", worker.UserCount)
row[4] = fmt.Sprintf("%.2f", worker.CPUUsage)
row[5] = fmt.Sprintf("%.2f", worker.MemoryUsage)
table.Append(row)
}
table.Render()

View File

@@ -338,6 +338,7 @@ func TestOnQuitMessage(t *testing.T) {
go runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
close(runner.doneChan)
<-runner.closeChan
runner.onQuiting()
if runner.getState() != StateQuitting {
t.Error("Runner's state should be StateQuitting")
}
@@ -348,6 +349,7 @@ func TestOnQuitMessage(t *testing.T) {
runner.client.shutdownChan = make(chan bool)
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
<-runner.closeChan
runner.onQuiting()
if runner.getState() != StateQuitting {
t.Error("Runner's state should be StateQuitting")
}
@@ -395,7 +397,7 @@ func TestOnMessage(t *testing.T) {
// increase goroutines while running
runner.onMessage(newMessageToWorker("rebalance", ProfileToBytes(&Profile{SpawnCount: 15, SpawnRate: 15}), nil, nil))
runner.rebalance <- true
runner.controller.rebalance <- true
time.Sleep(2 * time.Second)
if runner.getState() != StateRunning {
@@ -460,6 +462,7 @@ func TestClientListener(t *testing.T) {
runner.updateState(StateInit)
runner.setSpawnCount(10)
runner.setSpawnRate(10)
go runner.stateMachine()
go runner.clientListener()
runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
@@ -483,6 +486,7 @@ func TestClientListener(t *testing.T) {
Type: typeClientStopped,
NodeID: "testID2",
}
runner.updateState(StateRunning)
worker2, ok := runner.server.getClients().Load("testID2")
if !ok {
t.Fatal("error")
@@ -515,7 +519,7 @@ func TestHeartbeatWorker(t *testing.T) {
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit, stream: make(chan *messager.StreamResponse, 10)})
go runner.clientListener()
go runner.heartbeatWorker()
time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)
worker1, ok := runner.server.getClients().Load("testID1")
if !ok {
t.Fatal()
@@ -525,7 +529,7 @@ func TestHeartbeatWorker(t *testing.T) {
t.Fatal()
}
if workerInfo1.getState() != StateMissing {
t.Error("expected state of worker runner is missing, but got", workerInfo1.getState())
t.Error("expected state of worker runner is missing, but got", getStateName(workerInfo1.getState()))
}
runner.server.recvChannel() <- &genericMessage{
Type: typeHeartbeat,

View File

@@ -53,6 +53,24 @@ func (w *WorkerNode) setState(state int32) {
atomic.StoreInt32(&w.State, state)
}
func (w *WorkerNode) isStarting() bool {
return w.getState() == StateRunning || w.getState() == StateSpawning
}
func (w *WorkerNode) isStopping() bool {
return w.getState() == StateStopping
}
func (w *WorkerNode) isAvailable() bool {
state := w.getState()
return state != StateMissing && state != StateQuitting
}
func (w *WorkerNode) isReady() bool {
state := w.getState()
return state == StateInit || state == StateStopped
}
func (w *WorkerNode) updateHeartbeat(heartbeat int32) {
atomic.StoreInt32(&w.Heartbeat, heartbeat)
}
@@ -130,8 +148,8 @@ func (w *WorkerNode) getMemoryUsage() float64 {
}
func (w *WorkerNode) setStream(stream chan *messager.StreamResponse) {
w.mutex.RLock()
defer w.mutex.RUnlock()
w.mutex.Lock()
defer w.mutex.Unlock()
w.stream = stream
}
@@ -302,26 +320,19 @@ func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest
wn := newWorkerNode(req.NodeID, clientIp, req.Os, req.Arch)
s.clients.Store(req.NodeID, wn)
log.Warn().Str("worker id", req.NodeID).Msg("worker joined")
return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil
return &messager.RegisterResponse{Code: "0", Message: "register successful"}, nil
}
func (s *grpcServer) SignOut(_ context.Context, req *messager.SignOutRequest) (*messager.SignOutResponse, error) {
// delete worker information
s.clients.Delete(req.NodeID)
log.Warn().Str("worker id", req.NodeID).Msg("worker quited")
return &messager.SignOutResponse{Code: "0", Message: "sign out successfully"}, nil
return &messager.SignOutResponse{Code: "0", Message: "sign out successful"}, nil
}
func (s *grpcServer) valid(token string) (isValid bool) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.ID == token {
isValid = true
}
}
return true
})
return
func (s *grpcServer) validClientToken(token string) bool {
_, ok := s.clients.Load(token)
return ok
}
func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_BidirectionalStreamingMessageServer) error {
@@ -332,7 +343,7 @@ func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_Bidirect
return status.Error(codes.Unauthenticated, "missing token header")
}
ok = s.valid(token)
ok = s.validClientToken(token)
if !ok {
return status.Error(codes.Unauthenticated, "invalid token")
}
@@ -404,7 +415,7 @@ func (s *grpcServer) sendMsg(srv messager.Message_BidirectionalStreamingMessageS
func (s *grpcServer) sendBroadcasts(msg *genericMessage) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing {
if !workerInfo.isAvailable() {
return true
}
workerInfo.getStream() <- &messager.StreamResponse{
@@ -517,10 +528,10 @@ func (s *grpcServer) getClients() *sync.Map {
return s.clients
}
func (s *grpcServer) getClientsLength() (l int) {
func (s *grpcServer) getAvailableClientsLength() (l int) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.getState() != StateQuitting && workerInfo.getState() != StateMissing {
if workerInfo.isAvailable() {
l++
}
}
@@ -528,3 +539,39 @@ func (s *grpcServer) getClientsLength() (l int) {
})
return
}
func (s *grpcServer) getReadyClientsLength() (l int) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.isReady() {
l++
}
}
return true
})
return
}
func (s *grpcServer) getStartingClientsLength() (l int) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.isStarting() {
l++
}
}
return true
})
return
}
func (s *grpcServer) getCurrentUsers() (l int) {
s.clients.Range(func(key, value interface{}) bool {
if workerInfo, ok := value.(*WorkerNode); ok {
if workerInfo.isStarting() {
l += int(workerInfo.getUserCount())
}
}
return true
})
return
}

72
hrp/internal/dial/curl.go Normal file
View File

@@ -0,0 +1,72 @@
package dial
import (
"bytes"
"fmt"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
)
const (
normalResult = "STDOUT"
errorResult = "STDERR"
failedResult = "FAILED"
)
type CurlResult struct {
Result string `json:"result"`
ErrorMsg string `json:"errorMsg"`
ResultType string `json:"resultType"`
}
func DoCurl(args []string) (err error) {
var saveTests bool
for i, arg := range args {
if arg == "--save-tests" {
args = append(args[:i], args[i+1:]...)
saveTests = true
}
}
var curlResult CurlResult
defer func() {
if saveTests {
dir, _ := os.Getwd()
curlResultName := fmt.Sprintf("curl_result_%v.json", time.Now().Format("20060102150405"))
curlResultPath := filepath.Join(dir, curlResultName)
err = builtin.Dump2JSON(curlResult, curlResultPath)
if err != nil {
log.Error().Err(err).Msg("save dns resolution result failed")
}
}
}()
cmd := exec.Command("curl", args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err = cmd.Run()
if err != nil {
log.Error().Err(err).Msgf("fail to run curl command")
curlResult.ErrorMsg = err.Error()
curlResult.Result = stderr.String()
curlResult.ResultType = errorResult
return
}
if stdout.String() != "" {
fmt.Printf(stdout.String())
curlResult.Result = stdout.String()
curlResult.ResultType = normalResult
} else if stderr.String() != "" {
fmt.Printf(stderr.String())
curlResult.ErrorMsg = stderr.String()
curlResult.ResultType = errorResult
}
return
}

251
hrp/internal/dial/dns.go Normal file
View File

@@ -0,0 +1,251 @@
package dial
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/miekg/dns"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
)
const (
httpDnsUrl = "https://dig.bdurl.net/q"
googleDnsUrl = "https://dns.google/resolve"
)
const (
DnsSourceTypeLocal = iota
DnsSourceTypeHttp
DnsSourceTypeGoogle
)
const (
DnsRecordTypeA = 1
DnsRecordTypeAAAA = 28
DnsRecordTypeCNAME = 5
)
var dnsHttpClient = &http.Client{
Timeout: 5 * time.Minute,
}
type DnsOptions struct {
DnsSourceType int
DnsRecordType int
DnsServer string
SaveTests bool
}
type DnsResult struct {
DnsList []string `json:"dnsList"`
DnsSource int `json:"dnsType"`
DnsRecordType int `json:"dnsRecordType"`
DnsServer string `json:"dnsServer,omitempty"`
Ttl int `json:"ttl"`
Suc bool `json:"suc"`
ErrMsg string `json:"errMsg"`
}
type googleDnsResp struct {
Answer []googleDnsAnswer `json:"Answer"`
}
type httpDnsResp struct {
Ips []string `json:"ips"`
Ttl int `json:"ttl"`
}
type googleDnsAnswer struct {
Name string `json:"name"`
Type int `json:"type"`
TTL int `json:"TTL"`
Data string `json:"data"`
}
func ParseIP(s string) (net.IP, int) {
ip := net.ParseIP(s)
if ip == nil {
return nil, 0
}
for i := 0; i < len(s); i++ {
switch s[i] {
case '.':
return ip, 4
case ':':
return ip, 6
}
}
return nil, 0
}
func localDns(src string, dnsRecordType int, dnsServer string) (dnsResult DnsResult, err error) {
dnsResult.DnsSource = DnsSourceTypeLocal
dnsResult.DnsRecordType = dnsRecordType
if dnsServer == "" {
config, _ := dns.ClientConfigFromFile("/etc/resolv.conf")
dnsServer = config.Servers[0]
} else {
dnsResult.DnsServer = dnsServer
}
_, ipType := ParseIP(dnsServer)
if ipType == 4 {
dnsServer += ":53"
}
c := dns.Client{
Timeout: 5 * time.Second,
}
m := dns.Msg{}
m.SetQuestion(src+".", uint16(dnsRecordType))
r, _, err := c.Exchange(&m, dnsServer)
if err != nil {
return
}
for _, ans := range r.Answer {
switch dnsRecordType {
case DnsRecordTypeA:
record, isType := ans.(*dns.A)
if isType {
dnsResult.Ttl = int(record.Hdr.Ttl)
dnsResult.DnsList = append(dnsResult.DnsList, record.A.String())
}
case DnsRecordTypeAAAA:
record, isType := ans.(*dns.AAAA)
if isType {
dnsResult.Ttl = int(record.Hdr.Ttl)
dnsResult.DnsList = append(dnsResult.DnsList, record.AAAA.String())
}
case DnsRecordTypeCNAME:
record, isType := ans.(*dns.CNAME)
if isType {
dnsResult.Ttl = int(record.Hdr.Ttl)
dnsResult.DnsList = append(dnsResult.DnsList, record.Target)
}
}
}
return
}
func httpDns(url string, dnsRecordType int) (dnsResult DnsResult, err error) {
target := httpDnsUrl + "?host=" + url
if dnsRecordType == DnsRecordTypeAAAA {
target += "&aid=13&f=2"
}
resp, err := dnsHttpClient.Get(target)
dnsResult.DnsSource = DnsSourceTypeHttp
dnsResult.DnsRecordType = dnsRecordType
if err != nil {
return
}
defer resp.Body.Close()
var buf []byte
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
var result httpDnsResp
err = json.Unmarshal(buf, &result)
if err != nil {
return
}
dnsResult.DnsList = result.Ips
dnsResult.Ttl = result.Ttl
return
}
func googleDns(url string, dnsRecordType int) (dnsResult DnsResult, err error) {
resp, err := dnsHttpClient.Get(googleDnsUrl + "?name=" + url + "&type=" + strconv.Itoa(dnsRecordType))
dnsResult.DnsSource = DnsSourceTypeGoogle
dnsResult.DnsRecordType = dnsRecordType
if err != nil {
return
}
defer resp.Body.Close()
var buf []byte
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
var result googleDnsResp
err = json.Unmarshal(buf, &result)
if err != nil {
return
}
if len(result.Answer) == 0 {
return
}
for _, answer := range result.Answer {
if answer.Type == dnsRecordType {
dnsResult.Ttl = answer.TTL
dnsResult.DnsList = append(dnsResult.DnsList, answer.Data)
}
}
return
}
func DoDns(dnsOptions *DnsOptions, args []string) (err error) {
if len(args) != 1 {
return errors.New("there should be one argument")
}
var dnsResult DnsResult
defer func() {
if dnsOptions.SaveTests {
dir, _ := os.Getwd()
dnsResultName := fmt.Sprintf("dns_result_%v.json", time.Now().Format("20060102150405"))
dnsResultPath := filepath.Join(dir, dnsResultName)
err = builtin.Dump2JSON(dnsResult, dnsResultPath)
if err != nil {
log.Error().Err(err).Msg("save dns resolution result failed")
}
}
}()
dnsTarget := args[0]
parsedURL, err := url.Parse(dnsTarget)
if err == nil && parsedURL.Host != "" {
log.Info().Msgf("parse input url %v and extract host %v", dnsTarget, parsedURL.Host)
dnsTarget = strings.Split(parsedURL.Host, ":")[0]
}
log.Info().Msgf("resolve DNS for %v", dnsTarget)
dnsRecordType := dnsOptions.DnsRecordType
dnsServer := dnsOptions.DnsServer
switch dnsOptions.DnsSourceType {
case DnsSourceTypeLocal:
dnsResult, err = localDns(dnsTarget, dnsRecordType, dnsServer)
case DnsSourceTypeHttp:
dnsResult, err = httpDns(dnsTarget, dnsRecordType)
case DnsSourceTypeGoogle:
dnsResult, err = googleDns(dnsTarget, dnsRecordType)
}
if err != nil {
dnsResult.Suc = false
dnsResult.ErrMsg = err.Error()
log.Error().Err(err).Msgf("fail to do DNS for %s", dnsTarget)
} else {
dnsResult.Suc = true
dnsResult.ErrMsg = ""
fmt.Printf("\nDNS resolution done, result IP list: %v\n", dnsResult.DnsList)
}
return
}

116
hrp/internal/dial/ping.go Normal file
View File

@@ -0,0 +1,116 @@
package dial
import (
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/go-ping/ping"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
)
type PingOptions struct {
Count int
Timeout time.Duration
Interval time.Duration
SaveTests bool
}
type PingResult struct {
Suc bool `json:"suc"`
ErrMsg string `json:"errMsg"`
Ip string `json:"ip"`
AvgCost int `json:"avgCost"`
MaxCost int `json:"maxCost"`
MinCost int `json:"minCost"`
Lost int `json:"lost"`
PingCount int `json:"pingCount"`
PacketSize int `json:"packetSize"`
ReceivePacketCount int `json:"receivePacketCount"`
SendPacketCount int `json:"sendPacketCount"`
SuccessCount int `json:"successCount"`
DebugLog string `json:"debugLog"`
}
func DoPing(pingOptions *PingOptions, args []string) (err error) {
if len(args) != 1 {
return errors.New("there should be one argument")
}
var pingResult PingResult
defer func() {
if pingOptions.SaveTests {
dir, _ := os.Getwd()
pingResultName := fmt.Sprintf("ping_result_%v.json", time.Now().Format("20060102150405"))
pingResultPath := filepath.Join(dir, pingResultName)
err = builtin.Dump2JSON(pingResult, pingResultPath)
if err != nil {
log.Error().Err(err).Msg("save ping result failed")
}
}
}()
pingTarget := args[0]
parsedURL, err := url.Parse(pingTarget)
if err == nil && parsedURL.Host != "" {
log.Info().Msgf("parse input url %v and extract host %v", pingTarget, parsedURL.Host)
pingTarget = strings.Split(parsedURL.Host, ":")[0]
}
log.Info().Msgf("ping host %v", pingTarget)
pinger, err := ping.NewPinger(pingTarget)
if err != nil {
log.Error().Err(err).Msgf("fail to get pinger for %s", pingTarget)
pingResult.Suc = false
pingResult.ErrMsg = err.Error()
pingResult.DebugLog = err.Error()
return
}
pinger.Count = pingOptions.Count
pinger.Timeout = pingOptions.Timeout
pinger.Interval = pingOptions.Interval
pinger.OnRecv = func(pkt *ping.Packet) {
pingResult.DebugLog += fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v\n",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
}
pinger.OnFinish = func(stats *ping.Statistics) {
pingResult.DebugLog += fmt.Sprintf("\n--- %s ping statistics ---\n", stats.Addr)
pingResult.DebugLog += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n",
stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
pingResult.DebugLog += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
}
pingResult.DebugLog += fmt.Sprintf("PING %s (%s):\n", pinger.Addr(), pinger.IPAddr())
err = pinger.Run() // blocks until finished
if err != nil {
log.Error().Err(err).Msgf("fail to run ping for %s", parsedURL)
pingResult.Suc = false
pingResult.ErrMsg = err.Error()
pingResult.DebugLog = err.Error()
return
}
fmt.Print(pingResult.DebugLog)
stats := pinger.Statistics() // get send/receive/rtt stats
pingResult.Ip = pinger.IPAddr().String()
pingResult.AvgCost = int(stats.AvgRtt / time.Millisecond)
pingResult.MaxCost = int(stats.MaxRtt / time.Millisecond)
pingResult.MinCost = int(stats.MinRtt / time.Millisecond)
pingResult.Lost = int(stats.PacketLoss)
pingResult.PingCount = pingOptions.Count
pingResult.PacketSize = pinger.Size
pingResult.ReceivePacketCount = stats.PacketsRecv
pingResult.SendPacketCount = stats.PacketsSent
pingResult.SuccessCount = stats.PacketsRecv
pingResult.Suc = true
pingResult.ErrMsg = ""
return
}

View File

@@ -0,0 +1,20 @@
package dial
type TraceRouteOptions struct {
MaxTTL int
Queries int
SaveTests bool
}
type TraceRouteResult struct {
IP string `json:"ip"`
Details []TraceRouteResultNode `json:"details"`
Suc bool `json:"suc"`
ErrMsg string `json:"errMsg"`
}
type TraceRouteResultNode struct {
Id int `json:"id"`
Ip string `json:"ip"`
Time string `json:"time"`
}

View File

@@ -0,0 +1,106 @@
//go:build darwin || linux
// +build darwin linux
package dial
import (
"bufio"
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
)
var (
regexIPAddr = regexp.MustCompile(`([\d.]+)`)
regexElapsedTime = regexp.MustCompile(`(\d+\.\d+)`)
regexTraceroutePass = regexp.MustCompile(fmt.Sprintf(`(\d+)[\s*]+(\S+)\s+\(%s\)\s+%s\s+ms`, regexIPAddr, regexElapsedTime))
regexTracerouteFailure = regexp.MustCompile(`(\d+)[\s*]+$`)
)
func DoTraceRoute(traceRouteOptions *TraceRouteOptions, args []string) (err error) {
if len(args) != 1 {
return errors.New("there should be one argument")
}
var traceRouteResult TraceRouteResult
defer func() {
if traceRouteOptions.SaveTests {
dir, _ := os.Getwd()
traceRouteResultName := fmt.Sprintf("traceroute_result_%v.json", time.Now().Format("20060102150405"))
traceRouteResultPath := filepath.Join(dir, traceRouteResultName)
err = builtin.Dump2JSON(traceRouteResult, traceRouteResultPath)
if err != nil {
log.Error().Err(err).Msg("save traceroute result failed")
}
}
}()
traceRouteTarget := args[0]
parsedURL, err := url.Parse(traceRouteTarget)
if err == nil && parsedURL.Host != "" {
log.Info().Msgf("parse input url %v and extract host %v", traceRouteTarget, parsedURL.Host)
traceRouteTarget = strings.Split(parsedURL.Host, ":")[0]
}
cmd := exec.Command("traceroute", "-m", strconv.Itoa(traceRouteOptions.MaxTTL),
"-q", strconv.Itoa(traceRouteOptions.Queries), traceRouteTarget)
stdout, _ := cmd.StdoutPipe()
startT := time.Now()
defer func() {
log.Info().Msgf("for target %s, traceroute costs %v", traceRouteTarget, time.Since(startT))
}()
log.Info().Msgf("start to traceroute %v", traceRouteTarget)
err = cmd.Start()
if err != nil {
traceRouteResult.Suc = false
traceRouteResult.ErrMsg = "execute traceroute failed"
log.Error().Err(err).Msg("start command failed")
return
}
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
hopLine := scanner.Text()
fmt.Println(hopLine)
failureLine := regexTracerouteFailure.FindStringSubmatch(hopLine)
if len(failureLine) == 2 {
hopID, _ := strconv.Atoi(failureLine[1])
traceRouteResult.Details = append(traceRouteResult.Details, TraceRouteResultNode{
Id: hopID,
})
continue
}
passLine := regexTraceroutePass.FindStringSubmatch(hopLine)
if len(passLine) == 5 {
hopID, _ := strconv.Atoi(passLine[1])
traceRouteResult.Details = append(traceRouteResult.Details, TraceRouteResultNode{
Id: hopID,
Ip: passLine[3],
Time: passLine[4],
})
traceRouteResult.Suc = true
}
}
hopCount := len(traceRouteResult.Details)
traceRouteResult.IP = traceRouteResult.Details[hopCount-1].Ip
err = cmd.Wait()
if err != nil {
traceRouteResult.Suc = false
traceRouteResult.ErrMsg = "wait traceroute finish failed"
log.Error().Err(err).Msg("wait command failed")
return
}
return
}

View File

@@ -0,0 +1,105 @@
//go:build windows
// +build windows
package dial
import (
"bufio"
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
)
var (
regexTracertPass = regexp.MustCompile(`(\d+)[\s*<]+(\d+)\s+ms`)
regexTracertFailure = regexp.MustCompile(`(\d+)[\s*]+Request timed out`)
)
func DoTraceRoute(traceRouteOptions *TraceRouteOptions, args []string) (err error) {
if len(args) != 1 {
return errors.New("there should be one argument")
}
var traceRouteResult TraceRouteResult
defer func() {
if traceRouteOptions.SaveTests {
dir, _ := os.Getwd()
traceRouteResultName := fmt.Sprintf("traceroute_result_%v.json", time.Now().Format("20060102150405"))
traceRouteResultPath := filepath.Join(dir, traceRouteResultName)
err = builtin.Dump2JSON(traceRouteResult, traceRouteResultPath)
if err != nil {
log.Error().Err(err).Msg("save traceroute result failed")
}
}
}()
traceRouteTarget := args[0]
parsedURL, err := url.Parse(traceRouteTarget)
if err == nil && parsedURL.Host != "" {
log.Info().Msgf("parse input url %v and extract host %v", traceRouteTarget, parsedURL.Host)
traceRouteTarget = strings.Split(parsedURL.Host, ":")[0]
}
cmd := exec.Command("tracert", "-h", strconv.Itoa(traceRouteOptions.MaxTTL), traceRouteTarget)
stdout, _ := cmd.StdoutPipe()
startT := time.Now()
defer func() {
log.Info().Msgf("for target %s, traceroute costs %v", traceRouteTarget, time.Since(startT))
}()
log.Info().Msgf("start to traceroute %v", traceRouteTarget)
err = cmd.Start()
if err != nil {
traceRouteResult.Suc = false
traceRouteResult.ErrMsg = "execute traceroute failed"
log.Error().Err(err).Msg("start command failed")
return
}
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
hopLine := scanner.Text()
fmt.Println(hopLine)
failureLine := regexTracertFailure.FindStringSubmatch(hopLine)
if len(failureLine) == 2 {
hopID, _ := strconv.Atoi(failureLine[1])
traceRouteResult.Details = append(traceRouteResult.Details, TraceRouteResultNode{
Id: hopID,
})
continue
}
passLine := regexTracertPass.FindStringSubmatch(hopLine)
if len(passLine) == 3 {
hopID, _ := strconv.Atoi(passLine[1])
fields := strings.Fields(hopLine)
hopIP := strings.Trim(fields[len(fields)-1], "[]")
traceRouteResult.Details = append(traceRouteResult.Details, TraceRouteResultNode{
Id: hopID,
Ip: hopIP,
Time: passLine[2],
})
traceRouteResult.Suc = true
}
}
hopCount := len(traceRouteResult.Details)
traceRouteResult.IP = traceRouteResult.Details[hopCount-1].Ip
err = cmd.Wait()
if err != nil {
traceRouteResult.Suc = false
traceRouteResult.ErrMsg = "wait traceroute finish failed"
log.Error().Err(err).Msg("wait command failed")
return
}
return
}