Cloudflare Workflows System

Overview

PadawanForge uses Cloudflare Workflows to run reliable, long-running background processing for player progression, customer management, and system maintenance tasks. The sections below describe each workflow and the orchestration patterns built on Cloudflare’s serverless workflow capabilities.

Architecture

Core Components

  • Player Workflows: Level progression, achievements, daily rewards
  • Customer Workflows: Subscription management, billing processing
  • System Workflows: Maintenance, cleanup, analytics processing
  • Workflow Orchestration: Complex multi-step processes
  • State Management: Persistent workflow state

Workflow Features

  • Long-running Tasks: Reliable execution of complex processes
  • State Persistence: Automatic state management across executions
  • Error Handling: Automatic retry and failure recovery
  • Integration: Seamless connection with Workers and Durable Objects
  • Cost Optimization: Efficient resource utilization

Player Workflows

Core Player Workflow

import { PlayerWorkflow } from '@/workflows/player_workflow';

// Player workflow for managing progression and rewards
/**
 * Runs the recurring per-player pipeline: daily rewards, level progression,
 * achievements, statistics refresh and cleanup of data older than 30 days.
 *
 * Progress is tracked in `state.completedSteps`. When a run fails, the failure
 * message is appended to `state.errors` before the error is rethrown.
 *
 * The step methods call helpers (`getLastDailyReward`, `getPlayerData`,
 * `grantReward`, ...) that are not defined in this snippet.
 */
export class PlayerWorkflow {
  /** Age, in days, after which sessions, messages and logs are cleaned up. */
  private static readonly RETENTION_DAYS = 30;

  private playerId: string;
  private state: {
    playerId: string;
    startTime: Date;
    completedSteps: string[];
    errors: string[];
  };

  constructor(playerId: string) {
    this.playerId = playerId;
    // Start from a valid empty state so a step never dereferences an
    // undefined `state`, even when it is invoked before `run()`.
    this.state = this.createInitialState();
  }

  /**
   * Main workflow entry point. Executes every step in order.
   *
   * @throws Rethrows whatever a step throws, after recording it in
   *   `state.errors` and logging it.
   */
  async run(): Promise<void> {
    try {
      // Initialize workflow state
      await this.initializeState();

      // Process daily rewards
      await this.processDailyRewards();

      // Check for level progression
      await this.checkLevelProgression();

      // Process achievements
      await this.processAchievements();

      // Update player statistics
      await this.updatePlayerStats();

      // Cleanup old data
      await this.cleanupOldData();
    } catch (error) {
      // Keep a record of the failure alongside the completed steps.
      this.state.errors.push(error instanceof Error ? error.message : String(error));
      console.error('Player workflow failed:', error);
      throw error;
    }
  }

  /** Builds a fresh state object with no completed steps and no errors. */
  private createInitialState(): PlayerWorkflow['state'] {
    return {
      playerId: this.playerId,
      startTime: new Date(),
      completedSteps: [],
      errors: []
    };
  }

  /** Resets the workflow state at the start of a run. */
  private async initializeState(): Promise<void> {
    this.state = this.createInitialState();
  }

  /**
   * Grants and records the daily reward when `shouldGiveDailyReward` says one
   * is due; marks `daily_rewards` as completed only in that case.
   */
  private async processDailyRewards(): Promise<void> {
    const lastReward = await this.getLastDailyReward();
    const now = new Date();

    if (this.shouldGiveDailyReward(lastReward, now)) {
      const reward = await this.calculateDailyReward();
      await this.grantReward(reward);
      await this.recordDailyReward(now);

      this.state.completedSteps.push('daily_rewards');
    }
  }

  /**
   * Compares the level derived from the player's experience with the stored
   * level and handles a level-up when the derived level is higher.
   */
  private async checkLevelProgression(): Promise<void> {
    const player = await this.getPlayerData();
    const currentLevel = this.calculateLevel(player.experience);
    const previousLevel = player.level;

    if (currentLevel > previousLevel) {
      // Player leveled up
      await this.handleLevelUp(previousLevel, currentLevel);
      this.state.completedSteps.push('level_progression');
    }
  }

