Skip to content

Commit a1d791d

Browse files
authored
Merge pull request #1064 from airweave-ai/fix/hubspot
fix(hubspot): Use POST search API to avoid 414 URI Too Long errors
2 parents 7c1480f + 130faff commit a1d791d

File tree

4 files changed

+157
-44
lines changed

4 files changed

+157
-44
lines changed

backend/airweave/platform/sources/hubspot.py

Lines changed: 139 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,49 @@ async def _get_with_auth(self, client: httpx.AsyncClient, url: str) -> Dict:
8484
response.raise_for_status()
8585
return response.json()
8686

87+
@retry(
    stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), reraise=True
)
async def _post_with_auth(
    self, client: httpx.AsyncClient, url: str, json_data: Dict[str, Any]
) -> Dict:
    """Send an authenticated JSON POST to the HubSpot API and return the parsed body.

    The request carries a bearer token obtained from ``self.get_access_token()``.
    If the first response is a 401, a warning is logged,
    ``self.refresh_on_unauthorized()`` is awaited, a token is fetched again and
    the same POST is sent exactly one more time. Whatever response is current
    at that point is then status-checked and decoded.

    The ``@retry`` decorator re-invokes the whole method on any raised
    exception, up to three attempts, with exponential wait configured as
    ``multiplier=1, min=2, max=10``; the last exception is re-raised.

    Args:
        client: HTTP client used to issue the request.
        url: API endpoint URL.
        json_data: JSON-serialisable payload sent as the POST body.

    Returns:
        The decoded JSON response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success
        status (after the single 401 retry, if one happened).
    """

    async def _send_once() -> httpx.Response:
        # Fetch the token on every send so the post-refresh attempt picks up
        # the new credential rather than reusing the rejected one.
        access_token = await self.get_access_token()
        return await client.post(
            url,
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            },
            json=json_data,
        )

    response = await _send_once()

    if response.status_code == 401:
        # One in-call recovery attempt: refresh credentials, then resend.
        self.logger.warning(
            f"Got 401 Unauthorized from HubSpot API at {url}, refreshing token..."
        )
        await self.refresh_on_unauthorized()
        response = await _send_once()

    response.raise_for_status()
    return response.json()
129+
87130
def _safe_float_conversion(self, value: Any) -> Optional[float]:
88131
"""Safely convert a value to float, handling empty strings and None."""
89132
if not value or value == "":
@@ -204,16 +247,29 @@ async def _generate_contact_entities(
204247
) -> AsyncGenerator[BaseEntity, None]:
205248
"""Generate Contact entities from HubSpot.
206249
207-
This uses the REST CRM API endpoint for contacts:
208-
GET /crm/v3/objects/contacts
250+
This uses the POST-based search API to avoid URL length limitations
251+
when there are many custom properties:
252+
POST /crm/v3/objects/contacts/search
209253
"""
210254
# Get all available properties for contacts
211255
all_properties = await self._get_all_properties(client, "contacts")
212-
properties_param = ",".join(all_properties)
213256

214-
url = f"https://api.hubapi.com/crm/v3/objects/contacts?properties={properties_param}"
215-
while url:
216-
data = await self._get_with_auth(client, url)
257+
url = "https://api.hubapi.com/crm/v3/objects/contacts/search"
258+
after = None
259+
limit = 100
260+
261+
while True:
262+
# Build the search request body with properties in the payload
263+
search_body = {
264+
"properties": all_properties,
265+
"limit": limit,
266+
}
267+
268+
if after:
269+
search_body["after"] = after
270+
271+
data = await self._post_with_auth(client, url, search_body)
272+
217273
for contact in data.get("results", []):
218274
raw_properties = contact.get("properties", {})
219275
# Clean properties to remove null/empty values
@@ -250,26 +306,41 @@ async def _generate_contact_entities(
250306
archived=contact.get("archived", False),
251307
)
252308

253-
# Handle pagination
309+
# Handle pagination using 'after' cursor
254310
paging = data.get("paging", {})
255-
next_link = paging.get("next", {}).get("link")
256-
url = next_link if next_link else None
311+
after = paging.get("next", {}).get("after") if paging else None
312+
313+
if not after:
314+
break
257315

258316
async def _generate_company_entities(
259317
self, client: httpx.AsyncClient
260318
) -> AsyncGenerator[BaseEntity, None]:
261319
"""Generate Company entities from HubSpot.
262320
263-
This uses the REST CRM API endpoint for companies:
264-
GET /crm/v3/objects/companies
321+
This uses the POST-based search API to avoid URL length limitations
322+
when there are many custom properties:
323+
POST /crm/v3/objects/companies/search
265324
"""
266325
# Get all available properties for companies
267326
all_properties = await self._get_all_properties(client, "companies")
268-
properties_param = ",".join(all_properties)
269327

