forked from free-lancers/electricity_bill_calc_service
		
	enhance(report):
This commit is contained in:
		
							
								
								
									
										4
									
								
								.idea/modules.xml
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										4
									
								
								.idea/modules.xml
									
									
									
										generated
									
									
									
								
							| @@ -2,11 +2,7 @@ | ||||
| <project version="4"> | ||||
|   <component name="ProjectModuleManager"> | ||||
|     <modules> | ||||
| <<<<<<< HEAD | ||||
|       <module fileurl="file://$PROJECT_DIR$/.idea/electricity_bill_calc_service.iml" filepath="$PROJECT_DIR$/.idea/electricity_bill_calc_service.iml" /> | ||||
| ======= | ||||
|       <module fileurl="file://$PROJECT_DIR$/.idea/0.2.iml" filepath="$PROJECT_DIR$/.idea/0.2.iml" /> | ||||
| >>>>>>> 8163fd99ee4261a308318b55be08e30d123776e3 | ||||
|     </modules> | ||||
|   </component> | ||||
| </project> | ||||
							
								
								
									
										4
									
								
								.idea/vcs.xml
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										4
									
								
								.idea/vcs.xml
									
									
									
										generated
									
									
									
								
							| @@ -1,10 +1,6 @@ | ||||
| <?xml version="1.0" encoding="UTF-8"?> | ||||
| <project version="4"> | ||||
|   <component name="VcsDirectoryMappings"> | ||||
| <<<<<<< HEAD | ||||
|     <mapping directory="$PROJECT_DIR$" vcs="Git" /> | ||||
| ======= | ||||
|     <mapping directory="" vcs="Git" /> | ||||
| >>>>>>> 8163fd99ee4261a308318b55be08e30d123776e3 | ||||
|   </component> | ||||
| </project> | ||||
| @@ -144,6 +144,7 @@ func initiateCalculateTask(c *fiber.Ctx) error { | ||||
| 		reportLog.Error("无法启动核算报表计算任务", zap.Error(err)) | ||||
| 		return result.Error(fiber.StatusInternalServerError, "无法启动核算报表计算任务。") | ||||
| 	} | ||||
| 	// | ||||
| 	return result.Success("已经成功启动核算报表计算任务。") | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -18,9 +18,10 @@ func SetupRedisConnection() error { | ||||
| 	a := fmt.Sprintf("%s:%d", config.RedisSettings.Host, config.RedisSettings.Port) | ||||
| 	fmt.Println(a) | ||||
| 	Rd, err = rueidis.NewClient(rueidis.ClientOption{ | ||||
| 		InitAddress: []string{fmt.Sprintf("%s:%d", config.RedisSettings.Host, config.RedisSettings.Port)}, | ||||
| 		Password:    config.RedisSettings.Password, | ||||
| 		InitAddress: []string{"127.0.0.1:6379"}, | ||||
| 		Password:    "", | ||||
| 		SelectDB:    config.RedisSettings.DB, | ||||
| 		DisableCache:true, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|   | ||||
| @@ -32,6 +32,35 @@ var CalculateRepository = _CalculateRepository{ | ||||
| 	ds:  goqu.Dialect("postgres"), | ||||
| } | ||||
|  | ||||
| //更新当前报表的核算状态 | ||||
| func (cr _CalculateRepository) UpdateReportCalculateStatus(rid string, status string, | ||||
| 	message string) (bool, error) { | ||||
| 	ctx, cancel := global.TimeoutContext() | ||||
| 	defer cancel() | ||||
|  | ||||
| 	currentTime := time.Now() | ||||
|  | ||||
| 	updateResultSql, updateResultArgs, _ := cr.ds. | ||||
| 		Update(goqu.T("report_task")). | ||||
| 		Set(goqu.Record{ | ||||
| 			"status":           status, | ||||
| 			"last_modified_at": currentTime, | ||||
| 			"message":          message, | ||||
| 		}).Where(goqu.I("id").Eq(rid)). | ||||
| 		ToSQL() | ||||
|  | ||||
| 	res, err := global.DB.Exec(ctx, updateResultSql, updateResultArgs...) | ||||
| 	if err != nil { | ||||
| 		cr.log.Error("未能更新当前报表的核算状态", zap.Error(err)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	if res.RowsAffected() == 0 { | ||||
| 		cr.log.Warn("未能保存当前报表的核算状态", zap.String("Report", rid)) | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| // 获取当前正在等待计算的核算任务ID列表 | ||||
| func (cr _CalculateRepository) ListPendingTasks() ([]string, error) { | ||||
| 	cr.log.Info("获取当前正在等待计算的核算任务ID列表") | ||||
|   | ||||
| @@ -100,7 +100,7 @@ func (rr _ReportRepository) GetReportIndex(rid string) (*model.ReportIndex, erro | ||||
| } | ||||
|  | ||||
| // 为指园区创建一个新的核算报表 | ||||
| func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, error) { | ||||
| func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, string, error) { | ||||
| 	rr.log.Info("为指定园区创建一个新的核算报表", zap.String("Park", form.Park)) | ||||
| 	ctx, cancel := global.TimeoutContext() | ||||
| 	defer cancel() | ||||
| @@ -108,13 +108,13 @@ func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, err | ||||
| 	tx, err := global.DB.Begin(ctx) | ||||
| 	if err != nil { | ||||
| 		rr.log.Error("未能开始一个数据库事务", zap.Error(err)) | ||||
| 		return false, err | ||||
| 		return false, "", err | ||||
| 	} | ||||
| 	park, err := ParkRepository.RetrieveParkDetail(form.Park) | ||||
| 	if err != nil || park == nil { | ||||
| 		rr.log.Error("未能获取指定园区的详细信息", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, exceptions.NewNotFoundErrorFromError("未能获取指定园区的详细信息", err) | ||||
| 		return false, "", exceptions.NewNotFoundErrorFromError("未能获取指定园区的详细信息", err) | ||||
| 	} | ||||
| 	createTime := types.Now() | ||||
| 	periodRange := types.NewDateRange(&form.PeriodBegin, &form.PeriodEnd) | ||||
| @@ -176,42 +176,42 @@ func (rr _ReportRepository) CreateReport(form *vo.ReportCreationForm) (bool, err | ||||
| 	if err != nil { | ||||
| 		rr.log.Error("创建核算报表索引时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, err | ||||
| 		return false, "", err | ||||
| 	} | ||||
| 	if resIndex.RowsAffected() == 0 { | ||||
| 		rr.log.Error("保存核算报表索引时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, exceptions.NewUnsuccessCreateError("创建核算报表索引时出现错误") | ||||
| 		return false, "", exceptions.NewUnsuccessCreateError("创建核算报表索引时出现错误") | ||||
| 	} | ||||
| 	resSummary, err := tx.Exec(ctx, summarySql, summaryArgs...) | ||||
| 	if err != nil { | ||||
| 		rr.log.Error("创建核算报表汇总时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, err | ||||
| 		return false, "", err | ||||
| 	} | ||||
| 	if resSummary.RowsAffected() == 0 { | ||||
| 		rr.log.Error("保存核算报表汇总时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, exceptions.NewUnsuccessCreateError("创建核算报表汇总时出现错误") | ||||
| 		return false, "", exceptions.NewUnsuccessCreateError("创建核算报表汇总时出现错误") | ||||
| 	} | ||||
| 	resTask, err := tx.Exec(ctx, taskSql, taskArgs...) | ||||
| 	if err != nil { | ||||
| 		rr.log.Error("创建核算报表任务时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, err | ||||
| 		return false, "", err | ||||
| 	} | ||||
| 	if resTask.RowsAffected() == 0 { | ||||
| 		rr.log.Error("保存核算报表任务时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, exceptions.NewUnsuccessCreateError("创建核算报表任务时出现错误") | ||||
| 		return false, "", exceptions.NewUnsuccessCreateError("创建核算报表任务时出现错误") | ||||
| 	} | ||||
| 	err = tx.Commit(ctx) | ||||
| 	if err != nil { | ||||
| 		rr.log.Error("提交核算报表创建事务时出现错误", zap.Error(err)) | ||||
| 		tx.Rollback(ctx) | ||||
| 		return false, err | ||||
| 		return false, "", err | ||||
| 	} | ||||
| 	return resIndex.RowsAffected() > 0 && resSummary.RowsAffected() > 0 && resTask.RowsAffected() > 0, nil | ||||
| 	return resIndex.RowsAffected() > 0 && resSummary.RowsAffected() > 0 && resTask.RowsAffected() > 0, reportId, nil | ||||
| } | ||||
|  | ||||
| // 更新报表的基本信息 | ||||
|   | ||||
| @@ -7,17 +7,17 @@ import ( | ||||
| 	"fmt" | ||||
| ) | ||||
|  | ||||
| func MainCalculateProcess(rid string) { | ||||
| func MainCalculateProcess(rid string) error { | ||||
| 	report, err := repository.ReportRepository.GetReportIndex(rid) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("1", err.Error()+"指定报表不存在") | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	reportSummary, err := repository.ReportRepository.RetrieveReportSummary(rid) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("2", err.Error()+"指定报表的基本电量电费数据不存在") | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	summary := calculate.FromReportSummary(reportSummary, report) | ||||
| @@ -28,39 +28,39 @@ func MainCalculateProcess(rid string) { | ||||
| 	meterDetails, err := repository.MeterRepository.AllUsedMetersInReport(report.Id) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("3", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	meterRelations, err := repository.CalculateRepository.GetAllPoolingMeterRelations(report.Park, periodStart.Time) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("4", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	_, err = CheckMeterArea(report, meterDetails) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("5", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 寻找每一个商户的所有表计读数,然后对分配到各个商户的表计读数进行初步的计算. | ||||
| 	tenementReports, err := TenementMetersCalculate(report, periodStart.Time, periodEnd.Time, meterDetails, summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("6", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 取得所有公摊表计的读数,以及公摊表计对应的分摊表计 | ||||
| 	poolingMetersReports, err := PooledMetersCalculate(report, periodStart.Time, periodEnd.Time, meterDetails, summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("7", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 获取所有的物业表计,然后对所有的物业表计电量进行计算。 | ||||
| 	parkMetersReports, err := MetersParkCalculate(*report, periodStart.Time, periodEnd.Time, meterDetails, summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("8", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 计算所有表计的总电量 | ||||
| @@ -70,19 +70,19 @@ func MainCalculateProcess(rid string) { | ||||
| 	err = LossCalculate(report, &parkMetersReports, &parkTotal, &summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("9", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 计算所有已经启用的商铺面积总和,仅计算所有未迁出的商户的所有表计对应的商铺面积。 | ||||
| 	_, err = EnabledAreaCalculate(&tenementReports, &summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("10", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	err = CalculatePrices(&summary) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("11", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	//=========================================================================== | ||||
| @@ -92,7 +92,7 @@ func MainCalculateProcess(rid string) { | ||||
| 	meters, err := CollectMeters(tenementReports, poolingMetersReports, parkMetersReports) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("12", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 计算商户的合计电费信息,并归总与商户相关联的表计记录 | ||||
| @@ -102,28 +102,28 @@ func MainCalculateProcess(rid string) { | ||||
| 	err = CalculateBasicPooling(report, &summary, &meters) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("13", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	err = CalculateAdjustPooling(*report, summary, meters) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("14", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	err = CalculateLossPooling(*report, summary, meters) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("15", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	// 计算所有商户类型表计的全周期电量,并根据全周期电量计算共用过同一表计的商户的二次分摊比例。 | ||||
| 	_, err = CalculateTenementConsumptions(meters) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("16", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	err = CalculateTenementPoolings(*report, summary, meters, meterRelations) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("17", err) | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 	// 计算商户的合计电费信息,并归总与商户相关联的表计记录 | ||||
| 	tenementCharges = TenementChargeCalculate(tenementReports, summary, meters) | ||||
| @@ -136,33 +136,33 @@ func MainCalculateProcess(rid string) { | ||||
| 	if err != nil { | ||||
| 		tx.Rollback(ctx) | ||||
| 		fmt.Println("18", err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	err = SaveSummary(tx, summary) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback(ctx) | ||||
| 		fmt.Println("19", err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| 	err = SavePublics(tx, *report, meters) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback(ctx) | ||||
| 		fmt.Println("20", err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| 	err = SavePoolings(tx, *report, meters, meterRelations) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback(ctx) | ||||
| 		fmt.Println("21", err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| 	err = SaveTenements(tx, *report, tenementReports, tenementCharges) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback(ctx) | ||||
| 		fmt.Println("22", err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| 	tx.Commit(ctx) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -5,8 +5,11 @@ import ( | ||||
| 	"electricity_bill_calc/logger" | ||||
| 	"electricity_bill_calc/model" | ||||
| 	"electricity_bill_calc/repository" | ||||
| 	"electricity_bill_calc/service/calculate" | ||||
| 	"electricity_bill_calc/types" | ||||
| 	"electricity_bill_calc/vo" | ||||
| 	"github.com/pkg/errors" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/doug-martin/goqu/v9" | ||||
| 	_ "github.com/doug-martin/goqu/v9/dialect/postgres" | ||||
| @@ -196,3 +199,87 @@ func (rs _ReportService) ListWithdrawalRequests(page uint, keyword *string) ([]* | ||||
| 	) | ||||
| 	return assembled, total, nil | ||||
| } | ||||
|  | ||||
| // 定时检测执行电费核算任务的分发 | ||||
| func (rs _ReportService) ReportCalcuateDispatch(rid string) error { | ||||
| 	err := rs.CalculateReport(rid) | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "报表计算调度时发生错误") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| //创建一个新的核算报表,并同时完成核算报表的计算 | ||||
| func (rs _ReportService) CreateNewReport(createFrom *vo.ReportCreationForm) (bool, error) { | ||||
| 	state, report, err := repository.ReportRepository.CreateReport(createFrom) | ||||
| 	if err != nil { | ||||
| 		rs.log.Error("创建核算报表错误", zap.Error(err)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	if !state { | ||||
| 		status, err := repository.CalculateRepository.UpdateReportCalculateStatus(report, "InsufficientData", "创建报表时发生错误,需手动再次计算") | ||||
| 		if err != nil { | ||||
| 			rs.log.Error("创建报表时发生错误,需手动再次计算", zap.Error(err)) | ||||
| 			return false, err | ||||
| 		} | ||||
| 		return status, nil | ||||
| 	} | ||||
| 	err = rs.CalculateReport(report) | ||||
| 	if err != nil { | ||||
| 		rs.log.Error("计算时出错", zap.Error(err)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| //更新一个核算报表中的数据,并同时完成计算 | ||||
| func (rs _ReportService) UpdateRepoet(rid string, updateForm *vo.ReportModifyForm) (bool, error) { | ||||
| 	state, err := repository.ReportRepository.UpdateReportSummary(rid, updateForm) | ||||
| 	if err != nil { | ||||
| 		rs.log.Error("更新摘要出错", zap.Error(err)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	if !state { | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	err = rs.CalculateReport(rid) | ||||
| 	if err != nil { | ||||
| 		rs.log.Error("计算时出错", zap.Error(err)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| var CALCULATE_TASK_PARALLEL_CONTROL = func() *sync.Mutex { | ||||
| 	s := sync.Mutex{} | ||||
| 	s.Lock() | ||||
| 	return &s | ||||
| }() | ||||
|  | ||||
| // 执行一个核算报表的计算任务 | ||||
| func (rs _ReportService) CalculateReport(rid string) error { | ||||
| 	semaphore := CALCULATE_TASK_PARALLEL_CONTROL | ||||
|  | ||||
| 	semaphore.Lock() | ||||
| 	defer semaphore.Unlock() | ||||
|  | ||||
| 	errs := calculate.MainCalculateProcess(rid) | ||||
|  | ||||
| 	if errs == nil { | ||||
| 		_, err := repository.CalculateRepository.UpdateReportCalculateStatus(rid, "success", "") | ||||
| 		if err != nil { | ||||
| 			rs.log.Error("执行核算报表计算任务失败", zap.Error(err)) | ||||
| 			return err | ||||
| 		} | ||||
| 	} else { | ||||
| 		_, err := repository.CalculateRepository.UpdateReportCalculateStatus(rid, "InsufficientData", errs.Error()) | ||||
| 		if err != nil { | ||||
| 			rs.log.Error("执行核算报表计算任务失败", zap.Error(err)) | ||||
| 			return err | ||||
| 		} | ||||
| 		rs.log.Error("核算报表"+rid+"时发生错误:"+errs.Error()+"", zap.Error(errs)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user