272 lines
7.8 KiB
Go
272 lines
7.8 KiB
Go
package alerter
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities"
|
|
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/provider"
|
|
"github.com/shopspring/decimal"
|
|
)
|
|
|
|
type Notifier interface {
|
|
NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error
|
|
}
|
|
|
|
type Storage interface {
|
|
AllActiveAlerts(ctx context.Context) ([]entities.Alert, error)
|
|
DisableAlert(ctx context.Context, id entities.AlertID) error
|
|
GetLastAlertCheck(ctx context.Context) (time.Time, error)
|
|
SetLastAlertCheck(ctx context.Context, t time.Time) error
|
|
}
|
|
|
|
type Alerter struct {
|
|
log *slog.Logger
|
|
cache *alertsCache
|
|
priceProvider provider.Provider
|
|
notifier Notifier
|
|
storage Storage
|
|
lastCheckedAt time.Time
|
|
}
|
|
|
|
const interval = time.Minute
|
|
|
|
func New(log *slog.Logger, priceProvider provider.Provider, notifier Notifier, storage Storage) *Alerter {
|
|
return &Alerter{
|
|
log: log,
|
|
cache: newCache(),
|
|
priceProvider: priceProvider,
|
|
notifier: notifier,
|
|
storage: storage,
|
|
lastCheckedAt: time.Now(), // safe default; overwritten by LoadAlerts if DB has a stored value
|
|
}
|
|
}
|
|
|
|
func (a *Alerter) LoadAlerts(ctx context.Context) error {
|
|
alerts, err := a.storage.AllActiveAlerts(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load alerts: %w", err)
|
|
}
|
|
|
|
for i := range alerts {
|
|
a.cache.Add(&alerts[i])
|
|
}
|
|
|
|
a.log.Info("alerts loaded", "count", len(alerts))
|
|
|
|
// Restore last check time so missed candles are checked on next tick.
|
|
t, err := a.storage.GetLastAlertCheck(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load last alert check time: %w", err)
|
|
}
|
|
if !t.IsZero() {
|
|
a.lastCheckedAt = t
|
|
a.log.Info("resuming alert checks from", "since", t)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Alerter) AddAlert(alert *entities.Alert) {
|
|
a.cache.Add(alert)
|
|
}
|
|
|
|
func (a *Alerter) RemoveAlert(id entities.AlertID) {
|
|
a.cache.Remove(id)
|
|
}
|
|
|
|
func (a *Alerter) Run(ctx context.Context) {
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
|
|
a.log.Info("start checking alerts")
|
|
|
|
if err := a.checkAlerts(ctx); err != nil {
|
|
a.log.Error("failed to check alerts", "err", err)
|
|
continue
|
|
}
|
|
|
|
a.log.Info("alerts checked")
|
|
}
|
|
}()
|
|
}
|
|
|
|
// klineMaxCandles matches the per-request limit of the Bybit kline API.
|
|
// Used to select the coarsest candle interval that still fits in one request.
|
|
const klineMaxCandles = 1000
|
|
|
|
// candleIntervalTable lists Bybit kline intervals in ascending duration order.
|
|
// selectCandleInterval picks the smallest entry whose window covers the gap.
|
|
var candleIntervalTable = []provider.KlineInterval{
|
|
provider.Kline1m,
|
|
provider.Kline3m,
|
|
provider.Kline5m,
|
|
provider.Kline15m,
|
|
provider.Kline30m,
|
|
provider.Kline1H,
|
|
provider.Kline4H,
|
|
provider.Kline6H,
|
|
provider.Kline12H,
|
|
provider.Kline1D,
|
|
provider.Kline1W,
|
|
}
|
|
|
|
// selectCandleInterval returns the smallest interval whose single-request window
|
|
// (klineMaxCandles * intervalDuration) covers the entire gap.
|
|
func selectCandleInterval(gap time.Duration) provider.KlineInterval {
|
|
for _, iv := range candleIntervalTable {
|
|
if gap <= time.Duration(klineMaxCandles)*iv.ToDuration() {
|
|
return iv
|
|
}
|
|
}
|
|
return candleIntervalTable[len(candleIntervalTable)-1]
|
|
}
|
|
|
|
// TODO: parallel checking for different instruments.
|
|
|
|
func (a *Alerter) checkAlerts(ctx context.Context) error {
|
|
now := time.Now()
|
|
|
|
instruments := a.cache.Instruments()
|
|
|
|
for _, instrument := range instruments {
|
|
alerts := a.cache.AlertsByInstrument(instrument.ID)
|
|
|
|
// Separate crossing alerts (above/below) from candle-close alerts.
|
|
var crossingAlerts []*entities.Alert
|
|
closeByTimeframe := make(map[provider.KlineInterval][]*entities.Alert)
|
|
|
|
for _, alert := range alerts {
|
|
switch alert.Condition {
|
|
case entities.AlertConditionAbove, entities.AlertConditionBelow:
|
|
crossingAlerts = append(crossingAlerts, alert)
|
|
case entities.AlertConditionCloseAbove, entities.AlertConditionCloseBelow:
|
|
tf := provider.KlineInterval(alert.Timeframe)
|
|
closeByTimeframe[tf] = append(closeByTimeframe[tf], alert)
|
|
}
|
|
}
|
|
|
|
// Check crossing alerts using an auto-selected interval for the gap.
|
|
if len(crossingAlerts) > 0 {
|
|
gap := now.Sub(a.lastCheckedAt)
|
|
candleInterval := selectCandleInterval(gap)
|
|
from := a.lastCheckedAt.Truncate(candleInterval.ToDuration())
|
|
|
|
a.log.Debug("checking crossing alerts", "instrument", instrument.ID,
|
|
"from", from, "to", now, "interval", candleInterval)
|
|
|
|
candles, err := a.priceProvider.Candles(ctx, instrument, from, now, candleInterval)
|
|
if err != nil {
|
|
a.log.Error("failed to get candles for crossing alerts", "instrument", instrument.ID, "err", err)
|
|
return fmt.Errorf("failed to get candles: %w", err)
|
|
}
|
|
|
|
for _, alert := range crossingAlerts {
|
|
if triggered, price := crossingTriggered(alert, candles); triggered {
|
|
a.triggerAlert(ctx, alert, price)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check candle-close alerts per timeframe; only closed candles count.
|
|
for tf, tfAlerts := range closeByTimeframe {
|
|
from := a.lastCheckedAt.Truncate(tf.ToDuration())
|
|
|
|
a.log.Debug("checking close alerts", "instrument", instrument.ID,
|
|
"from", from, "to", now, "timeframe", tf)
|
|
|
|
candles, err := a.priceProvider.Candles(ctx, instrument, from, now, tf)
|
|
if err != nil {
|
|
a.log.Error("failed to get candles for close alerts", "instrument", instrument.ID,
|
|
"timeframe", tf, "err", err)
|
|
return fmt.Errorf("failed to get candles: %w", err)
|
|
}
|
|
|
|
// Only consider candles that have fully closed.
|
|
closed := closedCandles(candles, tf, now)
|
|
|
|
for _, alert := range tfAlerts {
|
|
if triggered, price := closeTriggered(alert, closed); triggered {
|
|
a.triggerAlert(ctx, alert, price)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
a.lastCheckedAt = now
|
|
|
|
if err := a.storage.SetLastAlertCheck(ctx, now); err != nil {
|
|
a.log.Error("failed to persist last alert check time", "err", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// closedCandles filters to candles whose close time (openTime + interval) is before now.
|
|
func closedCandles(candles []entities.Candle, interval provider.KlineInterval, now time.Time) []entities.Candle {
|
|
dur := interval.ToDuration()
|
|
var result []entities.Candle
|
|
for _, c := range candles {
|
|
if !c.OpenTime.Add(dur).After(now) {
|
|
result = append(result, c)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// crossingTriggered returns true if any candle's High/Low crosses the alert price.
|
|
func crossingTriggered(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) {
|
|
for _, candle := range candles {
|
|
switch alert.Condition {
|
|
case entities.AlertConditionAbove:
|
|
if candle.High.GreaterThanOrEqual(alert.Price) {
|
|
return true, candle.High
|
|
}
|
|
case entities.AlertConditionBelow:
|
|
if candle.Low.LessThanOrEqual(alert.Price) {
|
|
return true, candle.Low
|
|
}
|
|
}
|
|
}
|
|
return false, decimal.Zero
|
|
}
|
|
|
|
// closeTriggered returns true if any closed candle's Close crosses the alert price.
|
|
func closeTriggered(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) {
|
|
for _, candle := range candles {
|
|
switch alert.Condition {
|
|
case entities.AlertConditionCloseAbove:
|
|
if candle.Close.GreaterThanOrEqual(alert.Price) {
|
|
return true, candle.Close
|
|
}
|
|
case entities.AlertConditionCloseBelow:
|
|
if candle.Close.LessThanOrEqual(alert.Price) {
|
|
return true, candle.Close
|
|
}
|
|
}
|
|
}
|
|
return false, decimal.Zero
|
|
}
|
|
|
|
func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) {
|
|
if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil {
|
|
a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err)
|
|
return
|
|
}
|
|
|
|
a.cache.Remove(alert.ID)
|
|
|
|
if err := a.storage.DisableAlert(ctx, alert.ID); err != nil {
|
|
a.log.Error("failed to disable alert in db", "alert_id", alert.ID, "err", err)
|
|
}
|
|
}
|