Files
Novault-backend/internal/service/sync_scheduler.go
2026-01-25 21:59:00 +08:00

169 lines
4.6 KiB
Go

package service
import (
"context"
"log"
"sync"
"time"
"accounting-app/internal/cache"
)
// SyncScheduler handles scheduled synchronization of exchange rates
// It uses ExchangeRateServiceV2 to fetch rates from YunAPI and update Redis cache
type SyncScheduler struct {
service *ExchangeRateServiceV2
interval time.Duration
stopChan chan struct{}
mu sync.Mutex
running bool
}
// NewSyncScheduler creates a new SyncScheduler instance
// interval specifies the time between sync operations (default: 10 minutes)
func NewSyncScheduler(service *ExchangeRateServiceV2, interval time.Duration) *SyncScheduler {
if interval <= 0 {
interval = 10 * time.Minute // Default to 10 minutes
}
return &SyncScheduler{
service: service,
interval: interval,
stopChan: make(chan struct{}),
running: false,
}
}
// Start begins the scheduled synchronization of exchange rates
// It performs an initial sync immediately, then syncs every interval
// This method blocks until Stop() is called or context is cancelled
// Requirements: 3.1 (sync on start), 3.2 (10-minute interval)
func (s *SyncScheduler) Start(ctx context.Context) {
s.mu.Lock()
if s.running {
s.mu.Unlock()
log.Println("[SyncScheduler] Scheduler is already running")
return
}
s.running = true
s.stopChan = make(chan struct{}) // Reset stop channel
s.mu.Unlock()
log.Printf("[SyncScheduler] Starting exchange rate sync scheduler with interval: %v", s.interval)
// Perform initial sync immediately (Requirement 3.1)
s.performSync(ctx)
// Create ticker for periodic sync
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.performSync(ctx)
case <-s.stopChan:
log.Println("[SyncScheduler] Scheduler stopped by Stop() call")
s.mu.Lock()
s.running = false
s.mu.Unlock()
return
case <-ctx.Done():
log.Println("[SyncScheduler] Scheduler stopped due to context cancellation")
s.mu.Lock()
s.running = false
s.mu.Unlock()
return
}
}
}
// Stop gracefully stops the scheduler
// It signals the scheduler to stop and waits for it to finish
func (s *SyncScheduler) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
log.Println("[SyncScheduler] Scheduler is not running")
return
}
log.Println("[SyncScheduler] Stopping scheduler...")
close(s.stopChan)
}
// IsRunning returns whether the scheduler is currently running
func (s *SyncScheduler) IsRunning() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.running
}
// GetInterval returns the sync interval
func (s *SyncScheduler) GetInterval() time.Duration {
return s.interval
}
// performSync executes a single sync operation
// It fetches rates from YunAPI, updates Redis cache, and records sync status
// Requirements: 3.4 (record sync time and status)
func (s *SyncScheduler) performSync(ctx context.Context) {
log.Println("[SyncScheduler] Starting exchange rate sync...")
startTime := time.Now()
// Get the API client and cache from the service
client := s.service.GetClient()
rateCache := s.service.GetCache()
// Fetch rates from YunAPI (uses exponential backoff retry internally)
rates, err := client.FetchRates()
if err != nil {
log.Printf("[SyncScheduler] Failed to fetch rates: %v", err)
s.updateSyncStatus(ctx, rateCache, false, 0, err.Error())
return
}
// Update Redis cache with fetched rates
if err := rateCache.SetAll(ctx, rates); err != nil {
log.Printf("[SyncScheduler] Failed to update cache: %v", err)
s.updateSyncStatus(ctx, rateCache, false, 0, err.Error())
return
}
// Update sync status to success
s.updateSyncStatus(ctx, rateCache, true, len(rates), "")
elapsed := time.Since(startTime)
log.Printf("[SyncScheduler] Sync completed successfully: %d rates updated in %v", len(rates), elapsed)
}
// updateSyncStatus updates the sync status in Redis cache
// Requirements: 3.4 (record sync time and status)
func (s *SyncScheduler) updateSyncStatus(ctx context.Context, rateCache *cache.ExchangeRateCache, success bool, ratesCount int, errorMsg string) {
status := &cache.SyncStatus{
LastSyncTime: time.Now(),
NextSyncTime: time.Now().Add(s.interval),
RatesCount: ratesCount,
}
if success {
status.LastSyncStatus = "success"
} else {
status.LastSyncStatus = "failed"
status.ErrorMessage = errorMsg
}
if err := rateCache.SetSyncStatus(ctx, status); err != nil {
log.Printf("[SyncScheduler] Warning: failed to update sync status: %v", err)
}
}
// ForceSync triggers an immediate sync operation outside of the regular schedule
// This can be used for manual refresh requests
func (s *SyncScheduler) ForceSync(ctx context.Context) error {
log.Println("[SyncScheduler] Force sync triggered")
s.performSync(ctx)
return nil
}