π§© COG DEVELOPMENT & LIFECYCLE CHEAT SHEET #
Ultra-Condensed Enlightenment Guide - CORRECTED VERSION
ποΈ COG ARCHITECTURE FOUNDATION #
- π― Base Class: All cogs inherit from
commands.Cog - π§ Lifecycle Methods:
__init__,cog_load(),cog_unload(),cog_check() - π Registration: async
setup()function required for bot discovery - π¨ GDPR Compliance:
__red_end_user_data_statement__module-level string - π State Management: Instance variables for cog-specific data storage
- π‘οΈ Error Handling: Built-in error boundaries and exception isolation per cog
- π¦ Resource Management: Automatic cleanup on unload via
cog_unload() - π― Event Handling:
@commands.Cog.listener()for Discord events - π§ Dependencies: Inter-cog communication via bot instance
- π Configuration: Config instance per cog with unique identifier
π COG LIFECYCLE MASTERY #
Complete Lifecycle Flow (Red 3.5+ Pattern) #
# 1. Import Phase
from redbot.core import commands, Config
import asyncio
import logging
log = logging.getLogger("red.mycog.cog_name")
# 2. GDPR Compliance Statement (Required)
__red_end_user_data_statement__ = (
"This cog stores user IDs and guild IDs for configuration purposes. "
"No personal data is stored."
)
# 3. Class Definition
class MyCog(commands.Cog):
"""Cog description for help system"""
def __init__(self, bot):
# 4. Initialization Phase
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # Recommended best practice
)
self.background_tasks = []
# Register default values
default_guild = {"enabled": True, "channel": None}
self.config.register_guild(**default_guild)
async def cog_load(self):
"""5. Load Phase - runs after __init__"""
log.info("Cog loaded successfully")
# Start background tasks
task = asyncio.create_task(self.background_loop())
self.background_tasks.append(task)
# Initialize external connections
await self.init_external_apis()
async def cog_unload(self):
"""6. Unload Phase - cleanup before removal"""
log.info("Cog unloading...")
# Cancel background tasks
for task in self.background_tasks:
if not task.done():
task.cancel()
# Wait for task completion
if self.background_tasks:
await asyncio.gather(*self.background_tasks, return_exceptions=True)
# Close external connections
await self.cleanup_external_apis()
# Save any pending data
await self.save_data()
def cog_check(self, ctx):
"""7. Global check - runs before every command"""
# Check if cog is enabled in guild
if not ctx.guild:
return True # Always allow in DMs
return self.config.guild(ctx.guild).enabled()
async def red_delete_data_for_user(self, **kwargs):
"""8. GDPR Data Deletion (Required)"""
user_id = kwargs.get("user_id")
if user_id:
await self.config.user_from_id(user_id).clear()
# 9. Setup Function (Required - MUST be async in Red 3.5+)
async def setup(bot):
"""Called by Red when loading the cog"""
cog = MyCog(bot)
await bot.add_cog(cog) # add_cog is async in Red 3.5+
# 10. Teardown Function (Optional - MUST be async in Red 3.5+)
async def teardown(bot):
"""Called by Red when unloading the cog"""
cog = bot.get_cog("MyCog")
if cog:
await cog.cleanup_resources()
Lifecycle State Management #
class StatefulCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.loaded = False
self.ready = False
self.connections = {}
self.cache = {}
self.tasks = []
async def cog_load(self):
"""Transition to loaded state"""
self.loaded = True
await self.initialize_resources()
self.ready = True
async def cog_unload(self):
"""Cleanup and state reset"""
self.ready = False
# Cancel all tasks
for task in self.tasks:
if not task.done():
task.cancel()
if self.tasks:
await asyncio.gather(*self.tasks, return_exceptions=True)
await self.cleanup_resources()
self.loaded = False
def is_ready(self):
"""Check if cog is ready for use"""
return self.loaded and self.ready
π COG STRUCTURE PATTERNS #
Standard Cog Template (Red 3.5+ Compliant) #
from redbot.core import commands, Config, checks
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify
import discord
import asyncio
import logging
log = logging.getLogger("red.mycog")
# REQUIRED: GDPR compliance statement
__red_end_user_data_statement__ = (
"This cog stores user IDs and guild IDs for configuration purposes. "
"No personal data is stored."
)
class MyCog(commands.Cog):
"""
Detailed cog description for the help system.
This cog provides functionality for...
"""
def __init__(self, bot: Red):
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # Best practice
)
# Configuration defaults
self.config.register_global(
enabled=True,
debug=False
)
self.config.register_guild(
enabled=True,
channel=None,
roles=[],
settings={}
)
self.config.register_user(
opted_in=False,
preferences={}
)
# Cog state
self.background_tasks = []
self.connections = {}
self.cache = {}
async def cog_load(self):
"""Initialize cog after bot is ready"""
log.info("Loading MyCog...")
await self.initialize_cache()
await self.start_background_tasks()
log.info("MyCog loaded successfully")
async def cog_unload(self):
"""Cleanup before cog removal"""
log.info("Unloading MyCog...")
await self.stop_background_tasks()
await self.cleanup_connections()
log.info("MyCog unloaded successfully")
def cog_check(self, ctx):
"""Global check for all commands in this cog"""
# Allow DMs, check guild settings
if not ctx.guild:
return True
return self.config.guild(ctx.guild).enabled()
async def red_delete_data_for_user(self, **kwargs):
"""Required for GDPR compliance"""
user_id = kwargs.get("user_id")
if user_id:
await self.config.user_from_id(user_id).clear()
Advanced Cog with Dependencies #
class AdvancedCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.dependencies = ["Economy", "Modlog"] # Required cogs
self.optional_deps = ["Audio", "Trivia"] # Optional cogs
async def cog_load(self):
"""Check dependencies on load"""
missing_deps = []
for dep in self.dependencies:
if not self.bot.get_cog(dep):
missing_deps.append(dep)
if missing_deps:
raise RuntimeError(f"Missing required cogs: {missing_deps}")
# Optional dependency integration
for dep in self.optional_deps:
cog = self.bot.get_cog(dep)
if cog:
await self.integrate_with_cog(cog)
π― COMMAND IMPLEMENTATION PATTERNS #
Command Organization Best Practices #
class OrganizedCog(commands.Cog):
# Basic commands
@commands.command()
async def ping(self, ctx):
"""Simple ping command"""
await ctx.send("Pong!")
# Group commands
@commands.group(invoke_without_command=True)
async def config(self, ctx):
"""Configuration commands"""
if ctx.invoked_subcommand is None:
await ctx.send_help(ctx.command)
@config.command(name="set")
@checks.admin_or_permissions(manage_guild=True)
async def config_set(self, ctx, setting, *, value):
"""Set a configuration value"""
async with self.config.guild(ctx.guild).settings() as settings:
settings[setting] = value
await ctx.tick()
@config.command(name="get")
async def config_get(self, ctx, setting):
"""Get a configuration value"""
settings = await self.config.guild(ctx.guild).settings()
value = settings.get(setting, "Not set")
await ctx.send(f"{setting}: {value}")
# Event listeners
@commands.Cog.listener()
async def on_message(self, message):
"""Handle message events"""
# Exit early if bot or disabled
if message.author.bot:
return
if message.guild and await self.bot.cog_disabled_in_guild(self, message.guild):
return
# Process message
await self.process_user_message(message)
@commands.Cog.listener()
async def on_member_join(self, member):
"""Handle member join events"""
# Check if enabled in guild
if await self.bot.cog_disabled_in_guild(self, member.guild):
return
# Welcome new members
await self.welcome_member(member)
π¦ BACKGROUND TASK MANAGEMENT #
Task Lifecycle Management (Red 3.5+ Pattern) #
class BackgroundCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.tasks = []
async def cog_load(self):
"""Start all background tasks"""
self.tasks.append(asyncio.create_task(self.cleanup_loop()))
self.tasks.append(asyncio.create_task(self.update_loop()))
self.tasks.append(asyncio.create_task(self.monitor_loop()))
async def cog_unload(self):
"""Cancel all background tasks"""
for task in self.tasks:
if not task.done():
task.cancel()
# Wait for cancellation to complete
if self.tasks:
await asyncio.gather(*self.tasks, return_exceptions=True)
self.tasks.clear()
async def cleanup_loop(self):
"""Periodic cleanup task"""
try:
while True:
await self.perform_cleanup()
await asyncio.sleep(3600) # Every hour
except asyncio.CancelledError:
log.debug("Cleanup task cancelled")
raise # Re-raise to ensure proper cancellation
except Exception as e:
log.exception(f"Error in cleanup loop: {e}")
async def update_loop(self):
"""Periodic update task"""
try:
while True:
await self.update_data()
await asyncio.sleep(300) # Every 5 minutes
except asyncio.CancelledError:
log.debug("Update task cancelled")
raise
except Exception as e:
log.exception(f"Error in update loop: {e}")
Task Recovery and Health Monitoring #
async def ensure_task_health(self):
"""Check and restart failed tasks"""
new_tasks = []
for i, task in enumerate(self.tasks):
if task.done() and not task.cancelled():
# Task completed unexpectedly, restart it
exception = task.exception()
if exception:
log.error(f"Task {i} failed: {exception}")
# Restart based on task type
if i == 0: # cleanup task
new_tasks.append(asyncio.create_task(self.cleanup_loop()))
elif i == 1: # update task
new_tasks.append(asyncio.create_task(self.update_loop()))
else:
new_tasks.append(task)
self.tasks = new_tasks
π§ ERROR HANDLING & VALIDATION #
Comprehensive Error Handling #
class RobustCog(commands.Cog):
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
"""Handle errors for this cog's commands"""
# Only handle errors from this cog's commands
if not hasattr(ctx.command, "cog") or ctx.command.cog != self:
return
if isinstance(error, commands.MissingRequiredArgument):
await ctx.send(f"Missing argument: {error.param}")
await ctx.send_help(ctx.command)
elif isinstance(error, commands.BadArgument):
await ctx.send(f"Invalid argument: {error}")
await ctx.send_help(ctx.command)
elif isinstance(error, commands.CheckFailure):
await ctx.send("You don't have permission to use this command")
elif isinstance(error, commands.CommandOnCooldown):
await ctx.send(f"Command on cooldown: {error.retry_after:.1f}s")
else:
# Log unexpected errors
log.exception(f"Unexpected error in {ctx.command}: {error}")
await ctx.send("An unexpected error occurred")
async def validate_configuration(self):
"""Validate cog configuration"""
try:
config = await self.config.all()
# Validate configuration structure
required_keys = ["enabled", "settings"]
for key in required_keys:
if key not in config:
log.warning(f"Missing config key: {key}")
except Exception as e:
log.error(f"Configuration validation failed: {e}")
π INTER-COG COMMUNICATION #
Safe Cog Communication Patterns #
class CommunicatingCog(commands.Cog):
def get_economy_cog(self):
"""Safely get Economy cog"""
return self.bot.get_cog("Economy")
async def add_currency(self, user, amount):
"""Add currency via Economy cog"""
economy = self.get_economy_cog()
if economy and hasattr(economy, "deposit_credits"):
try:
from redbot.core import bank
await bank.deposit_credits(user, amount)
return True
except Exception as e:
log.error(f"Failed to add currency: {e}")
return False
async def check_cog_compatibility(self, cog_name, required_methods):
"""Check if another cog has required methods"""
cog = self.bot.get_cog(cog_name)
if not cog:
return False
for method in required_methods:
if not hasattr(cog, method):
return False
return True
@commands.command()
async def reward(self, ctx, user: discord.Member, amount: int):
"""Reward user with currency"""
if await self.add_currency(user, amount):
await ctx.send(f"Added {amount} credits to {user}")
else:
await ctx.send("Economy system not available")
π DATA PERSISTENCE PATTERNS #
Configuration Best Practices (Red 3.5+) #
class DataManagedCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # Red 3.5+ best practice
)
# Structured defaults
default_global = {
"version": "1.0.0",
"enabled": True,
"settings": {
"debug": False,
"rate_limit": 100
}
}
default_guild = {
"enabled": True,
"prefix": "!",
"channels": {
"log": None,
"admin": None
},
"roles": {
"moderator": [],
"admin": []
},
"features": {
"automod": False,
"logging": True
}
}
self.config.register_global(**default_global)
self.config.register_guild(**default_guild)
async def migrate_data(self):
"""Handle data migration between versions"""
version = await self.config.version()
if version < "1.1.0":
await self.migrate_to_v1_1_0()
if version < "1.2.0":
await self.migrate_to_v1_2_0()
async def backup_data(self):
"""Create data backup"""
from datetime import datetime
all_data = await self.config.all()
backup_data = {
"timestamp": datetime.utcnow().isoformat(),
"version": await self.config.version(),
"data": all_data
}
return backup_data
β‘ QUICK COG REFERENCE #
# Minimal Cog Structure (Red 3.5+)
__red_end_user_data_statement__ = "No user data stored."
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=12345, force_registration=True)
@commands.command()
async def hello(self, ctx):
await ctx.send("Hello!")
async def setup(bot): # MUST be async in Red 3.5+
await bot.add_cog(MyCog(bot))
# Loading/Unloading Commands
[p]load cog_name # Load cog
[p]unload cog_name # Unload cog
[p]reload cog_name # Reload cog
[p]cogs # List all cogs
# Cog Directory Structure
cogs/
βββ mycog/
β βββ __init__.py # Empty file
β βββ mycog.py # Main cog file
β βββ info.json # Cog metadata
β βββ data/ # Optional data directory
βββ anothercog.py # Single-file cog
π§© Master Red’s cog lifecycle for bulletproof, maintainable bot extensions that follow official best practices!