Skip to content

Commit 7c1480f

Browse files
authored
Merge pull request #1060 from airweave-ai/fix/atlassian_token_refresh
Fix: Atlassian OAuth rotating refresh tokens + Monke
2 parents f482a2d + 35a0020 commit 7c1480f

File tree

9 files changed: 276 additions (+276) and 228 deletions (-228).

backend/airweave/platform/auth/oauth2_service.py

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -539,22 +539,22 @@ def _prepare_token_request(
539539
"refresh_token": refresh_token,
540540
}
541541

542-
# Include scope in refresh request if available
543-
# This ensures the refreshed token maintains the same permissions
544-
if hasattr(integration_config, "scope") and integration_config.scope:
545-
payload["scope"] = integration_config.scope
546-
logger.debug(f"Including scope in token refresh: {integration_config.scope}")
547-
548-
# Include additional_frontend_params (e.g., audience for Atlassian)
549-
# These are often required for the refreshed token to access the same resources
542+
# IMPORTANT: For WITH_ROTATING_REFRESH OAuth (Jira, Confluence, Microsoft),
543+
# do NOT include scope parameter - Atlassian/Microsoft reject it with 403
544+
# For WITH_REFRESH OAuth (Google, Slack), scope should be included
545+
# See: https://developer.atlassian.com/cloud/jira/platform/oauth-2-3lo-apps/#refresh-a-token
546+
oauth_type = getattr(integration_config, "oauth_type", None)
550547
if (
551-
hasattr(integration_config, "additional_frontend_params")
552-
and integration_config.additional_frontend_params
548+
oauth_type == "with_refresh"
549+
and hasattr(integration_config, "scope")
550+
and integration_config.scope
553551
):
554-
payload.update(integration_config.additional_frontend_params)
552+
payload["scope"] = integration_config.scope
555553
logger.debug(
556-
f"Including additional params in token refresh: {list(integration_config.additional_frontend_params.keys())}"
554+
f"Including scope in token refresh (oauth_type=with_refresh): {integration_config.scope}"
557555
)
556+
elif oauth_type == "with_rotating_refresh":
557+
logger.debug(f"Skipping scope in token refresh (oauth_type=with_rotating_refresh)")
558558

559559
if integration_config.client_credential_location == "header":
560560
encoded_credentials = OAuth2Service._encode_client_credentials(client_id, client_secret)

backend/airweave/platform/sources/confluence.py

Lines changed: 86 additions & 110 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,10 @@
2020
from typing import Any, AsyncGenerator, Dict, List, Optional
2121

2222
import httpx
23+
import tenacity
24+
from tenacity import stop_after_attempt, wait_exponential
2325

