alert service
This commit is contained in:
parent
0e73841b3e
commit
608561ab38
8 changed files with 283 additions and 8 deletions
131
internal/service/alerter/alerter.go
Normal file
131
internal/service/alerter/alerter.go
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
package alerter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities"
|
||||
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/provider"
|
||||
"github.com/shopspring/decimal"
|
||||
)
|
||||
|
||||
|
||||
type Notifier interface {
|
||||
NotifyAlert(ctx context.Context, userID entities.UserID, alert *entities.Alert, currentPrice decimal.Decimal) error
|
||||
}
|
||||
|
||||
type Storage interface {
|
||||
AllActiveAlerts(ctx context.Context) ([]entities.Alert, error)
|
||||
DisableAlert(ctx context.Context, id entities.AlertID) error
|
||||
}
|
||||
|
||||
type Alerter struct {
|
||||
log *slog.Logger
|
||||
cache *alertsCache
|
||||
priceProvider provider.Provider
|
||||
notifier Notifier
|
||||
storage Storage
|
||||
}
|
||||
|
||||
const interval = time.Minute
|
||||
|
||||
func New(log *slog.Logger, priceProvider provider.Provider, notifier Notifier, storage Storage) *Alerter {
|
||||
return &Alerter{
|
||||
log: log,
|
||||
cache: newCache(),
|
||||
priceProvider: priceProvider,
|
||||
notifier: notifier,
|
||||
storage: storage,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Alerter) LoadAlerts(ctx context.Context) error {
|
||||
alerts, err := a.storage.AllActiveAlerts(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load alerts: %w", err)
|
||||
}
|
||||
|
||||
for i := range alerts {
|
||||
a.cache.Add(&alerts[i])
|
||||
}
|
||||
|
||||
a.log.Info("alerts loaded", "count", len(alerts))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Alerter) AddAlert(alert *entities.Alert) {
|
||||
a.cache.Add(alert)
|
||||
}
|
||||
|
||||
func (a *Alerter) RemoveAlert(id entities.AlertID) {
|
||||
a.cache.Remove(id)
|
||||
}
|
||||
|
||||
func (a *Alerter) Run(ctx context.Context) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
a.log.Info("start checking alerts")
|
||||
|
||||
if err := a.checkAlerts(ctx); err != nil {
|
||||
a.log.Error("failed to check alerts", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
a.log.Info("alerts checked")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// TODO: parallel checking for different instruments.
|
||||
|
||||
func (a *Alerter) checkAlerts(ctx context.Context) error {
|
||||
instruments := a.cache.Instruments()
|
||||
|
||||
for _, instrument := range instruments {
|
||||
price, err := a.priceProvider.Price(ctx, instrument)
|
||||
if err != nil {
|
||||
a.log.Error("failed to get price", "instrument", instrument.ID, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
alerts := a.cache.AlertsByInstrument(instrument.ID)
|
||||
for _, alert := range alerts {
|
||||
switch alert.Condition {
|
||||
case entities.AlertConditionAbove:
|
||||
if price.Ask.GreaterThanOrEqual(alert.Price) {
|
||||
a.triggerAlert(ctx, alert, price.Ask)
|
||||
}
|
||||
case entities.AlertConditionBelow:
|
||||
if price.Bid.LessThanOrEqual(alert.Price) {
|
||||
a.triggerAlert(ctx, alert, price.Bid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Alerter) triggerAlert(ctx context.Context, alert *entities.Alert, currentPrice decimal.Decimal) {
|
||||
if err := a.notifier.NotifyAlert(ctx, alert.UserID, alert, currentPrice); err != nil {
|
||||
a.log.Error("failed to notify alert", "alert_id", alert.ID, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
a.cache.Remove(alert.ID)
|
||||
|
||||
if err := a.storage.DisableAlert(ctx, alert.ID); err != nil {
|
||||
a.log.Error("failed to disable alert in db", "alert_id", alert.ID, "err", err)
|
||||
}
|
||||
}
|
||||
71
internal/service/alerter/cache.go
Normal file
71
internal/service/alerter/cache.go
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
package alerter
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"gitea.computernetthings.ru/yash/crypto_alert_bot/internal/entities"
|
||||
)
|
||||
|
||||
type alertsCache struct {
|
||||
mu sync.RWMutex
|
||||
byID map[entities.AlertID]*entities.Alert
|
||||
byInstrument map[entities.InstrumentID]map[entities.AlertID]*entities.Alert
|
||||
}
|
||||
|
||||
func newCache() *alertsCache {
|
||||
return &alertsCache{
|
||||
byID: make(map[entities.AlertID]*entities.Alert),
|
||||
byInstrument: make(map[entities.InstrumentID]map[entities.AlertID]*entities.Alert),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *alertsCache) Add(a *entities.Alert) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.byID[a.ID] = a
|
||||
|
||||
if _, ok := c.byInstrument[a.Instrument.ID]; !ok {
|
||||
c.byInstrument[a.Instrument.ID] = make(map[entities.AlertID]*entities.Alert)
|
||||
}
|
||||
c.byInstrument[a.Instrument.ID][a.ID] = a
|
||||
}
|
||||
|
||||
func (c *alertsCache) Remove(id entities.AlertID) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
a, ok := c.byID[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(c.byID, id)
|
||||
delete(c.byInstrument[a.Instrument.ID], id)
|
||||
}
|
||||
|
||||
func (c *alertsCache) AlertsByInstrument(id entities.InstrumentID) []*entities.Alert {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
alerts := c.byInstrument[id]
|
||||
result := make([]*entities.Alert, 0, len(alerts))
|
||||
for _, a := range alerts {
|
||||
result = append(result, a)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (c *alertsCache) Instruments() []entities.Instrument {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
instruments := make([]entities.Instrument, 0, len(c.byInstrument))
|
||||
for _, alerts := range c.byInstrument {
|
||||
for _, a := range alerts {
|
||||
instruments = append(instruments, a.Instrument)
|
||||
break
|
||||
}
|
||||
}
|
||||
return instruments
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue