480 lines
16 KiB
Go
480 lines
16 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"electricity_bill_calc/cache"
|
||
"electricity_bill_calc/config"
|
||
"electricity_bill_calc/excel"
|
||
"electricity_bill_calc/exceptions"
|
||
"electricity_bill_calc/global"
|
||
"electricity_bill_calc/logger"
|
||
"electricity_bill_calc/model"
|
||
"fmt"
|
||
"io"
|
||
"strconv"
|
||
"time"
|
||
|
||
mapset "github.com/deckarep/golang-set/v2"
|
||
"github.com/samber/lo"
|
||
"github.com/shopspring/decimal"
|
||
"github.com/uptrace/bun"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type _EndUserService struct {
|
||
l *zap.Logger
|
||
}
|
||
|
||
// MeterAppears is the scan target of the "rows per meter" aggregate queries
// issued by the batch import functions: one value per meter code.
type MeterAppears struct {
	// Meter receives the column aliased as "meter" (meter_04kv_id).
	Meter string
	// Appears receives the column aliased as "appears" (count(*)).
	Appears int64
}
|
||
|
||
var EndUserService = _EndUserService{
|
||
l: logger.Named("Service", "EndUser"),
|
||
}
|
||
|
||
func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) {
|
||
var (
|
||
conditions = make([]string, 0)
|
||
endUsers = make([]model.EndUserDetail, 0)
|
||
cond = global.DB.NewSelect().Model(&endUsers)
|
||
)
|
||
conditions = append(conditions, reportId, strconv.Itoa(page))
|
||
cond = cond.Where("report_id = ?", reportId)
|
||
if len(keyword) > 0 {
|
||
keywordCond := "%" + keyword + "%"
|
||
cond = cond.WhereGroup(" and ", func(q *bun.SelectQuery) *bun.SelectQuery {
|
||
return q.Where("customer_name like ?", keywordCond).
|
||
WhereOr("contact_name like ?", keywordCond).
|
||
WhereOr("contact_phone like ?", keywordCond).
|
||
WhereOr("meter_04kv_id like ?", keywordCond)
|
||
})
|
||
conditions = append(conditions, keyword)
|
||
}
|
||
if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil {
|
||
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil {
|
||
return *cachedEndUsers, cachedTotal, nil
|
||
}
|
||
}
|
||
|
||
ctx, cancel := global.TimeoutContext()
|
||
defer cancel()
|
||
|
||
startItem := (page - 1) * config.ServiceSettings.ItemsPageSize
|
||
total, err := cond.Limit(config.ServiceSettings.ItemsPageSize).
|
||
Offset(startItem).
|
||
Order("seq asc", "meter_04kv_id asc").
|
||
ScanAndCount(ctx)
|
||
|
||
relations := []string{"end_user", "report", "park"}
|
||
for _, eu := range endUsers {
|
||
relations = append(relations, fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId))
|
||
}
|
||
cache.CacheCount(relations, "end_user_detail", int64(total), conditions...)
|
||
cache.CacheSearch(endUsers, relations, "end_user_detail", conditions...)
|
||
return endUsers, int64(total), err
|
||
}
|
||
|
||
func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) {
|
||
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil {
|
||
return *cachedEndUsers, nil
|
||
}
|
||
ctx, cancel := global.TimeoutContext()
|
||
defer cancel()
|
||
|
||
users := make([]model.EndUserDetail, 0)
|
||
err := global.DB.NewSelect().Model(&users).
|
||
Where("report_id = ?", reportId).
|
||
Order("seq asc", "meter_04kv_id asc").
|
||
Scan(ctx)
|
||
relations := lo.Map(users, func(eu model.EndUserDetail, _ int) string {
|
||
return fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId)
|
||
})
|
||
relations = append(relations, "report", "park")
|
||
cache.CacheSearch(users, relations, "end_user_detail", "report", reportId)
|
||
return users, err
|
||
}
|
||
|
||
func (_EndUserService) FetchSpecificEndUserRecord(reportId, parkId, meterId string) (*model.EndUserDetail, error) {
|
||
if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil {
|
||
return cachedEndUser, nil
|
||
}
|
||
ctx, cancel := global.TimeoutContext()
|
||
defer cancel()
|
||
|
||
record := new(model.EndUserDetail)
|
||
err := global.DB.NewSelect().Model(record).
|
||
Where("report_id = ?", reportId).
|
||
Where("park_id = ?", parkId).
|
||
Where("meter_04kv_id = ?", meterId).
|
||
Scan(ctx)
|
||
cache.CacheEntity(record, []string{fmt.Sprintf("end_user:%s:%s", reportId, meterId), "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId))
|
||
return record, err
|
||
}
|
||
|
||
func (_EndUserService) UpdateEndUserRegisterRecord(tx *bun.Tx, ctx *context.Context, record model.EndUserDetail) (err error) {
|
||
record.CalculatePeriod()
|
||
updateColumns := []string{
|
||
"current_period_overall",
|
||
"adjust_overall",
|
||
"current_period_critical",
|
||
"current_period_peak",
|
||
"current_period_flat",
|
||
"current_period_valley",
|
||
"adjust_critical",
|
||
"adjust_peak",
|
||
"adjust_flat",
|
||
"adjust_valley",
|
||
"overall",
|
||
"critical",
|
||
"peak",
|
||
"flat",
|
||
"valley",
|
||
}
|
||
if record.Initialize {
|
||
updateColumns = append(updateColumns,
|
||
"last_period_overall",
|
||
"last_period_critical",
|
||
"last_period_peak",
|
||
"last_period_flat",
|
||
"last_period_valley",
|
||
)
|
||
}
|
||
|
||
_, err = tx.NewUpdate().Model(&record).
|
||
WherePK().
|
||
Column(updateColumns...).
|
||
Exec(*ctx)
|
||
cache.AbolishRelation(fmt.Sprintf("end_user:%s:%s", record.ReportId, record.MeterId))
|
||
return
|
||
}
|
||
|
||
func (_EndUserService) newVirtualExcelAnalysisError(err error) *excel.ExcelAnalysisError {
|
||
return &excel.ExcelAnalysisError{Col: -1, Row: -1, Err: excel.AnalysisError{Err: err}}
|
||
}
|
||
|
||
func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
|
||
ctx, cancel := global.TimeoutContext(120)
|
||
defer cancel()
|
||
|
||
errs := exceptions.NewBatchError()
|
||
users, err := es.AllEndUserRecord(reportId)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
var reportDetail = new(model.Report)
|
||
err = global.DB.NewSelect().Model(reportDetail).
|
||
Where("id = ?", reportId).
|
||
Scan(ctx)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
|
||
return errs
|
||
}
|
||
meterAppers := make([]MeterAppears, 0)
|
||
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
|
||
ColumnExpr("meter_04kv_id as meter").
|
||
ColumnExpr("count(*) as appears").
|
||
Where("park_id = ?", reportDetail.ParkId).
|
||
Group("meter_04kv_id").
|
||
Scan(ctx, &meterAppers)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
indexedUsers := lo.Reduce(
|
||
users,
|
||
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
|
||
acc[elem.MeterId] = elem
|
||
return acc
|
||
},
|
||
make(map[string]model.EndUserDetail, 0),
|
||
)
|
||
analyzer, err := excel.NewEndUserNonPVExcelAnalyzer(file)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
|
||
if len(excelErrs) > 0 {
|
||
for _, e := range excelErrs {
|
||
errs.AddError(e)
|
||
}
|
||
return errs
|
||
}
|
||
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
|
||
for _, im := range imports {
|
||
if elem, ok := indexedUsers[im.MeterId]; ok {
|
||
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
|
||
return m.Meter == elem.MeterId
|
||
}); has {
|
||
if appears.Appears <= 1 {
|
||
elem.LastPeriodOverall = im.LastPeriodOverall
|
||
elem.LastPeriodCritical = decimal.Zero
|
||
elem.LastPeriodPeak = decimal.Zero
|
||
elem.LastPeriodValley = decimal.Zero
|
||
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
|
||
elem.Initialize = true
|
||
}
|
||
}
|
||
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
|
||
elem.AdjustOverall = im.AdjustOverall
|
||
elem.CurrentPeriodCritical = decimal.Zero
|
||
elem.CurrentPeriodPeak = decimal.Zero
|
||
elem.CurrentPeriodValley = decimal.Zero
|
||
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
|
||
elem.AdjustCritical = decimal.Zero
|
||
elem.AdjustPeak = decimal.Zero
|
||
elem.AdjustValley = decimal.Zero
|
||
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
|
||
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
}
|
||
} else {
|
||
errs.AddError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId)))
|
||
}
|
||
}
|
||
if errs.Len() > 0 {
|
||
tx.Rollback()
|
||
return errs
|
||
}
|
||
|
||
err = tx.Commit()
|
||
if err != nil {
|
||
tx.Rollback()
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
}
|
||
cache.AbolishRelation("end_user_detail")
|
||
return errs
|
||
}
|
||
|
||
func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
|
||
ctx, cancel := global.TimeoutContext(120)
|
||
defer cancel()
|
||
|
||
errs := exceptions.NewBatchError()
|
||
users, err := es.AllEndUserRecord(reportId)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
var reportDetail = new(model.Report)
|
||
err = global.DB.NewSelect().Model(reportDetail).Where("id = ?", reportId).Scan(ctx)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
|
||
return errs
|
||
}
|
||
meterAppers := make([]MeterAppears, 0)
|
||
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
|
||
ColumnExpr("meter_04kv_id as meter").
|
||
ColumnExpr("count(*) as appears").
|
||
Where("park_id = ?", reportDetail.Id).
|
||
Group("meter_04kv_id").
|
||
Scan(ctx, &meterAppers)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
indexedUsers := lo.Reduce(
|
||
users,
|
||
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
|
||
acc[elem.MeterId] = elem
|
||
return acc
|
||
},
|
||
make(map[string]model.EndUserDetail, 0),
|
||
)
|
||
analyzer, err := excel.NewEndUserPVExcelAnalyzer(file)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
|
||
if len(excelErrs) > 0 {
|
||
for _, e := range excelErrs {
|
||
errs.AddError(e)
|
||
}
|
||
return errs
|
||
}
|
||
|
||
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
return errs
|
||
}
|
||
|
||
for _, im := range imports {
|
||
if elem, ok := indexedUsers[im.MeterId]; ok {
|
||
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
|
||
return m.Meter == elem.MeterId
|
||
}); has {
|
||
if appears.Appears <= 1 {
|
||
elem.LastPeriodOverall = im.LastPeriodOverall
|
||
elem.LastPeriodCritical = im.LastPeriodCritical.Decimal
|
||
elem.LastPeriodPeak = im.LastPeriodPeak.Decimal
|
||
elem.LastPeriodValley = im.LastPeriodValley.Decimal
|
||
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
|
||
elem.Initialize = true
|
||
}
|
||
}
|
||
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
|
||
elem.AdjustOverall = im.AdjustOverall
|
||
elem.CurrentPeriodCritical = im.CurrentPeriodCritical.Decimal
|
||
elem.CurrentPeriodPeak = im.CurrentPeriodPeak.Decimal
|
||
elem.CurrentPeriodValley = im.CurrentPeriodValley.Decimal
|
||
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
|
||
elem.AdjustCritical = im.AdjustCritical.Decimal
|
||
elem.AdjustPeak = im.AdjustPeak.Decimal
|
||
elem.AdjustValley = im.AdjustValley.Decimal
|
||
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
|
||
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
|
||
if err != nil {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
}
|
||
} else {
|
||
errs.AddError(es.newVirtualExcelAnalysisError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId))))
|
||
}
|
||
}
|
||
if errs.Len() > 0 {
|
||
tx.Rollback()
|
||
return errs
|
||
}
|
||
|
||
err = tx.Commit()
|
||
if err != nil {
|
||
tx.Rollback()
|
||
errs.AddError(es.newVirtualExcelAnalysisError(err))
|
||
}
|
||
cache.AbolishRelation("end_user_detail")
|
||
return errs
|
||
}
|
||
|
||
func (es _EndUserService) StatEndUserRecordInPeriod(requestUser, requestPark, startDate, endDate string) ([]model.EndUserPeriodStat, error) {
|
||
var (
|
||
conditions = make([]string, 0)
|
||
relations = []string{
|
||
fmt.Sprintf("park:%s", requestPark),
|
||
"end_user_detail",
|
||
}
|
||
cond = global.DB.NewSelect().
|
||
Model((*model.EndUserDetail)(nil)).
|
||
Relation("Report", func(sq *bun.SelectQuery) *bun.SelectQuery {
|
||
return sq.ExcludeColumn("*")
|
||
}).
|
||
Relation("Park", func(sq *bun.SelectQuery) *bun.SelectQuery {
|
||
return sq.ExcludeColumn("*")
|
||
})
|
||
)
|
||
if len(requestUser) > 0 {
|
||
cond = cond.Where("park.user_id = ?", requestUser)
|
||
conditions = append(conditions, requestUser)
|
||
} else {
|
||
conditions = append(conditions, "_")
|
||
}
|
||
if len(requestPark) > 0 {
|
||
cond = cond.Where("eud.park_id = ?", requestPark)
|
||
conditions = append(conditions, requestPark)
|
||
} else {
|
||
conditions = append(conditions, "_")
|
||
}
|
||
if len(startDate) > 0 {
|
||
parseTime, err := time.Parse("2006-01", startDate)
|
||
if err != nil {
|
||
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[startDate],%w", err)
|
||
}
|
||
start := model.NewDate(parseTime)
|
||
cond = cond.Where("report.period >= ?::date", start.ToString())
|
||
conditions = append(conditions, startDate)
|
||
} else {
|
||
conditions = append(conditions, "_")
|
||
}
|
||
if len(endDate) > 0 {
|
||
parseTime, err := time.Parse("2006-01", endDate)
|
||
if err != nil {
|
||
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("不能解析给定的参数[endDate],%w", err)
|
||
}
|
||
end := model.NewDate(parseTime)
|
||
cond = cond.Where("report.period <= ?::date", end.ToString())
|
||
conditions = append(conditions, endDate)
|
||
}
|
||
if cached, err := cache.RetreiveSearch[[]model.EndUserPeriodStat]("end_user_stat", conditions...); cached != nil && err == nil {
|
||
return *cached, nil
|
||
}
|
||
ctx, cancel := global.TimeoutContext(120)
|
||
defer cancel()
|
||
var endUserSums []model.EndUserPeriodStat
|
||
err := cond.Column("eud.meter_04kv_id", "eud.park_id").
|
||
ColumnExpr("sum(?) as overall", bun.Ident("eud.overall")).
|
||
ColumnExpr("sum(?) as overall_fee", bun.Ident("eud.overall_fee")).
|
||
ColumnExpr("sum(?) as critical", bun.Ident("eud.critical")).
|
||
ColumnExpr("sum(?) as critical_fee", bun.Ident("eud.critical_fee")).
|
||
ColumnExpr("sum(?) as peak", bun.Ident("eud.peak")).
|
||
ColumnExpr("sum(?) as peak_fee", bun.Ident("eud.peak_fee")).
|
||
ColumnExpr("sum(?) as valley", bun.Ident("eud.valley")).
|
||
ColumnExpr("sum(?) as valley_fee", bun.Ident("eud.valley_fee")).
|
||
ColumnExpr("sum(?) as final_diluted", bun.Ident("eud.final_diluted")).
|
||
Where("report.published = ?", true).
|
||
Group("eud.meter_04kv_id", "eud.park_id").
|
||
Scan(ctx, &endUserSums)
|
||
if err != nil {
|
||
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能完成终端用户在指定期限内的统计,%w", err)
|
||
}
|
||
parkIds := lo.Reduce(
|
||
endUserSums,
|
||
func(acc mapset.Set[string], elem model.EndUserPeriodStat, _ int) mapset.Set[string] {
|
||
acc.Add(elem.ParkId)
|
||
return acc
|
||
},
|
||
mapset.NewSet[string](),
|
||
)
|
||
meterArchives := make([]model.Meter04KV, 0)
|
||
if len(parkIds.ToSlice()) > 0 {
|
||
err = global.DB.NewSelect().
|
||
Model(&meterArchives).Relation("ParkDetail").
|
||
Where("park_id in (?)", bun.In(parkIds.ToSlice())).
|
||
Scan(ctx)
|
||
if err != nil {
|
||
return make([]model.EndUserPeriodStat, 0), fmt.Errorf("未能获取到终端表计的最新基础档案,%w", err)
|
||
}
|
||
}
|
||
filledStats := lo.Map(
|
||
endUserSums,
|
||
func(elem model.EndUserPeriodStat, _ int) model.EndUserPeriodStat {
|
||
archive, has := lo.Find(meterArchives, func(meter model.Meter04KV) bool {
|
||
return meter.Code == elem.MeterId
|
||
})
|
||
if has {
|
||
if archive.Address != nil {
|
||
elem.Address = *archive.Address
|
||
} else {
|
||
elem.Address = ""
|
||
}
|
||
if archive.CustomerName != nil {
|
||
elem.CustomerName = *archive.CustomerName
|
||
} else {
|
||
elem.CustomerName = ""
|
||
}
|
||
elem.IsPublicMeter = archive.IsPublicMeter
|
||
elem.Kind = archive.ParkDetail.SubmeterType
|
||
}
|
||
if elem.OverallFee.Valid && elem.AdjustFee.Valid && !elem.OverallFee.Decimal.IsZero() {
|
||
elem.AdjustProportion = decimal.NewNullDecimal(
|
||
elem.AdjustFee.Decimal.Div(elem.OverallFee.Decimal).RoundBank(8),
|
||
)
|
||
} else {
|
||
elem.AdjustProportion = decimal.NullDecimal{}
|
||
}
|
||
return elem
|
||
},
|
||
)
|
||
cache.CacheSearch(filledStats, relations, "end_user_stat", conditions...)
|
||
return filledStats, nil
|
||
}
|