26+
from airweave.core.exceptions import TokenRefreshError
2427
from airweave.core.logging import logger
2528
from airweave.platform.decorators import source
2629
from airweave.platform.entities._base import BaseEntity, Breadcrumb
@@ -45,7 +48,7 @@
4548
AuthenticationMethod.OAUTH_TOKEN,
4649
AuthenticationMethod.AUTH_PROVIDER,
4750
],
48-
oauth_type=OAuthType.WITH_REFRESH,
51+
oauth_type=OAuthType.WITH_ROTATING_REFRESH,
4952
auth_config_class=None,
5053
config_class="ConfluenceConfig",
5154
labels=["Knowledge Base", "Documentation"],
@@ -61,70 +64,58 @@ class ConfluenceSource(BaseSource):
6164
extracts embedded files and attachments from page content.
6265
"""
6366

64-
@staticmethod
65-
async def _get_accessible_resources(access_token: str) -> list[dict]:
67+
async def _get_accessible_resources(self) -> list[dict]:
6668
"""Get the list of accessible Atlassian resources for this token.
6769
68-
Args:
69-
access_token: The OAuth access token
70-
71-
Returns:
72-
list[dict]: List of accessible resources, each containing 'id' and 'url' keys
70+
Uses token manager to ensure fresh access token.
7371
"""
72+
self.logger.info("Retrieving accessible Atlassian resources")
73+
74+
# Get fresh access token (will refresh if needed)
75+
access_token = await self.get_access_token()
76+
77+
if not access_token:
78+
self.logger.error("Cannot get accessible resources: access token is None")
79+
return []
80+
7481
async with httpx.AsyncClient() as client:
7582
headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
7683
try:
84+
self.logger.debug(
85+
"Making request to https://api.atlassian.com/oauth/token/accessible-resources"
86+
)
7787
response = await client.get(
7888
"https://api.atlassian.com/oauth/token/accessible-resources", headers=headers
7989
)
8090
response.raise_for_status()
81-
return response.json()
91+
resources = response.json()
92+
self.logger.info(f"Found {len(resources)} accessible Atlassian resources")
93+
self.logger.debug(f"Resources: {resources}")
94+
return resources
95+
except httpx.HTTPStatusError as e:
96+
self.logger.error(
97+
f"HTTP error getting accessible resources: {e.response.status_code} - {e.response.text}"
98+
)
99+
return []
82100
except Exception as e:
83-
logger.error(f"Error getting accessible resources: {str(e)}")
101+
self.logger.error(f"Error getting accessible resources: {str(e)}", exc_info=True)
84102
return []
85103

86-
@staticmethod
87-
async def _extract_cloud_id(access_token: str) -> tuple[str, str]:
88-
"""Extract the Atlassian Cloud ID from OAuth 2.0 accessible-resources.
89-
90-
Args:
91-
access_token: The OAuth access token
92-
93-
Returns:
94-
cloud_id (str): The cloud instance ID
95-
"""
96-
try:
97-
resources = await ConfluenceSource._get_accessible_resources(access_token)
98-
99-
if not resources:
100-
logger.warning("No accessible resources found")
101-
return ""
102-
103-
# Use the first available resource
104-
# In most cases, there will only be one resource
105-
resource = resources[0]
106-
cloud_id = resource.get("id", "")
107-
108-
if not cloud_id:
109-
logger.warning("Missing ID in accessible resources")
110-
return cloud_id
111-
112-
except Exception as e:
113-
logger.error(f"Error extracting cloud ID: {str(e)}")
114-
return ""
115-
116104
@classmethod
117105
async def create(
118106
cls, access_token: str, config: Optional[Dict[str, Any]] = None
119107
) -> "ConfluenceSource":
120108
"""Create a new Confluence source instance."""
121109
instance = cls()
122110
instance.access_token = access_token
123-
instance.cloud_id = await cls._extract_cloud_id(access_token)
124-
instance.base_url = f"https://api.atlassian.com/ex/confluence/{instance.cloud_id}"
125-
logger.info(f"Initialized Confluence source with base URL: {instance.base_url}")
126111
return instance
127112

113+
@tenacity.retry(
114+
stop=stop_after_attempt(3),
115+
wait=wait_exponential(multiplier=1, min=2, max=10),
116+
retry=tenacity.retry_if_exception_type(httpx.HTTPStatusError),
117+
reraise=True,
118+
)
128119
async def _get_with_auth(self, client: httpx.AsyncClient, url: str) -> Any:
129120
"""Make an authenticated GET request to the Confluence REST API using the provided URL.
130121
@@ -137,56 +128,40 @@ async def _get_with_auth(self, client: httpx.AsyncClient, url: str) -> Any:
137128
"X-Atlassian-Token": "no-check", # Required for CSRF protection
138129
}
139130

