Browse Source

20241025

java8
qiezi 1 year ago
parent
commit
05bc3d7588
  1. 84
      src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java
  2. 2
      src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java

84
src/main/java/com/xujie/sys/modules/reader/config/TCPClient.java

@ -3,6 +3,7 @@ package com.xujie.sys.modules.reader.config;
import com.xujie.sys.modules.reader.entity.EquipmentDataDetail;
import com.xujie.sys.modules.reader.entity.EquipmentFolderLocation;
import com.xujie.sys.modules.reader.service.EquipmentDataDetailService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -13,13 +14,12 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Slf4j
@Component
@ -28,10 +28,13 @@ public class TCPClient {
@Autowired
private EquipmentDataDetailService equipmentDataDetailService;
// 存储活跃的连接
@Getter
private final Map<String, Socket> socketMap = new ConcurrentHashMap<>();
// 线程池用于管理并发连接
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 定时任务线程池
private static final long CHECK_INTERVAL = 30; // 定时检测间隔
public void start(List<EquipmentFolderLocation> serverConfigs) {
startTCPClient(serverConfigs);
@ -42,14 +45,15 @@ public class TCPClient {
clearAllConnections();
}
log.info("需要连接的设备数量: {}", serverConfigs.size());
// 为每个设备配置启动一个新的连接任务
for (EquipmentFolderLocation config : serverConfigs) {
executorService.submit(new Runnable() {
@Override
public void run() {
connectAndRead(config);
}
});
executorService.submit(() -> connectAndRead(config));
// 为每个设备配置一个定时任务定期检测连接状态
if (false){
scheduler.scheduleWithFixedDelay(() -> {
checkConnection(config);
}, CHECK_INTERVAL, CHECK_INTERVAL, TimeUnit.SECONDS);
}
}
}
@ -64,37 +68,37 @@ public class TCPClient {
try {
socket = new Socket();
InetSocketAddress socketAddress = new InetSocketAddress(serverAddress, port);
socket.connect(socketAddress,30000);
socket.connect(socketAddress, 30000);
if (!socket.isConnected()) {
log.error("连接设备失败: IP: {}, Port: {}", config.getIp(), config.getPort());
return;
}
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
socketMap.put(key, socket); // 保存新连接
socketMap.put(key, socket); // 保存新连接
log.info("连接设备成功, IP: {}, Port: {}", config.getIp(), config.getPort());
// 读取数据
String responseLine;
while ((responseLine = in.readLine()) != null) {
if (StringUtils.hasText(responseLine) && !"STO".equals(responseLine) && !"\u0018".equals(responseLine)) {
if (StringUtils.hasText(responseLine)) {
log.info("{}设备, IP: {}, Port: {}, 采集数据: {}", config.getEquipmentNo(), config.getIp(), config.getPort(), responseLine);
if (StringUtils.hasText(config.getRegexp())){
log.info("正则表达式: {}",config.getRegexp());
if (StringUtils.hasText(config.getRegexp())) {
log.info("正则表达式: {}", config.getRegexp());
Pattern pattern = Pattern.compile(config.getRegexp());
Matcher matcher = pattern.matcher(responseLine.trim());
if (matcher.find()){
if (matcher.find()) {
saveData(config, matcher.group(1));
}
}else {
} else {
saveData(config, responseLine);
}
}
}
} catch (IOException e) {
log.error("连接设备失败或读取数据时发生错误, IP: {}, Port: {}", config.getIp(), config.getPort(), e);
} finally {
// 确保在退出时关闭连接和流
try {
if (in != null) {
in.close();
@ -110,11 +114,6 @@ public class TCPClient {
}
}
/**
* 处理并保存数据
* @param equipmentFolderLocation 设备信息
* @param value 数据值
*/
private void saveData(EquipmentFolderLocation equipmentFolderLocation, String value) {
EquipmentDataDetail detail = new EquipmentDataDetail();
detail.setEquipmentNo(equipmentFolderLocation.getEquipmentNo());
@ -125,9 +124,23 @@ public class TCPClient {
detail.setValue0(value);
equipmentDataDetailService.save(detail);
}
/**
* 定时检测连接状态
* @param config 设备配置
*/
private void checkConnection(EquipmentFolderLocation config) {
String key = config.getIp() + ":" + config.getPort();
Socket socket = socketMap.get(key);
if (socket == null || socket.isClosed() || !socket.isConnected()) {
log.warn("检测到连接断开,尝试重新连接: {}", key);
closeConnection(key); // 先关闭连接
executorService.submit(() -> connectAndRead(config)); // 尝试重新连接
}
}
/**
* 通过 key 关闭连接
* 关闭指定连接
* @param key 连接的标识符
*/
private void closeConnection(String key) {
@ -152,19 +165,4 @@ public class TCPClient {
closeConnection(key);
}
}
public static void main(String[] args) {
String input = "1.79260e-08,1.15843e-03,40";
String regex = "^([1-9]\\d*(\\.\\d+)?|1\\.\\d+)(?=e.*|,|$)";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(input);
System.out.println(regex.length());
if (matcher.find()) {
String matchedValue = matcher.group(1);
System.out.println("匹配到的值:" + matchedValue);
} else {
System.out.println("没有匹配到符合条件的值");
}
}
}

2
src/main/java/com/xujie/sys/modules/reader/service/impl/ModbusCommunicateServiceImpl.java

@ -194,7 +194,7 @@ public class ModbusCommunicateServiceImpl implements ModbusCommunicateService {
detail.setCreateDate(createDate);
detail.setEquipmentNo(device.getEquipmentNo());
equipmentDataDetailService.save(detail);
if (num == 0){
if (num == 0 || num == 0.0){
sort.setBatchNo(sort.getBatchNo() + 1);
}
equipmentFolderSortService.lambdaUpdate()

Loading…
Cancel
Save