Task Behaviors
Docket tasks can declare behavioral defaults like retries, timeouts, progress reporting, and concurrency limits as default parameter values. Docket's dependency injection resolves them at execution time, so you just write a normal function signature.
Perpetual Tasks
Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations.
Basic Perpetual Tasks
from docket import Perpetual
async def health_check_service(
service_url: str,
perpetual: Perpetual = Perpetual(every=timedelta(minutes=5))
) -> None:
try:
response = await http_client.get(f"{service_url}/health")
response.raise_for_status()
print(f"✓ {service_url} is healthy")
except Exception as e:
print(f"✗ {service_url} failed health check: {e}")
await send_alert(f"Service {service_url} is down")
# Schedule the task once, it will run every 5 minutes forever
await docket.add(health_check_service)("https://api.example.com")
After each execution, the task automatically schedules itself to run again after the specified interval.
Automatic Startup
Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled:
async def background_cleanup(
perpetual: Perpetual = Perpetual(
every=timedelta(hours=1),
automatic=True
)
) -> None:
deleted_count = await cleanup_old_records()
print(f"Cleaned up {deleted_count} old records")
# Just register the task - no need to schedule it
docket.register(background_cleanup)
# When a worker starts, it will automatically begin running this task
# The task key will be the function name: "background_cleanup"
Self-Canceling Tasks
Perpetual tasks can stop themselves when their work is done:
async def monitor_deployment(
deployment_id: str,
perpetual: Perpetual = Perpetual(every=timedelta(seconds=30))
) -> None:
status = await check_deployment_status(deployment_id)
if status in ["completed", "failed"]:
await notify_deployment_finished(deployment_id, status)
perpetual.cancel() # Stop monitoring this deployment
return
print(f"Deployment {deployment_id} status: {status}")
Dynamic Parameters
Perpetual tasks can change their arguments or timing for the next execution:
async def adaptive_rate_limiter(
api_endpoint: str,
requests_per_minute: int = 60,
perpetual: Perpetual = Perpetual(every=timedelta(minutes=1))
) -> None:
# Check current API load
current_load = await check_api_load(api_endpoint)
if current_load > 0.8: # High load
new_rate = max(30, requests_per_minute - 10)
perpetual.every = timedelta(seconds=30) # Check more frequently
print(f"High load detected, reducing rate to {new_rate} req/min")
else: # Normal load
new_rate = min(120, requests_per_minute + 5)
perpetual.every = timedelta(minutes=1) # Normal check interval
print(f"Normal load, increasing rate to {new_rate} req/min")
# Schedule next run with updated parameters
perpetual.perpetuate(api_endpoint, new_rate)
Error Resilience
Perpetual tasks automatically reschedule themselves regardless of success or failure:
async def resilient_sync(
source_url: str,
perpetual: Perpetual = Perpetual(every=timedelta(minutes=15))
) -> None:
# This will ALWAYS reschedule, whether it succeeds or fails
await sync_data_from_source(source_url)
print(f"Successfully synced data from {source_url}")
You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the every interval.
Cron Tasks
For tasks that need to run at specific wall-clock times rather than at fixed intervals, use the Cron dependency. It extends Perpetual with cron expression support, scheduling the next run at the exact matching time after each execution.
Basic Cron Expressions
from docket import Cron
async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None:
# Runs every Monday at 9:00 AM UTC
await generate_and_send_report()
async def hourly_sync(cron: Cron = Cron("0 * * * *")) -> None:
# Runs at the top of every hour
await sync_external_data()
Cron uses standard 5-field syntax: minute hour day month weekday.
Vixie Keywords
For common schedules, use the shorthand keywords:
async def daily_cleanup(cron: Cron = Cron("@daily")) -> None:
await cleanup_old_records()
async def hourly_check(cron: Cron = Cron("@hourly")) -> None:
await check_service_health()
Supported keywords: @yearly, @annually, @monthly, @weekly, @daily, @midnight, @hourly.
Timezone Support
By default, cron expressions are interpreted in UTC. Pass a tz argument to use a different timezone — this handles daylight saving time transitions automatically:
from zoneinfo import ZoneInfo
async def morning_standup(
cron: Cron = Cron("0 9 * * 1-5", tz=ZoneInfo("America/Los_Angeles"))
) -> None:
# Runs weekdays at 9:00 AM Pacific, adjusting for DST
await send_standup_reminder()
async def tokyo_report(
cron: Cron = Cron("30 17 * * *", tz=ZoneInfo("Asia/Tokyo"))
) -> None:
# Runs at 5:30 PM JST every day
await generate_daily_report()
Automatic Scheduling
Like Perpetual, cron tasks default to automatic=True, meaning they start themselves when a worker sees them — no explicit docket.add() call needed:
# Just register the task; the worker handles scheduling
docket.register(weekly_report)
docket.register(daily_cleanup)
Since cron tasks are automatic by default, they must not require any arguments.
Retrying Tasks
Exponential Backoff
For services that might be overloaded, exponential backoff gives them time to recover:
from docket import ExponentialRetry
async def call_external_api(
url: str,
retry: ExponentialRetry = ExponentialRetry(
attempts=5,
minimum_delay=timedelta(seconds=1),
maximum_delay=timedelta(minutes=5)
)
) -> None:
# Retries with delays: 1s, 2s, 4s, 8s, 16s (but capped at 5 minutes)
try:
response = await http_client.get(url)
response.raise_for_status()
print(f"API call succeeded on attempt {retry.attempt}")
except Exception as e:
print(f"Attempt {retry.attempt} failed: {e}")
raise
Unlimited Retries
For critical tasks that must eventually succeed, use attempts=None:
from docket import Retry
async def critical_data_sync(
source_url: str,
retry: Retry = Retry(attempts=None, delay=timedelta(minutes=5))
) -> None:
# This will retry forever with 5-minute delays until it succeeds
await sync_critical_data(source_url)
print(f"Critical sync completed after {retry.attempt} attempts")
Both Retry and ExponentialRetry support unlimited retries this way.
Task Timeouts
Prevent tasks from running too long with the Timeout dependency:
from docket import Timeout
async def data_processing_task(
large_dataset: dict,
timeout: Timeout = Timeout(timedelta(minutes=10))
) -> None:
# This task will be cancelled if it runs longer than 10 minutes
await process_dataset_phase_one(large_dataset)
# Extend timeout if we need more time for phase two
timeout.extend(timedelta(minutes=5))
await process_dataset_phase_two(large_dataset)
The extend() method can take a specific duration or default to the original timeout duration:
async def adaptive_timeout_task(
timeout: Timeout = Timeout(timedelta(minutes=2))
) -> None:
await quick_check()
# Extend by the base timeout (another 2 minutes)
timeout.extend()
await longer_operation()
Timeouts work alongside retries. If a task times out, it can be retried according to its retry policy.
Reporting Task Progress
The Progress() dependency provides access to the current task's progress tracker, allowing tasks to report their progress to external observers:
from docket import Progress
from docket.execution import ExecutionProgress
async def import_records(
file_path: str,
progress: ExecutionProgress = Progress()
) -> None:
records = await load_records(file_path)
# Set the total number of items to process
await progress.set_total(len(records))
await progress.set_message("Starting import")
for i, record in enumerate(records, 1):
await import_record(record)
# Update progress atomically
await progress.increment()
# Optionally update status message
if i % 100 == 0:
await progress.set_message(f"Imported {i}/{len(records)} records")
await progress.set_message("Import complete")
Progress updates are:
- Atomic:
increment()uses Redis HINCRBY for thread-safe updates - Real-time: Updates published via pub/sub for live monitoring
- Observable: Can be monitored with
docket watchCLI or programmatically - Ephemeral: Progress data is automatically deleted when the task completes
The ExecutionProgress object provides these methods:
set_total(total: int): Set the target/total value for progress trackingincrement(amount: int = 1): Atomically increment the current progress valueset_message(message: str): Update the status messagesync(): Refresh local state from Redis
For more details on progress monitoring patterns and real-time observation, see Task Observability.
Concurrency Control
Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.
Basic Concurrency Limits
Use ConcurrencyLimit to restrict concurrent execution based on task arguments:
from docket import ConcurrencyLimit
async def process_customer_data(
customer_id: int,
concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
) -> None:
# Only one task per customer_id can run at a time
await update_customer_profile(customer_id)
await recalculate_customer_metrics(customer_id)
# These will run sequentially for the same customer
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)
# But different customers can run concurrently
await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel
await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel
Database Connection Pooling
Limit concurrent database operations to prevent overwhelming your database:
async def backup_database_table(
db_name: str,
table_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
) -> None:
# Maximum 2 backup operations per database at once
await create_table_backup(db_name, table_name)
await verify_backup_integrity(db_name, table_name)
# Schedule many backup tasks - only 2 per database will run concurrently
tables = ["users", "orders", "products", "analytics", "logs"]
for table in tables:
await docket.add(backup_database_table)("production", table)
await docket.add(backup_database_table)("staging", table)
API Rate Limiting
Protect external APIs from being overwhelmed:
async def sync_user_with_external_service(
user_id: int,
service_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
) -> None:
# Limit to 5 concurrent API calls per external service
api_client = get_api_client(service_name)
user_data = await fetch_user_data(user_id)
await api_client.sync_user(user_data)
# These respect per-service limits
await docket.add(sync_user_with_external_service)(123, "salesforce")
await docket.add(sync_user_with_external_service)(456, "salesforce") # Will queue if needed
await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel
File Processing Limits
Control concurrent file operations to manage disk I/O:
async def process_media_file(
file_path: str,
operation_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
) -> None:
# Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
if operation_type == "video_transcode":
await transcode_video(file_path)
elif operation_type == "image_resize":
await resize_image(file_path)
elif operation_type == "audio_compress":
await compress_audio(file_path)
# Different operation types can run concurrently, but each type is limited
await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel
Custom Scopes
Use custom scopes to create independent concurrency limits:
async def process_tenant_data(
tenant_id: str,
operation: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"tenant_id",
max_concurrent=2,
scope="tenant_operations"
)
) -> None:
# Each tenant can have up to 2 concurrent operations
await perform_tenant_operation(tenant_id, operation)
async def process_global_data(
data_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"data_type",
max_concurrent=1,
scope="global_operations" # Separate from tenant operations
)
) -> None:
# Global operations have their own concurrency limits
await process_global_data_type(data_type)
Multi-Level Concurrency
Combine multiple concurrency controls for complex scenarios:
async def process_user_export(
user_id: int,
export_type: str,
region: str,
user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
) -> None:
# This task respects ALL concurrency limits:
# - Only 1 export per user at a time
# - Only 3 exports of each type globally
# - Only 10 exports per region
await generate_user_export(user_id, export_type, region)
Note: When using multiple ConcurrencyLimit dependencies, all limits must be satisfied before the task can start.
Monitoring Concurrency
Concurrency limits are enforced using Redis sets, so you can monitor them:
async def monitor_concurrency_usage() -> None:
async with docket.redis() as redis:
# Check how many tasks are running for a specific limit
active_count = await redis.scard("docket:concurrency:customer_id:1001")
print(f"Customer 1001 has {active_count} active tasks")
# List all active concurrency keys
keys = await redis.keys("docket:concurrency:*")
for key in keys:
count = await redis.scard(key)
print(f"{key}: {count} active tasks")
Tips
-
Choose appropriate argument names: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).
-
Set reasonable limits: Base limits on your system's capacity and external service constraints.
-
Use descriptive scopes: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.
-
Monitor blocked tasks: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.
-
Consider cascading effects: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.