Skip to content

Commit 9daf110

Browse files
authored
Merge pull request #1070 from airweave-ai/feat/batchread_hubspot
feat(hubspot): optimize data fetching with batch reads and increased pagination limits
2 parents 4085a91 + d9e1506 commit 9daf110

File tree

1 file changed

+108
-96
lines changed

backend/airweave/platform/sources/hubspot.py

Lines changed: 108 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,10 @@ class HubspotSource(BaseSource):
3939
It provides comprehensive access to contacts, companies, deals, and support tickets.
4040
"""
4141

42+
# HubSpot API limits
43+
HUBSPOT_API_LIMIT = 100 # Maximum results per page for list endpoints
44+
HUBSPOT_BATCH_SIZE = 100 # Maximum items per batch read request
45+
4246
def __init__(self):
4347
"""Initialize the HubSpot source."""
4448
super().__init__()
@@ -295,29 +299,38 @@ async def _generate_contact_entities(
295299
) -> AsyncGenerator[BaseEntity, None]:
296300
"""Generate Contact entities from HubSpot.
297301
298-
This uses the POST-based search API to avoid URL length limitations
299-
when there are many custom properties:
300-
POST /crm/v3/objects/contacts/search
302+
This uses the REST CRM API endpoint for contacts:
303+
GET /crm/v3/objects/contacts
301304
"""
302305
# Get all available properties for contacts
303306
all_properties = await self._get_all_properties(client, "contacts")
304307

305-
url = "https://api.hubapi.com/crm/v3/objects/contacts/search"
306-
after = None
307-
limit = 100
308-
309-
while True:
310-
# Build the search request body with properties in the payload
311-
search_body = {
312-
"properties": all_properties,
313-
"limit": limit,
314-
}
315-
316-
if after:
317-
search_body["after"] = after
308+
# Fetch all contact IDs first (without properties to avoid URI length issues)
309+
url = f"https://api.hubapi.com/crm/v3/objects/contacts?limit={self.HUBSPOT_API_LIMIT}"
310+
contact_ids = []
311+
while url:
312+
data = await self._get_with_auth(client, url)
313+
for contact in data.get("results", []):
314+
contact_ids.append(contact["id"])
318315

319-
data = await self._post_with_auth(client, url, search_body)
316+
paging = data.get("paging", {})
317+
next_link = paging.get("next", {}).get("link")
318+
url = next_link if next_link else None
319+
320+
# Batch read contacts with all properties
321+
batch_url = "https://api.hubapi.com/crm/v3/objects/contacts/batch/read"
322+
for i in range(0, len(contact_ids), self.HUBSPOT_BATCH_SIZE):
323+
chunk = contact_ids[i : i + self.HUBSPOT_BATCH_SIZE]
324+
data = await self._post_with_auth(
325+
client,
326+
batch_url,
327+
{
328+
"inputs": [{"id": contact_id} for contact_id in chunk],
329+
"properties": all_properties,
330+
},
331+
)
320332

333+
# Process results
321334
for contact in data.get("results", []):
322335
raw_properties = contact.get("properties", {})
323336
# Clean properties to remove null/empty values
@@ -354,41 +367,43 @@ async def _generate_contact_entities(
354367
archived=contact.get("archived", False),
355368
)
356369

357-
# Handle pagination using 'after' cursor
358-
paging = data.get("paging", {})
359-
after = paging.get("next", {}).get("after") if paging else None
360-
361-
if not after:
362-
break
363-
364370
async def _generate_company_entities(
365371
self, client: httpx.AsyncClient
366372
) -> AsyncGenerator[BaseEntity, None]:
367373
"""Generate Company entities from HubSpot.
368374
369-
This uses the POST-based search API to avoid URL length limitations
370-
when there are many custom properties:
371-
POST /crm/v3/objects/companies/search
375+
This uses the REST CRM API endpoint for companies:
376+
GET /crm/v3/objects/companies
372377
"""
373378
# Get all available properties for companies
374379
all_properties = await self._get_all_properties(client, "companies")
375380

376-
url = "https://api.hubapi.com/crm/v3/objects/companies/search"
377-
after = None
378-
limit = 100
379-
380-
while True:
381-
# Build the search request body with properties in the payload
382-
search_body = {
383-
"properties": all_properties,
384-
"limit": limit,
385-
}
386-
387-
if after:
388-
search_body["after"] = after
381+
# Fetch all company IDs first (without properties to avoid URI length issues)
382+
url = f"https://api.hubapi.com/crm/v3/objects/companies?limit={self.HUBSPOT_API_LIMIT}"
383+
company_ids = []
384+
while url:
385+
data = await self._get_with_auth(client, url)
386+
for company in data.get("results", []):
387+
company_ids.append(company["id"])
389388

390-
data = await self._post_with_auth(client, url, search_body)
389+
paging = data.get("paging", {})
390+
next_link = paging.get("next", {}).get("link")
391+
url = next_link if next_link else None
392+
393+
# Batch read companies with all properties
394+
batch_url = "https://api.hubapi.com/crm/v3/objects/companies/batch/read"
395+
for i in range(0, len(company_ids), self.HUBSPOT_BATCH_SIZE):
396+
chunk = company_ids[i : i + self.HUBSPOT_BATCH_SIZE]
397+
data = await self._post_with_auth(
398+
client,
399+
batch_url,
400+
{
401+
"inputs": [{"id": company_id} for company_id in chunk],
402+
"properties": all_properties,
403+
},
404+
)
391405

406+
# Process results
392407
for company in data.get("results", []):
393408
raw_properties = company.get("properties", {})
394409
# Clean properties to remove null/empty values
@@ -410,41 +425,43 @@ async def _generate_company_entities(
410425
archived=company.get("archived", False),
411426
)
412427

413-
# Handle pagination using 'after' cursor
414-
paging = data.get("paging", {})
415-
after = paging.get("next", {}).get("after") if paging else None
416-
417-
if not after:
418-
break
419-
420428
async def _generate_deal_entities(
421429
self, client: httpx.AsyncClient
422430
) -> AsyncGenerator[BaseEntity, None]:
423431
"""Generate Deal entities from HubSpot.
424432
425-
This uses the POST-based search API to avoid URL length limitations
426-
when there are many custom properties:
427-
POST /crm/v3/objects/deals/search
433+
This uses the REST CRM API endpoint for deals:
434+
GET /crm/v3/objects/deals
428435
"""
429436
# Get all available properties for deals
430437
all_properties = await self._get_all_properties(client, "deals")
431438

432-
url = "https://api.hubapi.com/crm/v3/objects/deals/search"
433-
after = None
434-
limit = 100
435-
436-
while True:
437-
# Build the search request body with properties in the payload
438-
search_body = {
439-
"properties": all_properties,
440-
"limit": limit,
441-
}
442-
443-
if after:
444-
search_body["after"] = after
439+
# Fetch all deal IDs first (without properties to avoid URI length issues)
440+
url = f"https://api.hubapi.com/crm/v3/objects/deals?limit={self.HUBSPOT_API_LIMIT}"
441+
deal_ids = []
442+
while url:
443+
data = await self._get_with_auth(client, url)
444+
for deal in data.get("results", []):
445+
deal_ids.append(deal["id"])
445446

446-
data = await self._post_with_auth(client, url, search_body)
447+
paging = data.get("paging", {})
448+
next_link = paging.get("next", {}).get("link")
449+
url = next_link if next_link else None
450+
451+
# Batch read deals with all properties
452+
batch_url = "https://api.hubapi.com/crm/v3/objects/deals/batch/read"
453+
for i in range(0, len(deal_ids), self.HUBSPOT_BATCH_SIZE):
454+
chunk = deal_ids[i : i + self.HUBSPOT_BATCH_SIZE]
455+
data = await self._post_with_auth(
456+
client,
457+
batch_url,
458+
{
459+
"inputs": [{"id": deal_id} for deal_id in chunk],
460+
"properties": all_properties,
461+
},
462+
)
447463

464+
# Process results
448465
for deal in data.get("results", []):
449466
raw_properties = deal.get("properties", {})
450467
# Clean properties to remove null/empty values
@@ -467,41 +484,43 @@ async def _generate_deal_entities(
467484
archived=deal.get("archived", False),
468485
)
469486

470-
# Handle pagination using 'after' cursor
471-
paging = data.get("paging", {})
472-
after = paging.get("next", {}).get("after") if paging else None
473-
474-
if not after:
475-
break
476-
477487
async def _generate_ticket_entities(
478488
self, client: httpx.AsyncClient
479489
) -> AsyncGenerator[BaseEntity, None]:
480490
"""Generate Ticket entities from HubSpot.
481491
482-
This uses the POST-based search API to avoid URL length limitations
483-
when there are many custom properties:
484-
POST /crm/v3/objects/tickets/search
492+
This uses the REST CRM API endpoint for tickets:
493+
GET /crm/v3/objects/tickets
485494
"""
486495
# Get all available properties for tickets
487496
all_properties = await self._get_all_properties(client, "tickets")
488497

489-
url = "https://api.hubapi.com/crm/v3/objects/tickets/search"
490-
after = None
491-
limit = 100
492-
493-
while True:
494-
# Build the search request body with properties in the payload
495-
search_body = {
496-
"properties": all_properties,
497-
"limit": limit,
498-
}
499-
500-
if after:
501-
search_body["after"] = after
498+
# Fetch all ticket IDs first (without properties to avoid URI length issues)
499+
url = f"https://api.hubapi.com/crm/v3/objects/tickets?limit={self.HUBSPOT_API_LIMIT}"
500+
ticket_ids = []
501+
while url:
502+
data = await self._get_with_auth(client, url)
503+
for ticket in data.get("results", []):
504+
ticket_ids.append(ticket["id"])
502505

503-
data = await self._post_with_auth(client, url, search_body)
506+
paging = data.get("paging", {})
507+
next_link = paging.get("next", {}).get("link")
508+
url = next_link if next_link else None
509+
510+
# Batch read tickets with all properties
511+
batch_url = "https://api.hubapi.com/crm/v3/objects/tickets/batch/read"
512+
for i in range(0, len(ticket_ids), self.HUBSPOT_BATCH_SIZE):
513+
chunk = ticket_ids[i : i + self.HUBSPOT_BATCH_SIZE]
514+
data = await self._post_with_auth(
515+
client,
516+
batch_url,
517+
{
518+
"inputs": [{"id": ticket_id} for ticket_id in chunk],
519+
"properties": all_properties,
520+
},
521+
)
504522

523+
# Process results
505524
for ticket in data.get("results", []):
506525
raw_properties = ticket.get("properties", {})
507526
# Clean properties to remove null/empty values
@@ -524,13 +543,6 @@ async def _generate_ticket_entities(
524543
archived=ticket.get("archived", False),
525544
)
526545

527-
# Handle pagination using 'after' cursor
528-
paging = data.get("paging", {})
529-
after = paging.get("next", {}).get("after") if paging else None
530-
531-
if not after:
532-
break
533-
534546
async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
535547
"""Generate all entities from HubSpot.
536548

0 commit comments

Comments (0)