Browse Source

乐观锁

master
常熟吴彦祖 3 months ago
parent
commit
131ca34981
  1. 29
      src/main/java/com/gaotao/modules/automatedWarehouse/service/impl/WcsTaskServiceImpl.java
  2. 20
      src/main/java/com/gaotao/modules/automatedWarehouse/task/WcsTaskScheduler.java
  3. 52
      src/main/java/com/gaotao/modules/other/service/impl/InventoryMoveServiceImpl.java
  4. 10
      src/main/resources/mapper/automatedWarehouse/WcsIntegrationMapper.xml

29
src/main/java/com/gaotao/modules/automatedWarehouse/service/impl/WcsTaskServiceImpl.java

@ -38,14 +38,23 @@ public class WcsTaskServiceImpl implements WcsTaskService {
@Override
public void processWcsCallbackTask(WcsCallbackTask callbackTask) {
// 判断是否为超时恢复的任务
boolean isTimeoutRecovery = "处理中".equals(callbackTask.getStatus());
try {
if (isTimeoutRecovery) {
log.warn("检测到超时任务,尝试恢复:id={}, palletId={}, transType={}, processStartTime={}, retryCount={}",
callbackTask.getId(), callbackTask.getPalletId(), callbackTask.getTransTypeDesc(),
callbackTask.getProcessStartTime(), callbackTask.getRetryCount());
} else {
log.info("处理WCS回调任务:palletId={}, transType={}, currentStatus={}",
callbackTask.getPalletId(), callbackTask.getTransTypeDesc(), callbackTask.getStatus());
}
// 1. 使用乐观锁更新状态为处理中防止重复处理
int updateCount = wcsIntegrationMapper.updateWcsCallbackTaskStatusWithLock(
callbackTask.getId(),
callbackTask.getStatus(), // 原状态已录入 or 处理失败
callbackTask.getStatus(), // 原状态已录入 or 处理失败 or 处理中
"处理中",
new Date()
);
@ -57,7 +66,8 @@ public class WcsTaskServiceImpl implements WcsTaskService {
return; // 跳过不抛异常
}
log.info("成功锁定任务,开始处理:palletId={}", callbackTask.getPalletId());
log.info("成功锁定任务,开始处理:palletId={}, isTimeoutRecovery={}",
callbackTask.getPalletId(), isTimeoutRecovery);
// 3. 构建移库请求参数
WareHouseTransferRequest request = buildWareHouseTransferRequest(callbackTask);
@ -81,13 +91,20 @@ public class WcsTaskServiceImpl implements WcsTaskService {
wcsIntegrationMapper.updateWcsCallbackTaskStatus(callbackTask.getId(), "已完成", null, new Date(), null);
} catch (Exception e) {
log.error("处理WCS回调任务失败:palletId={}", callbackTask.getPalletId(), e); // 记录完整堆栈
log.error("处理WCS回调任务失败:palletId={}, isTimeoutRecovery={}",
callbackTask.getPalletId(), isTimeoutRecovery, e); // 记录完整堆栈
// 更新重试次数
int newRetryCount = (callbackTask.getRetryCount() == null ? 0 : callbackTask.getRetryCount()) + 1;
// 更新重试次数超时恢复的任务也算作一次重试
int currentRetryCount = (callbackTask.getRetryCount() == null ? 0 : callbackTask.getRetryCount());
int newRetryCount = isTimeoutRecovery ? currentRetryCount + 1 : currentRetryCount + 1;
String status = newRetryCount >= 3 ? "处理失败" : "已录入"; // 重试3次后标记为失败
wcsIntegrationMapper.updateWcsCallbackTaskStatus(callbackTask.getId(), status, null, null, e.getMessage());
String errorMsg = (isTimeoutRecovery ? "[超时恢复]" : "") + e.getMessage();
log.warn("任务处理失败,更新状态:id={}, retryCount={}->{}, status={}",
callbackTask.getId(), currentRetryCount, newRetryCount, status);
wcsIntegrationMapper.updateWcsCallbackTaskStatus(callbackTask.getId(), status, null, null, errorMsg);
wcsIntegrationMapper.updateWcsCallbackTaskRetryCount(callbackTask.getId(), newRetryCount);
throw new RuntimeException("处理WCS回调任务失败: " + e.getMessage(), e);

20
src/main/java/com/gaotao/modules/automatedWarehouse/task/WcsTaskScheduler.java

@ -35,20 +35,36 @@ public class WcsTaskScheduler {
// 获取待处理的WCS回调任务
List<WcsCallbackTask> list = wcsTaskService.getPendingWcsCallbackTasks("55");
if (list.isEmpty()) {
log.info("无待处理的WCS回调任务");
return;
}
// 统计信息
long normalTaskCount = list.stream().filter(t -> !"处理中".equals(t.getStatus())).count();
long timeoutTaskCount = list.stream().filter(t -> "处理中".equals(t.getStatus())).count();
log.info("获取到 {} 个待处理任务(正常: {}, 超时恢复: {})", list.size(), normalTaskCount, timeoutTaskCount);
int successCount = 0;
int failCount = 0;
for (WcsCallbackTask callbackTask : list) {
try {
// 委托给业务服务处理
wcsTaskService.processWcsCallbackTask(callbackTask);
successCount++;
} catch (Exception e) {
log.error("处理WCS回调任务失败:palletId={}, error={}", callbackTask.getPalletId(), e.getMessage());
failCount++;
// 继续处理下一个数据
}
}
log.info("=== WCS入库/出库任务处理完成:总数={}, 成功={}, 失败={} ===", list.size(), successCount, failCount);
} catch (Exception e) {
log.error("=== WCS入库/出库定时任务执行失败 ===", e);
}
log.info("=== WCS入库/出库任务处理完成 ===");
}
/**

52
src/main/java/com/gaotao/modules/other/service/impl/InventoryMoveServiceImpl.java

@ -70,9 +70,6 @@ public class InventoryMoveServiceImpl implements InventoryMoveService {
public void confirmInventoryMoveForPallet(InventoryMoveRequestDto dto) {
try {
// 1. 验证目标库位
//validateTargetLocation(dto.getSite(), dto.getTargetLocationId());
// 2. 验证HandlingUnit
List<HandlingUnit> handlingUnits = validateHandlingUnits(dto);
@ -84,55 +81,6 @@ public class InventoryMoveServiceImpl implements InventoryMoveService {
// 4. 合并相同条件的HandlingUnit后调用IFS接口在更新HU之前
syncToIFSBatch(dto, handlingUnits);
//
// // 5. 按库存主键字段汇总HandlingUnit数据
// Map<String, InventoryGroup> inventoryGroups = groupHandlingUnitsByInventoryKey(handlingUnits, originalLocations, dto.getTargetLocationId());
//
// // 6. 批量处理库存操作出库和入库
// for (InventoryGroup group : inventoryGroups.values()) {
// // 从原库位出库
// /* inventoryStockService.changeInventoryStock(
// group.site,
// group.warehouseId,
// group.partNo,
// group.batchNo,
// group.originalLocationId,
// group.totalQty,
// group.wdr,
// "OUT"
// );*/
//
// // 到目标库位入库
// inventoryStockService.createStockWithLock(
// group.site,
// group.warehouseId,
// group.partNo,
// group.batchNo,
// group.targetLocationId,
// group.wdr,
// group.totalQty,
// "Y"
// );
//
// // 创建汇总的出库和入库事务记录
// createOutboundTransactionForGroup(dto, group, currentUser);
// createInboundTransactionForGroup(dto, group, currentUser);
// }
//
// // 7. 更新每个HandlingUnit的库位信息
// for (HandlingUnit hu : handlingUnits) {
// handlingUnitService.lambdaUpdate()
// .set(HandlingUnit::getLocationId, dto.getTargetLocationId())
// .set(HandlingUnit::getModifiedDate, new Date())
// .set(HandlingUnit::getModifiedBy, currentUser.getUserDisplay())
// .set(HandlingUnit::getInStockFlag, "Y")
// .eq(HandlingUnit::getSite, hu.getSite())
// .eq(HandlingUnit::getUnitId, hu.getUnitId())
// .update();
// }
//
// log.info("库存移库完成,处理了{}个HandlingUnit,共{}个库存分组", handlingUnits.size(), inventoryGroups.size());
} catch (Exception e) {
log.error("IFS库存移库失败", e);
throw new XJException("IFS库存移库失败: " + e.getMessage());

10
src/main/resources/mapper/automatedWarehouse/WcsIntegrationMapper.xml

@ -523,7 +523,7 @@
)
</insert>
<!-- 查询指定状态的WCS回调任务列表 - AI制作 -->
<!-- 查询指定状态的WCS回调任务列表(含超时恢复机制) - AI制作 -->
<select id="getWcsCallbackTaskListWithStatus" resultType="WcsCallbackTask">
select id, site, pallet_id as palletId, trans_type_desc as transTypeDesc,
to_warehouse_id as toWarehouseId, to_location_id as toLocationId,
@ -531,8 +531,12 @@
process_end_time as processEndTime, error_msg as errorMsg,
retry_count as retryCount, remark,task_no,item_no,to_station
from wcs_callback_task
where site = #{site} and status in ('已录入', '处理失败')
and (retry_count is null or 3>retry_count )
where site = #{site}
and (
status in ('已录入', '处理失败')
or (status = '处理中' and process_start_time &lt; DATEADD(MINUTE, -10, GETDATE()))
)
and (retry_count is null or retry_count &lt; 3)
order by created_time
</select>

Loading…
Cancel
Save