⚑ RED-DISCORDBOT BACKGROUND TASKS MASTERY

⚑ RED-DISCORDBOT BACKGROUND TASKS MASTERY #

Ultra-Condensed Enlightenment Guide - Real Red Patterns


πŸš€ RED COG TASK LIFECYCLE #

  • 🎯 Cog Methods: cog_load() for task creation, cog_unload() for cleanup
  • πŸ”§ Task Storage: Store as instance attributes: self.task_name = None
  • πŸ“Š Red Ready Pattern: await self.bot.wait_until_red_ready() before operations
  • 🎨 Bot Closed Check: while not self.bot.is_closed(): for main loops
  • πŸ”„ Task Creation: asyncio.create_task() in cog_load() method
  • πŸ›‘οΈ Cleanup Pattern: task.cancel() in cog_unload() if not done
  • πŸ“¦ Logging Setup: log = logging.getLogger("red.cogs.cogname")
  • 🎯 Exception Handling: Try/except with proper logging in task loops
import asyncio
import logging
from redbot.core import commands

log = logging.getLogger("red.cogs.mycog")

class MyCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.background_task = None
    
    async def cog_load(self):
        """Red's cog initialization method"""
        self.background_task = asyncio.create_task(self._background_loop())
    
    def cog_unload(self):
        """Red's cog cleanup method"""
        if self.background_task and not self.background_task.done():
            self.background_task.cancel()
    
    async def _background_loop(self):
        """Main background task loop"""
        await self.bot.wait_until_red_ready()
        while not self.bot.is_closed():
            try:
                await self._do_work()
                await asyncio.sleep(60)
            except Exception as e:
                log.exception("Background task error: %s", e)
                await asyncio.sleep(300)  # Error recovery delay

πŸ”„ RED CONFIG INTEGRATION PATTERNS #

  • 🎯 Config Access: Use self.config for dynamic settings in tasks
  • πŸ”§ Guild-Specific Tasks: Check guild configs in loops
  • πŸ“Š Dynamic Intervals: Read timer values from config
  • 🎨 Feature Toggles: Enable/disable features via config
  • πŸ”„ Config Watching: React to config changes during execution
  • πŸ›‘οΈ Default Fallbacks: Use sensible defaults when config missing
  • πŸ“¦ Batch Config Reads: Cache config values for performance
  • 🎯 Cross-Guild Processing: Handle multiple guild configurations
async def _stream_monitoring_loop(self):
    """Real pattern from Streams cog"""
    await self.bot.wait_until_red_ready()
    while not self.bot.is_closed():
        try:
            # Get dynamic timer from config
            timer = await self.config.refresh_timer()
            if timer < 60:
                timer = 60  # Minimum interval
            
            # Process all guilds
            for guild in self.bot.guilds:
                guild_config = self.config.guild(guild)
                if await guild_config.enabled():
                    await self._check_guild_streams(guild)
            
            await asyncio.sleep(timer)
        except Exception as error:
            log.exception("Stream monitoring error: %s", error)
            await asyncio.sleep(300)

async def _check_guild_streams(self, guild):
    """Process streams for specific guild"""
    streams = await self.config.guild(guild).streams()
    for stream_data in streams:
        try:
            await self._process_single_stream(guild, stream_data)
        except Exception as e:
            log.error("Failed to check stream in %s: %s", guild.name, e)

πŸ—οΈ COMPLEX COG INITIALIZATION #

  • 🎯 Multi-Phase Setup: Database β†’ API β†’ Services β†’ Background tasks
  • πŸ”§ Resource Management: Database connections, HTTP sessions, APIs
  • πŸ“Š Migration Handling: Schema updates during initialization
  • 🎨 Dependency Loading: Wait for required cogs/services
  • πŸ”„ Event Coordination: Use asyncio.Event() for readiness signaling
  • πŸ›‘οΈ Failure Recovery: Graceful degradation when components fail
  • πŸ“¦ Data Path Usage: cog_data_path(self) for file storage
  • 🎯 Version Management: Handle schema/data version upgrades
