package service import ( "context" "database/sql" "electricity_bill_calc/cache" "electricity_bill_calc/config" "electricity_bill_calc/excel" "electricity_bill_calc/exceptions" "electricity_bill_calc/global" "electricity_bill_calc/logger" "electricity_bill_calc/model" "fmt" "io" "strconv" "time" mapset "github.com/deckarep/golang-set/v2" "github.com/samber/lo" "github.com/shopspring/decimal" "github.com/uptrace/bun" "go.uber.org/zap" ) type _EndUserService struct { l *zap.Logger } type MeterAppears struct { Meter string Appears int64 } var EndUserService = _EndUserService{ l: logger.Named("Service", "EndUser"), } func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) { var ( conditions = make([]string, 0) endUsers = make([]model.EndUserDetail, 0) cond = global.DB.NewSelect().Model(&endUsers) ) conditions = append(conditions, reportId, strconv.Itoa(page)) cond = cond.Where("report_id = ?", reportId) if len(keyword) > 0 { keywordCond := "%" + keyword + "%" cond = cond.WhereGroup(" and ", func(q *bun.SelectQuery) *bun.SelectQuery { return q.Where("customer_name like ?", keywordCond). WhereOr("contact_name like ?", keywordCond). WhereOr("contact_phone like ?", keywordCond). WhereOr("meter_04kv_id like ?", keywordCond) }) conditions = append(conditions, keyword) } if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil { if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil { return *cachedEndUsers, cachedTotal, nil } } ctx, cancel := global.TimeoutContext() defer cancel() startItem := (page - 1) * config.ServiceSettings.ItemsPageSize total, err := cond.Limit(config.ServiceSettings.ItemsPageSize). Offset(startItem). Order("seq asc", "meter_04kv_id asc"). ScanAndCount(ctx) relations := []string{"end_user", "report", "park"} for _, eu := range endUsers { relations = append(relations, fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId)) } cache.CacheCount(relations, "end_user_detail", int64(total), conditions...) cache.CacheSearch(endUsers, relations, "end_user_detail", conditions...) return endUsers, int64(total), err } func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) { if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil { return *cachedEndUsers, nil } ctx, cancel := global.TimeoutContext() defer cancel() users := make([]model.EndUserDetail, 0) err := global.DB.NewSelect().Model(&users). Where("report_id = ?", reportId). Order("seq asc", "meter_04kv_id asc"). Scan(ctx) relations := lo.Map(users, func(eu model.EndUserDetail, _ int) string { return fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId) }) relations = append(relations, "report", "park") cache.CacheSearch(users, relations, "end_user_detail", "report", reportId) return users, err } func (_EndUserService) FetchSpecificEndUserRecord(reportId, parkId, meterId string) (*model.EndUserDetail, error) { if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil { return cachedEndUser, nil } ctx, cancel := global.TimeoutContext() defer cancel() record := new(model.EndUserDetail) err := global.DB.NewSelect().Model(record). Where("report_id = ?", reportId). Where("park_id = ?", parkId). Where("meter_04kv_id = ?", meterId). Scan(ctx) cache.CacheEntity(record, []string{fmt.Sprintf("end_user:%s:%s", reportId, meterId), "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)) return record, err } func (_EndUserService) UpdateEndUserRegisterRecord(tx *bun.Tx, ctx *context.Context, record model.EndUserDetail) (err error) { record.CalculatePeriod() updateColumns := []string{ "current_period_overall", "adjust_overall", "current_period_critical", "current_period_peak", "current_period_flat", "current_period_valley", "adjust_critical", "adjust_peak", "adjust_flat", "adjust_valley", "overall", "critical", "peak", "flat", "valley", } if record.Initialize { updateColumns = append(updateColumns, "last_period_overall", "last_period_critical", "last_period_peak", "last_period_flat", "last_period_valley", ) } _, err = tx.NewUpdate().Model(&record). WherePK(). Column(updateColumns...). Exec(*ctx) cache.AbolishRelation(fmt.Sprintf("end_user:%s:%s", record.ReportId, record.MeterId)) return } func (_EndUserService) newVirtualExcelAnalysisError(err error) *excel.ExcelAnalysisError { return &excel.ExcelAnalysisError{Col: -1, Row: -1, Err: excel.AnalysisError{Err: err}} } func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError { ctx, cancel := global.TimeoutContext(120) defer cancel() errs := exceptions.NewBatchError() users, err := es.AllEndUserRecord(reportId) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } var reportDetail = new(model.Report) err = global.DB.NewSelect().Model(reportDetail). Where("id = ?", reportId). Scan(ctx) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err))) return errs } meterAppers := make([]MeterAppears, 0) err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)). ColumnExpr("meter_04kv_id as meter"). ColumnExpr("count(*) as appears"). Where("park_id = ?", reportDetail.ParkId). Group("meter_04kv_id"). Scan(ctx, &meterAppers) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } indexedUsers := lo.Reduce( users, func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail { acc[elem.MeterId] = elem return acc }, make(map[string]model.EndUserDetail, 0), ) analyzer, err := excel.NewEndUserNonPVExcelAnalyzer(file) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport)) if len(excelErrs) > 0 { for _, e := range excelErrs { errs.AddError(e) } return errs } tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{}) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } for _, im := range imports { if elem, ok := indexedUsers[im.MeterId]; ok { if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool { return m.Meter == elem.MeterId }); has { if appears.Appears <= 1 { elem.LastPeriodOverall = im.LastPeriodOverall elem.LastPeriodCritical = decimal.Zero elem.LastPeriodPeak = decimal.Zero elem.LastPeriodValley = decimal.Zero elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley) elem.Initialize = true } } elem.CurrentPeriodOverall = im.CurrentPeriodOverall elem.AdjustOverall = im.AdjustOverall elem.CurrentPeriodCritical = decimal.Zero elem.CurrentPeriodPeak = decimal.Zero elem.CurrentPeriodValley = decimal.Zero elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley) elem.AdjustCritical = decimal.Zero elem.AdjustPeak = decimal.Zero elem.AdjustValley = decimal.Zero elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley) err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) } } else { errs.AddError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId))) } } if errs.Len() > 0 { tx.Rollback() return errs } err = tx.Commit() if err != nil { tx.Rollback() errs.AddError(es.newVirtualExcelAnalysisError(err)) } cache.AbolishRelation("end_user_detail") return errs } func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError { ctx, cancel := global.TimeoutContext(120) defer cancel() errs := exceptions.NewBatchError() users, err := es.AllEndUserRecord(reportId) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } var reportDetail = new(model.Report) err = global.DB.NewSelect().Model(reportDetail).Where("id = ?", reportId).Scan(ctx) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err))) return errs } meterAppers := make([]MeterAppears, 0) err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)). ColumnExpr("meter_04kv_id as meter"). ColumnExpr("count(*) as appears"). Where("park_id = ?", reportDetail.Id). Group("meter_04kv_id"). Scan(ctx, &meterAppers) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } indexedUsers := lo.Reduce( users, func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail { acc[elem.MeterId] = elem return acc }, make(map[string]model.EndUserDetail, 0), ) analyzer, err := excel.NewEndUserPVExcelAnalyzer(file) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport)) if len(excelErrs) > 0 { for _, e := range excelErrs { errs.AddError(e) } return errs } tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{}) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) return errs } for _, im := range imports { if elem, ok := indexedUsers[im.MeterId]; ok { if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool { return m.Meter == elem.MeterId }); has { if appears.Appears <= 1 { elem.LastPeriodOverall = im.LastPeriodOverall elem.LastPeriodCritical = im.LastPeriodCritical.Decimal elem.LastPeriodPeak = im.LastPeriodPeak.Decimal elem.LastPeriodValley = im.LastPeriodValley.Decimal elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley) elem.Initialize = true } } elem.CurrentPeriodOverall = im.CurrentPeriodOverall elem.AdjustOverall = im.AdjustOverall elem.CurrentPeriodCritical = im.CurrentPeriodCritical.Decimal elem.CurrentPeriodPeak = im.CurrentPeriodPeak.Decimal elem.CurrentPeriodValley = im.CurrentPeriodValley.Decimal elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley) elem.AdjustCritical = im.AdjustCritical.Decimal elem.AdjustPeak = im.AdjustPeak.Decimal elem.AdjustValley = im.AdjustValley.Decimal elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley) err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem) if err != nil { errs.AddError(es.newVirtualExcelAnalysisError(err)) } } else { errs.AddError(es.newVirtualExcelAnalysisError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId)))) } } if errs.Len() > 0 { tx.Rollback() return errs } err = tx.Commit() if err != nil { tx.Rollback() errs.AddError(es.newVirtualExcelAnalysisError(err)) } cache.AbolishRelation("end_user_detail") return errs } func (es _EndUserService) StatEndUserRecordInPeriod(requestUser, requestPark, startDate, endDate string) ([]model.EndUserPeriodStat, error) { var ( conditions = make([]string, 0) relations = []string{ fmt.Sprintf("park:%s", requestPark), "end_user_detail", } cond = global.DB.NewSelect(). Model((*model.EndUserDetail)(nil)). Relation("Report", func(sq *bun.SelectQuery) *bun.SelectQuery { return sq.ExcludeColumn("*") }). Relation("Park", func(sq *bun.SelectQuery) *bun.SelectQuery { return sq.ExcludeColumn("*") }) ) if len(requestUser) > 0 { cond = cond.Where("park.user_id = ?", requestUser) conditions = append(conditions, requestUser) } else { conditions = append(conditions, "_") } if len(requestPark) > 0 { cond = cond.Where("eud.park_id = ?", requestPark) conditions = append(conditions, requestPark) } else { conditions = append(conditions, "_") } if len(startDate) > 0 { parseTime, err := time.Parse("2006-01", startDate) if err != nil { return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[startDate],%w", err) } start := model.NewDate(parseTime) cond = cond.Where("report.period >= ?::date", start.ToString()) conditions = append(conditions, startDate) } else { conditions = append(conditions, "_") } if len(endDate) > 0 { parseTime, err := time.Parse("2006-01", endDate) if err != nil { return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[endDate],%w", err) } end := model.NewDate(parseTime) cond = cond.Where("report.period <= ?::date", end.ToString()) conditions = append(conditions, endDate) } if cached, err := cache.RetreiveSearch[[]model.EndUserPeriodStat]("end_user_stat", conditions...); cached != nil && err == nil { return *cached, nil } ctx, cancel := global.TimeoutContext(120) defer cancel() var endUserSums []model.EndUserPeriodStat err := cond.Column("eud.meter_04kv_id"). ColumnExpr("sum(?) as overall", bun.Ident("eud.overall")). ColumnExpr("sum(?) as overall_fee", bun.Ident("eud.overall_fee")). ColumnExpr("sum(?) as critical", bun.Ident("eud.critical")). ColumnExpr("sum(?) as critical_fee", bun.Ident("eud.critical_fee")). ColumnExpr("sum(?) as peak", bun.Ident("eud.peak")). ColumnExpr("sum(?) as peak_fee", bun.Ident("eud.peak_fee")). ColumnExpr("sum(?) as valley", bun.Ident("eud.valley")). ColumnExpr("sum(?) as valley_fee", bun.Ident("eud.valley_fee")). ColumnExpr("sum(?) as final_diluted", bun.Ident("eud.final_diluted")). Where("report.published = ?", true). Group("eud.meter_04kv_id"). Scan(ctx, &endUserSums) if err != nil { return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能完成终端用户在指定期限内的统计,%w", err) } meterIds := lo.Reduce( endUserSums, func(acc mapset.Set[string], elem model.EndUserPeriodStat, _ int) mapset.Set[string] { acc.Add(elem.MeterId) return acc }, mapset.NewSet[string](), ) meterArchives := make([]model.Meter04KV, 0) if len(meterIds.ToSlice()) > 0 { err = global.DB.NewSelect().Model(&meterArchives).Relation("ParkDetail"). Where("code in (?)", bun.In(meterIds.ToSlice())). Scan(ctx) if err != nil { return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能获取到终端表计的最新基础档案,%w", err) } } filledStats := lo.Map( endUserSums, func(elem model.EndUserPeriodStat, _ int) model.EndUserPeriodStat { archive, has := lo.Find(meterArchives, func(meter model.Meter04KV) bool { return meter.Code == elem.MeterId }) if has { if archive.Address != nil { elem.Address = *archive.Address } else { elem.Address = "" } if archive.CustomerName != nil { elem.CustomerName = *archive.CustomerName } else { elem.CustomerName = "" } elem.IsPublicMeter = archive.IsPublicMeter elem.Kind = archive.ParkDetail.SubmeterType } if elem.OverallFee.Valid && elem.AdjustFee.Valid && !elem.OverallFee.Decimal.IsZero() { elem.AdjustProportion = decimal.NewNullDecimal( elem.AdjustFee.Decimal.Div(elem.OverallFee.Decimal).RoundBank(8), ) } else { elem.AdjustProportion = decimal.NullDecimal{} } return elem }, ) cache.CacheSearch(filledStats, relations, "end_user_stat", conditions...) return filledStats, nil }