🧩 COG DEVELOPMENT & LIFECYCLE CHEAT SHEET

🧩 COG DEVELOPMENT & LIFECYCLE CHEAT SHEET #

Ultra-Condensed Enlightenment Guide - CORRECTED VERSION


πŸ—οΈ COG ARCHITECTURE FOUNDATION #

  • 🎯 Base Class: All cogs inherit from commands.Cog
  • πŸ”§ Lifecycle Methods: __init__, cog_load(), cog_unload(), cog_check()
  • πŸ“Š Registration: async setup() function required for bot discovery
  • 🎨 GDPR Compliance: __red_end_user_data_statement__ module-level string
  • πŸ”„ State Management: Instance variables for cog-specific data storage
  • πŸ›‘οΈ Error Handling: Built-in error boundaries and exception isolation per cog
  • πŸ“¦ Resource Management: Automatic cleanup on unload via cog_unload()
  • 🎯 Event Handling: @commands.Cog.listener() for Discord events
  • πŸ”§ Dependencies: Inter-cog communication via bot instance
  • πŸ“Š Configuration: Config instance per cog with unique identifier

πŸ”„ COG LIFECYCLE MASTERY #

Complete Lifecycle Flow (Red 3.5+ Pattern) #

# 1. Import Phase
from redbot.core import commands, Config
import asyncio
import logging

log = logging.getLogger("red.mycog.cog_name")

# 2. GDPR Compliance Statement (Required)
__red_end_user_data_statement__ = (
    "This cog stores user IDs and guild IDs for configuration purposes. "
    "No personal data is stored."
)

# 3. Class Definition
class MyCog(commands.Cog):
    """Cog description for help system"""
    
    def __init__(self, bot):
        # 4. Initialization Phase
        self.bot = bot
        self.config = Config.get_conf(
            self, 
            identifier=1234567890,
            force_registration=True  # Recommended best practice
        )
        self.background_tasks = []
        
        # Register default values
        default_guild = {"enabled": True, "channel": None}
        self.config.register_guild(**default_guild)
    
    async def cog_load(self):
        """5. Load Phase - runs after __init__"""
        log.info("Cog loaded successfully")
        # Start background tasks
        task = asyncio.create_task(self.background_loop())
        self.background_tasks.append(task)
        # Initialize external connections
        await self.init_external_apis()
    
    async def cog_unload(self):
        """6. Unload Phase - cleanup before removal"""
        log.info("Cog unloading...")
        # Cancel background tasks
        for task in self.background_tasks:
            if not task.done():
                task.cancel()
        # Wait for task completion
        if self.background_tasks:
            await asyncio.gather(*self.background_tasks, return_exceptions=True)
        # Close external connections
        await self.cleanup_external_apis()
        # Save any pending data
        await self.save_data()
    
    def cog_check(self, ctx):
        """7. Global check - runs before every command"""
        # Check if cog is enabled in guild
        if not ctx.guild:
            return True  # Always allow in DMs
        return self.config.guild(ctx.guild).enabled()
    
    async def red_delete_data_for_user(self, **kwargs):
        """8. GDPR Data Deletion (Required)"""
        user_id = kwargs.get("user_id")
        if user_id:
            await self.config.user_from_id(user_id).clear()

# 9. Setup Function (Required - MUST be async in Red 3.5+)
async def setup(bot):
    """Called by Red when loading the cog"""
    cog = MyCog(bot)
    await bot.add_cog(cog)  # add_cog is async in Red 3.5+
    
# 10. Teardown Function (Optional - MUST be async in Red 3.5+)
async def teardown(bot):
    """Called by Red when unloading the cog"""
    cog = bot.get_cog("MyCog")
    if cog:
        await cog.cleanup_resources()

Lifecycle State Management #

class StatefulCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.loaded = False
        self.ready = False
        self.connections = {}
        self.cache = {}
        self.tasks = []
        
    async def cog_load(self):
        """Transition to loaded state"""
        self.loaded = True
        await self.initialize_resources()
        self.ready = True
        
    async def cog_unload(self):
        """Cleanup and state reset"""
        self.ready = False
        # Cancel all tasks
        for task in self.tasks:
            if not task.done():
                task.cancel()
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)
        await self.cleanup_resources()
        self.loaded = False
        
    def is_ready(self):
        """Check if cog is ready for use"""
        return self.loaded and self.ready

πŸ“Š COG STRUCTURE PATTERNS #

Standard Cog Template (Red 3.5+ Compliant) #

from redbot.core import commands, Config, checks
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify
import discord
import asyncio
import logging

log = logging.getLogger("red.mycog")

# REQUIRED: GDPR compliance statement
__red_end_user_data_statement__ = (
    "This cog stores user IDs and guild IDs for configuration purposes. "
    "No personal data is stored."
)

class MyCog(commands.Cog):
    """
    Detailed cog description for the help system.
    
    This cog provides functionality for...
    """
    
    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(
            self, 
            identifier=1234567890,
            force_registration=True  # Best practice
        )
        
        # Configuration defaults
        self.config.register_global(
            enabled=True,
            debug=False
        )
        
        self.config.register_guild(
            enabled=True,
            channel=None,
            roles=[],
            settings={}
        )
        
        self.config.register_user(
            opted_in=False,
            preferences={}
        )
        
        # Cog state
        self.background_tasks = []
        self.connections = {}
        self.cache = {}
        
    async def cog_load(self):
        """Initialize cog after bot is ready"""
        log.info("Loading MyCog...")
        await self.initialize_cache()
        await self.start_background_tasks()
        log.info("MyCog loaded successfully")
        
    async def cog_unload(self):
        """Cleanup before cog removal"""
        log.info("Unloading MyCog...")
        await self.stop_background_tasks()
        await self.cleanup_connections()
        log.info("MyCog unloaded successfully")
        
    def cog_check(self, ctx):
        """Global check for all commands in this cog"""
        # Allow DMs, check guild settings
        if not ctx.guild:
            return True
        return self.config.guild(ctx.guild).enabled()
        
    async def red_delete_data_for_user(self, **kwargs):
        """Required for GDPR compliance"""
        user_id = kwargs.get("user_id")
        if user_id:
            await self.config.user_from_id(user_id).clear()

Advanced Cog with Dependencies #

class AdvancedCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.dependencies = ["Economy", "Modlog"]  # Required cogs
        self.optional_deps = ["Audio", "Trivia"]   # Optional cogs
        
    async def cog_load(self):
        """Check dependencies on load"""
        missing_deps = []
        for dep in self.dependencies:
            if not self.bot.get_cog(dep):
                missing_deps.append(dep)
                
        if missing_deps:
            raise RuntimeError(f"Missing required cogs: {missing_deps}")
            
        # Optional dependency integration
        for dep in self.optional_deps:
            cog = self.bot.get_cog(dep)
            if cog:
                await self.integrate_with_cog(cog)

🎯 COMMAND IMPLEMENTATION PATTERNS #

Command Organization Best Practices #

