Cloudflare Workflows System
Overview
PadawanForge implements a comprehensive Cloudflare Workflows system that provides reliable, long-running background processing for player progression, customer management, and system maintenance tasks. This system leverages Cloudflare’s serverless workflow capabilities for complex orchestration patterns.
Architecture
Core Components
- Player Workflows: Level progression, achievements, daily rewards
- Customer Workflows: Subscription management, billing processing
- System Workflows: Maintenance, cleanup, analytics processing
- Workflow Orchestration: Complex multi-step processes
- State Management: Persistent workflow state
Workflow Features
- Long-running Tasks: Reliable execution of complex processes
- State Persistence: Automatic state management across executions
- Error Handling: Automatic retry and failure recovery
- Integration: Seamless connection with Workers and Durable Objects
- Cost Optimization: Efficient resource utilization
Player Workflows
Core Player Workflow
import { PlayerWorkflow } from '@/workflows/player_workflow';
// Player workflow for managing progression and rewards
/** Identifiers of the steps that `PlayerWorkflow.run` records once they have done work. */
type PlayerWorkflowStep =
  | 'daily_rewards'
  | 'level_progression'
  | 'achievements'
  | 'stats_update'
  | 'cleanup';

/** Bookkeeping kept for a single `PlayerWorkflow` run. */
interface PlayerWorkflowState {
  playerId: string;
  startTime: Date;
  completedSteps: PlayerWorkflowStep[];
  errors: string[];
}

/**
 * Player workflow for managing progression and rewards.
 *
 * `run()` executes the steps strictly in order: daily rewards, level
 * progression, achievements, statistics update, cleanup. The first step that
 * throws aborts the run; its message is stored in `state.errors`, the error is
 * logged and then rethrown to the caller.
 *
 * The data-access helpers used below (`getLastDailyReward`, `getPlayerData`,
 * `grantReward`, `cleanupOldSessions`, ...) are not defined in this snippet
 * and must be supplied by the surrounding implementation.
 */
export class PlayerWorkflow {
  private playerId: string;
  private state: PlayerWorkflowState;

  /**
   * @param playerId Identifier of the player this workflow operates on.
   */
  constructor(playerId: string) {
    this.playerId = playerId;
    // Start with a valid state so it can be inspected even before run() is called.
    this.state = this.createInitialState();
  }

  /**
   * Main workflow entry point.
   *
   * @throws Rethrows whatever the failing step threw, after recording and logging it.
   */
  async run(): Promise<void> {
    try {
      await this.initializeState();
      await this.processDailyRewards();
      await this.checkLevelProgression();
      await this.processAchievements();
      await this.updatePlayerStats();
      await this.cleanupOldData();
    } catch (error) {
      // Keep a trace of the failure in the workflow state before propagating it.
      this.state.errors.push(error instanceof Error ? error.message : String(error));
      console.error('Player workflow failed:', error);
      throw error;
    }
  }

  /** Builds an empty state object stamped with the current time. */
  private createInitialState(): PlayerWorkflowState {
    return {
      playerId: this.playerId,
      startTime: new Date(),
      completedSteps: [],
      errors: []
    };
  }

  /** Resets the workflow state at the beginning of every run. */
  private async initializeState(): Promise<void> {
    this.state = this.createInitialState();
  }

  /** Grants and records the daily reward when `shouldGiveDailyReward` says one is due. */
  private async processDailyRewards(): Promise<void> {
    const lastReward = await this.getLastDailyReward();
    const now = new Date();
    if (this.shouldGiveDailyReward(lastReward, now)) {
      const reward = await this.calculateDailyReward();
      await this.grantReward(reward);
      await this.recordDailyReward(now);
      this.state.completedSteps.push('daily_rewards');
    }
  }

  /** Compares the level derived from experience with the stored level and handles a level-up. */
  private async checkLevelProgression(): Promise<void> {
    const player = await this.getPlayerData();
    const currentLevel = this.calculateLevel(player.experience);
    const previousLevel = player.level;
    if (currentLevel > previousLevel) {
      await this.handleLevelUp(previousLevel, currentLevel);
      this.state.completedSteps.push('level_progression');
    }
  }

