electricity_bill_calc_service/service/end_user.go

356 lines
11 KiB
Go

package service
import (
"context"
"database/sql"
"electricity_bill_calc/cache"
"electricity_bill_calc/config"
"electricity_bill_calc/excel"
"electricity_bill_calc/exceptions"
"electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model"
"fmt"
"io"
"strconv"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type _EndUserService struct {
l *zap.Logger
}
type MeterAppears struct {
Meter string
Appears int64
}
var EndUserService = _EndUserService{
l: logger.Named("Service", "EndUser"),
}
func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) {
var (
conditions = make([]string, 0)
endUsers = make([]model.EndUserDetail, 0)
cond = global.DB.NewSelect().Model(&endUsers)
)
conditions = append(conditions, reportId, strconv.Itoa(page))
cond = cond.Where("report_id = ?", reportId)
if len(keyword) > 0 {
keywordCond := "%" + keyword + "%"
cond = cond.WhereGroup(" and ", func(q *bun.SelectQuery) *bun.SelectQuery {
return q.Where("customer_name like ?", keywordCond).
WhereOr("contact_name like ?", keywordCond).
WhereOr("contact_phone like ?", keywordCond).
WhereOr("meter_04kv_id like ?", keywordCond)
})
conditions = append(conditions, keyword)
}
if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil {
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil {
return *cachedEndUsers, cachedTotal, nil
}
}
ctx, cancel := global.TimeoutContext()
defer cancel()
startItem := (page - 1) * config.ServiceSettings.ItemsPageSize
total, err := cond.Limit(config.ServiceSettings.ItemsPageSize).
Offset(startItem).
Order("seq asc", "meter_04kv_id asc").
ScanAndCount(ctx)
relations := []string{"end_user", "report", "park"}
for _, eu := range endUsers {
relations = append(relations, fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId))
}
cache.CacheCount(relations, "end_user_detail", int64(total), conditions...)
cache.CacheSearch(endUsers, relations, "end_user_detail", conditions...)
return endUsers, int64(total), err
}
func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) {
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil {
return *cachedEndUsers, nil
}
ctx, cancel := global.TimeoutContext()
defer cancel()
users := make([]model.EndUserDetail, 0)
err := global.DB.NewSelect().Model(&users).
Where("report_id = ?", reportId).
Order("seq asc", "meter_04kv_id asc").
Scan(ctx)
relations := lo.Map(users, func(eu model.EndUserDetail, _ int) string {
return fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId)
})
relations = append(relations, "report", "park")
cache.CacheSearch(users, relations, "end_user_detail", "report", reportId)
return users, err
}
func (_EndUserService) FetchSpecificEndUserRecord(reportId, parkId, meterId string) (*model.EndUserDetail, error) {
if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil {
return cachedEndUser, nil
}
ctx, cancel := global.TimeoutContext()
defer cancel()
record := new(model.EndUserDetail)
err := global.DB.NewSelect().Model(record).
Where("report_id = ?", reportId).
Where("park_id = ?", parkId).
Where("meter_04kv_id = ?", meterId).
Scan(ctx)
cache.CacheEntity(record, []string{fmt.Sprintf("end_user:%s:%s", reportId, meterId), "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId))
return record, err
}
func (_EndUserService) UpdateEndUserRegisterRecord(tx *bun.Tx, ctx *context.Context, record model.EndUserDetail) (err error) {
record.CalculatePeriod()
updateColumns := []string{
"current_period_overall",
"adjust_overall",
"current_period_critical",
"current_period_peak",
"current_period_flat",
"current_perios_valley",
"adjust_critical",
"adjust_peak",
"adjust_flat",
"adjust_valley",
"overall",
"critical",
"peak",
"flat",
"valley",
}
if record.Initialize {
updateColumns = append(updateColumns,
"last_period_overall",
"last_period_critical",
"last_period_peak",
"last_period_flat",
"last_period_valley",
)
}
_, err = tx.NewUpdate().Model(&record).
WherePK().
Column(updateColumns...).
Exec(*ctx)
cache.AbolishRelation(fmt.Sprintf("end_user:%s:%s", record.ReportId, record.MeterId))
return
}
func (_EndUserService) newVirtualExcelAnalysisError(err error) *excel.ExcelAnalysisError {
return &excel.ExcelAnalysisError{Col: -1, Row: -1, Err: excel.AnalysisError{Err: err}}
}
func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext()
defer cancel()
errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
var reportDetail = new(model.Report)
err = global.DB.NewSelect().Model(reportDetail).
Where("id = ?", reportId).
Scan(ctx)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
return errs
}
meterAppers := make([]MeterAppears, 0)
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.Id).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
indexedUsers := lo.Reduce(
users,
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
acc[elem.MeterId] = elem
return acc
},
make(map[string]model.EndUserDetail, 0),
)
analyzer, err := excel.NewEndUserNonPVExcelAnalyzer(file)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
if len(excelErrs) > 0 {
for _, e := range excelErrs {
errs.AddError(e)
}
return errs
}
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok {
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
return m.Meter == elem.MeterId
}); has {
if appears.Appears <= 1 {
elem.LastPeriodOverall = im.LastPeriodOverall
elem.LastPeriodCritical = decimal.Zero
elem.LastPeriodPeak = decimal.Zero
elem.LastPeriodValley = decimal.Zero
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
elem.Initialize = true
}
}
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
elem.AdjustOverall = im.AdjustOverall
elem.CurrentPeriodCritical = decimal.Zero
elem.CurrentPeriodPeak = decimal.Zero
elem.CurrentPeriodValley = decimal.Zero
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
elem.AdjustCritical = decimal.Zero
elem.AdjustPeak = decimal.Zero
elem.AdjustValley = decimal.Zero
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
} else {
errs.AddError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId)))
}
}
if errs.Len() > 0 {
tx.Rollback()
return errs
}
err = tx.Commit()
if err != nil {
tx.Rollback()
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
cache.AbolishRelation("end_user_detail")
return errs
}
func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext()
defer cancel()
errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
var reportDetail = new(model.Report)
err = global.DB.NewSelect().Model(reportDetail).Where("id = ?", reportId).Scan(ctx)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(fmt.Errorf("未能找到相应的报表。%w", err)))
return errs
}
meterAppers := make([]MeterAppears, 0)
err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.Id).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
indexedUsers := lo.Reduce(
users,
func(acc map[string]model.EndUserDetail, elem model.EndUserDetail, index int) map[string]model.EndUserDetail {
acc[elem.MeterId] = elem
return acc
},
make(map[string]model.EndUserDetail, 0),
)
analyzer, err := excel.NewEndUserPVExcelAnalyzer(file)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
imports, excelErrs := analyzer.Analysis(*new(model.EndUserImport))
if len(excelErrs) > 0 {
for _, e := range excelErrs {
errs.AddError(e)
}
return errs
}
tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs
}
for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok {
if appears, has := lo.Find(meterAppers, func(m MeterAppears) bool {
return m.Meter == elem.MeterId
}); has {
if appears.Appears <= 1 {
elem.LastPeriodOverall = im.LastPeriodOverall
elem.LastPeriodCritical = im.LastPeriodCritical.Decimal
elem.LastPeriodPeak = im.LastPeriodPeak.Decimal
elem.LastPeriodValley = im.LastPeriodValley.Decimal
elem.LastPeriodFlat = elem.LastPeriodOverall.Sub(elem.LastPeriodCritical).Sub(elem.LastPeriodPeak).Sub(elem.LastPeriodValley)
elem.Initialize = true
}
}
elem.CurrentPeriodOverall = im.CurrentPeriodOverall
elem.AdjustOverall = im.AdjustOverall
elem.CurrentPeriodCritical = im.CurrentPeriodCritical.Decimal
elem.CurrentPeriodPeak = im.CurrentPeriodPeak.Decimal
elem.CurrentPeriodValley = im.CurrentPeriodValley.Decimal
elem.CurrentPeriodFlat = elem.CurrentPeriodOverall.Sub(elem.CurrentPeriodCritical).Sub(elem.CurrentPeriodPeak).Sub(elem.CurrentPeriodValley)
elem.AdjustCritical = im.AdjustCritical.Decimal
elem.AdjustPeak = im.AdjustPeak.Decimal
elem.AdjustValley = im.AdjustValley.Decimal
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
} else {
errs.AddError(es.newVirtualExcelAnalysisError(exceptions.NewNotFoundError(fmt.Sprintf("表计 %s 未找到", im.MeterId))))
}
}
if errs.Len() > 0 {
tx.Rollback()
return errs
}
err = tx.Commit()
if err != nil {
tx.Rollback()
errs.AddError(es.newVirtualExcelAnalysisError(err))
}
cache.AbolishRelation("end_user_detail")
return errs
}