package alerter import ( "context" "fmt" "log/slog" "time" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/provider" "github.com/shopspring/decimal" ) type Notifier interface { NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error } type Storage interface { AllActiveAlerts(ctx context.Context) ([]entities.Alert, error) DisableAlert(ctx context.Context, id entities.AlertID) error GetLastAlertCheck(ctx context.Context) (time.Time, error) SetLastAlertCheck(ctx context.Context, t time.Time) error } type Alerter struct { log *slog.Logger cache *alertsCache priceProvider provider.Provider notifier Notifier storage Storage lastCheckedAt time.Time } const interval = time.Minute func New(log *slog.Logger, priceProvider provider.Provider, notifier Notifier, storage Storage) *Alerter { return &Alerter{ log: log, cache: newCache(), priceProvider: priceProvider, notifier: notifier, storage: storage, lastCheckedAt: time.Now(), // safe default; overwritten by LoadAlerts if DB has a stored value } } func (a *Alerter) LoadAlerts(ctx context.Context) error { alerts, err := a.storage.AllActiveAlerts(ctx) if err != nil { return fmt.Errorf("failed to load alerts: %w", err) } for i := range alerts { a.cache.Add(&alerts[i]) } a.log.Info("alerts loaded", "count", len(alerts)) // Restore last check time so missed candles are checked on next tick. t, err := a.storage.GetLastAlertCheck(ctx) if err != nil { return fmt.Errorf("failed to load last alert check time: %w", err) } if !t.IsZero() { a.lastCheckedAt = t a.log.Info("resuming alert checks from", "since", t) } return nil } func (a *Alerter) AddAlert(alert *entities.Alert) { a.cache.Add(alert) } func (a *Alerter) RemoveAlert(id entities.AlertID) { a.cache.Remove(id) } func (a *Alerter) Run(ctx context.Context) { go func() { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } a.log.Info("start checking alerts") if err := a.checkAlerts(ctx); err != nil { a.log.Error("failed to check alerts", "err", err) continue } a.log.Info("alerts checked") } }() } // klineMaxCandles matches the per-request limit of the Bybit kline API. // Used to select the coarsest candle interval that still fits in one request. const klineMaxCandles = 1000 // candleIntervalTable lists Bybit kline intervals in ascending duration order. // selectCandleInterval picks the smallest entry whose window covers the gap. var candleIntervalTable = []provider.KlineInterval{ provider.Kline1m, provider.Kline3m, provider.Kline5m, provider.Kline15m, provider.Kline30m, provider.Kline1H, provider.Kline4H, provider.Kline6H, provider.Kline12H, provider.Kline1D, provider.Kline1W, } // selectCandleInterval returns the smallest interval whose single-request window // (klineMaxCandles * intervalDuration) covers the entire gap. func selectCandleInterval(gap time.Duration) provider.KlineInterval { for _, iv := range candleIntervalTable { if gap <= time.Duration(klineMaxCandles)*iv.ToDuration() { return iv } } return candleIntervalTable[len(candleIntervalTable)-1] } // TODO: parallel checking for different instruments. func (a *Alerter) checkAlerts(ctx context.Context) error { now := time.Now() instruments := a.cache.Instruments() for _, instrument := range instruments { alerts := a.cache.AlertsByInstrument(instrument.ID) // Separate crossing alerts (above/below) from candle-close alerts. var crossingAlerts []*entities.Alert closeByTimeframe := make(map[provider.KlineInterval][]*entities.Alert) for _, alert := range alerts { switch alert.Condition { case entities.AlertConditionAbove, entities.AlertConditionBelow: crossingAlerts = append(crossingAlerts, alert) case entities.AlertConditionCloseAbove, entities.AlertConditionCloseBelow: tf := provider.KlineInterval(alert.Timeframe) closeByTimeframe[tf] = append(closeByTimeframe[tf], alert) } } // Check crossing alerts using an auto-selected interval for the gap. if len(crossingAlerts) > 0 { gap := now.Sub(a.lastCheckedAt) candleInterval := selectCandleInterval(gap) from := a.lastCheckedAt.Truncate(candleInterval.ToDuration()) a.log.Debug("checking crossing alerts", "instrument", instrument.ID, "from", from, "to", now, "interval", candleInterval) candles, err := a.priceProvider.Candles(ctx, instrument, from, now, candleInterval) if err != nil { a.log.Error("failed to get candles for crossing alerts", "instrument", instrument.ID, "err", err) return fmt.Errorf("failed to get candles: %w", err) } for _, alert := range crossingAlerts { if triggered, price := crossingTriggered(alert, candles); triggered { a.triggerAlert(ctx, alert, price) } } } // Check candle-close alerts per timeframe; only closed candles count. for tf, tfAlerts := range closeByTimeframe { from := a.lastCheckedAt.Truncate(tf.ToDuration()) a.log.Debug("checking close alerts", "instrument", instrument.ID, "from", from, "to", now, "timeframe", tf) candles, err := a.priceProvider.Candles(ctx, instrument, from, now, tf) if err != nil { a.log.Error("failed to get candles for close alerts", "instrument", instrument.ID, "timeframe", tf, "err", err) return fmt.Errorf("failed to get candles: %w", err) } // Only consider candles that have fully closed. closed := closedCandles(candles, tf, now) for _, alert := range tfAlerts { if triggered, price := closeTriggered(alert, closed); triggered { a.triggerAlert(ctx, alert, price) } } } } a.lastCheckedAt = now if err := a.storage.SetLastAlertCheck(ctx, now); err != nil { a.log.Error("failed to persist last alert check time", "err", err) } return nil } // closedCandles filters to candles whose close time (openTime + interval) is before now. func closedCandles(candles []entities.Candle, interval provider.KlineInterval, now time.Time) []entities.Candle { dur := interval.ToDuration() var result []entities.Candle for _, c := range candles { if !c.OpenTime.Add(dur).After(now) { result = append(result, c) } } return result } // crossingTriggered returns true if any candle's High/Low crosses the alert price. func crossingTriggered(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) { for _, candle := range candles { switch alert.Condition { case entities.AlertConditionAbove: if candle.High.GreaterThanOrEqual(alert.Price) { return true, candle.High } case entities.AlertConditionBelow: if candle.Low.LessThanOrEqual(alert.Price) { return true, candle.Low } } } return false, decimal.Zero } // closeTriggered returns true if any closed candle's Close crosses the alert price. func closeTriggered(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) { for _, candle := range candles { switch alert.Condition { case entities.AlertConditionCloseAbove: if candle.Close.GreaterThanOrEqual(alert.Price) { return true, candle.Close } case entities.AlertConditionCloseBelow: if candle.Close.LessThanOrEqual(alert.Price) { return true, candle.Close } } } return false, decimal.Zero } func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) { if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil { a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err) return } a.cache.Remove(alert.ID) if err := a.storage.DisableAlert(ctx, alert.ID); err != nil { a.log.Error("failed to disable alert in db", "alert_id", alert.ID, "err", err) } }