  /** Unlocks every achievement returned by `checkAchievements` and grants its reward. */
  private async processAchievements(): Promise<void> {
    // Player data is fetched again here rather than reused from the level check.
    const player = await this.getPlayerData();
    const unlockedAchievements = await this.checkAchievements(player);
    for (const achievement of unlockedAchievements) {
      await this.unlockAchievement(achievement);
      await this.grantAchievementReward(achievement);
    }
    if (unlockedAchievements.length > 0) {
      this.state.completedSteps.push('achievements');
    }
  }

  /** Recomputes the player's statistics and stores them. */
  private async updatePlayerStats(): Promise<void> {
    const stats = await this.calculatePlayerStats();
    await this.updatePlayerData(stats);
    this.state.completedSteps.push('stats_update');
  }

  /** Asks the cleanup helpers to process sessions, messages and logs using a cutoff 30 days back. */
  private async cleanupOldData(): Promise<void> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - 30); // 30 days ago
    await this.cleanupOldSessions(cutoffDate);
    await this.cleanupOldMessages(cutoffDate);
    await this.cleanupOldLogs(cutoffDate);
    this.state.completedSteps.push('cleanup');
  }
}
Daily Rewards System
// Daily rewards processing
/**
 * Daily rewards processing.
 *
 * Relies on members that are not declared in this snippet: `this.db`
 * (an object exposing `prepare(sql).bind(...).run()`), `this.playerId`, and
 * the helpers `getDailyStreak`, `addExperience`, `addCoins`, `updateStreak`.
 */
class DailyRewardsProcessor {
  // Last reward handed to grantReward(); lets recordDailyReward(date) persist it
  // without the caller having to pass the reward a second time.
  private lastGrantedReward: DailyReward | null = null;

  /**
   * Decides whether a daily reward is due.
   *
   * @param lastReward Timestamp of the previous reward, or `null` if none was ever given.
   * @param now Current time.
   * @returns `true` when there is no previous reward or when `now` falls on a
   *   later calendar day than `lastReward` (days are compared in the runtime's
   *   local time zone, because `setHours` works in local time).
   */
  private shouldGiveDailyReward(lastReward: Date | null, now: Date): boolean {
    if (!lastReward) return true;
    // Work on copies so the caller's Date objects are not mutated.
    const lastRewardDate = new Date(lastReward);
    const today = new Date(now);
    // Reset time to compare dates only
    lastRewardDate.setHours(0, 0, 0, 0);
    today.setHours(0, 0, 0, 0);
    return today.getTime() > lastRewardDate.getTime();
  }

  /**
   * Calculates the daily reward from the current streak.
   *
   * @returns 10 base XP plus 2 XP per streak day (bonus capped at 50), half of
   *   that amount (rounded down) as coins, and the streak incremented by one.
   */
  private async calculateDailyReward(): Promise<DailyReward> {
    const streak = await this.getDailyStreak();
    const baseReward = 10; // Base XP
    const streakBonus = Math.min(streak * 2, 50); // Max 50 bonus
    const totalReward = baseReward + streakBonus;
    return {
      experience: totalReward,
      coins: Math.floor(totalReward / 2),
      streak: streak + 1
    };
  }

  /**
   * Applies a reward to the player and remembers it for `recordDailyReward`.
   *
   * @param reward Reward to grant.
   */
  private async grantReward(reward: DailyReward): Promise<void> {
    await this.addExperience(reward.experience);
    await this.addCoins(reward.coins);
    await this.updateStreak(reward.streak);
    // Only remember the reward once all three updates have succeeded.
    this.lastGrantedReward = reward;
  }

  /**
   * Inserts a row into `daily_rewards` for the given date.
   *
   * @param date Time the reward was given; stored as an ISO-8601 string.
   * @param reward Reward to record. Defaults to the reward most recently passed
   *   to `grantReward`, so existing `recordDailyReward(date)` calls keep working.
   * @throws Error when no reward is supplied and none has been granted yet.
   */
  private async recordDailyReward(
    date: Date,
    reward: DailyReward | null = this.lastGrantedReward
  ): Promise<void> {
    if (!reward) {
      throw new Error('recordDailyReward called before any reward was granted');
    }
    await this.db.prepare(`
      INSERT INTO daily_rewards (player_uuid, reward_date, experience_gained, coins_gained, streak)
      VALUES (?, ?, ?, ?, ?)
    `).bind(this.playerId, date.toISOString(), reward.experience, reward.coins, reward.streak).run();
  }
}
Level Progression System
// Level progression handling
/**
 * Level progression handling.
 *
 * The helpers `grantLevelUpRewards` depends on (`addExperience`, `addCoins`,
 * `addSkillPoints`, `unlockFeature`) as well as `updatePlayerLevel`,
 * `getUnlockablesForLevel`, `unlockAchievement` and `sendLevelUpNotification`
 * are not defined in this snippet.
 */
