Skip to content

Commit 2dbf51b

Browse files
authored
Merge pull request #790 from airweave-ai/feat/better-connect-flow-and-validations
feat: better connect flow and validations
2 parents 140fd99 + 9103670 commit 2dbf51b

File tree

220 files changed

+15876
-22270
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

220 files changed

+15876
-22270
lines changed

.cursor/rules/api-layer.mdc

Lines changed: 2 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ api/
2222
```
2323

2424
### Endpoint Categories
25-
- **Public API (SDK)**: `/sources/`, `/collections/`, `/source-connections/`, `/white-labels/`
25+
- **Public API (SDK)**: `/sources/`, `/collections/`, `/source-connections/`,
2626
- **Internal Frontend**: `/users/`, `/organizations/`, `/api-keys/`, `/sync/`, `/dag/`, `/entities/`, `/destinations/`
2727

2828
## Core Components
@@ -81,8 +81,7 @@ async def my_endpoint(
8181
**Request Processing Pipeline:**
8282
1. `add_request_id`: Generates unique request ID for tracing
8383
2. `log_requests`: Logs request details and duration
84-
3. `DynamicCORSMiddleware`: Handles CORS for white-label OAuth
85-
4. `exception_logging_middleware`: Catches and logs unhandled exceptions
84+
3. `exception_logging_middleware`: Catches and logs unhandled exceptions
8685

8786
**CORS Handling:**
8887
- Dynamic origin validation
@@ -144,7 +143,6 @@ collection = await collection_service.create(
144143
- Organization access validated via ApiContext
145144
- API keys encrypted at rest
146145
- Auth fields hidden by default
147-
- White label CORS handling for OAuth
148146
- Request ID tracking for audit trails
149147

150148
## Best Practices
@@ -161,163 +159,3 @@ return await crud.resource.get(db, id=resource.id, ...)
161159

162160
# Return complete object
163161
return await crud.resource.get(db, id=resource.id, ...)
164-
```# Airweave API Layer Rules
165-
166-
## Overview
167-
FastAPI-based HTTP interface providing RESTful endpoints for SDK access and frontend communication, with unified Auth0/API key authentication.
168-
169-
## Architecture
170-
171-
### Structure
172-
```
173-
api/
174-
├── v1/
175-
│ ├── api.py # Main router aggregation
176-
│ └── endpoints/ # Individual endpoint modules
177-
├── deps.py # Dependency injection & auth resolution
178-
├── router.py # Custom TrailingSlashRouter
179-
├── middleware.py # Request processing & CORS
180-
└── auth.py # Auth0 integration & token validation
181-
```
182-
183-
### Endpoint Categories
184-
- **Public API (SDK)**: `/sources/`, `/collections/`, `/source-connections/`, `/white-labels/`
185-
- **Internal Frontend**: `/users/`, `/organizations/`, `/api-keys/`, `/sync/`, `/dag/`, `/entities/`, `/destinations/`
186-
187-
## Core Components
188-
189-
### 1. TrailingSlashRouter
190-
```python
191-
from airweave.api.router import TrailingSlashRouter
192-
router = TrailingSlashRouter() # Handles /endpoint and /endpoint/
193-
```
194-
195-
### 2. API Context (deps.py)
196-
The `get_context` dependency provides unified authentication and request context handling:
197-
198-
```python
199-
@router.get("/")
200-
async def my_endpoint(
201-
ctx: ApiContext = Depends(deps.get_context),
202-
):
203-
# Provides: organization_id, user, auth_method, auth_metadata, logger
204-
```
205-
206-
**ApiContext Resolution Flow:**
207-
1. Check auth method: system (dev), Auth0, or API key
208-
2. Resolve organization ID from header or defaults
209-
3. Validate organization access
210-
4. Create contextual logger with request metadata
211-
5. Return unified `ApiContext` object with injected logger
212-
213-
**Key Features:**
214-
- Supports multiple auth methods simultaneously
215-
- Organization context via `X-Organization-ID` header
216-
- Automatic access validation
217-
- Pre-configured contextual logger injected via dependency injection
218-
- Request tracking with unique request IDs
219-
220-
### 3. Authentication Methods
221-
222-
#### Auth0 Integration (auth.py)
223-
- Production: Real Auth0 with JWT validation
224-
- Development: Mock Auth0 when `AUTH_ENABLED=false`
225-
- Token verification: `get_user_from_token()` for WebSocket/SSE
226-
227-
#### API Key Authentication
228-
- Header: `X-API-Key: <key>`
229-
- Single organization scope
230-
- Expiration validation
231-
- No user context (service-to-service)
232-
233-
#### System Authentication
234-
- Local development with `AUTH_ENABLED=false`
235-
- Uses `FIRST_SUPERUSER` as default user
236-
- Full access to all organizations
237-
238-
### 4. Middleware Stack (middleware.py)
239-
240-
**Request Processing Pipeline:**
241-
1. `add_request_id`: Generates unique request ID for tracing
242-
2. `log_requests`: Logs request details and duration
243-
3. `DynamicCORSMiddleware`: Handles CORS for white-label OAuth
244-
4. `exception_logging_middleware`: Catches and logs unhandled exceptions
245-
246-
**CORS Handling:**
247-
- Dynamic origin validation
248-
- Special handling for OAuth endpoints
249-
- White-label endpoint support
250-
- Credentials support for cross-origin requests
251-
252-
**Exception Handlers:**
253-
- `validation_exception_handler`: Enhanced 422 errors with schema context
254-
- `permission_exception_handler`: 403 for access violations
255-
- `not_found_exception_handler`: 404 for missing resources
256-
257-
## Key Patterns
258-
259-
### Standard Endpoint Structure
260-
```python
261-
@router.post("/", response_model=schemas.ResponseModel)
262-
async def create_resource(
263-
resource_in: schemas.ResourceCreate,
264-
db: AsyncSession = Depends(deps.get_db),
265-
ctx: ApiContext = Depends(deps.get_context),
266-
) -> schemas.ResponseModel:
267-
"""Clear description."""
268-
# Validate → Delegate to CRUD/Service → Return schema
269-
```
270-
271-
### Naming Conventions
272-
- `list_resources()` → GET `/`
273-
- `create_resource()` → POST `/`
274-
- `get_resource()` → GET `/{id}`
275-
- `update_resource()` → PUT `/{id}`
276-
- `delete_resource()` → DELETE `/{id}`
277-
278-
### Error Handling
279-
```python
280-
raise HTTPException(status_code=404, detail="Resource not found")
281-
```
282-
- `NotFoundException` → 404
283-
- `PermissionException` → 403
284-
- `ValidationError` → 422
285-
286-
### CRUD Integration
287-
```python
288-
# ✅ Always delegate to CRUD layer
289-
return await crud.collection.get_multi(db, ctx=ctx)
290-
291-
# ❌ Never query directly in endpoints
292-
```
293-
294-
### Service Layer Usage
295-
Complex operations use service layers:
296-
```python
297-
collection = await collection_service.create(
298-
db, collection_in=collection_in, ctx=ctx
299-
)
300-
```
301-
302-
## Security
303-
- Organization access validated via ApiContext
304-
- API keys encrypted at rest
305-
- Auth fields hidden by default
306-
- White label CORS handling for OAuth
307-
- Request ID tracking for audit trails
308-
309-
## Best Practices
310-
1. Always use Pydantic schemas for request/response
311-
2. Include OpenAPI descriptions and examples
312-
3. Use dependency injection for common functionality
313-
4. Delegate database operations to CRUD layer
314-
5. Handle streaming with `StreamingResponse` for SSE
315-
6. Add background tasks for async operations
316-
7. Use contextual logger from `ctx.logger` (injected via DI)
317-
318-
# Return complete object
319-
return await crud.resource.get(db, id=resource.id, ...)
320-
321-
# Return complete object
322-
return await crud.resource.get(db, id=resource.id, ...)
323-
```

.cursor/rules/auth-providers.mdc

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
---
2+
globs: **/auth_providers/**
3+
alwaysApply: false
4+
---
5+
# Airweave Auth Providers
6+
7+
## Overview
8+
Auth providers enable third-party services (Pipedream, Composio) to supply credentials for source connections, eliminating manual credential management and enabling OAuth at scale for agent developers who are already using these connector providers.
9+
10+
## Architecture
11+
12+
### Core Components
13+
```
14+
platform/auth_providers/
15+
├── _base.py # BaseAuthProvider abstract class
16+
├── pipedream.py # OAuth2 provider (tokens expire in 3600s)
17+
├── composio.py # API key provider
18+
└── klavis.py # Future provider
19+
```
20+
21+
### Provider Registration
22+
```python
23+
@auth_provider(
24+
name="Pipedream",
25+
short_name="pipedream",
26+
auth_type=AuthType.oauth2_with_refresh,
27+
auth_config_class="PipedreamAuthConfig",
28+
config_class="PipedreamConfig"
29+
)
30+
class PipedreamAuthProvider(BaseAuthProvider):
31+
async def create(credentials, config): ...
32+
async def get_creds_for_source(source_short_name, fields): ...
33+
```
34+
35+
### Field Mappings
36+
Providers map between Airweave and external service naming:
37+
```python
38+
FIELD_NAME_MAPPING = {"api_key": "generic_api_key"}
39+
SLUG_NAME_MAPPING = {"google_drive": "googledrive"}
40+
```
41+
42+
## Integration with TokenManager
43+
44+
The `TokenManager` class orchestrates token refresh during long-running syncs:
45+
46+
### Initialization
47+
```python
48+
# SyncFactory creates TokenManager with optional auth provider
49+
token_manager = TokenManager(
50+
db=db,
51+
source_connection=connection,
52+
auth_provider_instance=auth_provider, # Optional
53+
initial_credentials=credentials
54+
)
55+
```
56+
57+
### Refresh Flow
58+
1. **Check refresh capability** (`_determine_refresh_capability()`):
59+
- Direct injection tokens → no refresh
60+
- Auth provider present → always refreshable
61+
- Standard OAuth → attempt refresh
62+
63+
2. **Proactive refresh** (`get_valid_token()`):
64+
- Refreshes tokens every 25 minutes (before 1-hour expiry)
65+
- Uses async lock to prevent concurrent refreshes
66+
- Falls back to stored token if refresh fails
67+
68+
3. **Auth provider refresh** (`_refresh_via_auth_provider()`):
69+
```python
70+
# TokenManager calls auth provider for fresh credentials
71+
fresh_creds = await auth_provider.get_creds_for_source(
72+
source_short_name="slack",
73+
source_auth_config_fields=["access_token", "refresh_token"]
74+
)
75+
# Updates database with new credentials
76+
await crud.integration_credential.update(db, credential, fresh_creds)
77+
```
78+
79+
4. **Fallback OAuth refresh** (`_refresh_via_oauth()`):
80+
- Uses oauth2_service if no auth provider
81+
- Creates separate DB session to avoid transaction issues
82+
83+
### Credential Priority (in SyncFactory)
84+
1. Direct token injection (highest)
85+
2. Auth provider instance
86+
3. Database credentials with OAuth refresh
87+
88+
## Database Schema
89+
- **auth_providers**: Provider definitions from decorators
90+
- **auth_provider_connections**: User's configured providers (encrypted)
91+
- **source_connections**: Links to auth provider via `auth_provider_connection_id`
92+
# Airweave Auth Providers

.cursor/rules/backend-rules.mdc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ alwaysApply: false
1414
### Service Layer
1515
- **Sync Service**: Orchestrates data synchronization between sources and destinations
1616
- **DAG Service**: Manages directed acyclic graphs for data transformations
17-
- **OAuth2 Service**: Handles authentication for (white-labeled) integrations
17+
- **OAuth2 Service**: Handles authentication for integrations
1818
- **Auth0 Service**: Manages user authentication, roles, and organization memberships
1919
- **Stripe Service**: Handles subscription management and billing operations
2020
- **Resend Service**: Manages transactional email delivery and templates

0 commit comments

Comments
 (0)