refactor: move websocket related logic to step_websocket

This commit is contained in:
lilong.129
2024-08-20 16:14:11 +08:00
parent b9171ca772
commit b43bda4ba3
3 changed files with 44 additions and 63 deletions

View File

@@ -418,11 +418,7 @@ func (r *CaseRunner) NewSession() *SessionRunner {
sessionVariables: make(map[string]interface{}), sessionVariables: make(map[string]interface{}),
summary: newSummary(), summary: newSummary(),
transactions: make(map[string]map[transactionType]time.Time), transactions: make(map[string]map[transactionType]time.Time),
wsConnMap: make(map[string]*websocket.Conn),
inheritWsConnMap: make(map[string]*websocket.Conn),
pongResponseChan: make(chan string, 1),
closeResponseChan: make(chan *wsCloseRespObject, 1),
} }
return sessionRunner return sessionRunner
} }
@@ -438,22 +434,6 @@ type SessionRunner struct {
// transactions stores transaction timing info. // transactions stores transaction timing info.
// key is transaction name, value is map of transaction type and time, e.g. start time and end time. // key is transaction name, value is map of transaction type and time, e.g. start time and end time.
transactions map[string]map[transactionType]time.Time transactions map[string]map[transactionType]time.Time
wsConnMap map[string]*websocket.Conn // save all websocket connections
inheritWsConnMap map[string]*websocket.Conn // inherit all websocket connections
pongResponseChan chan string // channel used to receive pong response message
closeResponseChan chan *wsCloseRespObject // channel used to receive close response message
}
func (r *SessionRunner) inheritConnection(src *SessionRunner) {
log.Info().Msg("inherit session runner")
r.inheritWsConnMap = make(map[string]*websocket.Conn, len(src.wsConnMap)+len(src.inheritWsConnMap))
for k, v := range src.wsConnMap {
r.inheritWsConnMap[k] = v
}
for k, v := range src.inheritWsConnMap {
r.inheritWsConnMap[k] = v
}
} }
// Start runs the test steps in sequential order. // Start runs the test steps in sequential order.
@@ -468,11 +448,6 @@ func (r *SessionRunner) Start(givenVars map[string]interface{}) error {
// update config variables with given variables // update config variables with given variables
r.InitWithParameters(givenVars) r.InitWithParameters(givenVars)
defer func() {
// close session resource after all steps done or fast fail
r.releaseResources()
}()
// run step in sequential order // run step in sequential order
for _, step := range r.caseRunner.TestSteps { for _, step := range r.caseRunner.TestSteps {
select { select {
@@ -675,29 +650,3 @@ func (r *SessionRunner) addSingleStepResult(stepResult *StepResult) {
r.summary.Success = false r.summary.Success = false
} }
} }
// releaseResources releases resources used by session runner
func (r *SessionRunner) releaseResources() {
// close websocket connections
for _, wsConn := range r.wsConnMap {
if wsConn != nil {
log.Info().Str("testcase", r.caseRunner.Config.Name).Msg("websocket disconnected")
err := wsConn.Close()
if err != nil {
log.Error().Err(err).Msg("websocket disconnection failed")
}
}
}
}
func (r *SessionRunner) getWsClient(url string) *websocket.Conn {
if client, ok := r.wsConnMap[url]; ok {
return client
}
if client, ok := r.inheritWsConnMap[url]; ok {
return client
}
return nil
}

View File

@@ -89,8 +89,6 @@ func (s *StepTestCaseWithOptionalArgs) Run(r *SessionRunner) (stepResult *StepRe
return stepResult, err return stepResult, err
} }
sessionRunner := caseRunner.NewSession() sessionRunner := caseRunner.NewSession()
// need to inherit some information from current session
sessionRunner.inheritConnection(r)
start := time.Now() start := time.Now()
// run referenced testcase with step variables // run referenced testcase with step variables

View File

@@ -16,6 +16,12 @@ import (
"github.com/httprunner/httprunner/v4/hrp/internal/json" "github.com/httprunner/httprunner/v4/hrp/internal/json"
) )
var (
wsConnMap map[string]*websocket.Conn // save all websocket connections
pongResponseChan chan string // channel used to receive pong response message
closeResponseChan chan *wsCloseRespObject // channel used to receive close response message
)
const ( const (
wsOpen ActionType = "open" wsOpen ActionType = "open"
wsPing ActionType = "ping" wsPing ActionType = "ping"
@@ -314,7 +320,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
case wsOpen: case wsOpen:
log.Info().Int64("timeout(ms)", step.WebSocket.GetTimeout()).Str("url", parsedURL).Msg("open websocket connection") log.Info().Int64("timeout(ms)", step.WebSocket.GetTimeout()).Str("url", parsedURL).Msg("open websocket connection")
// use the current websocket connection if existed // use the current websocket connection if existed
if r.getWsClient(parsedURL) != nil { if getWsClient(parsedURL) != nil {
break break
} }
resp, err = openWithTimeout(parsedURL, parsedHeader, r, step) resp, err = openWithTimeout(parsedURL, parsedHeader, r, step)
@@ -334,7 +340,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
case <-timer.C: case <-timer.C:
timer.Stop() timer.Stop()
log.Warn().Msg("pong timeout") log.Warn().Msg("pong timeout")
case pongResponse := <-r.pongResponseChan: case pongResponse := <-pongResponseChan:
resp = pongResponse resp = pongResponse
log.Info().Msg("pong message arrives") log.Info().Msg("pong message arrives")
} }
@@ -426,6 +432,20 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er
return stepResult, nil return stepResult, nil
} }
func getWsClient(url string) *websocket.Conn {
if wsConnMap == nil {
wsConnMap = make(map[string]*websocket.Conn)
pongResponseChan = make(chan string, 1)
closeResponseChan = make(chan *wsCloseRespObject, 1)
}
if client, ok := wsConnMap[url]; ok {
return client
}
return nil
}
func printWebSocketResponse(resp interface{}) error { func printWebSocketResponse(resp interface{}) error {
if resp == nil { if resp == nil {
fmt.Println("(response body is empty in this step)") fmt.Println("(response body is empty in this step)")
@@ -470,14 +490,14 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
// the following handlers handle and transport control message from server // the following handlers handle and transport control message from server
conn.SetPongHandler(func(appData string) error { conn.SetPongHandler(func(appData string) error {
r.pongResponseChan <- appData pongResponseChan <- appData
return nil return nil
}) })
conn.SetCloseHandler(func(code int, text string) error { conn.SetCloseHandler(func(code int, text string) error {
message := websocket.FormatCloseMessage(code, "") message := websocket.FormatCloseMessage(code, "")
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(defaultWriteWait)) conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(defaultWriteWait))
select { select {
case r.closeResponseChan <- &wsCloseRespObject{ case closeResponseChan <- &wsCloseRespObject{
StatusCode: code, StatusCode: code,
Text: text, Text: text,
}: }:
@@ -487,7 +507,7 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
return nil return nil
}) })
r.wsConnMap[urlStr] = conn wsConnMap[urlStr] = conn
openResponseChan <- resp openResponseChan <- resp
}() }()
@@ -504,7 +524,7 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner,
} }
func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*wsReadRespObject, error) { func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*wsReadRespObject, error) {
wsConn := r.getWsClient(urlString) wsConn := getWsClient(urlString)
if wsConn == nil { if wsConn == nil {
return nil, errors.New("try to use existing connection, but there is no connection") return nil, errors.New("try to use existing connection, but there is no connection")
} }
@@ -534,7 +554,7 @@ func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*w
} }
func writeWebSocket(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error { func writeWebSocket(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error {
wsConn := r.getWsClient(urlString) wsConn := getWsClient(urlString)
if wsConn == nil { if wsConn == nil {
return errors.New("try to use existing connection, but there is no connection") return errors.New("try to use existing connection, but there is no connection")
} }
@@ -600,7 +620,7 @@ func writeWithAction(c *websocket.Conn, step *TStep, messageType int, message []
} }
func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) { func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) {
wsConn := r.getWsClient(urlString) wsConn := getWsClient(urlString)
if wsConn == nil { if wsConn == nil {
return nil, errors.New("no connection needs to be closed") return nil, errors.New("no connection needs to be closed")
} }
@@ -639,7 +659,7 @@ func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVaria
return nil, errors.New("close timeout") return nil, errors.New("close timeout")
case err := <-errorChan: case err := <-errorChan:
return nil, err return nil, err
case closeResult := <-r.closeResponseChan: case closeResult := <-closeResponseChan:
return closeResult, nil return closeResult, nil
} }
} }
@@ -673,3 +693,17 @@ func getContentSize(resp interface{}) int64 {
return -1 return -1
} }
} }
// releaseResources releases resources used by session runner
func (r *SessionRunner) releaseResources() {
// close websocket connections
for _, wsConn := range wsConnMap {
if wsConn != nil {
log.Info().Str("testcase", r.caseRunner.Config.Name).Msg("websocket disconnected")
err := wsConn.Close()
if err != nil {
log.Error().Err(err).Msg("websocket disconnection failed")
}
}
}
}