class LevelProgressionProcessor {
  // Level thresholds and the achievement name unlocked when each is reached.
  private static readonly LEVEL_ACHIEVEMENTS: ReadonlyArray<{ level: number; name: string }> = [
    { level: 5, name: 'Novice' },
    { level: 10, name: 'Apprentice' },
    { level: 25, name: 'Skilled' },
    { level: 50, name: 'Expert' },
    { level: 100, name: 'Master' }
  ];

  /**
   * Handles a player level-up: grants rewards, stores the new level, unlocks
   * level achievements and sends a notification, in that order.
   *
   * @param oldLevel Level before the change.
   * @param newLevel Level after the change.
   */
  private async handleLevelUp(oldLevel: number, newLevel: number): Promise<void> {
    // Nothing to do (and no negative rewards to grant) unless the level actually rose.
    if (newLevel <= oldLevel) return;

    const rewards = this.calculateLevelUpRewards(oldLevel, newLevel);
    await this.grantLevelUpRewards(rewards);
    await this.updatePlayerLevel(newLevel);
    // Pass the old level so only thresholds crossed by this level-up are unlocked.
    await this.checkLevelAchievements(newLevel, oldLevel);
    await this.sendLevelUpNotification(newLevel, rewards);
  }

  /**
   * Calculates level-up rewards.
   *
   * @returns 100 XP per level gained, one fifth of that (rounded down) as
   *   coins, one skill point per level gained, and the unlockables reported
   *   for `newLevel`.
   */
  private calculateLevelUpRewards(oldLevel: number, newLevel: number): LevelUpRewards {
    const levelsGained = newLevel - oldLevel;
    const baseReward = 100; // Base XP per level
    const totalReward = baseReward * levelsGained;
    return {
      experience: totalReward,
      coins: Math.floor(totalReward / 5),
      skillPoints: levelsGained,
      // NOTE(review): only the final level is queried; if getUnlockablesForLevel
      // is not cumulative, unlockables of skipped levels are missed - confirm.
      unlockables: this.getUnlockablesForLevel(newLevel)
    };
  }

  /** Applies every part of a level-up reward, one item after another. */
  private async grantLevelUpRewards(rewards: LevelUpRewards): Promise<void> {
    await this.addExperience(rewards.experience);
    await this.addCoins(rewards.coins);
    await this.addSkillPoints(rewards.skillPoints);
    for (const unlockable of rewards.unlockables) {
      await this.unlockFeature(unlockable);
    }
  }

  /**
   * Unlocks level-based achievements.
   *
   * @param level Level the player has reached.
   * @param previousLevel Level the player had before. Thresholds at or below
   *   it are skipped so they are not unlocked again on every level-up. The
   *   default of 0 unlocks every threshold up to `level`, matching the
   *   one-argument behaviour.
   */
  private async checkLevelAchievements(level: number, previousLevel: number = 0): Promise<void> {
    for (const achievement of LevelProgressionProcessor.LEVEL_ACHIEVEMENTS) {
      const newlyReached = achievement.level > previousLevel && level >= achievement.level;
      if (newlyReached) {
        await this.unlockAchievement(achievement.name);
      }
    }
  }
}
Achievement System
// Achievement processing
/**
 * Achievement processing.
 *
 * Uses members not declared in this snippet: `this.db`, `this.playerId` and
 * the helpers `getAllAchievements`, `hasUnlockedAchievement`,
 * `getSessionCount`, `getDailyStreak`, `getAverageAccuracy`,
 * `sendAchievementNotification`, `addExperience`, `addCoins`, `addItem`.
 */
class AchievementProcessor {
  /**
   * Finds achievements the player newly qualifies for.
   *
   * @param player Current player data.
   * @returns Achievements that are not yet unlocked and whose criteria are met.
   */
  private async checkAchievements(player: PlayerData): Promise<Achievement[]> {
    const allAchievements = await this.getAllAchievements();
    const unlockedAchievements: Achievement[] = [];
    for (const achievement of allAchievements) {
      if (await this.hasUnlockedAchievement(achievement.id)) {
        continue; // Already unlocked
      }
      if (await this.checkAchievementCriteria(achievement, player)) {
        unlockedAchievements.push(achievement);
      }
    }
    return unlockedAchievements;
  }

