2020from typing import Any , AsyncGenerator , Dict , List , Optional
2121
2222import httpx
23+ import tenacity
24+ from tenacity import stop_after_attempt , wait_exponential
2325
26+ from airweave .core .exceptions import TokenRefreshError
2427from airweave .core .logging import logger
2528from airweave .platform .decorators import source
2629from airweave .platform .entities ._base import BaseEntity , Breadcrumb
4548 AuthenticationMethod .OAUTH_TOKEN ,
4649 AuthenticationMethod .AUTH_PROVIDER ,
4750 ],
48- oauth_type = OAuthType .WITH_REFRESH ,
51+ oauth_type = OAuthType .WITH_ROTATING_REFRESH ,
4952 auth_config_class = None ,
5053 config_class = "ConfluenceConfig" ,
5154 labels = ["Knowledge Base" , "Documentation" ],
@@ -61,70 +64,58 @@ class ConfluenceSource(BaseSource):
6164 extracts embedded files and attachments from page content.
6265 """
6366
64- @staticmethod
65- async def _get_accessible_resources (access_token : str ) -> list [dict ]:
67+ async def _get_accessible_resources (self ) -> list [dict ]:
6668 """Get the list of accessible Atlassian resources for this token.
6769
68- Args:
69- access_token: The OAuth access token
70-
71- Returns:
72- list[dict]: List of accessible resources, each containing 'id' and 'url' keys
70+ Uses token manager to ensure fresh access token.
7371 """
72+ self .logger .info ("Retrieving accessible Atlassian resources" )
73+
74+ # Get fresh access token (will refresh if needed)
75+ access_token = await self .get_access_token ()
76+
77+ if not access_token :
78+ self .logger .error ("Cannot get accessible resources: access token is None" )
79+ return []
80+
7481 async with httpx .AsyncClient () as client :
7582 headers = {"Authorization" : f"Bearer { access_token } " , "Accept" : "application/json" }
7683 try :
84+ self .logger .debug (
85+ "Making request to https://api.atlassian.com/oauth/token/accessible-resources"
86+ )
7787 response = await client .get (
7888 "https://api.atlassian.com/oauth/token/accessible-resources" , headers = headers
7989 )
8090 response .raise_for_status ()
81- return response .json ()
91+ resources = response .json ()
92+ self .logger .info (f"Found { len (resources )} accessible Atlassian resources" )
93+ self .logger .debug (f"Resources: { resources } " )
94+ return resources
95+ except httpx .HTTPStatusError as e :
96+ self .logger .error (
97+ f"HTTP error getting accessible resources: { e .response .status_code } - { e .response .text } "
98+ )
99+ return []
82100 except Exception as e :
83- logger .error (f"Error getting accessible resources: { str (e )} " )
101+ self . logger .error (f"Error getting accessible resources: { str (e )} " , exc_info = True )
84102 return []
85103
86- @staticmethod
87- async def _extract_cloud_id (access_token : str ) -> tuple [str , str ]:
88- """Extract the Atlassian Cloud ID from OAuth 2.0 accessible-resources.
89-
90- Args:
91- access_token: The OAuth access token
92-
93- Returns:
94- cloud_id (str): The cloud instance ID
95- """
96- try :
97- resources = await ConfluenceSource ._get_accessible_resources (access_token )
98-
99- if not resources :
100- logger .warning ("No accessible resources found" )
101- return ""
102-
103- # Use the first available resource
104- # In most cases, there will only be one resource
105- resource = resources [0 ]
106- cloud_id = resource .get ("id" , "" )
107-
108- if not cloud_id :
109- logger .warning ("Missing ID in accessible resources" )
110- return cloud_id
111-
112- except Exception as e :
113- logger .error (f"Error extracting cloud ID: { str (e )} " )
114- return ""
115-
116104 @classmethod
117105 async def create (
118106 cls , access_token : str , config : Optional [Dict [str , Any ]] = None
119107 ) -> "ConfluenceSource" :
120108 """Create a new Confluence source instance."""
121109 instance = cls ()
122110 instance .access_token = access_token
123- instance .cloud_id = await cls ._extract_cloud_id (access_token )
124- instance .base_url = f"https://api.atlassian.com/ex/confluence/{ instance .cloud_id } "
125- logger .info (f"Initialized Confluence source with base URL: { instance .base_url } " )
126111 return instance
127112
113+ @tenacity .retry (
114+ stop = stop_after_attempt (3 ),
115+ wait = wait_exponential (multiplier = 1 , min = 2 , max = 10 ),
116+ retry = tenacity .retry_if_exception_type (httpx .HTTPStatusError ),
117+ reraise = True ,
118+ )
128119 async def _get_with_auth (self , client : httpx .AsyncClient , url : str ) -> Any :
129120 """Make an authenticated GET request to the Confluence REST API using the provided URL.
130121
@@ -137,56 +128,40 @@ async def _get_with_auth(self, client: httpx.AsyncClient, url: str) -> Any:
137128 "X-Atlassian-Token" : "no-check" , # Required for CSRF protection
138129 }
139130
140- # Add cloud instance ID if available
141- if self .cloud_id :
142- headers ["X-Cloud-ID" ] = self .cloud_id
143-
144- self .logger .debug (f"Making request to { url } with headers: { headers } " )
145-
131+ self .logger .debug (f"Request headers: { headers } " )
146132 try :
147133 response = await client .get (url , headers = headers )
134+ response .raise_for_status ()
135+ return response .json ()
148136 except httpx .HTTPStatusError as e :
149137 # Handle 401 Unauthorized - try refreshing token
150138 if e .response .status_code == 401 and self ._token_manager :
151- self .logger .info ("Received 401 error, attempting to refresh token" )
152- refreshed = await self ._token_manager .refresh_on_unauthorized ()
153-
154- if refreshed :
155- # Retry with new token
156- new_access_token = await self .get_access_token ()
157- headers ["Authorization" ] = f"Bearer { new_access_token } "
158- self .logger .info ("Retrying request with refreshed token" )
159- response = await client .get (url , headers = headers )
160- else :
161- raise
162- else :
163- raise
164-
165- if not response .is_success :
166- self .logger .error (f"Request failed with status { response .status_code } " )
167- self .logger .error (f"Response headers: { dict (response .headers )} " )
168- self .logger .error (f"Response body: { response .text } " )
169-
170- # Special handling for scope-related errors
171- if response .status_code == 401 :
172- error_body = response .json () if response .text else {}
173- error_message = error_body .get ("message" , "" )
174-
175- if (
176- "scope" in error_message .lower ()
177- or "x-failure-category" in response .headers
178- and "SCOPE" in response .headers .get ("x-failure-category" , "" )
179- ):
180- self .logger .error (
181- "OAuth scope error. The token doesn't have the required permissions."
182- )
139+ self .logger .warning (
140+ "🔐 Received 401 Unauthorized from Confluence - attempting token refresh"
141+ )
142+ try :
143+ refreshed = await self ._token_manager .refresh_on_unauthorized ()
144+
145+ if refreshed :
146+ # Retry with new token (the retry decorator will handle this)
147+ self .logger .info ("✅ Token refreshed successfully, retrying request" )
148+ raise # Let tenacity retry with the refreshed token
149+ except TokenRefreshError as refresh_error :
150+ # Token refresh failed - provide clear error message
183151 self .logger .error (
184- "Please verify that your OAuth app has the correct scopes configured."
152+ f"❌ Token refresh failed: { str (refresh_error )} . "
153+ f"User may need to reconnect their Confluence account."
185154 )
186- raise ValueError (f"OAuth scope error: { error_message } ." )
155+ # Re-raise with clearer context
156+ raise TokenRefreshError (
157+ f"Failed to refresh Confluence access token: { str (refresh_error )} . "
158+ f"Please reconnect your Confluence account."
159+ ) from refresh_error
187160
188- response .raise_for_status ()
189- return response .json ()
161+ # Log the error details
162+ self .logger .error (f"Request failed: { str (e )} " )
163+ self .logger .error (f"Response body: { e .response .text } " )
164+ raise
190165
191166 async def _generate_space_entities (
192167 self , client : httpx .AsyncClient
@@ -469,35 +444,36 @@ async def _generate_folder_entities(
469444 url = f"{ self .base_url } { next_link } " if next_link else None
470445
471446 async def validate (self ) -> bool :
472- """Verify Confluence OAuth2 token and site access by pinging a lightweight endpoint."""
473- # Ensure we have a cloud_id/base_url; if missing, try to resolve from the current token.
474- if not getattr (self , "cloud_id" , None ) or not getattr (self , "base_url" , None ):
475- token = await self .get_access_token ()
476- if not token :
477- self .logger .error ("Confluence validation failed: no access token available." )
478- return False
479- cloud_id = await self ._extract_cloud_id (token )
480- if not cloud_id :
481- self .logger .error (
482- "Confluence validation failed: unable to resolve Atlassian cloud ID."
483- )
447+ """Verify Confluence OAuth2 token by calling accessible-resources endpoint.
448+
449+ A successful call proves the token is valid and has necessary scopes.
450+ Cloud ID extraction happens lazily during sync.
451+ """
452+ try :
453+ resources = await self ._get_accessible_resources ()
454+
455+ if not resources :
456+ self .logger .error ("Confluence validation failed: no accessible resources found" )
484457 return False
485- self .cloud_id = cloud_id
486- self .base_url = f"https://api.atlassian.com/ex/confluence/{ cloud_id } "
487-
488- # Simple authorized ping against spaces (validates scopes and site reachability).
489- return await self ._validate_oauth2 (
490- ping_url = f"{ self .base_url } /wiki/api/v2/spaces?limit=1" ,
491- headers = {
492- "Accept" : "application/json" ,
493- "X-Atlassian-Token" : "no-check" ,
494- "X-Cloud-ID" : self .cloud_id ,
495- },
496- timeout = 10.0 ,
497- )
458+
459+ self .logger .info ("✅ Confluence validation successful" )
460+ return True
461+
462+ except Exception as e :
463+ self .logger .error (f"Confluence validation failed: { str (e )} " )
464+ return False
498465
499466 async def generate_entities (self ) -> AsyncGenerator [BaseEntity , None ]: # noqa: C901
500467 """Generate all Confluence content."""
468+ self .logger .info ("Starting Confluence entity generation process" )
469+
470+ resources = await self ._get_accessible_resources ()
471+ if not resources :
472+ raise ValueError ("No accessible resources found" )
473+ cloud_id = resources [0 ]["id" ]
474+
475+ self .base_url = f"https://api.atlassian.com/ex/confluence/{ cloud_id } "
476+ self .logger .debug (f"Base URL set to: { self .base_url } " )
501477 async with httpx .AsyncClient () as client :
502478 # 1) Yield all spaces (top-level)
503479 async for space_entity in self ._generate_space_entities (client ):
0 commit comments