electricity_bill_calc_service/repository/calculate.go

577 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"context"
"electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model"
"electricity_bill_calc/model/calculate"
"electricity_bill_calc/types"
"encoding/json"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
"time"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/georgysavva/scany/v2/pgxscan"
"go.uber.org/zap"
)
type _CalculateRepository struct {
log *zap.Logger
ds goqu.DialectWrapper
}
var CalculateRepository = _CalculateRepository{
log: logger.Named("Repository", "Calculate"),
ds: goqu.Dialect("postgres"),
}
// 更新当前报表的核算状态
func (cr _CalculateRepository) UpdateReportCalculateStatus(rid string, status string,
message string) (bool, error) {
ctx, cancel := global.TimeoutContext()
defer cancel()
var atio int
var err error
currentTime := time.Now()
if status == "success" {
atio = 1 //创建报表成功
} else {
atio = 2 // 数据不足
}
updateResultSql, updateResultArgs, _ := cr.ds.
Update(goqu.T("report_task")).
Set(goqu.Record{
"status": int16(atio),
"last_modified_at": currentTime,
"message": message,
}).Where(goqu.I("id").Eq(rid)).
ToSQL()
res, err := global.DB.Exec(ctx, updateResultSql, updateResultArgs...)
if err != nil {
cr.log.Error("未能更新当前报表的核算状态", zap.Error(err))
return false, err
}
if res.RowsAffected() == 0 {
cr.log.Warn("未能保存当前报表的核算状态", zap.String("Report", rid))
return false, nil
}
return true, nil
}
// 获取当前正在等待计算的核算任务ID列表
func (cr _CalculateRepository) ListPendingTasks() ([]string, error) {
cr.log.Info("获取当前正在等待计算的核算任务ID列表")
ctx, cancel := global.TimeoutContext()
defer cancel()
var ids []string
querySql, queryArgs, _ := cr.ds.
From("report_task").
Select("id").
Where(goqu.C("status").Eq(model.REPORT_CALCULATE_TASK_STATUS_PENDING)).
Prepared(true).ToSQL()
if err := pgxscan.Select(ctx, global.DB, &ids, querySql, queryArgs...); err != nil {
cr.log.Error("未能获取到当前正在等待计算的核算任务ID列表", zap.Error(err))
return nil, err
}
return ids, nil
}
// 更新指定报表的核算状态
func (cr _CalculateRepository) UpdateReportTaskStatus(rid string, status int16, message *string) (bool, error) {
cr.log.Info("更新指定报表的核算状态", zap.String("Report", rid), zap.Int16("Status", status))
ctx, cancel := global.TimeoutContext()
defer cancel()
currentTime := types.Now()
updateSql, updateArgs, _ := cr.ds.
Update("report_task").
Set(goqu.Record{
"status": status,
"last_modified_at": currentTime,
"message": message,
}).
Where(goqu.C("id").Eq(rid)).
Prepared(true).ToSQL()
res, err := global.DB.Exec(ctx, updateSql, updateArgs...)
if err != nil {
cr.log.Error("未能更新指定报表的核算状态", zap.Error(err))
return false, err
}
if res.RowsAffected() == 0 {
cr.log.Warn("未能保存指定报表的核算状态", zap.String("Report", rid))
return false, nil
}
return res.RowsAffected() > 0, nil
}
// 获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的
func (cr _CalculateRepository) GetAllPoolingMeterRelations(pid string, revokedAfter time.Time) ([]model.MeterRelation, error) {
cr.log.Info("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("revokedAfter", revokedAfter))
ctx, cancel := global.TimeoutContext()
defer cancel()
relationsSql, relationsArgs, _ := cr.ds.
From(goqu.T("meter_relations")).
Where(goqu.I("park_id").Eq(pid)).
Where(goqu.Or(
goqu.I("revoked_at").IsNull(),
goqu.I("revoked_at").Gte(revokedAfter),
)).ToSQL()
var meterRelation []model.MeterRelation
err := pgxscan.Select(ctx, global.DB, &meterRelation, relationsSql, relationsArgs...)
if err != nil {
cr.log.Error("获取当前园区中所有公摊表计与商户表计之间的关联关系,包括已经解除的出错", zap.Error(err))
return nil, err
}
return meterRelation, nil
}
// 获取当前园区中所有的商户与表计的关联关系,包括已经解除的
func (cr _CalculateRepository) GetAllTenementMeterRelations(pid string, associatedBefore time.Time, disassociatedAfter time.Time) ([]model.TenementMeter, error) {
cr.log.Info("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.String("pid", pid), zap.Time("associatedBefore", associatedBefore), zap.Time("disassociatedAfter", disassociatedAfter))
ctx, cancel := global.TimeoutContext()
defer cancel()
relationsQuerySql, relationsQueryArgs, _ := cr.ds.
From(goqu.T("tenement_meter")).
Where(goqu.I("park_id").Eq(pid)).
Where(goqu.And(
goqu.I("associated_at").IsNotNull(),
goqu.I("associated_at").Lte(associatedBefore),
)).
Where(goqu.And(
goqu.Or(
goqu.I("disassociated_at").IsNull(),
goqu.I("disassociated_at").Gte(disassociatedAfter),
),
)).ToSQL()
var tenementMeter []model.TenementMeter
err := pgxscan.Select(ctx, global.DB, &tenementMeter, relationsQuerySql, relationsQueryArgs...)
if err != nil {
cr.log.Error("获取当前园区中所有的商户与表计的关联关系,包括已经解除的", zap.Error(err))
return nil, err
}
fmt.Println("==", tenementMeter)
return tenementMeter, nil
}
// 获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据
func (cr _CalculateRepository) GetMeterReadings(rid string, meterType int16) ([]model.MeterReading, error) {
cr.log.Info("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据", zap.String("rid", rid), zap.Int16("meterType", meterType))
ctx, cancel := global.TimeoutContext()
defer cancel()
readingsQuerySql, readingsQueryArgs, _ := cr.ds.
From(goqu.T("meter_reading").As(goqu.I("mr"))).
Join(
goqu.T("report").As("r"),
goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))),
).
Where(
goqu.I("r.id").Eq(rid),
goqu.I("mr.meter_type").Eq(meterType),
// TODO2023.08.02 此方法出错优先查看是否这里出问题
goqu.L("?::date <@ ?", goqu.I("mr.read_at"), goqu.I("r.period")),
).
Order(goqu.I("mr.read_at").Asc()).Select(goqu.I("mr.*")).ToSQL()
var readings []model.MeterReading
err := pgxscan.Select(ctx, global.DB, &readings, readingsQuerySql, readingsQueryArgs...)
if err != nil {
cr.log.Error("获取指定报表中所有涉及到的指定类型表计在核算时间段内的所有读数数据出错", zap.Error(err))
return nil, err
}
return readings, nil
}
// 获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数
func (cr _CalculateRepository) GetLastPeriodReadings(rid string, meterType int16) ([]model.MeterReading, error) {
cr.log.Info("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数", zap.String("rid", rid), zap.Int16("meterType", meterType))
ctx, cancel := global.TimeoutContext()
defer cancel()
readingsSql, readingsArgs, _ := cr.ds.
From(goqu.T("meter_reading").As("mr")).
Select(
goqu.MAX("mr.read_at").As("read_at"),
goqu.I("mr.park_id"),
goqu.I("mr.meter_id"),
goqu.I("mr.meter_type"),
goqu.I("mr.ratio"),
goqu.I("mr.overall"),
goqu.I("mr.critical"),
goqu.I("mr.peak"),
goqu.I("mr.flat"),
goqu.I("mr.valley"),
).
Join(
goqu.T("report").As("r"),
goqu.On(goqu.I("r.park_id").Eq(goqu.I("mr.park_id"))),
).
Where(
goqu.I("r.id").Eq(rid),
goqu.I("mr.meter_type").Eq(meterType),
goqu.L(" read_at <= lower(r.period)"),
).
GroupBy(
goqu.I("mr.park_id"),
goqu.I("mr.meter_id"),
goqu.I("mr.meter_type"),
goqu.I("mr.ratio"),
goqu.I("mr.overall"),
goqu.I("mr.critical"),
goqu.I("mr.peak"),
goqu.I("mr.flat"),
goqu.I("mr.valley"),
goqu.I("r.period"),
).ToSQL()
var readings []model.MeterReading
err := pgxscan.Select(ctx, global.DB, &readings, readingsSql, readingsArgs...)
if err != nil {
cr.log.Error("获取指定报表中所有涉及到的表计在核算起始日期前的最后一次读数出错", zap.Error(err))
return nil, err
}
return readings, nil
}
// 取得指定报表所涉及的所有商户信息
func (cr _CalculateRepository) GetAllTenements(rid string) ([]model.Tenement, error) {
cr.log.Info("取得指定报表所涉及的所有商户信息", zap.String("rid", rid))
ctx, cancel := global.TimeoutContext()
defer cancel()
tenementQuerySql, tenementQueryArgs, _ := cr.ds.
From(goqu.T("tenement").As("t")).
LeftJoin(
goqu.T("park_building").As("b"),
goqu.On(goqu.I("b.id").Eq(goqu.I("t.building"))),
).
Join(
goqu.T("report").As("r"),
goqu.On(goqu.I("r.park_id").Eq(goqu.I("t.park_id"))),
).
Select(
goqu.I("t.*"),
goqu.I("b.name").As("building_name"),
).
Where(
goqu.I("r.id").Eq(rid),
goqu.L("t.moved_in_at <= upper(r.period)"),
).ToSQL()
fmt.Println(tenementQuerySql)
var tenements []model.Tenement
err := pgxscan.Select(ctx, global.DB, &tenements, tenementQuerySql, tenementQueryArgs...)
if err != nil {
cr.log.Error("取得指定报表所涉及的所有商户信息出错", zap.Error(err))
return nil, err
}
return tenements, nil
}
func (cr _CalculateRepository) ClearReportContent(tx pgx.Tx, rid string) error {
ctx, cancel := global.TimeoutContext()
defer cancel()
querysql, querarg, _ := cr.ds.Delete("report_summary").
Where(goqu.C("report_id").Eq(rid)).ToSQL()
_, err := tx.Exec(ctx, querysql, querarg...)
if err != nil {
return err
}
querysql, querarg, _ = cr.ds.Delete("report_public_consumption").
Where(goqu.C("report_id").Eq(rid)).ToSQL()
_, err = tx.Exec(ctx, querysql, querarg...)
if err != nil {
return err
}
querysql, querarg, _ = cr.ds.Delete("report_pooled_consumption").
Where(goqu.C("report_id").Eq(rid)).ToSQL()
_, err = tx.Exec(ctx, querysql, querarg...)
if err != nil {
return err
}
querysql, querarg, _ = cr.ds.Delete("report_tenement").
Where(goqu.C("report_id").Eq(rid)).ToSQL()
_, err = tx.Exec(ctx, querysql, querarg...)
if err != nil {
return err
}
return nil
}
func (cr _CalculateRepository) SaveReportPublics(tx pgx.Tx, ctx context.Context, rid string, meters []calculate.Meter) error {
if len(meters) == 0 {
// 如果没有公共表计则直接返回
return nil
}
for _, meter := range meters {
// 准备插入表达式
insertExpr := cr.ds.Insert("report_public_consumption").
Cols(
"report_id", "park_meter_id", "overall", "critical", "peak", "flat", "valley",
"loss_adjust", "consumption_total", "loss_adjust_total", "final_total",
)
// 添加值到插入表达式中
overall, _ := json.Marshal(meter.Overall)
criyical, _ := json.Marshal(meter.Critical)
peak, _ := json.Marshal(meter.Peak)
flat, _ := json.Marshal(meter.Flat)
valley, _ := json.Marshal(meter.Valley)
adjustLoss, _ := json.Marshal(meter.AdjustLoss)
insertExpr = insertExpr.Vals(goqu.Vals{
rid,
meter.Code,
overall,
criyical,
peak,
flat,
valley,
adjustLoss,
meter.Overall.Fee,
meter.AdjustLoss.Fee,
meter.Overall.Fee.Add(meter.AdjustLoss.Fee),
})
// 执行插入语句
inserSql, insertArgs, _ := insertExpr.ToSQL()
_, err := tx.Exec(ctx, inserSql, insertArgs...)
if err != nil {
_ = tx.Rollback(ctx)
return fmt.Errorf("保存报表核算概要失败: %w", err)
}
}
return nil
}
func (cr _CalculateRepository) SaveReportSummary(tx pgx.Tx, ctx context.Context, summary calculate.Summary) error {
// 构建插入表达式
Overall, _ := json.Marshal(summary.Overall)
Critical, _ := json.Marshal(summary.Critical)
Peak, _ := json.Marshal(summary.Peak)
Flat, _ := json.Marshal(summary.Flat)
Valley, _ := json.Marshal(summary.Valley)
AuthoizeLoss, _ := json.Marshal(summary.AuthoizeLoss)
insertsql, insertArgs, err := cr.ds.Insert(goqu.T("report_summary")).
Cols(
"report_id", "overall", "critical", "peak", "flat", "valley",
"loss", "loss_fee", "basic_fee", "basic_pooled_price_consumption", "basic_pooled_price_area",
"adjust_fee", "adjust_pooled_price_consumption", "adjust_pooled_price_area",
"loss_diluted_price", "loss_proportion", "final_diluted_overall",
"consumption_fee", "authorize_loss", "overall_area", "total_consumption",
).
Vals(goqu.Vals{
summary.ReportId, Overall, Critical, Peak, Flat,
Valley, summary.Loss, summary.LossFee, summary.BasicFee,
summary.BasicPooledPriceConsumption, summary.BasicPooledPriceArea,
summary.AdjustFee, summary.AdjustPooledPriceConsumption, summary.AdjustPooledPriceArea,
summary.LossDilutedPrice, summary.LossProportion, summary.FinalDilutedOverall,
summary.ConsumptionFee, AuthoizeLoss, summary.OverallArea, summary.TotalConsumption,
}).ToSQL()
if err != nil {
fmt.Println(err)
return err
}
// 执行插入语句
if _, err := tx.Exec(ctx, insertsql, insertArgs...); err != nil {
cr.log.Error("保存报表核算概要失败。")
return err
}
return nil
}
type NestedMeter struct {
Overall model.ConsumptionUnit
Critical model.ConsumptionUnit
Peak model.ConsumptionUnit
Flat model.ConsumptionUnit
Valley model.ConsumptionUnit
CoveredArea decimal.Decimal
// Add other fields here as needed
}
func (cr _CalculateRepository) SaveReportPoolings(tx pgx.Tx,
rid string,
meters []calculate.Meter,
relations []model.MeterRelation,
tenements []calculate.Meter) error {
ctx, cancel := global.TimeoutContext()
defer cancel()
if len(meters) == 0 {
return nil
}
relationsSlaves := make(map[string]bool)
for _, r := range relations {
relationsSlaves[r.SlaveMeter] = true
}
tenementCodes := make(map[string]bool)
for _, t := range tenements {
tenementCodes[t.Code] = true
}
for _, r := range relations {
if _, ok := tenementCodes[r.SlaveMeter]; !ok {
return errors.New("unknown tenement meter in active meter relations")
}
}
var insertQueries []goqu.InsertDataset
for _, meter := range meters {
submeters := make([]NestedMeter, 0)
for _, r := range relations {
if r.MasterMeter == meter.Code {
for _, t := range tenements {
if t.Code == r.SlaveMeter {
submeters = append(submeters, NestedMeter{
Overall: t.Overall,
Critical: t.Critical,
Peak: t.Peak,
Flat: t.Flat,
Valley: t.Valley,
})
}
}
}
}
submetersJSON, err := json.Marshal(submeters)
if err != nil {
return err
}
overall, _ := json.Marshal(meter.Overall)
criyical, _ := json.Marshal(meter.Critical)
peak, _ := json.Marshal(meter.Peak)
flat, _ := json.Marshal(meter.Flat)
valley, _ := json.Marshal(meter.Valley)
insertQuery := goqu.Insert("report_pooled_consumption").
Cols("report_id", "pooled_meter_id", "overall", "critical", "peak", "flat", "valley", "pooled_area", "diluted").
Vals(goqu.Vals{rid, meter.Code, overall, criyical, peak, flat, valley, meter.CoveredArea, submetersJSON})
insertQueries = append(insertQueries, *insertQuery)
}
eg, _ := errgroup.WithContext(ctx)
for _, insertQuery := range insertQueries {
insertQuery := insertQuery // Capture loop variable
eg.Go(func() error {
sql, args, err := insertQuery.ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
return err
})
}
return eg.Wait()
}
func (cr _CalculateRepository) SaveReportTenement(tx pgx.Tx, report model.ReportIndex, tenements []model.Tenement, tenementCharges []calculate.TenementCharge) error {
if len(tenements) == 0 {
// 如果没有商户则直接返回
return nil
}
cr.log.Info("保存商户报表。")
ctx, cancel := global.TimeoutContext()
defer cancel()
insertQuery := cr.ds.
Insert("report_tenement")
var rows []goqu.Record
for _, tenement := range tenements {
tenementCharge := findTenementCharge(tenementCharges, tenement.Id)
tenementDetail, _ := json.Marshal(tenement)
overallJSON, _ := json.Marshal(tenementCharge.Overall)
criticalJSON, _ := json.Marshal(tenementCharge.Critical)
peakJSON, _ := json.Marshal(tenementCharge.Peak)
flatJSON, _ := json.Marshal(tenementCharge.Flat)
valleyJSON, _ := json.Marshal(tenementCharge.Valley)
lossJSON, _ := json.Marshal(tenementCharge.Loss)
submetersJSON, _ := json.Marshal(convertToNestedMeters(tenementCharge.Submeters))
poolingsJSON, _ := json.Marshal(convertToNestedMeters(tenementCharge.Poolings))
row := goqu.Record{
"report_id": report.Id,
"tenement_id": tenement.Id,
"tenement_detail": tenementDetail,
"calc_period": report.Period,
"overall": overallJSON,
"critical": criticalJSON,
"peak": peakJSON,
"flat": flatJSON,
"valley": valleyJSON,
"loss": lossJSON,
"basic_fee_pooled": tenementCharge.BasicFee,
"adjust_fee_pooled": tenementCharge.AdjustFee,
"loss_fee_pooled": tenementCharge.LossPooled,
"final_pooled": tenementCharge.PublicPooled,
"final_charge": tenementCharge.FinalCharges,
"meters": submetersJSON,
"pooled": poolingsJSON,
}
rows = append(rows, row)
}
sql, params, err := insertQuery.Rows(rows).Prepared(true).ToSQL()
if err != nil {
fmt.Println(err)
}
_, err = tx.Exec(ctx, sql, params...)
if err != nil {
fmt.Println(err.Error())
return err
}
return nil
}
// findTenementCharge 在 TenementCharges 切片中查找指定商户的核算内容
func findTenementCharge(charges []calculate.TenementCharge, tenementID string) calculate.TenementCharge {
for _, charge := range charges {
if charge.Tenement == tenementID {
return charge
}
}
return calculate.TenementCharge{}
}
// convertToNestedMeters 将 Meter 切片转换为 NestedMeter 切片
func convertToNestedMeters(meters []*calculate.Meter) []NestedMeter {
nestedMeters := []NestedMeter{}
for _, meter := range meters {
nestedMeters = append(nestedMeters, NestedMeter{
Overall: meter.Overall,
Critical: meter.Critical,
Peak: meter.Peak,
Flat: meter.Flat,
Valley: meter.Valley,
CoveredArea: meter.CoveredArea,
})
}
return nestedMeters
}