electricity_bill_calc_service/repository/synchronize.go
2023-08-04 17:11:10 +08:00

209 lines
8.3 KiB
Go

package repository
import (
"context"
"electricity_bill_calc/config"
"electricity_bill_calc/global"
"electricity_bill_calc/logger"
"electricity_bill_calc/model"
"electricity_bill_calc/tools"
"electricity_bill_calc/vo"
"fmt"
"github.com/doug-martin/goqu/v9"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
"strconv"
)
type _SynchronizeRepository struct {
log *zap.Logger
ds goqu.DialectWrapper
}
var SynchronizeRepository = _SynchronizeRepository{
log: logger.Named("Repository", "Synchronize"),
ds: goqu.Dialect("postgres"),
}
func (sr _SynchronizeRepository) SearchSynchronizeSchedules(userId *string, parkId *string, page uint, keyword *string) ([]*model.SynchronizeSchedule, int64, error) {
sr.log.Info("检索符合指定条件的同步记录", zap.String("user id", tools.DefaultTo(userId, "")),
zap.String("park id", tools.DefaultTo(parkId, "")), zap.Uint("page", page),
zap.String("keyword", tools.DefaultTo(keyword, "")))
ctx, cancelFunc := global.TimeoutContext()
defer cancelFunc()
//scheduleQuery := "select ss.*, ud.name as user_name, p.name as park_name from synchronize_schedule as ss
//join park as p on p.id=ss.park_id join user_detail as ud on ud.id=ss.user_id where 1=1"
schedulequery := sr.ds.From(goqu.T("synchronize_schedule").As("ss")).
Join(goqu.T("park").As("p"), goqu.On(goqu.I("p.id").Eq(goqu.I("ss.park_id")))).
Join(goqu.T("user_detail").As("ud"), goqu.On(goqu.I("ud.id").Eq(goqu.I("ss.user_id")))).
Select("ss.*", goqu.I("ud.name").As("user_name"), goqu.I("p.name").As("park_name"))
//countQuery := "select count(ss.*) from synchronize_schedule as ss
//join park as p on p.id=ss.park_id join user_detail as ud on ud.id=ss.user_id where 1=1"
countquery := sr.ds.From(goqu.T("synchronize_schedule").As("ss")).
Join(goqu.T("park").As("p"), goqu.On(goqu.I("p.id").Eq(goqu.I("ss.park_id")))).
Join(goqu.T("user_detail").As("ud"), goqu.On(goqu.I("ud.id").Eq(goqu.I("ss.user_id")))).
Select(goqu.COUNT(goqu.I("ss.*")))
if userId != nil && len(*userId) > 0 {
schedulequery = schedulequery.Where(goqu.I("ss.user_id").Eq(*userId))
countquery = countquery.Where(goqu.I("ss.user_id").Eq(*userId))
}
if parkId != nil && len(*parkId) > 0 {
schedulequery = schedulequery.Where(goqu.I("ss.park_id").Eq(*parkId))
countquery = countquery.Where(goqu.I("ss.park_id").Eq(*parkId))
}
if keyword != nil && len(*keyword) > 0 {
pattern := fmt.Sprintf("%%%s%%", *keyword)
schedulequery = schedulequery.Where(goqu.Or(
goqu.I("p.name").ILike(pattern),
goqu.I("p.abbr").ILike(pattern),
goqu.I("p.address").ILike(pattern),
goqu.I("p.contact").ILike(pattern),
goqu.I("p.phone").ILike(pattern),
goqu.I("ud.name").ILike(pattern),
goqu.I("ud.abbr").ILike(pattern),
goqu.I("ud.contact").ILike(pattern),
goqu.I("ud.phone").ILike(pattern),
goqu.I("ss.task_name").ILike(pattern),
goqu.I("ss.task_description").ILike(pattern),
))
countquery = countquery.Where(goqu.Or(
goqu.I("p.name").ILike(pattern),
goqu.I("p.abbr").ILike(pattern),
goqu.I("p.address").ILike(pattern),
goqu.I("p.contact").ILike(pattern),
goqu.I("ud.name").ILike(pattern),
goqu.I("ud.abbr").ILike(pattern),
goqu.I("ud.contact").ILike(pattern),
goqu.I("ud.phone").ILike(pattern),
goqu.I("ss.task_name").ILike(pattern),
goqu.I("ss.task_description").ILike(pattern),
))
}
startRow := (page - 1) * config.ServiceSettings.ItemsPageSize
schedulequery = schedulequery.
Order(goqu.I("ss.created_at").Desc()).
Offset(startRow).Limit(config.ServiceSettings.ItemsPageSize)
var (
schedule []*model.SynchronizeSchedule = make([]*model.SynchronizeSchedule, 0)
count int64
)
querySql, queryArgs, _ := schedulequery.Prepared(true).ToSQL()
countSql, countArgs, _ := countquery.Prepared(true).ToSQL()
if err := pgxscan.Select(ctx, global.DB, &schedule, querySql, queryArgs...); err != nil {
sr.log.Error("获取同步任务时出现错误", zap.Error(err))
return schedule, 0, err
}
if err := pgxscan.Get(ctx, global.DB, &count, countSql, countArgs...); err != nil {
sr.log.Error("检索同步任务总数量时出现错误", zap.Error(err))
return schedule, 0, err
}
return schedule, count, nil
}
// From("synchronize_schedule").
//
// Select(
// goqu.I("synchronize_schedule.*"),
// goqu.I("user_detail.name").As("user_name"),
// goqu.I("park.name").As("park_name"),
// ).
// Join(
// goqu.T("park").On(goqu.I("park.id").Eq(goqu.I("synchronize_schedule.park_id"))),
// goqu.T("user_detail").On(goqu.I("user_detail.id").Eq(goqu.I("synchronize_schedule.user_id"))),
// ).
// Where(goqu.C("1").Eq(1))
//
// SELECT count(ss.*)
// FROM synchronize_schedule AS ss
// JOIN park AS p ON p.id = ss.park_id
// JOIN user_detail AS ud ON ud.id = ss.user_id
// WHERE true`
//
// var args []interface{}
//
// if uid != nil {
// scheduleQuery += " AND ss.user_id = $1"
// countQuery += " AND ss.user_id = $1"
// args = append(args, *uid)
// }
//
// if pid != nil {
// scheduleQuery += " AND ss.park_id = $2"
// countQuery += " AND ss.park_id = $2"
// args = append(args, *pid)
// }
//
// if keyword != nil {
// pattern := "%" + *keyword + "%"
// scheduleQuery += ` AND (p.name LIKE $3 OR p.abbr LIKE $3 OR p.address LIKE $3 OR p.contact LIKE $3 OR
//
// p.phone LIKE $3 OR ud.name LIKE $3 OR ud.abbr LIKE $3 OR ud.contact LIKE $3 OR
// ud.phone LIKE $3 OR ss.task_name LIKE $3 OR ss.task_description LIKE $3)`
//
// args = append(args, pattern)
// }
func (sr _SynchronizeRepository) RetrieveSynchronizeConfiguration(uId, pId string) (vo.SynchronizeConfiguration, error) {
sr.log.Info("检索符合指定条件的同步记录", zap.String("user id", uId), zap.String("park id", pId))
ctx, cancelFunc := global.TimeoutContext()
defer cancelFunc()
//select * from synchronize_config where user_id=$1 and park_id=$2
configSql, configArgs, _ := sr.ds.
From(goqu.T("synchronize_config")).
Where(goqu.I("user_id").Eq(uId)).
Where(goqu.I("park_id").Eq(pId)).
Prepared(true).Select("*").ToSQL()
fmt.Println(configSql)
var configs []model.SynchronizeConfiguration
if err := pgxscan.Select(ctx, global.DB, &configs, configSql, configArgs...); err != nil {
fmt.Println(err)
sr.log.Error("获取同步任务时出现错误", zap.Error(err))
return vo.SynchronizeConfiguration{}, err
}
if len(configs) <= 0 {
return vo.SynchronizeConfiguration{}, nil
}
maxr := strconv.Itoa(int(configs[0].MaxRetries))
retry := strconv.Itoa(int(configs[0].RetryInterval))
synconfig := vo.SynchronizeConfiguration{
CollectAt: configs[0].CollectAt.Format("15:04:05"),
EntID: configs[0].User,
Imrs: configs[0].ImrsType,
ImrsAccount: configs[0].AuthorizationAccount,
ImrsKey: string(configs[0].AuthorizationKey),
ImrsSecret: configs[0].AuthorizationSecret,
Interval: float64(configs[0].Interval),
MaxRetries: maxr,
ParkID: configs[0].Park,
ReadingType: float64(configs[0].MeterReadingType),
RetryAlgorithm: float64(configs[0].RetryIntervalAlgorithm),
RetryInterval: retry,
}
return synconfig, nil
}
func (sr _SynchronizeRepository) CreateSynchronizeConfiguration(tx pgx.Tx, ctx context.Context, uId string, form *vo.SynchronizeConfigurationCreateForm) (bool, error) {
sr.log.Info("创建新的同步用户配置", zap.String("user Id", uId))
ctx, cancel := global.TimeoutContext()
defer cancel()
//insert into synchronize_config (user_id, park_id, meter_reading_type, imrs_type, imrs_authorization_account,
// imrs_authorization_secret, imrs_authorization_key, interval, collect_at, max_retries, retry_interval, retry_interval_algorithm) values
configSql, configArgs, _ := sr.ds.
Insert(goqu.T("synchronize_config")).
Cols(
"user_id", "park_id", "meter_reading_type", "imrs_type", "imrs_authorization_account", "imrs_authorization_secret",
"imrs_authorization_key", "interval", "collect_at", "max_retries",
"retry_interval", "retry_interval_algorithm").
Vals(
goqu.Vals{uId, form.ParkID, form.ReadingType, form.Imrs, form.ImrsAccount, form.ImrsSecret, form.ImrsKey, form.Interval,
form.CollectAt, form.MaxRetries, form.RetryInterval, form.RetryAlgorithm,
},
).
Prepared(true).ToSQL()
ok, err := tx.Exec(ctx, configSql, configArgs...)
if err != nil {
sr.log.Error("创建同步配置信息失败", zap.Error(err))
return false, err
}
return ok.RowsAffected() > 0, nil
}