diff --git a/crypro_alert_bot b/crypro_alert_bot new file mode 100755 index 0000000..85b6bcb Binary files /dev/null and b/crypro_alert_bot differ diff --git a/internal/entities/candle.go b/internal/entities/candle.go new file mode 100644 index 0000000..a0fe073 --- /dev/null +++ b/internal/entities/candle.go @@ -0,0 +1,13 @@ +package entities + +import ( + "time" + + "github.com/shopspring/decimal" +) + +type Candle struct { + OpenTime time.Time + High decimal.Decimal + Low decimal.Decimal +} diff --git a/internal/provider/bybit/bybit.go b/internal/provider/bybit/bybit.go index 2198648..82961bc 100644 --- a/internal/provider/bybit/bybit.go +++ b/internal/provider/bybit/bybit.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "net/http" + "strconv" "time" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/config" @@ -73,3 +74,95 @@ func (b *Bybit) Price(ctx context.Context, instrument entities.Instrument) (*ent Instrument: instrument, }, nil } + +const klineLimit = 1000 + +var intervalToBybitIntervalMapper = map[provider.KlineInterval]string{ + provider.Kline1m: "1", + provider.Kline3m: "3", + provider.Kline5m: "5", + provider.Kline15m: "15", + provider.Kline30m: "30", + provider.Kline1H: "60", + provider.Kline4H: "240", + provider.Kline6H: "360", + provider.Kline12H: "720", + provider.Kline1D: "D", + provider.Kline1W: "W", +} + +// intervalDuration returns the time.Duration corresponding to a Bybit kline interval string. +func intervalBybit(interval provider.KlineInterval) (string, error) { + i, ok := intervalToBybitIntervalMapper[interval] + if !ok { + return "", fmt.Errorf("interval not found") + } + return i, nil +} + +// Candles returns OHLC candles for the given interval in the [from, to) range. +// It paginates automatically when the range exceeds klineLimit candles per request. +func (b *Bybit) Candles(ctx context.Context, instrument entities.Instrument, from, to time.Time, interval provider.KlineInterval) ([]entities.Candle, error) { + dur := interval.ToDuration() + limit := klineLimit + + var allCandles []entities.Candle + batchFrom := from + + for batchFrom.Before(to) { + batchTo := batchFrom.Add(time.Duration(limit) * dur) + if batchTo.After(to) { + batchTo = to + } + + bybitInterval, err := intervalBybit(interval) + if err != nil { + return nil, err + } + + req := marketKlineReq{ + Category: categorySpot, + Symbol: b.symbol(&instrument), + Interval: bybitInterval, + Start: strconv.FormatInt(batchFrom.UnixMilli(), 10), + End: strconv.FormatInt(batchTo.UnixMilli(), 10), + Limit: &limit, + } + + var resp marketKlineResp + if err := b.makeRequest(ctx, http.MethodGet, "/v5/market/kline", req, &resp); err != nil { + return nil, fmt.Errorf("failed to get kline [%s, %s]: %w", batchFrom.Format(time.RFC3339), batchTo.Format(time.RFC3339), err) + } + + for _, item := range resp.List { + if len(item) < 4 { + b.log.Error("bybit candles: length of elements less then 4", "len", len(item)) + continue + } + startMs, err := strconv.ParseInt(item[0], 10, 64) + if err != nil { + b.log.Error("bybit candles: failed to parse start time", "err", err) + continue + } + high, err := decimal.NewFromString(item[2]) + if err != nil { + b.log.Error("bybit candles: failed to parse candle high price", "err", err) + continue + } + low, err := decimal.NewFromString(item[3]) + if err != nil { + b.log.Error("bybit candles: failed to parse candle low price", "err", err) + continue + } + allCandles = append(allCandles, entities.Candle{ + OpenTime: time.UnixMilli(startMs), + High: high, + Low: low, + }) + } + + batchFrom = batchTo + } + + return allCandles, nil +} diff --git a/internal/provider/bybit/models.go b/internal/provider/bybit/models.go index bcf1b49..4afc1f8 100644 --- a/internal/provider/bybit/models.go +++ b/internal/provider/bybit/models.go @@ -30,3 +30,21 @@ type marketOrderbookResp struct { Seq int `json:"seq"` // Cross sequence. Cts int64 `json:"cts"` // The timestamp from the matching engine when this orderbook data is produced. It can be correlated with T from public trade channel. } + +type marketKlineReq struct { + Category string `json:"category"` // Product type. spot,linear,inverse + Symbol string `json:"symbol"` // Symbol name, like BTCUSDT, uppercase only + Interval string `json:"interval"` // Kline interval. 1,3,5,15,30,60,120,240,360,720,D,W,M + Start string `json:"start,omitempty"` // unix timestamp in ms + End string `json:"end,omitempty"` // unix timestamp in ms + Limit *int `json:"limit,omitempty"` // Limit for data size per page. [1, 1000]. Default: 200 +} + +// marketKlineResp contains the result of /v5/market/kline. +// List entries: [startTime, open, high, low, close, volume, turnover]. +// Returned in descending order (newest first). +type marketKlineResp struct { + Symbol string `json:"symbol"` + Category string `json:"category"` + List [][]string `json:"list"` +} diff --git a/internal/provider/bybit/request.go b/internal/provider/bybit/request.go index 5be72fa..b03437d 100644 --- a/internal/provider/bybit/request.go +++ b/internal/provider/bybit/request.go @@ -71,11 +71,16 @@ func (b *Bybit) getRequest(ctx context.Context, endPoint string, params any) ([] } query := make(url.Values) for k, v := range q { + if v == nil { + continue + } query.Add(k, fmt.Sprint(v)) } queryString = query.Encode() } + fmt.Println("req:", b.cfg.BaseURL+endPoint+"?"+queryString) + // make request request, err := http.NewRequest("GET", b.cfg.BaseURL+endPoint+"?"+queryString, nil) if err != nil { diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 5b3c722..89bdad1 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -2,6 +2,7 @@ package provider import ( "context" + "time" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities" ) @@ -10,4 +11,55 @@ type Provider interface { // Price returns the current price of the pair (base currency / quote currency). // e.g. BTC/USDT. Price(ctx context.Context, instrument entities.Instrument) (*entities.Price, error) + + // Candles returns OHLC candles for the given interval in the [from, to) range. + // interval is a provider-specific string (e.g. "1", "5", "60", "D", "W" for Bybit). + // The implementation handles pagination automatically when the range exceeds one + // request's capacity. + Candles(ctx context.Context, instrument entities.Instrument, from, to time.Time, interval KlineInterval) ([]entities.Candle, error) +} + +type KlineInterval string + +const ( + Kline1m = "1" + Kline3m = "3" + Kline5m = "5" + Kline15m = "15" + Kline30m = "30" + Kline1H = "1h" + Kline4H = "4h" + Kline6H = "6h" + Kline12H = "12h" + Kline1D = "1d" + Kline1W = "1w" +) + +func (k KlineInterval) ToDuration() time.Duration { + switch k { + case Kline1m: + return time.Minute + case Kline3m: + return 3 * time.Minute + case Kline5m: + return 5 * time.Minute + case Kline15m: + return 15 * time.Minute + case Kline30m: + return 30 * time.Minute + case Kline1H: + return time.Hour + case Kline4H: + return 4 * time.Hour + case Kline6H: + return 6 * time.Hour + case Kline12H: + return 12 * time.Hour + case Kline1D: + return 24 * time.Hour + case Kline1W: + return 7 * 24 * time.Hour + default: + return time.Minute + } } diff --git a/internal/repository/postgresql/alerter_state.go b/internal/repository/postgresql/alerter_state.go new file mode 100644 index 0000000..0e1bda2 --- /dev/null +++ b/internal/repository/postgresql/alerter_state.go @@ -0,0 +1,33 @@ +package postgresql + +import ( + "context" + "fmt" + "time" +) + +const getLastAlertCheckQuery = `select last_alert_check from alerter_state` + +// GetLastAlertCheck returns the persisted time of the last completed alert check. +// Returns zero time if no check has been recorded yet (null in DB). +func (p *Postgresql) GetLastAlertCheck(ctx context.Context) (time.Time, error) { + var t *time.Time + err := p.db.QueryRow(ctx, getLastAlertCheckQuery).Scan(&t) + if err != nil { + return time.Time{}, fmt.Errorf("failed to exec getLastAlertCheckQuery: %w", err) + } + if t == nil { + return time.Time{}, nil + } + return *t, nil +} + +const setLastAlertCheckQuery = `update alerter_state set last_alert_check = $1` + +func (p *Postgresql) SetLastAlertCheck(ctx context.Context, t time.Time) error { + _, err := p.db.Exec(ctx, setLastAlertCheckQuery, t) + if err != nil { + return fmt.Errorf("failed to exec setLastAlertCheckQuery: %w", err) + } + return nil +} diff --git a/internal/repository/postgresql/migrations/000001_init.down.sql b/internal/repository/postgresql/migrations/000001_init.down.sql index fbb669b..6bbb522 100644 --- a/internal/repository/postgresql/migrations/000001_init.down.sql +++ b/internal/repository/postgresql/migrations/000001_init.down.sql @@ -1,3 +1,4 @@ +drop table if exists alerter_state; drop table if exists alert; drop type alert_condition; drop table if exists instrument; diff --git a/internal/repository/postgresql/migrations/000001_init.up.sql b/internal/repository/postgresql/migrations/000001_init.up.sql index 35ab8d8..ad8b222 100644 --- a/internal/repository/postgresql/migrations/000001_init.up.sql +++ b/internal/repository/postgresql/migrations/000001_init.up.sql @@ -34,3 +34,10 @@ insert into instrument (base_currency_id, quoted_currency_id) values ((select id from currency where symbol = 'BTC'), (select id from currency where symbol = 'USDT')), ((select id from currency where symbol = 'ETH'), (select id from currency where symbol = 'USDT')), ((select id from currency where symbol = 'SOL'), (select id from currency where symbol = 'USDT')); + +create table if not exists alerter_state ( + last_alert_check timestamptz +); + +-- single row; UPDATE always succeeds without upsert logic +insert into alerter_state(last_alert_check) values (null); diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 40fe972..98166ea 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -2,6 +2,7 @@ package repository import ( "context" + "time" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities" "github.com/shopspring/decimal" @@ -22,4 +23,9 @@ type Storage interface { DeleteAlert(ctx context.Context, id entities.AlertID) error DisableAlert(ctx context.Context, id entities.AlertID) error UpdateAlertPrice(ctx context.Context, id entities.AlertID, price decimal.Decimal) error + + // GetLastAlertCheck returns the time of the last completed alert check. + // Returns zero time if no check has been recorded yet. + GetLastAlertCheck(ctx context.Context) (time.Time, error) + SetLastAlertCheck(ctx context.Context, t time.Time) error } diff --git a/internal/service/alerter/alerter.go b/internal/service/alerter/alerter.go index 3a96ac2..8cf87db 100644 --- a/internal/service/alerter/alerter.go +++ b/internal/service/alerter/alerter.go @@ -11,7 +11,6 @@ import ( "github.com/shopspring/decimal" ) - type Notifier interface { NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error } @@ -19,6 +18,8 @@ type Notifier interface { type Storage interface { AllActiveAlerts(ctx context.Context) ([]entities.Alert, error) DisableAlert(ctx context.Context, id entities.AlertID) error + GetLastAlertCheck(ctx context.Context) (time.Time, error) + SetLastAlertCheck(ctx context.Context, t time.Time) error } type Alerter struct { @@ -27,6 +28,7 @@ type Alerter struct { priceProvider provider.Provider notifier Notifier storage Storage + lastCheckedAt time.Time } const interval = time.Minute @@ -38,6 +40,7 @@ func New(log *slog.Logger, priceProvider provider.Provider, notifier Notifier, s priceProvider: priceProvider, notifier: notifier, storage: storage, + lastCheckedAt: time.Now(), // safe default; overwritten by LoadAlerts if DB has a stored value } } @@ -52,6 +55,17 @@ func (a *Alerter) LoadAlerts(ctx context.Context) error { } a.log.Info("alerts loaded", "count", len(alerts)) + + // Restore last check time so missed candles are checked on next tick. + t, err := a.storage.GetLastAlertCheck(ctx) + if err != nil { + return fmt.Errorf("failed to load last alert check time: %w", err) + } + if !t.IsZero() { + a.lastCheckedAt = t + a.log.Info("resuming alert checks from", "since", t) + } + return nil } @@ -87,36 +101,95 @@ func (a *Alerter) Run(ctx context.Context) { }() } +// klineMaxCandles matches the per-request limit of the Bybit kline API. +// Used to select the coarsest candle interval that still fits in one request. +const klineMaxCandles = 1000 + +// candleIntervalTable lists Bybit kline intervals in ascending duration order. +// selectCandleInterval picks the smallest entry whose window covers the gap. +var candleIntervalTable = []provider.KlineInterval{ + provider.Kline1m, + provider.Kline3m, + provider.Kline5m, + provider.Kline15m, + provider.Kline30m, + provider.Kline1H, + provider.Kline4H, + provider.Kline6H, + provider.Kline12H, + provider.Kline1D, + provider.Kline1W, +} + +// selectCandleInterval returns the smallest interval whose single-request window +// (klineMaxCandles * intervalDuration) covers the entire gap. +func selectCandleInterval(gap time.Duration) provider.KlineInterval { + for _, iv := range candleIntervalTable { + if gap <= time.Duration(klineMaxCandles)*iv.ToDuration() { + return iv + } + } + return candleIntervalTable[len(candleIntervalTable)-1] +} + // TODO: parallel checking for different instruments. +// TODO: get one candle before interval func (a *Alerter) checkAlerts(ctx context.Context) error { + now := time.Now() + gap := now.Sub(a.lastCheckedAt) + candleInterval := selectCandleInterval(gap) + + // Truncate to the selected interval boundary so we always re-check the candle + // that was still forming at the previous tick — its High/Low may have changed + // since. + from := a.lastCheckedAt.Truncate(candleInterval.ToDuration()) + + a.log.Debug("checking alerts", "from", from, "to", now, "interval", candleInterval, "gap", gap.Round(time.Second)) + instruments := a.cache.Instruments() for _, instrument := range instruments { - price, err := a.priceProvider.Price(ctx, instrument) + candles, err := a.priceProvider.Candles(ctx, instrument, from, now, candleInterval) if err != nil { - a.log.Error("failed to get price", "instrument", instrument.ID, "err", err) - continue + a.log.Error("failed to get candles", "instrument", instrument.ID, "err", err) + return fmt.Errorf("failed to get candles: %w", err) } - alerts := a.cache.AlertsByInstrument(instrument.ID) - for _, alert := range alerts { - switch alert.Condition { - case entities.AlertConditionAbove: - if price.Ask.GreaterThanOrEqual(alert.Price) { - a.triggerAlert(ctx, alert, price.Ask) - } - case entities.AlertConditionBelow: - if price.Bid.LessThanOrEqual(alert.Price) { - a.triggerAlert(ctx, alert, price.Bid) - } + for _, alert := range a.cache.AlertsByInstrument(instrument.ID) { + if triggered, price := alertTriggeredByCandles(alert, candles); triggered { + a.triggerAlert(ctx, alert, price) } } } + a.lastCheckedAt = now + + if err := a.storage.SetLastAlertCheck(ctx, now); err != nil { + a.log.Error("failed to persist last alert check time", "err", err) + } + return nil } +// alertTriggeredByCandles returns true and the triggering price if any candle +// caused the alert condition to be met. +func alertTriggeredByCandles(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) { + for _, candle := range candles { + switch alert.Condition { + case entities.AlertConditionAbove: + if candle.High.GreaterThanOrEqual(alert.Price) { + return true, candle.High + } + case entities.AlertConditionBelow: + if candle.Low.LessThanOrEqual(alert.Price) { + return true, candle.Low + } + } + } + return false, decimal.Zero +} + func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) { if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil { a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err)