diff --git a/hrp/runner.go b/hrp/runner.go index bb728f64..109dd479 100644 --- a/hrp/runner.go +++ b/hrp/runner.go @@ -410,6 +410,7 @@ func (r *CaseRunner) NewSession() *SessionRunner { summary: newSummary(), transactions: make(map[string]map[transactionType]time.Time), + ws: newWSSession(), } return sessionRunner } @@ -425,6 +426,9 @@ type SessionRunner struct { // transactions stores transaction timing info. // key is transaction name, value is map of transaction type and time, e.g. start time and end time. transactions map[string]map[transactionType]time.Time + + // websocket session + ws *wsSession } // Start runs the test steps in sequential order. diff --git a/hrp/step_websocket.go b/hrp/step_websocket.go index 57c0c2e2..23b7c4c3 100644 --- a/hrp/step_websocket.go +++ b/hrp/step_websocket.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "net/http" - "sync" "testing" "time" "unsafe" @@ -17,17 +16,18 @@ import ( "github.com/httprunner/httprunner/v4/hrp/internal/json" ) -var ( - wsMutex sync.Mutex +func newWSSession() *wsSession { + return &wsSession{ + wsConnMap: make(map[string]*websocket.Conn), + pongResponseChan: make(chan string, 1), + closeResponseChan: make(chan *wsCloseRespObject, 1), + } +} + +type wsSession struct { wsConnMap map[string]*websocket.Conn // save all websocket connections pongResponseChan chan string // channel used to receive pong response message closeResponseChan chan *wsCloseRespObject // channel used to receive close response message -) - -func init() { - wsConnMap = make(map[string]*websocket.Conn) - pongResponseChan = make(chan string, 1) - closeResponseChan = make(chan *wsCloseRespObject, 1) } const ( @@ -321,7 +321,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er case wsOpen: log.Info().Int64("timeout(ms)", step.WebSocket.GetTimeout()).Str("url", parsedURL).Msg("open websocket connection") // use the current websocket connection if existed - if getWsClient(parsedURL) != nil { + if getWsClient(r, parsedURL) != nil { break } resp, err = openWithTimeout(parsedURL, parsedHeader, r, step) @@ -341,7 +341,7 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er case <-timer.C: timer.Stop() log.Warn().Msg("pong timeout") - case pongResponse := <-pongResponseChan: + case pongResponse := <-r.ws.pongResponseChan: resp = pongResponse log.Info().Msg("pong message arrives") } @@ -433,8 +433,8 @@ func runStepWebSocket(r *SessionRunner, step *TStep) (stepResult *StepResult, er return stepResult, nil } -func getWsClient(url string) *websocket.Conn { - if client, ok := wsConnMap[url]; ok { +func getWsClient(r *SessionRunner, url string) *websocket.Conn { + if client, ok := r.ws.wsConnMap[url]; ok { return client } @@ -485,14 +485,14 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner, // the following handlers handle and transport control message from server conn.SetPongHandler(func(appData string) error { - pongResponseChan <- appData + r.ws.pongResponseChan <- appData return nil }) conn.SetCloseHandler(func(code int, text string) error { message := websocket.FormatCloseMessage(code, "") conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(defaultWriteWait)) select { - case closeResponseChan <- &wsCloseRespObject{ + case r.ws.closeResponseChan <- &wsCloseRespObject{ StatusCode: code, Text: text, }: @@ -502,10 +502,8 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner, return nil }) - wsMutex.Lock() - wsConnMap[urlStr] = conn - wsMutex.Unlock() + r.ws.wsConnMap[urlStr] = conn openResponseChan <- resp }() @@ -522,7 +520,7 @@ func openWithTimeout(urlStr string, requestHeader http.Header, r *SessionRunner, } func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*wsReadRespObject, error) { - wsConn := getWsClient(urlString) + wsConn := getWsClient(r, urlString) if wsConn == nil { return nil, errors.New("try to use existing connection, but there is no connection") } @@ -552,7 +550,7 @@ func readMessageWithTimeout(urlString string, r *SessionRunner, step *TStep) (*w } func writeWebSocket(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) error { - wsConn := getWsClient(urlString) + wsConn := getWsClient(r, urlString) if wsConn == nil { return errors.New("try to use existing connection, but there is no connection") } @@ -618,7 +616,7 @@ func writeWithAction(c *websocket.Conn, step *TStep, messageType int, message [] } func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVariables map[string]interface{}) (*wsCloseRespObject, error) { - wsConn := getWsClient(urlString) + wsConn := getWsClient(r, urlString) if wsConn == nil { return nil, errors.New("no connection needs to be closed") } @@ -657,7 +655,7 @@ func closeWithTimeout(urlString string, r *SessionRunner, step *TStep, stepVaria return nil, errors.New("close timeout") case err := <-errorChan: return nil, err - case closeResult := <-closeResponseChan: + case closeResult := <-r.ws.closeResponseChan: return closeResult, nil } } @@ -695,7 +693,7 @@ func getContentSize(resp interface{}) int64 { // releaseResources releases resources used by session runner func (r *SessionRunner) releaseResources() { // close websocket connections - for _, wsConn := range wsConnMap { + for _, wsConn := range r.ws.wsConnMap { if wsConn != nil { log.Info().Str("testcase", r.caseRunner.Config.Name).Msg("websocket disconnected") err := wsConn.Close()