class OrganizedCog(commands.Cog):
    # Basic commands
    @commands.command()
    async def ping(self, ctx):
        """Simple ping command"""
        await ctx.send("Pong!")
    
    # Group commands
    @commands.group(invoke_without_command=True)
    async def config(self, ctx):
        """Configuration commands"""
        if ctx.invoked_subcommand is None:
            await ctx.send_help(ctx.command)
    
    @config.command(name="set")
    @checks.admin_or_permissions(manage_guild=True)
    async def config_set(self, ctx, setting, *, value):
        """Set a configuration value"""
        async with self.config.guild(ctx.guild).settings() as settings:
            settings[setting] = value
        await ctx.tick()
    
    @config.command(name="get")
    async def config_get(self, ctx, setting):
        """Get a configuration value"""
        settings = await self.config.guild(ctx.guild).settings()
        value = settings.get(setting, "Not set")
        await ctx.send(f"{setting}: {value}")
    
    # Event listeners
    @commands.Cog.listener()
    async def on_message(self, message):
        """Handle message events"""
        # Exit early if bot or disabled
        if message.author.bot:
            return
        if message.guild and await self.bot.cog_disabled_in_guild(self, message.guild):
            return
        # Process message
        await self.process_user_message(message)
    
    @commands.Cog.listener()
    async def on_member_join(self, member):
        """Handle member join events"""
        # Check if enabled in guild
        if await self.bot.cog_disabled_in_guild(self, member.guild):
            return
        # Welcome new members
        await self.welcome_member(member)

πŸ“¦ BACKGROUND TASK MANAGEMENT #

Task Lifecycle Management (Red 3.5+ Pattern) #

class BackgroundCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.tasks = []
        
    async def cog_load(self):
        """Start all background tasks"""
        self.tasks.append(asyncio.create_task(self.cleanup_loop()))
        self.tasks.append(asyncio.create_task(self.update_loop()))
        self.tasks.append(asyncio.create_task(self.monitor_loop()))
        
    async def cog_unload(self):
        """Cancel all background tasks"""
        for task in self.tasks:
            if not task.done():
                task.cancel()
        # Wait for cancellation to complete
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()
        
    async def cleanup_loop(self):
        """Periodic cleanup task"""
        try:
            while True:
                await self.perform_cleanup()
                await asyncio.sleep(3600)  # Every hour
        except asyncio.CancelledError:
            log.debug("Cleanup task cancelled")
            raise  # Re-raise to ensure proper cancellation
        except Exception as e:
            log.exception(f"Error in cleanup loop: {e}")
            
    async def update_loop(self):
        """Periodic update task"""
        try:
            while True:
                await self.update_data()
                await asyncio.sleep(300)  # Every 5 minutes
        except asyncio.CancelledError:
            log.debug("Update task cancelled")
            raise
        except Exception as e:
            log.exception(f"Error in update loop: {e}")

Task Recovery and Health Monitoring #

async def ensure_task_health(self):
    """Check and restart failed tasks"""
    new_tasks = []
    for i, task in enumerate(self.tasks):
        if task.done() and not task.cancelled():
            # Task completed unexpectedly, restart it
            exception = task.exception()
            if exception:
                log.error(f"Task {i} failed: {exception}")
            
            # Restart based on task type
            if i == 0:  # cleanup task
                new_tasks.append(asyncio.create_task(self.cleanup_loop()))
            elif i == 1:  # update task
                new_tasks.append(asyncio.create_task(self.update_loop()))
        else:
            new_tasks.append(task)
    
    self.tasks = new_tasks

πŸ”§ ERROR HANDLING & VALIDATION #

Comprehensive Error Handling #

class RobustCog(commands.Cog):
    @commands.Cog.listener()
    async def on_command_error(self, ctx, error):
        """Handle errors for this cog's commands"""
        # Only handle errors from this cog's commands
        if not hasattr(ctx.command, "cog") or ctx.command.cog != self:
            return
            
        if isinstance(error, commands.MissingRequiredArgument):
            await ctx.send(f"Missing argument: {error.param}")
            await ctx.send_help(ctx.command)
            
        elif isinstance(error, commands.BadArgument):
            await ctx.send(f"Invalid argument: {error}")
            await ctx.send_help(ctx.command)
            
        elif isinstance(error, commands.CheckFailure):
            await ctx.send("You don't have permission to use this command")
            
        elif isinstance(error, commands.CommandOnCooldown):
            await ctx.send(f"Command on cooldown: {error.retry_after:.1f}s")
            
        else:
            # Log unexpected errors
            log.exception(f"Unexpected error in {ctx.command}: {error}")
            await ctx.send("An unexpected error occurred")
    
    async def validate_configuration(self):
        """Validate cog configuration"""
        try:
            config = await self.config.all()
            # Validate configuration structure
            required_keys = ["enabled", "settings"]
            for key in required_keys:
                if key not in config:
                    log.warning(f"Missing config key: {key}")
                    
        except Exception as e:
            log.error(f"Configuration validation failed: {e}")