from redbot.core.data_manager import cog_data_path
from redbot.core.utils.common_filters import escape_spoilers_and_mass_mentions

class AudioCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.session = None
        self.db_conn = None
        self.api_interface = None
        self.playlist_api = None
        self.cog_ready_event = asyncio.Event()
        self.player_automated_timer_task = None
    
    async def cog_load(self):
        """Complex initialization from Audio cog"""
        self.session = aiohttp.ClientSession()
        asyncio.create_task(self.initialize())
    
    async def initialize(self):
        await self.bot.wait_until_red_ready()
        try:
            # Phase 1: Database
            db_path = cog_data_path(self) / "Audio.db"
            self.db_conn = APSWConnectionWrapper(str(db_path))
            
            # Phase 2: API Setup
            self.api_interface = AudioAPIInterface(
                self.bot, self.config, self.session, self.db_conn
            )
            await self.api_interface.initialize()
            
            # Phase 3: Services
            self.playlist_api = PlaylistWrapper(
                self.bot, self.config, self.db_conn
            )
            await self.playlist_api.init()
            
            # Phase 4: Migration
            await self.data_schema_migration()
            
            # Phase 5: Background tasks
            self.player_automated_timer_task = asyncio.create_task(
                self.player_automated_timer()
            )
            
            # Signal ready
            self.cog_ready_event.set()
            
        except Exception as exc:
            log.critical("Audio cog initialization failed: %s", exc)

πŸ›‘οΈ RED ERROR HANDLING PATTERNS #

  • 🎯 Exception Isolation: Individual task failures don’t crash cog
  • πŸ”§ Red Logging: Use log.exception() with proper logger names
  • πŸ“Š Guild-Specific Errors: Continue processing other guilds on failure
  • 🎨 API Rate Limits: Handle Discord API and external API limits
  • πŸ”„ Exponential Backoff: Increase delays for persistent failures
  • πŸ›‘οΈ Resource Cleanup: Ensure connections/sessions are closed
  • πŸ“¦ Error Context: Include guild/user/command context in logs
  • 🎯 Graceful Degradation: Disable features on repeated failures
async def _robust_api_task(self):
    """Production-ready error handling"""
    consecutive_failures = 0
    base_delay = 60
    max_delay = 3600  # 1 hour max
    
    while not self.bot.is_closed():
        try:
            await self._perform_api_work()
            consecutive_failures = 0  # Reset on success
            await asyncio.sleep(base_delay)
            
        except aiohttp.ClientError as e:
            consecutive_failures += 1
            delay = min(base_delay * (2  consecutive_failures), max_delay)
            log.warning(
                "API error (attempt %d), retrying in %ds: %s",
                consecutive_failures, delay, e
            )
            await asyncio.sleep(delay)
            
        except discord.HTTPException as e:
            if e.status == 429:  # Rate limited
                retry_after = e.response.headers.get('Retry-After', 60)
                log.warning("Discord rate limit, waiting %s seconds", retry_after)
                await asyncio.sleep(float(retry_after))
            else:
                log.error("Discord API error: %s", e)
                await asyncio.sleep(300)
                
        except Exception as e:
            log.exception("Unexpected error in background task: %s", e)
            await asyncio.sleep(600)  # Long delay for unknown errors

πŸ“Š RED PERFORMANCE PATTERNS #

  • 🎯 AsyncIter Usage: redbot.core.utils.AsyncIter for large datasets
  • πŸ”§ Batch Processing: Process multiple items together efficiently
  • πŸ“Š Connection Reuse: Single aiohttp session per cog
  • 🎨 Cache Integration: Use Red’s config caching features
  • πŸ”„ Memory Management: Explicit cleanup of large objects
  • πŸ›‘οΈ Concurrency Limits: Prevent overwhelming Discord API
  • πŸ“¦ Lazy Loading: Defer expensive operations until needed
  • 🎯 Resource Pooling: Reuse expensive resources across tasks
from redbot.core.utils import AsyncIter
import aiohttp

