Reusable containers

By default, each task execution in Flyte and Union runs in a fresh container instance that is created just for that execution and then discarded. With reusable containers, the same container can be reused across multiple executions and tasks. This approach reduces startup overhead and improves resource efficiency.

The reusable container feature is only available when running your Flyte code on a Union backend.

How it works

With reusable containers, the system maintains a pool of persistent containers that can handle multiple task executions. When you configure a TaskEnvironment with a ReusePolicy, the system does the following:

  1. Creates a pool of persistent containers.
  2. Routes task executions to available container instances.
  3. Manages container lifecycle with configurable timeouts.
  4. Supports concurrent task execution within containers (for async tasks).
  5. Preserves the Python execution environment across task executions, allowing you to maintain state through global variables (see the sketch below).
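
For example, because the container's Python process survives across executions, module-level state persists between task calls. The following is a minimal sketch; the environment settings mirror the Basic usage example below, and the names stateful-env and count_calls are illustrative:

from datetime import timedelta

import flyte

# Requires the unionai-reuse package in the task image (see Basic usage below)
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3")

env = flyte.TaskEnvironment(
    name="stateful-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(
        replicas=1,
        concurrency=1,
        scaledown_ttl=timedelta(minutes=10),
        idle_ttl=timedelta(hours=1),
    ),
    image=reusable_image,
)

# Module-level state survives across task executions in the same container
call_count = 0

@env.task
async def count_calls() -> int:
    global call_count
    call_count += 1  # Increments across executions handled by this container
    return call_count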

Basic usage

The reusable containers feature currently requires a dedicated runtime library (unionai-reuse) to be installed in the task image used by the reusable task. You can add this library to your task image using the flyte.Image.with_pip_packages method, as shown below. This library only needs to be added to the task image; it does not need to be installed in your local development environment.

Enable container reuse by adding a ReusePolicy to your TaskEnvironment:

from datetime import timedelta

import flyte

# Currently required to enable reusable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3")

env = flyte.TaskEnvironment(
    name="reusable-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(
        replicas=2,                           # Create 2 container instances
        concurrency=1,                        # Process 1 task per container at a time
        scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
        idle_ttl=timedelta(hours=1)           # Entire environment shuts down after 1 hour of no tasks
    ),
    image=reusable_image  # Use the container image augmented with the unionai-reuse library.
)

@env.task
async def compute_task(x: int) -> int:
    return x * x

@env.task
async def main() -> list[int]:
    # These tasks will reuse containers from the pool
    results = []
    for i in range(10):
        result = await compute_task(i)
        results.append(result)
    return results
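
To try this out, you can deploy and run the workflow programmatically; the runner below mirrors the one used in the machine learning example at the end of this page:

if __name__ == "__main__":
    flyte.init_from_config()  # Establish a remote connection
    run = flyte.run(main)     # Run the workflow remotely
    print(run.url)            # Link to the execution in the UI
    run.wait()                # Stream logs until the run completes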

ReusePolicy parameters

The ReusePolicy class controls how containers are managed in a reusable environment:

flyte.ReusePolicy(
    replicas: typing.Union[int, typing.Tuple[int, int]],
    concurrency: int,
    scaledown_ttl: typing.Union[int, datetime.timedelta],
    idle_ttl: typing.Union[int, datetime.timedelta]
)
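
Both TTL parameters accept either an integer or a datetime.timedelta; as the machine learning example below suggests, integer values are interpreted as seconds. For instance, these two policies (illustrative values) are equivalent:

from datetime import timedelta

# Integer TTL values are interpreted as seconds
policy_a = flyte.ReusePolicy(replicas=2, concurrency=1, scaledown_ttl=600, idle_ttl=3600)

# Equivalent policy using timedelta values
policy_b = flyte.ReusePolicy(
    replicas=2,
    concurrency=1,
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1),
)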

replicas: Container pool size

Controls the number of container instances in the reusable pool:

  • Fixed size: replicas=3 creates exactly 3 container instances. These 3 replicas are shut down after idle_ttl expires.
  • Auto-scaling: replicas=(2, 5) starts with 2 containers and can scale up to 5 based on demand.
    • If the task is running on 2 replicas and demand drops to zero, these 2 containers are shut down after idle_ttl expires.
    • If the task is running on 2 replicas and demand increases, new containers are created up to the maximum of 5.
    • If the task is running on 5 replicas and demand drops, container 5 is shut down after scaledown_ttl expires.
    • If demand drops again, container 4 is also shut down after another scaledown_ttl period expires.
  • Resource impact: Each replica consumes the full resources defined in TaskEnvironment.resources.
from datetime import timedelta

# Fixed pool size
reuse_policy = flyte.ReusePolicy(
    replicas=3,
    concurrency=1,
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

# Auto-scaling pool
reuse_policy = flyte.ReusePolicy(
    replicas=(1, 10),
    concurrency=1,
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

concurrency: Tasks per container

Controls how many tasks can execute simultaneously within a single container:

  • Default: concurrency=1 (one task per container at a time).
  • Higher concurrency: concurrency=5 allows 5 tasks to run simultaneously in each container.
  • Total capacity: replicas × concurrency = maximum concurrent tasks across the entire pool.
from datetime import timedelta

# Sequential processing (default)
sequential_policy = flyte.ReusePolicy(
    replicas=2,
    concurrency=1,  # One task per container
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

# Concurrent processing
concurrent_policy = flyte.ReusePolicy(
    replicas=2,
    concurrency=5,  # 5 tasks per container = 10 total concurrent tasks
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

idle_ttl vs scaledown_ttl: Container lifecycle

These parameters work together to manage container lifecycle at different levels:

idle_ttl: Environment timeout

  • Scope: Controls the entire reusable environment infrastructure.
  • Behavior: When there are no active or queued tasks, the entire environment scales down after idle_ttl expires.
  • Purpose: Manages the lifecycle of the entire container pool.
  • Typical values: 1-2 hours, or None for always-on environments.

scaledown_ttl: Individual container timeout

  • Scope: Controls individual container instances.
  • Behavior: When a container finishes a task and becomes inactive, it will be terminated after scaledown_ttl expires.
  • Purpose: Prevents resource waste from inactive containers.
  • Typical values: 5-30 minutes for most workloads.
from datetime import timedelta

lifecycle_policy = flyte.ReusePolicy(
    replicas=3,
    concurrency=2,
    scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
    idle_ttl=timedelta(hours=1)         # Entire environment shuts down after 1 hour of no tasks
)

Understanding parameter relationships

The four ReusePolicy parameters work together to control different aspects of container management:

from datetime import timedelta

reuse_policy = flyte.ReusePolicy(
    replicas=4,                           # Infrastructure: How many containers?
    concurrency=3,                        # Throughput: How many tasks per container?
    scaledown_ttl=timedelta(minutes=10),  # Individual: When do idle containers shut down?
    idle_ttl=timedelta(hours=1)           # Environment: When does the whole pool shut down?
)
# Total capacity: 4 × 3 = 12 concurrent tasks
# Individual containers shut down after 10 minutes of inactivity
# Entire environment shuts down after 1 hour of no tasks

Key relationships

  • Total throughput = replicas × concurrency (see the sizing sketch below)
  • Resource usage = replicas × TaskEnvironment.resources
  • Cost efficiency: Higher concurrency reduces container overhead; more replicas provide better isolation
  • Lifecycle management: scaledown_ttl manages individual containers, idle_ttl manages the environment
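
For example, applying these relationships to an auto-scaling pool (the numbers and the name sized-env are illustrative):

from datetime import timedelta

import flyte

env = flyte.TaskEnvironment(
    name="sized-env",
    resources=flyte.Resources(memory="2Gi", cpu="1"),
    reusable=flyte.ReusePolicy(
        replicas=(2, 5),  # 2 to 5 containers, scaling with demand
        concurrency=4,    # 4 tasks per container
        scaledown_ttl=timedelta(minutes=10),
        idle_ttl=timedelta(hours=1),
    ),
    image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3"),
)
# Total throughput: 2 × 4 = 8 concurrent tasks at minimum scale, 5 × 4 = 20 at maximum
# Resource usage: 2 × (2Gi, 1 CPU) at minimum scale, 5 × (2Gi, 1 CPU) at maximum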

Machine learning example

A good use case for reusable containers is machine learning inference. The overhead of loading a large model can be significant, so reusing containers across multiple inference requests can improve efficiency.

In this example, we mock the model loading and prediction process. The full source code can be found on GitHub.

First, import the needed modules:

import asyncio
import time
from typing import List

import flyte
from async_lru import alru_cache
Next, mock up the model loading and prediction process:
# Mock expensive model that takes time to "load"
class ExpensiveModel:
    def __init__(self):
        self.loaded_at = time.time()
        print(f"✅ Model loaded successfully at {self.loaded_at}")

    @classmethod
    async def create(cls):
        """Async factory method to create the expensive model"""
        print("🔄 Loading expensive model... (this takes 5 seconds)")
        await asyncio.sleep(5)  # Simulate expensive model loading
        return cls()

    def predict(self, data: List[float]) -> float:
        # Simple mock prediction: return sum of inputs
        result = sum(data) * 1.5  # Some "AI" calculation
        print(f"🧠 Model prediction: {data} -> {result}")
        return result


@alru_cache(maxsize=1)
async def load_expensive_model() -> ExpensiveModel:
    """Async factory function to create the expensive model with caching"""
    return await ExpensiveModel.create()
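As a quick sanity check (the helper check_caching is hypothetical and not part of the workflow), the alru_cache decorator means repeated awaits return the same cached instance:
async def check_caching() -> None:
    m1 = await load_expensive_model()  # First call: loads the model (~5 seconds)
    m2 = await load_expensive_model()  # Cache hit: returns immediately
    assert m1 is m2                    # Same ExpensiveModel instance

# asyncio.run(check_caching())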
Now, we set up the reusable task environment. Note that, currently, the image used for a reusable environment requires an extra package to be installed:
# Currently required to enable reusable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3")


env = flyte.TaskEnvironment(
    name="ml_env",
    resources=flyte.Resources(memory="2Gi", cpu="1"),
    reusable=flyte.ReusePolicy(
        replicas=1,         # Single container to clearly see reuse
        concurrency=3,      # Allow 3 concurrent predictions
        scaledown_ttl=300,  # Keep container alive for 5 minutes
        idle_ttl=1800       # Keep environment alive for 30 minutes
    ),
    image=reusable_image
)

We define the do_predict task, which loads the model and performs predictions using it.

The key aspect of this task is that the model is loaded once per container and reused for all subsequent predictions, thus minimizing overhead.

This is achieved through a global variable that stores the model and a lock that ensures the model is only loaded once.

# Model loaded once per container
model = None
model_lock = asyncio.Lock()

@env.task
async def do_predict(data: List[float]) -> float:
    """
    Prediction task that loads the model once per container
    and reuses it for subsequent predictions.
    """
    global model

    print(f"🚀 Task started with data: {data}")

    # Thread-safe lazy loading of the expensive model
    if model is None:
        async with model_lock:
            if model is None:  # Double-check pattern
                print("📦 No model found, loading expensive model...")
                # Load the model asynchronously with caching
                model = await load_expensive_model()
            else:
                print("⚡ Another task already loaded the model while we waited")
    else:
        print("⚡ Model already loaded, reusing existing model")

    # Use the model for prediction
    result = model.predict(data)
    print(f"✨ Task completed: {data} -> {result}")
    return result
The main task of the workflow drives the prediction loop with a set of test data:
@env.task
async def main() -> List[float]:
    """
    Main workflow that calls do_predict multiple times.
    The first call will load the model, subsequent calls will reuse it.
    """
    print("🎯 Starting ML inference workflow with reusable containers")

    # Test data for predictions
    test_data = [
        [1.0, 2.0, 3.0],
        [4.0, 5.0, 6.0],
        [7.0, 8.0, 9.0],
        [10.0, 11.0, 12.0],
        [13.0, 14.0, 15.0]
    ]

    print(f"📊 Running {len(test_data)} predictions...")

    # Run predictions - these may execute concurrently due to concurrency=3
    # but they'll all reuse the same model once it's loaded
    results = []
    for i, data in enumerate(test_data):
        print(f"📤 Submitting prediction {i+1}/{len(test_data)}")
        result = await do_predict(data)
        results.append(result)

        # Small delay to see the timing more clearly
        await asyncio.sleep(1)

    print("🏁 All predictions completed!")
    print(f"📈 Results: {results}")
    return results
Finally, we deploy and run the workflow programmatically, so all you have to do is execute python reuse.py to see it in action:
if __name__ == "__main__":
    # Establish a remote connection from within your script.
    flyte.init_from_config()

    # Run your tasks remotely inline and pass parameter data.
    run = flyte.run(main)

    # Print various attributes of the run.
    print(run.name)
    print(run.url)

    # Stream the logs from the remote run to the terminal.
    run.wait()