πŸ”— INTER-COG COMMUNICATION #

Safe Cog Communication Patterns #

class CommunicatingCog(commands.Cog):
    def get_economy_cog(self):
        """Safely get Economy cog"""
        return self.bot.get_cog("Economy")
    
    async def add_currency(self, user, amount):
        """Add currency via Economy cog"""
        economy = self.get_economy_cog()
        if economy and hasattr(economy, "deposit_credits"):
            try:
                from redbot.core import bank
                await bank.deposit_credits(user, amount)
                return True
            except Exception as e:
                log.error(f"Failed to add currency: {e}")
        return False
    
    async def check_cog_compatibility(self, cog_name, required_methods):
        """Check if another cog has required methods"""
        cog = self.bot.get_cog(cog_name)
        if not cog:
            return False
            
        for method in required_methods:
            if not hasattr(cog, method):
                return False
        return True
    
    @commands.command()
    async def reward(self, ctx, user: discord.Member, amount: int):
        """Reward user with currency"""
        if await self.add_currency(user, amount):
            await ctx.send(f"Added {amount} credits to {user}")
        else:
            await ctx.send("Economy system not available")

πŸ“‹ DATA PERSISTENCE PATTERNS #

Configuration Best Practices (Red 3.5+) #

class DataManagedCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(
            self, 
            identifier=1234567890,
            force_registration=True  # Red 3.5+ best practice
        )
        
        # Structured defaults
        default_global = {
            "version": "1.0.0",
            "enabled": True,
            "settings": {
                "debug": False,
                "rate_limit": 100
            }
        }
        
        default_guild = {
            "enabled": True,
            "prefix": "!",
            "channels": {
                "log": None,
                "admin": None
            },
            "roles": {
                "moderator": [],
                "admin": []
            },
            "features": {
                "automod": False,
                "logging": True
            }
        }
        
        self.config.register_global(**default_global)
        self.config.register_guild(**default_guild)
    
    async def migrate_data(self):
        """Handle data migration between versions"""
        version = await self.config.version()
        if version < "1.1.0":
            await self.migrate_to_v1_1_0()
        if version < "1.2.0":
            await self.migrate_to_v1_2_0()
    
    async def backup_data(self):
        """Create data backup"""
        from datetime import datetime
        all_data = await self.config.all()
        backup_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "version": await self.config.version(),
            "data": all_data
        }
        return backup_data

⚑ QUICK COG REFERENCE #

# Minimal Cog Structure (Red 3.5+)
__red_end_user_data_statement__ = "No user data stored."

class MyCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=12345, force_registration=True)
        
    @commands.command()
    async def hello(self, ctx):
        await ctx.send("Hello!")

async def setup(bot):  # MUST be async in Red 3.5+
    await bot.add_cog(MyCog(bot))

# Loading/Unloading Commands
[p]load cog_name          # Load cog
[p]unload cog_name        # Unload cog  
[p]reload cog_name        # Reload cog
[p]cogs                   # List all cogs

# Cog Directory Structure
cogs/
β”œβ”€β”€ mycog/
β”‚   β”œβ”€β”€ __init__.py       # Empty file
β”‚   β”œβ”€β”€ mycog.py          # Main cog file
β”‚   β”œβ”€β”€ info.json         # Cog metadata
β”‚   └── data/             # Optional data directory
└── anothercog.py         # Single-file cog

🧩 Master Red’s cog lifecycle for bulletproof, maintainable bot extensions that follow official best practices!