candles based alerts

This commit is contained in:
yash 2026-02-26 16:02:11 +03:00
parent bec3b7de5b
commit 999f675da9
11 changed files with 316 additions and 15 deletions

BIN
crypro_alert_bot Executable file

Binary file not shown.

View file

@ -0,0 +1,13 @@
package entities
import (
"time"
"github.com/shopspring/decimal"
)
type Candle struct {
OpenTime time.Time
High decimal.Decimal
Low decimal.Decimal
}

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strconv"
"time" "time"
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/config" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/config"
@ -73,3 +74,95 @@ func (b *Bybit) Price(ctx context.Context, instrument entities.Instrument) (*ent
Instrument: instrument, Instrument: instrument,
}, nil }, nil
} }
const klineLimit = 1000
var intervalToBybitIntervalMapper = map[provider.KlineInterval]string{
provider.Kline1m: "1",
provider.Kline3m: "3",
provider.Kline5m: "5",
provider.Kline15m: "15",
provider.Kline30m: "30",
provider.Kline1H: "60",
provider.Kline4H: "240",
provider.Kline6H: "360",
provider.Kline12H: "720",
provider.Kline1D: "D",
provider.Kline1W: "W",
}
// intervalDuration returns the time.Duration corresponding to a Bybit kline interval string.
func intervalBybit(interval provider.KlineInterval) (string, error) {
i, ok := intervalToBybitIntervalMapper[interval]
if !ok {
return "", fmt.Errorf("interval not found")
}
return i, nil
}
// Candles returns OHLC candles for the given interval in the [from, to) range.
// It paginates automatically when the range exceeds klineLimit candles per request.
func (b *Bybit) Candles(ctx context.Context, instrument entities.Instrument, from, to time.Time, interval provider.KlineInterval) ([]entities.Candle, error) {
dur := interval.ToDuration()
limit := klineLimit
var allCandles []entities.Candle
batchFrom := from
for batchFrom.Before(to) {
batchTo := batchFrom.Add(time.Duration(limit) * dur)
if batchTo.After(to) {
batchTo = to
}
bybitInterval, err := intervalBybit(interval)
if err != nil {
return nil, err
}
req := marketKlineReq{
Category: categorySpot,
Symbol: b.symbol(&instrument),
Interval: bybitInterval,
Start: strconv.FormatInt(batchFrom.UnixMilli(), 10),
End: strconv.FormatInt(batchTo.UnixMilli(), 10),
Limit: &limit,
}
var resp marketKlineResp
if err := b.makeRequest(ctx, http.MethodGet, "/v5/market/kline", req, &resp); err != nil {
return nil, fmt.Errorf("failed to get kline [%s, %s]: %w", batchFrom.Format(time.RFC3339), batchTo.Format(time.RFC3339), err)
}
for _, item := range resp.List {
if len(item) < 4 {
b.log.Error("bybit candles: length of elements less then 4", "len", len(item))
continue
}
startMs, err := strconv.ParseInt(item[0], 10, 64)
if err != nil {
b.log.Error("bybit candles: failed to parse start time", "err", err)
continue
}
high, err := decimal.NewFromString(item[2])
if err != nil {
b.log.Error("bybit candles: failed to parse candle high price", "err", err)
continue
}
low, err := decimal.NewFromString(item[3])
if err != nil {
b.log.Error("bybit candles: failed to parse candle low price", "err", err)
continue
}
allCandles = append(allCandles, entities.Candle{
OpenTime: time.UnixMilli(startMs),
High: high,
Low: low,
})
}
batchFrom = batchTo
}
return allCandles, nil
}

View file

