Skip to content

Commit 0efd1ea

Browse files
authored
Merge pull request #840 from airweave-ai/fix/unified-temporal-scheduling
fix: unified temporal scheduling
2 parents 02a94a7 + 117ea31 commit 0efd1ea

36 files changed

+879
-138
lines changed

.vscode/settings.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@
99
"${workspaceFolder}/backend"
1010
],
1111
"python.defaultInterpreterPath": "${command:python.interpreterPath}",
12+
"terminal.integrated.env.osx": {
13+
"PYTHONPATH": "${workspaceFolder}/backend:${env:PYTHONPATH}"
14+
},
15+
"terminal.integrated.env.linux": {
16+
"PYTHONPATH": "${workspaceFolder}/backend:${env:PYTHONPATH}"
17+
},
18+
"terminal.integrated.env.windows": {
19+
"PYTHONPATH": "${workspaceFolder}/backend;${env:PYTHONPATH}"
20+
},
1221
"python.testing.pytestPath": "${command:python.interpreterPath}",
1322
"python.testing.autoTestDiscoverOnSaveEnabled": true,
1423
"python.debugOptions": [

backend/airweave/core/source_connection_service.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,68 @@ def _get_default_cron_schedule(self, ctx: ApiContext) -> str:
5656
)
5757
return cron_schedule
5858