  /**
   * Evaluates one achievement's criteria.
   *
   * @returns `true` when the value selected by `achievement.type` is at least
   *   the matching threshold in `achievement.criteria`; `false` for unknown types.
   */
  private async checkAchievementCriteria(achievement: Achievement, player: PlayerData): Promise<boolean> {
    switch (achievement.type) {
      case 'level':
        return player.level >= achievement.criteria.level;
      case 'experience':
        return player.experience >= achievement.criteria.experience;
      // Each case that declares a local gets its own block so the `const`
      // bindings do not share the switch-wide scope.
      case 'sessions': {
        const sessionCount = await this.getSessionCount();
        return sessionCount >= achievement.criteria.sessions;
      }
      case 'streak': {
        const streak = await this.getDailyStreak();
        return streak >= achievement.criteria.streak;
      }
      case 'accuracy': {
        const avgAccuracy = await this.getAverageAccuracy();
        return avgAccuracy >= achievement.criteria.accuracy;
      }
      default:
        return false;
    }
  }

  /** Inserts the unlock row for the player, then sends the achievement notification. */
  private async unlockAchievement(achievement: Achievement): Promise<void> {
    await this.db.prepare(`
      INSERT INTO player_achievements (player_uuid, achievement_id, unlocked_at)
      VALUES (?, ?, CURRENT_TIMESTAMP)
    `).bind(this.playerId, achievement.id).run();
    // Send notification
    await this.sendAchievementNotification(achievement);
  }

  /**
   * Grants whichever parts of the achievement's reward are present
   * (experience, coins, items). Achievements without a reward are a no-op.
   */
  private async grantAchievementReward(achievement: Achievement): Promise<void> {
    const reward = achievement.reward;
    // An achievement may carry no reward at all; avoid dereferencing undefined.
    if (!reward) return;

    if (reward.experience) {
      await this.addExperience(reward.experience);
    }
    if (reward.coins) {
      await this.addCoins(reward.coins);
    }
    if (reward.items) {
      for (const item of reward.items) {
        await this.addItem(item);
      }
    }
  }
}
Customer Workflows
Customer Workflow Management
import { CustomerWorkflow } from '@/workflows/customer_workflow';
// Customer workflow for subscription and billing management
/** Bookkeeping kept for a single `CustomerWorkflow` run. */
interface CustomerWorkflowState {
  customerId: string;
  startTime: Date;
  completedSteps: string[];
  errors: string[];
}

/**
 * Customer workflow for subscription and billing management.
 *
 * `run()` executes, in order: subscription status handling, the billing
 * cycle, customer data management and usage analytics. The first step that
 * throws aborts the run; its message is stored in `state.errors`, the error is
 * logged and then rethrown.
 *
 * Data-access and notification helpers called below (`getCustomerData`,
 * `getSubscriptionData`, `chargeCustomer`, `sendRenewalReminder`, ...) are not
 * defined in this snippet.
 */
export class CustomerWorkflow {
  private customerId: string;
  private state: CustomerWorkflowState;

  /**
   * @param customerId Identifier of the customer this workflow operates on.
   */
  constructor(customerId: string) {
    this.customerId = customerId;
    // Start with a valid state so it can be inspected even before run() is called.
    this.state = this.createInitialState();
  }

  /**
   * Main workflow entry point.
   *
   * @throws Rethrows whatever the failing step threw, after recording and logging it.
   */
  async run(): Promise<void> {
    try {
      await this.initializeState();
      await this.processSubscriptionStatus();
      await this.processBillingCycle();
      await this.manageCustomerData();
      await this.processUsageAnalytics();
    } catch (error) {
      this.state.errors.push(error instanceof Error ? error.message : String(error));
      console.error('Customer workflow failed:', error);
      throw error;
    }
  }

  /** Builds an empty state object stamped with the current time. */
  private createInitialState(): CustomerWorkflowState {
    return {
      customerId: this.customerId,
      startTime: new Date(),
      completedSteps: [],
      errors: []
    };
  }

  /**
   * Resets the workflow state at the beginning of every run. `run()` calls
   * this method, and the steps push onto `state.completedSteps`, so it must exist.
   */
  private async initializeState(): Promise<void> {
    this.state = this.createInitialState();
  }

