// Source: lawless/server/internal/db/realm_store.go (556 lines, 20 KiB, Go)

// Package db 封装境界/渡劫相关的数据访问。
package db
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
)
// Character is a snapshot of one characters row, loaded so that realm logic
// can validate against it. Each field mirrors the like-named column selected
// by GetCharacter.
type Character struct {
	// Identity columns.
	ID       string
	PlayerID string
	Name     string

	// World tier and realm position.
	WorldTier   int32
	RealmTier   int32
	MinorRealm  int32
	RealmStatus string

	// Level, experience and status columns.
	Level  int32
	Exp    int64
	Status string

	// Remaining integer attributes.
	SanCurrent    int32
	SanMax        int32
	CrimeScore    int32
	HeavenlyValue int32
	KarmaValue    int32

	// Decoded base_stats and battle_stats JSON columns.
	BaseStats   map[string]any
	BattleStats map[string]any
}
// CharacterRealm is a character's progress record within one major realm
// tier; it mirrors one character_realms row.
type CharacterRealm struct {
	// ID is the row's own identifier.
	ID string
	// CharacterID references the owning character.
	CharacterID string
	// RealmTier is the major realm tier this row tracks.
	RealmTier int32
	// MaxMinorReached mirrors max_minor_reached; UpdateCharacterRealm only
	// ever raises it (GREATEST).
	MaxMinorReached int32
	// ExpInTier mirrors exp_in_tier.
	ExpInTier int64
	// StatsSnapshot is the decoded stats_snapshot JSON column.
	StatsSnapshot map[string]any
	// IsCurrent mirrors is_current.
	IsCurrent bool
	// UpdatedAt mirrors updated_at.
	UpdatedAt time.Time
}
// RealmConfig is the static configuration of one realm tier; it mirrors one
// realms row as selected by GetRealmConfig.
type RealmConfig struct {
	// Tier is the lookup key (realms.tier).
	Tier int32
	// MinorRealmMax mirrors minor_realm_max.
	MinorRealmMax int32
	// Name mirrors name.
	Name string
	// WorldTier mirrors world_tier.
	WorldTier int32
	// MainCurrencyCode mirrors main_currency_code.
	MainCurrencyCode string
	// IsTribulationRequired mirrors is_tribulation_required.
	IsTribulationRequired bool
	// BaseSuccessRate mirrors base_success_rate.
	BaseSuccessRate float64
	// AttrGrowthTemplate is the decoded attr_growth_template JSON column.
	AttrGrowthTemplate map[string]any
}
// TribulationRecord mirrors one tribulation_records row.
type TribulationRecord struct {
	// Row identity and owner.
	ID          string
	CharacterID string

	// Realm position and kind of the tribulation.
	RealmTier       int32
	MinorRealm      int32
	TribulationType string

	// Success rates as stored in base_success_rate and
	// modified_success_rate.
	BaseSuccessRate     float64
	ModifiedSuccessRate float64

	// HelperIDs mirrors the helper_ids array column.
	HelperIDs []string

	// Result plus the decoded penalties (JSON object) and drops_on_success
	// (JSON array of objects) columns.
	Result         string
	Penalties      map[string]any
	DropsOnSuccess []map[string]any

	// SanSnapshot mirrors san_snapshot; CreatedAt mirrors created_at.
	SanSnapshot int32
	CreatedAt   time.Time
}
// BreakthroughRecord mirrors one realm_breakthrough_records row, describing a
// realm breakthrough or world-barrier break attempt.
type BreakthroughRecord struct {
	// Row identity and owner.
	ID          string
	CharacterID string

	// Realm tier and minor realm before and after the attempt.
	FromRealmTier  int32
	ToRealmTier    int32
	FromMinorRealm int32
	ToMinorRealm   int32

	// Outcome flags.
	IsSuccess           bool
	IsBreakWorldBarrier bool

	// World tiers involved, as stored in source_world_tier and
	// target_world_tier.
	SourceWorldTier int32
	TargetWorldTier int32

	// CreatedAt mirrors created_at.
	CreatedAt time.Time
}
// RealmStore is the data-access surface the realm module depends on. It is an
// interface so that unit tests can substitute a mock.
type RealmStore interface {
	// Character rows.
	GetCharacter(ctx context.Context, id string) (*Character, error)
	CountCharacters(ctx context.Context, ids []string) (int, error)
	UpdateCharacterStatus(ctx context.Context, characterID, status string) error
	UpdateCharacterBaseStats(ctx context.Context, characterID string, baseStats map[string]any) error

	// Realm configuration, realm progress and breakthrough history.
	GetRealmConfig(ctx context.Context, tier int32) (*RealmConfig, error)
	GetCharacterRealm(ctx context.Context, characterID string) (*CharacterRealm, error)
	EnsureCharacterRealm(ctx context.Context, characterID string, realmTier, maxMinor int32) (*CharacterRealm, error)
	UpdateCharacterRealm(ctx context.Context, characterID string, realmTier, minorRealm int32, expInTier int64, realmStatus string) error
	UpdateCharacterWorldAndRealm(ctx context.Context, characterID string, worldTier, realmTier, minorRealm int32, realmStatus string) error
	InsertBreakthroughRecord(ctx context.Context, rec *BreakthroughRecord) error

	// Tribulation records.
	GetLatestTribulation(ctx context.Context, characterID string) (*TribulationRecord, error)
	InsertTribulationRecord(ctx context.Context, rec *TribulationRecord) (string, error)
	UpdateTribulationRecord(ctx context.Context, id, result string, penalties map[string]any, drops []map[string]any) error

	// Inventory.
	GetInventoryItem(ctx context.Context, characterID, inventoryID string) (itemID string, quantity int32, err error)
	HasInventoryItem(ctx context.Context, characterID, inventoryID, itemID string, quantity int32) (bool, error)
	ConsumeInventoryItem(ctx context.Context, characterID, inventoryID string, quantity int32) error

	// Currency balances and the economy audit log.
	GetCurrencyBalance(ctx context.Context, characterID, currencyCode string) (float64, error)
	GetCurrencyBalances(ctx context.Context, characterID string) (map[string]float64, error)
	ConsumeCurrency(ctx context.Context, characterID, currencyCode string, amount float64) error
	SetCurrencyBalance(ctx context.Context, characterID, currencyCode string, amount float64) error
	GetCurrencyWorldTier(ctx context.Context, currencyCode string) (int32, error)
	AuditCurrencyFlow(ctx context.Context, characterID, entityType, entityID, currencyCode, flowType, reasonCode string, amount, balanceAfter float64, worldTier int32, relatedID string) error

	// Transactions.
	WithinTx(ctx context.Context, fn func(tx pgx.Tx) error) error
}
// PgxRealmStore is the pgx-backed store of this package. It carries no fields;
// its methods reach the connection pool through pool().
type PgxRealmStore struct{}
// NewPgxRealmStore returns a fresh PgxRealmStore. The store holds no state of
// its own; its queries run against the package-level Pool.
func NewPgxRealmStore() *PgxRealmStore {
	return new(PgxRealmStore)
}
// pool returns the package-level Pool that every query of this store runs
// against.
func (s *PgxRealmStore) pool() *pgxpool.Pool {
return Pool
}
// jsonMap normalizes a JSON object value to a map[string]any.
//
// Accepted sources are nil, an already-decoded map[string]any, and raw JSON
// given as []byte or string. Every "no data" form (nil, a nil map, zero-length
// input, or the JSON literal null) yields an empty, non-nil map, so callers can
// always write to the result without a nil-map panic. Malformed JSON returns
// the decoder's error; any other source type returns an "unsupported json
// source type" error.
func jsonMap(src any) (map[string]any, error) {
	var raw []byte
	switch v := src.(type) {
	case nil:
		return map[string]any{}, nil
	case map[string]any:
		if v == nil {
			return map[string]any{}, nil
		}
		return v, nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return nil, fmt.Errorf("unsupported json source type %T", src)
	}

	if len(raw) == 0 {
		return map[string]any{}, nil
	}
	var decoded map[string]any
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}
	// Unmarshalling the JSON literal null leaves the map nil.
	if decoded == nil {
		decoded = map[string]any{}
	}
	return decoded, nil
}
// jsonSlice normalizes a JSON array-of-objects value to a []map[string]any.
//
// Accepted sources are nil, an already-typed []map[string]any, a generic []any
// (the shape encoding/json produces when a JSON array is decoded into an empty
// interface), and raw JSON given as []byte or string. Empty input (nil, zero
// length, the exact text "[]", or an empty []any) yields a nil slice. Elements
// of a []any must be JSON objects or null; anything else is an error, as is
// malformed JSON or an unsupported source type.
func jsonSlice(src any) ([]map[string]any, error) {
	var raw []byte
	switch v := src.(type) {
	case nil:
		return nil, nil
	case []map[string]any:
		return v, nil
	case []any:
		if len(v) == 0 {
			return nil, nil
		}
		out := make([]map[string]any, 0, len(v))
		for i, elem := range v {
			if elem == nil {
				// A JSON null element decodes to a nil map on the raw
				// path as well; keep both paths equivalent.
				out = append(out, nil)
				continue
			}
			obj, ok := elem.(map[string]any)
			if !ok {
				return nil, fmt.Errorf("unsupported json slice element type %T at index %d", elem, i)
			}
			out = append(out, obj)
		}
		return out, nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return nil, fmt.Errorf("unsupported json slice source type %T", src)
	}

	if len(raw) == 0 || string(raw) == "[]" {
		return nil, nil
	}
	var decoded []map[string]any
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
// GetCharacter loads the characters row whose id equals id.
//
// The base_stats and battle_stats columns are decoded through jsonMap. Any
// scan or decode failure is returned unchanged together with a nil
// *Character.
func (s *PgxRealmStore) GetCharacter(ctx context.Context, id string) (*Character, error) {
	const query = `
SELECT id, player_id, name, world_tier, realm_tier, minor_realm, realm_status,
level, exp, status, san_current, san_max, crime_score, heavenly_value, karma_value,
base_stats, battle_stats
FROM characters
WHERE id = $1
`
	var (
		ch        Character
		rawBase   any
		rawBattle any
	)
	if err := s.pool().QueryRow(ctx, query, id).Scan(
		&ch.ID, &ch.PlayerID, &ch.Name, &ch.WorldTier, &ch.RealmTier, &ch.MinorRealm, &ch.RealmStatus,
		&ch.Level, &ch.Exp, &ch.Status, &ch.SanCurrent, &ch.SanMax, &ch.CrimeScore, &ch.HeavenlyValue, &ch.KarmaValue,
		&rawBase, &rawBattle,
	); err != nil {
		return nil, err
	}

	var err error
	if ch.BaseStats, err = jsonMap(rawBase); err != nil {
		return nil, err
	}
	if ch.BattleStats, err = jsonMap(rawBattle); err != nil {
		return nil, err
	}
	return &ch, nil
}
// GetCharacterRealm loads the character_realms row flagged is_current = true
// for characterID.
//
// stats_snapshot is decoded through jsonMap. On any scan or decode failure the
// result is nil together with the error, so callers never receive a partially
// populated record. The query has neither ORDER BY nor LIMIT: if several rows
// are flagged current, which one is returned is left to the database.
func (s *PgxRealmStore) GetCharacterRealm(ctx context.Context, characterID string) (*CharacterRealm, error) {
	row := s.pool().QueryRow(ctx, `
SELECT id, character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current, updated_at
FROM character_realms
WHERE character_id = $1 AND is_current = true
`, characterID)

	var (
		r    CharacterRealm
		snap any
	)
	if err := row.Scan(&r.ID, &r.CharacterID, &r.RealmTier, &r.MaxMinorReached, &r.ExpInTier, &snap, &r.IsCurrent, &r.UpdatedAt); err != nil {
		return nil, err
	}
	stats, err := jsonMap(snap)
	if err != nil {
		return nil, err
	}
	r.StatsSnapshot = stats
	return &r, nil
}
// GetRealmConfig loads the realms row whose tier equals tier.
//
// attr_growth_template is decoded through jsonMap. On any scan or decode
// failure the result is nil together with the error, so callers never receive
// a partially populated configuration.
func (s *PgxRealmStore) GetRealmConfig(ctx context.Context, tier int32) (*RealmConfig, error) {
	row := s.pool().QueryRow(ctx, `
SELECT tier, minor_realm_max, name, world_tier, main_currency_code, is_tribulation_required, base_success_rate, attr_growth_template
FROM realms
WHERE tier = $1
`, tier)

	var (
		cfg  RealmConfig
		attr any
	)
	if err := row.Scan(&cfg.Tier, &cfg.MinorRealmMax, &cfg.Name, &cfg.WorldTier, &cfg.MainCurrencyCode,
		&cfg.IsTribulationRequired, &cfg.BaseSuccessRate, &attr); err != nil {
		return nil, err
	}
	tmpl, err := jsonMap(attr)
	if err != nil {
		return nil, err
	}
	cfg.AttrGrowthTemplate = tmpl
	return &cfg, nil
}
// EnsureCharacterRealm returns the character's current character_realms row,
// creating one when none exists.
//
// The lookup goes through GetCharacterRealm and therefore matches on
// is_current only: an existing current row is returned as-is even if its tier
// differs from realmTier. realmTier and maxMinor are used solely to seed the
// row that is inserted when the lookup reports no rows. Any other lookup
// error, and any insert error, is returned with a nil record.
//
// NOTE(review): the lookup and the insert are separate statements, so two
// concurrent callers could both insert; confirm whether the schema guards
// against duplicate current rows.
func (s *PgxRealmStore) EnsureCharacterRealm(ctx context.Context, characterID string, realmTier, maxMinor int32) (*CharacterRealm, error) {
	current, err := s.GetCharacterRealm(ctx, characterID)
	switch {
	case err == nil:
		return current, nil
	case !errors.Is(err, pgx.ErrNoRows):
		// errors.Is keeps the no-rows detection working even if the
		// error is ever wrapped on its way here.
		return nil, err
	}

	if _, err := s.pool().Exec(ctx, `
INSERT INTO character_realms (character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current)
VALUES ($1, $2, $3, 0, '{}', true)
`, characterID, realmTier, maxMinor); err != nil {
		return nil, err
	}
	return s.GetCharacterRealm(ctx, characterID)
}
// UpdateCharacterRealm writes the character's realm tier, minor realm and
// realm status to characters, and mirrors the tier, minor realm and in-tier
// experience onto the character's current character_realms row.
//
// Both statements run in a single transaction (via WithinTx), so the two
// tables cannot be left disagreeing when the second statement fails.
// max_minor_reached is only ever raised (GREATEST). Neither statement checks
// its affected-row count, so an unknown characterID is not reported as an
// error.
func (s *PgxRealmStore) UpdateCharacterRealm(ctx context.Context, characterID string, realmTier, minorRealm int32, expInTier int64, realmStatus string) error {
	return s.WithinTx(ctx, func(tx pgx.Tx) error {
		if _, err := tx.Exec(ctx, `
UPDATE characters
SET realm_tier = $2, minor_realm = $3, realm_status = $4, updated_at = NOW()
WHERE id = $1
`, characterID, realmTier, minorRealm, realmStatus); err != nil {
			return err
		}

		// Keep the current character_realms row in step with characters.
		_, err := tx.Exec(ctx, `
UPDATE character_realms
SET realm_tier = $2, max_minor_reached = GREATEST(max_minor_reached, $3), exp_in_tier = $4, updated_at = NOW()
WHERE character_id = $1 AND is_current = true
`, characterID, realmTier, minorRealm, expInTier)
		return err
	})
}
// UpdateCharacterWorldAndRealm moves a character to a new world tier and
// realm (used after a successful world-barrier break).
//
// It performs three steps inside one transaction (via WithinTx):
//  1. update world_tier, realm_tier, minor_realm and realm_status on
//     characters;
//  2. clear is_current on the character's current character_realms row(s);
//  3. insert a fresh current character_realms row for the new tier, with
//     max_minor_reached = minorRealm, zero experience and an empty snapshot.
//
// Running them atomically ensures a failure in step 3 cannot leave the
// character without any current realm row. Step 2 only touches rows that are
// still current, so the updated_at of historical rows is left alone.
func (s *PgxRealmStore) UpdateCharacterWorldAndRealm(ctx context.Context, characterID string, worldTier, realmTier, minorRealm int32, realmStatus string) error {
	return s.WithinTx(ctx, func(tx pgx.Tx) error {
		if _, err := tx.Exec(ctx, `
UPDATE characters
SET world_tier = $2, realm_tier = $3, minor_realm = $4, realm_status = $5, updated_at = NOW()
WHERE id = $1
`, characterID, worldTier, realmTier, minorRealm, realmStatus); err != nil {
			return err
		}

		// Close the old realm record before opening the new one.
		if _, err := tx.Exec(ctx, `
UPDATE character_realms SET is_current = false, updated_at = NOW() WHERE character_id = $1 AND is_current = true
`, characterID); err != nil {
			return err
		}

		_, err := tx.Exec(ctx, `
INSERT INTO character_realms (character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current)
VALUES ($1, $2, $3, 0, '{}', true)
`, characterID, realmTier, minorRealm)
		return err
	})
}
// InsertBreakthroughRecord appends rec to realm_breakthrough_records, letting
// the database stamp created_at with NOW().
//
// rec.ID and rec.CreatedAt are not read, and rec is not modified: the
// statement carries RETURNING id, but it is run with Exec, so the generated
// id is discarded.
func (s *PgxRealmStore) InsertBreakthroughRecord(ctx context.Context, rec *BreakthroughRecord) error {
	const stmt = `
INSERT INTO realm_breakthrough_records
(character_id, from_realm_tier, to_realm_tier, from_minor_realm, to_minor_realm,
is_success, is_break_world_barrier, source_world_tier, target_world_tier, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
RETURNING id
`
	if _, err := s.pool().Exec(ctx, stmt,
		rec.CharacterID,
		rec.FromRealmTier, rec.ToRealmTier,
		rec.FromMinorRealm, rec.ToMinorRealm,
		rec.IsSuccess, rec.IsBreakWorldBarrier,
		rec.SourceWorldTier, rec.TargetWorldTier,
	); err != nil {
		return err
	}
	return nil
}
// GetLatestTribulation loads the character's most recent tribulation_records
// row, ordered by created_at descending.
//
// helper_ids is read as a string array; penalties is decoded through jsonMap
// and drops_on_success through jsonSlice. On any scan or decode failure the
// result is nil together with the error, so callers never receive a partially
// populated record.
func (s *PgxRealmStore) GetLatestTribulation(ctx context.Context, characterID string) (*TribulationRecord, error) {
	row := s.pool().QueryRow(ctx, `
SELECT id, character_id, realm_tier, minor_realm, tribulation_type, base_success_rate,
modified_success_rate, helper_ids, result, penalties, drops_on_success, san_snapshot, created_at
FROM tribulation_records
WHERE character_id = $1
ORDER BY created_at DESC
LIMIT 1
`, characterID)

	var (
		r         TribulationRecord
		helperIDs pgtype.Array[string]
		penalties any
		drops     any
	)
	if err := row.Scan(&r.ID, &r.CharacterID, &r.RealmTier, &r.MinorRealm, &r.TribulationType, &r.BaseSuccessRate,
		&r.ModifiedSuccessRate, &helperIDs, &r.Result, &penalties, &drops, &r.SanSnapshot, &r.CreatedAt); err != nil {
		return nil, err
	}
	r.HelperIDs = helperIDs.Elements

	var err error
	if r.Penalties, err = jsonMap(penalties); err != nil {
		return nil, err
	}
	if r.DropsOnSuccess, err = jsonSlice(drops); err != nil {
		return nil, err
	}
	return &r, nil
}
// InsertTribulationRecord inserts rec into tribulation_records and returns the
// id generated by the database. created_at is stamped with NOW(); rec.ID and
// rec.CreatedAt are not read.
//
// rec.Penalties and rec.DropsOnSuccess are stored as JSON; a value that cannot
// be marshalled is reported as an error instead of being written as an empty
// payload. On any error the returned id is the empty string.
func (s *PgxRealmStore) InsertTribulationRecord(ctx context.Context, rec *TribulationRecord) (string, error) {
	penaltiesJSON, err := json.Marshal(rec.Penalties)
	if err != nil {
		return "", fmt.Errorf("marshal tribulation penalties: %w", err)
	}
	dropsJSON, err := json.Marshal(rec.DropsOnSuccess)
	if err != nil {
		return "", fmt.Errorf("marshal tribulation drops: %w", err)
	}

	// pgtype.Array encodes as SQL NULL unless its Dims are populated, which
	// silently dropped the helper list. FlatArray derives the dimensions
	// from the slice itself; a nil slice is still written as NULL.
	helperIDs := pgtype.FlatArray[string](rec.HelperIDs)

	var id string
	err = s.pool().QueryRow(ctx, `
INSERT INTO tribulation_records
(character_id, realm_tier, minor_realm, tribulation_type, base_success_rate, modified_success_rate,
helper_ids, result, penalties, drops_on_success, san_snapshot, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
RETURNING id
`, rec.CharacterID, rec.RealmTier, rec.MinorRealm, rec.TribulationType, rec.BaseSuccessRate, rec.ModifiedSuccessRate,
		helperIDs, rec.Result, penaltiesJSON, dropsJSON, rec.SanSnapshot).Scan(&id)
	if err != nil {
		return "", err
	}
	return id, nil
}
// UpdateTribulationRecord overwrites result, penalties and drops_on_success on
// the tribulation_records row identified by id.
//
// penalties and drops are stored as JSON; a value that cannot be marshalled is
// reported as an error before anything is written. The affected-row count is
// not checked, so an unknown id is not reported as an error.
func (s *PgxRealmStore) UpdateTribulationRecord(ctx context.Context, id, result string, penalties map[string]interface{}, drops []map[string]interface{}) error {
	penaltiesJSON, err := json.Marshal(penalties)
	if err != nil {
		return fmt.Errorf("marshal tribulation penalties: %w", err)
	}
	dropsJSON, err := json.Marshal(drops)
	if err != nil {
		return fmt.Errorf("marshal tribulation drops: %w", err)
	}

	_, err = s.pool().Exec(ctx, `
UPDATE tribulation_records
SET result = $2, penalties = $3, drops_on_success = $4
WHERE id = $1
`, id, result, penaltiesJSON, dropsJSON)
	return err
}
// HasInventoryItem reports whether the inventories row inventoryID belongs to
// characterID, holds item itemID, and has a quantity of at least quantity.
//
// A missing or non-matching row yields (false, nil); only other database
// failures are returned as errors.
func (s *PgxRealmStore) HasInventoryItem(ctx context.Context, characterID, inventoryID, itemID string, quantity int32) (bool, error) {
	var one int
	err := s.pool().QueryRow(ctx, `
SELECT 1 FROM inventories
WHERE id = $1 AND character_id = $2 AND item_id = $3 AND quantity >= $4
`, inventoryID, characterID, itemID, quantity).Scan(&one)

	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, pgx.ErrNoRows):
		// errors.Is keeps the check valid even if the error is wrapped.
		return false, nil
	default:
		return false, err
	}
}
// ConsumeInventoryItem subtracts quantity from the inventories row inventoryID
// owned by characterID.
//
// The decrement only applies when the row holds at least quantity; otherwise,
// or when the row does not exist, an error is returned. A negative quantity is
// rejected up front: it would satisfy the "quantity >= $3" guard and turn the
// subtraction into an addition. The row is kept even when its quantity
// reaches zero.
func (s *PgxRealmStore) ConsumeInventoryItem(ctx context.Context, characterID, inventoryID string, quantity int32) error {
	if quantity < 0 {
		return fmt.Errorf("invalid consume quantity %d", quantity)
	}

	ct, err := s.pool().Exec(ctx, `
UPDATE inventories
SET quantity = quantity - $3
WHERE id = $1 AND character_id = $2 AND quantity >= $3
`, inventoryID, characterID, quantity)
	if err != nil {
		return err
	}
	if ct.RowsAffected() == 0 {
		return fmt.Errorf("inventory item not found or insufficient quantity")
	}
	return nil
}
// GetCurrencyBalance returns the amount stored in currency_balances for
// characterID and currencyCode.
//
// A missing row is treated as a zero balance and is not an error. On any other
// failure the returned amount is 0 alongside the error.
func (s *PgxRealmStore) GetCurrencyBalance(ctx context.Context, characterID, currencyCode string) (float64, error) {
	var amount float64
	err := s.pool().QueryRow(ctx, `
SELECT amount FROM currency_balances
WHERE character_id = $1 AND currency_code = $2
`, characterID, currencyCode).Scan(&amount)

	// errors.Is keeps the no-rows check valid even if the error is wrapped.
	if errors.Is(err, pgx.ErrNoRows) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return amount, nil
}
// ConsumeCurrency deducts amount of currencyCode from the character's balance
// and adds it to total_spent.
//
// The deduction only applies when the balance covers amount; otherwise, or
// when no balance row exists, an "insufficient currency" error is returned.
// A negative or NaN amount is rejected up front: a negative value would
// satisfy the "amount >= $3" guard, credit the balance and reduce
// total_spent.
func (s *PgxRealmStore) ConsumeCurrency(ctx context.Context, characterID, currencyCode string, amount float64) error {
	// Written as a negated comparison so that NaN is rejected as well.
	if !(amount >= 0) {
		return fmt.Errorf("invalid consume amount %v for currency %s", amount, currencyCode)
	}

	ct, err := s.pool().Exec(ctx, `
UPDATE currency_balances
SET amount = amount - $3, total_spent = total_spent + $3, updated_at = NOW()
WHERE character_id = $1 AND currency_code = $2 AND amount >= $3
`, characterID, currencyCode, amount)
	if err != nil {
		return err
	}
	if ct.RowsAffected() == 0 {
		return fmt.Errorf("insufficient currency %s", currencyCode)
	}
	return nil
}
// AuditCurrencyFlow appends one row to economy_audit_logs, with created_at
// stamped by NOW().
//
// The column list places related_id before world_tier, whereas the parameter
// list has worldTier before relatedID; the bind arguments below follow the
// column order.
func (s *PgxRealmStore) AuditCurrencyFlow(ctx context.Context, characterID, entityType, entityID, currencyCode, flowType, reasonCode string, amount, balanceAfter float64, worldTier int32, relatedID string) error {
	const stmt = `
INSERT INTO economy_audit_logs
(character_id, entity_type, entity_id, currency_code, flow_type, reason_code, amount, balance_after, related_id, world_tier, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
`
	if _, err := s.pool().Exec(ctx, stmt,
		characterID, entityType, entityID,
		currencyCode, flowType, reasonCode,
		amount, balanceAfter,
		relatedID, worldTier,
	); err != nil {
		return err
	}
	return nil
}
// UpdateCharacterBaseStats replaces the base_stats column of the character
// with the JSON encoding of baseStats and refreshes updated_at.
//
// A marshalling failure is returned before any statement is issued.
func (s *PgxRealmStore) UpdateCharacterBaseStats(ctx context.Context, characterID string, baseStats map[string]interface{}) error {
	payload, err := json.Marshal(baseStats)
	if err != nil {
		return err
	}

	const stmt = `
UPDATE characters SET base_stats = $2, updated_at = NOW() WHERE id = $1
`
	if _, err = s.pool().Exec(ctx, stmt, characterID, payload); err != nil {
		return err
	}
	return nil
}
// GetInventoryItem looks up the inventories row inventoryID owned by
// characterID and returns its item_id and quantity.
//
// The Scan error is returned unchanged, together with whatever values were
// scanned.
func (s *PgxRealmStore) GetInventoryItem(ctx context.Context, characterID, inventoryID string) (string, int32, error) {
	const query = `
SELECT item_id, quantity FROM inventories
WHERE id = $1 AND character_id = $2
`
	var (
		item string
		qty  int32
	)
	row := s.pool().QueryRow(ctx, query, inventoryID, characterID)
	err := row.Scan(&item, &qty)
	return item, qty, err
}
// GetCurrencyBalances returns every currency_balances row of characterID as a
// map from currency code to amount.
//
// A character without balance rows yields an empty, non-nil map. A scan
// failure aborts with a nil map; an iteration error is returned alongside the
// entries collected so far.
func (s *PgxRealmStore) GetCurrencyBalances(ctx context.Context, characterID string) (map[string]float64, error) {
	const query = `
SELECT currency_code, amount FROM currency_balances WHERE character_id = $1
`
	rows, queryErr := s.pool().Query(ctx, query, characterID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	result := map[string]float64{}
	for rows.Next() {
		var (
			currency string
			value    float64
		)
		if scanErr := rows.Scan(&currency, &value); scanErr != nil {
			return nil, scanErr
		}
		result[currency] = value
	}
	return result, rows.Err()
}
// SetCurrencyBalance 设置角色指定货币余额(用于跨层携带限制扣除)。
// SetCurrencyBalance upserts the balance of currencyCode for characterID to
// exactly amount (used when deducting for the cross-tier carry limit).
//
// A new row starts with total_earned and total_spent at 0; on conflict only
// amount and updated_at are overwritten, leaving the totals untouched.
func (s *PgxRealmStore) SetCurrencyBalance(ctx context.Context, characterID, currencyCode string, amount float64) error {
	const stmt = `
INSERT INTO currency_balances (character_id, currency_code, amount, total_earned, total_spent, updated_at)
VALUES ($1, $2, $3, 0, 0, NOW())
ON CONFLICT (character_id, currency_code)
DO UPDATE SET amount = $3, updated_at = NOW()
`
	if _, err := s.pool().Exec(ctx, stmt, characterID, currencyCode, amount); err != nil {
		return err
	}
	return nil
}
// CountCharacters returns how many characters rows have an id contained in
// ids and status = 'active'.
//
// An empty ids list short-circuits to 0 without touching the database.
// Because the match is a set membership test, repeated ids are counted once.
func (s *PgxRealmStore) CountCharacters(ctx context.Context, ids []string) (int, error) {
	if len(ids) < 1 {
		return 0, nil
	}

	const query = `
SELECT COUNT(*) FROM characters WHERE id = ANY($1) AND status = 'active'
`
	var total int
	scanErr := s.pool().QueryRow(ctx, query, ids).Scan(&total)
	return total, scanErr
}
// UpdateCharacterStatus sets the status column of the character and refreshes
// updated_at. The affected-row count is not checked, so an unknown
// characterID is not reported as an error.
func (s *PgxRealmStore) UpdateCharacterStatus(ctx context.Context, characterID, status string) error {
	const stmt = `
UPDATE characters SET status = $2, updated_at = NOW() WHERE id = $1
`
	if _, err := s.pool().Exec(ctx, stmt, characterID, status); err != nil {
		return err
	}
	return nil
}
// GetCurrencyWorldTier returns the world_tier of the currencies row whose code
// equals currencyCode.
//
// An unknown code is reported as a "currency not found" error; any other
// database failure is returned unchanged. The tier is 0 whenever an error is
// returned.
func (s *PgxRealmStore) GetCurrencyWorldTier(ctx context.Context, currencyCode string) (int32, error) {
	var tier int32
	err := s.pool().QueryRow(ctx, `
SELECT world_tier FROM currencies WHERE code = $1
`, currencyCode).Scan(&tier)

	switch {
	case err == nil:
		return tier, nil
	case errors.Is(err, pgx.ErrNoRows):
		// errors.Is keeps the check valid even if the error is wrapped.
		return 0, fmt.Errorf("currency not found: %s", currencyCode)
	default:
		return 0, err
	}
}
// WithinTx begins a transaction on the pool, runs fn with it, and commits when
// fn returns nil. If fn returns an error, that error is returned and the
// transaction is rolled back.
//
// NOTE(review): only statements issued through the tx handed to fn take part
// in the transaction. The other methods of this store run on the pool, so
// calling them from inside fn does not enlist them; confirm callers account
// for that.
func (s *PgxRealmStore) WithinTx(ctx context.Context, fn func(tx pgx.Tx) error) error {
	tx, beginErr := s.pool().Begin(ctx)
	if beginErr != nil {
		return beginErr
	}
	// Rollback always runs on the way out and its result is deliberately
	// discarded; fn's error or Commit's result is what the caller sees.
	defer func() { _ = tx.Rollback(ctx) }()

	if fnErr := fn(tx); fnErr != nil {
		return fnErr
	}
	return tx.Commit(ctx)
}