package repository import ( "context" "electricity_bill_calc/cache" "electricity_bill_calc/config" "electricity_bill_calc/global" "electricity_bill_calc/logger" "electricity_bill_calc/model" "electricity_bill_calc/tools" "electricity_bill_calc/tools/serial" "electricity_bill_calc/types" "electricity_bill_calc/vo" "fmt" "strings" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/postgres" "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgx/v5" "github.com/shopspring/decimal" "go.uber.org/zap" ) type _MeterRepository struct { log *zap.Logger ds goqu.DialectWrapper } var MeterRepository = _MeterRepository{ log: logger.Named("Repository", "Meter"), ds: goqu.Dialect("postgres"), } // 获取指定园区中所有的表计信息 func (mr _MeterRepository) AllMeters(pid string) ([]*model.MeterDetail, error) { mr.log.Info("列出指定园区中的所有表计", zap.String("park id", pid)) cacheConditions := []string{pid} if meters, err := cache.RetrieveSearch[[]*model.MeterDetail]("all_meters_in", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的表计信息", zap.Int("count", len(*meters))) return *meters, nil } ctx, cancel := global.TimeoutContext() defer cancel() var meters []*model.MeterDetail metersSql, metersArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.detachedAt").IsNull(), ). Order(goqu.I("m.seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &meters, metersSql, metersArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } cache.CacheSearch(meters, []string{fmt.Sprintf("meter:%s", pid)}, "all_meters_in", cacheConditions...) return meters, nil } // 列出指定园区下的所有表计信息,包含已经拆除的表计 func (mr _MeterRepository) AllUsedMeters(pid string) ([]*model.MeterDetail, error) { mr.log.Info("列出指定园区中的所有使用过的表计", zap.String("park id", pid)) cacheConditions := []string{pid} if meters, err := cache.RetrieveSearch[[]*model.MeterDetail]("all_used_meters_in", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的表计信息", zap.Int("count", len(*meters))) return *meters, nil } ctx, cancel := global.TimeoutContext() defer cancel() var meters []*model.MeterDetail metersSql, metersArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), ). Order(goqu.I("m.seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &meters, metersSql, metersArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } cache.CacheSearch(meters, []string{fmt.Sprintf("meter:%s", pid)}, "all_used_meters_in", cacheConditions...) return meters, nil } // 列出指定核算报表中所使用的所有表计,包含已经拆除的表计 func (mr _MeterRepository) AllUsedMetersInReport(rid string) ([]*model.MeterDetail, error) { mr.log.Info("列出指定核算报表中所使用的所有表计", zap.String("report id", rid)) ctx, cancel := global.TimeoutContext() defer cancel() var meters []*model.MeterDetail metersSql, metersArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Join(goqu.T("report").As("r"), goqu.On(goqu.I("m.park_id").Eq(goqu.I("r.park_id")))). Where( goqu.I("r.id").Eq(rid), goqu.I("m.enabled").Eq(true), goqu.L("m.attached_at::date < upper(r.period)"), goqu.Or( goqu.I("m.detached_at").IsNull(), goqu.L("m.detached_at::date >= lower(r.period)"), ), ). Select( "m.*", goqu.I("b.name").As("building_name"), ). Order(goqu.I("m.seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &meters, metersSql, metersArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } return meters, nil } // 分页列出指定园区下的表计信息 func (mr _MeterRepository) MetersIn(pid string, page uint, keyword *string) ([]*model.MeterDetail, int64, error) { mr.log.Info("分页列出指定园区下的表计信息", zap.String("park id", pid), zap.Uint("page", page), zap.String("keyword", tools.DefaultTo(keyword, ""))) cacheConditions := []string{ pid, tools.DefaultOrEmptyStr(keyword, "UNDEF"), fmt.Sprintf("%d", page), } if meters, total, err := cache.RetrievePagedSearch[[]*model.MeterDetail]("meter", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的表计信息", zap.Int("count", len(*meters)), zap.Int64("total", total)) return *meters, total, nil } ctx, cancel := global.TimeoutContext() defer cancel() meterQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.detachedAt").IsNull(), ) countQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). Select(goqu.COUNT("*")). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.detachedAt").IsNull(), ) if keyword != nil && len(*keyword) > 0 { pattern := fmt.Sprintf("%%%s%%", *keyword) meterQuery = meterQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) countQuery = countQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) } startRow := (page - 1) * config.ServiceSettings.ItemsPageSize meterQuery = meterQuery.Order(goqu.I("m.seq").Asc()).Offset(startRow).Limit(config.ServiceSettings.ItemsPageSize) meterSql, meterArgs, _ := meterQuery.Prepared(true).ToSQL() countSql, countArgs, _ := countQuery.Prepared(true).ToSQL() var ( meters []*model.MeterDetail total int64 ) if err := pgxscan.Select(ctx, global.DB, &meters, meterSql, meterArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), 0, err } if err := pgxscan.Get(ctx, global.DB, &total, countSql, countArgs...); err != nil { mr.log.Error("查询表计数量失败", zap.Error(err)) return make([]*model.MeterDetail, 0), 0, err } cache.CachePagedSearch(meters, total, []string{fmt.Sprintf("meter:%s", pid)}, "meter", cacheConditions...) return meters, total, nil } // 列出指定园区中指定列表中所有表计的详细信息,将忽略所有表计的当前状态 func (mr _MeterRepository) ListMetersByIDs(pid string, ids []string) ([]*model.MeterDetail, error) { mr.log.Info("列出指定园区中指定列表中所有表计的详细信息", zap.String("park id", pid), zap.Strings("meter ids", ids)) cacheConditions := []string{ pid, strings.Join(ids, ","), } if meters, err := cache.RetrieveSearch[[]*model.MeterDetail]("meter_slice", cacheConditions...); err == nil && meters != nil { mr.log.Info("从缓存中获取到了指定园区中所需的表计信息", zap.Int("count", len(*meters))) return *meters, nil } ctx, cancel := global.TimeoutContext() defer cancel() var meters []*model.MeterDetail metersSql, metersArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.code").In(ids), ). Order(goqu.I("m.seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &meters, metersSql, metersArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } cache.CacheSearch(meters, []string{fmt.Sprintf("meter:%s", pid), fmt.Sprintf("meter_slice:%s", pid)}, "meter_slice", cacheConditions...) return meters, nil } // 获取指定表计的详细信息 func (mr _MeterRepository) FetchMeterDetail(pid, code string) (*model.MeterDetail, error) { mr.log.Info("获取指定表计的详细信息", zap.String("park id", pid), zap.String("meter code", code)) cacheConditions := fmt.Sprintf("%s:%s", pid, code) if meter, err := cache.RetrieveEntity[*model.MeterDetail]("meter", cacheConditions); err == nil { mr.log.Info("从缓存中获取到了指定表计的详细信息", zap.String("code", (**meter).Code)) return *meter, nil } ctx, cancel := global.TimeoutContext() defer cancel() var meter model.MeterDetail meterSql, meterArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.code").Eq(code), ). Prepared(true).ToSQL() if err := pgxscan.Get(ctx, global.DB, &meter, meterSql, meterArgs...); err != nil { mr.log.Error("查询表计信息失败", zap.Error(err)) return nil, err } cache.CacheEntity(meter, []string{fmt.Sprintf("meter:%s", pid), "park"}, "meter", cacheConditions) return &meter, nil } // 创建一条新的表计信息 func (mr _MeterRepository) CreateMeter(tx pgx.Tx, ctx context.Context, pid string, meter vo.MeterCreationForm) (bool, error) { mr.log.Info("创建一条新的表计信息", zap.String("park id", pid), zap.String("meter code", meter.Code)) timeNow := types.Now() meterSql, meterArgs, _ := mr.ds. Insert(goqu.T("meter_04kv")). Cols( "park_id", "code", "address", "ratio", "seq", "meter_type", "building", "on_floor", "area", "enabled", "attached_at", "created_at", "last_modified_at", ). Vals( goqu.Vals{pid, meter.Code, meter.Address, meter.Ratio, meter.Seq, meter.MeterType, meter.Building, meter.OnFloor, meter.Area, meter.Enabled, timeNow, timeNow, timeNow, }, ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, meterSql, meterArgs...) if err != nil { mr.log.Error("创建表计信息失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter:%s", pid)) cache.AbolishRelation(fmt.Sprintf("meter_relations:%s:%s", pid, meter.Code)) } return ok.RowsAffected() > 0, nil } // 创建或者更新一条表计的信息 func (mr _MeterRepository) CreateOrUpdateMeter(tx pgx.Tx, ctx context.Context, pid string, meter vo.MeterCreationForm) (bool, error) { mr.log.Info("创建或者更新一条表计的信息", zap.String("park id", pid), zap.String("meter code", meter.Code)) timeNow := types.Now() meterSql, meterArgs, _ := mr.ds. Insert(goqu.T("meter_04kv")). Cols( "park_id", "code", "address", "ratio", "seq", "meter_type", "building", "on_floor", "area", "enabled", "attached_at", "created_at", "last_modified_at", ). Vals( goqu.Vals{pid, meter.Code, meter.Address, meter.Ratio, meter.Seq, meter.MeterType, meter.Building, meter.OnFloor, meter.Area, meter.Enabled, timeNow, timeNow, timeNow, }, ). OnConflict( goqu.DoUpdate("meter_04kv_pkey", goqu.Record{ "address": goqu.I("excluded.address"), "seq": goqu.I("excluded.seq"), "ratio": goqu.I("excluded.ratio"), "meter_type": goqu.I("excluded.meter_type"), "building": goqu.I("excluded.building"), "on_floor": goqu.I("excluded.on_floor"), "area": goqu.I("excluded.area"), "last_modified_at": goqu.I("excluded.last_modified_at"), }), ). Prepared(true).ToSQL() res, err := tx.Exec(ctx, meterSql, meterArgs...) if err != nil { mr.log.Error("创建或者更新表计信息失败", zap.Error(err)) return false, err } return res.RowsAffected() > 0, nil } // 记录一条表计的抄表信息 func (mr _MeterRepository) RecordReading(tx pgx.Tx, ctx context.Context, pid, code string, meterType int16, ratio decimal.Decimal, reading *vo.MeterReadingForm) (bool, error) { mr.log.Info("记录一条表计的抄表信息", zap.String("park id", pid), zap.String("meter code", code)) readAt := tools.DefaultTo(reading.ReadAt, types.Now()) readingSql, readingArgs, _ := mr.ds. Insert(goqu.T("meter_reading")). Cols( "park_id", "meter_id", "read_at", "meter_type", "ratio", "overall", "critical", "peak", "flat", "valley", ). Vals( goqu.Vals{pid, code, readAt, meterType, ratio, reading.Overall, reading.Critical, reading.Peak, reading.Flat, reading.Valley}, ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, readingSql, readingArgs...) if err != nil { mr.log.Error("记录表计抄表信息失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter_reading:%s", pid)) } return ok.RowsAffected() > 0, nil } // 更新一条表计的详细信息 func (mr _MeterRepository) UpdateMeter(tx pgx.Tx, ctx context.Context, pid, code string, detail *vo.MeterModificationForm) (bool, error) { mr.log.Info("更新一条表计的详细信息", zap.String("park id", pid), zap.String("meter code", code)) timeNow := types.Now() meterSql, meterArgs, _ := mr.ds. Update(goqu.T("meter_04kv")). Set( goqu.Record{ "address": detail.Address, "seq": detail.Seq, "ratio": detail.Ratio, "enabled": detail.Enabled, "meter_type": detail.MeterType, "building": detail.Building, "on_floor": detail.OnFloor, "area": detail.Area, "last_modified_at": timeNow, }, ). Where( goqu.I("park_id").Eq(pid), goqu.I("code").Eq(code), ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, meterSql, meterArgs...) if err != nil { mr.log.Error("更新表计信息失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter:%s", pid)) } return ok.RowsAffected() > 0, nil } // 列出指定园区中已经存在的表计编号,无论该表计是否已经不再使用。 func (mr _MeterRepository) ListMeterCodes(pid string) ([]string, error) { mr.log.Info("列出指定园区中已经存在的表计编号", zap.String("park id", pid)) cacheConditions := []string{pid} if codes, err := cache.RetrieveSearch[[]string]("meter_codes", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的表计编号", zap.Int("count", len(*codes))) return *codes, nil } ctx, cancel := global.TimeoutContext() defer cancel() var codes []string codesSql, codesArgs, _ := mr.ds. From(goqu.T("meter_04kv")). Select("code"). Where( goqu.I("park_id").Eq(pid), ). Order(goqu.I("seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &codes, codesSql, codesArgs...); err != nil { mr.log.Error("查询表计编号失败", zap.Error(err)) return make([]string, 0), err } cache.CacheSearch(codes, []string{fmt.Sprintf("meter:%s", pid), fmt.Sprintf("park:%s", pid)}, "meter_codes", cacheConditions...) return codes, nil } // 解除指定园区中指定表计的使用 func (mr _MeterRepository) DetachMeter(tx pgx.Tx, ctx context.Context, pid, code string) (bool, error) { mr.log.Info("解除指定园区中指定表计的使用", zap.String("park id", pid), zap.String("meter code", code)) timeNow := types.Now() meterSql, meterArgs, _ := mr.ds. Update(goqu.T("meter_04kv")). Set( goqu.Record{ "detached_at": timeNow, "last_modified_at": timeNow, }, ). Where( goqu.I("park_id").Eq(pid), goqu.I("code").Eq(code), ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, meterSql, meterArgs...) if err != nil { mr.log.Error("解除表计使用失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter:%s", pid)) cache.AbolishRelation(fmt.Sprintf("meter_relations:%s:%s", pid, code)) } return ok.RowsAffected() > 0, nil } // 将商户表计绑定到公摊表计上 func (mr _MeterRepository) BindMeter(tx pgx.Tx, ctx context.Context, pid, masterMeter, slaveMeter string) (bool, error) { mr.log.Info("将商户表计绑定到公摊表计上", zap.String("master meter code", masterMeter), zap.String("slave meter code", slaveMeter)) masterDetail, err := mr.FetchMeterDetail(pid, masterMeter) if err != nil { mr.log.Error("查询公摊表计信息失败", zap.Error(err)) return false, err } if masterDetail.MeterType != model.METER_INSTALLATION_POOLING { mr.log.Error("给定的公摊表计不是公摊表计", zap.Error(err)) return false, fmt.Errorf("给定的公摊表计不是公摊表计") } slaveDetail, err := mr.FetchMeterDetail(pid, slaveMeter) if err != nil { mr.log.Error("查询商户表计信息失败", zap.Error(err)) return false, err } if slaveDetail.MeterType != model.METER_INSTALLATION_TENEMENT { mr.log.Error("给定的商户表计不是商户表计", zap.Error(err)) return false, fmt.Errorf("给定的商户表计不是商户表计") } timeNow := types.Now() serial.StringSerialRequestChan <- 1 code := serial.Prefix("PB", <-serial.StringSerialResponseChan) relationSql, relationArgs, _ := mr.ds. Insert(goqu.T("meter_relations")). Cols( "id", "park_id", "master_meter_id", "slave_meter_id", "established_at", ). Vals( goqu.Vals{ code, pid, masterMeter, slaveMeter, timeNow, }, ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, relationSql, relationArgs...) if err != nil { mr.log.Error("绑定表计关系失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter_relations:%s:%s", pid, masterMeter)) } return ok.RowsAffected() > 0, nil } // 解除两个表计之间的关联 func (mr _MeterRepository) UnbindMeter(tx pgx.Tx, ctx context.Context, pid, masterMeter, slaveMeter string) (bool, error) { mr.log.Info("解除两个表计之间的关联", zap.String("master meter code", masterMeter), zap.String("slave meter code", slaveMeter)) relationSql, relationArgs, _ := mr.ds. Update(goqu.T("meter_relations")). Set( goqu.Record{ "revoked_at": types.Now(), }, ). Where( goqu.I("park_id").Eq(pid), goqu.I("master_meter_id").Eq(masterMeter), goqu.I("slave_meter_id").Eq(slaveMeter), goqu.I("revoke_at").IsNull(), ). Prepared(true).ToSQL() ok, err := tx.Exec(ctx, relationSql, relationArgs...) if err != nil { mr.log.Error("解除表计关系失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter_relations:%s:%s", pid, masterMeter)) } return ok.RowsAffected() > 0, nil } // 列出指定公摊表计的所有关联表计关系 func (mr _MeterRepository) ListPooledMeterRelations(pid, code string) ([]*model.MeterRelation, error) { mr.log.Info("列出指定公摊表计的所有关联表计关系", zap.String("park id", pid), zap.String("meter code", code)) ctx, cancel := global.TimeoutContext() defer cancel() var relations []*model.MeterRelation relationsSql, relationsArgs, _ := mr.ds. From(goqu.T("meter_relations")). Select("*"). Where( goqu.I("r.park_id").Eq(pid), goqu.I("r.master_meter_id").Eq(code), goqu.I("r.revoke_at").IsNull(), ). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &relations, relationsSql, relationsArgs...); err != nil { mr.log.Error("查询表计关系失败", zap.Error(err)) return make([]*model.MeterRelation, 0), err } return relations, nil } // 列出指定公摊表计列表所包含的全部关联表计关系 func (mr _MeterRepository) ListPooledMeterRelationsByCodes(pid string, codes []string) ([]*model.MeterRelation, error) { mr.log.Info("列出指定公摊表计列表所包含的全部关联表计关系", zap.String("park id", pid), zap.Strings("meter codes", codes)) cacheConditions := []string{ pid, strings.Join(codes, ","), } if relations, err := cache.RetrieveSearch[[]*model.MeterRelation]("meter_relations", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了所需的关联表计信息", zap.Int("count", len(*relations))) return *relations, nil } ctx, cancel := global.TimeoutContext() defer cancel() var relations []*model.MeterRelation relationsSql, relationsArgs, _ := mr.ds. From(goqu.T("meter_relations")). Select("*"). Where( goqu.I("r.park_id").Eq(pid), goqu.I("r.master_meter_id").Eq(goqu.Func("any", codes)), goqu.I("r.revoke_at").IsNull(), ). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &relations, relationsSql, relationsArgs...); err != nil { mr.log.Error("查询表计关系失败", zap.Error(err)) return make([]*model.MeterRelation, 0), err } return relations, nil } // 列出指定商户表计、园区表计与公摊表计之间的关联关系 func (mr _MeterRepository) ListMeterRelations(pid, code string) ([]*model.MeterRelation, error) { mr.log.Info("列出指定商户表计、园区表计与公摊表计之间的关联关系", zap.String("park id", pid), zap.String("meter code", code)) ctx, cancel := global.TimeoutContext() defer cancel() var relations []*model.MeterRelation relationsSql, relationsArgs, _ := mr.ds. From(goqu.T("meter_relations")). Select("*"). Where( goqu.I("r.park_id").Eq(pid), goqu.I("r.slave_meter_id").Eq(code), goqu.I("r.revoke_at").IsNull(), ). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &relations, relationsSql, relationsArgs...); err != nil { mr.log.Error("查询表计关系失败", zap.Error(err)) return make([]*model.MeterRelation, 0), err } return relations, nil } // 列出指定园区中的所有公摊表计 func (mr _MeterRepository) ListPoolingMeters(pid string, page uint, keyword *string) ([]*model.MeterDetail, int64, error) { mr.log.Info("列出指定园区中的所有公摊表计", zap.String("park id", pid), zap.Uint("page", page), zap.String("keyword", tools.DefaultTo(keyword, ""))) cacheConditions := []string{ pid, tools.DefaultOrEmptyStr(keyword, "UNDEF"), fmt.Sprintf("%d", page), } if meters, total, err := cache.RetrievePagedSearch[[]*model.MeterDetail]("pooling_meters", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的公摊表计信息", zap.Int("count", len(*meters)), zap.Int64("total", total)) return *meters, total, nil } ctx, cancel := global.TimeoutContext() defer cancel() meterQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.enabled").IsTrue(), goqu.I("m.meter_type").Eq(model.METER_INSTALLATION_POOLING), ) countQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). Select(goqu.COUNT("*")). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.enabled").IsTrue(), goqu.I("m.meter_type").Eq(model.METER_INSTALLATION_POOLING), ) if keyword != nil && len(*keyword) > 0 { pattern := fmt.Sprintf("%%%s%%", *keyword) meterQuery = meterQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) countQuery = countQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) } startRow := (page - 1) * config.ServiceSettings.ItemsPageSize meterQuery = meterQuery.Order(goqu.I("m.code").Asc()).Offset(startRow).Limit(config.ServiceSettings.ItemsPageSize) meterSql, meterArgs, _ := meterQuery.Prepared(true).ToSQL() countSql, countArgs, _ := countQuery.Prepared(true).ToSQL() var ( meters []*model.MeterDetail total int64 ) if err := pgxscan.Select(ctx, global.DB, &meters, meterSql, meterArgs...); err != nil { mr.log.Error("查询公摊表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), 0, err } if err := pgxscan.Get(ctx, global.DB, &total, countSql, countArgs...); err != nil { mr.log.Error("查询公摊表计数量失败", zap.Error(err)) return make([]*model.MeterDetail, 0), 0, err } cache.CachePagedSearch(meters, total, []string{fmt.Sprintf("meter:%s", pid), "park"}, "pooling_meters", cacheConditions...) return meters, total, nil } // 列出目前尚未绑定到公摊表计的商户表计 func (mr _MeterRepository) ListUnboundMeters(uid string, pid *string, keyword *string, limit *uint) ([]*model.MeterDetail, error) { mr.log.Info("列出目前尚未绑定到公摊表计的商户表计", zap.Stringp("park id", pid), zap.String("user id", uid), zap.String("keyword", tools.DefaultTo(keyword, "")), zap.Uint("limit", tools.DefaultTo(limit, 0))) cacheConditions := []string{ tools.DefaultTo(pid, "UNDEF"), tools.DefaultOrEmptyStr(keyword, "UNDEF"), tools.DefaultStrTo("%d", limit, "0"), } if meters, err := cache.RetrieveSearch[[]*model.MeterDetail]("unbound_pooling_meters", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的商户表计信息", zap.Int("count", len(*meters))) return *meters, nil } ctx, cancel := global.TimeoutContext() defer cancel() meterQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.meter_type").Eq(model.METER_INSTALLATION_TENEMENT), goqu.I("m.enabled").IsTrue(), ) if pid != nil && len(*pid) > 0 { meterQuery = meterQuery.Where( goqu.I("m.park_id").Eq(*pid), ) } if keyword != nil && len(*keyword) > 0 { pattern := fmt.Sprintf("%%%s%%", *keyword) meterQuery = meterQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) } slaveMeterQuery := mr.ds. From("meter_relations"). Select("id") if pid != nil && len(*pid) > 0 { slaveMeterQuery = slaveMeterQuery.Where( goqu.I("park_id").Eq(*pid), ) } else { slaveMeterQuery = slaveMeterQuery.Where( goqu.I("park_id").In( mr.ds. From("park"). Select("id"). Where(goqu.I("user_id").Eq(uid)), )) } slaveMeterQuery = slaveMeterQuery.Where( goqu.I("revoke_at").IsNull(), ) meterQuery = meterQuery.Where( goqu.I("m.code").NotIn(slaveMeterQuery), ). Order(goqu.I("m.attached_at").Asc()) if limit != nil && *limit > 0 { meterQuery = meterQuery.Limit(*limit) } meterSql, meterArgs, _ := meterQuery.Prepared(true).ToSQL() var meters []*model.MeterDetail if err := pgxscan.Select(ctx, global.DB, &meters, meterSql, meterArgs...); err != nil { mr.log.Error("查询商户表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } cache.CacheSearch(meters, []string{fmt.Sprintf("meter:%s", tools.DefaultTo(pid, "ALL")), "park"}, "unbound_pooling_meters", cacheConditions...) return meters, nil } // 列出目前未绑定到商户的商户表计 func (mr _MeterRepository) ListUnboundTenementMeters(uid string, pid *string, keyword *string, limit *uint) ([]*model.MeterDetail, error) { mr.log.Info("列出目前未绑定到商户的商户表计", zap.Stringp("park id", pid), zap.String("user id", uid), zap.String("keyword", tools.DefaultTo(keyword, "")), zap.Uint("limit", tools.DefaultTo(limit, 0))) cacheConditions := []string{ tools.DefaultTo(pid, "UNDEF"), tools.DefaultOrEmptyStr(keyword, "UNDEF"), tools.DefaultStrTo("%d", limit, "0"), } if meters, err := cache.RetrieveSearch[[]*model.MeterDetail]("unbound_tenement_meters", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的商户表计信息", zap.Int("count", len(*meters))) return *meters, nil } ctx, cancel := global.TimeoutContext() defer cancel() meterQuery := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin(goqu.T("park_building").As("b"), goqu.On(goqu.I("m.building").Eq(goqu.I("b.id")))). Select( "m.*", goqu.I("b.name").As("building_name"), ). Where( goqu.I("m.meter_type").Eq(model.METER_INSTALLATION_TENEMENT), goqu.I("m.enabled").IsTrue(), ) if pid != nil && len(*pid) > 0 { meterQuery = meterQuery.Where( goqu.I("m.park_id").Eq(*pid), ) } if keyword != nil && len(*keyword) > 0 { pattern := fmt.Sprintf("%%%s%%", *keyword) meterQuery = meterQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) } subMeterQuery := mr.ds. From("tenement_meter"). Select("meter_id") if pid != nil && len(*pid) > 0 { subMeterQuery = subMeterQuery.Where( goqu.I("park_id").Eq(*pid), ) } else { subMeterQuery = subMeterQuery.Where( goqu.I("park_id").In( mr.ds. From("park"). Select("id"). Where(goqu.I("user_id").Eq(uid)), )) } subMeterQuery = subMeterQuery.Where( goqu.I("disassociated_at").IsNull(), ) meterQuery = meterQuery.Where( goqu.I("m.code").NotIn(subMeterQuery), ). Order(goqu.I("m.attached_at").Asc()) if limit != nil && *limit > 0 { meterQuery = meterQuery.Limit(*limit) } meterSql, meterArgs, _ := meterQuery.Prepared(true).ToSQL() var meters []*model.MeterDetail if err := pgxscan.Select(ctx, global.DB, &meters, meterSql, meterArgs...); err != nil { mr.log.Error("查询商户表计信息失败", zap.Error(err)) return make([]*model.MeterDetail, 0), err } cache.CacheSearch(meters, []string{fmt.Sprintf("meter:%s", tools.DefaultTo(pid, "ALL")), "park"}, "unbound_tenement_meters", cacheConditions...) return meters, nil } // 查询指定园区中的符合条件的抄表记录 func (mr _MeterRepository) ListMeterReadings(pid string, keyword *string, page uint, start, end *types.Date, buidling *string) ([]*model.MeterReading, int64, error) { mr.log.Info("查询指定园区中的符合条件的抄表记录", zap.String("park id", pid), zap.String("keyword", tools.DefaultTo(keyword, "")), zap.Uint("page", page), logger.DateFieldp("start", start), logger.DateFieldp("end", end), zap.String("building", tools.DefaultTo(buidling, ""))) cacheConditions := []string{ pid, cache.NullableStringKey(keyword), fmt.Sprintf("%d", page), cache.NullableConditionKey(start), cache.NullableConditionKey(end), cache.NullableStringKey(buidling), } if readings, total, err := cache.RetrievePagedSearch[[]*model.MeterReading]("meter_reading", cacheConditions...); err == nil && readings != nil && total != -1 { mr.log.Info("从缓存中获取到了指定园区中的抄表记录", zap.Int("count", len(*readings)), zap.Int64("total", total)) return *readings, total, nil } ctx, cancel := global.TimeoutContext() defer cancel() readingQuery := mr.ds. From(goqu.T("meter_reading").As("r")). LeftJoin(goqu.T("meter_04kv").As("m"), goqu.On(goqu.I("r.meter_id").Eq(goqu.I("m.code")))). Select("r.*"). Where( goqu.I("r.park_id").Eq(pid), ) countQuery := mr.ds. From(goqu.T("meter_reading").As("r")). LeftJoin(goqu.T("meter_04kv").As("m"), goqu.On(goqu.I("r.meter_id").Eq(goqu.I("m.code")))). Select(goqu.COUNT("*")). Where( goqu.I("r.park_id").Eq(pid), ) if keyword != nil && len(*keyword) > 0 { pattern := fmt.Sprintf("%%%s%%", *keyword) readingQuery = readingQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) countQuery = countQuery.Where( goqu.Or( goqu.I("m.code").ILike(pattern), goqu.I("m.address").ILike(pattern), ), ) } if start != nil { readingQuery = readingQuery.Where( goqu.I("r.read_at").Gte(start.ToBeginningOfDate()), ) countQuery = countQuery.Where( goqu.I("r.read_at").Gte(start.ToBeginningOfDate()), ) } if end != nil { readingQuery = readingQuery.Where( goqu.I("r.read_at").Lte(end.ToEndingOfDate()), ) countQuery = countQuery.Where( goqu.I("r.read_at").Lte(end.ToEndingOfDate()), ) } if buidling != nil && len(*buidling) > 0 { readingQuery = readingQuery.Where( goqu.I("m.building").Eq(*buidling), ) countQuery = countQuery.Where( goqu.I("m.building").Eq(*buidling), ) } startRow := (page - 1) * config.ServiceSettings.ItemsPageSize readingQuery = readingQuery.Order(goqu.I("r.read_at").Desc()).Offset(startRow).Limit(config.ServiceSettings.ItemsPageSize) readingSql, readingArgs, _ := readingQuery.Prepared(true).ToSQL() countSql, countArgs, _ := countQuery.Prepared(true).ToSQL() var ( readings []*model.MeterReading total int64 ) if err := pgxscan.Select(ctx, global.DB, &readings, readingSql, readingArgs...); err != nil { mr.log.Error("查询抄表记录失败", zap.Error(err)) return make([]*model.MeterReading, 0), 0, err } if err := pgxscan.Get(ctx, global.DB, &total, countSql, countArgs...); err != nil { mr.log.Error("查询抄表记录数量失败", zap.Error(err)) return make([]*model.MeterReading, 0), 0, err } cache.CachePagedSearch(readings, total, []string{fmt.Sprintf("meter_reading:%s", pid), "park"}, "meter_reading", cacheConditions...) return readings, total, nil } // 修改指定表计的指定抄表记录 func (mr _MeterRepository) UpdateMeterReading(pid, mid string, readAt types.DateTime, reading *vo.MeterReadingForm) (bool, error) { mr.log.Info("修改指定表计的指定抄表记录", zap.String("park id", pid), zap.String("meter id", mid), logger.DateTimeField("read at", readAt), zap.Any("reading", reading)) ctx, cancel := global.TimeoutContext() defer cancel() updateSql, updateArgs, _ := mr.ds. Update(goqu.T("meter_reading")). Set( goqu.Record{ "overall": reading.Overall, "critical": reading.Critical, "peak": reading.Peak, "flat": reading.Flat, "valley": reading.Valley, }, ). Where( goqu.I("park_id").Eq(pid), goqu.I("meter_id").Eq(mid), goqu.I("read_at").Eq(readAt), ). Prepared(true).ToSQL() ok, err := global.DB.Exec(ctx, updateSql, updateArgs...) if err != nil { mr.log.Error("更新抄表记录失败", zap.Error(err)) return false, err } if ok.RowsAffected() > 0 { cache.AbolishRelation(fmt.Sprintf("meter_reading:%s", pid)) } return ok.RowsAffected() > 0, nil } // 列出指定园区中指定时间区域内的所有表计抄表记录 func (mr _MeterRepository) ListMeterReadingsByTimeRange(pid string, start, end types.Date) ([]*model.MeterReading, error) { mr.log.Info("列出指定园区中指定时间区域内的所有表计抄表记录", zap.String("park id", pid), zap.Time("start", start.Time), zap.Time("end", end.Time)) ctx, cancel := global.TimeoutContext() defer cancel() var readings []*model.MeterReading readingSql, readingArgs, _ := mr.ds. From(goqu.T("meter_reading").As("r")). Select("*"). Where( goqu.I("r.park_id").Eq(pid), goqu.I("r.read_at").Gte(start.ToBeginningOfDate()), goqu.I("r.read_at").Lte(end.ToEndingOfDate()), ). Order(goqu.I("r.read_at").Desc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &readings, readingSql, readingArgs...); err != nil { mr.log.Error("查询抄表记录失败", zap.Error(err)) return make([]*model.MeterReading, 0), err } return readings, nil } // 列出指定园区中在指定日期之前的最后一次抄表记录 func (mr _MeterRepository) ListLastMeterReading(pid string, date types.Date) ([]*model.MeterReading, error) { mr.log.Info("列出指定园区中在指定日期之前的最后一次抄表记录", zap.String("park id", pid), zap.Time("date", date.Time)) ctx, cancel := global.TimeoutContext() defer cancel() var readings []*model.MeterReading readingSql, readingArgs, _ := mr.ds. From(goqu.T("meter_reading")). Select( goqu.MAX("read_at").As("read_at"), "park_id", "meter_id", "overall", "critical", "peak", "flat", "valley", ). Where( goqu.I("park_id").Eq(pid), goqu.I("read_at").Lt(date.ToEndingOfDate()), ). GroupBy("park_id", "meter_id", "overall", "critical", "peak", "flat", "valley"). Order(goqu.I("read_at").Desc()). Limit(1). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &readings, readingSql, readingArgs...); err != nil { mr.log.Error("查询抄表记录失败", zap.Error(err)) return make([]*model.MeterReading, 0), err } return readings, nil } // 列出指定园区中的表计与商户的关联详细记录,用于写入Excel模板文件 func (mr _MeterRepository) ListMeterDocForTemplate(pid string) ([]*model.SimpleMeterDocument, error) { mr.log.Info("列出指定园区中的表计与商户的关联详细记录", zap.String("park id", pid)) cacheConditions := []string{pid} if docs, err := cache.RetrieveSearch[[]*model.SimpleMeterDocument]("simple_meter_doc", cacheConditions...); err == nil { mr.log.Info("从缓存中获取到了指定园区中的表计与商户的关联详细记录", zap.Int("count", len(*docs))) return *docs, nil } ctx, cancel := global.TimeoutContext() defer cancel() var docs []*model.SimpleMeterDocument docSql, docArgs, _ := mr.ds. From(goqu.T("meter_04kv").As("m")). LeftJoin( goqu.T("tenement_meter").As("tm"), goqu.On( goqu.I("m.code").Eq(goqu.I("tm.meter_id")), goqu.I("m.park_id").Eq(goqu.I("tm.park_id")), ), ). LeftJoin( goqu.T("tenement").As("t"), goqu.On( goqu.I("tm.tenement_id").Eq(goqu.I("t.id")), goqu.I("tm.park_id").Eq(goqu.I("t.park_id")), ), ). Select( "m.code", "m.address", "m.ratio", "m.seq", goqu.I("t.full_name").As("tenement_name"), ). Where( goqu.I("m.park_id").Eq(pid), goqu.I("m.enabled").IsTrue(), goqu.I("m.disassociated_at").IsNull(), ). Order(goqu.I("m.seq").Asc()). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &docs, docSql, docArgs...); err != nil { mr.log.Error("查询表计与商户关联信息失败", zap.Error(err)) return make([]*model.SimpleMeterDocument, 0), err } cache.CacheSearch(docs, []string{fmt.Sprintf("park:%s", pid), fmt.Sprintf("meter:%s", pid), "park"}, "simple_meter_doc", cacheConditions...) return docs, nil }