  /** Dispatches on the subscription status: `active`, `past_due` or `canceled`. */
  private async processSubscriptionStatus(): Promise<void> {
    const customer = await this.getCustomerData();
    const subscription = await this.getSubscriptionData();
    if (subscription.status === 'active') {
      await this.processActiveSubscription(customer, subscription);
    } else if (subscription.status === 'past_due') {
      await this.processPastDueSubscription(customer, subscription);
    } else if (subscription.status === 'canceled') {
      await this.processCanceledSubscription(customer, subscription);
    }
    // Any other status is left untouched; the step still counts as completed.
    this.state.completedSteps.push('subscription_status');
  }

  /** Sends a renewal reminder when renewal is at most 7 days away, then checks usage limits. */
  private async processActiveSubscription(customer: any, subscription: any): Promise<void> {
    const renewalDate = new Date(subscription.current_period_end);
    const now = new Date();
    // Partial days are rounded up to whole days.
    const daysUntilRenewal = Math.ceil((renewalDate.getTime() - now.getTime()) / (1000 * 60 * 60 * 24));
    if (daysUntilRenewal <= 7) {
      await this.sendRenewalReminder(customer, subscription, daysUntilRenewal);
    }
    await this.checkUsageLimits(customer, subscription);
  }

  /**
   * Escalates a past-due subscription: payment reminder below 7 days past due,
   * final notice from 7 days, cancellation for non-payment from 30 days.
   */
  private async processPastDueSubscription(customer: any, subscription: any): Promise<void> {
    const daysPastDue = this.calculateDaysPastDue(subscription.current_period_end);
    if (daysPastDue >= 30) {
      // Cancel subscription after 30 days
      await this.cancelSubscription(subscription.id, 'non_payment');
    } else if (daysPastDue >= 7) {
      await this.sendFinalNotice(customer, subscription);
    } else {
      await this.sendPaymentReminder(customer, subscription);
    }
  }

  /** Bills every active subscription for which `isBillingDue` returns true. */
  private async processBillingCycle(): Promise<void> {
    const subscriptions = await this.getActiveSubscriptions();
    for (const subscription of subscriptions) {
      if (this.isBillingDue(subscription)) {
        await this.processBilling(subscription);
      }
    }
    this.state.completedSteps.push('billing_cycle');
  }

  /**
   * Charges one subscription. A succeeded payment is recorded and the
   * subscription extended; any other payment status goes to
   * `handleFailedPayment`. Thrown errors are passed to `handleBillingError`
   * instead of propagating, so one failure does not stop the billing loop.
   */
  private async processBilling(subscription: any): Promise<void> {
    try {
      const payment = await this.chargeCustomer(subscription);
      if (payment.status === 'succeeded') {
        await this.recordSuccessfulPayment(subscription, payment);
        await this.extendSubscription(subscription);
      } else {
        await this.handleFailedPayment(subscription, payment);
      }
    } catch (error) {
      await this.handleBillingError(subscription, error);
    }
  }
}
Subscription Management
// Subscription processing
/**
 * Subscription processing: usage warnings, failed-payment handling, payment
 * retry scheduling and cancellation. Persistence, notification and scheduling
 * helpers (`getPlanDetails`, `scheduleWorkflow`, ...) are not defined in this
 * snippet.
 */
class SubscriptionProcessor {
  /**
   * Sends an API-call warning and/or a storage warning when the respective
   * usage is above 90% of the plan limit.
   */
  private async checkUsageLimits(customer: any, subscription: any): Promise<void> {
    const warningRatio = 0.9;
    const planDetails = await this.getPlanDetails(subscription.plan_id);
    const currentUsage = await this.getCurrentUsage(customer.id);

    const apiCallsNearLimit = currentUsage.apiCalls > planDetails.limits.apiCalls * warningRatio;
    if (apiCallsNearLimit) {
      await this.sendUsageWarning(customer, currentUsage, planDetails.limits);
    }

    const storageNearLimit = currentUsage.storage > planDetails.limits.storage * warningRatio;
    if (storageNearLimit) {
      await this.sendStorageWarning(customer, currentUsage, planDetails.limits);
    }
  }

