package repository import ( "electricity_bill_calc/global" "electricity_bill_calc/logger" "electricity_bill_calc/model" "electricity_bill_calc/model/calculate" "electricity_bill_calc/types" "encoding/json" "errors" "fmt" "github.com/jackc/pgx/v5" "github.com/shopspring/decimal" "golang.org/x/sync/errgroup" "log" "strings" "time" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/postgres" "github.com/georgysavva/scany/v2/pgxscan" "go.uber.org/zap" ) type _CalculateRepository struct { log *zap.Logger ds goqu.DialectWrapper } var CalculateRepository = _CalculateRepository{ log: logger.Named("Repository", "Calculate"), ds: goqu.Dialect("postgres"), } // 更新当前报表的核算状态 func (cr _CalculateRepository) UpdateReportCalculateStatus(rid string, status string, message string) (bool, error) { ctx, cancel := global.TimeoutContext() defer cancel() var atio int var err error log.Println("11111111111", status) currentTime := time.Now() if status == "success" { atio = 0 } else { atio = 1 } updateResultSql, updateResultArgs, _ := cr.ds. Update(goqu.T("report_task")). Set(goqu.Record{ "status": int16(atio), "last_modified_at": currentTime, "message": message, }).Where(goqu.I("id").Eq(rid)). ToSQL() res, err := global.DB.Exec(ctx, updateResultSql, updateResultArgs...) if err != nil { cr.log.Error("未能更新当前报表的核算状态", zap.Error(err)) return false, err } if res.RowsAffected() == 0 { cr.log.Warn("未能保存当前报表的核算状态", zap.String("Report", rid)) return false, nil } return true, nil } // 获取当前正在等待计算的核算任务ID列表 func (cr _CalculateRepository) ListPendingTasks() ([]string, error) { cr.log.Info("获取当前正在等待计算的核算任务ID列表") ctx, cancel := global.TimeoutContext() defer cancel() var ids []string querySql, queryArgs, _ := cr.ds. From("report_task"). Select("id"). Where(goqu.C("status").Eq(model.REPORT_CALCULATE_TASK_STATUS_PENDING)). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &ids, querySql, queryArgs...); err != nil { cr.log.Error("未能获取到当前正在等待计算的核算任务ID列表", zap.Error(err)) return nil, err } return ids, nil } // 更新指定报表的核算状态 func (cr _CalculateRepository) UpdateReportTaskStatus(rid string, status int16, message *string) (bool, error) { cr.log.Info("更新指定报表的核算状态", zap.String("Report", rid), zap.Int16("Status", status)) ctx, cancel := global.TimeoutContext() defer cancel() currentTime := types.Now() updateSql, updateArgs, _ := cr.ds. Update("report_task"). Set(goqu.Record{ "status": status, "last_modified_at": currentTime, "message": message, }). Where(goqu.C("id").Eq(rid)). Prepared(true).ToSQL() res, err := global.DB.Exec(ctx, updateSql, updateArgs...) if err != nil { cr.log.Error("未能更新指定报表的核算状态", zap.Error(err)) return false, err } if res.RowsAffected() == 0 { cr.log.Warn("未能保存指定报表的核算状态", zap.String("Report", rid)) return false, nil } return res.RowsAffected() > 0, nil } // 获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的 func (cr _CalculateRepository) GetAllPoolingMeterRelations(pid string, revokedAfter time.Time) ([]model.MeterRelation, error) { cr.log.Info("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("revokedAfter", revokedAfter)) ctx, cancel := global.TimeoutContext() defer cancel() relationsSql, relationsArgs, _ := cr.ds. From(goqu.T("meter_relations")). Where(goqu.I("park_id").Eq(pid)). Where(goqu.Or( goqu.I("revoked_at").IsNull(), goqu.I("revoked_at").Gte(revokedAfter), )).ToSQL() var meterRelation []model.MeterRelation err := pgxscan.Select(ctx, global.DB, &meterRelation, relationsSql, relationsArgs...) if err != nil { cr.log.Error("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的出错", zap.Error(err)) return nil, err } return meterRelation, nil } // 获取当前园区中所有的商户与表计的关联关系,包括已经解除的 func (cr _CalculateRepository) GetAllTenementMeterRelations(pid string, associatedBefore time.Time, disassociatedAfter time.Time) ([]model.TenementMeter, error) { cr.log.Info("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("associatedBefore", associatedBefore), zap.Time("disassociatedAfter", disassociatedAfter)) ctx, cancel := global.TimeoutContext() defer cancel() relationsQuerySql, relationsQueryArgs, _ := cr.ds. From(goqu.T("tenement_meter")). Where(goqu.I("park_id").Eq(pid)). Where(goqu.And( goqu.I("associated_at").IsNull(), goqu.I("associated_at").Lte(associatedBefore), )). Where(goqu.And( goqu.I("associated_at").IsNull(), goqu.I("associated_at").Gte(disassociatedAfter), )).ToSQL() var tenementMeter []model.TenementMeter err := pgxscan.Select(ctx, global.DB, &tenementMeter, relationsQuerySql, relationsQueryArgs...) if err != nil { cr.log.Error("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.Error(err)) return nil, err } return tenementMeter, nil } // 获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据 func (cr _CalculateRepository) GetMeterReadings(rid string, meterType int16) ([]model.MeterReading, error) { cr.log.Info("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据", zap.String("rid", rid), zap.Int16("meterType", meterType)) ctx, cancel := global.TimeoutContext() defer cancel() readingsQuerySql, readingsQueryArgs, _ := cr.ds. From(goqu.T("meter_reading").As(goqu.I("mr"))). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))), ). Where( goqu.I("r.id").Eq(rid), goqu.I("mr.meter_type").Eq(meterType), // TODO:2023.08.02 此方法出错优先查看是否这里出问题 goqu.L("mr.read_at < lower(r.period)"), ). Order(goqu.I("mr.read_at").Asc()).Select(goqu.I("mr.*")).ToSQL() var readings []model.MeterReading err := pgxscan.Select(ctx, global.DB, &readings, readingsQuerySql, readingsQueryArgs...) if err != nil { cr.log.Error("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据出错", zap.Error(err)) return nil, err } return readings, nil } // 获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数 func (cr _CalculateRepository) GetLastPeriodReadings(rid string, meterType int16) ([]model.MeterReading, error) { cr.log.Info("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数", zap.String("rid", rid), zap.Int16("meterType", meterType)) ctx, cancel := global.TimeoutContext() defer cancel() readingsSql, readingsArgs, _ := cr.ds. From(goqu.T("meter_reading").As("mr")). Select( goqu.MAX("mr.read_at").As("read_at"), goqu.I("mr.park_id"), goqu.I("mr.meter_id"), goqu.I("mr.meter_type"), goqu.I("mr.ratio"), goqu.I("mr.overall"), goqu.I("mr.critical"), goqu.I("mr.peak"), goqu.I("mr.flat"), goqu.I("mr.valley"), ). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))), ). Where( goqu.I("r.id").Eq(rid), goqu.I("mr.meter_type").Eq(meterType), goqu.L(" read_at <= lower(r.period)"), ). GroupBy( goqu.I("mr.park_id"), goqu.I("mr.meter_id"), goqu.I("mr.meter_type"), goqu.I("mr.ratio"), goqu.I("mr.overall"), goqu.I("mr.critical"), goqu.I("mr.peak"), goqu.I("mr.flat"), goqu.I("mr.valley"), goqu.I("r.period"), ).ToSQL() var readings []model.MeterReading err := pgxscan.Select(ctx, global.DB, &readings, readingsSql, readingsArgs...) if err != nil { cr.log.Error("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数出错", zap.Error(err)) return nil, err } fmt.Println(";;;;;;;;;;;;;;;;;;", readings) return readings, nil } // 取得指定报表所涉及的所有商户信息 func (cr _CalculateRepository) GetAllTenements(rid string) ([]model.Tenement, error) { cr.log.Info("取得指定报表所涉及的所有商户信息", zap.String("rid", rid)) ctx, cancel := global.TimeoutContext() defer cancel() tenementQuerySql, tenementQueryArgs, _ := cr.ds. From(goqu.T("tenement").As("t")). LeftJoin( goqu.T("park_building").As("b"), goqu.On(goqu.I("b.id").Eq(goqu.I("t.building"))), ). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("t.park_id"))), ). Select( goqu.I("t.*"), goqu.I("b.name").As("building_name"), ). Where( goqu.I("r.id").Eq(rid), goqu.L("t.moved_in_at <= upper(r.period)"), ).ToSQL() fmt.Println(tenementQuerySql) var tenements []model.Tenement err := pgxscan.Select(ctx, global.DB, &tenements, tenementQuerySql, tenementQueryArgs...) if err != nil { cr.log.Error("取得指定报表所涉及的所有商户信息出错", zap.Error(err)) return nil, err } return tenements, nil } func (cr _CalculateRepository) ClearReportContent(tx pgx.Tx, rid string) error { ctx, cancel := global.TimeoutContext() defer cancel() querysql, querarg, _ := cr.ds.Delete("report_summary"). Where(goqu.C("report_id").Eq(rid)).ToSQL() _, err := tx.Exec(ctx, querysql, querarg...) if err != nil { return err } querysql, querarg, _ = cr.ds.Delete("report_public_consumption"). Where(goqu.C("report_id").Eq(rid)).ToSQL() _, err = tx.Exec(ctx, querysql, querarg...) if err != nil { return err } querysql, querarg, _ = cr.ds.Delete("report_pooled_consumption"). Where(goqu.C("report_id").Eq(rid)).ToSQL() _, err = tx.Exec(ctx, querysql, querarg...) if err != nil { return err } querysql, querarg, _ = cr.ds.Delete("report_tenement"). Where(goqu.C("report_id").Eq(rid)).ToSQL() _, err = tx.Exec(ctx, querysql, querarg...) if err != nil { return err } return nil } func (cr _CalculateRepository) SaveReportPublics(tx pgx.Tx, rid string, meters []calculate.Meter) error { ctx, cancel := global.TimeoutContext() defer cancel() if len(meters) == 0 { // 如果没有公共表计则直接返回 return nil } // 准备插入表达式 insertExpr := cr.ds.Insert("report_public_consumption"). Cols( "report_id", "park_meter_id", "overall", "critical", "peak", "flat", "valley", "loss_adjust", "consumption_total", "loss_adjust_total", "final_total", ).Prepared(true) // 添加值到插入表达式中 for _, meter := range meters { insertExpr = insertExpr.Vals([]interface{}{ rid, meter.Code, meter.Overall.Fee, meter.Critical.Fee, meter.Peak.Fee, meter.Flat.Fee, meter.Valley.Fee, meter.AdjustLoss.Fee, meter.Overall.Fee, meter.AdjustLoss.Fee, meter.Overall.Fee.Add(meter.AdjustLoss.Fee), }) } // 执行插入语句 inserSql, insertArgs, err := insertExpr.Prepared(true).ToSQL() if err != nil { return err } if _, err := tx.Exec(ctx, inserSql, insertArgs); err != nil { return fmt.Errorf("保存报表核算概要失败: %w", err) } return nil } func (cr _CalculateRepository) SaveReportSummary(tx pgx.Tx, summary calculate.Summary) error { ctx, cancel := global.TimeoutContext() defer cancel() // 构建插入表达式 insertsql, insertArgs, _ := cr.ds.Insert("report_summary"). Cols( "report_id", "overall", "critical", "peak", "flat", "valley", "loss", "loss_fee", "basic_fee", "basic_pooled_price_consumption", "basic_pooled_price_area", "adjust_fee", "adjust_pooled_price_consumption", "adjust_pooled_price_area", "loss_diluted_price", "loss_proportion", "final_diluted_overall", "consumption_fee", "authorize_loss", "overall_area", "total_consumption", ). Vals(goqu.Vals{ summary.ReportId, summary.Overall, summary.Critical, summary.Peak, summary.Flat, summary.Valley, summary.Loss, summary.LossFee, summary.BasicFee, summary.BasicPooledPriceConsumption, summary.BasicPooledPriceArea, summary.AdjustFee, summary.AdjustPooledPriceConsumption, summary.AdjustPooledPriceArea, summary.LossDilutedPrice, summary.LossProportion, summary.FinalDilutedOverall, summary.ConsumptionFee, summary.AuthoizeLoss, summary.OverallArea, summary.TotalConsumption, }).Prepared(true).ToSQL() // 执行插入语句 if _, err := tx.Exec(ctx, insertsql, insertArgs...); err != nil { cr.log.Error("保存报表核算概要失败。") return err } return nil } type NestedMeter struct { Overall model.ConsumptionUnit Critical model.ConsumptionUnit Peak model.ConsumptionUnit Flat model.ConsumptionUnit Valley model.ConsumptionUnit CoveredArea decimal.Decimal // Add other fields here as needed } func (cr _CalculateRepository) SaveReportPoolings(tx pgx.Tx, rid string, meters []calculate.Meter, relations []model.MeterRelation, tenements []calculate.Meter) error { ctx, cancel := global.TimeoutContext() defer cancel() if len(meters) == 0 { return nil } relationsSlaves := make(map[string]bool) for _, r := range relations { relationsSlaves[r.SlaveMeter] = true } tenementCodes := make(map[string]bool) for _, t := range tenements { tenementCodes[t.Code] = true } for _, r := range relations { if _, ok := tenementCodes[r.SlaveMeter]; !ok { return errors.New("unknown tenement meter in active meter relations") } } var insertQueries []goqu.InsertDataset for _, meter := range meters { submeters := make([]NestedMeter, 0) for _, r := range relations { if r.MasterMeter == meter.Code { for _, t := range tenements { if t.Code == r.SlaveMeter { submeters = append(submeters, NestedMeter{ Overall: t.Overall, Critical: t.Critical, Peak: t.Peak, Flat: t.Flat, Valley: t.Valley, }) } } } } submetersJSON, err := json.Marshal(submeters) if err != nil { return err } insertQuery := goqu.Insert("report_pooled_consumption"). Cols("report_id", "pooled_meter_id", "overall", "critical", "peak", "flat", "valley", "pooled_area", "diluted"). Vals(goqu.Vals{rid, meter.Code, meter.Overall, meter.Critical, meter.Peak, meter.Flat, meter.Valley, meter.CoveredArea, submetersJSON}) insertQueries = append(insertQueries, *insertQuery) } eg, _ := errgroup.WithContext(ctx) for _, insertQuery := range insertQueries { insertQuery := insertQuery // Capture loop variable eg.Go(func() error { sql, args, err := insertQuery.ToSQL() if err != nil { return err } _, err = tx.Exec(ctx, sql, args...) return err }) } return eg.Wait() } func (cr _CalculateRepository) SaveReportTenement(tx pgx.Tx, report model.ReportIndex, tenements []model.Tenement, tenementCharges []calculate.TenementCharge) error { if len(tenements) == 0 { // 如果没有商户则直接返回 return nil } cr.log.Info("保存商户报表。") ctx, cancel := global.TimeoutContext() defer cancel() insertQuery := cr.ds.Insert("report_tenement").Prepared(true) values := []goqu.Record{} for _, tenement := range tenements { charge := findTenementCharge(tenementCharges, tenement.Id) values = append(values, goqu.Record{ "report_id": report.Id, "tenement_id": tenement.Id, "tenement_detail": toJSONString(tenement), "calc_period": report.Period, "overall": toJSONString(charge.Overall), "critical": toJSONString(charge.Critical), "peak": toJSONString(charge.Peak), "flat": toJSONString(charge.Flat), "valley": toJSONString(charge.Valley), "loss": toJSONString(charge.Loss), "basic_fee_pooled": charge.BasicFee, "adjust_fee_pooled": charge.AdjustFee, "loss_fee_pooled": charge.LossPooled, "final_pooled": charge.PublicPooled, "final_charge": charge.FinalCharges, "meters": toJSONString(convertToNestedMeters(charge.Submeters)), "pooled": toJSONString(convertToNestedMeters(charge.Poolings)), }) } sql, params, err := insertQuery.Rows(values).Prepared(true).ToSQL() if err != nil { log.Println("sql出现问题................................") return err } tx.Exec(ctx, sql, params...) if err != nil { return err } return nil } // findTenementCharge 在 TenementCharges 切片中查找指定商户的核算内容 func findTenementCharge(charges []calculate.TenementCharge, tenementID string) calculate.TenementCharge { for _, charge := range charges { if charge.Tenement == tenementID { return charge } } return calculate.TenementCharge{} } // convertToNestedMeters 将 Meter 切片转换为 NestedMeter 切片 func convertToNestedMeters(meters []*calculate.Meter) []NestedMeter { nestedMeters := []NestedMeter{} for _, meter := range meters { nestedMeters = append(nestedMeters, NestedMeter{ Overall: meter.Overall, Critical: meter.Critical, Peak: meter.Peak, Flat: meter.Flat, Valley: meter.Valley, CoveredArea: meter.CoveredArea, }) } return nestedMeters } // toJSONString 将对象转换为 JSON 字符串 func toJSONString(obj interface{}) string { return `"` + strings.ReplaceAll(fmt.Sprintf("%#v", obj), `"`, `\"`) + `"` }