@@ -20,6 +20,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.Semaphore ;
import java.util.stream.Collectors ;
@RestController
@@ -71,14 +72,6 @@ public class ApiServerV1 {
}
}
}
@GetMapping ( " /status/history/{ip} " )
public List < Status > getStatus ( @PathVariable String ip , @RequestParam ( defaultValue = " 0 " , required = false ) int limit ) {
List < Status > statuses = statusDao . findByHostOrderByTimestampDesc ( ip ) ;
if ( limit > 0 ) {
return statuses . stream ( ) . limit ( limit ) . collect ( Collectors . toList ( ) ) ;
}
return statuses ;
}
@GetMapping ( " /server " )
public List < Server > getAllServers ( ) {
@@ -138,4 +131,178 @@ public class ApiServerV1 {
public void deleteServer ( @RequestBody Server server ) {
serverDao . delete ( server ) ;
}
@GetMapping ( " /status/history/{ip} " )
public List < Status > getStatus ( @PathVariable String ip ,
@RequestParam ( defaultValue = " 0000-00-00 00:00:00 " , required = false ) String startDate ,
@RequestParam ( defaultValue = " 9999-99-99 99:99:99 " , required = false ) String endDate ) {
System . out . println ( " getStatus: " + ip + " 开始加载 " ) ;
List < Status > statuses = statusDao . findByHostOrderByTimestampDesc ( ip ) ;
System . out . println ( " getStatus: " + ip + " 加载完成 " ) ;
// 时间范围过滤
LocalDateTime start = parseDateTime ( startDate ) ;
LocalDateTime end = parseDateTime ( endDate ) ;
List < Status > filteredStatuses = statuses . stream ( )
. filter ( status - > status . getTimestamp ( ) ! = null )
. filter ( status - > ! status . getTimestamp ( ) . isBefore ( start ) )
. filter ( status - > ! status . getTimestamp ( ) . isAfter ( end ) )
. collect ( Collectors . toList ( ) ) ;
// 数据优化 - 移除变化较小的数据点
List < Status > optimizedStatuses = optimizeStatusData ( filteredStatuses ) ;
System . out . println ( " getStatus: " + ip + " 数据优化完成 " ) ;
System . out . print ( optimizedStatuses . size ( ) ) ;
return optimizedStatuses ;
}
@GetMapping ( " /status/history/all/{ips} " )
public Map < String , List < Status > > getStatusIps ( @PathVariable String ips ,
@RequestParam ( defaultValue = " 0000-00-00 00:00:00 " , required = false ) String startDate ,
@RequestParam ( defaultValue = " 9999-99-99 99:99:99 " , required = false ) String endDate ) {
List < String > ipList = Arrays . asList ( ips . split ( " , " ) ) ;
Map < String , List < Status > > optimizedStatuses = new ConcurrentHashMap < > ( ) ;
// 限制并发数量,避免数据库连接耗尽
Semaphore semaphore = new Semaphore ( 10 ) ; // 最多同时处理10个IP
List < CompletableFuture < Void > > futures = ipList . stream ( )
. map ( ip - > CompletableFuture . runAsync ( ( ) - > {
try {
semaphore . acquire ( ) ; // 获取许可
List < Status > statuses = statusDao . findByHostOrderByTimestampDesc ( ip ) ;
// 时间范围过滤
LocalDateTime start = parseDateTime ( startDate ) ;
LocalDateTime end = parseDateTime ( endDate ) ;
List < Status > filteredStatuses = statuses . stream ( )
. filter ( status - > status . getTimestamp ( ) ! = null )
. filter ( status - > ! status . getTimestamp ( ) . isBefore ( start ) )
. filter ( status - > ! status . getTimestamp ( ) . isAfter ( end ) )
. collect ( Collectors . toList ( ) ) ;
// 数据优化 - 移除变化较小的数据点
List < Status > optimizedStatuse = optimizeStatusData ( filteredStatuses ) ;
optimizedStatuses . put ( ip , optimizedStatuse ) ;
} catch ( Exception e ) {
log . error ( " Error processing status for IP: {} " , ip , e ) ;
optimizedStatuses . put ( ip , new ArrayList < > ( ) ) ;
} finally {
semaphore . release ( ) ; // 释放许可
}
} ) )
. collect ( Collectors . toList ( ) ) ;
// 等待所有任务完成
CompletableFuture . allOf ( futures . toArray ( new CompletableFuture [ 0 ] ) ) . join ( ) ;
return optimizedStatuses ;
}
private LocalDateTime parseDateTime ( String dateTimeStr ) {
try {
// 处理默认值情况
if ( " 0000-00-00 00:00:00 " . equals ( dateTimeStr ) | | " 9999-99-99 99:99:99 " . equals ( dateTimeStr ) ) {
return " 0000-00-00 00:00:00 " . equals ( dateTimeStr ) ?
LocalDateTime . MIN : LocalDateTime . MAX ;
}
return LocalDateTime . parse ( dateTimeStr . replace ( " " , " T " ) ) ;
} catch ( Exception e ) {
return LocalDateTime . now ( ) ;
}
}
private List < Status > optimizeStatusData ( List < Status > statuses ) {
if ( statuses . size ( ) < = 2 ) {
return statuses ; // 数据点太少无需优化
}
List < Status > result = new ArrayList < > ( ) ;
result . add ( statuses . get ( 0 ) ) ; // 始终保留第一个点
// 计算CPU和内存使用率变化的阈值( 基于整体数据计算)
double cpuThreshold = calculateThreshold ( statuses , Status : : getCpuUsagePercent ) ;
double memoryThreshold = calculateThreshold ( statuses , Status : : getMemoryUsagePercent ) ;
// 设置最小阈值,避免过度优化
cpuThreshold = Math . max ( cpuThreshold , 0 . 5 ) ; // 最小0.5%
memoryThreshold = Math . max ( memoryThreshold , 0 . 5 ) ; // 最小0.5%
Status previousStatus = statuses . get ( 0 ) ;
for ( int i = 1 ; i < statuses . size ( ) - 1 ; i + + ) {
Status current = statuses . get ( i ) ;
// 检查CPU或内存使用率是否有显著变化
boolean significantChange =
hasSignificantChange ( previousStatus , current , cpuThreshold , memoryThreshold ) ;
if ( significantChange ) {
result . add ( current ) ;
previousStatus = current ;
}
}
// 始终保留最后一个点
if ( ! statuses . isEmpty ( ) & & ! result . contains ( statuses . get ( statuses . size ( ) - 1 ) ) ) {
result . add ( statuses . get ( statuses . size ( ) - 1 ) ) ;
}
return result ;
}
private boolean hasSignificantChange ( Status prev , Status curr ,
double cpuThreshold , double memoryThreshold ) {
Double prevCpu = prev . getCpuUsagePercent ( ) ;
Double currCpu = curr . getCpuUsagePercent ( ) ;
Double prevMem = prev . getMemoryUsagePercent ( ) ;
Double currMem = curr . getMemoryUsagePercent ( ) ;
// 如果任一值为空,则认为有变化
if ( prevCpu = = null | | currCpu = = null | | prevMem = = null | | currMem = = null ) {
return true ;
}
// 检查变化是否超过阈值
return Math . abs ( currCpu - prevCpu ) > = cpuThreshold | |
Math . abs ( currMem - prevMem ) > = memoryThreshold ;
}
// 在Status类中添加辅助方法获取使用率数值
private static class StatusExtensions {
public static Double getCpuUsagePercent ( Status status ) {
return status . getCpuInfo ( ) ! = null ? status . getCpuInfo ( ) . getUsagePercent ( ) : null ;
}
public static Double getMemoryUsagePercent ( Status status ) {
return status . getMemoryInfo ( ) ! = null ? status . getMemoryInfo ( ) . getUsagePercent ( ) : null ;
}
}
private double calculateThreshold ( List < Status > statuses , java . util . function . Function < Status , Double > getter ) {
// 收集所有有效数值
List < Double > values = statuses . stream ( )
. map ( getter )
. filter ( Objects : : nonNull )
. collect ( Collectors . toList ( ) ) ;
if ( values . size ( ) < 2 ) {
return 0 . 0 ;
}
// 计算相邻数值间的差值
List < Double > differences = new ArrayList < > ( ) ;
for ( int i = 1 ; i < values . size ( ) ; i + + ) {
differences . add ( Math . abs ( values . get ( i ) - values . get ( i - 1 ) ) ) ;
}
// 计算平均差值作为阈值基础
double averageDifference = differences . stream ( )
. mapToDouble ( Double : : doubleValue )
. average ( )
. orElse ( 0 . 0 ) ;
// 返回平均差值的一半作为阈值,这样可以过滤掉较小的变化
return averageDifference / 2 ;
}
}