Browse Source

流式

master
常熟吴彦祖 3 weeks ago
parent
commit
a9ab3ba802
  1. 495
      src/main/java/com/xujie/sys/common/utils/StreamCsvExportUtil.java
  2. 24
      src/main/java/com/xujie/sys/modules/pms/mapper/MesTidEpcLogMapper.java
  3. 212
      src/main/java/com/xujie/sys/modules/pms/service/Impl/MesTidEpcLogServiceImpl.java
  4. 54
      src/main/resources/mapper/pms/MesTidEpcLogMapper.xml

495
src/main/java/com/xujie/sys/common/utils/StreamCsvExportUtil.java

@ -0,0 +1,495 @@
package com.xujie.sys.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Function;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
 * Streaming CSV export utility — true streaming export (read a row, write a row). - rqrq
 *
 * <p><b>Features</b></p>
 * <ul>
 * <li>Supports two output modes: a single CSV file, or multiple CSVs inside a ZIP archive</li>
 * <li>Uses GB18030 encoding so Chinese text displays correctly in Excel</li>
 * <li>Automatically escapes CSV fields (commas, quotes, line breaks)</li>
 * <li>Supports forced text format to preserve leading zeros</li>
 * <li>Built-in progress logging</li>
 * </ul>
 *
 * <p><b>Usage</b></p>
 * <pre>
 * // 1. Create a writer
 * StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createSingleCsvWriter(response.getOutputStream(), "export.csv");
 * // or create a ZIP writer
 * StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createZipCsvWriter(response.getOutputStream(), "export.zip", "data");
 *
 * // 2. Write the header row
 * writer.writeHeader("col1", "col2", "col3");
 *
 * // 3. Write rows one by one inside a ResultHandler
 * mapper.streamQuery(query, result -> {
 *     writer.writeRow(
 *         writer.asText(result.getField1()),   // forced text format
 *         writer.escape(result.getField2()),   // plain escaping
 *         writer.formatDate(result.getDate())  // date formatting
 *     );
 * });
 *
 * // 4. Close the writer
 * writer.close();
 * </pre>
 *
 * @author rqrq
 * @date 2025/02/04
 */
@Slf4j
public class StreamCsvExportUtil {
    /**
     * Default character encoding: GB18030, so Excel renders Chinese text correctly. - rqrq
     */
    private static final String DEFAULT_CHARSET = "GB18030";
    /**
     * Default write-buffer size: 128KB. - rqrq
     */
    private static final int DEFAULT_BUFFER_SIZE = 8192 * 16;
    /**
     * Maximum rows per CSV file (default: 1,000,000). - rqrq
     */
    public static final int DEFAULT_MAX_ROWS_PER_CSV = 1000000;
    /**
     * @Description Creates a writer that streams to a single CSV file. - rqrq
     * @param outputStream target output stream (typically the servlet response stream)
     * @param fileName file name (used for logging only, not for the stream itself)
     * @return CsvStreamWriter
     * @author rqrq
     */
    public static CsvStreamWriter createSingleCsvWriter(OutputStream outputStream, String fileName) throws IOException {
        return new SingleCsvWriter(outputStream, fileName);
    }
    /**
     * @Description Creates a writer that streams multiple CSV files into one ZIP archive. - rqrq
     * @param outputStream target output stream
     * @param zipFileName ZIP file name (used for logging only)
     * @param csvFilePrefix CSV file-name prefix, e.g. "data" is intended to produce "data_1.csv", "data_2.csv"
     *                      (NOTE(review): the current ZipCsvWriter implementation ignores this and
     *                      uses a fixed ASCII entry name — see createNewCsvFile)
     * @return CsvStreamWriter
     * @author rqrq
     */
    public static CsvStreamWriter createZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix) throws IOException {
        return new ZipCsvWriter(outputStream, zipFileName, csvFilePrefix, DEFAULT_MAX_ROWS_PER_CSV);
    }
    /**
     * @Description Creates a ZIP writer with a custom per-CSV row limit. - rqrq
     * @param outputStream target output stream
     * @param zipFileName ZIP file name (used for logging only)
     * @param csvFilePrefix CSV file-name prefix (see note on the two-argument overload)
     * @param maxRowsPerCsv maximum rows per CSV entry before rolling over to a new one
     * @return CsvStreamWriter
     * @author rqrq
     */
    public static CsvStreamWriter createZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix, int maxRowsPerCsv) throws IOException {
        return new ZipCsvWriter(outputStream, zipFileName, csvFilePrefix, maxRowsPerCsv);
    }
