diff --git a/src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java b/src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java index b2ea7766..3abde8ef 100644 --- a/src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java +++ b/src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java @@ -3,6 +3,7 @@ package com.xujie.sys.modules.reader.config; import com.xujie.sys.modules.reader.entity.EquipmentDataDetail; import com.xujie.sys.modules.reader.entity.EquipmentFolderLocation; import com.xujie.sys.modules.reader.service.EquipmentDataDetailService; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -13,13 +14,12 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; @Slf4j @Component @@ -28,10 +28,13 @@ public class TCPClient { @Autowired private EquipmentDataDetailService equipmentDataDetailService; - // 存储活跃的连接 + @Getter private final Map socketMap = new ConcurrentHashMap<>(); - // 线程池用于管理并发连接 + private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 定时任务线程池 + + private static final long CHECK_INTERVAL = 30; // 定时检测间隔(秒) public void start(List serverConfigs) { startTCPClient(serverConfigs); @@ -42,14 +45,15 @@ public class TCPClient { clearAllConnections(); } log.info("需要连接的设备数量: {}", serverConfigs.size()); - // 为每个设备配置启动一个新的连接任务 + for (EquipmentFolderLocation config : serverConfigs) { - executorService.submit(new Runnable() { - @Override - public void run() { - connectAndRead(config); - } - }); + executorService.submit(() -> connectAndRead(config)); + // 为每个设备配置一个定时任务,定期检测连接状态 + if (false){ + scheduler.scheduleWithFixedDelay(() -> { + checkConnection(config); + }, CHECK_INTERVAL, CHECK_INTERVAL, TimeUnit.SECONDS); + } } } @@ -64,37 +68,37 @@ public class TCPClient { try { socket = new Socket(); InetSocketAddress socketAddress = new InetSocketAddress(serverAddress, port); - socket.connect(socketAddress,30000); + socket.connect(socketAddress, 30000); if (!socket.isConnected()) { log.error("连接设备失败: IP: {}, Port: {}", config.getIp(), config.getPort()); return; } in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - socketMap.put(key, socket); // 保存新的连接 + socketMap.put(key, socket); // 保存新连接 log.info("连接设备成功, IP: {}, Port: {}", config.getIp(), config.getPort()); // 读取数据 String responseLine; while ((responseLine = in.readLine()) != null) { - if (StringUtils.hasText(responseLine) && !"STO".equals(responseLine) && !"\u0018".equals(responseLine)) { + if (StringUtils.hasText(responseLine)) { log.info("{}设备, IP: {}, Port: {}, 采集数据: {}", config.getEquipmentNo(), config.getIp(), config.getPort(), responseLine); - if (StringUtils.hasText(config.getRegexp())){ - log.info("正则表达式: {}",config.getRegexp()); + if (StringUtils.hasText(config.getRegexp())) { + log.info("正则表达式: {}", config.getRegexp()); Pattern pattern = Pattern.compile(config.getRegexp()); Matcher matcher = pattern.matcher(responseLine.trim()); - if (matcher.find()){ + if (matcher.find()) { saveData(config, matcher.group(1)); } - }else { + } else { saveData(config, responseLine); } } } + } catch (IOException e) { log.error("连接设备失败或读取数据时发生错误, IP: {}, Port: {}", config.getIp(), config.getPort(), e); } finally { - // 确保在退出时关闭连接和流 try { if (in != null) { in.close(); @@ -110,11 +114,6 @@ public class TCPClient { } } - /** - * 处理并保存数据 - * @param equipmentFolderLocation 设备信息 - * @param value 数据值 - */ private void saveData(EquipmentFolderLocation equipmentFolderLocation, String value) { EquipmentDataDetail detail = new EquipmentDataDetail(); detail.setEquipmentNo(equipmentFolderLocation.getEquipmentNo()); @@ -125,9 +124,23 @@ public class TCPClient { detail.setValue0(value); equipmentDataDetailService.save(detail); } + /** + * 定时检测连接状态 + * @param config 设备配置 + */ + private void checkConnection(EquipmentFolderLocation config) { + String key = config.getIp() + ":" + config.getPort(); + Socket socket = socketMap.get(key); + + if (socket == null || socket.isClosed() || !socket.isConnected()) { + log.warn("检测到连接断开,尝试重新连接: {}", key); + closeConnection(key); // 先关闭连接 + executorService.submit(() -> connectAndRead(config)); // 尝试重新连接 + } + } /** - * 通过 key 关闭连接 + * 关闭指定连接 * @param key 连接的标识符 */ private void closeConnection(String key) { @@ -152,19 +165,4 @@ public class TCPClient { closeConnection(key); } } - - public static void main(String[] args) { - String input = "1.79260e-08,1.15843e-03,40"; - String regex = "^([1-9]\\d*(\\.\\d+)?|1\\.\\d+)(?=e.*|,|$)"; - - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(input); - System.out.println(regex.length()); - if (matcher.find()) { - String matchedValue = matcher.group(1); - System.out.println("匹配到的值:" + matchedValue); - } else { - System.out.println("没有匹配到符合条件的值"); - } - } } diff --git a/src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java b/src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java index 37b8a090..5d422caf 100644 --- a/src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java +++ b/src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java @@ -194,7 +194,7 @@ public class ModbusCommunicateServiceImpl implements ModbusCommunicateService { detail.setCreateDate(createDate); detail.setEquipmentNo(device.getEquipmentNo()); equipmentDataDetailService.save(detail); - if (num == 0){ + if (num == 0 || num == 0.0){ sort.setBatchNo(sort.getBatchNo() + 1); } equipmentFolderSortService.lambdaUpdate()