β‘ RED-DISCORDBOT BACKGROUND TASKS MASTERY #
Ultra-Condensed Enlightenment Guide - Real Red Patterns
π RED COG TASK LIFECYCLE #
- π― Cog Methods:
cog_load()for task creation,cog_unload()for cleanup - π§ Task Storage: Store as instance attributes:
self.task_name = None - π Red Ready Pattern:
await self.bot.wait_until_red_ready()before operations - π¨ Bot Closed Check:
while not self.bot.is_closed():for main loops - π Task Creation:
asyncio.create_task()incog_load()method - π‘οΈ Cleanup Pattern:
task.cancel()incog_unload()if not done - π¦ Logging Setup:
log = logging.getLogger("red.cogs.cogname") - π― Exception Handling: Try/except with proper logging in task loops
import asyncio
import logging
from redbot.core import commands
log = logging.getLogger("red.cogs.mycog")
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.background_task = None
async def cog_load(self):
"""Red's cog initialization method"""
self.background_task = asyncio.create_task(self._background_loop())
def cog_unload(self):
"""Red's cog cleanup method"""
if self.background_task and not self.background_task.done():
self.background_task.cancel()
async def _background_loop(self):
"""Main background task loop"""
await self.bot.wait_until_red_ready()
while not self.bot.is_closed():
try:
await self._do_work()
await asyncio.sleep(60)
except Exception as e:
log.exception("Background task error: %s", e)
await asyncio.sleep(300) # Error recovery delay
π RED CONFIG INTEGRATION PATTERNS #
- π― Config Access: Use
self.configfor dynamic settings in tasks - π§ Guild-Specific Tasks: Check guild configs in loops
- π Dynamic Intervals: Read timer values from config
- π¨ Feature Toggles: Enable/disable features via config
- π Config Watching: React to config changes during execution
- π‘οΈ Default Fallbacks: Use sensible defaults when config missing
- π¦ Batch Config Reads: Cache config values for performance
- π― Cross-Guild Processing: Handle multiple guild configurations
async def _stream_monitoring_loop(self):
"""Real pattern from Streams cog"""
await self.bot.wait_until_red_ready()
while not self.bot.is_closed():
try:
# Get dynamic timer from config
timer = await self.config.refresh_timer()
if timer < 60:
timer = 60 # Minimum interval
# Process all guilds
for guild in self.bot.guilds:
guild_config = self.config.guild(guild)
if await guild_config.enabled():
await self._check_guild_streams(guild)
await asyncio.sleep(timer)
except Exception as error:
log.exception("Stream monitoring error: %s", error)
await asyncio.sleep(300)
async def _check_guild_streams(self, guild):
"""Process streams for specific guild"""
streams = await self.config.guild(guild).streams()
for stream_data in streams:
try:
await self._process_single_stream(guild, stream_data)
except Exception as e:
log.error("Failed to check stream in %s: %s", guild.name, e)
ποΈ COMPLEX COG INITIALIZATION #
- π― Multi-Phase Setup: Database β API β Services β Background tasks
- π§ Resource Management: Database connections, HTTP sessions, APIs
- π Migration Handling: Schema updates during initialization
- π¨ Dependency Loading: Wait for required cogs/services
- π Event Coordination: Use asyncio.Event() for readiness signaling
- π‘οΈ Failure Recovery: Graceful degradation when components fail
- π¦ Data Path Usage:
cog_data_path(self)for file storage - π― Version Management: Handle schema/data version upgrades
from redbot.core.data_manager import cog_data_path
from redbot.core.utils.common_filters import escape_spoilers_and_mass_mentions
class AudioCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.session = None
self.db_conn = None
self.api_interface = None
self.playlist_api = None
self.cog_ready_event = asyncio.Event()
self.player_automated_timer_task = None
async def cog_load(self):
"""Complex initialization from Audio cog"""
self.session = aiohttp.ClientSession()
asyncio.create_task(self.initialize())
async def initialize(self):
await self.bot.wait_until_red_ready()
try:
# Phase 1: Database
db_path = cog_data_path(self) / "Audio.db"
self.db_conn = APSWConnectionWrapper(str(db_path))
# Phase 2: API Setup
self.api_interface = AudioAPIInterface(
self.bot, self.config, self.session, self.db_conn
)
await self.api_interface.initialize()
# Phase 3: Services
self.playlist_api = PlaylistWrapper(
self.bot, self.config, self.db_conn
)
await self.playlist_api.init()
# Phase 4: Migration
await self.data_schema_migration()
# Phase 5: Background tasks
self.player_automated_timer_task = asyncio.create_task(
self.player_automated_timer()
)
# Signal ready
self.cog_ready_event.set()
except Exception as exc:
log.critical("Audio cog initialization failed: %s", exc)
π‘οΈ RED ERROR HANDLING PATTERNS #
- π― Exception Isolation: Individual task failures don’t crash cog
- π§ Red Logging: Use
log.exception()with proper logger names - π Guild-Specific Errors: Continue processing other guilds on failure
- π¨ API Rate Limits: Handle Discord API and external API limits
- π Exponential Backoff: Increase delays for persistent failures
- π‘οΈ Resource Cleanup: Ensure connections/sessions are closed
- π¦ Error Context: Include guild/user/command context in logs
- π― Graceful Degradation: Disable features on repeated failures
async def _robust_api_task(self):
"""Production-ready error handling"""
consecutive_failures = 0
base_delay = 60
max_delay = 3600 # 1 hour max
while not self.bot.is_closed():
try:
await self._perform_api_work()
consecutive_failures = 0 # Reset on success
await asyncio.sleep(base_delay)
except aiohttp.ClientError as e:
consecutive_failures += 1
delay = min(base_delay * (2 consecutive_failures), max_delay)
log.warning(
"API error (attempt %d), retrying in %ds: %s",
consecutive_failures, delay, e
)
await asyncio.sleep(delay)
except discord.HTTPException as e:
if e.status == 429: # Rate limited
retry_after = e.response.headers.get('Retry-After', 60)
log.warning("Discord rate limit, waiting %s seconds", retry_after)
await asyncio.sleep(float(retry_after))
else:
log.error("Discord API error: %s", e)
await asyncio.sleep(300)
except Exception as e:
log.exception("Unexpected error in background task: %s", e)
await asyncio.sleep(600) # Long delay for unknown errors
π RED PERFORMANCE PATTERNS #
- π― AsyncIter Usage:
redbot.core.utils.AsyncIterfor large datasets - π§ Batch Processing: Process multiple items together efficiently
- π Connection Reuse: Single aiohttp session per cog
- π¨ Cache Integration: Use Red’s config caching features
- π Memory Management: Explicit cleanup of large objects
- π‘οΈ Concurrency Limits: Prevent overwhelming Discord API
- π¦ Lazy Loading: Defer expensive operations until needed
- π― Resource Pooling: Reuse expensive resources across tasks
from redbot.core.utils import AsyncIter
import aiohttp
async def process_large_guild_list(self, guilds):
"""Efficient processing using Red's AsyncIter"""
# Process guilds in batches
async for batch in AsyncIter(guilds, steps=10):
tasks = []
for guild in batch:
# Limit concurrent operations
if len(tasks) >= 5:
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
log.error("Guild processing error: %s", result)
tasks = []
task = asyncio.create_task(self._process_guild(guild))
tasks.append(task)
# Process remaining tasks
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
# Yield control to other tasks
await asyncio.sleep(0)
async def _process_guild(self, guild):
"""Process individual guild with rate limiting"""
try:
# Use cached config when possible
config = self.config.guild(guild)
settings = await config.all() # Single config call
if not settings.get('enabled', False):
return
# Process guild data
await self._do_guild_work(guild, settings)
except Exception as e:
log.error("Failed to process guild %s (%d): %s",
guild.name, guild.id, e)
π RED TASK COORDINATION #
- π― Cog Dependencies: Wait for other cogs to be ready
- π§ Event Coordination: Use asyncio.Event() for synchronization
- π Queue Processing: Handle command queues and work items
- π¨ Lock Usage: Protect shared resources with asyncio.Lock()
- π Condition Variables: Complex state coordination
- π‘οΈ Semaphore Limits: Control concurrent operations
- π¦ Cross-Cog Communication: Safe inter-cog task coordination
- π― Bot Event Integration: React to Discord events in tasks
class CoordinatedCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.ready_event = asyncio.Event()
self.work_queue = asyncio.Queue()
self.processing_lock = asyncio.Lock()
self.worker_semaphore = asyncio.Semaphore(3)
async def cog_load(self):
# Start coordinator tasks
asyncio.create_task(self._producer_task())
asyncio.create_task(self._worker_task())
asyncio.create_task(self._monitor_task())
async def _producer_task(self):
"""Generate work items"""
await self.bot.wait_until_red_ready()
self.ready_event.set()
while not self.bot.is_closed():
# Check for new work from config/events
work_items = await self._generate_work()
for item in work_items:
await self.work_queue.put(item)
await asyncio.sleep(30)
async def _worker_task(self):
"""Process work items with coordination"""
await self.ready_event.wait()
while not self.bot.is_closed():
try:
# Get work with timeout
work_item = await asyncio.wait_for(
self.work_queue.get(), timeout=5.0
)
# Process with semaphore limiting
async with self.worker_semaphore:
await self._process_work_item(work_item)
except asyncio.TimeoutError:
continue # No work available
except Exception as e:
log.exception("Worker task error: %s", e)
# React to Discord events in background tasks
@commands.Cog.listener()
async def on_guild_join(self, guild):
"""Trigger background processing for new guild"""
if hasattr(self, 'work_queue'):
await self.work_queue.put(('new_guild', guild))
π― RED LIFECYCLE BEST PRACTICES #
- π― Proper Cleanup: Always cancel tasks in
cog_unload() - π§ Resource Management: Close aiohttp sessions and database connections
- π Graceful Shutdown: Allow current operations to complete
- π¨ State Persistence: Save important state before shutdown
- π Restart Recovery: Handle bot restarts gracefully
- π‘οΈ Timeout Handling: Force cleanup if graceful shutdown fails
- π¦ Memory Cleanup: Clear large data structures
- π― Event Cleanup: Remove event listeners if needed
class LifecycleCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.session = None
self.db_conn = None
self.background_tasks = []
self.shutdown_event = asyncio.Event()
async def cog_load(self):
"""Initialize resources and tasks"""
self.session = aiohttp.ClientSession()
# Create and track all background tasks
tasks = [
asyncio.create_task(self._main_loop()),
asyncio.create_task(self._cleanup_loop()),
asyncio.create_task(self._monitor_loop())
]
self.background_tasks.extend(tasks)
def cog_unload(self):
"""Clean shutdown of all resources"""
log.info("Shutting down %s cog", self.__class__.__name__)
# Signal shutdown to all tasks
self.shutdown_event.set()
# Cancel all background tasks
for task in self.background_tasks:
if not task.done():
task.cancel()
# Schedule resource cleanup
asyncio.create_task(self._cleanup_resources())
async def _cleanup_resources(self):
"""Clean up resources with timeout"""
try:
# Wait for tasks to complete gracefully
if self.background_tasks:
await asyncio.wait_for(
asyncio.gather(*self.background_tasks, return_exceptions=True),
timeout=10.0
)
except asyncio.TimeoutError:
log.warning("Some background tasks didn't complete in time")
# Close external resources
if self.session and not self.session.closed:
await self.session.close()
if self.db_conn:
self.db_conn.close()
log.info("Cog cleanup completed")
async def _main_loop(self):
"""Main task with shutdown awareness"""
await self.bot.wait_until_red_ready()
while not self.bot.is_closed() and not self.shutdown_event.is_set():
try:
await self._do_work()
# Use shutdown event for early termination
try:
await asyncio.wait_for(
self.shutdown_event.wait(), timeout=60.0
)
break # Shutdown requested
except asyncio.TimeoutError:
continue # Normal operation
except Exception as e:
log.exception("Main loop error: %s", e)
if not self.shutdown_event.is_set():
await asyncio.sleep(300)
π‘ REAL RED PATTERNS IN PRODUCTION #
- π― Audio Queue Management: Complex state with player coordination
- π§ Stream Monitoring: Multi-platform API polling with caching
- π Economy Tasks: Transaction processing and interest calculations
- π¨ Moderation Automation: Automated actions based on user behavior
- π Trivia Timing: Game state management with timeouts
- π‘οΈ Bank Operations: Atomic transactions with rollback support
- π¦ Playlist Management: Background playlist processing and caching
- π― Auto-Moderation: Real-time message analysis and action
# Real Audio cog pattern - Player automation
async def player_automated_timer(self):
"""From Audio cog - manages player timeouts"""
await self.bot.wait_until_red_ready()
await self.cog_ready_event.wait()
while not self.bot.is_closed():
try:
# Check all active players
for guild_id, player in self.players.copy().items():
try:
if await self._should_disconnect_player(player):
await player.stop()
await player.disconnect()
except Exception as e:
log.error("Player cleanup error for guild %d: %s", guild_id, e)
await asyncio.sleep(60) # Check every minute
except Exception as e:
log.exception("Player timer error: %s", e)
await asyncio.sleep(300)
# Real Bank cog pattern - Interest calculation
async def _interest_loop(self):
"""Background interest calculation"""
await self.bot.wait_until_red_ready()
while not self.bot.is_closed():
try:
# Calculate interest for all users
interest_rate = await self.config.interest_rate()
if interest_rate > 0:
async for user_data in self._get_all_user_accounts():
await self._apply_interest(user_data, interest_rate)
# Wait until next interest period
next_run = await self._calculate_next_interest_time()
await asyncio.sleep(next_run)
except Exception as e:
log.exception("Interest calculation error: %s", e)
await asyncio.sleep(3600) # Retry in 1 hour
β‘ Master Red-DiscordBot’s proven background task patterns for rock-solid, efficient, and maintainable cog operations!