electricity_bill_calc_service/service/end_user.go

471 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"database/sql"
"electricity_bill_calc/cache"
"electricity_bill_calc/config"
"electricity_bill_calc/excel"
"electricity_bill_calc/exceptions"
"electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model"
"fmt"
"io"
"strconv"
"time"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type _EndUserService struct {
l *zap.Logger
}
type MeterAppears struct {
Meter string
Appears int64
}
var EndUserService = _EndUserService{
l: logger.Named("Service", "EndUser"),
}
func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) {
var (
conditions = make([]string, 0)
endUsers = make([]model.EndUserDetail, 0)
cond = global.DB.NewSelect().Model(&endUsers)
)
conditions = append(conditions, reportId, strconv.Itoa(page))
cond = cond.Where("report_id = ?", reportId)
if len(keyword) > 0 {
keywordCond := "%" + keyword + "%"
cond = cond.WhereGroup(" and ", func(q *bun.SelectQuery) *bun.SelectQuery {
return q.Where("customer_name like ?", keywordCond).
WhereOr("contact_name like ?", keywordCond).
WhereOr("contact_phone like ?", keywordCond).
WhereOr("meter_04kv_id like ?", keywordCond)
})
conditions = append(conditions, keyword)
}
if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil {
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil {
return *cachedEndUsers, cachedTotal, nil
}
}
ctx, cancel := global.TimeoutContext()
defer cancel()
startItem := (page - 1) * config.ServiceSettings.ItemsPageSize
total, err := cond.Limit(config.ServiceSettings.ItemsPageSize).
Offset(startItem).
Order("seq asc", "meter_04kv_id asc").
ScanAndCount(ctx)
relations := []string{"end_user", "report", "park"}
for _, eu := range endUsers {
relations = append(relations, fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId))
}
cache.CacheCount(relations, "end_user_detail", int64(total), conditions...)
cache.CacheSearch(endUsers, relations, "end_user_detail", conditions...)
return endUsers, int64(total), err
}
func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) {
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil {
return *cachedEndUsers, nil
}
ctx, cancel := global.TimeoutContext()
defer cancel()
users := make([]model.EndUserDetail, 0)
err := global.DB.NewSelect().Model(&users).
Where("report_id = ?", reportId).
Order("seq asc", "meter_04kv_id asc").
Scan(ctx)
relations := lo.Map(users, func(eu model.EndUserDetail, _ int) string {
return fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId)
})
relations = append(relations, "report", "park")
cache.CacheSearch(users, relations, "end_user_detail", "report", reportId)
return users, err
}
func (_EndUserService) FetchSpecificEndUserRecord(reportId, parkId, meterId string) (*model.EndUserDetail, error) {
if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil {
return cachedEndUser, nil
}
ctx, cancel := global.TimeoutContext()
defer cancel()
record := new(model.EndUserDetail)
err := global.DB.NewSelect().Model(record).
Where("report_id = ?", reportId).
Where("park_id = ?", parkId).
Where("meter_04kv_id = ?", meterId).
Scan(ctx)
cache.CacheEntity(record, []string{fmt.Sprintf("end_user:%s:%s", reportId, meterId), "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId))
return record, err
}
func (_EndUserService) UpdateEndUserRegisterRecord(tx *bun.Tx, ctx *context.Context, record model.EndUserDetail) (err error) {
record.CalculatePeriod()
updateColumns := []string{
"current_period_overall",
"adjust_overall",
"current_period_critical",
"current_period_peak",
"current_period_flat",
"current_period_valley",
"adjust_critical",
"adjust_peak",
"adjust_flat",
"adjust_valley",
"overall",
"critical",
"peak",
"flat",
"valley",
}
if record.Initialize {
updateColumns = append(updateColumns,
"last_period_overall",
"last_period_critical",
"last_period_peak",
"last_period_flat",
"last_period_valley",
)
}
_, err = tx.NewUpdate().Model(&record).
WherePK().
Column(updateColumns...).
Exec(*ctx)
cache.AbolishRelation(fmt.Sprintf("end_user:%s:%s", record.ReportId, record.MeterId))
return
}
func (_EndUserService) newVirtualExcelAnalysisError(err error) *excel.ExcelAnalysisError {
return &excel.ExcelAnalysisError{Col: -1, Row: -1, Err: excel.AnalysisError{Err: err}}
}
func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext(120)
defer cancel()
errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
var reportDetail = new(model.Report)
err = global.DB.NewSelect().Model(reportDetail).
Where("id = ?", reportId).
Scan(ctx)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
return errs
}
meterAppers := make([]MeterAppears, 0)
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.ParkId).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
indexedUsers := lo.Reduce(
users,
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
acc[elem.MeterId] = elem
return acc
},
make(map[string]model.EndUserDetail, 0),
)
analyzer, err := excel.NewEndUserNonPVExcelAnalyzer(file)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
if len(excelErrs) > 0 {
for _, e := range excelErrs {
errs.AddError(e)
}
return errs
}
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok {
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
return m.Meter == elem.MeterId
}); has {
if appears.Appears <= 1 {
elem.LastPeriodOverall = im.LastPeriodOverall
elem.LastPeriodCritical = decimal.Zero
elem.LastPeriodPeak = decimal.Zero
elem.LastPeriodValley = decimal.Zero
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
elem.Initialize = true
}
}
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
elem.AdjustOverall = im.AdjustOverall
elem.CurrentPeriodCritical = decimal.Zero
elem.CurrentPeriodPeak = decimal.Zero
elem.CurrentPeriodValley = decimal.Zero
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
elem.AdjustCritical = decimal.Zero
elem.AdjustPeak = decimal.Zero
elem.AdjustValley = decimal.Zero
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
} else {
errs.AddError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId)))
}
}
if errs.Len() > 0 {
tx.Rollback()
return errs
}
err = tx.Commit()
if err != nil {
tx.Rollback()
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
cache.AbolishRelation("end_user_detail")
return errs
}
func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext(120)
defer cancel()
errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
var reportDetail = new(model.Report)
err = global.DB.NewSelect().Model(reportDetail).Where("id = ?", reportId).Scan(ctx)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
return errs
}
meterAppers := make([]MeterAppears, 0)
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.Id).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
indexedUsers := lo.Reduce(
users,
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
acc[elem.MeterId] = elem
return acc
},
make(map[string]model.EndUserDetail, 0),
)
analyzer, err := excel.NewEndUserPVExcelAnalyzer(file)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
if len(excelErrs) > 0 {
for _, e := range excelErrs {
errs.AddError(e)
}
return errs
}
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok {
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
return m.Meter == elem.MeterId
}); has {
if appears.Appears <= 1 {
elem.LastPeriodOverall = im.LastPeriodOverall
elem.LastPeriodCritical = im.LastPeriodCritical.Decimal
elem.LastPeriodPeak = im.LastPeriodPeak.Decimal
elem.LastPeriodValley = im.LastPeriodValley.Decimal
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
elem.Initialize = true
}
}
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
elem.AdjustOverall = im.AdjustOverall
elem.CurrentPeriodCritical = im.CurrentPeriodCritical.Decimal
elem.CurrentPeriodPeak = im.CurrentPeriodPeak.Decimal
elem.CurrentPeriodValley = im.CurrentPeriodValley.Decimal
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
elem.AdjustCritical = im.AdjustCritical.Decimal
elem.AdjustPeak = im.AdjustPeak.Decimal
elem.AdjustValley = im.AdjustValley.Decimal
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
} else {
errs.AddError(es.newVirtualExcelAnalysisError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId))))
}
}
if errs.Len() > 0 {
tx.Rollback()
return errs
}
err = tx.Commit()
if err != nil {
tx.Rollback()
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
cache.AbolishRelation("end_user_detail")
return errs
}
func (es _EndUserService) StatEndUserRecordInPeriod(requestUser, requestPark, startDate, endDate string) ([]model.EndUserPeriodStat, error) {
var (
conditions = make([]string, 0)
relations = []string{
fmt.Sprintf("park:%s", requestPark),
"end_user_detail",
}
cond = global.DB.NewSelect().
Model((*model.EndUserDetail)(nil)).
Relation("Report", func(sq *bun.SelectQuery) *bun.SelectQuery {
return sq.ExcludeColumn("*")
}).
Relation("Park", func(sq *bun.SelectQuery) *bun.SelectQuery {
return sq.ExcludeColumn("*")
})
)
if len(requestUser) > 0 {
cond = cond.Where("Park.user_id = ?", requestUser)
conditions = append(conditions, requestUser)
} else {
conditions = append(conditions, "_")
}
if len(requestPark) > 0 {
cond = cond.Where("eud.park_id = ?", requestPark)
conditions = append(conditions, requestPark)
} else {
conditions = append(conditions, "_")
}
if len(startDate) > 0 {
parseTime, err := time.Parse("2006-01", startDate)
if err != nil {
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[startDate]%w", err)
}
start := model.NewDate(parseTime)
cond = cond.Where("report.period >= ?::date", start.ToString())
conditions = append(conditions, startDate)
} else {
conditions = append(conditions, "_")
}
if len(endDate) > 0 {
parseTime, err := time.Parse("2006-01", endDate)
if err != nil {
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[endDate]%w", err)
}
end := model.NewDate(parseTime)
cond = cond.Where("report.period <= ?::date", end.ToString())
conditions = append(conditions, endDate)
}
if cached, err := cache.RetreiveSearch[[]model.EndUserPeriodStat]("end_user_stat", conditions...); cached != nil && err == nil {
return *cached, nil
}
ctx, cancel := global.TimeoutContext(120)
defer cancel()
var endUserSums []model.EndUserPeriodStat
err := cond.Column("eud.meter_04kv_id").
ColumnExpr("sum(?) as overall", bun.Ident("eud.overall")).
ColumnExpr("sum(?) as overall_fee", bun.Ident("eud.overall_fee")).
ColumnExpr("sum(?) as critical", bun.Ident("eud.critical")).
ColumnExpr("sum(?) as critical_fee", bun.Ident("eud.critical_fee")).
ColumnExpr("sum(?) as peak", bun.Ident("eud.peak")).
ColumnExpr("sum(?) as peak_fee", bun.Ident("eud.peak_fee")).
ColumnExpr("sum(?) as valley", bun.Ident("eud.valley")).
ColumnExpr("sum(?) as valley_fee", bun.Ident("eud.valley_fee")).
ColumnExpr("sum(?) as final_diluted", bun.Ident("eud.final_diluted")).
Where("report.published = ?", true).
Group("eud.meter_04kv_id").
Scan(ctx, &endUserSums)
if err != nil {
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能完成终端用户在指定期限内的统计,%w", err)
}
meterIds := lo.Reduce(
endUserSums,
func(acc mapset.Set[string], elem model.EndUserPeriodStat, _ int) mapset.Set[string] {
acc.Add(elem.MeterId)
return acc
},
mapset.NewSet[string](),
)
meterArchives := make([]model.Meter04KV, 0)
if len(meterIds.ToSlice()) > 0 {
err = global.DB.NewSelect().Model(&meterArchives).Relation("ParkDetail").
Where("code in (?)", bun.In(meterIds.ToSlice())).
Scan(ctx)
if err != nil {
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能获取到终端表计的最新基础档案,%w", err)
}
}
filledStats := lo.Map(
endUserSums,
func(elem model.EndUserPeriodStat, _ int) model.EndUserPeriodStat {
archive, has := lo.Find(meterArchives, func(meter model.Meter04KV) bool {
return meter.Code == elem.MeterId
})
if has {
elem.Address = archive.Address
elem.CustomerName = archive.CustomerName
elem.IsPublicMeter = archive.IsPublicMeter
elem.Kind = archive.ParkDetail.SubmeterType
}
if elem.OverallFee.Valid && !elem.OverallFee.Decimal.IsZero() {
elem.AdjustProportion = decimal.NewNullDecimal(
elem.AdjustFee.Decimal.Div(elem.OverallFee.Decimal).RoundBank(8),
)
} else {
elem.AdjustProportion = decimal.NullDecimal{}
}
return elem
},
)
cache.CacheSearch(filledStats, relations, "end_user_stat", conditions...)
return filledStats, nil
}