XuqmGroup-Server/im-service/src/main/java/com/xuqm/im/service/WebhookDispatchService.java

187 行
8.3 KiB
Java

package com.xuqm.im.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xuqm.im.entity.WebhookConfigEntity;
import com.xuqm.im.entity.WebhookDeliveryEntity;
import com.xuqm.im.model.WebhookCallbackEnvelope;
import com.xuqm.im.repository.WebhookConfigRepository;
import com.xuqm.im.repository.WebhookDeliveryRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpClient;
import java.time.Duration;
import java.time.LocalDateTime;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;
import java.util.UUID;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@Component
public class WebhookDispatchService {
private static final Logger log = LoggerFactory.getLogger(WebhookDispatchService.class);
private static final int MAX_RETRIES = 3;
private static final long[] RETRY_DELAYS_MS = {1000L, 5000L, 15000L};
private final WebhookConfigRepository webhookRepository;
private final WebhookDeliveryRepository deliveryRepository;
private final ImAppSecretClient appSecretClient;
private final ObjectMapper objectMapper;
@Value("${im.webhook-timeout-ms:3000}")
private int webhookTimeoutMs;
public WebhookDispatchService(WebhookConfigRepository webhookRepository,
WebhookDeliveryRepository deliveryRepository,
ImAppSecretClient appSecretClient,
ObjectMapper objectMapper) {
this.webhookRepository = webhookRepository;
this.deliveryRepository = deliveryRepository;
this.appSecretClient = appSecretClient;
this.objectMapper = objectMapper;
}
@Async
public void dispatch(String appId, String callbackType, String callbackEvent, Object payload) {
List<WebhookConfigEntity> webhooks = webhookRepository.findByAppIdAndEnabledTrue(appId);
if (webhooks.isEmpty()) {
return;
}
try {
String appSecret = appSecretClient.getAppSecret(appId);
long requestTime = System.currentTimeMillis();
String nonce = UUID.randomUUID().toString().replace("-", "");
String callbackId = UUID.randomUUID().toString();
WebhookCallbackEnvelope envelope = new WebhookCallbackEnvelope(
callbackId,
callbackType,
callbackEvent,
requestTime,
objectMapper.valueToTree(payload),
null,
appId
);
String body = objectMapper.writeValueAsString(envelope);
String signature = signWebhook(appId, appSecret, requestTime, nonce, body);
for (WebhookConfigEntity webhook : webhooks) {
deliverWithRetry(appId, callbackId, callbackEvent, webhook, body, signature, requestTime, nonce);
}
} catch (Exception e) {
log.warn("Webhook dispatch prepare failed appId={} event={}: {}", appId, callbackEvent, e.getMessage());
}
}
private void deliverWithRetry(String appId, String callbackId, String callbackEvent,
WebhookConfigEntity webhook, String body, String signature,
long requestTime, String nonce) {
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(webhookTimeoutMs))
.build();
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
WebhookDeliveryEntity delivery = new WebhookDeliveryEntity();
delivery.setId(UUID.randomUUID().toString());
delivery.setAppId(appId);
delivery.setCallbackId(callbackId);
delivery.setCallbackEvent(callbackEvent);
delivery.setUrl(webhook.getUrl());
delivery.setAttempt(attempt);
delivery.setCreatedAt(LocalDateTime.now());
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(webhook.getUrl()))
.timeout(Duration.ofMillis(webhookTimeoutMs))
.header("Content-Type", "application/json")
.header("X-App-Id", appId)
.header("X-App-Timestamp", String.valueOf(requestTime))
.header("X-App-Nonce", nonce)
.header("X-App-Signature", signature)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
delivery.setHttpStatus(response.statusCode());
delivery.setResponseBody(truncate(response.body(), 4000));
if (response.statusCode() >= 200 && response.statusCode() < 300) {
delivery.setSuccess(true);
deliveryRepository.save(delivery);
log.info("Webhook delivered appId={} event={} url={} attempt={} status={}",
appId, callbackEvent, webhook.getUrl(), attempt, response.statusCode());
return;
} else {
delivery.setSuccess(false);
delivery.setErrorMessage("HTTP " + response.statusCode());
deliveryRepository.save(delivery);
log.warn("Webhook returned non-2xx appId={} event={} url={} attempt={} status={}",
appId, callbackEvent, webhook.getUrl(), attempt, response.statusCode());
}
} catch (Exception e) {
delivery.setSuccess(false);
delivery.setErrorMessage(truncate(e.getMessage(), 4000));
deliveryRepository.save(delivery);
log.warn("Webhook delivery failed appId={} event={} url={} attempt={}: {}",
appId, callbackEvent, webhook.getUrl(), attempt, e.getMessage());
}
if (attempt < MAX_RETRIES) {
long delay = RETRY_DELAYS_MS[attempt - 1];
log.info("Webhook retry scheduled appId={} event={} url={} delayMs={}",
appId, callbackEvent, webhook.getUrl(), delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} else {
log.error("Webhook max retries exceeded appId={} event={} url={}",
appId, callbackEvent, webhook.getUrl());
}
}
}
private String signWebhook(String appId, String appSecret, long requestTime, String nonce, String body) {
String payload = appId + "\n" + requestTime + "\n" + nonce + "\n" + sha256Hex(body);
return hmacSha256Hex(appSecret, payload);
}
private String sha256Hex(String value) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return HexFormat.of().formatHex(digest.digest(value.getBytes(StandardCharsets.UTF_8)));
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("Failed to hash webhook body", e);
}
}
private String hmacSha256Hex(String secret, String payload) {
try {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
return HexFormat.of().formatHex(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new IllegalStateException("Failed to sign webhook body", e);
}
}
private String truncate(String value, int maxLength) {
if (value == null) return null;
return value.length() > maxLength ? value.substring(0, maxLength) : value;
}
}