/**
 * Streaming CSV writer interface. - rqrq
 *
 * <p><b>Error-handling contract</b></p>
 * <ul>
 * <li>A failed write automatically flips the writer into an error state</li>
 * <li>Subsequent writeRow calls are skipped, avoiding wasted work</li>
 * <li>Callers can check hasError() to see whether an error occurred</li>
 * <li>Callers can inspect getLastError() for the error details</li>
 * </ul>
 */
public interface CsvStreamWriter extends AutoCloseable {
    /**
     * Writes the header row. - rqrq
     */
    void writeHeader(String... headers) throws IOException;
    /**
     * Writes one data row (skipped if an error has already occurred). - rqrq
     */
    void writeRow(String... values) throws IOException;
    /**
     * Writes one data row without throwing; returns whether the write succeeded. - rqrq
     * Intended for use inside a ResultHandler, where throwing a RuntimeException is undesirable.
     */
    boolean writeRowSafe(String... values);
    /**
     * Returns the number of data rows written so far. - rqrq
     */
    long getWrittenRows();
    /**
     * Flushes buffered output. - rqrq
     */
    void flush() throws IOException;
    /**
     * Closes the writer. - rqrq
     */
    @Override
    void close() throws IOException;
    /**
     * Returns whether an error has occurred. - rqrq
     */
    boolean hasError();
    /**
     * Returns the most recent error, or null if none. - rqrq
     */
    Throwable getLastError();
    /**
     * Manually flips the writer into the error state (for exceptions caught externally). - rqrq
     */
    void setError(Throwable e);
    /**
     * Escapes a CSV field (handles commas, quotes and line breaks). - rqrq
     * Null is rendered as an empty field.
     */
    default String escape(String value) {
        if (value == null) {
            return "";
        }
        if (value.contains(",") || value.contains("\"") || value.contains("\n") || value.contains("\r")) {
            return "\"" + value.replace("\"", "\"\"") + "\"";
        }
        return value;
    }
    /**
     * Forces text format so Excel keeps leading zeros. - rqrq
     * Emits the field as "=""value""": Excel evaluates the ="..." formula and shows
     * the value as text, while the outer CSV quoting keeps the field RFC 4180-valid.
     * BUG FIX: the previous unquoted ="value" form produced an invalid CSV field
     * whenever the value contained a comma or line break.
     */
    default String asText(String value) {
        if (value == null || value.isEmpty()) {
            return "";
        }
        // A quote in the value becomes "" at the formula level, then """" once the
        // whole field is CSV-quoted.
        return "\"=\"\"" + value.replace("\"", "\"\"\"\"") + "\"\"\"";
    }
    /**
     * Formats a date as yyyy-MM-dd HH:mm:ss; null renders as an empty field. - rqrq
     * A fresh SimpleDateFormat is created per call on purpose: SimpleDateFormat is
     * not thread-safe, so a shared static instance would be unsafe.
     */
    default String formatDate(Date date) {
        if (date == null) {
            return "";
        }
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
    }
    /**
     * Formats a date with a custom pattern; null renders as an empty field. - rqrq
     */
    default String formatDate(Date date, String pattern) {
        if (date == null) {
            return "";
        }
        return new SimpleDateFormat(pattern).format(date);
    }
}
/**
* 单CSV文件写入器实现 - rqrq
*/
private static class SingleCsvWriter implements CsvStreamWriter {
private final BufferedWriter writer;
private final String fileName;
private long writtenRows = 0;
// 错误状态volatile保证多线程可见性- rqrq
private volatile boolean hasError = false;
private volatile Throwable lastError = null;
SingleCsvWriter(OutputStream outputStream, String fileName) throws IOException {
this.fileName = fileName;
this.writer = new BufferedWriter(new OutputStreamWriter(outputStream, DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE);
log.info("创建单CSV写入器: {} - rqrq", fileName);
}
@Override
public void writeHeader(String... headers) throws IOException {
if (hasError) return;
try {
writer.write(String.join(",", headers));
writer.newLine();
} catch (IOException e) {
setError(e);
throw e;
}
}
@Override
public void writeRow(String... values) throws IOException {
// 如果已发生错误直接跳过 - rqrq
if (hasError) {
return;
}
try {
writer.write(String.join(",", values));
writer.newLine();
writtenRows++;
if (writtenRows % 100000 == 0) {
log.info("CSV已写入 {} 行 - rqrq", writtenRows);
flush();
}
} catch (IOException e) {
setError(e);
throw e;
}
}
@Override
public boolean writeRowSafe(String... values) {
if (hasError) {
return false;
}
try {
writeRow(values);
return true;
} catch (IOException e) {
// 已在writeRow中设置了错误状态
log.warn("写入行失败,后续行将跳过: {} - rqrq", e.getMessage());
return false;
}
}
@Override
public long getWrittenRows() {
return writtenRows;
}
@Override
public void flush() throws IOException {
if (!hasError) {
writer.flush();
}
}
@Override
public void close() throws IOException {
try {
if (!hasError) {
flush();
}
writer.close();
} catch (IOException e) {
log.warn("关闭写入器时发生错误: {} - rqrq", e.getMessage());
}
log.info("单CSV写入完成: {},共 {} 行,hasError: {} - rqrq", fileName, writtenRows, hasError);
}
@Override
public boolean hasError() {
return hasError;
}
@Override
public Throwable getLastError() {
return lastError;
}
@Override
public void setError(Throwable e) {
this.hasError = true;
this.lastError = e;
log.error("CSV写入器发生错误,后续写入将跳过: {} - rqrq", e.getMessage());
}
}
/**
* ZIP压缩包写入器实现多CSV文件- rqrq
*/
private static class ZipCsvWriter implements CsvStreamWriter {
private final ZipOutputStream zipOut;
private final String zipFileName;
private final String csvFilePrefix;
private final int maxRowsPerCsv;
private BufferedWriter currentWriter;
private int currentCsvIndex = 0;
private long currentCsvRows = 0;
private long totalWrittenRows = 0;
private String[] headerRow;
// 错误状态volatile保证多线程可见性- rqrq
private volatile boolean hasError = false;
private volatile Throwable lastError = null;
ZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix, int maxRowsPerCsv) throws IOException {
this.zipOut = new ZipOutputStream(outputStream);
this.zipFileName = zipFileName;
this.csvFilePrefix = csvFilePrefix;
this.maxRowsPerCsv = maxRowsPerCsv;
log.info("创建ZIP写入器: {},每个CSV最多 {} 行 - rqrq", zipFileName, maxRowsPerCsv);
}
@Override
public void writeHeader(String... headers) throws IOException {
if (hasError) return;
this.headerRow = headers;
try {
// 创建第一个CSV文件并写入表头
createNewCsvFile();
} catch (IOException e) {
setError(e);
throw e;
}
}
@Override
public void writeRow(String... values) throws IOException {
// 如果已发生错误直接跳过 - rqrq
if (hasError) {
return;
}
try {
// 检查是否需要创建新CSV文件
if (currentCsvRows >= maxRowsPerCsv) {
closeCurrentCsv();
createNewCsvFile();
}
currentWriter.write(String.join(",", values));
currentWriter.newLine();
currentCsvRows++;
totalWrittenRows++;
if (totalWrittenRows % 100000 == 0) {
log.info("ZIP总计已写入 {} 行,当前CSV {} 有 {} 行 - rqrq",
totalWrittenRows, currentCsvIndex, currentCsvRows);
flush();
}
} catch (IOException e) {
setError(e);
throw e;
}
}
@Override
public boolean writeRowSafe(String... values) {
if (hasError) {
return false;
}
try {
writeRow(values);
return true;
} catch (IOException e) {
// 已在writeRow中设置了错误状态
log.warn("写入行失败,后续行将跳过: {} - rqrq", e.getMessage());
return false;
}
}
@Override
public long getWrittenRows() {
return totalWrittenRows;
}
@Override
public void flush() throws IOException {
if (!hasError && currentWriter != null) {
currentWriter.flush();
}
}
@Override
public void close() throws IOException {
try {
if (!hasError) {
closeCurrentCsv();
zipOut.finish();
}
zipOut.close();
} catch (IOException e) {
log.warn("关闭ZIP写入器时发生错误: {} - rqrq", e.getMessage());
}
log.info("ZIP写入完成: {},共 {} 个CSV文件,总计 {} 行,hasError: {} - rqrq",
zipFileName, currentCsvIndex, totalWrittenRows, hasError);
}
@Override
public boolean hasError() {
return hasError;
}
@Override
public Throwable getLastError() {
return lastError;
}
@Override
public void setError(Throwable e) {
this.hasError = true;
this.lastError = e;
log.error("ZIP写入器发生错误,后续写入将跳过: {} - rqrq", e.getMessage());
}
private void createNewCsvFile() throws IOException {
currentCsvIndex++;
// 使用ASCII文件名避免Windows解压乱码 - rqrq
String csvFileName = "TID_EPC_Log_" + currentCsvIndex + ".csv";
ZipEntry entry = new ZipEntry(csvFileName);
zipOut.putNextEntry(entry);
// 注意ZIP内的Writer不能关闭底层流
currentWriter = new BufferedWriter(new OutputStreamWriter(zipOut, DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE);
// 写入表头
if (headerRow != null) {
currentWriter.write(String.join(",", headerRow));
currentWriter.newLine();
}
currentCsvRows = 0;
log.info("创建CSV文件: {} - rqrq", csvFileName);
}
private void closeCurrentCsv() throws IOException {
if (currentWriter != null) {
currentWriter.flush();
zipOut.closeEntry();
log.info("CSV文件 {} 写入完成,共 {} 行 - rqrq", currentCsvIndex, currentCsvRows);
}
}
}
/**
* @Description 构建CSV数据行通用方法- rqrq
* @param extractors 字段提取器数组
* @param data 数据对象
* @return String[] CSV行数据
* @author rqrq
*/
@SafeVarargs
public static <T> String[] buildRow(T data, Function<T, String>... extractors) {
String[] row = new String[extractors.length];
for (int i = 0; i < extractors.length; i++) {
row[i] = extractors[i].apply(data);
}
return row;
}
}

