183 changes: 65 additions & 118 deletions README.md
@@ -1,159 +1,106 @@
# Collaborative SLAM Exploration for Robots

A proof-of-concept implementation of the Saga pattern for orchestrating multi-step, distributed workflows using Celery, Redis, PostgreSQL, and Flower. The scenario simulates collaborative SLAM (Simultaneous Localization and Mapping) exploration by multiple robots, with robust rollback (compensation) logic for failures.
A proof-of-concept that demonstrates how Celery, Redis Streams, and async command handlers can implement the Saga pattern for collaborative robot exploration. The project coordinates mission phases—resource allocation, route planning, exploration, and map integration—while providing compensating actions and rich telemetry.
**Owner Author commented:**

we need to fix it -- the main point is the event-driven development patterns, Request/Reply and Saga, and we use them to split a complex mission into async stages.


## Key Features
- **Saga orchestration:** The Celery flow builds a canvas of mission tasks with compensations using `link_error`. 【F:app/flows/mission_start_celery/orchestrator.py†L27-L50】
**Copilot AI commented (Sep 25, 2025):**

The reference format '【F:app/flows/mission_start_celery/orchestrator.py†L27-L50】' uses non-standard documentation syntax. Consider using standard markdown links or code references for better readability.

Suggested change:
- **Saga orchestration:** The Celery flow builds a canvas of mission tasks with compensations using `link_error`. 【F:app/flows/mission_start_celery/orchestrator.py†L27-L50】
+ **Saga orchestration:** The Celery flow builds a canvas of mission tasks with compensations using `link_error`. [app/flows/mission_start_celery/orchestrator.py#L27-L50](app/flows/mission_start_celery/orchestrator.py#L27-L50)

**Owner Author commented:**

Celery/Flower aren't key features -- we have 2 backends which bring their own tools, but the key feature is implementing the patterns with 2 different backends

- **Redis-backed services:** Celery tasks exchange commands and replies with async handlers via Redis Streams. 【F:app/flows/mission_start_celery/tasks.py†L18-L75】
- **Async command bus:** The listener discovers handlers dynamically and manages consumer groups for each mission stream. 【F:app/commands/listener.py†L18-L109】
- **Observability:** Flower consumes Celery events while handlers emit start/progress/completed telemetry for dashboards. 【F:docker-compose.yml†L23-L34】【F:app/redis_utils/decorators.py†L24-L55】

- **Saga Pattern Orchestration:** Implements the Saga pattern using Celery to coordinate a sequence of tasks and compensations.
- **Celery Task Queue:** Asynchronous task execution and orchestration.
- **Redis Broker & Backend:** Fast in-memory message broker and result backend for Celery.
- **PostgreSQL Database:** Shared persistent state for simulating service data.
- **Flower Monitoring UI:** Real-time visualization and debugging of task flows.
- **Docker Compose Infrastructure:** All components run as containers for easy setup and teardown.
- **Optional FastAPI Trigger:** HTTP endpoint for starting Sagas (optional).
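
As a concrete illustration of the saga bullets above, here is a minimal sketch of a compensated Celery canvas (hypothetical task bodies and module path; the real canvas lives in `app/flows/mission_start_celery/orchestrator.py`):

```python
# Sketch of a compensated saga step; task bodies here are hypothetical.
from celery import chain

from app.celery_app import app  # assumed location of the Celery app


@app.task
def allocate_resources(robot_count, area):
    # Forward step: reserve robots for the mission.
    return {"robots": robot_count, "area": area}


@app.task(bind=True)
def release_resources(self, failed_task_id):
    # Compensation: link_error callbacks receive the failed task's id.
    print(f"rolling back allocation after failure in {failed_task_id}")


def build_saga(robot_count, area):
    # on_error() attaches the compensation via link_error, so a failure
    # downstream of this step enqueues the rollback task.
    first = allocate_resources.s(robot_count, area).on_error(release_resources.s())
    return chain(first)  # plan_route, explore, integrate_map would follow
```
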
Comprehensive architecture and lessons learned are available in the [`docs/`](docs) folder.

## Architecture Overview
## Repository Layout

- **Celery Worker:** Runs all Saga step and compensation tasks.
- **Saga Orchestrator:** Coordinates task execution and compensation on failure.
- **Redis:** Message broker and result backend for Celery.
- **PostgreSQL:** Shared database for simulating persistent state.
- **Flower:** Web UI for monitoring Celery tasks.
- **(Optional) FastAPI:** HTTP API to trigger Saga runs.

All services are defined in `docker-compose.yml` and run together on a single machine.

## Prerequisites

- Docker & Docker Compose (or Podman Compose)
- Python 3.11+ (for local development/testing)
- (Optional) Make sure ports 5432 (Postgres), 6379 (Redis), 5555 (Flower), and 8000 (API) are available.

## Quick Start
```
app/
├── celery_app.py # Celery configuration and Redis readiness probe
├── commands/ # Redis stream listener and handler implementations
├── flows/ # Celery and async saga orchestrators
└── redis_utils/ # Request/reply helpers and telemetry decorators
docker-compose.yml # Container stack for Redis, PostgreSQL, Celery, Flower, RedisInsight
Dockerfile # Runtime image for Celery worker and listener
docs/ # High-level architecture and lessons learned
scripts/ # Helper scripts for local automation
tests/ # Unit and integration suites (pytest)
```

1. **Clone the repository:**
## Getting Started

```bash
git clone https://github.com/hyzhak/collaborative-slam-exporation-for-robots.git
cd collaborative-slam-exporation-for-robots
```
### Prerequisites

2. **Build and start all services:**
- Docker and Docker Compose (v2 syntax)
- Optional: Python 3.11+ if you wish to run tests without containers

```bash
docker-compose up --build -d
```
### Launch the Stack

3. **Check that all containers are running:**
1. Clone the repository and move into the project directory.
2. Start all services:

```bash
docker-compose ps
docker compose up --build
```

4. **(Optional) Initialize the database:**
If your tasks require tables, create them in the Postgres container.
This command launches Redis, PostgreSQL, the Celery worker, the Redis stream listener, Flower, and RedisInsight. Health checks ensure Redis is available before Celery starts. 【F:docker-compose.yml†L1-L74】

5. **Trigger a Saga run:**
- **Via container (recommended):**
3. Watch logs for the `orchestrator` service to confirm the listener is consuming `mission:commands` events.

```bash
podman-compose exec celery_worker python -m app.orchestrator 2 ZoneA
```
### Trigger a Mission

- Adjust arguments as needed for robot count, area, or failure simulation.
Publish a `mission:start` command to Redis Streams. The required fields are documented in the mission handler. 【F:app/commands/handlers/start_mission.py†L24-L47】

- **Via API (if enabled):**
Send a POST request to `/start_saga` on port 8000.
```bash
docker compose exec redis redis-cli XADD mission:commands * \
event_type mission:start \
correlation_id demo-1 \
robot_count 2 \
area ZoneA \
reply_stream mission:replies:demo-1 \
backend celery
```
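
To follow the handler's replies to this command from Python, a small redis-py loop is enough (a sketch, assuming the `mission:replies:demo-1` stream named above):

```python
# Sketch: tail the reply stream declared in the XADD command above.
import redis

r = redis.Redis(decode_responses=True)  # assumes default localhost:6379

last_id = "0"
while True:
    # Block up to 5 seconds for new start/progress/completed/failed events.
    for _stream, messages in r.xread({"mission:replies:demo-1": last_id}, block=5000):
        for message_id, fields in messages:
            print(message_id, fields)
            last_id = message_id
```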

6. **Monitor progress:**
Open [http://localhost:5555](http://localhost:5555) to view the Flower UI.
Open [http://localhost:8001](http://localhost:8001) to access RedisInsight for visualizing Redis Streams and keys.
Monitor Flower at [http://localhost:5555](http://localhost:5555) and RedisInsight at [http://localhost:8001](http://localhost:8001) to follow task progress and stream telemetry.

## Project Structure
### Shutting Down

```
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app/
│ ├── __init__.py
│ ├── celery_app.py # Celery app configuration
│ ├── tasks.py # Saga step & compensation tasks
│ ├── orchestrator.py # Saga orchestration logic
│ └── api.py # (Optional) FastAPI app
├── memory-bank/ # Project documentation & context
│ ├── projectbrief.md
│ ├── productContext.md
│ ├── systemPatterns.md
│ ├── techContext.md
│ ├── activeContext.md
│ └── progress.md
```
```bash
docker compose down
```

## Testing the Saga
Use `docker compose down -v` if you wish to remove the Postgres volume.

- Run the orchestrator to start a Saga.
- Simulate failures (by parameters or randomization in tasks) to observe compensation logic.
- Use Flower UI and logs to trace task execution and rollback steps.
- Inspect the PostgreSQL database to verify state changes and rollbacks.
## Testing

## Running Integration Tests
The project ships with pytest-based unit and integration tests.

Automated integration tests are defined in `docker-compose.test.yaml` and can be run in multiple ways:
- **Unit tests:** Exercise handlers, Redis utilities, and the async orchestrator. 【F:tests/unit/flow/test_async_orchestrator.py†L1-L120】【F:tests/unit/test_tasks_request_reply.py†L1-L86】
- **Integration tests:** Interact with Redis Streams to validate end-to-end orchestration. 【F:tests/integration/test_orchestrator_trigger.py†L1-L88】

1. Using the helper script:
Run the suites locally using the development compose file:
**Owner Author commented:**

we have "/scripts/unit-tests.sh" script to run unit tests


```bash
./scripts/integration-tests.sh
```
```bash
docker compose -f docker-compose.unit.yaml up --build lint unit_test
```

2. With Podman Compose:
For integration testing inside containers:

```bash
podman-compose -f docker-compose.yml -f docker-compose.test.yaml run integration_test
```
```bash
./scripts/integration-tests.sh
```

3. With Docker Compose:
Both commands rely on the dev image defined in `Dockerfile.dev` and enforce linting via Ruff.

```bash
docker-compose -f docker-compose.yml -f docker-compose.test.yaml up --build --exit-code-from integration_test integration_test
```
## Documentation

- [High-Level Architecture](docs/high_level_design.md)
- [Lessons Learned and Implementation Playbook](docs/lessons_learned.md)

The tests are located in the `tests/` directory and are mounted into the integration test container. A minimal sanity test is provided as `tests/test_orchestration.py`.
These documents contain detailed diagrams, step-by-step guidance, and testing considerations for replicating the architecture.

## Contributing

Contributions are welcome! Please follow the Conventional Commits specification for commit messages.
Contributions are welcome. Please open an issue describing the enhancement or bug before submitting a pull request, and follow the existing linting/testing workflow.

## License

This project is licensed under the MIT License.

## Python Linting & Auto-formatting

This project uses [Ruff](https://docs.astral.sh/ruff/) for linting and formatting, managed via pre-commit hooks and containerized workflows.

### Linting and Unit Tests (Containerized)
**Owner Author commented:**
we still need the linting instructions


1. Build and run lint/unit tests using the dev image and dedicated compose file:

```bash
podman-compose -f docker-compose.unit.yaml up --build lint unit_test
# or
docker-compose -f docker-compose.unit.yaml up --build lint unit_test
```

- `lint` runs Ruff on the entire codebase.
- `unit_test` runs all unit tests with pytest.

2. Dev dependencies are installed automatically in the dev image (`Dockerfile.dev`).

3. Configuration is in `.ruff.toml` and `.pre-commit-config.yaml`.

4. For pre-commit hooks, you may still install them inside the container if you want local staged checks:

```bash
podman-compose exec unit_test pre-commit install
```
This project is licensed under the MIT License. See [LICENSE](LICENSE) for details.

See `docker-compose.unit.yaml` and `Dockerfile.dev` for details.
129 changes: 129 additions & 0 deletions docs/high_level_design.md
@@ -0,0 +1,129 @@
# High-Level Architecture

This document summarizes the architecture of the Collaborative SLAM Exploration proof-of-concept. The PoC demonstrates how a Celery-driven saga orchestrates Redis-stream-based command handlers to coordinate multi-robot exploration workflows.
**Owner Author commented:**
ok, it isn't a Celery-driven saga architecture; we have an option based on an async/await Python flow. Two backend options.

also, it's worth mentioning that we use Redis for the request/reply pattern.

So basically Celery has the least important role here; more important are the 2 patterns, Saga and Request/Reply. For Saga we have 2 options, Celery and pure async/await Python, and for Request/Reply we have the Redis backend.


## System Context

The solution is triggered when an external operator emits a `mission:start` command. The asynchronous command listener routes the event to the saga orchestrator, which dispatches Celery tasks and compensations through Redis. Handlers emit multi-stage progress updates so that mission control can observe state through Flower and the Redis replies stream.
**Owner Author commented:**
I'd start from request/reply first -- it happens that one of the handlers uses the Saga pattern, but that isn't the key point.
Please focus on the patterns, and note that we have 2 backends for Saga and one for request/reply.
And Flower isn't that important; we just happen to have it. With the pure Python implementation we might use a different approach, like OTEL, to track events.


```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
Operator([Mission Operator]):::actor
Control([Mission Control Tools]):::actor
subgraph System[Collaborative SLAM Orchestrator]
Orchestrator[Saga Orchestrator]
CeleryWorker[Celery Task Worker]
RedisBroker[(Redis Streams & Broker)]
Pg[(PostgreSQL State)]
Flower[[Flower Monitoring UI]]
end
Operator -->|Trigger mission:start| Orchestrator
Orchestrator -->|Publish command| RedisBroker
RedisBroker -->|Deliver tasks| CeleryWorker
CeleryWorker -->|Update mission state| Pg
CeleryWorker -->|Emit progress replies| RedisBroker
RedisBroker -->|Surface telemetry| Control
Flower -->|Read task events| RedisBroker
Control -->|Inspect task timeline| Flower
classDef actor fill:#f6f6f6,stroke:#333,stroke-width:1px;
```
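
The request/reply exchange at the heart of this flow can be sketched from the requester's side as follows (a hypothetical helper that mirrors what `request_and_reply` is described as doing, not the project's actual signature):

```python
# Sketch of the requester side: publish a command, then block on its reply stream.
import uuid

import redis


def request_and_reply_sketch(r: redis.Redis, event_type: str, payload: dict,
                             timeout_ms: int = 30_000):
    correlation_id = str(uuid.uuid4())
    reply_stream = f"mission:replies:{correlation_id}"
    r.xadd("mission:commands", {
        "event_type": event_type,
        "correlation_id": correlation_id,
        "reply_stream": reply_stream,
        **payload,
    })
    last_id = "0"
    while True:
        entries = r.xread({reply_stream: last_id}, block=timeout_ms)
        if not entries:
            raise TimeoutError(f"no reply on {reply_stream}")
        for _stream, messages in entries:
            for message_id, fields in messages:
                last_id = message_id
                # Assumes decode_responses=True and a 'status' field on replies.
                if fields.get("status") in ("completed", "failed"):
                    return fields
```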

### Key Responsibilities

- **Saga Orchestrator** – Builds the Celery canvas that sequences mission tasks and compensations. 【F:app/flows/mission_start_celery/orchestrator.py†L21-L56】
**Copilot AI commented (Sep 25, 2025):**

The reference format '【F:app/flows/mission_start_celery/orchestrator.py†L21-L56】' uses non-standard documentation syntax. Consider using standard markdown links or code references for consistency and better tool support.

Suggested change:
- **Saga Orchestrator** – Builds the Celery canvas that sequences mission tasks and compensations. 【F:app/flows/mission_start_celery/orchestrator.py†L21-L56】
+ **Saga Orchestrator** – Builds the Celery canvas that sequences mission tasks and compensations. [app/flows/mission_start_celery/orchestrator.py (lines 21–56)](../app/flows/mission_start_celery/orchestrator.py#L21-L56)

**Owner Author commented:**
please do these fixes for all references provided

- **Celery Worker** – Hosts mission tasks and their compensating actions, delegating execution to Redis stream command handlers. 【F:app/flows/mission_start_celery/tasks.py†L13-L87】
- **Command Listener** – Discovers handler modules dynamically and consumes Redis streams to invoke them. 【F:app/commands/listener.py†L15-L113】
- **Redis Utilities** – Provide request/reply behavior and multi-stage progress reporting for handlers (a receiver-side sketch follows this list). 【F:app/redis_utils/decorators.py†L1-L58】
- **Monitoring** – Flower attaches to the Redis broker to visualize task life cycles.
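
The receiver half of the pattern can be pictured as a decorator that brackets a handler with lifecycle events (an illustrative shape only; the real decorator lives in `app/redis_utils/decorators.py`):

```python
# Illustrative shape of a multi-stage reply decorator (not the actual code).
import functools


def multi_stage_reply_sketch(handler):
    @functools.wraps(handler)
    async def wrapper(r, fields):  # r: redis.asyncio client, fields: command payload
        reply = fields["reply_stream"]
        await r.xadd(reply, {"status": "start"})
        try:
            # Assumes the handler is an async generator yielding progress steps.
            async for percent, payload in handler(r, fields):
                await r.xadd(reply, {"status": "progress", "percent": percent, **payload})
        except Exception as exc:
            await r.xadd(reply, {"status": "failed", "error": str(exc)})
            raise
        await r.xadd(reply, {"status": "completed"})
    return wrapper
```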

## Component Structure

The PoC is organized into well-defined layers: orchestration flows, task definitions, command handlers, and Redis utilities. The following component diagram highlights the internal structure.

**Owner Author commented:**

ok, please focus on the event-driven patterns in this schema.
We don't need to add much detail about the backend -- you can mention that we have 2 options for the backend (async/await Python, Celery), and Redis is considered a tool to stream events. And `request_and_reply` + `multi_stage_reply` implement the Request/Reply pattern for the requester and receiver sides.

```mermaid
%%{init: {'theme': 'neutral'}}%%
graph LR
subgraph Flows
Orchestrator[Mission Start Orchestrator]
Tasks[Celery Tasks]
end
subgraph Commands
Listener[Async Command Listener]
Handlers[Mission Command Handlers]
end
subgraph RedisLayer[Redis Utilities]
RequestReply[request_and_reply]
MultiStage[multi_stage_reply]
end
Orchestrator -->|chain.apply_async| Tasks
Tasks -->|request_and_reply| Listener
Listener -->|xreadgroup| Redis[(Redis Streams)]
Handlers -->|multi_stage_reply| Redis
MultiStage --> Handlers
RequestReply --> Redis
Tasks --> Pg[(PostgreSQL)]
Tasks --> Flower
Orchestrator --> Flower
```

### Interaction Notes

1. The orchestrator chains mission steps with compensations using Celery's canvas primitives. 【F:app/flows/mission_start_celery/orchestrator.py†L27-L50】
**Owner Author commented:**
I guess we can just have a separate section in the document that explains the details of the Celery implementation, without spreading these details across the document

2. Each task translates Celery calls into Redis stream requests handled by async listeners. 【F:app/flows/mission_start_celery/tasks.py†L18-L75】
3. Handlers leverage the `multi_stage_reply` decorator to send start, progress, completed, and failed events. 【F:app/redis_utils/decorators.py†L9-L58】【F:app/commands/handlers/plan_route.py†L1-L25】
4. The listener manages consumer groups and ack loops to scale independently; a sketch follows below. 【F:app/commands/listener.py†L42-L108】
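
A consumer-group loop of the kind described in item 4 might look like this (a sketch; the stream, group, and dispatch names are assumptions):

```python
# Sketch of a consumer-group read/ack loop (names are assumptions).
import redis.asyncio as redis


async def dispatch(fields: dict):
    """Placeholder: look up the handler for fields['event_type'] and await it."""


async def consume(r: redis.Redis, stream: str = "mission:commands",
                  group: str = "listeners", consumer: str = "listener-1"):
    while True:
        # ">" asks for messages never delivered to this group before.
        entries = await r.xreadgroup(group, consumer, {stream: ">"},
                                     count=10, block=5000)
        for _stream, messages in entries:
            for message_id, fields in messages:
                await dispatch(fields)
                # Ack only after the handler succeeds, so failures are redelivered.
                await r.xack(stream, group, message_id)
```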

## Saga Execution Sequence

The diagram below shows how a successful mission start flows through the system. Failure paths trigger compensations through linked Celery error callbacks; those follow the same message exchanges, executed by compensation tasks.

**Owner Author commented:**

we need 2 diagrams:

1. Request/Reply pattern -- just describe one command; all commands are supposed to be the same: start, progress, completed, failed.
2. Saga (ignore the Celery implementation and focus on the idea, which looks simpler in the async/await Python implementation) -- here you can show all commands, but without going into the details of the Request/Reply pattern (just in general).

and remember that we happen to invoke the saga pattern in one of the Request/Reply handlers ("start_mission"), but we could have similar sagas for other handlers later as well -- they are just a way of splitting one command's handling into a series of asynchronous commands.

```mermaid
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Operator
participant Listener as Command Listener
participant Orchestrator as Celery Orchestrator
participant Redis as Redis Streams/Broker
participant Worker as Celery Worker
participant Handler as Async Handler
participant Flower
Operator->>Listener: mission:start event on mission:commands
Listener->>Orchestrator: call run_saga(correlation_id,...)
Orchestrator->>Worker: apply_async(chain)
Worker->>Redis: request_and_reply(resources:allocate)
Redis->>Handler: deliver allocate command
Handler->>Redis: emit start/progress/completed
Redis->>Worker: reply with allocation result
Worker->>Redis: request_and_reply(routing:plan)
Redis->>Handler: deliver plan command
Handler->>Redis: emit progress and completion
Worker->>Flower: task state updates via broker events
Worker->>Redis: request_and_reply(exploration:perform)
Worker->>Redis: request_and_reply(map:integrate)
Worker->>Worker: release_resources compensation at end
Flower->>Operator: visualize saga status
```

### Failure Handling

- Each saga step attaches a `link_error` callback that enqueues compensating Celery tasks when a downstream task fails. 【F:app/flows/mission_start_celery/orchestrator.py†L32-L49】
**Owner Author commented:**
each backend has its own compensation tactics; you can mention both

- Handlers surface failure details in the Redis reply stream, enabling upstream retries or abort logic.
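
As the review note above says, each backend has its own tactic; the async/await backend's can be sketched as a reverse-order unwind of completed steps (illustrative only):

```python
# Sketch of compensation in the async/await backend (illustrative shape).
async def run_saga(steps):
    """steps: list of (do, undo) pairs of async callables."""
    completed = []
    try:
        for do, undo in steps:
            await do()
            completed.append(undo)
    except Exception:
        # Unwind: compensate finished steps in reverse order, then re-raise.
        for undo in reversed(completed):
            await undo()
        raise
```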

## Deployment Considerations

- All services run inside Docker Compose with Redis, PostgreSQL, Celery workers, and Flower containers.
- The orchestrator waits for Redis availability at startup to avoid race conditions. 【F:app/celery_app.py†L13-L30】
- Command listeners register consumer groups idempotently, making scaling straightforward. 【F:app/commands/listener.py†L64-L92】
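
Idempotent registration usually amounts to tolerating Redis's BUSYGROUP error (a sketch, assuming redis-py's asyncio client):

```python
# Sketch: create a consumer group only if it does not already exist.
import redis.asyncio as redis
from redis.exceptions import ResponseError


async def ensure_group(r: redis.Redis, stream: str, group: str):
    try:
        # mkstream=True creates the stream if missing; id="0" replays history.
        await r.xgroup_create(stream, group, id="0", mkstream=True)
    except ResponseError as exc:
        if "BUSYGROUP" not in str(exc):
            raise  # "group already exists" is fine; anything else is real
```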

## Observability

- Flower consumes Celery events from Redis, providing task-level insight.
- Redis reply streams include structured progress percentages and payloads for mission dashboards.
**Owner Author commented on lines +125 to +128:**
I'd say RedisInsight provides much more important details about each command execution. Ideally we should add OTEL + LGTM, but that will happen later.