  /**
   * Handles a failed payment: records it, marks the subscription `past_due`,
   * notifies about the failure and schedules a retry, in that order.
   */
  private async handleFailedPayment(subscription: any, payment: any): Promise<void> {
    await this.recordFailedPayment(subscription, payment);
    await this.updateSubscriptionStatus(subscription.id, 'past_due');
    await this.sendPaymentFailureNotification(subscription, payment);
    await this.schedulePaymentRetry(subscription);
  }

  /**
   * Schedules the next `payment_retry` workflow. The delay in days is taken
   * from the table [1, 3, 7, 14, 30], indexed by the number of retries so
   * far; once the table is exhausted nothing is scheduled.
   */
  private async schedulePaymentRetry(subscription: any): Promise<void> {
    const delaysInDays = [1, 3, 7, 14, 30];
    const attemptsSoFar = await this.getRetryCount(subscription.id);
    if (!(attemptsSoFar < delaysInDays.length)) {
      return;
    }

    const nextAttempt = new Date();
    nextAttempt.setDate(nextAttempt.getDate() + delaysInDays[attemptsSoFar]);
    const payload = {
      subscriptionId: subscription.id,
      retryDate: nextAttempt.toISOString()
    };
    await this.scheduleWorkflow('payment_retry', payload);
  }

  /**
   * Cancels a subscription: marks it `canceled`, records the reason, notifies
   * the customer and schedules a `subscription_cleanup` workflow 30 days out.
   */
  private async cancelSubscription(subscriptionId: string, reason: string): Promise<void> {
    const thirtyDaysMs = 30 * 24 * 60 * 60 * 1000;

    await this.updateSubscriptionStatus(subscriptionId, 'canceled');
    await this.recordCancellation(subscriptionId, reason);

    const affectedCustomer = await this.getCustomerBySubscription(subscriptionId);
    await this.sendCancellationNotification(affectedCustomer, reason);

    // The cleanup date is computed only after the notification has been sent.
    await this.scheduleWorkflow('subscription_cleanup', {
      subscriptionId,
      cleanupDate: new Date(Date.now() + thirtyDaysMs).toISOString()
    });
  }
}
System Workflows
Maintenance Workflows
// System maintenance workflow
/**
 * System maintenance workflow. The storage, cache, health-check and analytics
 * helpers it calls (`cleanupOldSessions`, `getLogFiles`, `checkKVHealth`,
 * `processAnalytics`, ...) are not defined in this snippet.
 */
export class SystemMaintenanceWorkflow {
  /**
   * Runs the maintenance phases one after another: database cleanup, log
   * rotation, cache invalidation, health checks, analytics. The first phase
   * that throws is logged and rethrown; later phases do not run.
   */
  async run(): Promise<void> {
    const phases: Array<() => Promise<void>> = [
      () => this.cleanupDatabase(),
      () => this.rotateLogs(),
      () => this.invalidateCache(),
      () => this.performHealthChecks(),
      () => this.processAnalytics()
    ];

    try {
      for (const phase of phases) {
        await phase();
      }
    } catch (error) {
      console.error('System maintenance failed:', error);
      throw error;
    }
  }

  /** Cleans up old sessions, messages and logs, then optimizes the database. */
  private async cleanupDatabase(): Promise<void> {
    const tasks: Array<() => Promise<void>> = [
      () => this.cleanupOldSessions(),
      () => this.cleanupOldMessages(),
      () => this.cleanupOldLogs(),
      () => this.optimizeDatabase()
    ];
    for (const task of tasks) {
      await task();
    }
  }

  /** Archives and then deletes every log file created more than 7 days ago. */
  private async rotateLogs(): Promise<void> {
    const files = await this.getLogFiles();

    const oldestKept = new Date();
    oldestKept.setDate(oldestKept.getDate() - 7); // Keep 7 days

    for (const file of files) {
      const isRecent = !(file.createdAt < oldestKept);
      if (isRecent) {
        continue;
      }
      await this.archiveLogFile(file);
      await this.deleteLogFile(file);
    }
  }

  /** Invalidates each key reported by `getExpiredCacheKeys`, one at a time. */
  private async invalidateCache(): Promise<void> {
    const expiredKeys = await this.getExpiredCacheKeys();
    for (const expiredKey of expiredKeys) {
      await this.invalidateCacheKey(expiredKey);
    }
  }

