feat(jvm): 新增持续监控与采样链路

- 后端新增监控会话管理,支持启动、停止和历史查询

- JMX、Endpoint、Agent Provider 补齐监控快照采集能力

- JMX helper 增加内存、GC、线程、类加载采样并更新内嵌运行时

- 生成 Wails 监控接口绑定并补充后端回归测试
This commit is contained in:
Syngnat
2026-04-26 14:33:41 +08:00
parent a43c84f968
commit 9d08b185d0
21 changed files with 1664 additions and 13 deletions

View File

@@ -1 +1 @@
26a843d5fd071d0c7e9d8022e98eb4e3
571d014306268cf67665967059cda912

View File

@@ -134,6 +134,8 @@ export function JVMCancelDiagnosticCommand(arg1:connection.ConnectionConfig,arg2
export function JVMExecuteDiagnosticCommand(arg1:connection.ConnectionConfig,arg2:string,arg3:jvm.DiagnosticCommandRequest):Promise<connection.QueryResult>;
export function JVMGetMonitoringHistory(arg1:connection.ConnectionConfig,arg2:string):Promise<connection.QueryResult>;
export function JVMGetValue(arg1:connection.ConnectionConfig,arg2:string):Promise<connection.QueryResult>;
export function JVMListAuditRecords(arg1:string,arg2:number):Promise<connection.QueryResult>;
@@ -150,6 +152,10 @@ export function JVMProbeDiagnosticCapabilities(arg1:connection.ConnectionConfig)
export function JVMStartDiagnosticSession(arg1:connection.ConnectionConfig,arg2:jvm.DiagnosticSessionRequest):Promise<connection.QueryResult>;
export function JVMStartMonitoring(arg1:connection.ConnectionConfig):Promise<connection.QueryResult>;
export function JVMStopMonitoring(arg1:connection.ConnectionConfig,arg2:string):Promise<connection.QueryResult>;
export function ListSQLDirectory(arg1:string):Promise<connection.QueryResult>;
export function LogWindowDiagnostic(arg1:string,arg2:string):Promise<void>;

View File

@@ -258,6 +258,10 @@ export function JVMExecuteDiagnosticCommand(arg1, arg2, arg3) {
return window['go']['app']['App']['JVMExecuteDiagnosticCommand'](arg1, arg2, arg3);
}
export function JVMGetMonitoringHistory(arg1, arg2) {
return window['go']['app']['App']['JVMGetMonitoringHistory'](arg1, arg2);
}
export function JVMGetValue(arg1, arg2) {
return window['go']['app']['App']['JVMGetValue'](arg1, arg2);
}
@@ -290,6 +294,14 @@ export function JVMStartDiagnosticSession(arg1, arg2) {
return window['go']['app']['App']['JVMStartDiagnosticSession'](arg1, arg2);
}
export function JVMStartMonitoring(arg1) {
return window['go']['app']['App']['JVMStartMonitoring'](arg1);
}
export function JVMStopMonitoring(arg1, arg2) {
return window['go']['app']['App']['JVMStopMonitoring'](arg1, arg2);
}
export function ListSQLDirectory(arg1) {
return window['go']['app']['App']['ListSQLDirectory'](arg1);
}

View File

@@ -0,0 +1,77 @@
package app
import (
"context"
"fmt"
"strings"
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/jvm"
)
// jvmMonitoringService is the monitoring-manager behaviour the App bindings
// in this file rely on. Keeping it as an interface allows the package-level
// manager to be replaced with another implementation.
type jvmMonitoringService interface {
// Start begins monitoring for cfg using requestedMode and returns the
// session snapshot produced by the manager.
Start(ctx context.Context, cfg connection.ConnectionConfig, requestedMode string) (jvm.MonitoringSessionSnapshot, error)
// GetHistory returns the snapshot of the session identified by
// connectionID and providerMode.
GetHistory(connectionID string, providerMode string) (jvm.MonitoringSessionSnapshot, error)
// Stop stops the session identified by connectionID and providerMode.
Stop(connectionID string, providerMode string) error
}
// currentJVMMonitoringManager is the monitoring service used by the JVM
// monitoring bindings below; it is initialised with jvm.NewMonitoringManager().
var currentJVMMonitoringManager jvmMonitoringService = jvm.NewMonitoringManager()
// JVMStartMonitoring asks the current monitoring manager to start a session
// for cfg and wraps the outcome in a QueryResult. No provider mode is requested
// explicitly (an empty mode string is passed to the manager).
//
// On failure the result carries Success=false and the error text as Message;
// on success it carries the returned session snapshot as Data.
func (a *App) JVMStartMonitoring(cfg connection.ConnectionConfig) connection.QueryResult {
	var result connection.QueryResult
	if started, startErr := currentJVMMonitoringManager.Start(a.ctx, cfg, ""); startErr != nil {
		result.Message = startErr.Error()
	} else {
		result.Success = true
		result.Data = started
	}
	return result
}
// JVMGetMonitoringHistory resolves the connection ID and provider mode for cfg
// and returns the matching session snapshot from the current monitoring
// manager.
//
// Any resolution or lookup error is reported as Success=false with the error
// text as Message; otherwise the snapshot is returned as Data.
func (a *App) JVMGetMonitoringHistory(cfg connection.ConnectionConfig, providerMode string) connection.QueryResult {
	fail := func(cause error) connection.QueryResult {
		return connection.QueryResult{Success: false, Message: cause.Error()}
	}

	id, mode, lookupErr := resolveJVMMonitoringLookup(cfg, providerMode)
	if lookupErr != nil {
		return fail(lookupErr)
	}

	history, historyErr := currentJVMMonitoringManager.GetHistory(id, mode)
	if historyErr != nil {
		return fail(historyErr)
	}
	return connection.QueryResult{Success: true, Data: history}
}
// JVMStopMonitoring resolves the connection ID and provider mode for cfg and
// asks the current monitoring manager to stop that session.
//
// Errors are reported as Success=false with the error text as Message. On
// success Data is a map holding the resolved "connectionId" and "providerMode"
// plus a "status" of "stopped".
func (a *App) JVMStopMonitoring(cfg connection.ConnectionConfig, providerMode string) connection.QueryResult {
	id, mode, lookupErr := resolveJVMMonitoringLookup(cfg, providerMode)
	if lookupErr != nil {
		return connection.QueryResult{Success: false, Message: lookupErr.Error()}
	}

	stopErr := currentJVMMonitoringManager.Stop(id, mode)
	if stopErr != nil {
		return connection.QueryResult{Success: false, Message: stopErr.Error()}
	}

	payload := map[string]any{
		"connectionId": id,
		"providerMode": mode,
		"status":       "stopped",
	}
	return connection.QueryResult{Success: true, Data: payload}
}
// resolveJVMMonitoringLookup runs jvm.ResolveProviderMode on cfg and
// requestedMode and returns the connection ID derived from the normalized
// config together with the resolved mode. If resolution fails, both strings
// are empty and the error is returned unchanged.
func resolveJVMMonitoringLookup(cfg connection.ConnectionConfig, requestedMode string) (string, string, error) {
	normalizedCfg, mode, resolveErr := jvm.ResolveProviderMode(cfg, requestedMode)
	if resolveErr == nil {
		return resolveJVMMonitoringConnectionID(normalizedCfg), mode, nil
	}
	return "", "", resolveErr
}
// resolveJVMMonitoringConnectionID picks the identifier used to look up a
// monitoring session for cfg.
//
// The trimmed cfg.ID wins when it is non-empty. Otherwise the identifier is the
// trimmed host (or "unknown" when the host is blank), followed by ":<port>"
// when cfg.Port is positive.
func resolveJVMMonitoringConnectionID(cfg connection.ConnectionConfig) string {
	if id := strings.TrimSpace(cfg.ID); id != "" {
		return id
	}

	hostLabel := "unknown"
	if h := strings.TrimSpace(cfg.Host); h != "" {
		hostLabel = h
	}

	if cfg.Port <= 0 {
		return hostLabel
	}
	return fmt.Sprintf("%s:%d", hostLabel, cfg.Port)
}

View File

@@ -0,0 +1,147 @@
package app
import (
"context"
"errors"
"testing"
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/jvm"
)
// fakeJVMMonitoringManager is a test double for jvmMonitoringService. The
// first group of fields holds the canned values its methods return; the second
// group records the arguments of the most recent call to each method.
type fakeJVMMonitoringManager struct {
// Canned return values.
startSnapshot jvm.MonitoringSessionSnapshot
startErr error
historySnapshot jvm.MonitoringSessionSnapshot
historyErr error
stopErr error
// Arguments captured from the last Start / GetHistory / Stop call.
startCfg connection.ConnectionConfig
startMode string
historyConnection string
historyMode string
stopConnection string
stopMode string
}
// Start records the config and mode it was called with and returns the canned
// start snapshot and error. The context argument is ignored.
func (f *fakeJVMMonitoringManager) Start(_ context.Context, cfg connection.ConnectionConfig, mode string) (jvm.MonitoringSessionSnapshot, error) {
	f.startCfg, f.startMode = cfg, mode
	return f.startSnapshot, f.startErr
}
// GetHistory records the connection ID and provider mode it was called with and
// returns the canned history snapshot and error.
func (f *fakeJVMMonitoringManager) GetHistory(connectionID string, providerMode string) (jvm.MonitoringSessionSnapshot, error) {
	f.historyConnection, f.historyMode = connectionID, providerMode
	return f.historySnapshot, f.historyErr
}
// Stop records the connection ID and provider mode it was called with and
// returns the canned stop error.
func (f *fakeJVMMonitoringManager) Stop(connectionID string, providerMode string) error {
	f.stopConnection, f.stopMode = connectionID, providerMode
	return f.stopErr
}
// swapJVMMonitoringManager installs manager as the package-level monitoring
// service and returns a function that puts the previous one back.
func swapJVMMonitoringManager(manager jvmMonitoringService) func() {
	original := currentJVMMonitoringManager
	restore := func() {
		currentJVMMonitoringManager = original
	}
	currentJVMMonitoringManager = manager
	return restore
}
// TestJVMStartMonitoringReturnsManagerSnapshot checks that JVMStartMonitoring
// returns the snapshot produced by the monitoring manager as the result data
// and forwards the connection config to the manager.
func TestJVMStartMonitoringReturnsManagerSnapshot(t *testing.T) {
app := NewAppWithSecretStore(nil)
manager := &fakeJVMMonitoringManager{
startSnapshot: jvm.MonitoringSessionSnapshot{
ConnectionID: "conn-monitor",
ProviderMode: jvm.ModeEndpoint,
Running: true,
Points: []jvm.JVMMonitoringPoint{
{Timestamp: 1713945600000, ThreadCount: 21},
},
},
}
// Replace the package-level manager for the duration of this test.
restore := swapJVMMonitoringManager(manager)
defer restore()
res := app.JVMStartMonitoring(connection.ConnectionConfig{
ID: "conn-monitor",
Type: "jvm",
Host: "orders.internal",
JVM: connection.JVMConfig{
PreferredMode: jvm.ModeEndpoint,
AllowedModes: []string{jvm.ModeEndpoint},
},
})
if !res.Success {
t.Fatalf("expected success, got %+v", res)
}
// The result data must be the manager's snapshot value itself.
snapshot, ok := res.Data.(jvm.MonitoringSessionSnapshot)
if !ok {
t.Fatalf("expected monitoring snapshot, got %#v", res.Data)
}
if !snapshot.Running || len(snapshot.Points) != 1 {
t.Fatalf("unexpected snapshot: %#v", snapshot)
}
if manager.startCfg.ID != "conn-monitor" {
t.Fatalf("expected manager to receive config ID, got %#v", manager.startCfg)
}
}
// TestJVMGetMonitoringHistoryResolvesPreferredMode checks that, when the caller
// passes an empty provider mode, JVMGetMonitoringHistory queries the manager
// with the config's ID and its preferred mode (JMX here).
func TestJVMGetMonitoringHistoryResolvesPreferredMode(t *testing.T) {
app := NewAppWithSecretStore(nil)
manager := &fakeJVMMonitoringManager{
historySnapshot: jvm.MonitoringSessionSnapshot{
ConnectionID: "conn-history",
ProviderMode: jvm.ModeJMX,
Running: true,
},
}
restore := swapJVMMonitoringManager(manager)
defer restore()
// The empty second argument leaves mode selection to the config.
res := app.JVMGetMonitoringHistory(connection.ConnectionConfig{
ID: "conn-history",
Type: "jvm",
Host: "orders.internal",
JVM: connection.JVMConfig{
PreferredMode: jvm.ModeJMX,
AllowedModes: []string{jvm.ModeJMX},
},
}, "")
if !res.Success {
t.Fatalf("expected success, got %+v", res)
}
if manager.historyConnection != "conn-history" || manager.historyMode != jvm.ModeJMX {
t.Fatalf("unexpected manager history args: connection=%q mode=%q", manager.historyConnection, manager.historyMode)
}
}
// TestJVMStopMonitoringReturnsManagerError checks that an error returned by the
// manager's Stop is surfaced as a failed result with the error text as the
// message, and that Stop was called with the resolved connection ID and mode.
func TestJVMStopMonitoringReturnsManagerError(t *testing.T) {
app := NewAppWithSecretStore(nil)
manager := &fakeJVMMonitoringManager{
stopErr: errors.New("session not found"),
}
restore := swapJVMMonitoringManager(manager)
defer restore()
res := app.JVMStopMonitoring(connection.ConnectionConfig{
ID: "conn-stop",
Type: "jvm",
Host: "orders.internal",
JVM: connection.JVMConfig{
PreferredMode: jvm.ModeAgent,
AllowedModes: []string{jvm.ModeAgent},
},
}, "")
if res.Success {
t.Fatalf("expected failure, got %+v", res)
}
if res.Message != "session not found" {
t.Fatalf("expected message %q, got %#v", "session not found", res)
}
// Even on failure the manager must have been addressed with the resolved key.
if manager.stopConnection != "conn-stop" || manager.stopMode != jvm.ModeAgent {
t.Fatalf("unexpected manager stop args: connection=%q mode=%q", manager.stopConnection, manager.stopMode)
}
}

View File

@@ -91,6 +91,20 @@ func (p *AgentProvider) GetValue(ctx context.Context, cfg connection.ConnectionC
return snapshot, nil
}
// GetMonitoringSnapshot builds an agent runtime for cfg, requests the "metrics"
// resource with GET, decodes the JSON reply into a JVMMonitoringSnapshot and
// passes it, together with previous, to finalizeMonitoringSnapshot.
//
// A zero snapshot is returned alongside any runtime-construction or request
// error.
func (p *AgentProvider) GetMonitoringSnapshot(ctx context.Context, cfg connection.ConnectionConfig, previous *JVMMonitoringPoint) (JVMMonitoringSnapshot, error) {
	var empty JVMMonitoringSnapshot

	agent, runtimeErr := newAgentRuntime(cfg)
	if runtimeErr != nil {
		return empty, runtimeErr
	}

	result := JVMMonitoringSnapshot{}
	requestErr := agent.doJSON(ctx, http.MethodGet, "get monitoring snapshot", "metrics", nil, nil, &result)
	if requestErr != nil {
		return empty, requestErr
	}

	finalizeMonitoringSnapshot(&result, previous)
	return result, nil
}
func (p *AgentProvider) PreviewChange(ctx context.Context, cfg connection.ConnectionConfig, req ChangeRequest) (ChangePreview, error) {
runtime, err := newAgentRuntime(cfg)
if err != nil {

View File

@@ -59,6 +59,51 @@ func TestAgentProviderListResourcesBuildsRequestAndDecodesResponse(t *testing.T)
}
}
// TestAgentProviderGetMonitoringSnapshotDecodesResponse serves a fixed
// monitoring snapshot from a test HTTP server and checks that AgentProvider
// requests it with GET on the expected path and decodes the point, GC events
// and available metrics.
func TestAgentProviderGetMonitoringSnapshotDecodesResponse(t *testing.T) {
	provider := &AgentProvider{}
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs on the server's goroutine, where t.Fatalf/FailNow
		// must not be called. Record the failure with t.Errorf and answer with
		// an HTTP error so the request under test does not succeed silently.
		if r.Method != http.MethodGet {
			t.Errorf("expected GET request, got %s", r.Method)
			http.Error(w, "unexpected method", http.StatusMethodNotAllowed)
			return
		}
		if r.URL.Path != "/gonavi/agent/jvm/metrics" {
			t.Errorf("expected path /gonavi/agent/jvm/metrics, got %s", r.URL.Path)
			http.Error(w, "unexpected path", http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		payload := JVMMonitoringSnapshot{
			Point: JVMMonitoringPoint{
				Timestamp:                   1713945600000,
				ThreadCount:                 27,
				HeapUsedBytes:               402653184,
				GCCollectionCount:           128,
				GCDeltaCount:                2,
				ProcessCpuLoad:              0.29,
				CommittedVirtualMemoryBytes: 2147483648,
			},
			RecentGCEvents: []RecentGCEvent{{
				Timestamp:  1713945600000,
				Name:       "ConcurrentMarkSweep",
				DurationMs: 12,
			}},
			AvailableMetrics: []string{"thread.count", "heap.used", "gc.count", "cpu.process", "memory.virtual"},
		}
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			t.Errorf("encode monitoring snapshot failed: %v", err)
		}
	}))
	defer server.Close()

	snapshot, err := provider.GetMonitoringSnapshot(context.Background(), newAgentProviderTestConfig(server.URL+"/gonavi/agent/jvm", 3), nil)
	if err != nil {
		t.Fatalf("GetMonitoringSnapshot returned error: %v", err)
	}
	if snapshot.Point.ThreadCount != 27 || snapshot.Point.GCDeltaCount != 2 || snapshot.Point.ProcessCpuLoad != 0.29 {
		t.Fatalf("unexpected monitoring snapshot: %#v", snapshot)
	}
	if len(snapshot.RecentGCEvents) != 1 || snapshot.RecentGCEvents[0].Name != "ConcurrentMarkSweep" {
		t.Fatalf("unexpected recent gc events: %#v", snapshot.RecentGCEvents)
	}
	if len(snapshot.AvailableMetrics) != 5 {
		t.Fatalf("unexpected available metrics: %#v", snapshot)
	}
}
func TestAgentProviderRealAgentRoundTrip(t *testing.T) {
if _, err := exec.LookPath("java"); err != nil {
t.Skipf("java 不可用,跳过真实 Agent 集成测试: %v", err)
@@ -214,7 +259,18 @@ func startAgentFixture(t *testing.T) agentFixtureProcess {
t.Fatalf("write agent manifest failed: %v", err)
}
agentJar := filepath.Join(t.TempDir(), "gonavi-test-agent.jar")
agentJarFile, err := os.CreateTemp("", "gonavi-test-agent-*.jar")
if err != nil {
t.Fatalf("create agent jar temp file failed: %v", err)
}
agentJar := agentJarFile.Name()
if closeErr := agentJarFile.Close(); closeErr != nil {
t.Fatalf("close agent jar temp file failed: %v", closeErr)
}
_ = os.Remove(agentJar)
t.Cleanup(func() {
_ = os.Remove(agentJar)
})
jarCmd := exec.Command(jarBin, "cmf", manifestPath, agentJar, "-C", classesDir, "com")
output, err = jarCmd.CombinedOutput()
if err != nil {

View File

@@ -92,6 +92,20 @@ func (p *HTTPProvider) GetValue(ctx context.Context, cfg connection.ConnectionCo
return snapshot, nil
}
// GetMonitoringSnapshot builds an endpoint runtime for cfg, requests the
// "metrics" resource with GET, decodes the JSON reply into a
// JVMMonitoringSnapshot and passes it, together with previous, to
// finalizeMonitoringSnapshot.
//
// A zero snapshot is returned alongside any runtime-construction or request
// error.
func (p *HTTPProvider) GetMonitoringSnapshot(ctx context.Context, cfg connection.ConnectionConfig, previous *JVMMonitoringPoint) (JVMMonitoringSnapshot, error) {
	var empty JVMMonitoringSnapshot

	endpoint, runtimeErr := newEndpointRuntime(cfg)
	if runtimeErr != nil {
		return empty, runtimeErr
	}

	result := JVMMonitoringSnapshot{}
	requestErr := endpoint.doJSON(ctx, http.MethodGet, "get monitoring snapshot", "metrics", nil, nil, &result)
	if requestErr != nil {
		return empty, requestErr
	}

	finalizeMonitoringSnapshot(&result, previous)
	return result, nil
}
func (p *HTTPProvider) PreviewChange(ctx context.Context, cfg connection.ConnectionConfig, req ChangeRequest) (ChangePreview, error) {
runtime, err := newEndpointRuntime(cfg)
if err != nil {

View File

@@ -93,6 +93,52 @@ func TestHTTPProviderGetValueDecodesResponse(t *testing.T) {
}
}
// TestHTTPProviderGetMonitoringSnapshotDecodesResponse serves a fixed
// monitoring snapshot from a test HTTP server and checks that HTTPProvider
// requests it with GET on the expected path and decodes the point, GC events
// and missing-metric list.
func TestHTTPProviderGetMonitoringSnapshotDecodesResponse(t *testing.T) {
	provider := &HTTPProvider{}
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs on the server's goroutine, where t.Fatalf/FailNow
		// must not be called. Record the failure with t.Errorf and answer with
		// an HTTP error so the request under test does not succeed silently.
		if r.Method != http.MethodGet {
			t.Errorf("expected GET request, got %s", r.Method)
			http.Error(w, "unexpected method", http.StatusMethodNotAllowed)
			return
		}
		if r.URL.Path != "/manage/jvm/metrics" {
			t.Errorf("expected path /manage/jvm/metrics, got %s", r.URL.Path)
			http.Error(w, "unexpected path", http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		payload := JVMMonitoringSnapshot{
			Point: JVMMonitoringPoint{
				Timestamp:        1713945600000,
				ThreadCount:      18,
				HeapUsedBytes:    805306368,
				ProcessCpuLoad:   0.48,
				ProcessRssBytes:  1879048192,
				LoadedClassCount: 4096,
			},
			RecentGCEvents: []RecentGCEvent{{
				Timestamp:  1713945600000,
				Name:       "G1 Old Generation",
				DurationMs: 41,
			}},
			AvailableMetrics: []string{"thread.count", "heap.used", "cpu.process", "memory.rss", "class.loading"},
			MissingMetrics:   []string{"cpu.system"},
			ProviderWarnings: []string{"endpoint cpu metric unavailable"},
		}
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			t.Errorf("encode monitoring snapshot failed: %v", err)
		}
	}))
	defer server.Close()

	snapshot, err := provider.GetMonitoringSnapshot(context.Background(), newHTTPProviderTestConfig(server.URL+"/manage/jvm", 3), nil)
	if err != nil {
		t.Fatalf("GetMonitoringSnapshot returned error: %v", err)
	}
	if snapshot.Point.ThreadCount != 18 || snapshot.Point.HeapUsedBytes != 805306368 || snapshot.Point.ProcessRssBytes != 1879048192 {
		t.Fatalf("unexpected monitoring snapshot: %#v", snapshot)
	}
	if len(snapshot.RecentGCEvents) != 1 || snapshot.RecentGCEvents[0].Name != "G1 Old Generation" {
		t.Fatalf("unexpected recent gc events: %#v", snapshot.RecentGCEvents)
	}
	if len(snapshot.MissingMetrics) != 1 || snapshot.MissingMetrics[0] != "cpu.system" {
		t.Fatalf("unexpected missing metrics: %#v", snapshot)
	}
}
func TestHTTPProviderPreviewChangeAndApplySendJSONBody(t *testing.T) {
provider := NewHTTPProvider()
request := ChangeRequest{

View File

@@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
@@ -32,6 +33,7 @@ const (
jmxHelperCommandPing = "ping"
jmxHelperCommandList = "list"
jmxHelperCommandGet = "get"
jmxHelperCommandMonitor = "monitor"
jmxHelperCommandPreview = "preview"
jmxHelperCommandApply = "apply"
@@ -44,6 +46,11 @@ var (
jmxHelperLookPath = exec.LookPath
)
var (
	// jmxHelperSensitiveJSONFieldPattern matches a JSON string member whose key
	// is password, apiKey, token or secret (case-insensitive). Group 1 is the
	// key up to and including the opening quote of the value, group 2 the value
	// and group 3 the closing quote. The value group accepts backslash escapes
	// so that a secret containing \" is consumed in full instead of being cut
	// at the escaped quote and partially leaked.
	jmxHelperSensitiveJSONFieldPattern = regexp.MustCompile(`(?i)("(?:password|apiKey|token|secret)"\s*:\s*")((?:[^"\\]|\\.)*)(")`)
	// jmxHelperSensitivePairPattern matches free-text "name=value" or
	// "name: value" pairs for the same kind of keys. Group 1 is the key,
	// group 2 the separator and group 3 the value, which ends at whitespace or
	// one of & ; , " ' }.
	jmxHelperSensitivePairPattern = regexp.MustCompile(`(?i)\b(password|api[_-]?key|token|secret)(\s*[:=]\s*)([^&\s;,"'}]+)`)
)
// embeddedJMXHelperJar holds the bytes of
// jmxhelper_assets/jmx-helper-runtime.jar, embedded into the binary at build
// time by the go:embed directive below.
//
//go:embed jmxhelper_assets/jmx-helper-runtime.jar
var embeddedJMXHelperJar []byte
@@ -94,13 +101,14 @@ type jmxHelperChangePlan struct {
}
type jmxHelperResponse struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Details map[string]any `json:"details,omitempty"`
Resources []jmxHelperResource `json:"resources,omitempty"`
Snapshot *jmxHelperSnapshot `json:"snapshot,omitempty"`
Preview *jmxHelperPreview `json:"preview,omitempty"`
ApplyResult *jmxHelperApplyResponse `json:"applyResult,omitempty"`
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Details map[string]any `json:"details,omitempty"`
Resources []jmxHelperResource `json:"resources,omitempty"`
Snapshot *jmxHelperSnapshot `json:"snapshot,omitempty"`
MonitoringSnapshot *jmxHelperMonitoringSnapshot `json:"monitoringSnapshot,omitempty"`
Preview *jmxHelperPreview `json:"preview,omitempty"`
ApplyResult *jmxHelperApplyResponse `json:"applyResult,omitempty"`
}
type jmxHelperResource struct {
@@ -127,6 +135,38 @@ type jmxHelperSnapshot struct {
Metadata map[string]any `json:"metadata,omitempty"`
}
// jmxHelperMonitoringPoint is the JSON shape of a single monitoring sample as
// returned by the JMX helper. Every field except Timestamp is omitted from the
// JSON encoding when it holds its zero value.
type jmxHelperMonitoringPoint struct {
Timestamp int64 `json:"timestamp"`
// Heap and non-heap memory figures.
HeapUsedBytes int64 `json:"heapUsedBytes,omitempty"`
HeapCommittedBytes int64 `json:"heapCommittedBytes,omitempty"`
HeapMaxBytes int64 `json:"heapMaxBytes,omitempty"`
NonHeapUsedBytes int64 `json:"nonHeapUsedBytes,omitempty"`
NonHeapCommittedBytes int64 `json:"nonHeapCommittedBytes,omitempty"`
// Garbage-collection counters and deltas.
GCCollectionCount int64 `json:"gcCollectionCount,omitempty"`
GCCollectionTimeMs int64 `json:"gcCollectionTimeMs,omitempty"`
GCDeltaCount int64 `json:"gcDeltaCount,omitempty"`
GCDeltaTimeMs int64 `json:"gcDeltaTimeMs,omitempty"`
// Thread figures.
ThreadCount int `json:"threadCount,omitempty"`
DaemonThreadCount int `json:"daemonThreadCount,omitempty"`
PeakThreadCount int `json:"peakThreadCount,omitempty"`
ThreadStateCounts map[string]int `json:"threadStateCounts,omitempty"`
// Class-loading figures.
LoadedClassCount int `json:"loadedClassCount,omitempty"`
UnloadedClassCount int64 `json:"unloadedClassCount,omitempty"`
ClassLoadDelta int64 `json:"classLoadDelta,omitempty"`
// CPU load and process memory figures.
ProcessCpuLoad float64 `json:"processCpuLoad,omitempty"`
SystemCpuLoad float64 `json:"systemCpuLoad,omitempty"`
ProcessRssBytes int64 `json:"processRssBytes,omitempty"`
CommittedVirtualMemoryBytes int64 `json:"committedVirtualMemoryBytes,omitempty"`
}
// jmxHelperMonitoringSnapshot is the JSON shape of the helper's monitoring
// reply: one sample point plus optional lists of recent GC events, available
// and missing metric names, and provider warnings.
type jmxHelperMonitoringSnapshot struct {
Point jmxHelperMonitoringPoint `json:"point"`
RecentGCEvents []RecentGCEvent `json:"recentGcEvents,omitempty"`
AvailableMetrics []string `json:"availableMetrics,omitempty"`
MissingMetrics []string `json:"missingMetrics,omitempty"`
ProviderWarnings []string `json:"providerWarnings,omitempty"`
}
type jmxHelperPreview struct {
Allowed bool `json:"allowed"`
RequiresConfirmation bool `json:"requiresConfirmation,omitempty"`
@@ -366,6 +406,11 @@ func helperContextSummary(cfg connection.ConnectionConfig, target *jmxResourceTa
}
}
// redactJMXHelperOutput masks sensitive values in helper output before it is
// placed into error messages. It first replaces the values of sensitive JSON
// string members, then the values of free-text key/value pairs, with
// "<redacted>", and returns the resulting text.
func redactJMXHelperOutput(text string) string {
	jsonMasked := jmxHelperSensitiveJSONFieldPattern.ReplaceAllString(text, `${1}<redacted>${3}`)
	pairMasked := jmxHelperSensitivePairPattern.ReplaceAllString(jsonMasked, `${1}${2}<redacted>`)
	return pairMasked
}
func runJMXHelper(
ctx context.Context,
cfg connection.ConnectionConfig,
@@ -414,6 +459,7 @@ func runJMXHelper(
defer cancel()
cmd := jmxHelperCommandContext(execCtx, runtimeInfo.javaBinary, "-cp", runtimeInfo.classpath, jmxHelperMainClass)
configureJMXHelperCommand(cmd)
cmd.Stdin = bytes.NewReader(input)
var stdout bytes.Buffer
var stderr bytes.Buffer
@@ -421,7 +467,7 @@ func runJMXHelper(
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
stderrText := strings.TrimSpace(stderr.String())
stderrText := strings.TrimSpace(redactJMXHelperOutput(stderr.String()))
if stderrText == "" {
stderrText = "<empty>"
}
@@ -436,23 +482,24 @@ func runJMXHelper(
var response jmxHelperResponse
if err := json.Unmarshal(stdout.Bytes(), &response); err != nil {
stdoutText := strings.TrimSpace(redactJMXHelperOutput(stdout.String()))
return jmxHelperResponse{}, fmt.Errorf(
"decode JMX helper %s response failed for %s: %w; stdout: %s",
command,
helperContextSummary(cfg, target),
err,
strings.TrimSpace(stdout.String()),
stdoutText,
)
}
if !response.OK {
errText := strings.TrimSpace(response.Error)
errText := strings.TrimSpace(redactJMXHelperOutput(response.Error))
if errText == "" {
errText = "unknown helper failure"
}
if len(response.Details) > 0 {
detailsJSON, marshalErr := json.Marshal(response.Details)
if marshalErr == nil {
errText += "; details=" + string(detailsJSON)
errText += "; details=" + redactJMXHelperOutput(string(detailsJSON))
}
}
return jmxHelperResponse{}, fmt.Errorf("jmx helper %s failed for %s: %s", command, helperContextSummary(cfg, target), errText)
@@ -649,6 +696,42 @@ func previewFromHelper(target jmxResourceTarget, preview *jmxHelperPreview) (Cha
return result, nil
}
// monitoringSnapshotFromHelper converts the helper's monitoring payload into a
// JVMMonitoringSnapshot.
//
// A nil payload yields an error. Otherwise every point field is copied across,
// the thread-state map is cloned via cloneStringIntMap, and each slice is
// copied so the result does not share backing arrays with the helper payload.
func monitoringSnapshotFromHelper(snapshot *jmxHelperMonitoringSnapshot) (JVMMonitoringSnapshot, error) {
	if snapshot == nil {
		return JVMMonitoringSnapshot{}, fmt.Errorf("helper did not return monitoring snapshot")
	}

	src := snapshot.Point
	var converted JVMMonitoringSnapshot

	dst := &converted.Point
	dst.Timestamp = src.Timestamp
	dst.HeapUsedBytes = src.HeapUsedBytes
	dst.HeapCommittedBytes = src.HeapCommittedBytes
	dst.HeapMaxBytes = src.HeapMaxBytes
	dst.NonHeapUsedBytes = src.NonHeapUsedBytes
	dst.NonHeapCommittedBytes = src.NonHeapCommittedBytes
	dst.GCCollectionCount = src.GCCollectionCount
	dst.GCCollectionTimeMs = src.GCCollectionTimeMs
	dst.GCDeltaCount = src.GCDeltaCount
	dst.GCDeltaTimeMs = src.GCDeltaTimeMs
	dst.ThreadCount = src.ThreadCount
	dst.DaemonThreadCount = src.DaemonThreadCount
	dst.PeakThreadCount = src.PeakThreadCount
	dst.ThreadStateCounts = cloneStringIntMap(src.ThreadStateCounts)
	dst.LoadedClassCount = src.LoadedClassCount
	dst.UnloadedClassCount = src.UnloadedClassCount
	dst.ClassLoadDelta = src.ClassLoadDelta
	dst.ProcessCpuLoad = src.ProcessCpuLoad
	dst.SystemCpuLoad = src.SystemCpuLoad
	dst.ProcessRssBytes = src.ProcessRssBytes
	dst.CommittedVirtualMemoryBytes = src.CommittedVirtualMemoryBytes

	converted.RecentGCEvents = append([]RecentGCEvent(nil), snapshot.RecentGCEvents...)
	converted.AvailableMetrics = append([]string(nil), snapshot.AvailableMetrics...)
	converted.MissingMetrics = append([]string(nil), snapshot.MissingMetrics...)
	converted.ProviderWarnings = append([]string(nil), snapshot.ProviderWarnings...)
	return converted, nil
}
func applyResultFromHelper(target jmxResourceTarget, result *jmxHelperApplyResponse) (ApplyResult, error) {
if result == nil {
return ApplyResult{}, fmt.Errorf("helper did not return apply result for %s", buildJMXResourcePath(target))

View File

@@ -0,0 +1,8 @@
//go:build !windows
package jvm
import "os/exec"
// configureJMXHelperCommand is the non-Windows variant (see the build
// constraint at the top of this file); it leaves the command untouched.
func configureJMXHelperCommand(_ *exec.Cmd) {
}

View File

@@ -0,0 +1,15 @@
//go:build windows
package jvm
import (
"os/exec"
"syscall"
)
func configureJMXHelperCommand(cmd *exec.Cmd) {
if cmd == nil {
return
}
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
)
@@ -82,3 +83,18 @@ func TestEnsureJMXHelperRuntimeUsesOverrideClasspath(t *testing.T) {
t.Fatalf("expected override mode to skip cache writes, stat err=%v", err)
}
}
// TestRedactJMXHelperOutputMasksSensitiveFields feeds redactJMXHelperOutput a
// payload that mixes sensitive JSON members with free-text key/value pairs and
// checks that none of the secret values survive and that the redaction marker
// is present.
func TestRedactJMXHelperOutputMasksSensitiveFields(t *testing.T) {
output := `{"password":"secret-pass","apiKey":"agent-token","details":"token=abc123 password: raw-secret"}`
redacted := redactJMXHelperOutput(output)
// Each listed value appears in the input and must be gone from the output.
for _, secret := range []string{"secret-pass", "agent-token", "abc123", "raw-secret"} {
if strings.Contains(redacted, secret) {
t.Fatalf("expected %q to be redacted from %q", secret, redacted)
}
}
if !strings.Contains(redacted, "<redacted>") {
t.Fatalf("expected redaction marker, got %q", redacted)
}
}

View File

@@ -0,0 +1,18 @@
//go:build windows
package jvm
import (
"os/exec"
"testing"
)
// TestConfigureJMXHelperCommandHidesWindowOnWindows checks that
// configureJMXHelperCommand sets SysProcAttr with HideWindow enabled on the
// command it is given. The command is only constructed, never run.
func TestConfigureJMXHelperCommandHidesWindowOnWindows(t *testing.T) {
cmd := exec.Command("java")
configureJMXHelperCommand(cmd)
if cmd.SysProcAttr == nil || !cmd.SysProcAttr.HideWindow {
t.Fatalf("expected JMX helper command to hide Windows console window, got %#v", cmd.SysProcAttr)
}
}

View File

@@ -74,6 +74,19 @@ func (p *JMXProvider) GetValue(ctx context.Context, cfg connection.ConnectionCon
return valueSnapshotFromHelper(target, resp.Snapshot)
}
// GetMonitoringSnapshot runs the JMX helper with the monitor command, converts
// the helper's monitoring payload into a JVMMonitoringSnapshot and passes it,
// together with previous, to finalizeMonitoringSnapshot.
//
// A helper failure is wrapped with "jmx get monitoring snapshot failed"; a
// conversion failure is returned as is. Both come with a zero snapshot.
func (p *JMXProvider) GetMonitoringSnapshot(ctx context.Context, cfg connection.ConnectionConfig, previous *JVMMonitoringPoint) (JVMMonitoringSnapshot, error) {
	helperResp, runErr := jmxHelperRunner(ctx, cfg, jmxHelperCommandMonitor, nil, nil)
	if runErr != nil {
		return JVMMonitoringSnapshot{}, fmt.Errorf("jmx get monitoring snapshot failed: %w", runErr)
	}

	converted, convertErr := monitoringSnapshotFromHelper(helperResp.MonitoringSnapshot)
	if convertErr != nil {
		return JVMMonitoringSnapshot{}, convertErr
	}

	finalizeMonitoringSnapshot(&converted, previous)
	return converted, nil
}
func (p *JMXProvider) PreviewChange(ctx context.Context, cfg connection.ConnectionConfig, req ChangeRequest) (ChangePreview, error) {
target, err := parseRequiredResourcePath(req.ResourceID)
if err != nil {

View File

@@ -135,6 +135,51 @@ func TestJMXProviderGetValueUsesHelperSnapshot(t *testing.T) {
}
}
// TestJMXProviderGetMonitoringSnapshotUsesHelperMonitorCommand stubs the JMX
// helper with a fixed monitoring payload and checks that the provider issues
// the monitor command and maps the point, GC events and missing metrics into
// the returned snapshot.
func TestJMXProviderGetMonitoringSnapshotUsesHelperMonitorCommand(t *testing.T) {
helper := &stubJMXHelper{
response: jmxHelperResponse{
MonitoringSnapshot: &jmxHelperMonitoringSnapshot{
Point: jmxHelperMonitoringPoint{
Timestamp: 1713945600000,
ThreadCount: 33,
HeapUsedBytes: 536870912,
ProcessCpuLoad: 0.37,
LoadedClassCount: 2048,
ProcessRssBytes: 1610612736,
CommittedVirtualMemoryBytes: 2147483648,
},
RecentGCEvents: []RecentGCEvent{{
Timestamp: 1713945600000,
Name: "G1 Young Generation",
Cause: "G1 Evacuation Pause",
DurationMs: 18,
}},
AvailableMetrics: []string{"thread.count", "heap.used", "class.loading", "memory.rss"},
MissingMetrics: []string{"cpu.system"},
},
},
}
// Route helper invocations to the stub for the duration of the test.
withStubJMXHelper(t, helper.run)
provider := &JMXProvider{}
snapshot, err := provider.GetMonitoringSnapshot(context.Background(), newJMXProviderTestConfig(), nil)
if err != nil {
t.Fatalf("GetMonitoringSnapshot returned error: %v", err)
}
// The provider must have asked the helper for the monitor command.
if helper.lastRequest.Command != jmxHelperCommandMonitor {
t.Fatalf("expected helper command %q, got %#v", jmxHelperCommandMonitor, helper.lastRequest)
}
if snapshot.Point.ThreadCount != 33 || snapshot.Point.HeapUsedBytes != 536870912 || snapshot.Point.LoadedClassCount != 2048 {
t.Fatalf("unexpected monitoring snapshot: %#v", snapshot)
}
if len(snapshot.RecentGCEvents) != 1 || snapshot.RecentGCEvents[0].DurationMs != 18 {
t.Fatalf("unexpected recent gc events: %#v", snapshot.RecentGCEvents)
}
if len(snapshot.MissingMetrics) != 1 || snapshot.MissingMetrics[0] != "cpu.system" {
t.Fatalf("unexpected missing metrics: %#v", snapshot)
}
}
func TestJMXProviderPreviewAndApplyUseHelperPayload(t *testing.T) {
request := ChangeRequest{
ProviderMode: ModeJMX,

View File

@@ -0,0 +1,351 @@
package jvm
import (
"context"
"fmt"
"strings"
"sync"
"time"
"GoNavi-Wails/internal/connection"
)
const (
// defaultMonitoringPointLimit is the number of points a session keeps when
// no positive limit is supplied.
defaultMonitoringPointLimit = 180
// defaultMonitoringEventLimit caps the retained recent GC events and the
// retained provider warnings of a session.
defaultMonitoringEventLimit = 20
// defaultMonitoringInterval is the sampling interval used by
// NewMonitoringManager.
defaultMonitoringInterval = 2 * time.Second
// maxMonitoringSampleFailures is the number of consecutive failed samples
// after which the background sampler stops the session.
maxMonitoringSampleFailures = 3
)
// monitoringProviderFactory creates the provider used by Start for a given
// provider mode. It defaults to NewProvider; being a variable it can be
// reassigned (presumably by tests — confirm against the test files).
var monitoringProviderFactory = NewProvider
// monitoringManager owns the monitoring sessions, stored in a map under keys
// built from connection ID and provider mode.
type monitoringManager struct {
// mu guards sessions.
mu sync.Mutex
// limit is the point limit handed to newly created sessions.
limit int
// interval is the background sampling period; Start only launches a
// sampler goroutine when it is positive.
interval time.Duration
sessions map[string]*monitoringSession
}
// monitoringSession holds the state of one monitoring session: its identity,
// the retained sample points and the metadata taken from the latest snapshot.
type monitoringSession struct {
// mu guards the fields below.
mu sync.Mutex
connectionID string
providerMode string
// limit is the maximum number of points kept in points.
limit int
running bool
points []JVMMonitoringPoint
recentGCEvents []RecentGCEvent
availableMetrics []string
missingMetrics []string
providerWarnings []string
// cancel, when set, cancels the context of the background sampler.
cancel context.CancelFunc
// generation is incremented by reset; samples carrying a different
// generation are ignored by previousPoint and applySnapshot.
generation int64
}
// newMonitoringManagerForTest builds a manager with the given point limit and
// a zero sampling interval, so Start takes a single sample and does not launch
// a background sampler.
func newMonitoringManagerForTest(limit int) *monitoringManager {
	const noBackgroundSampling time.Duration = 0
	return newMonitoringManager(limit, noBackgroundSampling)
}
// NewMonitoringManager builds a manager configured with the default point
// limit and the default sampling interval.
func NewMonitoringManager() *monitoringManager {
	manager := newMonitoringManager(defaultMonitoringPointLimit, defaultMonitoringInterval)
	return manager
}
// newMonitoringManager builds a manager with an empty session map. A
// non-positive limit is replaced by defaultMonitoringPointLimit; interval is
// stored as given.
func newMonitoringManager(limit int, interval time.Duration) *monitoringManager {
	manager := new(monitoringManager)
	manager.limit = limit
	if manager.limit <= 0 {
		manager.limit = defaultMonitoringPointLimit
	}
	manager.interval = interval
	manager.sessions = map[string]*monitoringSession{}
	return manager
}
// ensureSession returns the session registered for connectionID and
// providerMode, creating and registering a new one with the manager's point
// limit when none exists yet.
//
// The map key is built with sessionKey, the same helper Stop and GetHistory
// use for lookups, so a session stored here is always found by them even when
// the inputs carry surrounding whitespace.
func (m *monitoringManager) ensureSession(connectionID string, providerMode string) *monitoringSession {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := m.sessionKey(connectionID, providerMode)
	if existing, ok := m.sessions[key]; ok {
		return existing
	}

	created := &monitoringSession{
		connectionID: connectionID,
		providerMode: providerMode,
		limit:        m.limit,
	}
	m.sessions[key] = created
	return created
}
// Start begins (or restarts) the monitoring session for the connection
// described by raw.
//
// It resolves the provider mode, verifies that the provider for that mode
// supports monitoring snapshots, resets the session, and takes one sample
// synchronously using ctx. If that first sample fails the session is marked
// stopped and the error returned. Otherwise the session is marked running and,
// when the manager has a positive interval, a background sampler is started on
// its own cancellable context. The returned value is the session snapshot
// after the first sample.
//
// The provider is validated before the session is looked up or created, so a
// Start that fails because the mode has no monitoring-capable provider does
// not leave an empty session registered in the manager.
func (m *monitoringManager) Start(ctx context.Context, raw connection.ConnectionConfig, requestedMode string) (MonitoringSessionSnapshot, error) {
	cfg, providerMode, err := ResolveProviderMode(raw, requestedMode)
	if err != nil {
		return MonitoringSessionSnapshot{}, err
	}

	provider, err := monitoringProviderFactory(providerMode)
	if err != nil {
		return MonitoringSessionSnapshot{}, err
	}
	monitoringProvider, ok := provider.(MonitoringCapableProvider)
	if !ok {
		return MonitoringSessionSnapshot{}, fmt.Errorf("%s provider does not implement monitoring snapshot yet", ModeDisplayLabel(providerMode))
	}

	connectionID := resolveMonitoringConnectionID(cfg)
	session := m.ensureSession(connectionID, providerMode)

	generation := session.reset(connectionID, providerMode)
	if err := m.sampleOnce(ctx, monitoringProvider, cfg, session, generation); err != nil {
		session.markStopped(generation)
		return MonitoringSessionSnapshot{}, err
	}
	session.markRunning(generation)

	if m.interval > 0 {
		// The sampler must outlive the request that started it, so it gets a
		// context derived from Background rather than from ctx.
		loopCtx, cancel := context.WithCancel(context.Background())
		session.setCancel(cancel)
		go m.runSampler(loopCtx, monitoringProvider, cfg, session, generation)
	}
	return session.snapshot(), nil
}
// Stop stops the session registered for connectionID and providerMode. It
// returns an error when no such session exists.
func (m *monitoringManager) Stop(connectionID string, providerMode string) error {
	key := m.sessionKey(connectionID, providerMode)

	m.mu.Lock()
	target, found := m.sessions[key]
	m.mu.Unlock()

	if found {
		target.stop()
		return nil
	}
	return fmt.Errorf("monitoring session not found for %s %s", connectionID, providerMode)
}
// GetHistory returns a snapshot of the session registered for connectionID and
// providerMode. It returns a zero snapshot and an error when no such session
// exists.
func (m *monitoringManager) GetHistory(connectionID string, providerMode string) (MonitoringSessionSnapshot, error) {
	key := m.sessionKey(connectionID, providerMode)

	m.mu.Lock()
	target, found := m.sessions[key]
	m.mu.Unlock()

	if found {
		return target.snapshot(), nil
	}
	return MonitoringSessionSnapshot{}, fmt.Errorf("monitoring session not found for %s %s", connectionID, providerMode)
}
// appendPoint adds point to the session's history and, when the history then
// exceeds the session limit, keeps only the most recent limit points in a
// freshly allocated slice.
func (s *monitoringSession) appendPoint(point JVMMonitoringPoint) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.points = append(s.points, point)
	overflow := len(s.points) - s.limit
	if overflow <= 0 {
		return
	}
	s.points = append([]JVMMonitoringPoint(nil), s.points[overflow:]...)
}
// sessionKey builds the session map key: the whitespace-trimmed connection ID
// and provider mode joined by ":".
func (m *monitoringManager) sessionKey(connectionID string, providerMode string) string {
	parts := []string{strings.TrimSpace(connectionID), strings.TrimSpace(providerMode)}
	return strings.Join(parts, ":")
}
// runSampler is the background sampling loop of one session generation. On
// every tick of the manager's interval it takes a sample; it returns when ctx
// is cancelled or after maxMonitoringSampleFailures consecutive failed
// samples, marking the session stopped for this generation in both cases.
//
// A failed sample is recorded as a session warning. The exception is a sample
// that fails after ctx has already been cancelled: that is the expected result
// of stopping or restarting the session, so it is neither counted nor recorded
// as a warning. This matters because appendWarning is not tied to a
// generation, and the warning would otherwise land in the restarted session.
func (m *monitoringManager) runSampler(ctx context.Context, provider MonitoringCapableProvider, cfg connection.ConnectionConfig, session *monitoringSession, generation int64) {
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()

	consecutiveFailures := 0
	for {
		select {
		case <-ctx.Done():
			session.markStopped(generation)
			return
		case <-ticker.C:
			err := m.sampleOnce(ctx, provider, cfg, session, generation)
			if err == nil {
				consecutiveFailures = 0
				continue
			}
			if ctx.Err() != nil {
				// Cancelled while the sample was in flight: shut down quietly.
				session.markStopped(generation)
				return
			}
			consecutiveFailures++
			session.appendWarning(err.Error())
			if consecutiveFailures >= maxMonitoringSampleFailures {
				session.appendWarning(fmt.Sprintf("监控采样连续失败 %d 次,已自动停止本次监控会话", consecutiveFailures))
				session.markStopped(generation)
				return
			}
		}
	}
}
// sampleOnce takes one monitoring snapshot from provider and applies it to
// session. When generation no longer matches the session's current generation
// nothing is sampled and nil is returned. A provider error is returned
// unchanged.
func (m *monitoringManager) sampleOnce(ctx context.Context, provider MonitoringCapableProvider, cfg connection.ConnectionConfig, session *monitoringSession, generation int64) error {
	previous, current := session.previousPoint(generation)
	if current {
		sampled, sampleErr := provider.GetMonitoringSnapshot(ctx, cfg, previous)
		if sampleErr != nil {
			return sampleErr
		}
		session.applySnapshot(sampled, generation)
	}
	return nil
}
// snapshot returns a copy of the session's current state. Every slice is
// copied under the session lock, so the result does not share slice backing
// arrays with the session.
func (s *monitoringSession) snapshot() MonitoringSessionSnapshot {
	s.mu.Lock()
	defer s.mu.Unlock()

	var view MonitoringSessionSnapshot
	view.ConnectionID = s.connectionID
	view.ProviderMode = s.providerMode
	view.Running = s.running
	view.Points = append([]JVMMonitoringPoint(nil), s.points...)
	view.RecentGCEvents = append([]RecentGCEvent(nil), s.recentGCEvents...)
	view.AvailableMetrics = append([]string(nil), s.availableMetrics...)
	view.MissingMetrics = append([]string(nil), s.missingMetrics...)
	view.ProviderWarnings = append([]string(nil), s.providerWarnings...)
	return view
}
// previousPoint returns a private copy of the newest stored point.
//
// The boolean is false when generation no longer matches the session. With a
// matching generation and an empty history it returns (nil, true).
func (s *monitoringSession) previousPoint(generation int64) (*JVMMonitoringPoint, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch {
	case s.generation != generation:
		return nil, false
	case len(s.points) == 0:
		return nil, true
	}

	latest := s.points[len(s.points)-1]
	if counts := latest.ThreadStateCounts; counts != nil {
		// Detach the map so the caller cannot alter stored history.
		latest.ThreadStateCounts = cloneStringIntMap(counts)
	}
	return &latest, true
}
// applySnapshot stores snapshot in the session and reports whether it was
// accepted. A snapshot whose generation differs from the session's is dropped
// and false is returned.
//
// The point is appended to the history (trimmed to s.limit). The GC events
// (trimmed to defaultMonitoringEventLimit), metric lists and provider warnings
// replace the previously stored ones with copies.
func (s *monitoringSession) applySnapshot(snapshot JVMMonitoringSnapshot, generation int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.generation != generation {
		return false
	}

	history := append(s.points, cloneMonitoringPoint(snapshot.Point))
	if excess := len(history) - s.limit; excess > 0 {
		history = append([]JVMMonitoringPoint(nil), history[excess:]...)
	}
	s.points = history

	events := append([]RecentGCEvent(nil), snapshot.RecentGCEvents...)
	if excess := len(events) - defaultMonitoringEventLimit; excess > 0 {
		events = append([]RecentGCEvent(nil), events[excess:]...)
	}
	s.recentGCEvents = events

	s.availableMetrics = append([]string(nil), snapshot.AvailableMetrics...)
	s.missingMetrics = append([]string(nil), snapshot.MissingMetrics...)
	s.providerWarnings = append([]string(nil), snapshot.ProviderWarnings...)
	return true
}
// appendWarning records a provider warning on the session. The text is
// trimmed; blank text and text already present are ignored. The list is
// capped at the newest defaultMonitoringEventLimit entries.
func (s *monitoringSession) appendWarning(warning string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	message := strings.TrimSpace(warning)
	if message == "" {
		return
	}
	for i := range s.providerWarnings {
		if s.providerWarnings[i] == message {
			return
		}
	}

	warnings := append(s.providerWarnings, message)
	if excess := len(warnings) - defaultMonitoringEventLimit; excess > 0 {
		warnings = append([]string(nil), warnings[excess:]...)
	}
	s.providerWarnings = warnings
}
// reset starts a new session incarnation: it cancels the stored cancel
// function (if any), clears all collected data, marks the session as not
// running, records the new identifiers and returns the incremented
// generation number.
func (s *monitoringSession) reset(connectionID string, providerMode string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	if stopPrevious := s.cancel; stopPrevious != nil {
		stopPrevious()
		s.cancel = nil
	}

	s.connectionID, s.providerMode = connectionID, providerMode
	s.running = false
	s.points, s.recentGCEvents = nil, nil
	s.availableMetrics, s.missingMetrics, s.providerWarnings = nil, nil, nil

	s.generation++
	return s.generation
}
// setCancel stores the cancel function that stop and reset later invoke.
func (s *monitoringSession) setCancel(cancel context.CancelFunc) {
	s.mu.Lock()
	s.cancel = cancel
	s.mu.Unlock()
}
// markRunning flags the session as running, but only when generation is
// still the session's current generation.
func (s *monitoringSession) markRunning(generation int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.generation == generation {
		s.running = true
	}
}
// markStopped flags the session as no longer running and releases its cancel
// function. Calls carrying an outdated generation are ignored, so a stale
// sampler cannot stop a newer session incarnation.
func (s *monitoringSession) markStopped(generation int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if generation != s.generation {
		return
	}
	s.running = false
	if s.cancel != nil {
		// Invoke the cancel function instead of just dropping it: when the
		// sampler stops itself after repeated failures nobody else cancels
		// the context, and an uncancelled context keeps its resources
		// alive. Calling it again after an earlier cancellation is a no-op.
		s.cancel()
		s.cancel = nil
	}
}
// stop ends the current session incarnation: it bumps the generation, marks
// the session as not running and, after releasing the lock, invokes the
// stored cancel function if there was one.
func (s *monitoringSession) stop() {
	s.mu.Lock()
	pending := s.cancel
	s.cancel, s.running = nil, false
	s.generation++
	s.mu.Unlock()

	if pending == nil {
		return
	}
	pending()
}
// resolveMonitoringConnectionID derives the identifier used for a monitored
// connection: the trimmed cfg.ID when it is non-empty, otherwise "host:port"
// (or just the host when cfg.Port is not positive). A blank host is replaced
// by "unknown".
func resolveMonitoringConnectionID(cfg connection.ConnectionConfig) string {
	if id := strings.TrimSpace(cfg.ID); id != "" {
		return id
	}

	host := strings.TrimSpace(cfg.Host)
	if host == "" {
		host = "unknown"
	}
	if cfg.Port <= 0 {
		return host
	}
	return fmt.Sprintf("%s:%d", host, cfg.Port)
}
// cloneMonitoringPoint returns a copy of point whose ThreadStateCounts map is
// independent of the original's.
func cloneMonitoringPoint(point JVMMonitoringPoint) JVMMonitoringPoint {
	// point is already a by-value copy; only the map needs detaching.
	point.ThreadStateCounts = cloneStringIntMap(point.ThreadStateCounts)
	return point
}
// cloneStringIntMap returns an independent copy of input. A nil or empty map
// yields nil.
func cloneStringIntMap(input map[string]int) map[string]int {
	size := len(input)
	if size == 0 {
		return nil
	}

	copied := make(map[string]int, size)
	for key := range input {
		copied[key] = input[key]
	}
	return copied
}

View File

@@ -0,0 +1,353 @@
package jvm
import (
"context"
"errors"
"sync"
"testing"
"time"
"GoNavi-Wails/internal/connection"
)
// fakeMonitoringProvider is a test double for a monitoring-capable provider.
// Its GetMonitoringSnapshot returns the canned values below.
type fakeMonitoringProvider struct {
// snapshot is the value GetMonitoringSnapshot returns.
snapshot JVMMonitoringSnapshot
// snapshotErr is the error GetMonitoringSnapshot returns.
snapshotErr error
}
// blockingMonitoringProvider is a fake provider whose GetMonitoringSnapshot
// blocks until the test releases it, so a test can act while a sample is in
// flight.
type blockingMonitoringProvider struct {
fakeMonitoringProvider
// started is closed the first time GetMonitoringSnapshot is entered.
started chan struct{}
// release unblocks GetMonitoringSnapshot once the test closes it.
release chan struct{}
// once guards the close of started against repeated calls.
once sync.Once
}
// Mode reports ModeJMX for every fake provider.
func (fakeMonitoringProvider) Mode() string {
	return ModeJMX
}
// TestConnection always succeeds.
func (fakeMonitoringProvider) TestConnection(context.Context, connection.ConnectionConfig) error {
	var noError error
	return noError
}
// ProbeCapabilities reports no capabilities and no error.
func (fakeMonitoringProvider) ProbeCapabilities(context.Context, connection.ConnectionConfig) ([]Capability, error) {
	var none []Capability
	return none, nil
}
// ListResources reports no resources and no error.
func (fakeMonitoringProvider) ListResources(context.Context, connection.ConnectionConfig, string) ([]ResourceSummary, error) {
	var none []ResourceSummary
	return none, nil
}
// GetValue returns the zero ValueSnapshot and no error.
func (fakeMonitoringProvider) GetValue(context.Context, connection.ConnectionConfig, string) (ValueSnapshot, error) {
	var empty ValueSnapshot
	return empty, nil
}
// PreviewChange returns the zero ChangePreview and no error.
func (fakeMonitoringProvider) PreviewChange(context.Context, connection.ConnectionConfig, ChangeRequest) (ChangePreview, error) {
	var empty ChangePreview
	return empty, nil
}
// ApplyChange returns the zero ApplyResult and no error.
func (fakeMonitoringProvider) ApplyChange(context.Context, connection.ConnectionConfig, ChangeRequest) (ApplyResult, error) {
	var empty ApplyResult
	return empty, nil
}
// GetMonitoringSnapshot returns the canned snapshot and error configured on
// the fake, ignoring all arguments.
func (f fakeMonitoringProvider) GetMonitoringSnapshot(context.Context, connection.ConnectionConfig, *JVMMonitoringPoint) (JVMMonitoringSnapshot, error) {
	canned, err := f.snapshot, f.snapshotErr
	return canned, err
}
// GetMonitoringSnapshot closes p.started on its first call, then blocks until
// p.release is closed and finally returns the canned snapshot and error.
func (p *blockingMonitoringProvider) GetMonitoringSnapshot(context.Context, connection.ConnectionConfig, *JVMMonitoringPoint) (JVMMonitoringSnapshot, error) {
	signalStarted := func() { close(p.started) }
	p.once.Do(signalStarted)

	<-p.release
	return p.snapshot, p.snapshotErr
}
// swapMonitoringProviderFactory installs factory as the package-level
// monitoringProviderFactory and returns a function that puts the previous
// factory back.
func swapMonitoringProviderFactory(factory func(mode string) (Provider, error)) func() {
	original := monitoringProviderFactory
	restore := func() {
		monitoringProviderFactory = original
	}
	monitoringProviderFactory = factory
	return restore
}
// TestMonitoringRingBufferKeepsLatestPoints appends five points to a session
// limited to three and expects only the newest three to remain, in order.
func TestMonitoringRingBufferKeepsLatestPoints(t *testing.T) {
	session := newMonitoringManagerForTest(3).ensureSession("conn-1", ModeJMX)
	for ts := int64(1); ts <= 5; ts++ {
		session.appendPoint(JVMMonitoringPoint{Timestamp: ts})
	}

	points := session.snapshot().Points
	if got := len(points); got != 3 {
		t.Fatalf("expected 3 points, got %d", got)
	}
	oldest, newest := points[0].Timestamp, points[2].Timestamp
	if oldest != 3 || newest != 5 {
		t.Fatalf("unexpected points order: %#v", points)
	}
}
// TestMonitoringSessionSnapshotCarriesProviderWarningsAndGCEvents fills a
// session directly and checks that snapshot exposes every stored field.
func TestMonitoringSessionSnapshotCarriesProviderWarningsAndGCEvents(t *testing.T) {
	session := newMonitoringManagerForTest(5).ensureSession("conn-2", ModeEndpoint)

	gcEvent := RecentGCEvent{
		Timestamp:       1713945600000,
		Name:            "G1 Young Generation",
		Cause:           "G1 Evacuation Pause",
		Action:          "end of minor GC",
		DurationMs:      21,
		BeforeUsedBytes: 734003200,
		AfterUsedBytes:  503316480,
	}
	samplePoint := JVMMonitoringPoint{
		Timestamp:       1713945600000,
		ThreadCount:     18,
		HeapUsedBytes:   503316480,
		ProcessRssBytes: 1073741824,
	}

	session.running = true
	session.availableMetrics = []string{"heap.used", "thread.count", "memory.rss"}
	session.missingMetrics = []string{"cpu.process", "gc.events"}
	session.providerWarnings = []string{"endpoint metrics degraded"}
	session.recentGCEvents = []RecentGCEvent{gcEvent}
	session.appendPoint(samplePoint)

	got := session.snapshot()
	// The first failing case aborts the test, just like a chain of guarded
	// t.Fatalf calls would.
	switch {
	case !got.Running:
		t.Fatalf("expected session to be running")
	case got.ProviderMode != ModeEndpoint:
		t.Fatalf("expected provider mode %q, got %q", ModeEndpoint, got.ProviderMode)
	case len(got.AvailableMetrics) != 3:
		t.Fatalf("expected available metrics, got %#v", got.AvailableMetrics)
	case len(got.MissingMetrics) != 2 || got.MissingMetrics[0] != "cpu.process":
		t.Fatalf("unexpected missing metrics: %#v", got.MissingMetrics)
	case len(got.ProviderWarnings) != 1:
		t.Fatalf("expected provider warning, got %#v", got.ProviderWarnings)
	case len(got.RecentGCEvents) != 1:
		t.Fatalf("expected recent gc event, got %#v", got.RecentGCEvents)
	case len(got.Points) != 1 || got.Points[0].ThreadCount != 18 || got.Points[0].HeapUsedBytes != 503316480:
		t.Fatalf("unexpected points snapshot: %#v", got.Points)
	}
}
// TestMonitoringManagerStartSamplesImmediatelyAndReturnsHistory checks that
// Start returns a running session that already holds one point, and that
// GetHistory exposes the provider's missing metrics and warnings.
func TestMonitoringManagerStartSamplesImmediatelyAndReturnsHistory(t *testing.T) {
	manager := newMonitoringManagerForTest(5)

	canned := JVMMonitoringSnapshot{
		Point: JVMMonitoringPoint{
			Timestamp:      1713945600000,
			ThreadCount:    12,
			HeapUsedBytes:  268435456,
			ProcessCpuLoad: 0.42,
		},
		AvailableMetrics: []string{"thread.count", "heap.used"},
		MissingMetrics:   []string{"cpu.process"},
		ProviderWarnings: []string{"jmx cpu metric unavailable"},
	}
	restore := swapMonitoringProviderFactory(func(string) (Provider, error) {
		return fakeMonitoringProvider{snapshot: canned}, nil
	})
	defer restore()

	readOnly := true
	jvmCfg := connection.JVMConfig{
		ReadOnly:      &readOnly,
		PreferredMode: ModeJMX,
		AllowedModes:  []string{ModeJMX},
	}
	cfg := connection.ConnectionConfig{
		ID:   "conn-monitor",
		Type: "jvm",
		Host: "orders.internal",
		JVM:  jvmCfg,
	}

	started, err := manager.Start(context.Background(), cfg, "")
	if err != nil {
		t.Fatalf("Start returned error: %v", err)
	}
	if !started.Running {
		t.Fatalf("expected started session to be running")
	}
	if pts := started.Points; len(pts) != 1 || pts[0].ThreadCount != 12 || pts[0].HeapUsedBytes != 268435456 {
		t.Fatalf("unexpected initial points: %#v", started.Points)
	}

	history, err := manager.GetHistory("conn-monitor", ModeJMX)
	if err != nil {
		t.Fatalf("GetHistory returned error: %v", err)
	}
	if missing := history.MissingMetrics; len(missing) != 1 || missing[0] != "cpu.process" {
		t.Fatalf("unexpected history missing metrics: %#v", history.MissingMetrics)
	}
	if len(history.ProviderWarnings) != 1 {
		t.Fatalf("unexpected provider warnings: %#v", history.ProviderWarnings)
	}
}
// TestMonitoringManagerStopMarksSessionStopped starts a session, stops it and
// expects the history to report it as not running.
func TestMonitoringManagerStopMarksSessionStopped(t *testing.T) {
	const connID = "conn-stop"

	manager := newMonitoringManagerForTest(5)
	provider := fakeMonitoringProvider{
		snapshot: JVMMonitoringSnapshot{
			Point: JVMMonitoringPoint{Timestamp: 1713945600000, ThreadCount: 7},
		},
	}
	restore := swapMonitoringProviderFactory(func(string) (Provider, error) {
		return provider, nil
	})
	defer restore()

	cfg := connection.ConnectionConfig{
		ID:   connID,
		Type: "jvm",
		Host: "orders.internal",
		JVM: connection.JVMConfig{
			PreferredMode: ModeEndpoint,
			AllowedModes:  []string{ModeEndpoint},
		},
	}

	_, err := manager.Start(context.Background(), cfg, ModeEndpoint)
	if err != nil {
		t.Fatalf("Start returned error: %v", err)
	}
	err = manager.Stop(connID, ModeEndpoint)
	if err != nil {
		t.Fatalf("Stop returned error: %v", err)
	}

	history, err := manager.GetHistory(connID, ModeEndpoint)
	if err != nil {
		t.Fatalf("GetHistory returned error: %v", err)
	}
	if history.Running {
		t.Fatalf("expected session to stop running, got %#v", history)
	}
}
// TestMonitoringSessionIgnoresStaleStopFromPreviousSampler checks that
// markStopped only takes effect for the session's current generation.
func TestMonitoringSessionIgnoresStaleStopFromPreviousSampler(t *testing.T) {
	session := &monitoringSession{}

	staleGeneration := session.reset("conn-race", ModeJMX)
	session.markRunning(staleGeneration)
	activeGeneration := session.reset("conn-race", ModeJMX)
	session.markRunning(activeGeneration)

	// A stop issued by the previous incarnation must not affect the new one.
	session.markStopped(staleGeneration)
	afterStale := session.snapshot()
	if !afterStale.Running {
		t.Fatalf("expected stale sampler stop to be ignored, got %#v", afterStale)
	}

	session.markStopped(activeGeneration)
	afterActive := session.snapshot()
	if afterActive.Running {
		t.Fatalf("expected active generation stop to mark stopped, got %#v", afterActive)
	}
}
// TestMonitoringSessionIgnoresStalePointFromPreviousSampler resets the session
// while a sampler of the previous generation is blocked inside the provider,
// then lets it finish and expects its point to be discarded.
func TestMonitoringSessionIgnoresStalePointFromPreviousSampler(t *testing.T) {
	// waitFor blocks until signal is closed, failing the test after one second.
	waitFor := func(signal <-chan struct{}, failure string) {
		select {
		case <-signal:
		case <-time.After(time.Second):
			t.Fatal(failure)
		}
	}

	manager := newMonitoringManager(5, time.Millisecond)
	session := &monitoringSession{limit: 5}
	canned := JVMMonitoringSnapshot{
		Point: JVMMonitoringPoint{
			Timestamp:   1713945600000,
			ThreadCount: 8,
		},
		AvailableMetrics: []string{"thread.count"},
	}
	provider := &blockingMonitoringProvider{
		fakeMonitoringProvider: fakeMonitoringProvider{snapshot: canned},
		started:                make(chan struct{}),
		release:                make(chan struct{}),
	}

	staleGeneration := session.reset("conn-race", ModeJMX)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		manager.runSampler(ctx, provider, connection.ConnectionConfig{}, session, staleGeneration)
		close(done)
	}()
	waitFor(provider.started, "sampler did not start within 1s")

	// The old sampler is now parked inside the provider; start a new
	// incarnation before letting it return.
	activeGeneration := session.reset("conn-race", ModeJMX)
	session.markRunning(activeGeneration)
	close(provider.release)
	cancel()
	waitFor(done, "sampler did not stop within 1s")

	final := session.snapshot()
	if !final.Running {
		t.Fatalf("expected new generation to remain running, got %#v", final)
	}
	if len(final.Points) != 0 {
		t.Fatalf("expected stale sampler point to be ignored, got %#v", final.Points)
	}
}
func TestFinalizeMonitoringSnapshotPreservesProviderDeltaWhenClassTotalMissing(t *testing.T) {
snapshot := JVMMonitoringSnapshot{
Point: JVMMonitoringPoint{
Timestamp: 1713945602000,
ClassLoadDelta: 3,
},
AvailableMetrics: []string{"class.delta"},
}
finalizeMonitoringSnapshot(&snapshot, &JVMMonitoringPoint{
Timestamp: 1713945600000,
LoadedClassCount: 200,
})
if snapshot.Point.ClassLoadDelta != 3 {
t.Fatalf("expected provider class delta to be preserved, got %#v", snapshot.Point)
}
}
// TestMonitoringSamplerStopsAfterConsecutiveFailures runs a sampler against a
// provider that always fails and expects it to stop by itself, leaving the
// session not running and carrying at least one warning.
func TestMonitoringSamplerStopsAfterConsecutiveFailures(t *testing.T) {
	manager := newMonitoringManager(5, time.Millisecond)
	session := &monitoringSession{limit: 5}
	generation := session.reset("conn-fail", ModeJMX)
	session.markRunning(generation)
	provider := fakeMonitoringProvider{snapshotErr: errors.New("collector unavailable")}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		manager.runSampler(ctx, provider, connection.ConnectionConfig{}, session, generation)
		close(done)
	}()

	// One select is enough: either the sampler exits or the deadline fires.
	// No polling loop is needed in between.
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("sampler did not stop after consecutive failures")
	}

	snapshot := session.snapshot()
	if snapshot.Running {
		t.Fatalf("expected session to stop after consecutive failures, got %#v", snapshot)
	}
	if len(snapshot.ProviderWarnings) == 0 {
		t.Fatalf("expected provider warnings to explain sampling failure")
	}
}
// TestMonitoringSessionDeduplicatesProviderWarnings checks that warnings
// differing only in surrounding whitespace are stored once.
func TestMonitoringSessionDeduplicatesProviderWarnings(t *testing.T) {
	session := &monitoringSession{}
	inputs := [...]string{
		"collector unavailable",
		"collector unavailable",
		"  collector unavailable  ",
	}
	for _, warning := range inputs {
		session.appendWarning(warning)
	}

	warnings := session.snapshot().ProviderWarnings
	if len(warnings) != 1 {
		t.Fatalf("expected duplicate provider warnings to be collapsed, got %#v", warnings)
	}
}

View File

@@ -0,0 +1,90 @@
package jvm
import (
"context"
"GoNavi-Wails/internal/connection"
)
// JVMMonitoringPoint is a single monitoring sample. Every field except
// Timestamp is tagged omitempty, so zero values are left out of the JSON
// encoding.
type JVMMonitoringPoint struct {
// Timestamp says when the sample was taken.
// NOTE(review): the JMX helper fills this from System.currentTimeMillis(),
// so it is presumably epoch milliseconds for all providers — confirm.
Timestamp int64 `json:"timestamp"`
// Heap and non-heap memory figures.
HeapUsedBytes int64 `json:"heapUsedBytes,omitempty"`
HeapCommittedBytes int64 `json:"heapCommittedBytes,omitempty"`
HeapMaxBytes int64 `json:"heapMaxBytes,omitempty"`
NonHeapUsedBytes int64 `json:"nonHeapUsedBytes,omitempty"`
NonHeapCommittedBytes int64 `json:"nonHeapCommittedBytes,omitempty"`
// GCCollectionCount and GCCollectionTimeMs are running totals.
GCCollectionCount int64 `json:"gcCollectionCount,omitempty"`
GCCollectionTimeMs int64 `json:"gcCollectionTimeMs,omitempty"`
// GCDeltaCount and GCDeltaTimeMs are the change since the previous point.
// finalizeMonitoringSnapshot sets them when the matching "gc.count" /
// "gc.time" metric is available and the total did not decrease.
GCDeltaCount int64 `json:"gcDeltaCount,omitempty"`
GCDeltaTimeMs int64 `json:"gcDeltaTimeMs,omitempty"`
// Thread counters.
ThreadCount int `json:"threadCount,omitempty"`
DaemonThreadCount int `json:"daemonThreadCount,omitempty"`
PeakThreadCount int `json:"peakThreadCount,omitempty"`
// ThreadStateCounts maps a thread state name to the number of threads in it.
ThreadStateCounts map[string]int `json:"threadStateCounts,omitempty"`
// Class-loading counters.
LoadedClassCount int `json:"loadedClassCount,omitempty"`
UnloadedClassCount int64 `json:"unloadedClassCount,omitempty"`
// ClassLoadDelta is the change in LoadedClassCount since the previous point.
// finalizeMonitoringSnapshot overwrites it when "class.loading" is
// available; otherwise the provider's value is kept.
ClassLoadDelta int64 `json:"classLoadDelta,omitempty"`
// CPU load and process memory figures.
ProcessCpuLoad float64 `json:"processCpuLoad,omitempty"`
SystemCpuLoad float64 `json:"systemCpuLoad,omitempty"`
ProcessRssBytes int64 `json:"processRssBytes,omitempty"`
CommittedVirtualMemoryBytes int64 `json:"committedVirtualMemoryBytes,omitempty"`
}
// RecentGCEvent describes one garbage collection reported by a provider.
type RecentGCEvent struct {
// Timestamp of the event.
// NOTE(review): the JMX helper stores the sample time here, not the time
// the collection ran — confirm what consumers expect.
Timestamp int64 `json:"timestamp"`
// Name of the collector, e.g. "G1 Young Generation".
Name string `json:"name,omitempty"`
// Cause and Action are the collector's own description of the event.
Cause string `json:"cause,omitempty"`
Action string `json:"action,omitempty"`
DurationMs int64 `json:"durationMs,omitempty"`
// Memory in use before and after the collection.
BeforeUsedBytes int64 `json:"beforeUsedBytes,omitempty"`
AfterUsedBytes int64 `json:"afterUsedBytes,omitempty"`
}
// MonitoringSessionSnapshot is the externally visible state of a monitoring
// session: its identity, whether it is running, and the collected data.
type MonitoringSessionSnapshot struct {
ConnectionID string `json:"connectionId"`
ProviderMode string `json:"providerMode"`
Running bool `json:"running"`
// Points is the retained sample history, oldest first.
Points []JVMMonitoringPoint `json:"points,omitempty"`
RecentGCEvents []RecentGCEvent `json:"recentGcEvents,omitempty"`
// AvailableMetrics and MissingMetrics hold metric identifiers such as
// "heap.used" or "cpu.process".
AvailableMetrics []string `json:"availableMetrics,omitempty"`
MissingMetrics []string `json:"missingMetrics,omitempty"`
ProviderWarnings []string `json:"providerWarnings,omitempty"`
}
// JVMMonitoringSnapshot is the result of one provider sampling call: a single
// point plus the GC events, metric availability and warnings reported with it.
type JVMMonitoringSnapshot struct {
Point JVMMonitoringPoint `json:"point"`
RecentGCEvents []RecentGCEvent `json:"recentGcEvents,omitempty"`
// AvailableMetrics decides which deltas finalizeMonitoringSnapshot computes.
AvailableMetrics []string `json:"availableMetrics,omitempty"`
MissingMetrics []string `json:"missingMetrics,omitempty"`
ProviderWarnings []string `json:"providerWarnings,omitempty"`
}
// MonitoringCapableProvider is a Provider that can also produce monitoring
// snapshots.
type MonitoringCapableProvider interface {
Provider
// GetMonitoringSnapshot collects one sample. previous is the most recently
// stored point, or nil when no point has been stored yet.
GetMonitoringSnapshot(ctx context.Context, cfg connection.ConnectionConfig, previous *JVMMonitoringPoint) (JVMMonitoringSnapshot, error)
}
// finalizeMonitoringSnapshot fills the delta fields of snapshot.Point by
// comparing it with previous. It does nothing when either argument is nil.
//
// GC deltas are computed only when the matching metric ("gc.count",
// "gc.time") is available and the total has not gone down. The class-load
// delta is computed whenever "class.loading" is available. Deltas of
// unavailable metrics keep whatever the provider supplied.
func finalizeMonitoringSnapshot(snapshot *JVMMonitoringSnapshot, previous *JVMMonitoringPoint) {
	if snapshot == nil || previous == nil {
		return
	}

	point := &snapshot.Point
	has := func(metric string) bool {
		return hasMonitoringMetric(snapshot.AvailableMetrics, metric)
	}

	if has("gc.count") && previous.GCCollectionCount <= point.GCCollectionCount {
		point.GCDeltaCount = point.GCCollectionCount - previous.GCCollectionCount
	}
	if has("gc.time") && previous.GCCollectionTimeMs <= point.GCCollectionTimeMs {
		point.GCDeltaTimeMs = point.GCCollectionTimeMs - previous.GCCollectionTimeMs
	}
	if has("class.loading") {
		point.ClassLoadDelta = int64(point.LoadedClassCount) - int64(previous.LoadedClassCount)
	}
}
// hasMonitoringMetric reports whether expected occurs in metrics.
func hasMonitoringMetric(metrics []string, expected string) bool {
	for i := 0; i < len(metrics); i++ {
		if metrics[i] == expected {
			return true
		}
	}
	return false
}

View File

@@ -1,5 +1,12 @@
package com.gonavi.jmxhelper;
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -46,6 +53,8 @@ final class JmxRuntime {
return listResources(server, connection, target);
case "get":
return singleton("snapshot", getValue(server, target));
case "monitor":
return singleton("monitoringSnapshot", getMonitoringSnapshot(server));
case "preview":
return singleton("preview", previewChange(server, target, change));
case "apply":
@@ -210,6 +219,208 @@ final class JmxRuntime {
throw new IllegalArgumentException("unsupported target kind: " + target.kind);
}
/**
 * Collects one monitoring sample from the JVM behind {@code server}.
 *
 * <p>Thread, memory, class-loading, garbage-collector and operating-system
 * metrics are read in independent sections. A failing section does not abort
 * the sample: its metric identifiers go to {@code missingMetrics} and a
 * message goes to {@code providerWarnings}.
 *
 * <p>Secondary reads inside a section (thread states, non-heap usage) are
 * guarded separately, so their failure cannot mark an already recorded
 * primary metric ({@code thread.count}, {@code heap.used}) as missing too.
 *
 * @param server connection to the MBean server of the monitored JVM
 * @return a map with the keys {@code point}, {@code availableMetrics},
 *         {@code missingMetrics} and {@code providerWarnings}, plus
 *         {@code recentGcEvents} when the GC section succeeded
 */
private static Map<String, Object> getMonitoringSnapshot(MBeanServerConnection server) throws Exception {
    LinkedHashMap<String, Object> result = new LinkedHashMap<>();
    LinkedHashMap<String, Object> point = new LinkedHashMap<>();
    List<String> availableMetrics = new ArrayList<>();
    List<String> missingMetrics = new ArrayList<>();
    List<String> providerWarnings = new ArrayList<>();
    long sampleTimestamp = System.currentTimeMillis();
    point.put("timestamp", sampleTimestamp);

    // --- Threads ---------------------------------------------------------
    try {
        ThreadMXBean threadBean = ManagementFactory.newPlatformMXBeanProxy(
            server,
            ManagementFactory.THREAD_MXBEAN_NAME,
            ThreadMXBean.class
        );
        point.put("threadCount", threadBean.getThreadCount());
        point.put("daemonThreadCount", threadBean.getDaemonThreadCount());
        point.put("peakThreadCount", threadBean.getPeakThreadCount());
        addUnique(availableMetrics, "thread.count");

        // Thread states are a separate metric; keep their failure from
        // contradicting the thread.count entry recorded just above.
        try {
            long[] threadIds = threadBean.getAllThreadIds();
            ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, 0);
            Map<String, Object> stateCounts = new LinkedHashMap<>();
            for (ThreadInfo info : infos) {
                // Entries are null for threads that ended between the two calls.
                if (info == null || info.getThreadState() == null) {
                    continue;
                }
                String state = info.getThreadState().name();
                int current = stateCounts.get(state) instanceof Number
                    ? ((Number) stateCounts.get(state)).intValue()
                    : 0;
                stateCounts.put(state, current + 1);
            }
            if (!stateCounts.isEmpty()) {
                point.put("threadStateCounts", stateCounts);
                addUnique(availableMetrics, "thread.states");
            }
        } catch (Exception error) {
            addUnique(missingMetrics, "thread.states");
            addUnique(providerWarnings, "thread state metrics unavailable: " + error.getMessage());
        }
    } catch (Exception error) {
        addUnique(missingMetrics, "thread.count");
        addUnique(providerWarnings, "thread metrics unavailable: " + error.getMessage());
    }

    // --- Heap / non-heap memory --------------------------------------------
    try {
        MemoryMXBean memoryBean = ManagementFactory.newPlatformMXBeanProxy(
            server,
            ManagementFactory.MEMORY_MXBEAN_NAME,
            MemoryMXBean.class
        );
        MemoryUsage heap = memoryBean.getHeapMemoryUsage();
        if (heap != null) {
            point.put("heapUsedBytes", heap.getUsed());
            point.put("heapCommittedBytes", heap.getCommitted());
            point.put("heapMaxBytes", heap.getMax());
            addUnique(availableMetrics, "heap.used");
        } else {
            addUnique(missingMetrics, "heap.used");
        }

        // Guarded on its own so a non-heap failure is reported as such and
        // does not flag heap.used as missing.
        try {
            MemoryUsage nonHeap = memoryBean.getNonHeapMemoryUsage();
            if (nonHeap != null) {
                point.put("nonHeapUsedBytes", nonHeap.getUsed());
                point.put("nonHeapCommittedBytes", nonHeap.getCommitted());
                addUnique(availableMetrics, "heap.non_heap");
            }
        } catch (Exception error) {
            addUnique(missingMetrics, "heap.non_heap");
            addUnique(providerWarnings, "non-heap metrics unavailable: " + error.getMessage());
        }
    } catch (Exception error) {
        addUnique(missingMetrics, "heap.used");
        addUnique(providerWarnings, "heap metrics unavailable: " + error.getMessage());
    }

    // --- Class loading -------------------------------------------------------
    try {
        ClassLoadingMXBean classLoadingBean = ManagementFactory.newPlatformMXBeanProxy(
            server,
            ManagementFactory.CLASS_LOADING_MXBEAN_NAME,
            ClassLoadingMXBean.class
        );
        point.put("loadedClassCount", classLoadingBean.getLoadedClassCount());
        point.put("unloadedClassCount", classLoadingBean.getUnloadedClassCount());
        addUnique(availableMetrics, "class.loading");
    } catch (Exception error) {
        addUnique(missingMetrics, "class.loading");
        addUnique(providerWarnings, "class loading metrics unavailable: " + error.getMessage());
    }

    // --- Garbage collectors --------------------------------------------------
    try {
        List<Map<String, Object>> recentGcEvents = new ArrayList<>();
        long totalCount = 0L;
        long totalTime = 0L;
        Set<ObjectName> names = server.queryNames(
            new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*"),
            null
        );
        for (ObjectName name : names) {
            // Negative or unreadable values are left out of the totals.
            Long collectionCount = safeLongAttribute(server, name, "CollectionCount");
            if (collectionCount != null && collectionCount >= 0L) {
                totalCount += collectionCount.longValue();
            }
            Long collectionTime = safeLongAttribute(server, name, "CollectionTime");
            if (collectionTime != null && collectionTime >= 0L) {
                totalTime += collectionTime.longValue();
            }
            Object lastGcInfo = safeAttribute(server, name, "LastGcInfo");
            if (lastGcInfo instanceof CompositeData) {
                CompositeData data = (CompositeData) lastGcInfo;
                LinkedHashMap<String, Object> event = new LinkedHashMap<>();
                // The event carries the sample time, not the collection time.
                event.put("timestamp", sampleTimestamp);
                event.put("name", name.getKeyProperty("name"));
                Object gcCause = compositeValue(data, "GcCause");
                if (gcCause != null) {
                    event.put("cause", String.valueOf(gcCause));
                }
                Object gcAction = compositeValue(data, "GcAction");
                if (gcAction != null) {
                    event.put("action", String.valueOf(gcAction));
                }
                Object duration = compositeValue(data, "duration");
                if (duration instanceof Number) {
                    event.put("durationMs", ((Number) duration).longValue());
                }
                long beforeUsedBytes = sumMemoryUsage(compositeValue(data, "memoryUsageBeforeGc"));
                if (beforeUsedBytes > 0L) {
                    event.put("beforeUsedBytes", beforeUsedBytes);
                }
                long afterUsedBytes = sumMemoryUsage(compositeValue(data, "memoryUsageAfterGc"));
                if (afterUsedBytes > 0L) {
                    event.put("afterUsedBytes", afterUsedBytes);
                }
                recentGcEvents.add(event);
            }
        }
        point.put("gcCollectionCount", totalCount);
        point.put("gcCollectionTimeMs", totalTime);
        result.put("recentGcEvents", recentGcEvents);
        addUnique(availableMetrics, "gc.count");
        addUnique(availableMetrics, "gc.time");
        if (!recentGcEvents.isEmpty()) {
            addUnique(availableMetrics, "gc.events");
        } else {
            addUnique(missingMetrics, "gc.events");
        }
    } catch (Exception error) {
        addUnique(missingMetrics, "gc.count");
        addUnique(missingMetrics, "gc.time");
        addUnique(missingMetrics, "gc.events");
        addUnique(providerWarnings, "gc metrics unavailable: " + error.getMessage());
    }

    // --- Operating system / process ------------------------------------------
    try {
        ObjectName osName = new ObjectName("java.lang:type=OperatingSystem");
        // Negative or unreadable values are reported as missing.
        Double processCpuLoad = safeDoubleAttribute(server, osName, "ProcessCpuLoad");
        if (processCpuLoad != null && processCpuLoad >= 0d) {
            point.put("processCpuLoad", processCpuLoad.doubleValue());
            addUnique(availableMetrics, "cpu.process");
        } else {
            addUnique(missingMetrics, "cpu.process");
        }
        Double systemCpuLoad = safeDoubleAttribute(server, osName, "SystemCpuLoad");
        if (systemCpuLoad != null && systemCpuLoad >= 0d) {
            point.put("systemCpuLoad", systemCpuLoad.doubleValue());
            addUnique(availableMetrics, "cpu.system");
        } else {
            addUnique(missingMetrics, "cpu.system");
        }
        // Try several attribute names; the first one that yields a number wins.
        Long processRssBytes = firstNumericAttribute(
            server,
            osName,
            "ProcessResidentMemorySize",
            "ResidentSetSize",
            "ResidentMemorySize"
        );
        if (processRssBytes != null && processRssBytes >= 0L) {
            point.put("processRssBytes", processRssBytes.longValue());
            addUnique(availableMetrics, "memory.rss");
        } else {
            addUnique(missingMetrics, "memory.rss");
        }
        Long committedVirtualMemoryBytes = safeLongAttribute(server, osName, "CommittedVirtualMemorySize");
        if (committedVirtualMemoryBytes != null && committedVirtualMemoryBytes >= 0L) {
            point.put("committedVirtualMemoryBytes", committedVirtualMemoryBytes.longValue());
            addUnique(availableMetrics, "memory.virtual");
        } else {
            addUnique(missingMetrics, "memory.virtual");
        }
    } catch (Exception error) {
        addUnique(missingMetrics, "cpu.process");
        addUnique(missingMetrics, "cpu.system");
        addUnique(missingMetrics, "memory.rss");
        addUnique(missingMetrics, "memory.virtual");
        addUnique(providerWarnings, "process/system metrics unavailable: " + error.getMessage());
    }

    result.put("point", point);
    result.put("availableMetrics", availableMetrics);
    result.put("missingMetrics", missingMetrics);
    result.put("providerWarnings", providerWarnings);
    return result;
}
private static Map<String, Object> previewChange(
MBeanServerConnection server,
TargetSpec target,
@@ -858,6 +1069,82 @@ final class JmxRuntime {
return result;
}
/**
 * Appends {@code value} to {@code items} unless it is null, empty or already
 * present.
 */
private static void addUnique(List<String> items, String value) {
    boolean blank = value == null || value.isEmpty();
    if (!blank && !items.contains(value)) {
        items.add(value);
    }
}
/**
 * Reads an MBean attribute, returning {@code null} instead of throwing when
 * the read fails for any reason.
 */
private static Object safeAttribute(MBeanServerConnection server, ObjectName objectName, String attribute) {
    Object value = null;
    try {
        value = server.getAttribute(objectName, attribute);
    } catch (Exception ignored) {
        // Best effort: an unreadable attribute is reported as null.
    }
    return value;
}
/**
 * Reads an MBean attribute as a {@code Long}. Returns {@code null} when the
 * attribute cannot be read or is not numeric.
 */
private static Long safeLongAttribute(MBeanServerConnection server, ObjectName objectName, String attribute) {
    Object raw = safeAttribute(server, objectName, attribute);
    if (raw instanceof Number) {
        return Long.valueOf(((Number) raw).longValue());
    }
    return null;
}
/**
 * Reads an MBean attribute as a {@code Double}. Returns {@code null} when the
 * attribute cannot be read or is not numeric.
 */
private static Double safeDoubleAttribute(MBeanServerConnection server, ObjectName objectName, String attribute) {
    Object raw = safeAttribute(server, objectName, attribute);
    if (raw instanceof Number) {
        return Double.valueOf(((Number) raw).doubleValue());
    }
    return null;
}
/**
 * Tries {@code attributeNames} in order and returns the first one that reads
 * as a number, or {@code null} when none does.
 */
private static Long firstNumericAttribute(
    MBeanServerConnection server,
    ObjectName objectName,
    String... attributeNames
) {
    Long found = null;
    for (int i = 0; i < attributeNames.length && found == null; i++) {
        found = safeLongAttribute(server, objectName, attributeNames[i]);
    }
    return found;
}
/**
 * Returns the item stored under {@code key} in {@code data}, or {@code null}
 * when either argument is null or the composite type has no such key.
 */
private static Object compositeValue(CompositeData data, String key) {
    if (data == null || key == null) {
        return null;
    }
    return data.getCompositeType().containsKey(key) ? data.get(key) : null;
}
/**
 * Adds up the "used" figures found in {@code value}. A {@code TabularData} or
 * {@code Map} contributes the sum over all of its values; any other object is
 * treated as a single memory-usage entry.
 */
private static long sumMemoryUsage(Object value) {
    Collection<?> entries;
    if (value instanceof TabularData) {
        entries = ((TabularData) value).values();
    } else if (value instanceof Map<?, ?>) {
        entries = ((Map<?, ?>) value).values();
    } else {
        return usedFromMemoryUsage(value);
    }

    long sum = 0L;
    for (Object entry : entries) {
        sum += usedFromMemoryUsage(entry);
    }
    return sum;
}
/**
 * Extracts the numeric "used" item from a composite memory-usage value. When
 * the composite has no numeric "used" item, its "value" item is inspected in
 * the same way. Anything that is not {@code CompositeData} counts as 0.
 */
private static long usedFromMemoryUsage(Object value) {
    Object current = value;
    while (current instanceof CompositeData) {
        CompositeData data = (CompositeData) current;
        Object used = compositeValue(data, "used");
        if (used instanceof Number) {
            return ((Number) used).longValue();
        }
        // Descend into the wrapped entry, e.g. a key/value table row.
        current = compositeValue(data, "value");
    }
    return 0L;
}
@SuppressWarnings("unchecked")
private static Map<String, Object> requiredObject(Object value, String label) {
if (value instanceof Map<?, ?>) {