Skip to content

Commit 97c0a4f

Browse files
authored
Merge pull request #1071 from airweave-ai/feat/publish_last_timestamp
Fix: Prevent cleanup job from cancelling healthy incremental syncs with no changes
2 parents 9daf110 + d69d214 commit 97c0a4f

File tree

4 files changed

+95
-21
lines changed

4 files changed

+95
-21
lines changed

.cursor/rules/sync-architecture.mdc

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,21 @@ if all_edges_go_to_destinations:
161161
- **Atomic Counters**: Thread-safe increment operations with async locks
162162
- **Threshold Publishing**: Publishes updates every N operations (default: 3)
163163
- **Entity Tracking**: Maintains count of unique entities by type
164-
- **Redis Integration**: Publishes to `sync_job:{job_id}` channels
164+
- **Redis Integration**: Publishes to `sync_job:{job_id}` channels and stores snapshots for cleanup job
165165

166166
**Statistics Tracked**:
167167
- `inserted`, `updated`, `deleted`: Destination operations
168168
- `kept`: Unchanged entities (hash match)
169169
- `skipped`: Failed or filtered entities
170170
- `entities_encountered`: Count by entity type
171+
- `status`: Final job status (None if still in progress)
172+
- `last_update_timestamp`: ISO timestamp of last update (for cleanup job stuck detection)
173+
174+
**Redis Snapshot Storage**:
175+
- Each progress update stores a snapshot in Redis with key `sync_progress_snapshot:{job_id}`
176+
- Snapshot includes `last_update_timestamp` for cleanup job to detect stuck syncs
177+
- 30-minute TTL to automatically clean up completed syncs
178+
- Used by `cleanup_stuck_sync_jobs_activity` to identify RUNNING jobs whose last update is older than the 5-minute cutoff
171179

172180
**Critical Fix**:
173181
```python
@@ -247,7 +255,10 @@ EntityProcessor.process()
247255
Every operation increments counter
248256
├── Async lock ensures thread safety
249257
├── Threshold check (every 3 ops)
250-
├── Publish to Redis if threshold met
258+
├── Publish to Redis pubsub if threshold met
259+
│ ├── Real-time updates to subscribers
260+
│ └── Snapshot stored in Redis (sync_progress_snapshot:{job_id})
261+
│ │      └── Includes last_update_timestamp for cleanup job detection
251262
└── Subscribers receive real-time updates
252263
```
253264

backend/airweave/platform/sync/pubsub.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,22 @@ async def increment(self, stat_name: str, amount: int = 1) -> None:
7272
self._last_status_update = total_ops
7373

7474
async def _publish(self) -> None:
75-
"""Publish current progress."""
76-
await core_pubsub.publish("sync_job", self.job_id, self.stats.model_dump())
75+
"""Publish current progress to pubsub and store snapshot for cleanup job."""
76+
from datetime import datetime, timezone
77+
from airweave.core.redis_client import redis_client
78+
import json
79+
80+
self.stats.last_update_timestamp = datetime.now(timezone.utc).isoformat()
81+
data = self.stats.model_dump()
82+
83+
await core_pubsub.publish("sync_job", self.job_id, data)
84+
85+
snapshot_key = f"sync_progress_snapshot:{self.job_id}"
86+
await redis_client.client.setex(
87+
snapshot_key,
88+
1800, # 30 min TTL
89+
json.dumps(data),
90+
)
7791