  /**
   * Unlocks every achievement reported by `checkAchievements` and grants its
   * reward; marks `achievements` as completed when at least one was unlocked.
   */
  private async processAchievements(): Promise<void> {
    // Player data is fetched again rather than reused from the level check.
    const player = await this.getPlayerData();
    const unlockedAchievements = await this.checkAchievements(player);

    for (const achievement of unlockedAchievements) {
      await this.unlockAchievement(achievement);
      await this.grantAchievementReward(achievement);
    }

    if (unlockedAchievements.length > 0) {
      this.state.completedSteps.push('achievements');
    }
  }

  /** Recomputes the player's statistics and stores them. */
  private async updatePlayerStats(): Promise<void> {
    const stats = await this.calculatePlayerStats();
    await this.updatePlayerData(stats);
    this.state.completedSteps.push('stats_update');
  }

  /** Removes sessions, messages and logs older than the retention window. */
  private async cleanupOldData(): Promise<void> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - PlayerWorkflow.RETENTION_DAYS);

    await this.cleanupOldSessions(cutoffDate);
    await this.cleanupOldMessages(cutoffDate);
    await this.cleanupOldLogs(cutoffDate);

    this.state.completedSteps.push('cleanup');
  }
}

Daily Rewards System

// Daily rewards processing
/**
 * Daily reward rules: eligibility, streak-based amounts, granting and
 * persistence. Relies on `db`, `playerId` and helpers such as
 * `getDailyStreak`/`addExperience` that are not defined in this snippet.
 */
class DailyRewardsProcessor {
  // Reward most recently handed out by grantReward(). recordDailyReward()
  // falls back to it so existing `recordDailyReward(date)` callers keep working.
  private lastGrantedReward?: DailyReward;

  /**
   * Returns true when no reward was ever given, or when `now` falls on a later
   * calendar day (in the runtime's local time zone) than `lastReward`.
   */
  private shouldGiveDailyReward(lastReward: Date | null, now: Date): boolean {
    if (!lastReward) return true;

    // Work on copies so the caller's Date objects are not mutated.
    const lastRewardDate = new Date(lastReward);
    const today = new Date(now);

    // Reset time to compare dates only
    lastRewardDate.setHours(0, 0, 0, 0);
    today.setHours(0, 0, 0, 0);

    return today.getTime() > lastRewardDate.getTime();
  }

  /**
   * Computes the reward for the current streak: 10 base XP plus 2 XP per
   * streak day (bonus capped at 50), half that amount in coins, and the
   * incremented streak value.
   */
  private async calculateDailyReward(): Promise<DailyReward> {
    const streak = await this.getDailyStreak();

    const baseReward = 10; // Base XP
    const streakBonus = Math.min(streak * 2, 50); // Max 50 bonus
    const totalReward = baseReward + streakBonus;

    return {
      experience: totalReward,
      coins: Math.floor(totalReward / 2),
      streak: streak + 1
    };
  }

  /** Applies experience, coins and streak, then remembers the reward. */
  private async grantReward(reward: DailyReward): Promise<void> {
    await this.addExperience(reward.experience);
    await this.addCoins(reward.coins);
    await this.updateStreak(reward.streak);

    // Only remember the reward once every part of it has been applied.
    this.lastGrantedReward = reward;
  }

  /**
   * Inserts a `daily_rewards` row for this player.
   *
   * @param date   Timestamp stored as the reward date (ISO-8601 string).
   * @param reward Reward to persist; defaults to the reward most recently
   *   passed to `grantReward`.
   * @throws Error when no reward is supplied and none has been granted yet.
   */
  private async recordDailyReward(date: Date, reward?: DailyReward): Promise<void> {
    const granted = reward ?? this.lastGrantedReward;
    if (!granted) {
      throw new Error('recordDailyReward called before any reward was granted');
    }

    await this.db.prepare(`
      INSERT INTO daily_rewards (player_uuid, reward_date, experience_gained, coins_gained, streak)
      VALUES (?, ?, ?, ?, ?)
    `).bind(this.playerId, date.toISOString(), granted.experience, granted.coins, granted.streak).run();
  }
}

Level Progression System

// Level progression handling
/**
 * Level-up handling: reward calculation, reward granting, level persistence,
 * level-based achievements and notification. Helper methods such as
 * `addExperience` and `unlockAchievement` are not defined in this snippet.
 */
