package repository import ( "electricity_bill_calc/global" "electricity_bill_calc/logger" "electricity_bill_calc/model" "electricity_bill_calc/types" "time" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/postgres" "github.com/georgysavva/scany/v2/pgxscan" "go.uber.org/zap" ) type _CalculateRepository struct { log *zap.Logger ds goqu.DialectWrapper } var CalculateRepository = _CalculateRepository{ log: logger.Named("Repository", "Calculate"), ds: goqu.Dialect("postgres"), } // 获取当前正在等待计算的核算任务ID列表 func (cr _CalculateRepository) ListPendingTasks() ([]string, error) { cr.log.Info("获取当前正在等待计算的核算任务ID列表") ctx, cancel := global.TimeoutContext() defer cancel() var ids []string querySql, queryArgs, _ := cr.ds. From("report_task"). Select("id"). Where(goqu.C("status").Eq(model.REPORT_CALCULATE_TASK_STATUS_PENDING)). Prepared(true).ToSQL() if err := pgxscan.Select(ctx, global.DB, &ids, querySql, queryArgs...); err != nil { cr.log.Error("未能获取到当前正在等待计算的核算任务ID列表", zap.Error(err)) return nil, err } return ids, nil } // 更新指定报表的核算状态 func (cr _CalculateRepository) UpdateReportTaskStatus(rid string, status int16, message *string) (bool, error) { cr.log.Info("更新指定报表的核算状态", zap.String("Report", rid), zap.Int16("Status", status)) ctx, cancel := global.TimeoutContext() defer cancel() currentTime := types.Now() updateSql, updateArgs, _ := cr.ds. Update("report_task"). Set(goqu.Record{ "status": status, "last_modified_at": currentTime, "message": message, }). Where(goqu.C("id").Eq(rid)). Prepared(true).ToSQL() res, err := global.DB.Exec(ctx, updateSql, updateArgs...) if err != nil { cr.log.Error("未能更新指定报表的核算状态", zap.Error(err)) return false, err } if res.RowsAffected() == 0 { cr.log.Warn("未能保存指定报表的核算状态", zap.String("Report", rid)) return false, nil } return res.RowsAffected() > 0, nil } //获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的 func (cr _CalculateRepository) GetAllPoolingMeterRelations(pid string, revokedAfter time.Time) ([]model.MeterRelation, error) { cr.log.Info("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("revokedAfter", revokedAfter)) ctx, cancel := global.TimeoutContext() defer cancel() relationsSql, relationsArgs, _ := cr.ds. From(goqu.T("meter_relations")). Where(goqu.I("park_id").Eq(pid)). Where(goqu.Or( goqu.I("revoked_at").IsNull(), goqu.I("revoked_at").Gte(revokedAfter), )).ToSQL() var meterRelation []model.MeterRelation err := pgxscan.Select(ctx, global.DB, meterRelation, relationsSql, relationsArgs...) if err != nil { cr.log.Error("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的出错", zap.Error(err)) return nil, err } return meterRelation, nil } //获取当前园区中所有的商户与表计的关联关系,包括已经解除的 func (cr _CalculateRepository) GetAllTenementMeterRelations(pid string, associatedBefore time.Time, disassociatedAfter time.Time) ([]model.TenementMeter, error) { cr.log.Info("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("associatedBefore", associatedBefore), zap.Time("disassociatedAfter", disassociatedAfter)) ctx, cancel := global.TimeoutContext() defer cancel() relationsQuerySql, relationsQueryArgs, _ := cr.ds. From(goqu.T("tenement_meter")). Where(goqu.I("park_id").Eq(pid)). Where(goqu.And( goqu.I("associated_at").IsNull(), goqu.I("associated_at").Lte(associatedBefore), )). Where(goqu.And( goqu.I("associated_at").IsNull(), goqu.I("associated_at").Gte(disassociatedAfter), )).ToSQL() var tenementMeter []model.TenementMeter err := pgxscan.Select(ctx, global.DB, tenementMeter, relationsQuerySql, relationsQueryArgs...) if err != nil { cr.log.Error("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.Error(err)) return nil, err } return tenementMeter, nil } //获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据 func (cr _CalculateRepository) GetMeterReadings(rid string, meterType int16) ([]model.MeterReading, error) { cr.log.Info("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据", zap.String("rid", rid), zap.Int16("meterType", meterType)) ctx, cancel := global.TimeoutContext() defer cancel() readingsQuerySql, readingsQueryArgs, _ := cr.ds. From(goqu.T("meter_reading").As(goqu.I("mr"))). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))), ). Where( goqu.I("r.id").Eq(rid), goqu.I("mr.meter_type").Eq(meterType), // TODO:2023.08.02 此方法出错优先查看是否这里出问题 goqu.I("mr.read_at::date <@ r.period"), ). Order(goqu.I("mr.read_at").Asc()).Select(goqu.I("mr.*")).ToSQL() var readings []model.MeterReading err := pgxscan.Select(ctx, global.DB, readings, readingsQuerySql, readingsQueryArgs...) if err != nil { cr.log.Error("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据出错", zap.Error(err)) return nil, err } return readings, nil } // 获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数 func (cr _CalculateRepository) GetLastPeriodReadings(rid string, meterType int16) ([]model.MeterReading, error) { cr.log.Info("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数", zap.String("rid", rid), zap.Int16("meterType", meterType)) ctx, cancel := global.TimeoutContext() defer cancel() readingsSql, readingsArgs, _ := cr.ds.From(goqu.T("meter_reading").As("mr")). Select( goqu.MAX("mr.read_at").As("read_at"), goqu.I("mr.park_id"), goqu.I("mr.meter_id"), goqu.I("mr.meter_type"), goqu.I("mr.ratio"), goqu.I("mr.overall"), goqu.I("mr.critical"), goqu.I("mr.peak"), goqu.I("mr.flat"), goqu.I("mr.valley"), ). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))), ). Where( goqu.I("r.id").Eq(rid), goqu.I("mr.meter_type").Eq(meterType), goqu.I(" mr.read_at::date <= lower(r.period)"), ). GroupBy( goqu.I("mr.park_id"), goqu.I("mr.meter_id"), goqu.I("mr.meter_type"), goqu.I("mr.ratio"), goqu.I("mr.overall"), goqu.I("mr.critical"), goqu.I("mr.peak"), goqu.I("mr.flat"), goqu.I("mr.valley"), goqu.I("r.period"), ).ToSQL() var readings []model.MeterReading err := pgxscan.Select(ctx, global.DB, readings, readingsSql, readingsArgs...) if err != nil { cr.log.Error("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数出错", zap.Error(err)) return nil, err } return readings, nil } // 取得指定报表所涉及的所有商户信息 func (cr _CalculateRepository) GetAllTenements(rid string) ([]model.Tenement, error) { cr.log.Info("取得指定报表所涉及的所有商户信息", zap.String("rid", rid)) ctx, cancel := global.TimeoutContext() defer cancel() tenementQuerySql, tenementQueryArgs, _ := cr.ds. From(goqu.T("tenement").As("t")). LeftJoin( goqu.T("park_building").As("b"), goqu.On(goqu.I("b.id").Eq(goqu.I("t.building"))), ). Join( goqu.T("report").As("r"), goqu.On(goqu.I("r.park_id").Eq(goqu.I("t.park_id"))), ). Select( goqu.I("t.*"), goqu.I("b.name").As("building_name"), ). Where( goqu.I("r.id").Eq(rid), goqu.I("t.moved_in_at <= upper(r.period)"), ).ToSQL() var tenements []model.Tenement err := pgxscan.Select(ctx, global.DB, tenements, tenementQuerySql, tenementQueryArgs...) if err != nil { cr.log.Error("取得指定报表所涉及的所有商户信息出错", zap.Error(err)) return nil, err } return tenements, nil }