7892
async def finalize(self, status: SyncJobStatus) -> None:
7993
"""Publish final progress with the sync job status.

backend/airweave/platform/temporal/activities/sync.py

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ async def cleanup_stuck_sync_jobs_activity() -> None:
457457
# Calculate cutoff times
458458
now = utc_now_naive()
459459
cancelling_pending_cutoff = now - timedelta(minutes=3)
460-
running_cutoff = now - timedelta(minutes=10)
460+
running_cutoff = now - timedelta(minutes=5)
461461

462462
stuck_job_count = 0
463463
cancelled_count = 0
@@ -467,6 +467,8 @@ async def cleanup_stuck_sync_jobs_activity() -> None:
467467
async with get_db_context() as db:
468468
# Import CRUD layer inside to avoid sandbox issues
469469
from airweave import crud
470+
from airweave.core.redis_client import redis_client
471+
import json
470472

471473
# Query 1: Find CANCELLING/PENDING jobs stuck for > 3 minutes
472474
cancelling_pending_jobs = await crud.sync_job.get_stuck_jobs_by_status(
@@ -480,35 +482,78 @@ async def cleanup_stuck_sync_jobs_activity() -> None:
480482
f"stuck for > 3 minutes"
481483
)
482484

483-
# Query 2: Find RUNNING jobs > 10 minutes old
485+
# Query 2: Find RUNNING jobs > 5 minutes old
484486
running_jobs = await crud.sync_job.get_stuck_jobs_by_status(
485487
db=db,
486488
status=[SyncJobStatus.RUNNING.value],
487489
started_before=running_cutoff,
488490
)
489491

490-
logger.info(f"Found {len(running_jobs)} RUNNING jobs that started > 10 minutes ago")
492+
logger.info(f"Found {len(running_jobs)} RUNNING jobs that started > 5 minutes ago")
491493

492-
# Check which RUNNING jobs have no recent entity updates
494+
# Check which RUNNING jobs have no recent activity (via Redis snapshot)
493495
stuck_running_jobs = []
494496
for job in running_jobs:
495-
# Get the most recent entity created_at for this job using CRUD
496-
latest_entity_time = await crud.entity.get_latest_entity_time_for_job(
497-
db=db,
498-
sync_job_id=job.id,
499-
)
497+
job_id_str = str(job.id)
498+
snapshot_key = f"sync_progress_snapshot:{job_id_str}"
499+
500+
try:
501+
snapshot_json = await redis_client.client.get(snapshot_key)
502+
503+
if not snapshot_json:
504+
logger.debug(f"No snapshot for job {job_id_str} - skipping")
505+
continue
506+
507+
snapshot = json.loads(snapshot_json)
508+
last_update_str = snapshot.get("last_update_timestamp")
509+
510+
if not last_update_str:
511+
# Old snapshot without timestamp - fall back to DB check
512+
latest_entity_time = await crud.entity.get_latest_entity_time_for_job(
513+
db=db, sync_job_id=job.id
514+
)
515+
if latest_entity_time is None or latest_entity_time < running_cutoff:
516+
stuck_running_jobs.append(job)
517+
continue
518+
519+
# Check if activity is recent
520+
from datetime import datetime
521+
522+
last_update = datetime.fromisoformat(last_update_str)
523+
# Convert to naive datetime to match running_cutoff (from utc_now_naive)
524+
if last_update.tzinfo is not None:
525+
last_update = last_update.replace(tzinfo=None)
526+
527+
if last_update < running_cutoff:
528+
total_ops = sum(
529+
[
530+
snapshot.get("inserted", 0),
531+
snapshot.get("updated", 0),
532+
snapshot.get("deleted", 0),
533+
snapshot.get("kept", 0),
534+
snapshot.get("skipped", 0),
535+
]
536+
)
537+
stuck_running_jobs.append(job)
538+
logger.info(
539+
f"Job {job_id_str} last activity at {last_update} "
540+
f"({total_ops} total ops) - marking as stuck"
541+
)
542+
else:
543+
logger.debug(f"Job {job_id_str} active at {last_update} - healthy")
500544

501-
# Consider stuck if no entities or latest entity is > 10 minutes old
502-
if latest_entity_time is None or latest_entity_time < running_cutoff:
503-
stuck_running_jobs.append(job)
504-
logger.info(
505-
f"Job {job.id} has no entity updates since "
506-
f"{latest_entity_time or 'job start'} - marking as stuck"
545+
except Exception as e:
546+
logger.warning(
547+
f"Error checking job {job_id_str}: {e}, falling back to DB check"
548+
)
549+
latest_entity_time = await crud.entity.get_latest_entity_time_for_job(
550+
db=db, sync_job_id=job.id
507551
)
552+
if latest_entity_time is None or latest_entity_time < running_cutoff:
553+
stuck_running_jobs.append(job)
508554

509555
logger.info(
510-
f"Found {len(stuck_running_jobs)} RUNNING jobs with no entity "
511-
f"updates in last 10 minutes"
556+
f"Found {len(stuck_running_jobs)} RUNNING jobs with no activity in last 5 minutes"
512557
)
513558

514559
# Combine all stuck jobs

backend/airweave/schemas/sync_pubsub.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class SyncProgressUpdate(BaseModel):
2929
)
3030
# Status field to track the final state - None means still in progress
3131
status: Optional[SyncJobStatus] = None
32+
# Timestamp for stuck job detection
33+
last_update_timestamp: Optional[str] = Field(
34+
None, description="ISO timestamp of last update (for cleanup job stuck detection)"
35+
)
3236

3337

3438
class EntityStateUpdate(BaseModel):

0 commit comments

Comments
 (0)