class LevelProgressionProcessor {
  /**
   * Runs the full level-up sequence for a player moving from `oldLevel` to
   * `newLevel`.
   */
  private async handleLevelUp(oldLevel: number, newLevel: number): Promise<void> {
    // Calculate level up rewards
    const rewards = this.calculateLevelUpRewards(oldLevel, newLevel);

    // Grant rewards
    await this.grantLevelUpRewards(rewards);

    // Update player level
    await this.updatePlayerLevel(newLevel);

    // Only unlock achievements whose threshold was crossed by this level-up,
    // so earlier achievements are not unlocked again on every level gain.
    await this.checkLevelAchievements(newLevel, oldLevel);

    // Send notifications
    await this.sendLevelUpNotification(newLevel, rewards);
  }

  /**
   * Computes the rewards for gaining `newLevel - oldLevel` levels: 100 XP per
   * level, a fifth of that in coins, and one skill point per level.
   */
  private calculateLevelUpRewards(oldLevel: number, newLevel: number): LevelUpRewards {
    const levelsGained = newLevel - oldLevel;
    const baseReward = 100; // Base XP per level
    const totalReward = baseReward * levelsGained;

    return {
      experience: totalReward,
      coins: Math.floor(totalReward / 5),
      skillPoints: levelsGained,
      // NOTE(review): only `newLevel` is queried; if several levels are gained
      // at once, confirm getUnlockablesForLevel covers the skipped levels.
      unlockables: this.getUnlockablesForLevel(newLevel)
    };
  }

  /** Applies experience, coins and skill points, then unlocks each feature. */
  private async grantLevelUpRewards(rewards: LevelUpRewards): Promise<void> {
    await this.addExperience(rewards.experience);
    await this.addCoins(rewards.coins);
    await this.addSkillPoints(rewards.skillPoints);

    for (const unlockable of rewards.unlockables) {
      await this.unlockFeature(unlockable);
    }
  }

  /**
   * Unlocks the level-based achievements reached at `level`.
   *
   * @param level         The player's current level.
   * @param previousLevel Level before the change. Achievements whose threshold
   *   is at or below this value are skipped. Defaults to 0, which unlocks
   *   every achievement up to `level`.
   */
  private async checkLevelAchievements(level: number, previousLevel: number = 0): Promise<void> {
    const levelAchievements = [
      { level: 5, name: 'Novice' },
      { level: 10, name: 'Apprentice' },
      { level: 25, name: 'Skilled' },
      { level: 50, name: 'Expert' },
      { level: 100, name: 'Master' }
    ];

    for (const achievement of levelAchievements) {
      const reached = level >= achievement.level;
      const alreadyPassed = previousLevel >= achievement.level;

      if (reached && !alreadyPassed) {
        await this.unlockAchievement(achievement.name);
      }
    }
  }
}

Achievement System

// Achievement processing
/**
 * Achievement evaluation, unlocking and reward granting. Depends on `db`,
 * `playerId` and several helper methods that are not defined in this snippet.
 */
class AchievementProcessor {
  /**
   * Returns the achievements the player has not unlocked yet and whose
   * criteria are now met, in the order `getAllAchievements` yields them.
   */
  private async checkAchievements(player: PlayerData): Promise<Achievement[]> {
    const candidates = await this.getAllAchievements();
    const newlyEarned: Achievement[] = [];

    for (const candidate of candidates) {
      const alreadyOwned = await this.hasUnlockedAchievement(candidate.id);
      if (!alreadyOwned) {
        const earned = await this.checkAchievementCriteria(candidate, player);
        if (earned) {
          newlyEarned.push(candidate);
        }
      }
    }

    return newlyEarned;
  }

  /**
   * Evaluates one achievement against the player. The metric compared depends
   * on `achievement.type`; unrecognised types never qualify.
   */
  private async checkAchievementCriteria(achievement: Achievement, player: PlayerData): Promise<boolean> {
    switch (achievement.type) {
      case 'level': {
        return player.level >= achievement.criteria.level;
      }

      case 'experience': {
        return player.experience >= achievement.criteria.experience;
      }

      case 'sessions': {
        const totalSessions = await this.getSessionCount();
        return totalSessions >= achievement.criteria.sessions;
      }

      case 'streak': {
        const currentStreak = await this.getDailyStreak();
        return currentStreak >= achievement.criteria.streak;
      }

      case 'accuracy': {
        const meanAccuracy = await this.getAverageAccuracy();
        return meanAccuracy >= achievement.criteria.accuracy;
      }

      default: {
        return false;
      }
    }
  }

  /** Persists the unlock for this player, then sends the notification. */
  private async unlockAchievement(achievement: Achievement): Promise<void> {
    const insertUnlock = this.db.prepare(`
      INSERT INTO player_achievements (player_uuid, achievement_id, unlocked_at)
      VALUES (?, ?, CURRENT_TIMESTAMP)
    `);
    await insertUnlock.bind(this.playerId, achievement.id).run();

    await this.sendAchievementNotification(achievement);
  }

  /**
   * Grants whichever parts of the achievement's reward are present:
   * experience, coins, then each item in order.
   */
  private async grantAchievementReward(achievement: Achievement): Promise<void> {
    const payout = achievement.reward;

    if (payout.experience) {
      await this.addExperience(payout.experience);
    }

    if (payout.coins) {
      await this.addCoins(payout.coins);
    }

    if (!payout.items) {
      return;
    }

    for (const grantedItem of payout.items) {
      await this.addItem(grantedItem);
    }
  }
}

Customer Workflows

Customer Workflow Management

import { CustomerWorkflow } from '@/workflows/customer_workflow';

// Customer workflow for subscription and billing management
/**
 * Runs the recurring per-customer pipeline: subscription status handling,
 * billing, customer data management and usage analytics.
 *
 * Progress is tracked in `state.completedSteps`. When a run fails, the failure
 * message is appended to `state.errors` before the error is rethrown.
 *
 * Helpers such as `getCustomerData`, `chargeCustomer`, `manageCustomerData`
 * and `processUsageAnalytics` are not defined in this snippet.
 */
export class CustomerWorkflow {
  private static readonly MS_PER_DAY = 1000 * 60 * 60 * 24;

  private customerId: string;
  private state: {
    customerId: string;
    startTime: Date;
    completedSteps: string[];
    errors: string[];
  };

  constructor(customerId: string) {
    this.customerId = customerId;
    // Start from a valid empty state so a step never dereferences an
    // undefined `state`, even when it is invoked before `run()`.
    this.state = this.createInitialState();
  }

  /**
   * Main workflow entry point. Executes every step in order.
   *
   * @throws Rethrows whatever a step throws, after recording it in
   *   `state.errors` and logging it.
   */
  async run(): Promise<void> {
    try {
      await this.initializeState();

      // Process subscription status
      await this.processSubscriptionStatus();

      // Handle billing cycles
      await this.processBillingCycle();

      // Manage customer data
      await this.manageCustomerData();

      // Process usage analytics
      await this.processUsageAnalytics();
    } catch (error) {
      this.state.errors.push(error instanceof Error ? error.message : String(error));
      console.error('Customer workflow failed:', error);
      throw error;
    }
  }

  /** Builds a fresh state object with no completed steps and no errors. */
  private createInitialState(): CustomerWorkflow['state'] {
    return {
      customerId: this.customerId,
      startTime: new Date(),
      completedSteps: [],
      errors: []
    };
  }

  /** Resets the workflow state at the start of a run. */
  private async initializeState(): Promise<void> {
    this.state = this.createInitialState();
  }

  /**
   * Dispatches on the subscription status (`active`, `past_due`, `canceled`);
   * any other status is ignored. Always marks `subscription_status` complete.
   */
  private async processSubscriptionStatus(): Promise<void> {
    const customer = await this.getCustomerData();
    const subscription = await this.getSubscriptionData();

    if (subscription.status === 'active') {
      await this.processActiveSubscription(customer, subscription);
    } else if (subscription.status === 'past_due') {
      await this.processPastDueSubscription(customer, subscription);
    } else if (subscription.status === 'canceled') {
      await this.processCanceledSubscription(customer, subscription);
    }

    this.state.completedSteps.push('subscription_status');
  }

  /**
   * Sends a renewal reminder when the period ends within 7 days (rounded up
   * to whole days), then checks the customer's usage limits.
   */
  private async processActiveSubscription(customer: any, subscription: any): Promise<void> {
    // Check for upcoming renewals
    const renewalDate = new Date(subscription.current_period_end);
    const now = new Date();
    const daysUntilRenewal = Math.ceil(
      (renewalDate.getTime() - now.getTime()) / CustomerWorkflow.MS_PER_DAY
    );

    if (daysUntilRenewal <= 7) {
      await this.sendRenewalReminder(customer, subscription, daysUntilRenewal);
    }

    // Process usage limits
    await this.checkUsageLimits(customer, subscription);
  }

  /**
   * Escalates by days past due: under 7 days sends a payment reminder, 7 to 29
   * days sends a final notice, 30 days or more cancels for non-payment.
   */
  private async processPastDueSubscription(customer: any, subscription: any): Promise<void> {
    const daysPastDue = this.calculateDaysPastDue(subscription.current_period_end);

    if (daysPastDue >= 30) {
      // Cancel subscription after 30 days
      await this.cancelSubscription(subscription.id, 'non_payment');
    } else if (daysPastDue >= 7) {
      // Send final notice
      await this.sendFinalNotice(customer, subscription);
    } else {
      // Send payment reminder
      await this.sendPaymentReminder(customer, subscription);
    }
  }

  /** Bills every active subscription that `isBillingDue` reports as due. */
  private async processBillingCycle(): Promise<void> {
    const subscriptions = await this.getActiveSubscriptions();

    for (const subscription of subscriptions) {
      if (this.isBillingDue(subscription)) {
        await this.processBilling(subscription);
      }
    }

    this.state.completedSteps.push('billing_cycle');
  }

  /**
   * Charges one subscription. A succeeded payment is recorded and extends the
   * subscription; any other payment status goes to the failed-payment path.
   * Errors thrown in the try block are passed to `handleBillingError` rather
   * than propagated, so one subscription does not abort the billing loop.
   */
  private async processBilling(subscription: any): Promise<void> {
    try {
      const payment = await this.chargeCustomer(subscription);

      if (payment.status === 'succeeded') {
        await this.recordSuccessfulPayment(subscription, payment);
        await this.extendSubscription(subscription);
      } else {
        await this.handleFailedPayment(subscription, payment);
      }
    } catch (error) {
      await this.handleBillingError(subscription, error);
    }
  }
}

Subscription Management

// Subscription processing
/**
 * Subscription housekeeping: usage warnings, failed-payment handling, retry
 * scheduling and cancellation. Helper methods are not defined in this snippet.
 */
class SubscriptionProcessor {
  /**
   * Warns the customer when API calls or storage exceed 90% of the plan's
   * limit. The two checks are independent, so both warnings may be sent.
   */
  private async checkUsageLimits(customer: any, subscription: any): Promise<void> {
    const plan = await this.getPlanDetails(subscription.plan_id);
    const usage = await this.getCurrentUsage(customer.id);
    const limits = plan.limits;

    const apiCallsNearLimit = usage.apiCalls > limits.apiCalls * 0.9;
    if (apiCallsNearLimit) {
      await this.sendUsageWarning(customer, usage, limits);
    }

    const storageNearLimit = usage.storage > limits.storage * 0.9;
    if (storageNearLimit) {
      await this.sendStorageWarning(customer, usage, limits);
    }
  }

  /**
   * Records the failed payment, marks the subscription `past_due`, notifies
   * about the failure and schedules a retry, strictly in that order.
   */
  private async handleFailedPayment(subscription: any, payment: any): Promise<void> {
    const followUps: Array<() => Promise<void>> = [
      () => this.recordFailedPayment(subscription, payment),
      () => this.updateSubscriptionStatus(subscription.id, 'past_due'),
      () => this.sendPaymentFailureNotification(subscription, payment),
      () => this.schedulePaymentRetry(subscription)
    ];

    for (const followUp of followUps) {
      await followUp();
    }
  }

  /**
   * Schedules the next `payment_retry` workflow using a 1/3/7/14/30-day
   * schedule indexed by the retry count. Does nothing once the retry count
   * is past the end of the schedule.
   */
  private async schedulePaymentRetry(subscription: any): Promise<void> {
    const retryDelaysInDays = [1, 3, 7, 14, 30];
    const attemptsSoFar = await this.getRetryCount(subscription.id);

    // Negated `<` rather than `>=` so a non-numeric count is also rejected.
    if (!(attemptsSoFar < retryDelaysInDays.length)) {
      return;
    }

    const nextAttempt = new Date();
    nextAttempt.setDate(nextAttempt.getDate() + retryDelaysInDays[attemptsSoFar]);

    const payload = {
      subscriptionId: subscription.id,
      retryDate: nextAttempt.toISOString()
    };
    await this.scheduleWorkflow('payment_retry', payload);
  }

  /**
   * Marks the subscription canceled, records the reason, notifies the
   * customer and schedules a `subscription_cleanup` workflow 30 days out.
   */
  private async cancelSubscription(subscriptionId: string, reason: string): Promise<void> {
    await this.updateSubscriptionStatus(subscriptionId, 'canceled');
    await this.recordCancellation(subscriptionId, reason);

    const owner = await this.getCustomerBySubscription(subscriptionId);
    await this.sendCancellationNotification(owner, reason);

    const thirtyDaysMs = 30 * 24 * 60 * 60 * 1000;
    const cleanupAt = new Date(Date.now() + thirtyDaysMs);
    await this.scheduleWorkflow('subscription_cleanup', {
      subscriptionId,
      cleanupDate: cleanupAt.toISOString()
    });
  }
}

System Workflows

Maintenance Workflows

// System maintenance workflow
/**
 * Periodic system maintenance: database cleanup, log rotation, cache
 * invalidation, health checks and analytics processing, run in that order.
 * Helper methods are not defined in this snippet.
 */
export class SystemMaintenanceWorkflow {
  /**
   * Runs every maintenance stage sequentially.
   *
   * @throws Rethrows the first stage failure after logging it; later stages
   *   are not run.
   */
  async run(): Promise<void> {
    const stages: Array<() => Promise<void>> = [
      () => this.cleanupDatabase(),
      () => this.rotateLogs(),
      () => this.invalidateCache(),
      () => this.performHealthChecks(),
      () => this.processAnalytics()
    ];

    try {
      for (const stage of stages) {
        await stage();
      }
    } catch (failure) {
      console.error('System maintenance failed:', failure);
      throw failure;
    }
  }

  /** Cleans up old sessions, messages and logs, then optimizes the database. */
  private async cleanupDatabase(): Promise<void> {
    await this.cleanupOldSessions();
    await this.cleanupOldMessages();
    await this.cleanupOldLogs();
    await this.optimizeDatabase();
  }

  /** Archives and then deletes every log file created more than 7 days ago. */
  private async rotateLogs(): Promise<void> {
    const candidates = await this.getLogFiles();

    const retentionDays = 7;
    const oldestKept = new Date();
    oldestKept.setDate(oldestKept.getDate() - retentionDays);

    for (const candidate of candidates) {
      const expired = candidate.createdAt < oldestKept;
      if (!expired) {
        continue;
      }

      await this.archiveLogFile(candidate);
      await this.deleteLogFile(candidate);
    }
  }

  /** Invalidates each expired cache key, one at a time. */
  private async invalidateCache(): Promise<void> {
    const expiredKeys = await this.getExpiredCacheKeys();

    for (const expiredKey of expiredKeys) {
      await this.invalidateCacheKey(expiredKey);
    }
  }

  /**
   * Starts the database, KV, R2 and Durable Objects checks together, waits
   * for all of them to settle, then raises one alert per rejected check.
   */
  private async performHealthChecks(): Promise<void> {
    const outcomes = await Promise.allSettled([
      this.checkDatabaseHealth(),
      this.checkKVHealth(),
      this.checkR2Health(),
      this.checkDurableObjectsHealth()
    ]);

    for (const outcome of outcomes) {
      if (outcome.status !== 'rejected') {
        continue;
      }
      await this.alertHealthCheckFailure(outcome.reason);
    }
  }
}

Workflow Orchestration

Complex Workflow Patterns

