7 changed files with 390 additions and 2 deletions
-
165src/main/java/com/heai/modules/production/config/TCPClient.java
-
18src/main/java/com/heai/modules/production/dao/TcpMapper.java
-
52src/main/java/com/heai/modules/production/entity/ScaleTCPConfigData.java
-
84src/main/java/com/heai/modules/production/entity/TcpData.java
-
56src/main/java/com/heai/modules/production/service/impl/AbnormalServiceImpl.java
-
4src/main/java/com/heai/modules/production/service/impl/DailyPlanServiceImpl.java
-
13src/main/resources/mapper/production/TcpMapper.xml
@ -0,0 +1,165 @@ |
|||
package com.heai.modules.production.config; |
|||
|
|||
import com.heai.modules.production.dao.TcpMapper; |
|||
import com.heai.modules.production.entity.ScaleTCPConfigData; |
|||
import com.heai.modules.production.entity.TcpData; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.util.StringUtils; |
|||
|
|||
import java.io.BufferedReader; |
|||
import java.io.IOException; |
|||
import java.io.InputStreamReader; |
|||
import java.net.InetSocketAddress; |
|||
import java.net.Socket; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.*; |
|||
import java.util.regex.Matcher; |
|||
import java.util.regex.Pattern; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class TCPClient { |
|||
|
|||
@Getter |
|||
private final Map<String, Socket> socketMap = new ConcurrentHashMap<>(); |
|||
|
|||
@Autowired |
|||
private TcpMapper tcpMapper; |
|||
|
|||
private final ExecutorService executorService = Executors.newCachedThreadPool(); |
|||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 定时任务线程池 |
|||
|
|||
private static final long CHECK_INTERVAL = 5; // 定时检测间隔(秒) |
|||
|
|||
public void start(List<ScaleTCPConfigData> scaleTCPConfigData) { |
|||
startTCPClient(scaleTCPConfigData); |
|||
} |
|||
private void startTCPClient(List<ScaleTCPConfigData> scaleTCPConfigData) { |
|||
if (!socketMap.isEmpty()){ |
|||
clearAllConnections(); |
|||
} |
|||
log.info("需要连接的设备数量: {}", scaleTCPConfigData.size()); |
|||
|
|||
for (ScaleTCPConfigData configData : scaleTCPConfigData) { |
|||
executorService.submit(() -> connectAndRead(configData)); |
|||
// 为每个设备配置一个定时任务,定期检测连接状态 这个业务 重连不能由后台去触发 |
|||
// scheduler.scheduleWithFixedDelay(() -> { |
|||
// checkConnection(configData); |
|||
// }, CHECK_INTERVAL, CHECK_INTERVAL, TimeUnit.SECONDS); |
|||
} |
|||
} |
|||
|
|||
private void connectAndRead(ScaleTCPConfigData configData) { |
|||
String serverAddress = configData.getIp(); |
|||
int port = configData.getPort(); |
|||
String key = serverAddress + ":" + port; |
|||
|
|||
BufferedReader in = null; |
|||
Socket socket = null; |
|||
|
|||
try { |
|||
socket = new Socket(); |
|||
InetSocketAddress socketAddress = new InetSocketAddress(serverAddress, port); |
|||
socket.connect(socketAddress, 30000); |
|||
|
|||
if (!socket.isConnected()) { |
|||
log.error("连接设备失败: IP: {}, Port: {}", configData.getIp(), configData.getPort()); |
|||
return; |
|||
} |
|||
in = new BufferedReader(new InputStreamReader(socket.getInputStream())); |
|||
socketMap.put(key, socket); // 保存新连接 |
|||
log.info("连接设备成功, IP: {}, Port: {}", configData.getIp(), configData.getPort()); |
|||
|
|||
// 读取数据 |
|||
String responseLine; |
|||
while ((responseLine = in.readLine()) != null) { |
|||
if (StringUtils.hasText(responseLine)) { |
|||
// log.info("{}设备, IP: {}, Port: {}, 采集数据: {}", configData.getEquipmentNo(), configData.getIp(), configData.getPort(), responseLine); |
|||
// if (StringUtils.hasText(configData.getRegexp())) { |
|||
// log.info("正则表达式: {}", configData.getRegexp()); |
|||
// Pattern pattern = Pattern.compile(configData.getRegexp()); |
|||
// Matcher matcher = pattern.matcher(responseLine.trim()); |
|||
// if (matcher.find()) { |
|||
// saveData(configData, matcher.group(1)); |
|||
// } |
|||
// } else { |
|||
// saveData(configData, responseLine); |
|||
// } |
|||
|
|||
TcpData tcpData=new TcpData(); |
|||
tcpData.setIp(configData.getIp()); |
|||
tcpData.setPort(configData.getPort()); |
|||
tcpData.setType(configData.getType()); |
|||
tcpData.setValue(responseLine.trim()); |
|||
tcpData.setFlag(configData.getDataFlag()); |
|||
tcpMapper.saveTcpData(tcpData); |
|||
} |
|||
} |
|||
|
|||
} catch (IOException e) { |
|||
log.error("连接设备失败或读取数据时发生错误, IP: {}, Port: {}", configData.getIp(), configData.getPort(), e); |
|||
closeConnection(key); |
|||
} finally { |
|||
try { |
|||
if (in != null) { |
|||
in.close(); |
|||
} |
|||
if (socket != null && !socket.isClosed()) { |
|||
socket.close(); |
|||
} |
|||
socketMap.remove(key); |
|||
log.info("连接已关闭: {}", key); |
|||
} catch (IOException e) { |
|||
log.error("关闭连接失败: {}", key, e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 清除所有连接 |
|||
*/ |
|||
private void clearAllConnections() { |
|||
log.info("清除所有连接"); |
|||
for (String key : socketMap.keySet()) { |
|||
closeConnection(key); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 关闭指定连接 |
|||
* @param key 连接的标识符 |
|||
*/ |
|||
private void closeConnection(String key) { |
|||
try { |
|||
Socket socket = socketMap.get(key); |
|||
if (socket != null && !socket.isClosed()) { |
|||
socket.close(); |
|||
socketMap.remove(key); |
|||
log.info("连接已关闭: {}", key); |
|||
} |
|||
} catch (IOException e) { |
|||
log.error("关闭连接失败: {}", key, e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 定时检测连接状态 |
|||
* @param config 设备配置 |
|||
*/ |
|||
public void checkConnection(ScaleTCPConfigData config) { |
|||
String key = config.getIp() + ":" + config.getPort(); |
|||
Socket socket = socketMap.get(key); |
|||
|
|||
if (socket == null || socket.isClosed() || !socket.isConnected()) { |
|||
log.warn("检测到连接断开,尝试重新连接: {}", key); |
|||
closeConnection(key); // 先关闭连接 |
|||
executorService.submit(() -> connectAndRead(config)); // 尝试重新连接 |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,18 @@ |
|||
package com.heai.modules.production.dao; |
|||
|
|||
import com.heai.modules.production.entity.ScaleTCPConfigData; |
|||
import com.heai.modules.production.entity.TcpData; |
|||
import org.apache.ibatis.annotations.Mapper; |
|||
import org.springframework.stereotype.Repository; |
|||
|
|||
@Mapper |
|||
@Repository |
|||
public interface TcpMapper { |
|||
|
|||
void saveTcpData(TcpData data); |
|||
|
|||
ScaleTCPConfigData selectTCPMachine(String type); |
|||
|
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
package com.heai.modules.production.entity; |
|||
|
|||
import org.apache.ibatis.type.Alias; |
|||
|
|||
@Alias("ScaleTCPConfigData") |
|||
public class ScaleTCPConfigData { |
|||
private String ip; |
|||
private Integer port; |
|||
private String remark; |
|||
private String type; |
|||
private String dataFlag; |
|||
|
|||
public String getIp() { |
|||
return ip; |
|||
} |
|||
|
|||
public void setIp(String ip) { |
|||
this.ip = ip; |
|||
} |
|||
|
|||
public Integer getPort() { |
|||
return port; |
|||
} |
|||
|
|||
public void setPort(Integer port) { |
|||
this.port = port; |
|||
} |
|||
|
|||
public String getRemark() { |
|||
return remark; |
|||
} |
|||
|
|||
public void setRemark(String remark) { |
|||
this.remark = remark; |
|||
} |
|||
|
|||
public String getType() { |
|||
return type; |
|||
} |
|||
|
|||
public void setType(String type) { |
|||
this.type = type; |
|||
} |
|||
|
|||
public String getDataFlag() { |
|||
return dataFlag; |
|||
} |
|||
|
|||
public void setDataFlag(String dataFlag) { |
|||
this.dataFlag = dataFlag; |
|||
} |
|||
} |
|||
@ -0,0 +1,84 @@ |
|||
package com.heai.modules.production.entity; |
|||
|
|||
import java.util.Date; |
|||
|
|||
public class TcpData { |
|||
/** |
|||
* |
|||
*/ |
|||
private String ip; |
|||
|
|||
/** |
|||
* |
|||
*/ |
|||
private Integer port; |
|||
|
|||
/** |
|||
* |
|||
*/ |
|||
private String type; |
|||
|
|||
/** |
|||
* |
|||
*/ |
|||
private String value; |
|||
|
|||
/** |
|||
* |
|||
*/ |
|||
private Date createDate; |
|||
|
|||
/** |
|||
* |
|||
*/ |
|||
private String flag; |
|||
|
|||
public String getIp() { |
|||
return ip; |
|||
} |
|||
|
|||
public void setIp(String ip) { |
|||
this.ip = ip; |
|||
} |
|||
|
|||
public Integer getPort() { |
|||
return port; |
|||
} |
|||
|
|||
public void setPort(Integer port) { |
|||
this.port = port; |
|||
} |
|||
|
|||
public String getType() { |
|||
return type; |
|||
} |
|||
|
|||
public void setType(String type) { |
|||
this.type = type; |
|||
} |
|||
|
|||
public String getValue() { |
|||
return value; |
|||
} |
|||
|
|||
public void setValue(String value) { |
|||
this.value = value; |
|||
} |
|||
|
|||
public Date getCreateDate() { |
|||
return createDate; |
|||
} |
|||
|
|||
public void setCreateDate(Date createDate) { |
|||
this.createDate = createDate; |
|||
} |
|||
|
|||
public String getFlag() { |
|||
return flag; |
|||
} |
|||
|
|||
public void setFlag(String flag) { |
|||
this.flag = flag; |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,13 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
|||
|
|||
<mapper namespace="com.heai.modules.production.dao.TcpMapper"> |
|||
<insert id="saveTcpData"> |
|||
insert into tcp_data (ip,port,[type],[value],create_date,flag) |
|||
values (#{ip},#{port},#{type},#{value},GetDate(),#{flag}) |
|||
</insert> |
|||
<select id="selectTCPMachine" resultType="ScaleTCPConfigData"> |
|||
select ip,port,remark,[type],dataFlag from machine_tcp |
|||
where [type] =#{type} |
|||
</select> |
|||
</mapper> |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue