// Package db 封装境界/渡劫相关的数据访问。 package db import ( "context" "encoding/json" "fmt" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) // Character 角色基础快照,用于境界逻辑校验。 type Character struct { ID string PlayerID string Name string WorldTier int32 RealmTier int32 MinorRealm int32 RealmStatus string Level int32 Exp int64 Status string SanCurrent int32 SanMax int32 CrimeScore int32 HeavenlyValue int32 KarmaValue int32 BaseStats map[string]interface{} BattleStats map[string]interface{} } // CharacterRealm 角色在单个大境界上的进度。 type CharacterRealm struct { ID string CharacterID string RealmTier int32 MaxMinorReached int32 ExpInTier int64 StatsSnapshot map[string]interface{} IsCurrent bool UpdatedAt time.Time } // RealmConfig 境界静态配置。 type RealmConfig struct { Tier int32 MinorRealmMax int32 Name string WorldTier int32 MainCurrencyCode string IsTribulationRequired bool BaseSuccessRate float64 AttrGrowthTemplate map[string]interface{} } // TribulationRecord 渡劫记录。 type TribulationRecord struct { ID string CharacterID string RealmTier int32 MinorRealm int32 TribulationType string BaseSuccessRate float64 ModifiedSuccessRate float64 HelperIDs []string Result string Penalties map[string]interface{} DropsOnSuccess []map[string]interface{} SanSnapshot int32 CreatedAt time.Time } // BreakthroughRecord 境界突破/破界记录。 type BreakthroughRecord struct { ID string CharacterID string FromRealmTier int32 ToRealmTier int32 FromMinorRealm int32 ToMinorRealm int32 IsSuccess bool IsBreakWorldBarrier bool SourceWorldTier int32 TargetWorldTier int32 CreatedAt time.Time } // RealmStore 定义境界模块所需的数据访问接口,便于单元测试 mock。 type RealmStore interface { GetCharacter(ctx context.Context, id string) (*Character, error) GetCharacterRealm(ctx context.Context, characterID string) (*CharacterRealm, error) GetRealmConfig(ctx context.Context, tier int32) (*RealmConfig, error) EnsureCharacterRealm(ctx context.Context, characterID string, realmTier, maxMinor int32) (*CharacterRealm, error) UpdateCharacterRealm(ctx context.Context, characterID string, realmTier, minorRealm int32, expInTier int64, realmStatus string) error UpdateCharacterWorldAndRealm(ctx context.Context, characterID string, worldTier, realmTier, minorRealm int32, realmStatus string) error UpdateCharacterBaseStats(ctx context.Context, characterID string, baseStats map[string]interface{}) error InsertBreakthroughRecord(ctx context.Context, rec *BreakthroughRecord) error GetLatestTribulation(ctx context.Context, characterID string) (*TribulationRecord, error) InsertTribulationRecord(ctx context.Context, rec *TribulationRecord) (string, error) UpdateTribulationRecord(ctx context.Context, id, result string, penalties map[string]interface{}, drops []map[string]interface{}) error GetInventoryItem(ctx context.Context, characterID, inventoryID string) (itemID string, quantity int32, err error) HasInventoryItem(ctx context.Context, characterID, inventoryID, itemID string, quantity int32) (bool, error) ConsumeInventoryItem(ctx context.Context, characterID, inventoryID string, quantity int32) error GetCurrencyBalance(ctx context.Context, characterID, currencyCode string) (float64, error) GetCurrencyBalances(ctx context.Context, characterID string) (map[string]float64, error) ConsumeCurrency(ctx context.Context, characterID, currencyCode string, amount float64) error SetCurrencyBalance(ctx context.Context, characterID, currencyCode string, amount float64) error CountCharacters(ctx context.Context, ids []string) (int, error) GetCurrencyWorldTier(ctx context.Context, currencyCode string) (int32, error) AuditCurrencyFlow(ctx context.Context, characterID, entityType, entityID, currencyCode, flowType, reasonCode string, amount, balanceAfter float64, worldTier int32, relatedID string) error UpdateCharacterStatus(ctx context.Context, characterID, status string) error WithinTx(ctx context.Context, fn func(tx pgx.Tx) error) error } // PgxRealmStore 基于 pgx Pool 的实现。 type PgxRealmStore struct{} // NewPgxRealmStore 创建新的 RealmStore 实例,使用全局 db.Pool。 func NewPgxRealmStore() *PgxRealmStore { return &PgxRealmStore{} } func (s *PgxRealmStore) pool() *pgxpool.Pool { return Pool } func jsonMap(src interface{}) (map[string]interface{}, error) { if src == nil { return map[string]interface{}{}, nil } switch v := src.(type) { case map[string]interface{}: return v, nil case []byte: var m map[string]interface{} if len(v) == 0 { return m, nil } if err := json.Unmarshal(v, &m); err != nil { return nil, err } return m, nil case string: var m map[string]interface{} if v == "" || v == "{}" { return m, nil } if err := json.Unmarshal([]byte(v), &m); err != nil { return nil, err } return m, nil default: return nil, fmt.Errorf("unsupported json source type %T", src) } } func jsonSlice(src interface{}) ([]map[string]interface{}, error) { if src == nil { return nil, nil } switch v := src.(type) { case []map[string]interface{}: return v, nil case []byte: if len(v) == 0 || string(v) == "[]" { return nil, nil } var m []map[string]interface{} if err := json.Unmarshal(v, &m); err != nil { return nil, err } return m, nil case string: if v == "" || v == "[]" { return nil, nil } var m []map[string]interface{} if err := json.Unmarshal([]byte(v), &m); err != nil { return nil, err } return m, nil default: return nil, fmt.Errorf("unsupported json slice source type %T", src) } } // GetCharacter 按 ID 查询角色基础信息。 func (s *PgxRealmStore) GetCharacter(ctx context.Context, id string) (*Character, error) { row := s.pool().QueryRow(ctx, ` SELECT id, player_id, name, world_tier, realm_tier, minor_realm, realm_status, level, exp, status, san_current, san_max, crime_score, heavenly_value, karma_value, base_stats, battle_stats FROM characters WHERE id = $1 `, id) var c Character var baseStats, battleStats interface{} err := row.Scan(&c.ID, &c.PlayerID, &c.Name, &c.WorldTier, &c.RealmTier, &c.MinorRealm, &c.RealmStatus, &c.Level, &c.Exp, &c.Status, &c.SanCurrent, &c.SanMax, &c.CrimeScore, &c.HeavenlyValue, &c.KarmaValue, &baseStats, &battleStats) if err != nil { return nil, err } c.BaseStats, err = jsonMap(baseStats) if err != nil { return nil, err } c.BattleStats, err = jsonMap(battleStats) if err != nil { return nil, err } return &c, nil } // GetCharacterRealm 查询角色当前境界进度。 func (s *PgxRealmStore) GetCharacterRealm(ctx context.Context, characterID string) (*CharacterRealm, error) { row := s.pool().QueryRow(ctx, ` SELECT id, character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current, updated_at FROM character_realms WHERE character_id = $1 AND is_current = true `, characterID) var r CharacterRealm var snap interface{} err := row.Scan(&r.ID, &r.CharacterID, &r.RealmTier, &r.MaxMinorReached, &r.ExpInTier, &snap, &r.IsCurrent, &r.UpdatedAt) if err != nil { return nil, err } r.StatsSnapshot, err = jsonMap(snap) return &r, err } // GetRealmConfig 查询境界静态配置。 func (s *PgxRealmStore) GetRealmConfig(ctx context.Context, tier int32) (*RealmConfig, error) { row := s.pool().QueryRow(ctx, ` SELECT tier, minor_realm_max, name, world_tier, main_currency_code, is_tribulation_required, base_success_rate, attr_growth_template FROM realms WHERE tier = $1 `, tier) var cfg RealmConfig var attr interface{} err := row.Scan(&cfg.Tier, &cfg.MinorRealmMax, &cfg.Name, &cfg.WorldTier, &cfg.MainCurrencyCode, &cfg.IsTribulationRequired, &cfg.BaseSuccessRate, &attr) if err != nil { return nil, err } cfg.AttrGrowthTemplate, err = jsonMap(attr) return &cfg, err } // EnsureCharacterRealm 若角色在指定大境界无记录则创建,否则返回当前记录。 func (s *PgxRealmStore) EnsureCharacterRealm(ctx context.Context, characterID string, realmTier, maxMinor int32) (*CharacterRealm, error) { existing, err := s.GetCharacterRealm(ctx, characterID) if err == nil { return existing, nil } if err != pgx.ErrNoRows { return nil, err } _, err = s.pool().Exec(ctx, ` INSERT INTO character_realms (character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current) VALUES ($1, $2, $3, 0, '{}', true) `, characterID, realmTier, maxMinor) if err != nil { return nil, err } return s.GetCharacterRealm(ctx, characterID) } // UpdateCharacterRealm 同步更新角色当前境界、小境界、境界内修为与状态。 func (s *PgxRealmStore) UpdateCharacterRealm(ctx context.Context, characterID string, realmTier, minorRealm int32, expInTier int64, realmStatus string) error { _, err := s.pool().Exec(ctx, ` UPDATE characters SET realm_tier = $2, minor_realm = $3, realm_status = $4, updated_at = NOW() WHERE id = $1 `, characterID, realmTier, minorRealm, realmStatus) if err != nil { return err } // 同时更新 character_realms 当前记录 _, err = s.pool().Exec(ctx, ` UPDATE character_realms SET realm_tier = $2, max_minor_reached = GREATEST(max_minor_reached, $3), exp_in_tier = $4, updated_at = NOW() WHERE character_id = $1 AND is_current = true `, characterID, realmTier, minorRealm, expInTier) return err } // UpdateCharacterWorldAndRealm 更新世界层级与境界(破界成功后使用)。 func (s *PgxRealmStore) UpdateCharacterWorldAndRealm(ctx context.Context, characterID string, worldTier, realmTier, minorRealm int32, realmStatus string) error { _, err := s.pool().Exec(ctx, ` UPDATE characters SET world_tier = $2, realm_tier = $3, minor_realm = $4, realm_status = $5, updated_at = NOW() WHERE id = $1 `, characterID, worldTier, realmTier, minorRealm, realmStatus) if err != nil { return err } // 关闭旧境界记录,创建新境界记录 _, err = s.pool().Exec(ctx, ` UPDATE character_realms SET is_current = false, updated_at = NOW() WHERE character_id = $1 `, characterID) if err != nil { return err } _, err = s.pool().Exec(ctx, ` INSERT INTO character_realms (character_id, realm_tier, max_minor_reached, exp_in_tier, stats_snapshot, is_current) VALUES ($1, $2, $3, 0, '{}', true) `, characterID, realmTier, minorRealm) return err } // InsertBreakthroughRecord 写入突破/破界记录。 func (s *PgxRealmStore) InsertBreakthroughRecord(ctx context.Context, rec *BreakthroughRecord) error { _, err := s.pool().Exec(ctx, ` INSERT INTO realm_breakthrough_records (character_id, from_realm_tier, to_realm_tier, from_minor_realm, to_minor_realm, is_success, is_break_world_barrier, source_world_tier, target_world_tier, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW()) RETURNING id `, rec.CharacterID, rec.FromRealmTier, rec.ToRealmTier, rec.FromMinorRealm, rec.ToMinorRealm, rec.IsSuccess, rec.IsBreakWorldBarrier, rec.SourceWorldTier, rec.TargetWorldTier) return err } // GetLatestTribulation 查询角色最近一次渡劫记录。 func (s *PgxRealmStore) GetLatestTribulation(ctx context.Context, characterID string) (*TribulationRecord, error) { row := s.pool().QueryRow(ctx, ` SELECT id, character_id, realm_tier, minor_realm, tribulation_type, base_success_rate, modified_success_rate, helper_ids, result, penalties, drops_on_success, san_snapshot, created_at FROM tribulation_records WHERE character_id = $1 ORDER BY created_at DESC LIMIT 1 `, characterID) var r TribulationRecord var helperIDs pgtype.Array[string] var penalties, drops interface{} err := row.Scan(&r.ID, &r.CharacterID, &r.RealmTier, &r.MinorRealm, &r.TribulationType, &r.BaseSuccessRate, &r.ModifiedSuccessRate, &helperIDs, &r.Result, &penalties, &drops, &r.SanSnapshot, &r.CreatedAt) if err != nil { return nil, err } r.HelperIDs = helperIDs.Elements r.Penalties, err = jsonMap(penalties) if err != nil { return nil, err } r.DropsOnSuccess, err = jsonSlice(drops) return &r, err } // InsertTribulationRecord 创建渡劫记录并返回 ID。 func (s *PgxRealmStore) InsertTribulationRecord(ctx context.Context, rec *TribulationRecord) (string, error) { var id string penaltiesJSON, _ := json.Marshal(rec.Penalties) dropsJSON, _ := json.Marshal(rec.DropsOnSuccess) var helperIDs pgtype.Array[string] helperIDs.Elements = rec.HelperIDs err := s.pool().QueryRow(ctx, ` INSERT INTO tribulation_records (character_id, realm_tier, minor_realm, tribulation_type, base_success_rate, modified_success_rate, helper_ids, result, penalties, drops_on_success, san_snapshot, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) RETURNING id `, rec.CharacterID, rec.RealmTier, rec.MinorRealm, rec.TribulationType, rec.BaseSuccessRate, rec.ModifiedSuccessRate, helperIDs, rec.Result, penaltiesJSON, dropsJSON, rec.SanSnapshot).Scan(&id) return id, err } // UpdateTribulationRecord 更新渡劫记录的结果、惩罚与掉落。 func (s *PgxRealmStore) UpdateTribulationRecord(ctx context.Context, id, result string, penalties map[string]interface{}, drops []map[string]interface{}) error { penaltiesJSON, _ := json.Marshal(penalties) dropsJSON, _ := json.Marshal(drops) _, err := s.pool().Exec(ctx, ` UPDATE tribulation_records SET result = $2, penalties = $3, drops_on_success = $4 WHERE id = $1 `, id, result, penaltiesJSON, dropsJSON) return err } // HasInventoryItem 校验角色是否持有指定数量物品。 func (s *PgxRealmStore) HasInventoryItem(ctx context.Context, characterID, inventoryID, itemID string, quantity int32) (bool, error) { var n int err := s.pool().QueryRow(ctx, ` SELECT 1 FROM inventories WHERE id = $1 AND character_id = $2 AND item_id = $3 AND quantity >= $4 `, inventoryID, characterID, itemID, quantity).Scan(&n) if err == pgx.ErrNoRows { return false, nil } if err != nil { return false, err } return true, nil } // ConsumeInventoryItem 消耗背包中指定数量物品。 func (s *PgxRealmStore) ConsumeInventoryItem(ctx context.Context, characterID, inventoryID string, quantity int32) error { ct, err := s.pool().Exec(ctx, ` UPDATE inventories SET quantity = quantity - $3 WHERE id = $1 AND character_id = $2 AND quantity >= $3 `, inventoryID, characterID, quantity) if err != nil { return err } if ct.RowsAffected() == 0 { return fmt.Errorf("inventory item not found or insufficient quantity") } return nil } // GetCurrencyBalance 查询角色指定货币余额。 func (s *PgxRealmStore) GetCurrencyBalance(ctx context.Context, characterID, currencyCode string) (float64, error) { var amount float64 err := s.pool().QueryRow(ctx, ` SELECT amount FROM currency_balances WHERE character_id = $1 AND currency_code = $2 `, characterID, currencyCode).Scan(&amount) if err == pgx.ErrNoRows { return 0, nil } return amount, err } // ConsumeCurrency 消耗角色指定货币,余额不足返回错误。 func (s *PgxRealmStore) ConsumeCurrency(ctx context.Context, characterID, currencyCode string, amount float64) error { ct, err := s.pool().Exec(ctx, ` UPDATE currency_balances SET amount = amount - $3, total_spent = total_spent + $3, updated_at = NOW() WHERE character_id = $1 AND currency_code = $2 AND amount >= $3 `, characterID, currencyCode, amount) if err != nil { return err } if ct.RowsAffected() == 0 { return fmt.Errorf("insufficient currency %s", currencyCode) } return nil } // AuditCurrencyFlow 写入经济审计日志。 func (s *PgxRealmStore) AuditCurrencyFlow(ctx context.Context, characterID, entityType, entityID, currencyCode, flowType, reasonCode string, amount, balanceAfter float64, worldTier int32, relatedID string) error { _, err := s.pool().Exec(ctx, ` INSERT INTO economy_audit_logs (character_id, entity_type, entity_id, currency_code, flow_type, reason_code, amount, balance_after, related_id, world_tier, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW()) `, characterID, entityType, entityID, currencyCode, flowType, reasonCode, amount, balanceAfter, relatedID, worldTier) return err } // UpdateCharacterBaseStats 更新角色 base_stats JSONB。 func (s *PgxRealmStore) UpdateCharacterBaseStats(ctx context.Context, characterID string, baseStats map[string]interface{}) error { data, err := json.Marshal(baseStats) if err != nil { return err } _, err = s.pool().Exec(ctx, ` UPDATE characters SET base_stats = $2, updated_at = NOW() WHERE id = $1 `, characterID, data) return err } // GetInventoryItem 按 inventory_id 查询物品模板 ID 与数量。 func (s *PgxRealmStore) GetInventoryItem(ctx context.Context, characterID, inventoryID string) (string, int32, error) { var itemID string var quantity int32 err := s.pool().QueryRow(ctx, ` SELECT item_id, quantity FROM inventories WHERE id = $1 AND character_id = $2 `, inventoryID, characterID).Scan(&itemID, &quantity) return itemID, quantity, err } // GetCurrencyBalances 查询角色全部货币余额。 func (s *PgxRealmStore) GetCurrencyBalances(ctx context.Context, characterID string) (map[string]float64, error) { rows, err := s.pool().Query(ctx, ` SELECT currency_code, amount FROM currency_balances WHERE character_id = $1 `, characterID) if err != nil { return nil, err } defer rows.Close() balances := make(map[string]float64) for rows.Next() { var code string var amount float64 if err := rows.Scan(&code, &amount); err != nil { return nil, err } balances[code] = amount } return balances, rows.Err() } // SetCurrencyBalance 设置角色指定货币余额(用于跨层携带限制扣除)。 func (s *PgxRealmStore) SetCurrencyBalance(ctx context.Context, characterID, currencyCode string, amount float64) error { _, err := s.pool().Exec(ctx, ` INSERT INTO currency_balances (character_id, currency_code, amount, total_earned, total_spent, updated_at) VALUES ($1, $2, $3, 0, 0, NOW()) ON CONFLICT (character_id, currency_code) DO UPDATE SET amount = $3, updated_at = NOW() `, characterID, currencyCode, amount) return err } // CountCharacters 查询给定 ID 列表中有效角色数量。 func (s *PgxRealmStore) CountCharacters(ctx context.Context, ids []string) (int, error) { if len(ids) == 0 { return 0, nil } var count int err := s.pool().QueryRow(ctx, ` SELECT COUNT(*) FROM characters WHERE id = ANY($1) AND status = 'active' `, ids).Scan(&count) return count, err } // UpdateCharacterStatus 更新角色状态。 func (s *PgxRealmStore) UpdateCharacterStatus(ctx context.Context, characterID, status string) error { _, err := s.pool().Exec(ctx, ` UPDATE characters SET status = $2, updated_at = NOW() WHERE id = $1 `, characterID, status) return err } // GetCurrencyWorldTier 查询货币适用世界层级。 func (s *PgxRealmStore) GetCurrencyWorldTier(ctx context.Context, currencyCode string) (int32, error) { var tier int32 err := s.pool().QueryRow(ctx, ` SELECT world_tier FROM currencies WHERE code = $1 `, currencyCode).Scan(&tier) if err == pgx.ErrNoRows { return 0, fmt.Errorf("currency not found: %s", currencyCode) } return tier, err } // WithinTx 在事务内执行回调,便于复杂操作原子性。 func (s *PgxRealmStore) WithinTx(ctx context.Context, fn func(tx pgx.Tx) error) error { tx, err := s.pool().Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx) if err := fn(tx); err != nil { return err } return tx.Commit(ctx) }