electricity_bill_calc_service/service/meter.go

801 lines
31 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"electricity_bill_calc/cache"
"electricity_bill_calc/excel"
"electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model"
"electricity_bill_calc/repository"
"electricity_bill_calc/tools"
"electricity_bill_calc/types"
"electricity_bill_calc/vo"
"fmt"
"mime/multipart"
"strings"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"go.uber.org/zap"
)
type _MeterService struct {
log *zap.Logger
}
var MeterService = _MeterService{
log: logger.Named("Service", "Meter"),
}
// 创建一条新的表计记录
func (ms _MeterService) CreateMeterRecord(pid string, form *vo.MeterCreationForm) error {
ms.log.Info("创建一条新的表计记录", zap.String("park id", pid))
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return err
}
ok, err := repository.MeterRepository.CreateMeter(tx, ctx, pid, *form)
if err != nil {
ms.log.Error("无法创建一条新的表计记录。", zap.Error(err))
tx.Rollback(ctx)
return err
}
if !ok {
ms.log.Error("数据库未能记录新的表计记录。")
tx.Rollback(ctx)
return err
}
ok, err = repository.MeterRepository.RecordReading(tx, ctx, pid, form.Code, form.MeterType, form.Ratio, &form.MeterReadingForm)
if err != nil {
ms.log.Error("无法记录表计读数。", zap.Error(err))
tx.Rollback(ctx)
return err
}
if !ok {
ms.log.Error("数据库未能记录表计读数。")
tx.Rollback(ctx)
return err
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return err
}
cache.AbolishRelation(fmt.Sprintf("meter:%s", pid))
return nil
}
// 更新指定表计的信息
func (ms _MeterService) UpdateMeterRecord(pid string, code string, form *vo.MeterModificationForm) error {
ms.log.Info("更新指定表计的信息", zap.String("park id", pid), zap.String("meter code", code))
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return err
}
ok, err := repository.MeterRepository.UpdateMeter(tx, ctx, pid, code, form)
if err != nil {
ms.log.Error("无法更新指定表计的信息。", zap.Error(err))
tx.Rollback(ctx)
return err
}
if !ok {
ms.log.Error("数据库未能更新指定表计的信息。")
tx.Rollback(ctx)
return err
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return err
}
cache.AbolishRelation(fmt.Sprintf("meter:%s", pid))
return nil
}
// 处理上传的Excel格式表计档案文件根据表号自动更新数据库
func (ms _MeterService) BatchImportMeters(pid string, file *multipart.FileHeader) ([]excel.ExcelAnalysisError, error) {
ms.log.Info("处理上传的Excel格式表计档案文件", zap.String("park id", pid))
ctx, cancel := global.TimeoutContext(10)
defer cancel()
archiveFile, err := file.Open()
if err != nil {
ms.log.Error("无法打开上传的Excel格式表计档案文件。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法打开上传的文件,%w", err)
}
analyzer, err := excel.NewMeterArchiveExcelAnalyzer(archiveFile)
if err != nil {
ms.log.Error("无法根据上传的 Excel 文件创建表计档案分析器。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法创建表计档案解析器,%w", err)
}
records, errs := analyzer.Analysis(*new(model.MeterImportRow))
if len(errs) > 0 {
ms.log.Error("表计档案分析器在解析上传的 Excel 文件时发生错误。", zap.Int("error count", len(errs)))
return errs, fmt.Errorf("表计档案分析器在解析上传的 Excel 文件时发生错误。")
}
// 步骤1对目前已经解析到的数据进行重复检测记录重复内容并直接返回
var codeStat = make(map[string]int, 0)
for _, record := range records {
if _, ok := codeStat[record.Code]; !ok {
codeStat[record.Code] = 0
}
codeStat[record.Code]++
}
duplicatedCodes := make([]string, 0)
for code, count := range codeStat {
if count > 1 {
duplicatedCodes = append(duplicatedCodes, code)
}
}
if len(duplicatedCodes) > 0 {
ms.log.Error("表计档案分析器在解析上传的 Excel 文件时发现重复的表计编号。", zap.Strings("duplicated codes", duplicatedCodes))
return []excel.ExcelAnalysisError{
{Row: 0, Col: 0, Err: excel.AnalysisError{Err: fmt.Errorf("表计档案分析器在解析上传的 Excel 文件时发现重复的表计编号。(%s)", strings.Join(duplicatedCodes, ", "))}},
}, fmt.Errorf("表计档案分析器在解析上传的 Excel 文件时发现重复的表计编号。(%s)", strings.Join(duplicatedCodes, ", "))
}
// 步骤2获取指定园区下的所有建筑信息
buildings, err := repository.ParkRepository.RetrieveParkBuildings(pid)
if err != nil {
ms.log.Error("无法获取指定园区下的所有建筑信息。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法获取指定园区下的所有建筑信息,%w", err)
}
buildingNames := lo.Map(buildings, func(element *model.ParkBuilding, _ int) string {
return element.Name
})
// 步骤2.1:获取表计档案中出现的所有建筑,并对档案中新出现的建筑进行创建操作
unexistsBuildingNames := make([]string, 0)
for _, record := range records {
if !lo.Contains(buildingNames, *record.Building) {
unexistsBuildingNames = append(unexistsBuildingNames, *record.Building)
}
}
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法在自动导入建筑阶段启动数据库事务。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在自动导入建筑阶段启动数据库事务,%w", err)
}
for _, name := range unexistsBuildingNames {
_, err := repository.ParkRepository.CreateParkBuildingWithTransaction(tx, ctx, pid, name, nil)
if err != nil {
ms.log.Error("无法在自动导入建筑阶段创建新的建筑。", zap.String("building name", name), zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在自动导入建筑阶段创建新的建筑,%w", err)
}
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("无法在自动导入建筑阶段提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在自动导入建筑阶段提交数据库事务,%w", err)
}
buildings, err = repository.ParkRepository.RetrieveParkBuildings(pid)
if err != nil {
ms.log.Error("无法重新获取指定园区下的所有建筑信息。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法重新获取指定园区下的所有建筑信息,%w", err)
}
// 步骤2.3检测并替换表计档案中的建筑ID
for _, record := range records {
for _, building := range buildings {
if building.Name == *record.Building {
record.Building = &building.Id
break
}
}
}
// 步骤3启动数据库事务直接构建表计插入语句但提供On Conflict Do Update功能
tx, err = global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据插入阶段的数据库事务。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法启动数据插入阶段的数据库事务,%w", err)
}
var meterCreationForms = make([]vo.MeterCreationForm, 0)
for row, element := range records {
if element.MeterType != nil {
meterType, err := model.ParseMeterInstallationType(*element.MeterType)
if err != nil {
ms.log.Error("无法识别表计类型。", zap.Int("record_index", row), zap.Error(err))
errs = append(errs, excel.ExcelAnalysisError{
Row: row + 1,
Col: 3,
Err: excel.AnalysisError{
Err: fmt.Errorf("表计类型无法识别"),
},
})
}
meterCreationForms = append(meterCreationForms, vo.MeterCreationForm{
Code: element.Code,
Address: element.Address,
MeterType: meterType,
Ratio: element.Ratio,
Seq: element.Seq,
Enabled: true,
Building: element.Building,
OnFloor: element.OnFloor,
Area: element.Area,
MeterReadingForm: vo.MeterReadingForm{
ReadAt: &element.ReadAt,
Overall: element.Overall,
Critical: element.Critical.Decimal,
Peak: element.Peak.Decimal,
Flat: element.Flat.Decimal,
Valley: element.Valley.Decimal,
},
})
} else {
ms.log.Error("表计类型不能为空。", zap.Int("record_index", row))
errs = append(errs, excel.ExcelAnalysisError{
Row: row + 1,
Col: 3,
Err: excel.AnalysisError{
Err: fmt.Errorf("表计类型不能为空"),
},
})
}
}
if len(errs) > 0 {
ms.log.Error("表计档案分析器在解析上传的 Excel 文件时发生错误。", zap.Int("error count", len(errs)))
tx.Rollback(ctx)
return errs, fmt.Errorf("表计档案分析器在解析上传的 Excel 文件时发生错误。")
}
for _, record := range meterCreationForms {
_, err := repository.MeterRepository.CreateOrUpdateMeter(tx, ctx, pid, record)
if err != nil {
ms.log.Error("无法在数据插入阶段创建或更新表计。", zap.String("meter code", record.Code), zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在数据插入阶段创建或更新表计,%w", err)
}
}
// 步骤5将全部抄表信息保存进入数据库
for _, record := range meterCreationForms {
_, err := repository.MeterRepository.RecordReading(tx, ctx, pid, record.Code, record.MeterType, record.Ratio, &record.MeterReadingForm)
if err != nil {
ms.log.Error("无法在数据插入阶段保存抄表信息。", zap.String("meter code", record.Code), zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在数据插入阶段保存抄表信息,%w", err)
}
}
// 步骤6执行事务更新数据库
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("无法在数据插入阶段提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在数据插入阶段提交数据库事务,%w", err)
}
return make([]excel.ExcelAnalysisError, 0), nil
}
// 更换系统中的表计
func (ms _MeterService) ReplaceMeter(
pid string,
oldMeterCode string,
oldMeterReading *vo.MeterReadingForm,
newMeterCode string,
newMeterRatio decimal.Decimal,
newMeterReading *vo.MeterReadingForm,
) error {
ms.log.Info("更换系统中的表计", zap.String("park id", pid), zap.String("old meter code", oldMeterCode), zap.String("new meter code", newMeterCode))
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return err
}
// 步骤1读取旧表信息
oldMeter, err := repository.MeterRepository.FetchMeterDetail(pid, oldMeterCode)
if err != nil {
ms.log.Error("无法读取旧表信息。", zap.Error(err))
tx.Rollback(ctx)
return fmt.Errorf("要替换的旧表计不存在:%w", err)
}
// 步骤2写入旧表读数
ok, err := repository.MeterRepository.RecordReading(tx, ctx, pid, oldMeterCode, oldMeter.MeterType, oldMeter.Ratio, oldMeterReading)
switch {
case err != nil:
ms.log.Error("无法写入旧表读数。", zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("数据库未能写入旧表读数。")
tx.Rollback(ctx)
return fmt.Errorf("旧表计读数未能成功保存到数据库。")
}
// 步骤3从系统移除旧表计
ok, err = repository.MeterRepository.DetachMeter(tx, ctx, pid, oldMeterCode)
switch {
case err != nil:
ms.log.Error("无法从系统移除旧表计。", zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能从系统移除旧表计。")
tx.Rollback(ctx)
return fmt.Errorf("旧表计未能成功从系统移除。")
}
// 步骤4获取旧表计的关联信息
var oldRelations []*model.MeterRelation
switch oldMeter.MeterType {
case model.METER_INSTALLATION_POOLING:
oldRelations, err = repository.MeterRepository.ListPooledMeterRelations(pid, oldMeterCode)
if err != nil {
ms.log.Error("无法获取旧表计的关联信息。", zap.Error(err))
tx.Rollback(ctx)
return err
}
default:
oldRelations, err = repository.MeterRepository.ListMeterRelations(pid, oldMeterCode)
if err != nil {
ms.log.Error("无法获取旧表计的关联信息。", zap.Error(err))
tx.Rollback(ctx)
return err
}
}
// 步骤5将旧表计的关联信息设置为解除
for _, relation := range oldRelations {
ok, err = repository.MeterRepository.UnbindMeter(tx, ctx, pid, relation.MasterMeter, relation.SlaveMeter)
switch {
case err != nil:
ms.log.Error("无法将旧表计的关联信息设置为解除。", zap.String("master meter", relation.MasterMeter), zap.String("slave meter", relation.SlaveMeter), zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能将旧表计的关联信息设置为解除。", zap.String("master meter", relation.MasterMeter), zap.String("slave meter", relation.SlaveMeter))
tx.Rollback(ctx)
return fmt.Errorf("旧表计的关联信息未能成功设置为解除。")
}
}
// 步骤6将旧表计的部分信息赋予新表计
newMeterCreationForm := vo.MeterCreationForm{
Code: newMeterCode,
Address: oldMeter.Address,
MeterType: oldMeter.MeterType,
Ratio: newMeterRatio,
Seq: oldMeter.Seq,
Enabled: oldMeter.Enabled,
Building: oldMeter.Building,
OnFloor: oldMeter.OnFloor,
Area: oldMeter.Area,
MeterReadingForm: *newMeterReading,
}
// 步骤7将新表计写入系统
ok, err = repository.MeterRepository.CreateMeter(tx, ctx, pid, newMeterCreationForm)
switch {
case err != nil:
ms.log.Error("无法将新表计写入系统。", zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能将新表计写入系统。")
tx.Rollback(ctx)
return fmt.Errorf("新表计未能成功写入系统。")
}
// 步骤8将新表计的读数写入系统
ok, err = repository.MeterRepository.RecordReading(tx, ctx, pid, newMeterCode, newMeterCreationForm.MeterType, newMeterCreationForm.Ratio, &newMeterCreationForm.MeterReadingForm)
switch {
case err != nil:
ms.log.Error("无法将新表计的读数写入系统。", zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能将新表计的读数写入系统。")
tx.Rollback(ctx)
return fmt.Errorf("新表计的读数未能成功写入系统。")
}
// 步骤9将旧表计的关联信息复制一份赋予新表计
switch oldMeter.MeterType {
case model.METER_INSTALLATION_POOLING:
for _, relation := range oldRelations {
ok, err = repository.MeterRepository.BindMeter(tx, ctx, pid, newMeterCode, relation.SlaveMeter)
switch {
case err != nil:
ms.log.Error("无法将旧表计的关联信息赋予新表计。", zap.String("master meter", newMeterCode), zap.String("slave meter", relation.SlaveMeter), zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能将旧表计的关联信息赋予新表计。", zap.String("master meter", newMeterCode), zap.String("slave meter", relation.SlaveMeter))
tx.Rollback(ctx)
return fmt.Errorf("旧表计的关联信息未能成功赋予新表计。")
}
}
default:
for _, relation := range oldRelations {
ok, err = repository.MeterRepository.BindMeter(tx, ctx, pid, relation.MasterMeter, newMeterCode)
switch {
case err != nil:
ms.log.Error("无法将旧表计的关联信息赋予新表计。", zap.String("master meter", relation.MasterMeter), zap.String("slave meter", newMeterCode), zap.Error(err))
tx.Rollback(ctx)
return err
case !ok:
ms.log.Error("未能将旧表计的关联信息赋予新表计。", zap.String("master meter", relation.MasterMeter), zap.String("slave meter", newMeterCode))
tx.Rollback(ctx)
return fmt.Errorf("旧表计的关联信息未能成功赋予新表计。")
}
}
}
// 步骤10提交事务
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return err
}
cache.AbolishRelation(fmt.Sprintf("meter:%s", pid))
return nil
}
// 列出园区中指定公摊表计下的所有关联表计
func (ms _MeterService) ListPooledMeterRelations(pid, masterMeter string) ([]*model.MeterDetail, error) {
ms.log.Info("列出园区中指定公摊表计下的所有关联表计", zap.String("park id", pid), zap.String("meter code", masterMeter))
relations, err := repository.MeterRepository.ListPooledMeterRelations(pid, masterMeter)
if err != nil {
ms.log.Error("无法列出园区中指定公摊表计下的所有关联关系。", zap.Error(err))
return make([]*model.MeterDetail, 0), err
}
relatedMeterCodes := lo.Map(relations, func(element *model.MeterRelation, _ int) string {
return element.SlaveMeter
})
meters, err := repository.MeterRepository.ListMetersByIDs(pid, relatedMeterCodes)
if err != nil {
ms.log.Error("无法列出园区中指定公摊表计下的所有关联表计详细信息。", zap.Error(err))
return make([]*model.MeterDetail, 0), err
}
return meters, nil
}
// 列出指定园区中所有的公摊表计
func (ms _MeterService) SearchPooledMetersDetail(pid string, page uint, keyword *string) ([]*model.PooledMeterDetailCompound, int64, error) {
ms.log.Info("列出指定园区中所有的公摊表计", zap.String("park id", pid), zap.Uint("page", page), zap.String("keyword", *keyword))
cacheConditions := []string{
pid,
fmt.Sprintf("%d", page),
tools.DefaultTo(keyword, "UNDEFINED"),
}
if meters, total, err := cache.RetrievePagedSearch[[]*model.PooledMeterDetailCompound]("assemble_pooled_meters_detail", cacheConditions...); err == nil {
ms.log.Info("已经从缓存中获取到了指定园区中所有的公摊表计。", zap.Int("count", len(*meters)), zap.Int64("total", total))
return *meters, total, nil
}
poolingMeters, total, err := repository.MeterRepository.ListPoolingMeters(pid, page, keyword)
if err != nil {
ms.log.Error("无法列出指定园区中所有的公摊表计。", zap.Error(err))
return make([]*model.PooledMeterDetailCompound, 0), 0, err
}
poolingMeterIds := lo.Map(poolingMeters, func(element *model.MeterDetail, _ int) string {
return element.Code
})
relations, err := repository.MeterRepository.ListPooledMeterRelationsByCodes(pid, poolingMeterIds)
if err != nil {
ms.log.Error("无法列出指定园区中所有的公摊表计关联关系。", zap.Error(err))
return make([]*model.PooledMeterDetailCompound, 0), 0, err
}
slaveMeters, err := repository.MeterRepository.ListMetersByIDs(pid, lo.Map(relations, func(element *model.MeterRelation, _ int) string {
return element.SlaveMeter
}))
if err != nil {
ms.log.Error("无法列出指定园区中所有的公摊表计的关联表计详细信息。", zap.Error(err))
return make([]*model.PooledMeterDetailCompound, 0), 0, err
}
var assembled []*model.PooledMeterDetailCompound = make([]*model.PooledMeterDetailCompound, 0)
for _, meter := range poolingMeters {
slaveIDs := lo.Map(lo.Filter(
relations,
func(element *model.MeterRelation, _ int) bool {
return element.MasterMeter == meter.Code
}),
func(element *model.MeterRelation, _ int) string {
return element.SlaveMeter
},
)
slaves := lo.Map(lo.Filter(
slaveMeters,
func(element *model.MeterDetail, _ int) bool {
return lo.Contains(slaveIDs, element.Code)
}),
func(element *model.MeterDetail, _ int) model.MeterDetail {
return *element
},
)
assembled = append(assembled, &model.PooledMeterDetailCompound{
MeterDetail: *meter,
BindMeters: slaves,
})
}
cache.CachePagedSearch(assembled, total, []string{fmt.Sprintf("meter:%s", pid), fmt.Sprintf("meter_relation:%s", pid)}, "assemble_pooled_meter_detail", cacheConditions...)
return assembled, total, nil
}
// 批量向园区中指定公摊表计下绑定关联表计
func (ms _MeterService) BindMeter(pid, masterMeter string, slaveMeters []string) (bool, error) {
ms.log.Info("批量向园区中指定公摊表计下绑定关联表计", zap.String("park id", pid), zap.String("master meter", masterMeter), zap.Strings("slave meters", slaveMeters))
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return false, err
}
for _, slave := range slaveMeters {
ok, err := repository.MeterRepository.BindMeter(tx, ctx, pid, masterMeter, slave)
switch {
case err != nil:
ms.log.Error("无法向园区中指定公摊表计下绑定关联表计。", zap.String("master meter", masterMeter), zap.String("slave meter", slave), zap.Error(err))
tx.Rollback(ctx)
return false, err
case !ok:
ms.log.Error("未能向园区中指定公摊表计下绑定关联表计。", zap.String("master meter", masterMeter), zap.String("slave meter", slave))
tx.Rollback(ctx)
return false, fmt.Errorf("未能成功向园区中指定公摊表计下绑定关联表计。")
}
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return false, err
}
cache.AbolishRelation(fmt.Sprintf("meter:%s", pid))
return true, nil
}
// 批量解绑园区中指定表计下的指定表计
func (ms _MeterService) UnbindMeter(pid, masterMeter string, slaveMeters []string) (bool, error) {
ms.log.Info("批量解绑园区中指定表计下的指定表计", zap.String("park id", pid), zap.String("master meter", masterMeter), zap.Strings("slave meters", slaveMeters))
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return false, err
}
for _, slave := range slaveMeters {
ok, err := repository.MeterRepository.UnbindMeter(tx, ctx, pid, masterMeter, slave)
switch {
case err != nil:
ms.log.Error("无法解绑园区中指定表计下的指定表计。", zap.String("master meter", masterMeter), zap.String("slave meter", slave), zap.Error(err))
tx.Rollback(ctx)
return false, err
case !ok:
ms.log.Error("未能解绑园区中指定表计下的指定表计。", zap.String("master meter", masterMeter), zap.String("slave meter", slave))
tx.Rollback(ctx)
return false, fmt.Errorf("未能成功解绑园区中指定表计下的指定表计。")
}
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return false, err
}
cache.AbolishRelation(fmt.Sprintf("meter:%s", pid))
return true, nil
}
// 查询符合条件的表计读数记录
func (ms _MeterService) SearchMeterReadings(pid string, building *string, start, end *types.Date, page uint, keyword *string) ([]*model.DetailedMeterReading, int64, error) {
ms.log.Info(
"查询符合条件的表计读数记录",
zap.String("park id", pid),
zap.Stringp("building", building),
logger.DateFieldp("start", start),
logger.DateFieldp("end", end),
zap.Uint("page", page),
zap.Stringp("keyword", keyword),
)
readings, total, err := repository.MeterRepository.ListMeterReadings(pid, keyword, page, start, end, building)
if err != nil {
ms.log.Error("无法查询符合条件的表计读数记录。", zap.Error(err))
return make([]*model.DetailedMeterReading, 0), 0, err
}
meterCodes := lo.Map(readings, func(element *model.MeterReading, _ int) string {
return element.Meter
})
meterDetails, err := repository.MeterRepository.ListMetersByIDs(pid, meterCodes)
if err != nil {
ms.log.Error("无法查询符合条件的表计读数记录的表计详细信息。", zap.Error(err))
return make([]*model.DetailedMeterReading, 0), 0, err
}
assembles := lo.Map(
readings,
func(element *model.MeterReading, _ int) *model.DetailedMeterReading {
meter, _ := lo.Find(meterDetails, func(detail *model.MeterDetail) bool {
return detail.Code == element.Meter
})
return &model.DetailedMeterReading{
Detail: *meter,
Reading: *element,
}
},
)
return assembles, total, nil
}
// 创建一条新的表计抄表记录
func (ms _MeterService) RecordReading(pid, meterCode string, form *vo.MeterReadingForm) error {
ms.log.Info("创建一条新的表计抄表记录", zap.String("park id", pid), zap.String("meter code", meterCode))
meter, err := repository.MeterRepository.FetchMeterDetail(pid, meterCode)
if err != nil || meter == nil {
ms.log.Error("无法找到指定的表计", zap.Error(err))
return fmt.Errorf("无法找到指定的表计:%w", err)
}
ctx, cancel := global.TimeoutContext()
defer cancel()
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return err
}
ok, err := repository.MeterRepository.RecordReading(tx, ctx, pid, meterCode, meter.MeterType, meter.Ratio, form)
if err != nil {
ms.log.Error("无法创建一条新的表计抄表记录。", zap.Error(err))
tx.Rollback(ctx)
return err
}
if !ok {
ms.log.Error("未能创建一条新的表计抄表记录。")
tx.Rollback(ctx)
return fmt.Errorf("未能成功创建一条新的表计抄表记录。")
}
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("未能成功提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return err
}
return nil
}
// 处理上传的Excel格式的表计抄表记录所有满足审查条件的记录都将被保存到数据库中。
// 无论峰谷表计还是普通表计,只要抄表记录中不存在峰谷数据,都将自动使用平段配平。
func (ms _MeterService) BatchImportReadings(pid string, file *multipart.FileHeader) ([]excel.ExcelAnalysisError, error) {
ms.log.Info("处理上传的Excel格式的表计抄表记录", zap.String("park id", pid))
ctx, cancel := global.TimeoutContext()
defer cancel()
// 步骤1将解析到的数据转换成创建表单数据
activeFile, err := file.Open()
if err != nil {
ms.log.Error("无法打开上传的抄表数据文件。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法打开上传的抄表数据文件,%w", err)
}
analyzer, err := excel.NewMeterReadingsExcelAnalyzer(activeFile)
if err != nil {
ms.log.Error("无法根据上传的 Excel 文件创建表计抄表数据解析器。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法根据上传的 Excel 文件创建表计抄表数据解析器,%w", err)
}
records, errs := analyzer.Analysis(*new(model.ReadingImportRow))
if len(errs) > 0 {
ms.log.Error("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。", zap.Int("error count", len(errs)))
return errs, fmt.Errorf("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。")
}
ms.log.Debug("已经解析到的上传数据", zap.Any("records", records))
// 步骤2对目前已经解析到的数据进行合法性检测检测包括表计编号在同一抄表时间是否重复
var collectRecords = make(map[types.DateTime][]string, 0)
for _, record := range records {
if _, ok := collectRecords[record.ReadAt]; !ok {
collectRecords[record.ReadAt] = []string{}
}
collectRecords[record.ReadAt] = append(collectRecords[record.ReadAt], record.Code)
}
for readAt, codes := range collectRecords {
valCounts := lo.CountValues(codes)
for code, count := range valCounts {
if count > 1 {
errs = append(errs, excel.ExcelAnalysisError{
Row: 0,
Col: 0,
Err: excel.AnalysisError{
Err: fmt.Errorf("表计编号 %s 在同一抄表时间 %s 内重复出现 %d 次", code, readAt.ToString(), count),
},
})
}
}
}
if len(errs) > 0 {
ms.log.Error("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。", zap.Int("error count", len(errs)))
return errs, fmt.Errorf("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。")
}
// 步骤3从数据库中获取当前园区中已有的表计编号
meters, err := repository.MeterRepository.AllMeters(pid)
if err != nil {
ms.log.Error("无法从数据库中获取当前园区中已有的表计编号。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法从数据库中获取当前园区中已有的表计编号,%w", err)
}
// 步骤4.0:启动数据库事务
tx, err := global.DB.Begin(ctx)
if err != nil {
ms.log.Error("无法启动数据库事务。", zap.Error(err))
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法启动数据库事务,%w", err)
}
// 步骤4.1:对比检查数据库中的表计编号与上传文件中的表计编号是否存在差异。非差异内容将直接保存
for row, record := range records {
meter, exists := lo.Find(meters, func(element *model.MeterDetail) bool {
return element.Code == record.Code
})
if exists {
// 步骤4.1.1:抄表的表计在数据库中已经存在,可以直接保存起数据。
_, err := repository.MeterRepository.RecordReading(tx, ctx, pid, record.Code, meter.MeterType, meter.Ratio, &vo.MeterReadingForm{
ReadAt: lo.ToPtr(record.ReadAt),
Overall: record.Overall,
Critical: record.Critical.Decimal,
Peak: record.Peak.Decimal,
Flat: record.Overall.Sub(record.Peak.Decimal).Sub(record.Valley.Decimal).Sub(record.Critical.Decimal),
Valley: record.Valley.Decimal,
})
if err != nil {
ms.log.Error("无法在数据插入阶段保存抄表信息。", zap.String("meter code", record.Code), zap.Error(err))
errs = append(errs, excel.ExcelAnalysisError{
Row: row + 1,
Col: 0,
Err: excel.AnalysisError{
Err: fmt.Errorf("无法在数据插入阶段保存抄表信息,%w", err),
},
})
}
} else {
// 步骤4.1.2:抄表表计在数据库中不存在,需要将其记录进入错误。
errs = append(errs, excel.ExcelAnalysisError{
Row: row + 1,
Col: 0,
Err: excel.AnalysisError{
Err: fmt.Errorf("表计编号 %s 在系统中不存在", record.Code),
},
})
}
}
// 步骤4.3:如果批处理过程中存在错误,撤销全部导入动作。
if len(errs) > 0 {
ms.log.Error("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。", zap.Int("error count", len(errs)))
tx.Rollback(ctx)
return errs, fmt.Errorf("表计抄表数据解析器在解析上传的 Excel 文件时发生错误。")
}
// 步骤5执行事务更新数据库获取完成更改的行数。
err = tx.Commit(ctx)
if err != nil {
ms.log.Error("无法在数据插入阶段提交数据库事务。", zap.Error(err))
tx.Rollback(ctx)
return make([]excel.ExcelAnalysisError, 0), fmt.Errorf("无法在数据插入阶段提交数据库事务,%w", err)
}
return make([]excel.ExcelAnalysisError, 0), nil
}