270-
url = f"https://api.hubapi.com/crm/v3/objects/companies?properties={properties_param}"
271-
while url:
272-
data = await self._get_with_auth(client, url)
328+
url = "https://api.hubapi.com/crm/v3/objects/companies/search"
329+
after = None
330+
limit = 100
331+
332+
while True:
333+
# Build the search request body with properties in the payload
334+
search_body = {
335+
"properties": all_properties,
336+
"limit": limit,
337+
}
338+
339+
if after:
340+
search_body["after"] = after
341+
342+
data = await self._post_with_auth(client, url, search_body)
343+
273344
for company in data.get("results", []):
274345
raw_properties = company.get("properties", {})
275346
# Clean properties to remove null/empty values
@@ -291,25 +362,41 @@ async def _generate_company_entities(
291362
archived=company.get("archived", False),
292363
)
293364

365+
# Handle pagination using 'after' cursor
294366
paging = data.get("paging", {})
295-
next_link = paging.get("next", {}).get("link")
296-
url = next_link if next_link else None
367+
after = paging.get("next", {}).get("after") if paging else None
368+
369+
if not after:
370+
break
297371

298372
async def _generate_deal_entities(
299373
self, client: httpx.AsyncClient
300374
) -> AsyncGenerator[BaseEntity, None]:
301375
"""Generate Deal entities from HubSpot.
302376
303-
This uses the REST CRM API endpoint for deals:
304-
GET /crm/v3/objects/deals
377+
This uses the POST-based search API to avoid URL length limitations
378+
when there are many custom properties:
379+
POST /crm/v3/objects/deals/search
305380
"""
306381
# Get all available properties for deals
307382
all_properties = await self._get_all_properties(client, "deals")
308-
properties_param = ",".join(all_properties)
309383

310-
url = f"https://api.hubapi.com/crm/v3/objects/deals?properties={properties_param}"
311-
while url:
312-
data = await self._get_with_auth(client, url)
384+
url = "https://api.hubapi.com/crm/v3/objects/deals/search"
385+
after = None
386+
limit = 100
387+
388+
while True:
389+
# Build the search request body with properties in the payload
390+
search_body = {
391+
"properties": all_properties,
392+
"limit": limit,
393+
}
394+
395+
if after:
396+
search_body["after"] = after
397+
398+
data = await self._post_with_auth(client, url, search_body)
399+
313400
for deal in data.get("results", []):
314401
raw_properties = deal.get("properties", {})
315402
# Clean properties to remove null/empty values
@@ -332,25 +419,41 @@ async def _generate_deal_entities(
332419
archived=deal.get("archived", False),
333420
)
334421

422+
# Handle pagination using 'after' cursor
335423
paging = data.get("paging", {})
336-
next_link = paging.get("next", {}).get("link")
337-
url = next_link if next_link else None
424+
after = paging.get("next", {}).get("after") if paging else None
425+
426+
if not after:
427+
break
338428

339429
async def _generate_ticket_entities(
340430
self, client: httpx.AsyncClient
341431
) -> AsyncGenerator[BaseEntity, None]:
342432
"""Generate Ticket entities from HubSpot.
343433
344-
This uses the REST CRM API endpoint for tickets:
345-
GET /crm/v3/objects/tickets
434+
This uses the POST-based search API to avoid URL length limitations
435+
when there are many custom properties:
436+
POST /crm/v3/objects/tickets/search
346437
"""
347438
# Get all available properties for tickets
348439
all_properties = await self._get_all_properties(client, "tickets")
349-
properties_param = ",".join(all_properties)
350440

351-
url = f"https://api.hubapi.com/crm/v3/objects/tickets?properties={properties_param}"
352-
while url:
353-
data = await self._get_with_auth(client, url)
441+
url = "https://api.hubapi.com/crm/v3/objects/tickets/search"
442+
after = None
443+
limit = 100
444+
445+
while True:
446+
# Build the search request body with properties in the payload
447+
search_body = {
448+
"properties": all_properties,
449+
"limit": limit,
450+
}
451+
452+
if after:
453+
search_body["after"] = after
454+
455+
data = await self._post_with_auth(client, url, search_body)
456+
354457
for ticket in data.get("results", []):
355458
raw_properties = ticket.get("properties", {})
356459
# Clean properties to remove null/empty values
@@ -373,9 +476,12 @@ async def _generate_ticket_entities(
373476
archived=ticket.get("archived", False),
374477
)
375478

