diff --git a/.idea/modules.xml b/.idea/modules.xml index 4ea8900..95c0f54 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,11 +2,7 @@ -<<<<<<< HEAD -======= - ->>>>>>> 8163fd99ee4261a308318b55be08e30d123776e3 \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 828619c..94a25f7 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,10 +1,6 @@ -<<<<<<< HEAD -======= - ->>>>>>> 8163fd99ee4261a308318b55be08e30d123776e3 \ No newline at end of file diff --git a/controller/report.go b/controller/report.go index 6d70c44..c7f8c3e 100644 --- a/controller/report.go +++ b/controller/report.go @@ -144,6 +144,7 @@ func initiateCalculateTask(c *fiber.Ctx) error { reportLog.Error("无法启动核算报表计算任务", zap.Error(err)) return result.Error(fiber.StatusInternalServerError, "无法启动核算报表计算任务。") } + // return result.Success("已经成功启动核算报表计算任务。") } diff --git a/global/redis.go b/global/redis.go index d28d8a9..8afb039 100644 --- a/global/redis.go +++ b/global/redis.go @@ -18,9 +18,10 @@ func SetupRedisConnection() error { a := fmt.Sprintf("%s:%d", config.RedisSettings.Host, config.RedisSettings.Port) fmt.Println(a) Rd, err = rueidis.NewClient(rueidis.ClientOption{ - InitAddress: []string{fmt.Sprintf("%s:%d", config.RedisSettings.Host, config.RedisSettings.Port)}, - Password: config.RedisSettings.Password, + InitAddress: []string{"127.0.0.1:6379"}, + Password: "", SelectDB: config.RedisSettings.DB, + DisableCache:true, }) if err != nil { return err diff --git a/repository/calculate.go b/repository/calculate.go index eed635b..070257a 100644 --- a/repository/calculate.go +++ b/repository/calculate.go @@ -32,6 +32,35 @@ var CalculateRepository = _CalculateRepository{ ds: goqu.Dialect("postgres"), } +//更新当前报表的核算状态 +func (cr _CalculateRepository) UpdateReportCalculateStatus(rid string, status string, + message string) (bool, error) { + ctx, cancel := global.TimeoutContext() + defer cancel() + + currentTime := time.Now() + + updateResultSql, updateResultArgs, _ := cr.ds. + Update(goqu.T("report_task")). + Set(goqu.Record{ + "status": status, + "last_modified_at": currentTime, + "message": message, + }).Where(goqu.I("id").Eq(rid)). + ToSQL() + + res, err := global.DB.Exec(ctx, updateResultSql, updateResultArgs...) + if err != nil { + cr.log.Error("未能更新当前报表的核算状态", zap.Error(err)) + return false, err + } + if res.RowsAffected() == 0 { + cr.log.Warn("未能保存当前报表的核算状态", zap.String("Report", rid)) + return false, nil + } + return true, nil +} + // 获取当前正在等待计算的核算任务ID列表 func (cr _CalculateRepository) ListPendingTasks() ([]string, error) { cr.log.Info("获取当前正在等待计算的核算任务ID列表") diff --git a/repository/report.go b/repository/report.go index b8a4478..74ba8fc 100644 --- a/repository/report.go +++ b/repository/report.go @@ -100,7 +100,7 @@ func (rr _ReportRepository) GetReportIndex(rid string) (*model.ReportIndex, erro } // 为指园区创建一个新的核算报表 -func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, error) { +func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, string, error) { rr.log.Info("为指定园区创建一个新的核算报表", zap.String("Park", form.Park)) ctx, cancel := global.TimeoutContext() defer cancel() @@ -108,13 +108,13 @@ func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, err tx, err := global.DB.Begin(ctx) if err != nil { rr.log.Error("未能开始一个数据库事务", zap.Error(err)) - return false, err + return false, "", err } park, err := ParkRepository.RetrieveParkDetail(form.Park) if err != nil || park == nil { rr.log.Error("未能获取指定园区的详细信息", zap.Error(err)) tx.Rollback(ctx) - return false, exceptions.NewNotFoundErrorFromError("未能获取指定园区的详细信息", err) + return false, "", exceptions.NewNotFoundErrorFromError("未能获取指定园区的详细信息", err) } createTime := types.Now() periodRange := types.NewDateRange(&form.PeriodBegin, &form.PeriodEnd) @@ -176,42 +176,42 @@ func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, err if err != nil { rr.log.Error("创建核算报表索引时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, err + return false, "", err } if resIndex.RowsAffected() == 0 { rr.log.Error("保存核算报表索引时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, exceptions.NewUnsuccessCreateError("创建核算报表索引时出现错误") + return false, "", exceptions.NewUnsuccessCreateError("创建核算报表索引时出现错误") } resSummary, err := tx.Exec(ctx, summarySql, summaryArgs...) if err != nil { rr.log.Error("创建核算报表汇总时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, err + return false, "", err } if resSummary.RowsAffected() == 0 { rr.log.Error("保存核算报表汇总时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, exceptions.NewUnsuccessCreateError("创建核算报表汇总时出现错误") + return false, "", exceptions.NewUnsuccessCreateError("创建核算报表汇总时出现错误") } resTask, err := tx.Exec(ctx, taskSql, taskArgs...) if err != nil { rr.log.Error("创建核算报表任务时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, err + return false, "", err } if resTask.RowsAffected() == 0 { rr.log.Error("保存核算报表任务时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, exceptions.NewUnsuccessCreateError("创建核算报表任务时出现错误") + return false, "", exceptions.NewUnsuccessCreateError("创建核算报表任务时出现错误") } err = tx.Commit(ctx) if err != nil { rr.log.Error("提交核算报表创建事务时出现错误", zap.Error(err)) tx.Rollback(ctx) - return false, err + return false, "", err } - return resIndex.RowsAffected() > 0 && resSummary.RowsAffected() > 0 && resTask.RowsAffected() > 0, nil + return resIndex.RowsAffected() > 0 && resSummary.RowsAffected() > 0 && resTask.RowsAffected() > 0, reportId, nil } // 更新报表的基本信息 diff --git a/service/calculate/wattCost.go b/service/calculate/wattCost.go index f9f1d57..f117b4e 100644 --- a/service/calculate/wattCost.go +++ b/service/calculate/wattCost.go @@ -7,17 +7,17 @@ import ( "fmt" ) -func MainCalculateProcess(rid string) { +func MainCalculateProcess(rid string) error { report, err := repository.ReportRepository.GetReportIndex(rid) if err != nil { fmt.Println("1", err.Error()+"指定报表不存在") - return + return err } reportSummary, err := repository.ReportRepository.RetrieveReportSummary(rid) if err != nil { fmt.Println("2", err.Error()+"指定报表的基本电量电费数据不存在") - return + return err } summary := calculate.FromReportSummary(reportSummary, report) @@ -28,39 +28,39 @@ func MainCalculateProcess(rid string) { meterDetails, err := repository.MeterRepository.AllUsedMetersInReport(report.Id) if err != nil { fmt.Println("3", err) - return + return err } meterRelations, err := repository.CalculateRepository.GetAllPoolingMeterRelations(report.Park, periodStart.Time) if err != nil { fmt.Println("4", err) - return + return err } _, err = CheckMeterArea(report, meterDetails) if err != nil { fmt.Println("5", err) - return + return err } // 寻找每一个商户的所有表计读数,然后对分配到各个商户的表计读数进行初步的计算. tenementReports, err := TenementMetersCalculate(report, periodStart.Time, periodEnd.Time, meterDetails, summary) if err != nil { fmt.Println("6", err) - return + return err } // 取得所有公摊表计的读数,以及公摊表计对应的分摊表计 poolingMetersReports, err := PooledMetersCalculate(report, periodStart.Time, periodEnd.Time, meterDetails, summary) if err != nil { fmt.Println("7", err) - return + return err } // 获取所有的物业表计,然后对所有的物业表计电量进行计算。 parkMetersReports, err := MetersParkCalculate(*report, periodStart.Time, periodEnd.Time, meterDetails, summary) if err != nil { fmt.Println("8", err) - return + return err } // 计算所有表计的总电量 @@ -70,19 +70,19 @@ func MainCalculateProcess(rid string) { err = LossCalculate(report, &parkMetersReports, &parkTotal, &summary) if err != nil { fmt.Println("9", err) - return + return err } // 计算所有已经启用的商铺面积总和,仅计算所有未迁出的商户的所有表计对应的商铺面积。 _, err = EnabledAreaCalculate(&tenementReports, &summary) if err != nil { fmt.Println("10", err) - return + return err } err = CalculatePrices(&summary) if err != nil { fmt.Println("11", err) - return + return err } //=========================================================================== @@ -92,7 +92,7 @@ func MainCalculateProcess(rid string) { meters, err := CollectMeters(tenementReports, poolingMetersReports, parkMetersReports) if err != nil { fmt.Println("12", err) - return + return err } // 计算商户的合计电费信息,并归总与商户相关联的表计记录 @@ -102,28 +102,28 @@ func MainCalculateProcess(rid string) { err = CalculateBasicPooling(report, &summary, &meters) if err != nil { fmt.Println("13", err) - return + return err } err = CalculateAdjustPooling(*report, summary, meters) if err != nil { fmt.Println("14", err) - return + return err } err = CalculateLossPooling(*report, summary, meters) if err != nil { fmt.Println("15", err) - return + return err } // 计算所有商户类型表计的全周期电量,并根据全周期电量计算共用过同一表计的商户的二次分摊比例。 _, err = CalculateTenementConsumptions(meters) if err != nil { fmt.Println("16", err) - return + return err } err = CalculateTenementPoolings(*report, summary, meters, meterRelations) if err != nil { fmt.Println("17", err) - return + return err } // 计算商户的合计电费信息,并归总与商户相关联的表计记录 tenementCharges = TenementChargeCalculate(tenementReports, summary, meters) @@ -136,33 +136,33 @@ func MainCalculateProcess(rid string) { if err != nil { tx.Rollback(ctx) fmt.Println("18", err) - + return err } err = SaveSummary(tx, summary) if err != nil { tx.Rollback(ctx) fmt.Println("19", err) - + return err } err = SavePublics(tx, *report, meters) if err != nil { tx.Rollback(ctx) fmt.Println("20", err) - + return err } err = SavePoolings(tx, *report, meters, meterRelations) if err != nil { tx.Rollback(ctx) fmt.Println("21", err) - + return err } err = SaveTenements(tx, *report, tenementReports, tenementCharges) if err != nil { tx.Rollback(ctx) fmt.Println("22", err) - + return err } tx.Commit(ctx) - + return nil } diff --git a/service/report.go b/service/report.go index 2380e83..e881815 100644 --- a/service/report.go +++ b/service/report.go @@ -5,8 +5,11 @@ import ( "electricity_bill_calc/logger" "electricity_bill_calc/model" "electricity_bill_calc/repository" + "electricity_bill_calc/service/calculate" "electricity_bill_calc/types" "electricity_bill_calc/vo" + "github.com/pkg/errors" + "sync" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/postgres" @@ -196,3 +199,87 @@ func (rs _ReportService) ListWithdrawalRequests(page uint, keyword *string) ([]* ) return assembled, total, nil } + +// 定时检测执行电费核算任务的分发 +func (rs _ReportService) ReportCalcuateDispatch(rid string) error { + err := rs.CalculateReport(rid) + if err != nil { + return errors.Wrap(err, "报表计算调度时发生错误") + } + return nil +} + +//创建一个新的核算报表,并同时完成核算报表的计算 +func (rs _ReportService) CreateNewReport(createFrom *vo.ReportCreationForm) (bool, error) { + state, report, err := repository.ReportRepository.CreateReport(createFrom) + if err != nil { + rs.log.Error("创建核算报表错误", zap.Error(err)) + return false, err + } + if !state { + status, err := repository.CalculateRepository.UpdateReportCalculateStatus(report, "InsufficientData", "创建报表时发生错误,需手动再次计算") + if err != nil { + rs.log.Error("创建报表时发生错误,需手动再次计算", zap.Error(err)) + return false, err + } + return status, nil + } + err = rs.CalculateReport(report) + if err != nil { + rs.log.Error("计算时出错", zap.Error(err)) + return false, err + } + return true, nil +} + +//更新一个核算报表中的数据,并同时完成计算 +func (rs _ReportService) UpdateRepoet(rid string, updateForm *vo.ReportModifyForm) (bool, error) { + state, err := repository.ReportRepository.UpdateReportSummary(rid, updateForm) + if err != nil { + rs.log.Error("更新摘要出错", zap.Error(err)) + return false, err + } + if !state { + return false, nil + } + err = rs.CalculateReport(rid) + if err != nil { + rs.log.Error("计算时出错", zap.Error(err)) + return false, err + } + return true, nil +} + +var CALCULATE_TASK_PARALLEL_CONTROL = func() *sync.Mutex { + s := sync.Mutex{} + s.Lock() + return &s +}() + +// 执行一个核算报表的计算任务 +func (rs _ReportService) CalculateReport(rid string) error { + semaphore := CALCULATE_TASK_PARALLEL_CONTROL + + semaphore.Lock() + defer semaphore.Unlock() + + errs := calculate.MainCalculateProcess(rid) + + if errs == nil { + _, err := repository.CalculateRepository.UpdateReportCalculateStatus(rid, "success", "") + if err != nil { + rs.log.Error("执行核算报表计算任务失败", zap.Error(err)) + return err + } + } else { + _, err := repository.CalculateRepository.UpdateReportCalculateStatus(rid, "InsufficientData", errs.Error()) + if err != nil { + rs.log.Error("执行核算报表计算任务失败", zap.Error(err)) + return err + } + rs.log.Error("核算报表"+rid+"时发生错误:"+errs.Error()+"", zap.Error(errs)) + return err + } + + return nil +}