24
src/main/java/com/xujie/sys/modules/pms/mapper/MesTidEpcLogMapper.java

@ -7,7 +7,11 @@ import com.xujie.sys.modules.pms.data.MesTidEpcLogData;
import com.xujie.sys.modules.pms.data.MesTidEpcLogExportData; import com.xujie.sys.modules.pms.data.MesTidEpcLogExportData;
import com.xujie.sys.modules.pms.entity.MesTidEpcLogEntity; import com.xujie.sys.modules.pms.entity.MesTidEpcLogEntity;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.ResultType;
import org.apache.ibatis.mapping.ResultSetType;
import org.apache.ibatis.session.ResultHandler;
import java.util.List; import java.util.List;
@ -71,6 +75,26 @@ public interface MesTidEpcLogMapper extends BaseMapper<MesTidEpcLogEntity> {
@Param("offset") long offset, @Param("offset") long offset,
@Param("limit") int limit); @Param("limit") int limit);
/**
* @Description 流式查询导出数据真正的逐行读取配合ResultHandler使用- rqrq
*
* <p><b>流式查询原理</b></p>
* <ul>
* <li>使用 FORWARD_ONLY 游标数据逐行从数据库读取</li>
* <li>配合 responseBuffering=adaptive在JDBC URL中配置</li>
* <li>fetchSize=1000 控制每次从数据库预取的行数</li>
* <li>数据不会全部加载到内存避免OOM</li>
* </ul>
*
* @param query 查询条件
* @param handler 结果处理器逐行处理数据
* @author rqrq
* @date 2025/02/04
*/
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
@ResultType(MesTidEpcLogExportData.class)
void streamExportList(@Param("query") MesTidEpcLogData query, ResultHandler<MesTidEpcLogExportData> handler);
/** /**
* @Description 批量插入日志数据 - rqrq * @Description 批量插入日志数据 - rqrq
* @param list 日志数据列表 * @param list 日志数据列表

212
src/main/java/com/xujie/sys/modules/pms/service/Impl/MesTidEpcLogServiceImpl.java

@ -22,6 +22,8 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import com.xujie.sys.common.utils.StreamCsvExportUtil;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -32,6 +34,7 @@ import java.text.SimpleDateFormat;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
@ -293,7 +296,15 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl<MesTidEpcLogMapper, Mes
private static final int MAX_ROWS_PER_CSV = 1000000; // 100万条 private static final int MAX_ROWS_PER_CSV = 1000000; // 100万条
/** /**
* @Description 导出CSV文件自动判断100万返回单CSV>100万返回ZIP包含多个CSV- rqrq
* @Description 导出CSV文件真正的流式导出MyBatis ResultHandler + 逐行写入- rqrq
*
* <p><b>V5版本真正的流式导出</b></p>
* <ul>
* <li>使用 MyBatis ResultHandler 逐行从数据库读取</li>
* <li>读一行写一行内存中不存储任何数据批次</li>
* <li>配合 JDBC responseBuffering=adaptive FORWARD_ONLY 游标</li>
* <li>真正的 O(1) 内存占用可导出任意大小数据集</li>
* </ul>
* *
* <p><b>导出策略</b></p> * <p><b>导出策略</b></p>
* <ul> * <ul>
@ -305,15 +316,16 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl<MesTidEpcLogMapper, Mes
* @param data 查询条件 * @param data 查询条件
* @param response HttpServletResponse * @param response HttpServletResponse
* @author rqrq * @author rqrq
* @date 2025/02/04
*/ */
@Override @Override
public void exportCsv(MesTidEpcLogData data, HttpServletResponse response) { public void exportCsv(MesTidEpcLogData data, HttpServletResponse response) {
log.info("开始导出MES TID EPC日志文件 - rqrq");
log.info("开始导出MES TID EPC日志文件(V5:真正流式导出)- rqrq");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
// ============ 查询总数检查数据量 ============ - rqrq
// ============ 步骤1查询总数检查数据量 ============ - rqrq
long countStartTime = System.currentTimeMillis(); long countStartTime = System.currentTimeMillis();
long dataTotal = mesTidEpcLogMapper.getExportCount(data); long dataTotal = mesTidEpcLogMapper.getExportCount(data);
long countEndTime = System.currentTimeMillis(); long countEndTime = System.currentTimeMillis();
@ -329,20 +341,20 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl<MesTidEpcLogMapper, Mes
throw new RuntimeException("该条件数据数量超过五百万(当前" + dataTotal + "条),请缩小扫描时间范围手动分批导出"); throw new RuntimeException("该条件数据数量超过五百万(当前" + dataTotal + "条),请缩小扫描时间范围手动分批导出");
} }
// ============ 根据数据量选择导出方式 ============ - rqrq
// ============ 步骤2根据数据量选择导出方式 ============ - rqrq
if (dataTotal <= MAX_ROWS_PER_CSV) { if (dataTotal <= MAX_ROWS_PER_CSV) {
// 100万直接输出单个CSV - rqrq
log.info("数据量 {} 条 ≤ 100万,输出单个CSV文件 - rqrq", dataTotal);
writeToCsvStreaming(data, dataTotal, response);
// 100万直接输出单个CSV真正流式- rqrq
log.info("数据量 {} 条 ≤ 100万,输出单个CSV文件(真正流式)- rqrq", dataTotal);
exportToCsvRealStreaming(data, dataTotal, response);
} else { } else {
// >100万输出ZIP压缩包包含多个CSV - rqrq
// >100万输出ZIP压缩包包含多个CSV真正流式- rqrq
int csvFileCount = (int) Math.ceil((double) dataTotal / MAX_ROWS_PER_CSV); int csvFileCount = (int) Math.ceil((double) dataTotal / MAX_ROWS_PER_CSV);
log.info("数据量 {} 条 > 100万,输出ZIP压缩包,包含 {} 个CSV文件 - rqrq", dataTotal, csvFileCount);
writeToZipWithMultipleCsv(data, dataTotal, response);
log.info("数据量 {} 条 > 100万,输出ZIP压缩包,包含 {} 个CSV文件(真正流式)- rqrq", dataTotal, csvFileCount);
exportToZipRealStreaming(data, dataTotal, response);
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
log.info("导出完成,总耗时: {} 秒 - rqrq", (endTime - startTime) / 1000.0);
log.info("导出完成(V5),总耗时: {} 秒 - rqrq", (endTime - startTime) / 1000.0);
} catch (Exception e) { } catch (Exception e) {
log.error("导出文件失败 - rqrq: {}", e.getMessage(), e); log.error("导出文件失败 - rqrq: {}", e.getMessage(), e);
@ -351,9 +363,182 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl<MesTidEpcLogMapper, Mes
} }
/** /**
* @Description 写入ZIP压缩包包含多个CSV文件每个最多100万条- rqrq
* @Description 真正的流式CSV导出单文件- rqrq
*
* <p><b>核心原理</b></p>
* <ul>
* <li>MyBatis ResultHandler 逐行回调</li>
* <li>每收到一行数据立即写入CSV</li>
* <li>内存中始终只有1行数据</li>
* </ul>
*
* @param data 查询条件
* @param dataTotal 数据总数
* @param response HttpServletResponse
* @author rqrq
* @date 2025/02/04
*/
private void exportToCsvRealStreaming(MesTidEpcLogData data, long dataTotal, HttpServletResponse response) throws IOException {
log.info("【真正流式导出】单CSV,预计 {} 条 - rqrq", dataTotal);
// 设置响应头 - rqrq
response.setContentType("text/csv;charset=GB18030");
response.setCharacterEncoding("GB18030");
String fileName = URLEncoder.encode("TID_EPC日志导出.csv", "UTF-8");
response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
// 创建流式CSV写入器 - rqrq
StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createSingleCsvWriter(
response.getOutputStream(), "TID_EPC日志导出.csv");
try {
// 写入表头 - rqrq
writer.writeHeader("序号", "EPC", "TID", "用户区", "LockiBtis", "密匙",
"写码成功", "读码成功", "EPC锁定", "强度/读距", "扫描时间", "计数");
// 使用AtomicLong记录进度lambda中需要final或effectively final- rqrq
AtomicLong processedCount = new AtomicLong(0);
// 真正的流式查询ResultHandler逐行回调 - rqrq
// 使用writeRowSafe避免抛出RuntimeException导致MyBatis继续处理后续行 - rqrq
mesTidEpcLogMapper.streamExportList(data, resultContext -> {
// 如果已发生错误直接跳过后续行避免资源浪费- rqrq
if (writer.hasError()) {
return;
}
MesTidEpcLogExportData row = resultContext.getResultObject();
// 使用安全写入方法失败时自动设置错误状态 - rqrq
boolean success = writer.writeRowSafe(
writer.asText(row.getSeqNo()), // 序号可能以0开头
writer.asText(row.getEpc()), // EPC编码
writer.asText(row.getTid()), // TID编码
writer.asText(row.getUserArea()), // 用户区
writer.asText(row.getLockBits()), // LockBits
writer.asText(row.getSecretKey()), // 密匙重要防止丢失前导0
writer.escape(row.getWriteSuccess()),
writer.escape(row.getReadSuccess()),
writer.escape(row.getEpcLocked()),
writer.escape(row.getSignalStrength()),
writer.formatDate(row.getScanTime()),
writer.asText(row.getCountInfo()) // 计数信息
);
if (success) {
long count = processedCount.incrementAndGet();
if (count % 100000 == 0) {
log.info("流式导出进度: {}/{} 条 - rqrq", count, dataTotal);
}
}
});
// 查询结束后检查是否发生错误 - rqrq
if (writer.hasError()) {
log.error("流式导出过程中发生错误: {} - rqrq", writer.getLastError().getMessage());
// 不抛异常因为部分数据可能已发送给客户端
}
log.info("流式导出完成,共 {} 条 - rqrq", writer.getWrittenRows());
} finally {
writer.close();
}
}
/**
* @Description 真正的流式ZIP导出多CSV文件- rqrq
*
* <p><b>核心原理</b></p>
* <ul>
* <li>MyBatis ResultHandler 逐行回调</li>
* <li>每收到一行数据立即写入当前CSV</li>
* <li>当前CSV达到100万行自动切换到新CSV</li>
* <li>内存中始终只有1行数据</li>
* </ul>
*
* @param data 查询条件
* @param dataTotal 数据总数
* @param response HttpServletResponse
* @author rqrq
* @date 2025/02/04
*/
private void exportToZipRealStreaming(MesTidEpcLogData data, long dataTotal, HttpServletResponse response) throws IOException {
log.info("【真正流式导出】ZIP多CSV,预计 {} 条 - rqrq", dataTotal);
// 设置响应头 - rqrq
response.setContentType("application/zip");
response.setCharacterEncoding("UTF-8");
String fileName = URLEncoder.encode("TID_EPC日志导出.zip", "UTF-8");
response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
// 创建流式ZIP写入器每个CSV最多100万行- rqrq
StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createZipCsvWriter(
response.getOutputStream(), "TID_EPC日志导出.zip", "TID_EPC日志", MAX_ROWS_PER_CSV);
try {
// 写入表头自动应用到每个新建的CSV文件- rqrq
writer.writeHeader("序号", "EPC", "TID", "用户区", "LockiBtis", "密匙",
"写码成功", "读码成功", "EPC锁定", "强度/读距", "扫描时间", "计数");
// 使用AtomicLong记录进度 - rqrq
AtomicLong processedCount = new AtomicLong(0);
// 真正的流式查询ResultHandler逐行回调 - rqrq
// 使用writeRowSafe避免抛出RuntimeException导致MyBatis继续处理后续行 - rqrq
mesTidEpcLogMapper.streamExportList(data, resultContext -> {
// 如果已发生错误直接跳过后续行避免资源浪费- rqrq
if (writer.hasError()) {
return;
}
MesTidEpcLogExportData row = resultContext.getResultObject();
// 使用安全写入方法失败时自动设置错误状态 - rqrq
boolean success = writer.writeRowSafe(
writer.asText(row.getSeqNo()),
writer.asText(row.getEpc()),
writer.asText(row.getTid()),
writer.asText(row.getUserArea()),
writer.asText(row.getLockBits()),
writer.asText(row.getSecretKey()),
writer.escape(row.getWriteSuccess()),
writer.escape(row.getReadSuccess()),
writer.escape(row.getEpcLocked()),
writer.escape(row.getSignalStrength()),
writer.formatDate(row.getScanTime()),
writer.asText(row.getCountInfo())
);
if (success) {
long count = processedCount.incrementAndGet();
if (count % 100000 == 0) {
log.info("流式导出进度: {}/{} 条 - rqrq", count, dataTotal);
}
}
});
// 查询结束后检查是否发生错误 - rqrq
if (writer.hasError()) {
log.error("流式导出过程中发生错误: {} - rqrq", writer.getLastError().getMessage());
}
log.info("流式导出完成,共 {} 条 - rqrq", writer.getWrittenRows());
} finally {
writer.close();
}
}
// ==================================================================================
// ========================= V4版本伪流式分批查询分批写入已被V5取代=============
// ==================================================================================
/**
* @Description V4备份写入ZIP压缩包伪流式分批查询分批写入- rqrq
* *
* <p><b>流式写入</b>边查边写内存占用低</p>
* <p><b>已被V5取代</b>保留此方法用于对比或回退</p>
* <p><b>调用方式</b>如需使用在exportCsv方法中替换exportToZipRealStreaming为此方法</p>
* *
* @param data 查询条件 * @param data 查询条件
* @param dataTotal 数据总数 * @param dataTotal 数据总数
@ -361,6 +546,7 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl<MesTidEpcLogMapper, Mes
* @author rqrq * @author rqrq
* @date 2025/02/03 * @date 2025/02/03
*/ */
@SuppressWarnings("unused")
private void writeToZipWithMultipleCsv(MesTidEpcLogData data, long dataTotal, HttpServletResponse response) throws IOException { private void writeToZipWithMultipleCsv(MesTidEpcLogData data, long dataTotal, HttpServletResponse response) throws IOException {
log.info("开始生成ZIP压缩包(多CSV),数据总数: {} 条 - rqrq", dataTotal); log.info("开始生成ZIP压缩包(多CSV),数据总数: {} 条 - rqrq", dataTotal);

54
src/main/resources/mapper/pms/MesTidEpcLogMapper.xml

@ -241,6 +241,60 @@
ORDER BY upload_time DESC, id DESC ORDER BY upload_time DESC, id DESC
</select> </select>
<!-- rqrq - 流式查询导出数据(真正的逐行读取,配合ResultHandler使用)-->
<!--
【流式查询说明】
1. 使用 resultSetType="FORWARD_ONLY" 和 fetchSize="1000"
2. 配合JDBC URL中的 responseBuffering=adaptive
3. 数据从数据库逐行读取,不会全部加载到内存
4. ORDER BY id 保证数据顺序稳定
-->
<select id="streamExportList" resultType="MesTidEpcLogExportData" resultSetType="FORWARD_ONLY" fetchSize="1000">
SELECT
seq_no AS seqNo,
epc,
tid,
user_area AS userArea,
lock_bits AS lockBits,
secret_key AS secretKey,
write_success AS writeSuccess,
read_success AS readSuccess,
epc_locked AS epcLocked,
signal_strength AS signalStrength,
scan_time AS scanTime,
count_info AS countInfo
FROM mes_tid_epc_log
WHERE 1 = 1
<if test="query.site != null and query.site != ''">
AND site = #{query.site}
</if>
<if test="query.buNo != null and query.buNo != ''">
AND bu_no = #{query.buNo}
</if>
<if test="query.searchSeqNo != null and query.searchSeqNo != ''">
AND seq_no LIKE '%' + #{query.searchSeqNo} + '%'
</if>
<if test="query.searchEpc != null and query.searchEpc != ''">
AND epc LIKE '%' + #{query.searchEpc} + '%'
</if>
<if test="query.searchTid != null and query.searchTid != ''">
AND tid LIKE '%' + #{query.searchTid} + '%'
</if>
<if test="query.searchUserArea != null and query.searchUserArea != ''">
AND user_area LIKE '%' + #{query.searchUserArea} + '%'
</if>
<if test="query.scanTimeStart != null">
AND scan_time &gt;= #{query.scanTimeStart}
</if>
<if test="query.scanTimeEnd != null">
AND scan_time &lt;= #{query.scanTimeEnd}
</if>
<if test="query.searchBatchNo != null and query.searchBatchNo != ''">
AND batch_no LIKE '%' + #{query.searchBatchNo} + '%'
</if>
ORDER BY id ASC
</select>
<!-- rqrq - 批量插入日志数据 --> <!-- rqrq - 批量插入日志数据 -->
<insert id="batchInsert" parameterType="java.util.List"> <insert id="batchInsert" parameterType="java.util.List">
INSERT INTO mes_tid_epc_log ( INSERT INTO mes_tid_epc_log (

Loading…
Cancel
Save