From a9ab3ba802de5bc67d64c8d164a909f69a77feb7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=B8=E7=86=9F=E5=90=B4=E5=BD=A6=E7=A5=96?=
功能特点: 使用方式: 异常处理机制: 【流式查询原理】 【V5版本:真正的流式导出】 【导出策略】
+ *
+ *
+ *
+ * // 1. 创建Writer
+ * StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createSingleCsvWriter(response.getOutputStream(), "导出文件.csv");
+ * // 或创建ZIP Writer
+ * StreamCsvExportUtil.CsvStreamWriter writer = StreamCsvExportUtil.createZipCsvWriter(response.getOutputStream(), "导出文件.zip", "数据");
+ *
+ * // 2. 写入表头
+ * writer.writeHeader("列1", "列2", "列3");
+ *
+ * // 3. 在ResultHandler中逐行写入
+ * mapper.streamQuery(query, result -> {
+ * writer.writeRow(
+ * writer.asText(result.getField1()), // 强制文本格式
+ * writer.escape(result.getField2()), // 普通转义
+ * writer.formatDate(result.getDate()) // 日期格式化
+ * );
+ * });
+ *
+ * // 4. 关闭Writer
+ * writer.close();
+ *
+ *
+ * @author rqrq
+ * @date 2025/02/04
+ */
+@Slf4j
+public class StreamCsvExportUtil {
+
+ /**
+ * 默认编码(GB18030兼容Excel中文)- rqrq
+ */
+ private static final String DEFAULT_CHARSET = "GB18030";
+
+ /**
+ * 默认缓冲区大小(128KB)- rqrq
+ */
+ private static final int DEFAULT_BUFFER_SIZE = 8192 * 16;
+
+ /**
+ * 每个CSV文件最大行数(默认100万)- rqrq
+ */
+ public static final int DEFAULT_MAX_ROWS_PER_CSV = 1000000;
+
+ /**
+ * @Description 创建单CSV文件写入器 - rqrq
+ * @param outputStream 输出流
+ * @param fileName 文件名(用于日志)
+ * @return CsvStreamWriter
+ * @author rqrq
+ */
+ public static CsvStreamWriter createSingleCsvWriter(OutputStream outputStream, String fileName) throws IOException {
+ return new SingleCsvWriter(outputStream, fileName);
+ }
+
+ /**
+ * @Description 创建ZIP压缩包写入器(多CSV文件)- rqrq
+ * @param outputStream 输出流
+ * @param zipFileName ZIP文件名(用于日志)
+ * @param csvFilePrefix CSV文件名前缀(如"数据",则生成"数据_1.csv"、"数据_2.csv")
+ * @return CsvStreamWriter
+ * @author rqrq
+ */
+ public static CsvStreamWriter createZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix) throws IOException {
+ return new ZipCsvWriter(outputStream, zipFileName, csvFilePrefix, DEFAULT_MAX_ROWS_PER_CSV);
+ }
+
+ /**
+ * @Description 创建ZIP压缩包写入器(自定义每个CSV最大行数)- rqrq
+ * @param outputStream 输出流
+ * @param zipFileName ZIP文件名
+ * @param csvFilePrefix CSV文件名前缀
+ * @param maxRowsPerCsv 每个CSV最大行数
+ * @return CsvStreamWriter
+ * @author rqrq
+ */
+ public static CsvStreamWriter createZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix, int maxRowsPerCsv) throws IOException {
+ return new ZipCsvWriter(outputStream, zipFileName, csvFilePrefix, maxRowsPerCsv);
+ }
+
+ /**
+ * CSV流式写入器接口 - rqrq
+ *
+ *
+ *
+ */
+ public interface CsvStreamWriter extends AutoCloseable {
+
+ /**
+ * 写入表头行 - rqrq
+ */
+ void writeHeader(String... headers) throws IOException;
+
+ /**
+ * 写入数据行(如果已发生错误则跳过)- rqrq
+ */
+ void writeRow(String... values) throws IOException;
+
+ /**
+ * 写入数据行(安全版本,不抛异常,返回是否成功)- rqrq
+ * 用于 ResultHandler 中避免抛出 RuntimeException
+ */
+ boolean writeRowSafe(String... values);
+
+ /**
+ * 获取已写入行数 - rqrq
+ */
+ long getWrittenRows();
+
+ /**
+ * 刷新缓冲区 - rqrq
+ */
+ void flush() throws IOException;
+
+ /**
+ * 关闭写入器 - rqrq
+ */
+ @Override
+ void close() throws IOException;
+
+ /**
+ * 检查是否已发生错误 - rqrq
+ */
+ boolean hasError();
+
+ /**
+ * 获取最后一次错误 - rqrq
+ */
+ Throwable getLastError();
+
+ /**
+ * 手动设置错误状态(用于外部捕获到异常时)- rqrq
+ */
+ void setError(Throwable e);
+
+ /**
+ * CSV字段转义(处理逗号、引号、换行)- rqrq
+ */
+ default String escape(String value) {
+ if (value == null) {
+ return "";
+ }
+ if (value.contains(",") || value.contains("\"") || value.contains("\n") || value.contains("\r")) {
+ return "\"" + value.replace("\"", "\"\"") + "\"";
+ }
+ return value;
+ }
+
+ /**
+ * 强制文本格式(防止前导零丢失)- rqrq
+ * 使用 ="值" 格式,Excel会识别为文本
+ */
+ default String asText(String value) {
+ if (value == null || value.isEmpty()) {
+ return "";
+ }
+ return "=\"" + value.replace("\"", "\"\"") + "\"";
+ }
+
+ /**
+ * 格式化日期 - rqrq
+ */
+ default String formatDate(Date date) {
+ if (date == null) {
+ return "";
+ }
+ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
+ }
+
+ /**
+ * 格式化日期(自定义格式)- rqrq
+ */
+ default String formatDate(Date date, String pattern) {
+ if (date == null) {
+ return "";
+ }
+ return new SimpleDateFormat(pattern).format(date);
+ }
+ }
+
+ /**
+ * 单CSV文件写入器实现 - rqrq
+ */
+ private static class SingleCsvWriter implements CsvStreamWriter {
+ private final BufferedWriter writer;
+ private final String fileName;
+ private long writtenRows = 0;
+
+ // 错误状态(volatile保证多线程可见性)- rqrq
+ private volatile boolean hasError = false;
+ private volatile Throwable lastError = null;
+
+ SingleCsvWriter(OutputStream outputStream, String fileName) throws IOException {
+ this.fileName = fileName;
+ this.writer = new BufferedWriter(new OutputStreamWriter(outputStream, DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE);
+ log.info("创建单CSV写入器: {} - rqrq", fileName);
+ }
+
+ @Override
+ public void writeHeader(String... headers) throws IOException {
+ if (hasError) return;
+ try {
+ writer.write(String.join(",", headers));
+ writer.newLine();
+ } catch (IOException e) {
+ setError(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeRow(String... values) throws IOException {
+ // 如果已发生错误,直接跳过 - rqrq
+ if (hasError) {
+ return;
+ }
+
+ try {
+ writer.write(String.join(",", values));
+ writer.newLine();
+ writtenRows++;
+
+ if (writtenRows % 100000 == 0) {
+ log.info("CSV已写入 {} 行 - rqrq", writtenRows);
+ flush();
+ }
+ } catch (IOException e) {
+ setError(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean writeRowSafe(String... values) {
+ if (hasError) {
+ return false;
+ }
+ try {
+ writeRow(values);
+ return true;
+ } catch (IOException e) {
+ // 已在writeRow中设置了错误状态
+ log.warn("写入行失败,后续行将跳过: {} - rqrq", e.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public long getWrittenRows() {
+ return writtenRows;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!hasError) {
+ writer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!hasError) {
+ flush();
+ }
+ writer.close();
+ } catch (IOException e) {
+ log.warn("关闭写入器时发生错误: {} - rqrq", e.getMessage());
+ }
+ log.info("单CSV写入完成: {},共 {} 行,hasError: {} - rqrq", fileName, writtenRows, hasError);
+ }
+
+ @Override
+ public boolean hasError() {
+ return hasError;
+ }
+
+ @Override
+ public Throwable getLastError() {
+ return lastError;
+ }
+
+ @Override
+ public void setError(Throwable e) {
+ this.hasError = true;
+ this.lastError = e;
+ log.error("CSV写入器发生错误,后续写入将跳过: {} - rqrq", e.getMessage());
+ }
+ }
+
+ /**
+ * ZIP压缩包写入器实现(多CSV文件)- rqrq
+ */
+ private static class ZipCsvWriter implements CsvStreamWriter {
+ private final ZipOutputStream zipOut;
+ private final String zipFileName;
+ private final String csvFilePrefix;
+ private final int maxRowsPerCsv;
+
+ private BufferedWriter currentWriter;
+ private int currentCsvIndex = 0;
+ private long currentCsvRows = 0;
+ private long totalWrittenRows = 0;
+ private String[] headerRow;
+
+ // 错误状态(volatile保证多线程可见性)- rqrq
+ private volatile boolean hasError = false;
+ private volatile Throwable lastError = null;
+
+ ZipCsvWriter(OutputStream outputStream, String zipFileName, String csvFilePrefix, int maxRowsPerCsv) throws IOException {
+ this.zipOut = new ZipOutputStream(outputStream);
+ this.zipFileName = zipFileName;
+ this.csvFilePrefix = csvFilePrefix;
+ this.maxRowsPerCsv = maxRowsPerCsv;
+ log.info("创建ZIP写入器: {},每个CSV最多 {} 行 - rqrq", zipFileName, maxRowsPerCsv);
+ }
+
+ @Override
+ public void writeHeader(String... headers) throws IOException {
+ if (hasError) return;
+ this.headerRow = headers;
+ try {
+ // 创建第一个CSV文件并写入表头
+ createNewCsvFile();
+ } catch (IOException e) {
+ setError(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void writeRow(String... values) throws IOException {
+ // 如果已发生错误,直接跳过 - rqrq
+ if (hasError) {
+ return;
+ }
+
+ try {
+ // 检查是否需要创建新CSV文件
+ if (currentCsvRows >= maxRowsPerCsv) {
+ closeCurrentCsv();
+ createNewCsvFile();
+ }
+
+ currentWriter.write(String.join(",", values));
+ currentWriter.newLine();
+ currentCsvRows++;
+ totalWrittenRows++;
+
+ if (totalWrittenRows % 100000 == 0) {
+ log.info("ZIP总计已写入 {} 行,当前CSV {} 有 {} 行 - rqrq",
+ totalWrittenRows, currentCsvIndex, currentCsvRows);
+ flush();
+ }
+ } catch (IOException e) {
+ setError(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean writeRowSafe(String... values) {
+ if (hasError) {
+ return false;
+ }
+ try {
+ writeRow(values);
+ return true;
+ } catch (IOException e) {
+ // 已在writeRow中设置了错误状态
+ log.warn("写入行失败,后续行将跳过: {} - rqrq", e.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public long getWrittenRows() {
+ return totalWrittenRows;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!hasError && currentWriter != null) {
+ currentWriter.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!hasError) {
+ closeCurrentCsv();
+ zipOut.finish();
+ }
+ zipOut.close();
+ } catch (IOException e) {
+ log.warn("关闭ZIP写入器时发生错误: {} - rqrq", e.getMessage());
+ }
+ log.info("ZIP写入完成: {},共 {} 个CSV文件,总计 {} 行,hasError: {} - rqrq",
+ zipFileName, currentCsvIndex, totalWrittenRows, hasError);
+ }
+
+ @Override
+ public boolean hasError() {
+ return hasError;
+ }
+
+ @Override
+ public Throwable getLastError() {
+ return lastError;
+ }
+
+ @Override
+ public void setError(Throwable e) {
+ this.hasError = true;
+ this.lastError = e;
+ log.error("ZIP写入器发生错误,后续写入将跳过: {} - rqrq", e.getMessage());
+ }
+
+ private void createNewCsvFile() throws IOException {
+ currentCsvIndex++;
+ // 使用ASCII文件名避免Windows解压乱码 - rqrq
+ String csvFileName = "TID_EPC_Log_" + currentCsvIndex + ".csv";
+ ZipEntry entry = new ZipEntry(csvFileName);
+ zipOut.putNextEntry(entry);
+
+ // 注意:ZIP内的Writer不能关闭底层流
+ currentWriter = new BufferedWriter(new OutputStreamWriter(zipOut, DEFAULT_CHARSET), DEFAULT_BUFFER_SIZE);
+
+ // 写入表头
+ if (headerRow != null) {
+ currentWriter.write(String.join(",", headerRow));
+ currentWriter.newLine();
+ }
+
+ currentCsvRows = 0;
+ log.info("创建CSV文件: {} - rqrq", csvFileName);
+ }
+
+ private void closeCurrentCsv() throws IOException {
+ if (currentWriter != null) {
+ currentWriter.flush();
+ zipOut.closeEntry();
+ log.info("CSV文件 {} 写入完成,共 {} 行 - rqrq", currentCsvIndex, currentCsvRows);
+ }
+ }
+ }
+
+ /**
+ * @Description 构建CSV数据行(通用方法)- rqrq
+ * @param extractors 字段提取器数组
+ * @param data 数据对象
+ * @return String[] CSV行数据
+ * @author rqrq
+ */
+ @SafeVarargs
+ public static
+ *
+ *
+ * @param query 查询条件
+ * @param handler 结果处理器,逐行处理数据
+ * @author rqrq
+ * @date 2025/02/04
+ */
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
+ @ResultType(MesTidEpcLogExportData.class)
+ void streamExportList(@Param("query") MesTidEpcLogData query, ResultHandler
+ *
*
*
@@ -305,15 +316,16 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl
【核心原理】
+ *【流式写入】边查边写,内存占用低
+ *【已被V5取代】保留此方法用于对比或回退
+ *【调用方式】如需使用,在exportCsv方法中替换exportToZipRealStreaming为此方法
* * @param data 查询条件 * @param dataTotal 数据总数 @@ -361,6 +546,7 @@ public class MesTidEpcLogServiceImpl extends ServiceImpl