  /**
   * Starts the database, KV, R2 and Durable Objects health checks together,
   * waits for all of them to settle, and raises one alert per rejected check.
   */
  private async performHealthChecks(): Promise<void> {
    const pending = [
      this.checkDatabaseHealth(),
      this.checkKVHealth(),
      this.checkR2Health(),
      this.checkDurableObjectsHealth()
    ];
    const outcomes = await Promise.allSettled(pending);

    const failures = outcomes.filter(
      (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected'
    );
    for (const failure of failures) {
      await this.alertHealthCheckFailure(failure.reason);
    }
  }
}
Workflow Orchestration
Complex Workflow Patterns
// Workflow orchestrator for complex processes
/** Workflow orchestrator providing retry, parallel, sequential and conditional execution. */
class WorkflowOrchestrator {
  /**
   * Runs `workflow`, retrying with exponential backoff when it throws.
   *
   * @param workflow Operation to execute.
   * @param maxRetries Maximum number of attempts, including the first one.
   *   Must be at least 1.
   * @param delayMs Delay before the second attempt, in milliseconds; doubled
   *   after every further failure.
   * @returns The value of the first successful attempt.
   * @throws RangeError when `maxRetries` is not a number >= 1 (previously this
   *   case skipped every attempt and threw `undefined`).
   * @throws The value thrown by the last attempt, unchanged, once all attempts failed.
   */
  async executeWithRetry<T>(
    workflow: () => Promise<T>,
    maxRetries: number = 3,
    delayMs: number = 1000
  ): Promise<T> {
    // Written as a negated comparison so that NaN is rejected too.
    if (!(maxRetries >= 1)) {
      throw new RangeError(`maxRetries must be at least 1, received ${maxRetries}`);
    }

    let lastError: unknown;
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await workflow();
      } catch (error) {
        lastError = error;
        if (attempt === maxRetries) {
          // Final attempt: fail immediately instead of waiting once more.
          break;
        }
        // Exponential backoff: delayMs, 2 * delayMs, 4 * delayMs, ...
        const backoffDelay = delayMs * Math.pow(2, attempt - 1);
        await new Promise(resolve => setTimeout(resolve, backoffDelay));
      }
    }
    // At least one attempt ran (guard above), so lastError holds a real failure.
    throw lastError;
  }

  /**
   * Starts all workflows at once and waits for every one of them.
   *
   * @returns Results in the same order as `workflows`.
   * @throws The first rejection, as per `Promise.all`.
   */
  async executeParallel<T>(
    workflows: (() => Promise<T>)[]
  ): Promise<T[]> {
    return await Promise.all(workflows.map(wf => wf()));
  }

  /**
   * Runs workflows one after another; each starts only after the previous
   * one has resolved.
   *
   * @returns Results in the same order as `workflows`.
   * @throws The first failure; remaining workflows are not started.
   */
  async executeSequential<T>(
    workflows: (() => Promise<T>)[]
  ): Promise<T[]> {
    const results: T[] = [];
    for (const workflow of workflows) {
      results.push(await workflow());
    }
    return results;
  }

  /**
   * Evaluates `condition` and runs exactly one of the two workflows.
   *
   * @returns The result of `trueWorkflow` when the condition resolves to
   *   `true`, otherwise the result of `falseWorkflow`.
   */
  async executeConditional<T>(
    condition: () => Promise<boolean>,
    trueWorkflow: () => Promise<T>,
    falseWorkflow: () => Promise<T>
  ): Promise<T> {
    const shouldExecute = await condition();
    return shouldExecute ? await trueWorkflow() : await falseWorkflow();
  }
}
Integration Examples
Workflow Triggering
// Trigger workflows from API endpoints
/**
 * Triggers a workflow from an API endpoint.
 *
 * Routes:
 * - `POST /api/workflows/player` with JSON body `{ "playerId": "..." }`
 * - `POST /api/workflows/customer` with JSON body `{ "customerId": "..." }`
 *
 * @param request Incoming request.
 * @param locals Object exposing the workflow bindings at
 *   `locals.runtime.env.PLAYER_WORKFLOW` / `CUSTOMER_WORKFLOW`.
 * @returns 202 with `{ workflowId }` once the instance has been created,
 *   400 for a malformed body or a missing id, 404 for any other path.
 */