async def process_large_guild_list(self, guilds):
    """Efficient processing using Red's AsyncIter"""
    # Process guilds in batches
    async for batch in AsyncIter(guilds, steps=10):
        tasks = []
        for guild in batch:
            # Limit concurrent operations
            if len(tasks) >= 5:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        log.error("Guild processing error: %s", result)
                tasks = []
            
            task = asyncio.create_task(self._process_guild(guild))
            tasks.append(task)
        
        # Process remaining tasks
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        
        # Yield control to other tasks
        await asyncio.sleep(0)

async def _process_guild(self, guild):
    """Process individual guild with rate limiting"""
    try:
        # Use cached config when possible
        config = self.config.guild(guild)
        settings = await config.all()  # Single config call
        
        if not settings.get('enabled', False):
            return
        
        # Process guild data
        await self._do_guild_work(guild, settings)
        
    except Exception as e:
        log.error("Failed to process guild %s (%d): %s", 
                 guild.name, guild.id, e)

πŸ”„ RED TASK COORDINATION #

  • 🎯 Cog Dependencies: Wait for other cogs to be ready
  • πŸ”§ Event Coordination: Use asyncio.Event() for synchronization
  • πŸ“Š Queue Processing: Handle command queues and work items
  • 🎨 Lock Usage: Protect shared resources with asyncio.Lock()
  • πŸ”„ Condition Variables: Complex state coordination
  • πŸ›‘οΈ Semaphore Limits: Control concurrent operations
  • πŸ“¦ Cross-Cog Communication: Safe inter-cog task coordination
  • 🎯 Bot Event Integration: React to Discord events in tasks
class CoordinatedCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.ready_event = asyncio.Event()
        self.work_queue = asyncio.Queue()
        self.processing_lock = asyncio.Lock()
        self.worker_semaphore = asyncio.Semaphore(3)
        
    async def cog_load(self):
        # Start coordinator tasks
        asyncio.create_task(self._producer_task())
        asyncio.create_task(self._worker_task())
        asyncio.create_task(self._monitor_task())
    
    async def _producer_task(self):
        """Generate work items"""
        await self.bot.wait_until_red_ready()
        self.ready_event.set()
        
        while not self.bot.is_closed():
            # Check for new work from config/events
            work_items = await self._generate_work()
            for item in work_items:
                await self.work_queue.put(item)
            await asyncio.sleep(30)
    
    async def _worker_task(self):
        """Process work items with coordination"""
        await self.ready_event.wait()
        
        while not self.bot.is_closed():
            try:
                # Get work with timeout
                work_item = await asyncio.wait_for(
                    self.work_queue.get(), timeout=5.0
                )
                
                # Process with semaphore limiting
                async with self.worker_semaphore:
                    await self._process_work_item(work_item)
                    
            except asyncio.TimeoutError:
                continue  # No work available
            except Exception as e:
                log.exception("Worker task error: %s", e)

# React to Discord events in background tasks
@commands.Cog.listener()
async def on_guild_join(self, guild):
    """Trigger background processing for new guild"""
    if hasattr(self, 'work_queue'):
        await self.work_queue.put(('new_guild', guild))

🎯 RED LIFECYCLE BEST PRACTICES #

  • 🎯 Proper Cleanup: Always cancel tasks in cog_unload()
  • πŸ”§ Resource Management: Close aiohttp sessions and database connections
  • πŸ“Š Graceful Shutdown: Allow current operations to complete
  • 🎨 State Persistence: Save important state before shutdown
  • πŸ”„ Restart Recovery: Handle bot restarts gracefully
  • πŸ›‘οΈ Timeout Handling: Force cleanup if graceful shutdown fails
  • πŸ“¦ Memory Cleanup: Clear large data structures
  • 🎯 Event Cleanup: Remove event listeners if needed
class LifecycleCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.session = None
        self.db_conn = None
        self.background_tasks = []
        self.shutdown_event = asyncio.Event()
    
    async def cog_load(self):
        """Initialize resources and tasks"""
        self.session = aiohttp.ClientSession()
        
        # Create and track all background tasks
        tasks = [
            asyncio.create_task(self._main_loop()),
            asyncio.create_task(self._cleanup_loop()),
            asyncio.create_task(self._monitor_loop())
        ]
        self.background_tasks.extend(tasks)
    
    def cog_unload(self):
        """Clean shutdown of all resources"""
        log.info("Shutting down %s cog", self.__class__.__name__)
        
        # Signal shutdown to all tasks
        self.shutdown_event.set()
        
        # Cancel all background tasks
        for task in self.background_tasks:
            if not task.done():
                task.cancel()
        
        # Schedule resource cleanup
        asyncio.create_task(self._cleanup_resources())
    
    async def _cleanup_resources(self):
        """Clean up resources with timeout"""
        try:
            # Wait for tasks to complete gracefully
            if self.background_tasks:
                await asyncio.wait_for(
                    asyncio.gather(*self.background_tasks, return_exceptions=True),
                    timeout=10.0
                )
        except asyncio.TimeoutError:
            log.warning("Some background tasks didn't complete in time")
        
        # Close external resources
        if self.session and not self.session.closed:
            await self.session.close()
        
        if self.db_conn:
            self.db_conn.close()
        
        log.info("Cog cleanup completed")
    
    async def _main_loop(self):
        """Main task with shutdown awareness"""
        await self.bot.wait_until_red_ready()
        
        while not self.bot.is_closed() and not self.shutdown_event.is_set():
            try:
                await self._do_work()
                
                # Use shutdown event for early termination
                try:
                    await asyncio.wait_for(
                        self.shutdown_event.wait(), timeout=60.0
                    )
                    break  # Shutdown requested
                except asyncio.TimeoutError:
                    continue  # Normal operation
                    
            except Exception as e:
                log.exception("Main loop error: %s", e)
                if not self.shutdown_event.is_set():
                    await asyncio.sleep(300)

πŸ’‘ REAL RED PATTERNS IN PRODUCTION #

  • 🎯 Audio Queue Management: Complex state with player coordination
  • πŸ”§ Stream Monitoring: Multi-platform API polling with caching
  • πŸ“Š Economy Tasks: Transaction processing and interest calculations
  • 🎨 Moderation Automation: Automated actions based on user behavior
  • πŸ”„ Trivia Timing: Game state management with timeouts
  • πŸ›‘οΈ Bank Operations: Atomic transactions with rollback support
  • πŸ“¦ Playlist Management: Background playlist processing and caching
  • 🎯 Auto-Moderation: Real-time message analysis and action
# Real Audio cog pattern - Player automation
async def player_automated_timer(self):
    """From Audio cog - manages player timeouts"""
    await self.bot.wait_until_red_ready()
    await self.cog_ready_event.wait()
    
    while not self.bot.is_closed():
        try:
            # Check all active players
            for guild_id, player in self.players.copy().items():
                try:
                    if await self._should_disconnect_player(player):
                        await player.stop()
                        await player.disconnect()
                except Exception as e:
                    log.error("Player cleanup error for guild %d: %s", guild_id, e)
            
            await asyncio.sleep(60)  # Check every minute
            
        except Exception as e:
            log.exception("Player timer error: %s", e)
            await asyncio.sleep(300)

# Real Bank cog pattern - Interest calculation
async def _interest_loop(self):
    """Background interest calculation"""
    await self.bot.wait_until_red_ready()
    
    while not self.bot.is_closed():
        try:
            # Calculate interest for all users
            interest_rate = await self.config.interest_rate()
            if interest_rate > 0:
                async for user_data in self._get_all_user_accounts():
                    await self._apply_interest(user_data, interest_rate)
            
            # Wait until next interest period
            next_run = await self._calculate_next_interest_time()
            await asyncio.sleep(next_run)
            
        except Exception as e:
            log.exception("Interest calculation error: %s", e)
            await asyncio.sleep(3600)  # Retry in 1 hour

⚑ Master Red-DiscordBot’s proven background task patterns for rock-solid, efficient, and maintainable cog operations!