fix: skip already-reviewing stores on resubmit; clean up orphaned SUBMITTING states

- StoreSubmissionService: skip UNDER_REVIEW/APPROVED stores in preflight loop to prevent duplicate submissions
- StoreSubmissionService: post-batch sweep marks any still-SUBMITTING stores in same batch as REJECTED
- StoreSubmissionService: @EventListener(ApplicationReadyEvent) clears orphaned SUBMITTING states on startup
- AppVersionRepository: add findAllWithSubmittingStores() native query for startup sweep

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
这个提交包含在:
XuqmGroup 2026-05-16 15:58:27 +08:00
父节点 4136cd57b6
当前提交 8dea96ef5e
共有 2 个文件被更改,包括 89 次插入0 次删除

查看文件

@ -2,6 +2,7 @@ package com.xuqm.update.repository;
import com.xuqm.update.entity.AppVersionEntity; import com.xuqm.update.entity.AppVersionEntity;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
@ -28,4 +29,7 @@ public interface AppVersionRepository extends JpaRepository<AppVersionEntity, St
List<AppVersionEntity> findByStoreSubmitModeAndStoreSubmitScheduledAtBefore( List<AppVersionEntity> findByStoreSubmitModeAndStoreSubmitScheduledAtBefore(
String storeSubmitMode, LocalDateTime before); String storeSubmitMode, LocalDateTime before);
@Query(value = "SELECT * FROM update_app_version WHERE store_review_status LIKE '%SUBMITTING%'", nativeQuery = true)
List<AppVersionEntity> findAllWithSubmittingStores();
} }

查看文件

@ -12,6 +12,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.FileSystemResource;
import org.springframework.http.*; import org.springframework.http.*;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -147,6 +149,19 @@ public class StoreSubmissionService {
"packageName", v.getPackageName() == null ? "" : v.getPackageName() "packageName", v.getPackageName() == null ? "" : v.getPackageName()
), null); ), null);
try { try {
// Skip stores already in active review to avoid duplicate submissions
String activeState = currentStoreState(v.getStoreReviewStatus(), storeType);
if ("UNDER_REVIEW".equals(activeState) || "APPROVED".equals(activeState)) {
skippedCount.incrementAndGet();
log.info("Skipping {} for version={} batchId={} — already in state {}",
storeType, versionId, batchId, activeState);
recordStoreEvent(v, versionId, batchId, storeType, "STORE_SUBMIT_STORE_SKIPPED", Map.of(
"reason", "already in " + activeState,
"currentState", activeState
), null);
continue;
}
AppStoreConfigEntity cfg = configRepo AppStoreConfigEntity cfg = configRepo
.findByAppKeyAndStoreType(v.getAppKey(), AppStoreConfigEntity.StoreType.valueOf(storeType)) .findByAppKeyAndStoreType(v.getAppKey(), AppStoreConfigEntity.StoreType.valueOf(storeType))
.orElse(null); .orElse(null);
@ -222,6 +237,8 @@ public class StoreSubmissionService {
executeSinglePlan(plan, v, apkFile, versionId, batchId, successCount, rejectedCount); executeSinglePlan(plan, v, apkFile, versionId, batchId, successCount, rejectedCount);
} }
} }
sweepStuckSubmittingForBatch(versionId, batchId);
recordBatchEvent(v, versionId, batchId, "STORE_SUBMIT_BATCH_END", startedAt, Map.of( recordBatchEvent(v, versionId, batchId, "STORE_SUBMIT_BATCH_END", startedAt, Map.of(
"targets", targets, "targets", targets,
"successCount", successCount.get(), "successCount", successCount.get(),
@ -242,6 +259,74 @@ public class StoreSubmissionService {
} }
} }
/**
* On startup, mark any stores stuck in SUBMITTING as FAILED.
* A SUBMITTING state with no corresponding active thread means the service was
* restarted mid-batch the submission did not complete.
*/
@EventListener(ApplicationReadyEvent.class)
public void sweepOrphanedSubmittingOnStartup() {
List<AppVersionEntity> candidates = versionRepo.findAllWithSubmittingStores();
if (candidates.isEmpty()) return;
log.info("Startup sweep: found {} version(s) with orphaned SUBMITTING states", candidates.size());
for (AppVersionEntity v : candidates) {
sweepStuckSubmittingForVersion(v.getId(), null, "服务重启,提交中断");
}
}
/**
* After a batch finishes, sweep any stores in this batch that are still SUBMITTING.
* This catches cases where a thread was cancelled (timeout) or threw an unchecked
* exception that bypassed the normal failure path in executeSinglePlan.
*/
private void sweepStuckSubmittingForBatch(String versionId, String batchId) {
sweepStuckSubmittingForVersion(versionId, batchId, "提交中断:未收到厂商确认响应");
}
private String currentStoreState(String reviewStatusJson, String storeType) {
if (reviewStatusJson == null || reviewStatusJson.isBlank()) return "";
try {
Map<String, Object> reviewMap = mapper.readValue(reviewStatusJson, new TypeReference<>() {});
Object entry = reviewMap.get(storeType);
if (entry instanceof Map<?, ?> m) {
Object state = m.get("state");
return state instanceof String s ? s : "";
}
return entry instanceof String s ? s : "";
} catch (Exception e) {
return "";
}
}
@SuppressWarnings("unchecked")
private void sweepStuckSubmittingForVersion(String versionId, String matchBatchId, String failReason) {
try {
AppVersionEntity v = versionRepo.findById(versionId).orElse(null);
if (v == null || v.getStoreReviewStatus() == null) return;
Map<String, Object> reviewMap = mapper.readValue(v.getStoreReviewStatus(),
new TypeReference<Map<String, Object>>() {});
for (Map.Entry<String, Object> entry : new ArrayList<>(reviewMap.entrySet())) {
String store = entry.getKey();
if (!(entry.getValue() instanceof Map<?, ?> rawInfo)) continue;
Map<String, Object> info = (Map<String, Object>) rawInfo;
String state = info.get("state") instanceof String s ? s : "";
if (!"SUBMITTING".equals(state)) continue;
String entryBatchId = info.get("batchId") instanceof String s ? s : "";
if (matchBatchId != null && !matchBatchId.equals(entryBatchId)) continue;
log.warn("Sweeping stuck SUBMITTING store={} version={} batchId={} reason={}",
store, versionId, entryBatchId, failReason);
try {
storeService.updateStoreReview(versionId, store,
AppVersionEntity.StoreReviewState.REJECTED, failReason);
} catch (Exception ex) {
log.error("Failed to sweep stuck store={} version={}: {}", store, versionId, ex.getMessage());
}
}
} catch (Exception ex) {
log.error("sweepStuckSubmitting failed for version={}: {}", versionId, ex.getMessage());
}
}
private void executeSinglePlan(SubmissionPlan plan, AppVersionEntity v, File apkFile, private void executeSinglePlan(SubmissionPlan plan, AppVersionEntity v, File apkFile,
String versionId, String batchId, String versionId, String batchId,
AtomicInteger successCount, AtomicInteger rejectedCount) { AtomicInteger successCount, AtomicInteger rejectedCount) {