479+
# Handle pagination using 'after' cursor
376480
paging = data.get("paging", {})
377-
next_link = paging.get("next", {}).get("link")
378-
url = next_link if next_link else None
481+
after = paging.get("next", {}).get("after") if paging else None
482+
483+
if not after:
484+
break
379485

380486
async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
381487
"""Generate all entities from HubSpot.

monke/bongos/hubspot.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ async def generate_contact_data(token: str):
4444
async with httpx.AsyncClient(base_url=HUBSPOT_API, timeout=30) as client:
4545
for token, c in gen_results:
4646
await self._pace()
47+
# Embed token in multiple fields for better vector search recall
48+
# Token in company name is most reliable for vector search
4749
payload = {
4850
"properties": {
4951
"email": c.email,
5052
"firstname": c.firstname,
5153
"lastname": c.lastname,
54+
"company": f"{c.company or 'Monke Test Corp'} [{token}]", # Token in company
5255
**({"phone": c.phone} if c.phone else {}),
53-
**({"company": c.company} if c.company else {}),
5456
**({"address": c.address} if c.address else {}),
5557
**({"city": c.city} if c.city else {}),
5658
**({"state": c.state} if c.state else {}),
@@ -88,11 +90,15 @@ async def update_entities(self) -> List[Dict[str, Any]]:
8890
async with httpx.AsyncClient(base_url=HUBSPOT_API, timeout=30) as client:
8991
for ent in self._contacts[: min(3, len(self._contacts))]:
9092
await self._pace()
91-
# tiny tweak: add a note-ish detail (mapped to 'company' or 'phone')
93+
# Update phone but preserve token in company name
94+
token = ent.get("token", "")
9295
r = await client.patch(
9396
f"/crm/v3/objects/contacts/{ent['id']}",
9497
headers=self._hdrs(),
95-
json={"properties": {"company": "Monke QA", "phone": "+1-555-0100"}},
98+
json={"properties": {
99+
"phone": "+1-555-0100",
100+
"company": f"Monke QA Updated [{token}]" # Preserve token
101+
}},
96102
)
97103
r.raise_for_status()
98104
updated.append({**ent, "updated": True})

monke/configs/hubspot.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ connector:
55
type: "hubspot"
66
auth_mode: composio
77
composio_config:
8-
account_id: ca_example_hubspot
9-
auth_config_id: ac_example_hubspot
8+
account_id: ca_vG0a0ruEje88
9+
auth_config_id: ac_lTZKrCIwL1ks
1010
config_fields:
1111
openai_model: "gpt-4.1-mini"
1212
rate_limit_delay_ms: 800
13+
post_create_sleep_seconds: 45 # Extra time for HubSpot to index contacts
1314
test_flow:
1415
steps: [cleanup, create, sync, verify, update, sync, verify, partial_delete, sync, verify_partial_deletion, verify_remaining_entities, complete_delete, sync, verify_complete_deletion, cleanup]
1516
entity_count: 3

monke/generation/hubspot.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@
44

55
async def generate_hubspot_contact(model: str, token: str) -> HubSpotContact:
66
"""
7-
Generate a realistic CRM contact. The email MUST contain the token (e.g., <token>@example.test)
8-
so we can reliably verify later via search.
7+
Generate a realistic CRM contact with embedded token for verification.
8+
Token will be added to company field for reliable vector search.
99
"""
1010
llm = LLMClient(model_override=model)
1111
instruction = (
1212
"Generate a realistic CRM contact for a B2B SaaS context. "
13-
f"The literal token '{token}' MUST be embedded in the email local-part (e.g., '{token}@example.test') "
14-
"and may appear in notes. Include plausible fields."
13+
f"The email should be '{token}@monke-test.com'. "
14+
"Include realistic firstname, lastname, company name, and other contact details. "
15+
"Make it look like a real business contact."
1516
)
1617
contact = await llm.generate_structured(HubSpotContact, instruction)
1718

1819
# Ensure invariants
1920
contact.token = token
2021
if token not in contact.email:
2122
# Force tokenized email (stable id)
22-
local = f"{token}.contact"
23-
contact.email = f"{local}@example.test"
23+
contact.email = f"{token}@monke-test.com"
2424

2525
return contact

0 commit comments

Comments
 (0)