// Workflow orchestrator for complex processes
/** Generic helpers for composing async workflow functions. */
class WorkflowOrchestrator {
  /**
   * Runs `workflow`, retrying with exponential backoff when it rejects.
   *
   * @param workflow   Function to execute; invoked once per attempt.
   * @param maxRetries Maximum number of attempts in total (not additional
   *   retries). Values below 1, or NaN, are treated as 1 so the workflow
   *   always runs at least once; fractional values are rounded down.
   * @param delayMs    Delay before the second attempt; doubles for each
   *   further attempt.
   * @returns The value of the first successful attempt.
   * @throws The rejection value of the final attempt, unchanged.
   */
  async executeWithRetry<T>(
    workflow: () => Promise<T>,
    maxRetries: number = 3,
    delayMs: number = 1000
  ): Promise<T> {
    // Guarantee at least one attempt. Without this, a non-positive or NaN
    // limit would skip the workflow entirely and throw `undefined`.
    const totalAttempts = Number.isNaN(maxRetries) ? 1 : Math.max(1, Math.floor(maxRetries));

    for (let attempt = 1; ; attempt++) {
      try {
        return await workflow();
      } catch (error) {
        if (attempt >= totalAttempts) {
          throw error;
        }

        // Exponential backoff: delayMs, 2*delayMs, 4*delayMs, ...
        const backoffDelay = delayMs * 2 ** (attempt - 1);
        await new Promise<void>(resolve => setTimeout(resolve, backoffDelay));
      }
    }
  }

  /**
   * Starts every workflow at once and resolves with their results in input
   * order. Rejects as soon as any workflow rejects (`Promise.all` semantics).
   */
  async executeParallel<T>(
    workflows: (() => Promise<T>)[]
  ): Promise<T[]> {
    return await Promise.all(workflows.map(wf => wf()));
  }

  /**
   * Runs the workflows one after another and resolves with their results in
   * order. A rejection stops the sequence; later workflows are not started.
   */
  async executeSequential<T>(
    workflows: (() => Promise<T>)[]
  ): Promise<T[]> {
    const results: T[] = [];

    for (const workflow of workflows) {
      results.push(await workflow());
    }

    return results;
  }

  /**
   * Awaits `condition` and runs `trueWorkflow` when it resolves truthy,
   * otherwise `falseWorkflow`. Only the selected workflow is invoked.
   */
  async executeConditional<T>(
    condition: () => Promise<boolean>,
    trueWorkflow: () => Promise<T>,
    falseWorkflow: () => Promise<T>
  ): Promise<T> {
    const shouldExecute = await condition();

    return shouldExecute ? await trueWorkflow() : await falseWorkflow();
  }
}

Integration Examples

Workflow Triggering

// Trigger workflows from API endpoints
/**
 * Starts a player or customer workflow.
 *
 * Routes:
 *  - `/api/workflows/player`   expects a JSON body with a `playerId` string
 *  - `/api/workflows/customer` expects a JSON body with a `customerId` string
 *
 * Responses: 202 with `{ workflowId }` on success, 400 for malformed JSON or
 * a missing/empty id, 404 for any other path.
 *
 * NOTE(review): native Cloudflare Workflows bindings expose `create()` and
 * `get()` rather than `start()`; confirm these bindings are wrappers that
 * provide `start()`.
 */
export async function POST(request: Request, locals: any) {
  const url = new URL(request.url);

  // Resolve the route first so unknown paths return 404 without touching the body.
  let bindingName: 'PLAYER_WORKFLOW' | 'CUSTOMER_WORKFLOW';
  let idField: 'playerId' | 'customerId';
  if (url.pathname === '/api/workflows/player') {
    bindingName = 'PLAYER_WORKFLOW';
    idField = 'playerId';
  } else if (url.pathname === '/api/workflows/customer') {
    bindingName = 'CUSTOMER_WORKFLOW';
    idField = 'customerId';
  } else {
    return new Response('Not found', { status: 404 });
  }

  // A malformed body is a client error, not a server crash.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return new Response('Invalid JSON body', { status: 400 });
  }

  const subjectId =
    typeof body === 'object' && body !== null
      ? (body as Record<string, unknown>)[idField]
      : undefined;
  if (typeof subjectId !== 'string' || subjectId.length === 0) {
    return new Response(`${idField} is required`, { status: 400 });
  }

  // Trigger the workflow for the resolved route
  const workflowId = await locals.runtime.env[bindingName].start({
    [idField]: subjectId,
    triggerTime: new Date().toISOString()
  });

  return new Response(JSON.stringify({ workflowId }), {
    status: 202,
    headers: { 'Content-Type': 'application/json' }
  });
}