140-
# Add cloud instance ID if available
141-
if self.cloud_id:
142-
headers["X-Cloud-ID"] = self.cloud_id
143-
144-
self.logger.debug(f"Making request to {url} with headers: {headers}")
145-
131+
self.logger.debug(f"Request headers: {headers}")
146132
try:
147133
response = await client.get(url, headers=headers)
134+
response.raise_for_status()
135+
return response.json()
148136
except httpx.HTTPStatusError as e:
149137
# Handle 401 Unauthorized - try refreshing token
150138
if e.response.status_code == 401 and self._token_manager:
151-
self.logger.info("Received 401 error, attempting to refresh token")
152-
refreshed = await self._token_manager.refresh_on_unauthorized()
153-
154-
if refreshed:
155-
# Retry with new token
156-
new_access_token = await self.get_access_token()
157-
headers["Authorization"] = f"Bearer {new_access_token}"
158-
self.logger.info("Retrying request with refreshed token")
159-
response = await client.get(url, headers=headers)
160-
else:
161-
raise
162-
else:
163-
raise
164-
165-
if not response.is_success:
166-
self.logger.error(f"Request failed with status {response.status_code}")
167-
self.logger.error(f"Response headers: {dict(response.headers)}")
168-
self.logger.error(f"Response body: {response.text}")
169-
170-
# Special handling for scope-related errors
171-
if response.status_code == 401:
172-
error_body = response.json() if response.text else {}
173-
error_message = error_body.get("message", "")
174-
175-
if (
176-
"scope" in error_message.lower()
177-
or "x-failure-category" in response.headers
178-
and "SCOPE" in response.headers.get("x-failure-category", "")
179-
):
180-
self.logger.error(
181-
"OAuth scope error. The token doesn't have the required permissions."
182-
)
139+
self.logger.warning(
140+
"🔐 Received 401 Unauthorized from Confluence - attempting token refresh"
141+
)
142+
try:
143+
refreshed = await self._token_manager.refresh_on_unauthorized()
144+
145+
if refreshed:
146+
# Retry with new token (the retry decorator will handle this)
147+
self.logger.info("✅ Token refreshed successfully, retrying request")
148+
raise # Let tenacity retry with the refreshed token
149+
except TokenRefreshError as refresh_error:
150+
# Token refresh failed - provide clear error message
183151
self.logger.error(
184-
"Please verify that your OAuth app has the correct scopes configured."
152+
f"❌ Token refresh failed: {str(refresh_error)}. "
153+
f"User may need to reconnect their Confluence account."
185154
)
186-
raise ValueError(f"OAuth scope error: {error_message}.")
155+
# Re-raise with clearer context
156+
raise TokenRefreshError(
157+
f"Failed to refresh Confluence access token: {str(refresh_error)}. "
158+
f"Please reconnect your Confluence account."
159+
) from refresh_error
187160

188-
response.raise_for_status()
189-
return response.json()
161+
# Log the error details
162+
self.logger.error(f"Request failed: {str(e)}")
163+
self.logger.error(f"Response body: {e.response.text}")
164+
raise
190165

191166
async def _generate_space_entities(
192167
self, client: httpx.AsyncClient
@@ -469,35 +444,36 @@ async def _generate_folder_entities(
469444
url = f"{self.base_url}{next_link}" if next_link else None
470445

471446
async def validate(self) -> bool:
472-
"""Verify Confluence OAuth2 token and site access by pinging a lightweight endpoint."""
473-
# Ensure we have a cloud_id/base_url; if missing, try to resolve from the current token.
474-
if not getattr(self, "cloud_id", None) or not getattr(self, "base_url", None):
475-
token = await self.get_access_token()
476-
if not token:
477-
self.logger.error("Confluence validation failed: no access token available.")
478-
return False
479-
cloud_id = await self._extract_cloud_id(token)
480-
if not cloud_id:
481-
self.logger.error(
482-
"Confluence validation failed: unable to resolve Atlassian cloud ID."
483-
)
447+
"""Verify Confluence OAuth2 token by calling accessible-resources endpoint.
448+
449+
A successful call proves the token is valid and has necessary scopes.
450+
Cloud ID extraction happens lazily during sync.
451+
"""
452+
try:
453+
resources = await self._get_accessible_resources()
454+
455+
if not resources:
456+
self.logger.error("Confluence validation failed: no accessible resources found")
484457
return False
485-
self.cloud_id = cloud_id
486-
self.base_url = f"https://api.atlassian.com/ex/confluence/{cloud_id}"
487-
488-
# Simple authorized ping against spaces (validates scopes and site reachability).
489-
return await self._validate_oauth2(
490-
ping_url=f"{self.base_url}/wiki/api/v2/spaces?limit=1",
491-
headers={
492-
"Accept": "application/json",
493-
"X-Atlassian-Token": "no-check",
494-
"X-Cloud-ID": self.cloud_id,
495-
},
496-
timeout=10.0,
497-
)
458+
459+
self.logger.info("✅ Confluence validation successful")
460+
return True
461+
462+
except Exception as e:
463+
self.logger.error(f"Confluence validation failed: {str(e)}")
464+
return False
498465

499466
async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]: # noqa: C901
500467
"""Generate all Confluence content."""
468+
self.logger.info("Starting Confluence entity generation process")
469+
470+
resources = await self._get_accessible_resources()
471+
if not resources:
472+
raise ValueError("No accessible resources found")
473+
cloud_id = resources[0]["id"]
474+
475+
self.base_url = f"https://api.atlassian.com/ex/confluence/{cloud_id}"
476+
self.logger.debug(f"Base URL set to: {self.base_url}")
501477
async with httpx.AsyncClient() as client:
502478
# 1) Yield all spaces (top-level)
503479
async for space_entity in self._generate_space_entities(client):

0 commit comments

Comments
 (0)