XuqmGroup-Server/im-service/src/main/java/com/xuqm/im/service/WebhookDispatchService.java

225 行
10 KiB
Java

package com.xuqm.im.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xuqm.im.entity.WebhookAlertEntity;
import com.xuqm.im.entity.WebhookConfigEntity;
import com.xuqm.im.entity.WebhookDeliveryEntity;
import com.xuqm.im.model.WebhookCallbackEnvelope;
import com.xuqm.im.repository.WebhookAlertRepository;
import com.xuqm.im.repository.WebhookConfigRepository;
import com.xuqm.im.repository.WebhookDeliveryRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpClient;
import java.time.Duration;
import java.time.LocalDateTime;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;
import java.util.UUID;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@Component
public class WebhookDispatchService {
private static final Logger log = LoggerFactory.getLogger(WebhookDispatchService.class);
private static final int MAX_RETRIES = 3;
private static final long[] RETRY_DELAYS_MS = {1000L, 5000L, 15000L};
private static final int ALERT_THRESHOLD = 10;
private final WebhookConfigRepository webhookRepository;
private final WebhookDeliveryRepository deliveryRepository;
private final WebhookAlertRepository alertRepository;
private final ImAppSecretClient appSecretClient;
private final ObjectMapper objectMapper;
@Value("${im.webhook-timeout-ms:3000}")
private int webhookTimeoutMs;
public WebhookDispatchService(WebhookConfigRepository webhookRepository,
WebhookDeliveryRepository deliveryRepository,
WebhookAlertRepository alertRepository,
ImAppSecretClient appSecretClient,
ObjectMapper objectMapper) {
this.webhookRepository = webhookRepository;
this.deliveryRepository = deliveryRepository;
this.alertRepository = alertRepository;
this.appSecretClient = appSecretClient;
this.objectMapper = objectMapper;
}
@Async
2026-05-07 19:39:42 +08:00
public void dispatch(String appKey, String callbackType, String callbackEvent, Object payload) {
List<WebhookConfigEntity> webhooks = webhookRepository.findByAppIdAndEnabledTrue(appKey);
if (webhooks.isEmpty()) {
return;
}
try {
2026-05-07 19:39:42 +08:00
String appSecret = appSecretClient.getAppSecret(appKey);
long requestTime = System.currentTimeMillis();
String nonce = UUID.randomUUID().toString().replace("-", "");
String callbackId = UUID.randomUUID().toString();
WebhookCallbackEnvelope envelope = new WebhookCallbackEnvelope(
callbackId,
callbackType,
callbackEvent,
requestTime,
objectMapper.valueToTree(payload),
null,
2026-05-07 19:39:42 +08:00
appKey
);
String body = objectMapper.writeValueAsString(envelope);
2026-05-07 19:39:42 +08:00
String signature = signWebhook(appKey, appSecret, requestTime, nonce, body);
for (WebhookConfigEntity webhook : webhooks) {
2026-05-07 19:39:42 +08:00
deliverWithRetry(appKey, callbackId, callbackEvent, webhook, body, signature, requestTime, nonce);
}
} catch (Exception e) {
2026-05-07 19:39:42 +08:00
log.warn("Webhook dispatch prepare failed appKey={} event={}: {}", appKey, callbackEvent, e.getMessage());
}
}
2026-05-07 19:39:42 +08:00
private void deliverWithRetry(String appKey, String callbackId, String callbackEvent,
WebhookConfigEntity webhook, String body, String signature,
long requestTime, String nonce) {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(webhookTimeoutMs))
.build();
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
WebhookDeliveryEntity delivery = new WebhookDeliveryEntity();
delivery.setId(UUID.randomUUID().toString());
2026-05-07 19:39:42 +08:00
delivery.setAppId(appKey);
delivery.setCallbackId(callbackId);
delivery.setCallbackEvent(callbackEvent);
delivery.setUrl(webhook.getUrl());
delivery.setAttempt(attempt);
delivery.setCreatedAt(LocalDateTime.now());
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(webhook.getUrl()))
.timeout(Duration.ofMillis(webhookTimeoutMs))
.header("Content-Type", "application/json")
2026-05-07 19:39:42 +08:00
.header("X-App-Id", appKey)
.header("X-App-Timestamp", String.valueOf(requestTime))
.header("X-App-Nonce", nonce)
.header("X-App-Signature", signature)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
delivery.setHttpStatus(response.statusCode());
delivery.setResponseBody(truncate(response.body(), 4000));
if (response.statusCode() >= 200 && response.statusCode() < 300) {
delivery.setSuccess(true);
deliveryRepository.save(delivery);
if (webhook.getConsecutiveFailures() > 0) {
webhook.setConsecutiveFailures(0);
webhook.setLastFailureAt(null);
webhookRepository.save(webhook);
}
2026-05-07 19:39:42 +08:00
log.info("Webhook delivered appKey={} event={} url={} attempt={} status={}",
appKey, callbackEvent, webhook.getUrl(), attempt, response.statusCode());
return;
} else {
delivery.setSuccess(false);
delivery.setErrorMessage("HTTP " + response.statusCode());
deliveryRepository.save(delivery);
2026-05-07 19:39:42 +08:00
log.warn("Webhook returned non-2xx appKey={} event={} url={} attempt={} status={}",
appKey, callbackEvent, webhook.getUrl(), attempt, response.statusCode());
}
} catch (Exception e) {
delivery.setSuccess(false);
delivery.setErrorMessage(truncate(e.getMessage(), 4000));
deliveryRepository.save(delivery);
2026-05-07 19:39:42 +08:00
log.warn("Webhook delivery failed appKey={} event={} url={} attempt={}: {}",
appKey, callbackEvent, webhook.getUrl(), attempt, e.getMessage());
}
if (attempt < MAX_RETRIES) {
long delay = RETRY_DELAYS_MS[attempt - 1];
2026-05-07 19:39:42 +08:00
log.info("Webhook retry scheduled appKey={} event={} url={} delayMs={}",
appKey, callbackEvent, webhook.getUrl(), delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} else {
2026-05-07 19:39:42 +08:00
handleMaxRetriesExceeded(appKey, callbackEvent, webhook);
}
}
}
2026-05-07 19:39:42 +08:00
private void handleMaxRetriesExceeded(String appKey, String callbackEvent, WebhookConfigEntity webhook) {
int failures = webhook.getConsecutiveFailures() + 1;
webhook.setConsecutiveFailures(failures);
webhook.setLastFailureAt(LocalDateTime.now());
webhookRepository.save(webhook);
2026-05-07 19:39:42 +08:00
log.error("Webhook max retries exceeded appKey={} event={} url={} consecutiveFailures={}",
appKey, callbackEvent, webhook.getUrl(), failures);
if (failures >= ALERT_THRESHOLD && webhook.isEnabled()) {
webhook.setEnabled(false);
webhookRepository.save(webhook);
2026-05-07 19:39:42 +08:00
log.warn("Webhook auto-disabled after {} consecutive failures appKey={} url={}",
ALERT_THRESHOLD, appKey, webhook.getUrl());
WebhookAlertEntity alert = new WebhookAlertEntity();
alert.setId(UUID.randomUUID().toString());
2026-05-07 19:39:42 +08:00
alert.setAppId(appKey);
alert.setWebhookId(webhook.getId());
alert.setWebhookUrl(webhook.getUrl());
alert.setAlertType("AUTO_DISABLED");
alert.setDescription("Webhook 在连续 " + ALERT_THRESHOLD + " 次投递失败后已自动禁用。事件:" + callbackEvent);
alert.setAcknowledged(false);
alert.setCreatedAt(LocalDateTime.now());
alertRepository.save(alert);
}
}
2026-05-07 19:39:42 +08:00
private String signWebhook(String appKey, String appSecret, long requestTime, String nonce, String body) {
String payload = appKey + "\n" + requestTime + "\n" + nonce + "\n" + sha256Hex(body);
return hmacSha256Hex(appSecret, payload);
}
private String sha256Hex(String value) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return HexFormat.of().formatHex(digest.digest(value.getBytes(StandardCharsets.UTF_8)));
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("Failed to hash webhook body", e);
}
}
private String hmacSha256Hex(String secret, String payload) {
try {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
return HexFormat.of().formatHex(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new IllegalStateException("Failed to sign webhook body", e);
}
}
private String truncate(String value, int maxLength) {
if (value == null) return null;
return value.length() > maxLength ? value.substring(0, maxLength) : value;
}
}