๐๏ธ CORE ARCHITECTURE & DESIGN PHILOSOPHY CHEAT SHEET #
Ultra-Condensed Enlightenment Guide - Red 3.5+ Compliant
๐งฉ MODULAR COG SYSTEM MASTERY (Red 3.5+ Architecture) #
Core Cog Architecture #
# Red 3.5+ Compliant Cog Structure
from redbot.core import commands, Config
import asyncio
import logging
log = logging.getLogger("red.mycog")
# REQUIRED: GDPR Compliance Statement
__red_end_user_data_statement__ = (
"This cog stores user IDs and guild IDs for configuration purposes. "
"No personal data is stored."
)
class MyCog(commands.Cog):
"""Modern Red 3.5+ cog implementation"""
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # RED 3.5+ BEST PRACTICE
)
self.background_tasks = []
# Register defaults with structured data
self.config.register_guild(
enabled=True,
settings={"feature": False}
)
async def cog_load(self):
"""Called after __init__ when cog is loaded"""
log.info("Cog loading...")
await self.start_background_tasks()
async def cog_unload(self):
"""Called before cog removal - CRITICAL for cleanup"""
log.info("Cog unloading...")
for task in self.background_tasks:
if not task.done():
task.cancel()
if self.background_tasks:
await asyncio.gather(*self.background_tasks, return_exceptions=True)
async def red_delete_data_for_user(self, kwargs):
"""REQUIRED: GDPR compliance data deletion"""
user_id = kwargs.get("user_id")
if user_id:
await self.config.user_from_id(user_id).clear()
# REQUIRED: Async setup function (Red 3.5+)
async def setup(bot):
"""Called by Red when loading the cog - MUST BE ASYNC"""
cog = MyCog(bot)
await bot.add_cog(cog) # add_cog is async in Red 3.5+
Cog Lifecycle Flow (Red 3.5+) #
1. Import Phase โ 2. Class Definition โ 3. __init__() โ 4. cog_load() โ
5. Runtime โ 6. cog_unload() โ 7. Cleanup โ 8. Memory Release
- ๐ฏ Registration: Async
setup()function withawait bot.add_cog() - ๐ง Initialization:
__init__for basic setup,cog_load()for async operations - ๐ฆ Discovery: Auto-scan via
redbot/cogs/or manual loading - ๐ Hot Reload: Zero-downtime reloading with state preservation
- ๐ ๏ธ Dependencies: Dependency injection via bot instance access
- ๐ State Isolation: Independent memory space per cog instance
- ๐ช Event Distribution: Automatic event routing to cog listeners
- ๐งฌ Inheritance: Multiple inheritance from
commands.Cogbase class - ๐ GDPR Compliance: Mandatory data statement and deletion methods
๐๏ธ BACKEND-FIRST ARCHITECTURE (Separation of Concerns) #
Architectural Layers Visualization #
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ DISCORD API LAYER โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ CONTEXT ABSTRACTION โ
โ โข Rich Context Objects โข Error Boundaries โ
โ โข Permission Validation โข Event Processing โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ COG LAYER โ
โ โข Business Logic โข Command Handlers โ
โ โข Event Listeners โข Background Tasks โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ CONFIG LAYER โ
โ โข Data Persistence โข Scope Management โ
โ โข Validation โข Migration โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ DRIVER LAYER โ
โ โข JSON Driver (Default) โข PostgreSQL Driver (Enterprise) โ
โ โข Storage Abstraction โข Transaction Management โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Implementation Patterns #
# Context Abstraction Pattern
class RichContext:
"""Enhanced context with Red-specific utilities"""
async def tick(self, success=True):
"""Visual confirmation with emoji reactions"""
emoji = "โ
" if success else "โ"
try:
await self.message.add_reaction(emoji)
except discord.HTTPException:
await self.send(emoji)
async def maybe_send_embed(self, embed):
"""Intelligent embed sending with fallback"""
if self.channel.permissions_for(self.me).embed_links:
await self.send(embed=embed)
else:
await self.send(embed.description or "No content")
# Error Boundary Implementation
async def cog_command_error(self, ctx, error):
"""Cog-level error handling boundary"""
if isinstance(error, commands.UserInputError):
await ctx.send_help(ctx.command)
elif isinstance(error, commands.CheckFailure):
await ctx.send("Permission denied")
else:
log.exception(f"Unexpected error in {ctx.command}: {error}")
- ๐ฏ Separation: Complete isolation of Discord API from business logic
- ๐ง Abstraction: Rich context objects hide complexity from cogs
- ๐ Error Boundaries: Layered error handling with graceful degradation
- ๐ก๏ธ Validation: Multi-layer input validation and sanitization
- ๐ Async Flow: Full async/await throughout the stack
- ๐ก Event Router: Centralized event distribution to interested cogs
- ๐จ Presentation: Formatting utilities separate from data logic
- ๐ง Middleware: Check system as interceptor middleware
- ๐ Unidirectional: Clean data flow from Discord โ Cog โ Config
- ๐ฏ Testability: Mockable interfaces for comprehensive testing
๐ INSTANCE MANAGEMENT SYSTEM (Multi-Tenancy) #
Instance Architecture #
# Instance Structure
/home/user/.local/share/Red-DiscordBot/
โโโ instance1/ # First bot instance
โ โโโ config.json # Instance configuration
โ โโโ cogs/ # Instance-specific cogs
โ โโโ data/ # Isolated data storage
โ โโโ logs/ # Instance logs
โโโ instance2/ # Second bot instance
โ โโโ config.json # Separate configuration
โ โโโ cogs/ # Independent cog storage
โ โโโ data/ # Isolated data
โ โโโ logs/ # Separate logs
โโโ global_cogs/ # Shared cog repository
โโโ economy/ # Shared cog code
โโโ moderation/ # Reusable across instances
โโโ music/ # Common functionality
Instance Management Commands #
# Instance Lifecycle Management
redbot-setup # Interactive instance creation
redbot-setup --no-prompt # Automated instance setup
redbot <instance_name> # Run specific instance
redbot <instance_name> --edit # Edit instance configuration
# Multi-Instance Operations
redbot-launcher # GUI launcher for multiple instances
redbot --list # List all instances
redbot --delete <instance> # Remove instance safely
- ๐ช Isolation: Complete data and config separation per instance
- ๐๏ธ Organization: Structured directory hierarchy for clarity
- ๐ง Management: CLI tools for instance lifecycle operations
- ๐ฏ Flexibility: Different bot tokens, settings per instance
- ๐ Scaling: Horizontal scaling through instance multiplication
- ๐ Migration: Tools for moving data between instances
- ๐ก๏ธ Security: Cross-instance data isolation for safety
- ๐ฆ Deployment: Docker-ready containerized instances
- ๐ฎ Process: Independent process management per instance
- ๐ง Resource: Shared code base, isolated runtime state
๐ CONFIG ABSTRACTION MASTERY (Data Persistence) #
Scope Hierarchy Visualization #
GLOBAL (0) โโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ โ Bot-wide settings โ
GUILD (1) โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ Per-server settings โ
CHANNEL (2) โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ Per-channel settings โ
ROLE (3) โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ Per-role settings โ
USER (4) โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ Per-user global settings โ
MEMBER (5) โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ Per-member guild settingsโ
CUSTOM (6) โโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Custom identifiers โ
Advanced Config Patterns #
# Modern Config Setup (Red 3.5+)
class AdvancedCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # PREVENTS SILENT FAILURES
)
# Structured Default Registration
default_global = {
"version": "1.0.0",
"debug": False,
"api_keys": {}
}
default_guild = {
"enabled": True,
"prefix": "!",
"modules": {
"economy": {"enabled": False, "currency": "credits"},
"moderation": {"enabled": True, "auto_ban": False}
},
"channels": {"log": None, "admin": None},
"roles": {"moderator": [], "admin": []},
"permissions": {"custom_commands": []}
}
# Force registration prevents typos and ensures defaults
self.config.register_global(default_global)
self.config.register_guild(default_guild)
# Custom scope for complex data
self.config.custom("ECONOMY_ACCOUNTS").register(
balance=0,
transactions=[],
daily_last=None
)
# Safe nested data modification
async def update_user_balance(self, user_id, amount):
"""Thread-safe balance update with context manager"""
async with self.config.custom("ECONOMY_ACCOUNTS", user_id).all() as data:
data["balance"] += amount
data["transactions"].append({
"amount": amount,
"timestamp": datetime.utcnow().isoformat(),
"type": "admin_adjustment"
})
Driver Architecture Comparison #
JSON Driver (Default) PostgreSQL Driver (Enterprise)
โโโโโโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ โข File-based storage โ โ โข Database storage โ
โ โข Human readable โ โ โข ACID compliance โ
โ โข Easy debugging โ โ โข Enterprise scale โ
โ โข Simple backup โ โ โข Clustering support โ
โ โข < 10MB datasets โ โ โข Professional monitoring โ
โ โข Development focus โ โ โข Production focus โ
โโโโโโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
- ๐ฏ Hierarchy: 7-level scope system from global to custom
- ๐ง Value Types: Value (primitives) vs Group (containers)
- ๐ฆ Force Registration: Prevents typos and ensures default values
- ๐ Thread Safety: Per-value asyncio locks for concurrent access
- ๐จ Context Managers: Safe mutable data modification patterns
- ๐๏ธ Driver Agnostic: Pluggable storage backends (JSON/PostgreSQL)
- ๐ Caching: Intelligent weak reference caching system
- ๐ Deep Nesting: Unlimited nested data structure support
- ๐งน Cleanup: Comprehensive data cleanup and maintenance APIs
- ๐ฏ Migration: Schema versioning and automated migration tools
๐ก๏ธ PERMISSION FRAMEWORK ARCHITECTURE #
Permission Hierarchy Flow #
# Permission Privilege Levels (Ascending Order)
PRIVILEGE_LEVELS = {
"NONE": 0, # No special privileges
"MOD": 1, # Moderator level
"ADMIN": 2, # Administrator level
"GUILD_OWNER": 3, # Guild owner privileges
"BOT_OWNER": 4 # Bot owner (highest)
}
# Permission Model Implementation
class PermissionManager:
def __init__(self, bot):
self.bot = bot
self.global_perms = GlobalPermissionModel(bot)
self.guild_perms = GuildPermissionModel(bot)
self.channel_perms = ChannelPermissionModel(bot)
async def check_permissions(self, ctx, required_level):
"""Hierarchical permission checking"""
user_level = await self.get_user_privilege_level(ctx)
# Check global whitelist/blacklist first
if await self.global_perms.is_user_blacklisted(ctx.author):
return False
if await self.global_perms.is_user_whitelisted(ctx.author):
return True
# Check guild-specific permissions
if ctx.guild:
guild_level = await self.guild_perms.get_user_level(ctx)
if guild_level >= required_level:
return True
# Fallback to user privilege level
return user_level >= required_level
# Permission Decorators
@commands.check_any(
checks.is_owner(),
checks.admin_or_permissions(manage_guild=True),
checks.has_role("Administrator")
)
async def admin_command(self, ctx):
"""Multi-layer permission checking"""
await ctx.send("Admin command executed")
Permission Inheritance Tree #
Bot Owner (Level 4)
โ (inherits all)
Guild Owner (Level 3)
โ (inherits below)
Administrator (Level 2)
โ (inherits below)
Moderator (Level 1)
โ (inherits below)
Regular User (Level 0)
- ๐ฏ Hierarchy: 5-level privilege system with inheritance
- ๐ง Models: Specialized permission models per scope
- ๐ Predicate System: Flexible predicate-based checking
- ๐จ Decorators: Rich decorator system for command protection
- ๐ Runtime Changes: Dynamic permission modification
- ๐ ๏ธ Integration: Deep Discord role/permission integration
- ๐ Filtering: User and guild whitelist/blacklist systems
- ๐ฏ Validation: Multi-layer validation with safety checks
- ๐ Audit Trail: Permission change logging and history
- ๐ Inheritance: Smart permission inheritance from higher levels
๐ฏ COMMAND FRAMEWORK STRUCTURE (Red 3.5+) #
Command Class Hierarchy #
# Command Type Evolution
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ Command โ โ Group โ โ HybridCommand โ
โ (Basic) โ โ (Container) โ โ (Slash + Text) โ
โโโโโโโโโโโโโโโโโโโค โโโโโโโโโโโโโโโโโโโโค โโโโโโโโโโโโโโโโโโโค
โ โข Single action โ โ โข Sub-commands โ โ โข Modern syntax โ
โ โข Simple args โ โ โข Organization โ โ โข Dual support โ
โ โข Text only โ โ โข Hierarchical โ โ โข Future-proof โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
# Advanced Command Implementation
class ModernCog(commands.Cog):
@commands.hybrid_command() # Red 3.5+ hybrid commands
@app_commands.describe(user="User to check")
async def profile(self, ctx, user: discord.Member = None):
"""Get user profile (works as slash and text command)"""
user = user or ctx.author
# Rich context usage
embed = discord.Embed(
title=f"{user.display_name}'s Profile",
color=await ctx.embed_color() # Theme-aware color
)
# Visual confirmation
await ctx.tick() # โ
reaction
await ctx.send(embed=embed)
@commands.group(invoke_without_command=True)
async def config(self, ctx):
"""Configuration command group"""
if ctx.invoked_subcommand is None:
await ctx.send_help(ctx.command)
@config.command()
@commands.admin_or_permissions(manage_guild=True)
@commands.cooldown(1, 30, commands.BucketType.guild)
async def toggle(self, ctx, *, feature: str):
"""Toggle a configuration feature"""
async with self.config.guild(ctx.guild).features() as features:
features[feature] = not features.get(feature, False)
status = "enabled" if features[feature] else "disabled"
await ctx.send(f"Feature `{feature}` {status}")
Converter System Architecture #
# Built-in Converter Ecosystem
BUILT_IN_CONVERTERS = {
# Discord Objects
"discord.Member": "User in current guild",
"discord.User": "Any Discord user",
"discord.TextChannel": "Text channel",
"discord.Role": "Guild role",
"discord.Emoji": "Custom emoji",
# Utility Converters
"commands.Greedy": "Multiple arguments",
"typing.Union": "Multiple type options",
"typing.Optional": "Optional argument",
# Red-specific
"redbot.core.commands.Context": "Enhanced context",
"redbot.core.utils.chat_formatting.TimedeltaConverter": "Time spans"
}
# Custom Converter Example
class ConfigKeyConverter(commands.Converter):
"""Converter for configuration keys with validation"""
async def convert(self, ctx, argument):
valid_keys = await ctx.cog.config.guild(ctx.guild).all()
if argument not in valid_keys:
raise commands.BadArgument(
f"Invalid config key. Valid keys: {', '.join(valid_keys)}"
)
return argument
- ๐ฎ Command Types: Command, Group, HybridCommand for different patterns
- ๐ง Converters: 20+ built-in converters plus custom converter support
- ๐ Context: Enhanced context with utilities like
tick(),embed_color() - ๐จ Help System: Sophisticated help generation with customizable formatting
- ๐ Error Handling: Comprehensive error class hierarchy with graceful handling
- ๐ก๏ธ Validation: Automatic argument validation and type checking
- ๐ฏ Parsing: Advanced argument parsing with flags and keyword support
- ๐ Cooldowns: Sophisticated rate limiting with multiple bucket types
- ๐ง Hooks: Before/after invoke hooks for command processing middleware
- ๐ช Dynamic: Runtime command addition/removal with hot-swapping
๐ AUTOSHARDED BOT FOUNDATION #
Shard Architecture Flow #
# AutoShardedBot Inheritance Chain
discord.AutoShardedBot โ commands.AutoShardedBot โ Red (Bot Class)
class RedBot(commands.AutoShardedBot):
"""Red's enhanced autosharded bot implementation"""
def __init__(self, *args, kwargs):
super().__init__(*args, kwargs)
self.uptime = datetime.utcnow()
self.counter = Counter()
self._config = None
self._disabled_commands = []
async def on_shard_connect(self, shard_id):
"""Called when a shard connects"""
log.info(f"Shard {shard_id} connected")
async def on_shard_disconnect(self, shard_id):
"""Called when a shard disconnects"""
log.warning(f"Shard {shard_id} disconnected")
async def on_shard_ready(self, shard_id):
"""Called when a shard is ready"""
log.info(f"Shard {shard_id} ready")
await self.update_shard_stats(shard_id)
# Shard Health Monitoring
async def monitor_shard_health(bot):
"""Monitor and report shard health"""
while True:
unhealthy_shards = []
for shard_id, shard in bot.shards.items():
if shard.latency > 1.0: # High latency
unhealthy_shards.append(shard_id)
if unhealthy_shards:
log.warning(f"Unhealthy shards detected: {unhealthy_shards}")
await asyncio.sleep(60) # Check every minute
Shard Distribution Logic #
Guild Distribution Across Shards:
Shard 0: Guilds 0, 4, 8, 12... (guild_id % shard_count == 0)
Shard 1: Guilds 1, 5, 9, 13... (guild_id % shard_count == 1)
Shard 2: Guilds 2, 6, 10, 14... (guild_id % shard_count == 2)
Shard 3: Guilds 3, 7, 11, 15... (guild_id % shard_count == 3)
- ๐ฏ Base: Inherits from discord.py’s AutoShardedBot for scalability
- ๐ง Management: Automatic shard management for large bot deployments
- ๐ Distribution: Automatic load balancing across multiple shards
- ๐จ Event Routing: Events distributed to all shards automatically
- ๐ Fault Tolerance: Individual shard failure isolation and recovery
- ๐ก๏ธ Scaling: Automatic scaling based on guild count thresholds
- ๐ฆ Gateway: Intelligent gateway connection management per shard
- ๐ฏ Latency: Per-shard latency tracking and optimization
- ๐ง Reconnection: Robust reconnection handling with exponential backoff
- ๐ Monitoring: Built-in shard health monitoring and alerting
๐ CONCURRENCY & LOCKING (Thread Safety) #
Lock Granularity System #
# Lock Hierarchy (Fine to Coarse)
Per-Value Lock โ Per-Group Lock โ Per-Scope Lock โ Global Lock
# Practical Lock Implementation
class ThreadSafeConfigManager:
def __init__(self, config):
self.config = config
self._locks = {}
async def safe_increment(self, user_id):
"""Thread-safe counter increment"""
lock_key = f"user_{user_id}_counter"
# Get or create lock for this specific value
if lock_key not in self._locks:
self._locks[lock_key] = asyncio.Lock()
async with self._locks[lock_key]:
current = await self.config.user_from_id(user_id).counter()
new_value = current + 1
await self.config.user_from_id(user_id).counter.set(new_value)
return new_value
async def bulk_update_guild_data(self, guild_id, updates):
"""Thread-safe bulk data update"""
async with self.config.guild_from_id(guild_id).all() as guild_data:
for key, value in updates.items():
guild_data[key] = value
# Automatic commit on context exit
# Deadlock Prevention Pattern
async def safe_multi_lock_operation(config, user_id1, user_id2):
"""Prevent deadlocks with ordered locking"""
# Always acquire locks in consistent order (by ID)
ids = sorted([user_id1, user_id2])
async with config.user_from_id(ids[0]).balance.get_lock():
async with config.user_from_id(ids[1]).balance.get_lock():
# Perform operations safely
balance1 = await config.user_from_id(ids[0]).balance()
balance2 = await config.user_from_id(ids[1]).balance()
# Transfer between users safely
await config.user_from_id(ids[0]).balance.set(balance1 - 100)
await config.user_from_id(ids[1]).balance.set(balance2 + 100)
Concurrency Best Practices #
# Lock Timeout Handling
async def safe_config_operation(config, timeout=5.0):
"""Config operation with timeout protection"""
try:
async with asyncio.wait_for(
config.guild(guild).settings.get_lock(),
timeout=timeout
):
# Perform operation
settings = await config.guild(guild).settings()
settings["last_updated"] = datetime.utcnow()
await config.guild(guild).settings.set(settings)
except asyncio.TimeoutError:
log.warning("Config operation timed out")
return False
return True
# Resource Cleanup Pattern
class ResourceManagedCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.active_locks = WeakSet()
async def cog_unload(self):
"""Ensure all locks are released on unload"""
for lock in list(self.active_locks):
if lock.locked():
lock.release()
self.active_locks.clear()
- ๐ฏ Granularity: Per-value locks minimize contention
- ๐ง Context Managers: Async context managers for automatic cleanup
- ๐ Deadlock Prevention: Ordered locking algorithms prevent deadlocks
- ๐จ Performance: Lock-free operations where thread safety isn’t needed
- ๐ Timeout Handling: Configurable lock timeouts prevent hanging
- ๐ก๏ธ Error Recovery: Automatic lock cleanup on exceptions
- ๐ฆ Resource Management: Weak references for automatic lock cleanup
- ๐ฏ Debugging: Lock state monitoring and debugging capabilities
- ๐ง Optimization: Lock contention minimization strategies
- ๐ Metrics: Performance monitoring for lock optimization
๐ฆ SINGLETON PATTERN IMPLEMENTATION (ConfigMeta) #
Metaclass Architecture #
# ConfigMeta Implementation Details
class ConfigMeta(type):
"""Metaclass ensuring singleton config per cog/identifier"""
_instances = {} # Global registry of config instances
def __call__(cls, cog_instance, identifier, kwargs):
# Create unique key for cog instance + identifier
key = (id(cog_instance), identifier)
if key not in cls._instances:
# Create new instance if not exists
instance = super().__call__(cog_instance, identifier, kwargs)
cls._instances[key] = instance
log.debug(f"Created new config instance: {key}")
else:
log.debug(f"Reusing existing config instance: {key}")
return cls._instances[key]
@classmethod
def cleanup_instance(cls, cog_instance, identifier):
"""Clean up config instance when cog unloads"""
key = (id(cog_instance), identifier)
if key in cls._instances:
del cls._instances[key]
log.debug(f"Cleaned up config instance: {key}")
# Config Class with Singleton Pattern
class Config(metaclass=ConfigMeta):
"""Configuration manager with singleton enforcement"""
def __init__(self, cog_instance, identifier, kwargs):
self.cog_instance = cog_instance
self.identifier = identifier
self._driver = None
self._defaults = {}
@classmethod
def get_conf(cls, cog_instance, identifier, kwargs):
"""Primary entry point - enforces singleton pattern"""
return cls(cog_instance, identifier, kwargs)
def __del__(self):
"""Cleanup on garbage collection"""
if hasattr(self, 'cog_instance') and hasattr(self, 'identifier'):
ConfigMeta.cleanup_instance(self.cog_instance, self.identifier)
# Usage Pattern Ensuring Singleton
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
# This will always return the same instance for this cog
self.config = Config.get_conf(self, identifier=12345)
async def cog_unload(self):
"""Singleton cleanup on cog unload"""
ConfigMeta.cleanup_instance(self, 12345)
Instance Tracking System #
# Debug and Monitoring Tools
class ConfigInstanceMonitor:
@staticmethod
def get_active_instances():
"""Get all active config instances"""
return list(ConfigMeta._instances.keys())
@staticmethod
def get_instance_count():
"""Get total number of active instances"""
return len(ConfigMeta._instances)
@staticmethod
def detect_leaked_instances():
"""Detect potentially leaked config instances"""
leaked = []
for (cog_id, identifier), instance in ConfigMeta._instances.items():
# Check if cog instance still exists
if not hasattr(instance, 'cog_instance') or instance.cog_instance is None:
leaked.append((cog_id, identifier))
return leaked
- ๐ฏ Metaclass: ConfigMeta ensures single instance per cog/identifier pair
- ๐ง Registry: Global instance registry for tracking and cleanup
- ๐ Memory Safety: Prevents memory leaks from duplicate configs
- ๐จ Conflict Prevention: Eliminates configuration conflicts and overwrites
- ๐ Cleanup: Automatic cleanup on cog unload and garbage collection
- ๐ก๏ธ Thread Safety: Thread-safe singleton implementation with locks
- ๐ฆ Resource Efficiency: Optimal memory usage through instance reuse
- ๐ฏ Debugging: Instance tracking and leak detection capabilities
- ๐ง Validation: Duplicate prevention with clear error messages
- ๐ Monitoring: Instance lifecycle monitoring and reporting
โก QUICK ARCHITECTURE REFERENCE (Red 3.5+) #
# Complete Modern Cog Template (Production-Ready)
from redbot.core import commands, Config, checks
import asyncio
import logging
log = logging.getLogger("red.mycog")
# REQUIRED: GDPR compliance statement
__red_end_user_data_statement__ = (
"This cog stores user IDs and guild IDs for configuration purposes. "
"No personal data is stored."
)
class ProductionCog(commands.Cog):
"""Production-ready cog with all best practices"""
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(
self,
identifier=1234567890,
force_registration=True # RED 3.5+ BEST PRACTICE
)
# Structured defaults
self.config.register_global(version="1.0.0", enabled=True)
self.config.register_guild(
enabled=True,
settings={"feature": False},
channels={"log": None}
)
self.background_tasks = []
async def cog_load(self):
"""Initialize after cog loading"""
log.info("ProductionCog loading...")
await self.start_background_tasks()
async def cog_unload(self):
"""Cleanup before cog removal"""
log.info("ProductionCog unloading...")
for task in self.background_tasks:
if not task.done():
task.cancel()
if self.background_tasks:
await asyncio.gather(*self.background_tasks, return_exceptions=True)
async def red_delete_data_for_user(self, kwargs):
"""REQUIRED: GDPR data deletion"""
user_id = kwargs.get("user_id")
if user_id:
await self.config.user_from_id(user_id).clear()
@commands.hybrid_command() # Modern hybrid command
async def hello(self, ctx):
"""Say hello (works as slash and text command)"""
await ctx.tick() # Visual confirmation
await ctx.send("Hello!")
# REQUIRED: Async setup function
async def setup(bot):
"""Load the cog - MUST BE ASYNC in Red 3.5+"""
cog = ProductionCog(bot)
await bot.add_cog(cog) # add_cog is async
# Instance Management Commands
redbot-setup # Create new instance
redbot <instance> # Run instance
redbot <instance> --edit # Edit instance config
# Essential CLI Commands
[p]load cogname # Load cog
[p]unload cogname # Unload cog
[p]reload cogname # Reload cog (hot reload)
[p]cogs # List all cogs
# Config Scope Quick Reference
GLOBAL = 0 # await config.setting()
GUILD = 1 # await config.guild(guild).setting()
CHANNEL = 2 # await config.channel(channel).setting()
ROLE = 3 # await config.role(role).setting()
USER = 4 # await config.user(user).setting()
MEMBER = 5 # await config.member(member).setting()
CUSTOM = 6 # await config.custom("SCOPE", id).setting()
๐๏ธ Master Red 3.5+ architecture patterns for bulletproof, scalable, and maintainable Discord bots. This foundation enables all advanced features with 100% accuracy and modern best practices!