refactor(enduser):终端用户抄表控制基本完成迁移。

This commit is contained in:
徐涛 2022-09-17 07:05:17 +08:00
parent e250ef6792
commit fc3f931362

View File

@ -1,11 +1,14 @@
package service package service
import ( import (
"context"
"database/sql"
"electricity_bill_calc/cache" "electricity_bill_calc/cache"
"electricity_bill_calc/config" "electricity_bill_calc/config"
"electricity_bill_calc/excel" "electricity_bill_calc/excel"
"electricity_bill_calc/exceptions" "electricity_bill_calc/exceptions"
"electricity_bill_calc/global" "electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model" "electricity_bill_calc/model"
"errors" "errors"
"fmt" "fmt"
@ -14,73 +17,82 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"xorm.io/builder" "github.com/uptrace/bun"
"xorm.io/xorm" "go.uber.org/zap"
"xorm.io/xorm/schemas"
) )
type _EndUserService struct{} type _EndUserService struct {
l *zap.Logger
}
type MeterAppears struct { type MeterAppears struct {
Meter string Meter string
Appears int64 Appears int64
} }
var EndUserService _EndUserService var EndUserService = _EndUserService{
l: logger.Named("Service", "EndUser"),
}
func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) { func (_EndUserService) SearchEndUserRecord(reportId, keyword string, page int) ([]model.EndUserDetail, int64, error) {
var conditions = make([]string, 0) var (
conditions = make([]string, 0)
endUsers = make([]model.EndUserDetail, 0)
cond = global.DB.NewSelect().Model(&endUsers)
)
conditions = append(conditions, reportId, strconv.Itoa(page)) conditions = append(conditions, reportId, strconv.Itoa(page))
cond := builder.NewCond().And(builder.Eq{"report_id": reportId}) cond = cond.Where("report_id = ?", reportId)
if len(keyword) > 0 { if len(keyword) > 0 {
cond = cond.And( keywordCond := "%" + keyword + "%"
builder.Like{"customer_name", keyword}. cond = cond.WhereGroup(" and ", func(q *bun.SelectQuery) *bun.SelectQuery {
Or(builder.Like{"contact_name", keyword}). return q.Where("customer_name like ?", keywordCond).
Or(builder.Like{"contact_phone", keyword}). WhereOr("contact_name like ?", keywordCond).
Or(builder.Like{"meter_04kv_id", keyword}), WhereOr("contact_phone like ?", keywordCond).
) WhereOr("meter_04kv_id like ?", keywordCond)
})
conditions = append(conditions, keyword) conditions = append(conditions, keyword)
} }
var (
total int64
err error
)
if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil { if cachedTotal, err := cache.RetreiveCount("end_user_detail", conditions...); cachedTotal != -1 && err == nil {
total = cachedTotal if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil {
} else { return *cachedEndUsers, cachedTotal, nil
total, err = global.DBConn.
Table(&model.EndUserDetail{}).
Where(cond).
Count()
if err != nil {
return make([]model.EndUserDetail, 0), -1, err
} }
cache.CacheCount([]string{"end_user", "report", "park"}, "end_user_detail", total, conditions...)
} }
ctx, cancel := global.TimeoutContext()
defer cancel()
startItem := (page - 1) * config.ServiceSettings.ItemsPageSize startItem := (page - 1) * config.ServiceSettings.ItemsPageSize
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", conditions...); cachedEndUsers != nil { total, err := cond.Limit(config.ServiceSettings.ItemsPageSize).
return *cachedEndUsers, total, nil Offset(startItem).
Order("seq asc", "meter_04kv_id asc").
ScanAndCount(ctx)
relations := []string{"end_user", "report", "park"}
for _, eu := range endUsers {
relations = append(relations, fmt.Sprintf("end_user:%s:%d", eu.ReportId, eu.MeterId))
} }
endUsers := make([]model.EndUserDetail, 0) cache.CacheCount(relations, "end_user_detail", int64(total), conditions...)
err = global.DBConn. cache.CacheSearch(endUsers, relations, "end_user_detail", conditions...)
Where(cond). return endUsers, int64(total), err
Limit(config.ServiceSettings.ItemsPageSize, startItem).
Asc("seq").
Find(&endUsers)
cache.CacheSearch(endUsers, []string{"end_user_detail", "report", "park"}, "end_user_detail", conditions...)
return endUsers, total, err
} }
func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) { func (_EndUserService) AllEndUserRecord(reportId string) ([]model.EndUserDetail, error) {
if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil { if cachedEndUsers, _ := cache.RetreiveSearch[[]model.EndUserDetail]("end_user_detail", "report", reportId); cachedEndUsers != nil {
return *cachedEndUsers, nil return *cachedEndUsers, nil
} }
ctx, cancel := global.TimeoutContext()
defer cancel()
users := make([]model.EndUserDetail, 0) users := make([]model.EndUserDetail, 0)
err := global.DBConn. err := global.DB.NewSelect().Model(&users).
Where(builder.Eq{"report_id": reportId}). Where("report_id = ?", reportId).
Asc("seq", "meter_04kv_id"). Order("seq asc", "meter_04kv_id asc").
Find(&users) Scan(ctx)
cache.CacheSearch(users, []string{"end_user_detail", "report", "park"}, "end_user_detail", "report", reportId) relations := lo.Map(users, func(eu model.EndUserDetail, _ int) string {
return fmt.Sprintf("end_user:%s:%s", eu.ReportId, eu.MeterId)
})
relations = append(relations, "report", "park")
cache.CacheSearch(users, relations, "end_user_detail", "report", reportId)
return users, err return users, err
} }
@ -88,64 +100,53 @@ func (_EndUserService) FetchSpecificEndUserRecord(reportId, parkId, meterId stri
if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil { if cachedEndUser, _ := cache.RetreiveEntity[model.EndUserDetail]("end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)); cachedEndUser != nil {
return cachedEndUser, nil return cachedEndUser, nil
} }
ctx, cancel := global.TimeoutContext()
defer cancel()
record := new(model.EndUserDetail) record := new(model.EndUserDetail)
_, err := global.DBConn. err := global.DB.NewSelect().Model(record).
ID(schemas.NewPK(reportId, parkId, meterId)). Where("report_id = ?", reportId).
NoAutoCondition(). Where("park_id = ?", parkId).
Get(record) Where("meter_04kv_id = ?", meterId).
cache.CacheEntity(record, []string{"end_user_detail", "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId)) Scan(ctx)
cache.CacheEntity(record, []string{fmt.Sprintf("end_user:%s:%s", reportId, meterId), "report", "park"}, "end_user_detail", fmt.Sprintf("%s_%s_%s", reportId, parkId, meterId))
return record, err return record, err
} }
func (_EndUserService) UpdateEndUserRegisterRecord(tx *xorm.Session, record model.EndUserDetail) (err error) { func (_EndUserService) UpdateEndUserRegisterRecord(tx *bun.Tx, ctx *context.Context, record model.EndUserDetail) (err error) {
record.CalculatePeriod() record.CalculatePeriod()
if record.Initialize { updateColumns := []string{
_, err = tx.ID(schemas.NewPK(record.ReportId, record.ParkId, record.MeterId)). "current_period_overall",
Cols( "adjust_overall",
"last_period_overall", "current_period_critical",
"current_period_overall", "current_period_peak",
"adjust_overall", "current_period_flat",
"last_period_critical", "current_perios_valley",
"last_period_peak", "adjust_critical",
"last_period_flat", "adjust_peak",
"last_period_valley", "adjust_flat",
"current_period_critical", "adjust_valley",
"current_period_peak", "overall",
"current_period_flat", "critical",
"current_period_valley", "peak",
"adjust_critical", "flat",
"adjust_peak", "valley",
"adjust_flat",
"adjust_valley",
"overall",
"critical",
"peak",
"flat",
"valley",
).
Update(record)
} else {
_, err = tx.ID(schemas.NewPK(record.ReportId, record.ParkId, record.MeterId)).
Cols(
"current_period_overall",
"adjust_overall",
"current_period_critical",
"current_period_peak",
"current_period_flat",
"current_perios_valley",
"adjust_critical",
"adjust_peak",
"adjust_flat",
"adjust_valley",
"overall",
"critical",
"peak",
"flat",
"valley",
).
Update(record)
} }
cache.AbolishRelation("end_user_detail") if record.Initialize {
updateColumns = append(updateColumns,
"last_period_overall",
"last_period_critical",
"last_period_peak",
"last_period_flat",
"last_period_valley",
)
}
_, err = tx.NewUpdate().Model(&record).
WherePK().
Column(updateColumns...).
Exec(*ctx)
cache.AbolishRelation(fmt.Sprintf("end_user:%s:%s", record.ReportId, record.MeterId))
return return
} }
@ -154,6 +155,9 @@ func (_EndUserService) newVirtualExcelAnalysisError(err error) *excel.ExcelAnaly
} }
func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError { func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext()
defer cancel()
errs := exceptions.NewBatchError() errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId) users, err := es.AllEndUserRecord(reportId)
if err != nil { if err != nil {
@ -161,17 +165,24 @@ func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Read
return errs return errs
} }
reportDetail := new(model.Report) reportDetail := new(model.Report)
has, err := global.DBConn.ID(reportId).NoAutoCondition().Get(reportDetail) err = global.DB.NewSelect().Model(reportDetail).
Where("id = ?", reportId).
Scan(ctx)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
} }
if !has { if reportDetail == nil {
errs.AddError(es.newVirtualExcelAnalysisError(errors.New("未能找到相应的报表。"))) errs.AddError(es.newVirtualExcelAnalysisError(errors.New("未能找到相应的报表。")))
return errs return errs
} }
meterAppers := make([]MeterAppears, 0) meterAppers := make([]MeterAppears, 0)
err = global.DBConn.Table(new(model.EndUserDetail)).Where(builder.Eq{"park_id": reportDetail.ParkId}).Select("meter_04kv_id as meter, count(*) as appears").GroupBy("meter_04kv_id").Find(&meterAppers) err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.Id).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
@ -196,12 +207,11 @@ func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Read
} }
return errs return errs
} }
tx := global.DBConn.NewSession() tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err = tx.Begin(); err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
} }
defer tx.Close()
for _, im := range imports { for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok { if elem, ok := indexedUsers[im.MeterId]; ok {
@ -227,7 +237,7 @@ func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Read
elem.AdjustPeak = decimal.Zero elem.AdjustPeak = decimal.Zero
elem.AdjustValley = decimal.Zero elem.AdjustValley = decimal.Zero
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley) elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(tx, elem) err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
} }
@ -250,6 +260,9 @@ func (es _EndUserService) BatchImportNonPVRegister(reportId string, file io.Read
} }
func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError { func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader) *exceptions.BatchError {
ctx, cancel := global.TimeoutContext()
defer cancel()
errs := exceptions.NewBatchError() errs := exceptions.NewBatchError()
users, err := es.AllEndUserRecord(reportId) users, err := es.AllEndUserRecord(reportId)
if err != nil { if err != nil {
@ -257,17 +270,22 @@ func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader)
return errs return errs
} }
reportDetail := new(model.Report) reportDetail := new(model.Report)
has, err := global.DBConn.ID(reportId).NoAutoCondition().Get(reportDetail) err = global.DB.NewSelect().Model(reportDetail).Where("id = ?", reportId).Scan(ctx)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
} }
if !has { if reportDetail != nil {
errs.AddError(es.newVirtualExcelAnalysisError(errors.New("未能找到相应的报表。"))) errs.AddError(es.newVirtualExcelAnalysisError(errors.New("未能找到相应的报表。")))
return errs return errs
} }
meterAppers := make([]MeterAppears, 0) meterAppers := make([]MeterAppears, 0)
err = global.DBConn.Table(new(model.EndUserDetail)).Where(builder.Eq{"park_id": reportDetail.ParkId}).Select("meter_04kv_id as meter, count(*) as appears").GroupBy("meter_04kv_id").Find(&meterAppers) err = global.DB.NewSelect().Model((*model.EndUserDetail)(nil)).
ColumnExpr("meter_04kv_id as meter").
ColumnExpr("count(*) as appears").
Where("park_id = ?", reportDetail.Id).
Group("meter_04kv_id").
Scan(ctx, &meterAppers)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
@ -293,12 +311,11 @@ func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader)
return errs return errs
} }
tx := global.DBConn.NewSession() tx, err := global.DB.BeginTx(ctx, &sql.TxOptions{})
if err = tx.Begin(); err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
return errs return errs
} }
defer tx.Close()
for _, im := range imports { for _, im := range imports {
if elem, ok := indexedUsers[im.MeterId]; ok { if elem, ok := indexedUsers[im.MeterId]; ok {
@ -324,7 +341,7 @@ func (es _EndUserService) BatchImportPVRegister(reportId string, file io.Reader)
elem.AdjustPeak = im.AdjustPeak.Decimal elem.AdjustPeak = im.AdjustPeak.Decimal
elem.AdjustValley = im.AdjustValley.Decimal elem.AdjustValley = im.AdjustValley.Decimal
elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley) elem.AdjustFlat = elem.AdjustOverall.Sub(elem.AdjustCritical).Sub(elem.AdjustPeak).Sub(elem.AdjustValley)
err := es.UpdateEndUserRegisterRecord(tx, elem) err := es.UpdateEndUserRegisterRecord(&tx, &ctx, elem)
if err != nil { if err != nil {
errs.AddError(es.newVirtualExcelAnalysisError(err)) errs.AddError(es.newVirtualExcelAnalysisError(err))
} }