export async function POST(request: Request, locals: any) {
  const routes: Record<string, { binding: string; idField: string }> = {
    '/api/workflows/player': { binding: 'PLAYER_WORKFLOW', idField: 'playerId' },
    '/api/workflows/customer': { binding: 'CUSTOMER_WORKFLOW', idField: 'customerId' }
  };

  const url = new URL(request.url);
  const route = routes[url.pathname];
  if (!route) {
    return new Response('Not found', { status: 404 });
  }

  // A malformed body is a client error, not an unhandled exception.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return new Response('Invalid JSON body', { status: 400 });
  }

  const id =
    typeof body === 'object' && body !== null
      ? (body as Record<string, unknown>)[route.idField]
      : undefined;
  if (typeof id !== 'string' || id.length === 0) {
    return new Response(`${route.idField} is required`, { status: 400 });
  }

  // Workflow bindings create instances via create({ params }); the returned
  // instance exposes its identifier as `id`.
  const instance = await locals.runtime.env[route.binding].create({
    params: {
      [route.idField]: id,
      triggerTime: new Date().toISOString()
    }
  });

  return new Response(JSON.stringify({ workflowId: instance.id }), {
    status: 202,
    headers: { 'Content-Type': 'application/json' }
  });
}
Workflow Status Checking
// Check workflow status
/**
 * Reports the status of a workflow instance.
 *
 * Query parameters:
 * - `id` (required): workflow instance id.
 * - `type` (optional): `player` (default) or `customer`; selects which
 *   workflow binding is queried.
 *
 * @param request Incoming request.
 * @param locals Object exposing the workflow bindings at
 *   `locals.runtime.env.PLAYER_WORKFLOW` / `CUSTOMER_WORKFLOW`.
 * @returns 200 with the instance status as JSON, 400 when `id` is missing or
 *   `type` is unknown, 404 when the instance cannot be looked up.
 */
export async function GET(request: Request, locals: any) {
  const url = new URL(request.url);
  const workflowId = url.searchParams.get('id');
  if (!workflowId) {
    return new Response('Workflow ID required', { status: 400 });
  }

  const bindingsByType = new Map<string, string>([
    ['player', 'PLAYER_WORKFLOW'],
    ['customer', 'CUSTOMER_WORKFLOW']
  ]);
  // Defaults to the player workflow so existing `?id=...` callers keep working.
  const bindingName = bindingsByType.get(url.searchParams.get('type') ?? 'player');
  if (!bindingName) {
    return new Response('Unknown workflow type', { status: 400 });
  }

  try {
    // Workflow bindings look an instance up with get(id); the status is then
    // read from the instance itself.
    const instance = await locals.runtime.env[bindingName].get(workflowId);
    const status = await instance.status();
    return new Response(JSON.stringify(status), {
      status: 200,
      headers: { 'Content-Type': 'application/json' }
    });
  } catch {
    // Any lookup failure is reported to the client as "not found".
    return new Response('Workflow not found', { status: 404 });
  }
}
Testing
Workflow Testing
// Jest suite covering the player and customer workflows end to end via run().
describe('Workflows', () => {
  let playerWorkflow: PlayerWorkflow;
  let customerWorkflow: CustomerWorkflow;

  // Replaces getPlayerData on the current player workflow with a fixed record.
  const stubPlayerData = (experience: number, level: number) =>
    jest.spyOn(playerWorkflow, 'getPlayerData').mockResolvedValue({
      id: 'test-player',
      experience,
      level
    });

  // Fresh workflow instances for every test so recorded state never leaks.
  beforeEach(() => {
    playerWorkflow = new PlayerWorkflow('test-player');
    customerWorkflow = new CustomerWorkflow('test-customer');
  });

  it('should process daily rewards', async () => {
    stubPlayerData(1000, 5);

    await playerWorkflow.run();

    const { completedSteps } = playerWorkflow.state;
    expect(completedSteps).toContain('daily_rewards');
  });

  it('should handle level progression', async () => {
    // Stored level is 4 while the mocked experience is 2500.
    stubPlayerData(2500, 4);

    await playerWorkflow.run();

    const { completedSteps } = playerWorkflow.state;
    expect(completedSteps).toContain('level_progression');
  });

  it('should process customer subscription', async () => {
    const activeCustomer = {
      id: 'test-customer',
      status: 'active'
    };
    jest.spyOn(customerWorkflow, 'getCustomerData').mockResolvedValue(activeCustomer);

    await customerWorkflow.run();

    const { completedSteps } = customerWorkflow.state;
    expect(completedSteps).toContain('subscription_status');
  });
});
This comprehensive Cloudflare Workflows system provides reliable, long-running background processing for complex business logic with automatic state management and error handling.