59+
def _validate_cron_schedule_for_source(
60+
self, cron_schedule: str, source: schemas.Source, ctx: ApiContext
61+
) -> None:
62+
"""Validate CRON schedule based on source capabilities.
63+
64+
Args:
65+
cron_schedule: The CRON expression to validate
66+
source: The source model
67+
ctx: API context
68+
69+
Raises:
70+
HTTPException: If the schedule is invalid for the source
71+
"""
72+
import re
73+
74+
if not cron_schedule:
75+
return
76+
77+
# Parse the CRON expression to check if it's minute-level
78+
# We need to distinguish between:
79+
# - "*/N * * * *" where N < 60 - runs every N minutes (minute-level)
80+
# - "* * * * *" - runs every minute (minute-level)
81+
# - "0 * * * *" - runs at minute 0 of every hour (hourly, not minute-level)
82+
# - "30 2 * * *" - runs at 2:30 AM daily (daily, not minute-level)
83+
84+
# Check for patterns that run more frequently than hourly
85+
# Pattern 1: */N where N < 60 (e.g., */5, */15, */30)
86+
interval_pattern = r"^\*/([1-5]?[0-9]) \* \* \* \*$"
87+
match = re.match(interval_pattern, cron_schedule)
88+
89+
if match:
90+
interval = int(match.group(1))
91+
if interval < 60:
92+
# This is sub-hourly (minute-level)
93+
if not source.supports_continuous:
94+
raise HTTPException(
95+
status_code=400,
96+
detail=f"Source '{source.short_name}' does not support continuous syncs. "
97+
f"Minimum schedule interval is 1 hour (e.g., '0 * * * *' for hourly).",
98+
)
99+
# For continuous sources, sub-hourly is allowed
100+
ctx.logger.info(
101+
f"Source '{source.short_name}' supports continuous syncs, "
102+
f"allowing minute-level schedule: {cron_schedule}"
103+
)
104+
return
105+
106+
# Pattern 2: * * * * * (every minute)
107+
if cron_schedule == "* * * * *":
108+
if not source.supports_continuous:
109+
raise HTTPException(
110+
status_code=400,
111+
detail=f"Source '{source.short_name}' does not support continuous syncs. "
112+
f"Minimum schedule interval is 1 hour (e.g., '0 * * * *' for hourly).",
113+
)
114+
ctx.logger.info(
115+
f"Source '{source.short_name}' supports continuous syncs, "
116+
f"allowing every-minute schedule: {cron_schedule}"
117+
)
118+
119+
# All other patterns (including "0 * * * *" for hourly) are allowed
120+
59121
"""Clean service with automatic auth method inference.
60122
61123
Key improvements:
@@ -225,11 +287,15 @@ async def update(
225287
ctx: ApiContext,
226288
) -> SourceConnection:
227289
"""Update a source connection."""
290+
# First check if the source connection exists
228291
source_conn = await crud.source_connection.get(db, id=id, ctx=ctx)
229292
if not source_conn:
230293
raise HTTPException(status_code=404, detail="Source connection not found")
231294

232295
async with UnitOfWork(db) as uow:
296+
# Re-fetch the source_conn within the UoW session to avoid session mismatch
297+
source_conn = await crud.source_connection.get(uow.session, id=id, ctx=ctx)
298+
233299
# Update fields
234300
update_data = obj_in.model_dump(exclude_unset=True)
235301

@@ -242,20 +308,32 @@ async def update(
242308
del update_data["config"]
243309

244310
# Handle schedule update
245-
if "schedule" in update_data and update_data["schedule"]:
311+
if (
312+
"schedule" in update_data and update_data["schedule"]
313+
): # TODO: only if actually different
246314
if source_conn.sync_id:
315+
new_cron = update_data["schedule"].get("cron")
316+
if new_cron:
317+
# Get the source to validate schedule
318+
source = await self._get_and_validate_source(
319+
uow.session, source_conn.short_name
320+
)
321+
self._validate_cron_schedule_for_source(new_cron, source, ctx)
247322
await self._update_sync_schedule(
248323
uow.session,
249324
source_conn.sync_id,
250-
update_data["schedule"].get("cron"),
325+
new_cron,
251326
ctx,
252327
uow,
253328
)
254329
del update_data["schedule"]
255330

256331
# Handle credential update (direct auth only)
257332
if "credentials" in update_data:
258-
auth_method = self._determine_auth_method(source_conn)
333+
# Use the schema function that works with database models
334+
from airweave.schemas.source_connection import determine_auth_method
335+
336+
auth_method = determine_auth_method(source_conn)
259337
if auth_method != AuthenticationMethod.DIRECT:
260338
raise HTTPException(
261339
status_code=400,
@@ -365,6 +443,9 @@ async def _create_with_direct_auth(
365443
if cron_schedule is None:
366444
cron_schedule = self._get_default_cron_schedule(ctx)
367445

446+
# Validate the schedule for this source
447+
self._validate_cron_schedule_for_source(cron_schedule, source, ctx)
448+
368449
sync, sync_job = await self._create_sync_without_schedule(
369450
uow.session,
370451
obj_in.name,
@@ -400,6 +481,7 @@ async def _create_with_direct_auth(
400481
cron_schedule=cron_schedule,
401482
db=uow.session,
402483
ctx=ctx,
484+
uow=uow,
403485
)
404486

405487
# Convert to schemas while still in session
@@ -582,6 +664,9 @@ async def _create_with_oauth_token(
582664
if cron_schedule is None:
583665
cron_schedule = self._get_default_cron_schedule(ctx)
584666

667+
# Validate the schedule for this source
668+
self._validate_cron_schedule_for_source(cron_schedule, source, ctx)
669+
585670
sync, sync_job = await self._create_sync_without_schedule(
586671
uow.session,
587672
obj_in.name,
@@ -617,6 +702,7 @@ async def _create_with_oauth_token(
617702
cron_schedule=cron_schedule,
618703
db=uow.session,
619704
ctx=ctx,
705+
uow=uow,
620706
)
621707

622708
# Convert to schemas while still in session
@@ -728,6 +814,9 @@ async def _create_with_auth_provider(
728814
if cron_schedule is None:
729815
cron_schedule = self._get_default_cron_schedule(ctx)
730816

817+
# Validate the schedule for this source
818+
self._validate_cron_schedule_for_source(cron_schedule, source, ctx)
819+
731820
sync, sync_job = await self._create_sync_without_schedule(
732821
uow.session,
733822
obj_in.name,
@@ -765,6 +854,7 @@ async def _create_with_auth_provider(
765854
cron_schedule=cron_schedule,
766855
db=uow.session,
767856
ctx=ctx,
857+
uow=uow,
768858
)
769859

770860
# Convert to schemas while still in session

backend/airweave/core/source_connection_service_helpers.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -800,9 +800,9 @@ async def update_sync_schedule(
800800
self,
801801
db: AsyncSession,
802802
sync_id: UUID,
803-
cron_schedule: str,
803+
cron_schedule: Optional[str],
804804
ctx: ApiContext,
805-
uow: Any,
805+
uow: UnitOfWork,
806806
) -> None:
807807
"""Update sync schedule in database and Temporal."""
808808
sync = await crud.sync.get(db, id=sync_id, ctx=ctx)
@@ -811,15 +811,23 @@ async def update_sync_schedule(
811811
sync_update = schemas.SyncUpdate(cron_schedule=cron_schedule)
812812
await crud.sync.update(db, db_obj=sync, obj_in=sync_update, ctx=ctx, uow=uow)
813813

814-
# Update in Temporal if a schedule exists
814+
# Update in Temporal
815815
from airweave.platform.temporal.schedule_service import temporal_schedule_service
816816

817-
await temporal_schedule_service.create_or_update_schedule(
818-
sync_id=sync_id,
819-
cron_schedule=cron_schedule,
820-
db=db,
821-
ctx=ctx,
822-
)
817+
if cron_schedule is None:
818+
# If cron_schedule is None, delete the Temporal schedule
819+
await temporal_schedule_service.delete_all_schedules_for_sync(
820+
sync_id=sync_id, db=db, ctx=ctx
821+
)
822+
else:
823+
# Otherwise create or update the schedule
824+
await temporal_schedule_service.create_or_update_schedule(
825+
sync_id=sync_id,
826+
cron_schedule=cron_schedule,
827+
db=db,
828+
ctx=ctx,
829+
uow=uow,
830+
)
823831

824832
async def update_auth_fields(
825833
self,
@@ -1184,6 +1192,7 @@ async def complete_oauth_connection(
11841192
cron_schedule=cron_schedule,
11851193
db=uow.session,
11861194
ctx=ctx,
1195+
uow=uow,
11871196
)
11881197

11891198
# Mark init session complete

backend/airweave/core/sync_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async def _create_and_run_with_uow(
8484
cron_schedule=sync_in.cron_schedule,
8585
db=db,
8686
ctx=ctx,
87+
uow=uow,
8788
)
8889

8990
# Run immediately if requested

backend/airweave/main.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
and unhandled exceptions.
55
"""
66

