init
This commit is contained in:
168
internal/service/sync_scheduler.go
Normal file
168
internal/service/sync_scheduler.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"accounting-app/internal/cache"
|
||||
)
|
||||
|
||||
// SyncScheduler handles scheduled synchronization of exchange rates
|
||||
// It uses ExchangeRateServiceV2 to fetch rates from YunAPI and update Redis cache
|
||||
type SyncScheduler struct {
|
||||
service *ExchangeRateServiceV2
|
||||
interval time.Duration
|
||||
stopChan chan struct{}
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
}
|
||||
|
||||
// NewSyncScheduler creates a new SyncScheduler instance
|
||||
// interval specifies the time between sync operations (default: 10 minutes)
|
||||
func NewSyncScheduler(service *ExchangeRateServiceV2, interval time.Duration) *SyncScheduler {
|
||||
if interval <= 0 {
|
||||
interval = 10 * time.Minute // Default to 10 minutes
|
||||
}
|
||||
|
||||
return &SyncScheduler{
|
||||
service: service,
|
||||
interval: interval,
|
||||
stopChan: make(chan struct{}),
|
||||
running: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the scheduled synchronization of exchange rates
|
||||
// It performs an initial sync immediately, then syncs every interval
|
||||
// This method blocks until Stop() is called or context is cancelled
|
||||
// Requirements: 3.1 (sync on start), 3.2 (10-minute interval)
|
||||
func (s *SyncScheduler) Start(ctx context.Context) {
|
||||
s.mu.Lock()
|
||||
if s.running {
|
||||
s.mu.Unlock()
|
||||
log.Println("[SyncScheduler] Scheduler is already running")
|
||||
return
|
||||
}
|
||||
s.running = true
|
||||
s.stopChan = make(chan struct{}) // Reset stop channel
|
||||
s.mu.Unlock()
|
||||
|
||||
log.Printf("[SyncScheduler] Starting exchange rate sync scheduler with interval: %v", s.interval)
|
||||
|
||||
// Perform initial sync immediately (Requirement 3.1)
|
||||
s.performSync(ctx)
|
||||
|
||||
// Create ticker for periodic sync
|
||||
ticker := time.NewTicker(s.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.performSync(ctx)
|
||||
case <-s.stopChan:
|
||||
log.Println("[SyncScheduler] Scheduler stopped by Stop() call")
|
||||
s.mu.Lock()
|
||||
s.running = false
|
||||
s.mu.Unlock()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Println("[SyncScheduler] Scheduler stopped due to context cancellation")
|
||||
s.mu.Lock()
|
||||
s.running = false
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully stops the scheduler
|
||||
// It signals the scheduler to stop and waits for it to finish
|
||||
func (s *SyncScheduler) Stop() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running {
|
||||
log.Println("[SyncScheduler] Scheduler is not running")
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("[SyncScheduler] Stopping scheduler...")
|
||||
close(s.stopChan)
|
||||
}
|
||||
|
||||
// IsRunning returns whether the scheduler is currently running
|
||||
func (s *SyncScheduler) IsRunning() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.running
|
||||
}
|
||||
|
||||
// GetInterval returns the sync interval
|
||||
func (s *SyncScheduler) GetInterval() time.Duration {
|
||||
return s.interval
|
||||
}
|
||||
|
||||
// performSync executes a single sync operation
|
||||
// It fetches rates from YunAPI, updates Redis cache, and records sync status
|
||||
// Requirements: 3.4 (record sync time and status)
|
||||
func (s *SyncScheduler) performSync(ctx context.Context) {
|
||||
log.Println("[SyncScheduler] Starting exchange rate sync...")
|
||||
startTime := time.Now()
|
||||
|
||||
// Get the API client and cache from the service
|
||||
client := s.service.GetClient()
|
||||
rateCache := s.service.GetCache()
|
||||
|
||||
// Fetch rates from YunAPI (uses exponential backoff retry internally)
|
||||
rates, err := client.FetchRates()
|
||||
if err != nil {
|
||||
log.Printf("[SyncScheduler] Failed to fetch rates: %v", err)
|
||||
s.updateSyncStatus(ctx, rateCache, false, 0, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Update Redis cache with fetched rates
|
||||
if err := rateCache.SetAll(ctx, rates); err != nil {
|
||||
log.Printf("[SyncScheduler] Failed to update cache: %v", err)
|
||||
s.updateSyncStatus(ctx, rateCache, false, 0, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Update sync status to success
|
||||
s.updateSyncStatus(ctx, rateCache, true, len(rates), "")
|
||||
|
||||
elapsed := time.Since(startTime)
|
||||
log.Printf("[SyncScheduler] Sync completed successfully: %d rates updated in %v", len(rates), elapsed)
|
||||
}
|
||||
|
||||
// updateSyncStatus updates the sync status in Redis cache
|
||||
// Requirements: 3.4 (record sync time and status)
|
||||
func (s *SyncScheduler) updateSyncStatus(ctx context.Context, rateCache *cache.ExchangeRateCache, success bool, ratesCount int, errorMsg string) {
|
||||
status := &cache.SyncStatus{
|
||||
LastSyncTime: time.Now(),
|
||||
NextSyncTime: time.Now().Add(s.interval),
|
||||
RatesCount: ratesCount,
|
||||
}
|
||||
|
||||
if success {
|
||||
status.LastSyncStatus = "success"
|
||||
} else {
|
||||
status.LastSyncStatus = "failed"
|
||||
status.ErrorMessage = errorMsg
|
||||
}
|
||||
|
||||
if err := rateCache.SetSyncStatus(ctx, status); err != nil {
|
||||
log.Printf("[SyncScheduler] Warning: failed to update sync status: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ForceSync triggers an immediate sync operation outside of the regular schedule
|
||||
// This can be used for manual refresh requests
|
||||
func (s *SyncScheduler) ForceSync(ctx context.Context) error {
|
||||
log.Println("[SyncScheduler] Force sync triggered")
|
||||
s.performSync(ctx)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user