@ -30,3 +30,21 @@ type marketOrderbookResp struct {
Seq int `json:"seq"` // Cross sequence. Seq int `json:"seq"` // Cross sequence.
Cts int64 `json:"cts"` // The timestamp from the matching engine when this orderbook data is produced. It can be correlated with T from public trade channel. Cts int64 `json:"cts"` // The timestamp from the matching engine when this orderbook data is produced. It can be correlated with T from public trade channel.
} }
type marketKlineReq struct {
Category string `json:"category"` // Product type. spot,linear,inverse
Symbol string `json:"symbol"` // Symbol name, like BTCUSDT, uppercase only
Interval string `json:"interval"` // Kline interval. 1,3,5,15,30,60,120,240,360,720,D,W,M
Start string `json:"start,omitempty"` // unix timestamp in ms
End string `json:"end,omitempty"` // unix timestamp in ms
Limit *int `json:"limit,omitempty"` // Limit for data size per page. [1, 1000]. Default: 200
}
// marketKlineResp contains the result of /v5/market/kline.
// List entries: [startTime, open, high, low, close, volume, turnover].
// Returned in descending order (newest first).
type marketKlineResp struct {
Symbol string `json:"symbol"`
Category string `json:"category"`
List [][]string `json:"list"`
}

View file

@ -71,11 +71,16 @@ func (b *Bybit) getRequest(ctx context.Context, endPoint string, params any) ([]
} }
query := make(url.Values) query := make(url.Values)
for k, v := range q { for k, v := range q {
if v == nil {
continue
}
query.Add(k, fmt.Sprint(v)) query.Add(k, fmt.Sprint(v))
} }
queryString = query.Encode() queryString = query.Encode()
} }
fmt.Println("req:", b.cfg.BaseURL+endPoint+"?"+queryString)
// make request // make request
request, err := http.NewRequest("GET", b.cfg.BaseURL+endPoint+"?"+queryString, nil) request, err := http.NewRequest("GET", b.cfg.BaseURL+endPoint+"?"+queryString, nil)
if err != nil { if err != nil {

View file

@ -2,6 +2,7 @@ package provider
import ( import (
"context" "context"
"time"
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities"
) )
@ -10,4 +11,55 @@ type Provider interface {
// Price returns the current price of the pair (base currency / quote currency). // Price returns the current price of the pair (base currency / quote currency).
// e.g. BTC/USDT. // e.g. BTC/USDT.
Price(ctx context.Context, instrument entities.Instrument) (*entities.Price, error) Price(ctx context.Context, instrument entities.Instrument) (*entities.Price, error)
// Candles returns OHLC candles for the given interval in the [from, to) range.
// interval is a provider-specific string (e.g. "1", "5", "60", "D", "W" for Bybit).
// The implementation handles pagination automatically when the range exceeds one
// request's capacity.
Candles(ctx context.Context, instrument entities.Instrument, from, to time.Time, interval KlineInterval) ([]entities.Candle, error)
}
type KlineInterval string
const (
Kline1m = "1"
Kline3m = "3"
Kline5m = "5"
Kline15m = "15"
Kline30m = "30"
Kline1H = "1h"
Kline4H = "4h"
Kline6H = "6h"
Kline12H = "12h"
Kline1D = "1d"
Kline1W = "1w"
)
func (k KlineInterval) ToDuration() time.Duration {
switch k {
case Kline1m:
return time.Minute
case Kline3m:
return 3 * time.Minute
case Kline5m:
return 5 * time.Minute
case Kline15m:
return 15 * time.Minute
case Kline30m:
return 30 * time.Minute
case Kline1H:
return time.Hour
case Kline4H:
return 4 * time.Hour
case Kline6H:
return 6 * time.Hour
case Kline12H:
return 12 * time.Hour
case Kline1D:
return 24 * time.Hour
case Kline1W:
return 7 * 24 * time.Hour
default:
return time.Minute
}
} }

View file

@ -0,0 +1,33 @@
package postgresql
import (
"context"
"fmt"
"time"
)
const getLastAlertCheckQuery = `select last_alert_check from alerter_state`
// GetLastAlertCheck returns the persisted time of the last completed alert check.
// Returns zero time if no check has been recorded yet (null in DB).
func (p *Postgresql) GetLastAlertCheck(ctx context.Context) (time.Time, error) {
var t *time.Time
err := p.db.QueryRow(ctx, getLastAlertCheckQuery).Scan(&t)
if err != nil {
return time.Time{}, fmt.Errorf("failed to exec getLastAlertCheckQuery: %w", err)
}
if t == nil {
return time.Time{}, nil
}
return *t, nil
}
const setLastAlertCheckQuery = `update alerter_state set last_alert_check = $1`
func (p *Postgresql) SetLastAlertCheck(ctx context.Context, t time.Time) error {
_, err := p.db.Exec(ctx, setLastAlertCheckQuery, t)
if err != nil {
return fmt.Errorf("failed to exec setLastAlertCheckQuery: %w", err)
}
return nil
}

View file

@ -1,3 +1,4 @@
drop table if exists alerter_state;
drop table if exists alert; drop table if exists alert;
drop type alert_condition; drop type alert_condition;
drop table if exists instrument; drop table if exists instrument;

View file

@ -34,3 +34,10 @@ insert into instrument (base_currency_id, quoted_currency_id) values
((select id from currency where symbol = 'BTC'), (select id from currency where symbol = 'USDT')), ((select id from currency where symbol = 'BTC'), (select id from currency where symbol = 'USDT')),
((select id from currency where symbol = 'ETH'), (select id from currency where symbol = 'USDT')), ((select id from currency where symbol = 'ETH'), (select id from currency where symbol = 'USDT')),
((select id from currency where symbol = 'SOL'), (select id from currency where symbol = 'USDT')); ((select id from currency where symbol = 'SOL'), (select id from currency where symbol = 'USDT'));
create table if not exists alerter_state (
last_alert_check timestamptz
);
-- single row; UPDATE always succeeds without upsert logic
insert into alerter_state(last_alert_check) values (null);

View file

@ -2,6 +2,7 @@ package repository
import ( import (
"context" "context"
"time"
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities" "gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
@ -22,4 +23,9 @@ type Storage interface {
DeleteAlert(ctx context.Context, id entities.AlertID) error DeleteAlert(ctx context.Context, id entities.AlertID) error
DisableAlert(ctx context.Context, id entities.AlertID) error DisableAlert(ctx context.Context, id entities.AlertID) error
UpdateAlertPrice(ctx context.Context, id entities.AlertID, price decimal.Decimal) error UpdateAlertPrice(ctx context.Context, id entities.AlertID, price decimal.Decimal) error
// GetLastAlertCheck returns the time of the last completed alert check.
// Returns zero time if no check has been recorded yet.
GetLastAlertCheck(ctx context.Context) (time.Time, error)
SetLastAlertCheck(ctx context.Context, t time.Time) error
} }

View file

@ -11,7 +11,6 @@ import (
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
) )
type Notifier interface { type Notifier interface {
NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error
} }
@ -19,6 +18,8 @@ type Notifier interface {
type Storage interface { type Storage interface {
AllActiveAlerts(ctx context.Context) ([]entities.Alert, error) AllActiveAlerts(ctx context.Context) ([]entities.Alert, error)
DisableAlert(ctx context.Context, id entities.AlertID) error DisableAlert(ctx context.Context, id entities.AlertID) error
GetLastAlertCheck(ctx context.Context) (time.Time, error)
SetLastAlertCheck(ctx context.Context, t time.Time) error
} }
type Alerter struct { type Alerter struct {
@ -27,6 +28,7 @@ type Alerter struct {
priceProvider provider.Provider priceProvider provider.Provider
notifier Notifier notifier Notifier
storage Storage storage Storage
lastCheckedAt time.Time
} }
const interval = time.Minute const interval = time.Minute
@ -38,6 +40,7 @@ func New(log *slog.Logger, priceProvider provider.Provider, notifier Notifier, s
priceProvider: priceProvider, priceProvider: priceProvider,
notifier: notifier, notifier: notifier,
storage: storage, storage: storage,
lastCheckedAt: time.Now(), // safe default; overwritten by LoadAlerts if DB has a stored value
} }
} }
@ -52,6 +55,17 @@ func (a *Alerter) LoadAlerts(ctx context.Context) error {
} }
a.log.Info("alerts loaded", "count", len(alerts)) a.log.Info("alerts loaded", "count", len(alerts))
// Restore last check time so missed candles are checked on next tick.
t, err := a.storage.GetLastAlertCheck(ctx)
if err != nil {
return fmt.Errorf("failed to load last alert check time: %w", err)
}
if !t.IsZero() {
a.lastCheckedAt = t
a.log.Info("resuming alert checks from", "since", t)
}
return nil return nil
} }
@ -87,36 +101,95 @@ func (a *Alerter) Run(ctx context.Context) {
}() }()
} }
// klineMaxCandles matches the per-request limit of the Bybit kline API.
// Used to select the coarsest candle interval that still fits in one request.
const klineMaxCandles = 1000
// candleIntervalTable lists Bybit kline intervals in ascending duration order.
// selectCandleInterval picks the smallest entry whose window covers the gap.
var candleIntervalTable = []provider.KlineInterval{
provider.Kline1m,
provider.Kline3m,
provider.Kline5m,
provider.Kline15m,
provider.Kline30m,
provider.Kline1H,
provider.Kline4H,
provider.Kline6H,
provider.Kline12H,
provider.Kline1D,
provider.Kline1W,
}
// selectCandleInterval returns the smallest interval whose single-request window
// (klineMaxCandles * intervalDuration) covers the entire gap.
func selectCandleInterval(gap time.Duration) provider.KlineInterval {
for _, iv := range candleIntervalTable {
if gap <= time.Duration(klineMaxCandles)*iv.ToDuration() {
return iv
}
}
return candleIntervalTable[len(candleIntervalTable)-1]
}
// TODO: parallel checking for different instruments. // TODO: parallel checking for different instruments.
// TODO: get one candle before interval
func (a *Alerter) checkAlerts(ctx context.Context) error { func (a *Alerter) checkAlerts(ctx context.Context) error {
now := time.Now()
gap := now.Sub(a.lastCheckedAt)
candleInterval := selectCandleInterval(gap)
// Truncate to the selected interval boundary so we always re-check the candle
// that was still forming at the previous tick — its High/Low may have changed
// since.
from := a.lastCheckedAt.Truncate(candleInterval.ToDuration())
a.log.Debug("checking alerts", "from", from, "to", now, "interval", candleInterval, "gap", gap.Round(time.Second))
instruments := a.cache.Instruments() instruments := a.cache.Instruments()
for _, instrument := range instruments { for _, instrument := range instruments {
price, err := a.priceProvider.Price(ctx, instrument) candles, err := a.priceProvider.Candles(ctx, instrument, from, now, candleInterval)
if err != nil { if err != nil {
a.log.Error("failed to get price", "instrument", instrument.ID, "err", err) a.log.Error("failed to get candles", "instrument", instrument.ID, "err", err)
continue return fmt.Errorf("failed to get candles: %w", err)
} }
alerts := a.cache.AlertsByInstrument(instrument.ID) for _, alert := range a.cache.AlertsByInstrument(instrument.ID) {
for _, alert := range alerts { if triggered, price := alertTriggeredByCandles(alert, candles); triggered {
switch alert.Condition { a.triggerAlert(ctx, alert, price)
case entities.AlertConditionAbove:
if price.Ask.GreaterThanOrEqual(alert.Price) {
a.triggerAlert(ctx, alert, price.Ask)
}
case entities.AlertConditionBelow:
if price.Bid.LessThanOrEqual(alert.Price) {
a.triggerAlert(ctx, alert, price.Bid)
}
} }
} }
} }
a.lastCheckedAt = now
if err := a.storage.SetLastAlertCheck(ctx, now); err != nil {
a.log.Error("failed to persist last alert check time", "err", err)
}
return nil return nil
} }
// alertTriggeredByCandles returns true and the triggering price if any candle
// caused the alert condition to be met.
func alertTriggeredByCandles(alert *entities.Alert, candles []entities.Candle) (bool, decimal.Decimal) {
for _, candle := range candles {
switch alert.Condition {
case entities.AlertConditionAbove:
if candle.High.GreaterThanOrEqual(alert.Price) {
return true, candle.High
}
case entities.AlertConditionBelow:
if candle.Low.LessThanOrEqual(alert.Price) {
return true, candle.Low
}
}
}
return false, decimal.Zero
}
func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) { func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) {
if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil { if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil {
a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err) a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err)