Workflow Status Checking

// Check workflow status
/**
 * Returns the status of a workflow instance.
 *
 * Query parameters:
 *  - `id`   (required) workflow id returned when the workflow was started
 *  - `type` (optional) `player` (default) or `customer`; selects which
 *    workflow binding is queried
 *
 * Responses: 200 with the status as JSON, 400 when `id` is missing or `type`
 * is unknown, 404 when the status lookup fails.
 *
 * NOTE(review): native Cloudflare Workflows bindings expose `get(id)` and
 * `instance.status()` rather than `getStatus()`; confirm these bindings are
 * wrappers that provide `getStatus()`.
 */
export async function GET(request: Request, locals: any) {
  const url = new URL(request.url);
  const workflowId = url.searchParams.get('id');

  if (!workflowId) {
    return new Response('Workflow ID required', { status: 400 });
  }

  // Defaults to the player binding so existing `?id=...` callers are unaffected.
  const workflowType = url.searchParams.get('type') ?? 'player';
  let bindingName: 'PLAYER_WORKFLOW' | 'CUSTOMER_WORKFLOW';
  if (workflowType === 'player') {
    bindingName = 'PLAYER_WORKFLOW';
  } else if (workflowType === 'customer') {
    bindingName = 'CUSTOMER_WORKFLOW';
  } else {
    return new Response('Unknown workflow type', { status: 400 });
  }

  try {
    const status = await locals.runtime.env[bindingName].getStatus(workflowId);

    return new Response(JSON.stringify(status), {
      status: 200,
      headers: { 'Content-Type': 'application/json' }
    });
  } catch (error) {
    // Log the cause so infrastructure failures are not hidden behind the 404.
    console.error('Workflow status lookup failed:', error);
    return new Response('Workflow not found', { status: 404 });
  }
}

Testing

Workflow Testing

// End-to-end style tests for the player and customer workflows.
//
// `state` is declared `private` on both workflow classes, so it is read with
// bracket notation, which TypeScript permits for private members; plain dot
// access would fail type-checking.
// NOTE(review): each test mocks only one data loader. The other helpers that
// `run()` calls must exist on the real classes or be mocked as well.
describe('Workflows', () => {
  let playerWorkflow: PlayerWorkflow;
  let customerWorkflow: CustomerWorkflow;

  beforeEach(() => {
    // Fresh instances per test so recorded steps never leak between tests.
    playerWorkflow = new PlayerWorkflow('test-player');
    customerWorkflow = new CustomerWorkflow('test-customer');
  });

  it('should process daily rewards', async () => {
    // Mock player data
    jest.spyOn(playerWorkflow, 'getPlayerData').mockResolvedValue({
      id: 'test-player',
      experience: 1000,
      level: 5
    });

    await playerWorkflow.run();

    expect(playerWorkflow['state'].completedSteps).toContain('daily_rewards');
  });

  it('should handle level progression', async () => {
    // Mock player with enough experience for level up
    jest.spyOn(playerWorkflow, 'getPlayerData').mockResolvedValue({
      id: 'test-player',
      experience: 2500,
      level: 4
    });

    await playerWorkflow.run();

    expect(playerWorkflow['state'].completedSteps).toContain('level_progression');
  });

  it('should process customer subscription', async () => {
    // Mock customer data
    jest.spyOn(customerWorkflow, 'getCustomerData').mockResolvedValue({
      id: 'test-customer',
      status: 'active'
    });

    await customerWorkflow.run();

    expect(customerWorkflow['state'].completedSteps).toContain('subscription_status');
  });
});

In summary, this Cloudflare Workflows system provides reliable, long-running background processing for complex business logic, with automatic state management and error handling.

PadawanForge v1.4.1