7+
import os
78
import subprocess
89
from contextlib import asynccontextmanager
910

@@ -45,12 +46,22 @@
4546

4647
@asynccontextmanager
4748
async def lifespan(app: FastAPI):
48-
"""Lifespan context manager for startup and shutdown events."""
49-
# Startup
49+
"""Lifespan context manager for startup and shutdown events.
50+
51+
Runs alembic migrations and syncs platform components.
52+
"""
5053
async with AsyncSessionLocal() as db:
5154
if settings.RUN_ALEMBIC_MIGRATIONS:
5255
logger.info("Running alembic migrations...")
53-
subprocess.run(["alembic", "upgrade", "head"], check=True)
56+
env = os.environ.copy()
57+
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
58+
env["PYTHONPATH"] = backend_dir
59+
subprocess.run(
60+
["alembic", "upgrade", "head"],
61+
check=True,
62+
cwd=backend_dir,
63+
env=env,
64+
)
5465
if settings.RUN_DB_SYNC:
5566
# Ensure all FileEntity subclasses have their parent and chunk models created
5667
ensure_file_entity_models()

backend/airweave/models/source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ class Source(Base):
3333
output_entity_definition_ids = Column(JSON, nullable=False)
3434
config_schema = Column(JSON, nullable=True) # JSON Schema for configuration
3535
labels: Mapped[list[str]] = mapped_column(JSON, nullable=True, default=list)
36+
supports_continuous: Mapped[bool] = mapped_column(
37+
Boolean, default=False, nullable=False, server_default="false"
38+
)
3639

3740
__table_args__ = (UniqueConstraint("name", "organization_id", name="uq_source_name_org"),)
3841

backend/airweave/models/sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class Sync(OrganizationBase, UserMixin):
3333
)
3434
temporal_schedule_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
3535
sync_type: Mapped[str] = mapped_column(String(50), default="full")
36-
minute_level_cron_schedule: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
3736
sync_metadata: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
3837

3938
jobs: Mapped[list["SyncJob"]] = relationship(
@@ -95,8 +94,9 @@ def delete_temporal_schedules_after_sync_delete(mapper, connection, target):
9594
import asyncio
9695

9796
schedule_ids = [
98-
f"minute-sync-{target.id}",
99-
f"daily-cleanup-{target.id}",
97+
f"sync-{target.id}", # Regular schedule
98+
f"minute-sync-{target.id}", # Minute-level schedule
99+
f"daily-cleanup-{target.id}", # Daily cleanup schedule
100100
]
101101

102102
async def _cleanup():

backend/airweave/platform/db_sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ async def _sync_sources(
374374
class_name=source_class.__name__,
375375
output_entity_definition_ids=output_entity_ids,
376376
labels=getattr(source_class, "_labels", []),
377+
supports_continuous=getattr(source_class, "_supports_continuous", False),
377378
)
378379
source_definitions.append(source_def)
379380

backend/airweave/platform/decorators.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def source(
1818
auth_config_class: Optional[Type[BaseModel]] = None,
1919
config_class: Optional[Type[BaseModel]] = None,
2020
labels: Optional[List[str]] = None,
21+
supports_continuous: bool = False,
2122
) -> Callable[[type], type]:
2223
"""Enhanced source decorator with OAuth type tracking.
2324
@@ -30,6 +31,7 @@ def source(
3031
auth_config_class: Pydantic model for auth configuration (for DIRECT auth only)
3132
config_class: Pydantic model for source configuration
3233
labels: Tags for categorization (e.g., "CRM", "Database")
34+
supports_continuous: Whether source supports cursor-based continuous syncing (default False)
3335
3436
Example:
3537
# OAuth source (no auth config)
@@ -66,6 +68,7 @@ def decorator(cls: type) -> type:
6668
cls._auth_config_class = auth_config_class
6769
cls._config_class = config_class
6870
cls._labels = labels or []
71+
cls._supports_continuous = supports_continuous
6972

7073
# Add validation method if not present
7174
if not hasattr(cls, "validate"):

backend/airweave/platform/sources/asana.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
auth_config_class=None,
3333
config_class="AsanaConfig",
3434
labels=["Project Management"],
35+
supports_continuous=False,
3536
)
3637
class AsanaSource(BaseSource):
3738
"""Asana source connector integrates with the Asana API to extract and synchronize data.

0 commit comments

Comments
 (0)