6 changed files with 567 additions and 15 deletions
-
28src/main/java/com/gaotao/modules/base/dao/PrintTaskDao.java
-
119src/main/java/com/gaotao/modules/base/entity/PrintTask.java
-
94src/main/java/com/gaotao/modules/base/service/Impl/PrintTaskServiceImpl.java
-
107src/main/java/com/gaotao/modules/base/service/Impl/ReportLabelListServiceImpl.java
-
63src/main/java/com/gaotao/modules/base/service/PrintTaskService.java
-
171src/main/java/com/gaotao/modules/base/task/PrintTaskScheduler.java
@ -0,0 +1,28 @@ |
|||||
|
package com.gaotao.modules.base.dao; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
import com.gaotao.modules.base.entity.PrintTask; |
||||
|
import org.apache.ibatis.annotations.Mapper; |
||||
|
import org.apache.ibatis.annotations.Param; |
||||
|
import org.apache.ibatis.annotations.Select; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 打印任务队列 Dao |
||||
|
*/ |
||||
|
@Mapper |
||||
|
public interface PrintTaskDao extends BaseMapper<PrintTask> { |
||||
|
|
||||
|
/** |
||||
|
* 获取待执行的打印任务(使用READPAST避免锁等待) |
||||
|
* |
||||
|
* @param limit 获取数量 |
||||
|
* @return 待执行任务列表 |
||||
|
*/ |
||||
|
@Select("SELECT TOP ${limit} * FROM print_task WITH (READPAST) " + |
||||
|
"WHERE task_status = 'PENDING' " + |
||||
|
"ORDER BY created_date ASC") |
||||
|
List<PrintTask> selectPendingTasksWithReadPast(@Param("limit") int limit); |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,119 @@ |
|||||
|
package com.gaotao.modules.base.entity; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.annotation.IdType; |
||||
|
import com.baomidou.mybatisplus.annotation.TableField; |
||||
|
import com.baomidou.mybatisplus.annotation.TableId; |
||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
import java.io.Serializable; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
/** |
||||
|
* 打印任务队列实体类 |
||||
|
* |
||||
|
* <p><b>核心字段说明:</b></p> |
||||
|
* <ul> |
||||
|
* <li><b>printer_ip</b>:打印机IP地址</li> |
||||
|
* <li><b>zpl_code</b>:ZPL打印代码</li> |
||||
|
* <li><b>task_status</b>:任务状态(PENDING=待执行, PROCESSING=执行中, SUCCESS=成功, FAILED=失败)</li> |
||||
|
* </ul> |
||||
|
* |
||||
|
* <p><b>使用说明:</b></p> |
||||
|
* <ul> |
||||
|
* <li>所有打印任务先插入此表,不直接打印</li> |
||||
|
* <li>定时任务按created_date顺序(FIFO)取出PENDING状态的任务</li> |
||||
|
* <li>执行时先更新为PROCESSING,防止重复执行</li> |
||||
|
* <li>成功后更新为SUCCESS,失败后更新为FAILED并记录错误信息</li> |
||||
|
* </ul> |
||||
|
*/ |
||||
|
@Data |
||||
|
@TableName("print_task") |
||||
|
public class PrintTask implements Serializable { |
||||
|
private static final long serialVersionUID = 1L; |
||||
|
|
||||
|
/** |
||||
|
* 主键ID |
||||
|
*/ |
||||
|
@TableId(type = IdType.AUTO) |
||||
|
private Long id; |
||||
|
|
||||
|
/** |
||||
|
* 站点编码 |
||||
|
*/ |
||||
|
@TableField("site") |
||||
|
private String site; |
||||
|
|
||||
|
/** |
||||
|
* 打印机IP地址 |
||||
|
*/ |
||||
|
@TableField("printer_ip") |
||||
|
private String printerIp; |
||||
|
|
||||
|
/** |
||||
|
* ZPL打印代码 |
||||
|
*/ |
||||
|
@TableField("zpl_code") |
||||
|
private String zplCode; |
||||
|
|
||||
|
/** |
||||
|
* RFID标签标识 |
||||
|
*/ |
||||
|
@TableField("rfid_flag") |
||||
|
private String rfidFlag; |
||||
|
|
||||
|
/** |
||||
|
* 打印份数 |
||||
|
*/ |
||||
|
@TableField("copies") |
||||
|
private Integer copies; |
||||
|
|
||||
|
/** |
||||
|
* 标签数据(JSON格式) |
||||
|
*/ |
||||
|
@TableField("label_data") |
||||
|
private String labelData; |
||||
|
|
||||
|
/** |
||||
|
* 任务状态:PENDING=待执行, PROCESSING=执行中, SUCCESS=成功, FAILED=失败 |
||||
|
*/ |
||||
|
@TableField("task_status") |
||||
|
private String taskStatus; |
||||
|
|
||||
|
/** |
||||
|
* 重试次数 |
||||
|
*/ |
||||
|
@TableField("retry_count") |
||||
|
private Integer retryCount; |
||||
|
|
||||
|
/** |
||||
|
* 错误信息 |
||||
|
*/ |
||||
|
@TableField("error_message") |
||||
|
private String errorMessage; |
||||
|
|
||||
|
/** |
||||
|
* 创建人 |
||||
|
*/ |
||||
|
@TableField("created_by") |
||||
|
private String createdBy; |
||||
|
|
||||
|
/** |
||||
|
* 创建时间 |
||||
|
*/ |
||||
|
@TableField("created_date") |
||||
|
private Date createdDate; |
||||
|
|
||||
|
/** |
||||
|
* 开始执行时间 |
||||
|
*/ |
||||
|
@TableField("started_date") |
||||
|
private Date startedDate; |
||||
|
|
||||
|
/** |
||||
|
* 完成时间 |
||||
|
*/ |
||||
|
@TableField("completed_date") |
||||
|
private Date completedDate; |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,94 @@ |
|||||
|
package com.gaotao.modules.base.service.Impl; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
||||
|
import com.gaotao.modules.base.dao.PrintTaskDao; |
||||
|
import com.gaotao.modules.base.entity.PrintTask; |
||||
|
import com.gaotao.modules.base.service.PrintTaskService; |
||||
|
import com.google.gson.Gson; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
import org.springframework.transaction.annotation.Transactional; |
||||
|
|
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* 打印任务队列服务实现类 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Service |
||||
|
public class PrintTaskServiceImpl extends ServiceImpl<PrintTaskDao, PrintTask> implements PrintTaskService { |
||||
|
|
||||
|
private static final Gson gson = new Gson(); |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public Long addPrintTask(String printerIp, String zplCode, String rfidFlag, |
||||
|
Integer copies, Map<String, Object> labelData, |
||||
|
String site, String createdBy) { |
||||
|
PrintTask task = new PrintTask(); |
||||
|
task.setSite(site); |
||||
|
task.setPrinterIp(printerIp); |
||||
|
task.setZplCode(zplCode); |
||||
|
task.setRfidFlag(rfidFlag); |
||||
|
task.setCopies(copies != null && copies > 0 ? copies : 1); |
||||
|
task.setLabelData(labelData != null ? gson.toJson(labelData) : null); |
||||
|
task.setTaskStatus("PENDING"); |
||||
|
task.setRetryCount(0); |
||||
|
task.setCreatedBy(createdBy); |
||||
|
task.setCreatedDate(new Date()); |
||||
|
|
||||
|
boolean saved = this.save(task); |
||||
|
if (!saved) { |
||||
|
log.error("添加打印任务失败:printerIp={}, zplCode前50字符={}", |
||||
|
printerIp, zplCode.substring(0, Math.min(50, zplCode.length()))); |
||||
|
throw new RuntimeException("添加打印任务失败"); |
||||
|
} |
||||
|
|
||||
|
log.info("✓ 打印任务已加入队列:taskId={}, printerIp={}, copies={}", |
||||
|
task.getId(), printerIp, task.getCopies()); |
||||
|
return task.getId(); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public List<PrintTask> getPendingTasks(int limit) { |
||||
|
// 使用原生SQL + READPAST提示,避免锁等待 |
||||
|
// READPAST: 跳过被其他事务锁定的行,确保不阻塞INSERT操作 |
||||
|
return this.baseMapper.selectPendingTasksWithReadPast(limit); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public boolean markAsProcessing(Long taskId) { |
||||
|
return this.lambdaUpdate() |
||||
|
.set(PrintTask::getTaskStatus, "PROCESSING") |
||||
|
.set(PrintTask::getStartedDate, new Date()) |
||||
|
.eq(PrintTask::getId, taskId) |
||||
|
.eq(PrintTask::getTaskStatus, "PENDING") // 乐观锁:只更新PENDING状态的任务 |
||||
|
.update(); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public boolean markAsSuccess(Long taskId) { |
||||
|
return this.lambdaUpdate() |
||||
|
.set(PrintTask::getTaskStatus, "SUCCESS") |
||||
|
.set(PrintTask::getCompletedDate, new Date()) |
||||
|
.eq(PrintTask::getId, taskId) |
||||
|
.update(); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public boolean markAsFailed(Long taskId, String errorMessage) { |
||||
|
return this.lambdaUpdate() |
||||
|
.set(PrintTask::getTaskStatus, "FAILED") |
||||
|
.set(PrintTask::getErrorMessage, errorMessage) |
||||
|
.set(PrintTask::getCompletedDate, new Date()) |
||||
|
.setSql("retry_count = retry_count + 1") |
||||
|
.eq(PrintTask::getId, taskId) |
||||
|
.update(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,63 @@ |
|||||
|
package com.gaotao.modules.base.service; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.extension.service.IService; |
||||
|
import com.gaotao.modules.base.entity.PrintTask; |
||||
|
|
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* 打印任务队列服务接口 |
||||
|
*/ |
||||
|
public interface PrintTaskService extends IService<PrintTask> { |
||||
|
|
||||
|
/** |
||||
|
* 添加打印任务到队列 |
||||
|
* |
||||
|
* @param printerIp 打印机IP |
||||
|
* @param zplCode ZPL代码 |
||||
|
* @param rfidFlag RFID标识 |
||||
|
* @param copies 打印份数 |
||||
|
* @param labelData 标签数据 |
||||
|
* @param site 站点 |
||||
|
* @param createdBy 创建人 |
||||
|
* @return 任务ID |
||||
|
*/ |
||||
|
Long addPrintTask(String printerIp, String zplCode, String rfidFlag, |
||||
|
Integer copies, Map<String, Object> labelData, |
||||
|
String site, String createdBy); |
||||
|
|
||||
|
/** |
||||
|
* 获取待执行的打印任务(FIFO顺序) |
||||
|
* |
||||
|
* @param limit 获取任务数量 |
||||
|
* @return 待执行任务列表 |
||||
|
*/ |
||||
|
List<PrintTask> getPendingTasks(int limit); |
||||
|
|
||||
|
/** |
||||
|
* 更新任务为执行中 |
||||
|
* |
||||
|
* @param taskId 任务ID |
||||
|
* @return 是否成功 |
||||
|
*/ |
||||
|
boolean markAsProcessing(Long taskId); |
||||
|
|
||||
|
/** |
||||
|
* 更新任务为成功 |
||||
|
* |
||||
|
* @param taskId 任务ID |
||||
|
* @return 是否成功 |
||||
|
*/ |
||||
|
boolean markAsSuccess(Long taskId); |
||||
|
|
||||
|
/** |
||||
|
* 更新任务为失败 |
||||
|
* |
||||
|
* @param taskId 任务ID |
||||
|
* @param errorMessage 错误信息 |
||||
|
* @return 是否成功 |
||||
|
*/ |
||||
|
boolean markAsFailed(Long taskId, String errorMessage); |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,171 @@ |
|||||
|
package com.gaotao.modules.base.task; |
||||
|
|
||||
|
import com.gaotao.modules.base.entity.PrintTask; |
||||
|
import com.gaotao.modules.base.service.PrintTaskService; |
||||
|
import com.google.gson.Gson; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.scheduling.annotation.Scheduled; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.io.OutputStream; |
||||
|
import java.net.Socket; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* 打印任务定时调度器 |
||||
|
* |
||||
|
* <p><b>功能说明:</b></p> |
||||
|
* <ul> |
||||
|
* <li>每秒扫描一次打印任务队列</li> |
||||
|
* <li>按FIFO顺序(created_date)取出PENDING状态的任务</li> |
||||
|
* <li>执行打印任务并更新状态</li> |
||||
|
* <li>失败任务记录错误信息</li> |
||||
|
* </ul> |
||||
|
* |
||||
|
* <p><b>并发控制:</b></p> |
||||
|
* <ul> |
||||
|
* <li>使用数据库乐观锁(markAsProcessing时检查状态)</li> |
||||
|
* <li>确保同一任务不会被重复执行</li> |
||||
|
* </ul> |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class PrintTaskScheduler { |
||||
|
|
||||
|
@Autowired |
||||
|
private PrintTaskService printTaskService; |
||||
|
|
||||
|
private static final Gson gson = new Gson(); |
||||
|
|
||||
|
@Value("${dashboard.push.enabled:true}") |
||||
|
private boolean dashboardPushEnabled; |
||||
|
|
||||
|
// 每秒执行一次 |
||||
|
@Scheduled(fixedDelay = 500) |
||||
|
public void processPrintTasks() { |
||||
|
try { |
||||
|
// 1. 获取待执行的任务(每次处理10个) |
||||
|
List<PrintTask> pendingTasks = printTaskService.getPendingTasks(10); |
||||
|
|
||||
|
if (pendingTasks.isEmpty()) { |
||||
|
return; // 没有待处理任务 |
||||
|
} |
||||
|
|
||||
|
log.info("【打印队列】发现 {} 个待执行任务", pendingTasks.size()); |
||||
|
|
||||
|
// 2. 按FIFO顺序逐个执行 |
||||
|
for (PrintTask task : pendingTasks) { |
||||
|
processTask(task); |
||||
|
} |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("打印任务调度器执行异常: {}", e.getMessage(), e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理单个打印任务 |
||||
|
*/ |
||||
|
private void processTask(PrintTask task) { |
||||
|
Long taskId = task.getId(); |
||||
|
String printerIp = task.getPrinterIp(); |
||||
|
|
||||
|
try { |
||||
|
// 1. 标记为执行中(乐观锁,防止并发重复执行) |
||||
|
boolean marked = printTaskService.markAsProcessing(taskId); |
||||
|
if (!marked) { |
||||
|
log.debug("任务 {} 已被其他线程处理,跳过", taskId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
log.info("【打印队列】开始执行任务 taskId={}, 打印机={}, 份数={}", |
||||
|
taskId, printerIp, task.getCopies()); |
||||
|
|
||||
|
// 2. 解析标签数据 |
||||
|
Map<String, Object> labelData = null; |
||||
|
if (task.getLabelData() != null && !task.getLabelData().trim().isEmpty()) { |
||||
|
labelData = gson.fromJson(task.getLabelData(), Map.class); |
||||
|
} |
||||
|
|
||||
|
// 3. 执行打印 |
||||
|
boolean success = executePrint( |
||||
|
printerIp, |
||||
|
task.getZplCode(), |
||||
|
task.getCopies(), |
||||
|
task.getRfidFlag(), |
||||
|
labelData |
||||
|
); |
||||
|
|
||||
|
// 4. 更新任务状态 |
||||
|
if (success) { |
||||
|
printTaskService.markAsSuccess(taskId); |
||||
|
log.info("【打印队列】✓ 任务完成 taskId={}, 打印机={}", taskId, printerIp); |
||||
|
} else { |
||||
|
printTaskService.markAsFailed(taskId, "打印失败,未返回成功标识"); |
||||
|
log.error("【打印队列】✗ 任务失败 taskId={}, 打印机={}", taskId, printerIp); |
||||
|
} |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("【打印队列】✗ 任务执行异常 taskId={}, 错误: {}", taskId, e.getMessage(), e); |
||||
|
printTaskService.markAsFailed(taskId, "执行异常: " + e.getMessage()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 执行打印(发送ZPL到打印机) |
||||
|
* 注意:ZPL代码已在入队时组装好(包括RFID指令),这里只需要直接发送 |
||||
|
* |
||||
|
* @param printerIp 打印机IP |
||||
|
* @param zplCode ZPL代码(已包含RFID指令) |
||||
|
* @param copies 打印份数 |
||||
|
* @param rfidFlag RFID标识(用于日志) |
||||
|
* @param labelData 标签数据(未使用,保留以备后续扩展) |
||||
|
* @return 是否成功 |
||||
|
*/ |
||||
|
private boolean executePrint(String printerIp, String zplCode, Integer copies, |
||||
|
String rfidFlag, Map<String, Object> labelData) { |
||||
|
Socket socket = null; |
||||
|
try { |
||||
|
// 1. 连接打印机 |
||||
|
socket = new Socket(printerIp, 9100); |
||||
|
socket.setSoTimeout(10000); // 10秒超时 |
||||
|
|
||||
|
OutputStream os = socket.getOutputStream(); |
||||
|
|
||||
|
int actualCopies = copies != null && copies > 0 ? copies : 1; |
||||
|
|
||||
|
// 2. 直接发送ZPL代码(已在入队时组装好,包括RFID指令) |
||||
|
log.debug("开始发送ZPL到打印机 {}, 份数: {}, RFID: {}", printerIp, actualCopies, rfidFlag); |
||||
|
|
||||
|
for (int i = 0; i < actualCopies; i++) { |
||||
|
os.write(zplCode.getBytes("UTF-8")); |
||||
|
os.flush(); |
||||
|
|
||||
|
// RFID标签需要等待处理时间 |
||||
|
if ("Y".equals(rfidFlag)) { |
||||
|
Thread.sleep(1000); // RFID标签等待1秒 |
||||
|
log.debug("RFID标签打印完成 {} / {}", i + 1, actualCopies); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
log.debug("✓ ZPL已发送到打印机 {}, 份数: {}", printerIp, actualCopies); |
||||
|
return true; |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("发送ZPL到打印机失败: printerIp={}, error={}", printerIp, e.getMessage(), e); |
||||
|
return false; |
||||
|
} finally { |
||||
|
if (socket != null) { |
||||
|
try { |
||||
|
socket.close(); |
||||
|
} catch (Exception e) { |
||||
|
log.warn("关闭socket失败: {}", e.getMessage()); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue