@@ -39,6 +39,10 @@ class HubspotSource(BaseSource):
3939 It provides comprehensive access to contacts, companies, deals, and support tickets.
4040 """
4141
42+ # HubSpot API limits
43+ HUBSPOT_API_LIMIT = 100 # Maximum results per page for list endpoints
44+ HUBSPOT_BATCH_SIZE = 100 # Maximum items per batch read request
45+
4246 def __init__ (self ):
4347 """Initialize the HubSpot source."""
4448 super ().__init__ ()
@@ -295,29 +299,38 @@ async def _generate_contact_entities(
295299 ) -> AsyncGenerator [BaseEntity , None ]:
296300 """Generate Contact entities from HubSpot.
297301
298- This uses the POST-based search API to avoid URL length limitations
299- when there are many custom properties:
300- POST /crm/v3/objects/contacts/search
302+ This pages through GET /crm/v3/objects/contacts to collect contact IDs,
303+ then loads all properties via POST /crm/v3/objects/contacts/batch/read
301304 """
302305 # Get all available properties for contacts
303306 all_properties = await self ._get_all_properties (client , "contacts" )
304307
305- url = "https://api.hubapi.com/crm/v3/objects/contacts/search"
306- after = None
307- limit = 100
308-
309- while True :
310- # Build the search request body with properties in the payload
311- search_body = {
312- "properties" : all_properties ,
313- "limit" : limit ,
314- }
315-
316- if after :
317- search_body ["after" ] = after
308+ # Fetch all contact IDs first (without properties to avoid URI length issues)
309+ url = f"https://api.hubapi.com/crm/v3/objects/contacts?limit={ self .HUBSPOT_API_LIMIT } "
310+ contact_ids = []
311+ while url :
312+ data = await self ._get_with_auth (client , url )
313+ for contact in data .get ("results" , []):
314+ contact_ids .append (contact ["id" ])
318315
319- data = await self ._post_with_auth (client , url , search_body )
316+ paging = data .get ("paging" , {})
317+ next_link = paging .get ("next" , {}).get ("link" )
318+ url = next_link if next_link else None
319+
320+ # Batch read contacts with all properties
321+ batch_url = "https://api.hubapi.com/crm/v3/objects/contacts/batch/read"
322+ for i in range (0 , len (contact_ids ), self .HUBSPOT_BATCH_SIZE ):
323+ chunk = contact_ids [i : i + self .HUBSPOT_BATCH_SIZE ]
324+ data = await self ._post_with_auth (
325+ client ,
326+ batch_url ,
327+ {
328+ "inputs" : [{"id" : contact_id } for contact_id in chunk ],
329+ "properties" : all_properties ,
330+ },
331+ )
320332
333+ # Process results
321334 for contact in data .get ("results" , []):
322335 raw_properties = contact .get ("properties" , {})
323336 # Clean properties to remove null/empty values
@@ -354,41 +367,43 @@ async def _generate_contact_entities(
354367 archived = contact .get ("archived" , False ),
355368 )
356369
357- # Handle pagination using 'after' cursor
358- paging = data .get ("paging" , {})
359- after = paging .get ("next" , {}).get ("after" ) if paging else None
360-
361- if not after :
362- break
363-
364370 async def _generate_company_entities (
365371 self , client : httpx .AsyncClient
366372 ) -> AsyncGenerator [BaseEntity , None ]:
367373 """Generate Company entities from HubSpot.
368374
369- This uses the POST-based search API to avoid URL length limitations
370- when there are many custom properties:
371- POST /crm/v3/objects/companies/search
375+ This pages through GET /crm/v3/objects/companies to collect company IDs,
376+ then loads all properties via POST /crm/v3/objects/companies/batch/read
372377 """
373378 # Get all available properties for companies
374379 all_properties = await self ._get_all_properties (client , "companies" )
375380
376- url = "https://api.hubapi.com/crm/v3/objects/companies/search"
377- after = None
378- limit = 100
379-
380- while True :
381- # Build the search request body with properties in the payload
382- search_body = {
383- "properties" : all_properties ,
384- "limit" : limit ,
385- }
386-
387- if after :
388- search_body ["after" ] = after
381+ # Fetch all company IDs first (without properties to avoid URI length issues)
382+ url = f"https://api.hubapi.com/crm/v3/objects/companies?limit={ self .HUBSPOT_API_LIMIT } "
383+ company_ids = []
384+ while url :
385+ data = await self ._get_with_auth (client , url )
386+ for company in data .get ("results" , []):
387+ company_ids .append (company ["id" ])
389388
390- data = await self ._post_with_auth (client , url , search_body )
389+ paging = data .get ("paging" , {})
390+ next_link = paging .get ("next" , {}).get ("link" )
391+ url = next_link if next_link else None
392+
393+ # Batch read companies with all properties
394+ batch_url = "https://api.hubapi.com/crm/v3/objects/companies/batch/read"
395+ for i in range (0 , len (company_ids ), self .HUBSPOT_BATCH_SIZE ):
396+ chunk = company_ids [i : i + self .HUBSPOT_BATCH_SIZE ]
397+ data = await self ._post_with_auth (
398+ client ,
399+ batch_url ,
400+ {
401+ "inputs" : [{"id" : company_id } for company_id in chunk ],
402+ "properties" : all_properties ,
403+ },
404+ )
391405
406+ # Process results
392407 for company in data .get ("results" , []):
393408 raw_properties = company .get ("properties" , {})
394409 # Clean properties to remove null/empty values
@@ -410,41 +425,43 @@ async def _generate_company_entities(
410425 archived = company .get ("archived" , False ),
411426 )
412427
413- # Handle pagination using 'after' cursor
414- paging = data .get ("paging" , {})
415- after = paging .get ("next" , {}).get ("after" ) if paging else None
416-
417- if not after :
418- break
419-
420428 async def _generate_deal_entities (
421429 self , client : httpx .AsyncClient
422430 ) -> AsyncGenerator [BaseEntity , None ]:
423431 """Generate Deal entities from HubSpot.
424432
425- This uses the POST-based search API to avoid URL length limitations
426- when there are many custom properties:
427- POST /crm/v3/objects/deals/search
433+ This pages through GET /crm/v3/objects/deals to collect deal IDs,
434+ then loads all properties via POST /crm/v3/objects/deals/batch/read
428435 """
429436 # Get all available properties for deals
430437 all_properties = await self ._get_all_properties (client , "deals" )
431438
432- url = "https://api.hubapi.com/crm/v3/objects/deals/search"
433- after = None
434- limit = 100
435-
436- while True :
437- # Build the search request body with properties in the payload
438- search_body = {
439- "properties" : all_properties ,
440- "limit" : limit ,
441- }
442-
443- if after :
444- search_body ["after" ] = after
439+ # Fetch all deal IDs first (without properties to avoid URI length issues)
440+ url = f"https://api.hubapi.com/crm/v3/objects/deals?limit={ self .HUBSPOT_API_LIMIT } "
441+ deal_ids = []
442+ while url :
443+ data = await self ._get_with_auth (client , url )
444+ for deal in data .get ("results" , []):
445+ deal_ids .append (deal ["id" ])
445446
446- data = await self ._post_with_auth (client , url , search_body )
447+ paging = data .get ("paging" , {})
448+ next_link = paging .get ("next" , {}).get ("link" )
449+ url = next_link if next_link else None
450+
451+ # Batch read deals with all properties
452+ batch_url = "https://api.hubapi.com/crm/v3/objects/deals/batch/read"
453+ for i in range (0 , len (deal_ids ), self .HUBSPOT_BATCH_SIZE ):
454+ chunk = deal_ids [i : i + self .HUBSPOT_BATCH_SIZE ]
455+ data = await self ._post_with_auth (
456+ client ,
457+ batch_url ,
458+ {
459+ "inputs" : [{"id" : deal_id } for deal_id in chunk ],
460+ "properties" : all_properties ,
461+ },
462+ )
447463
464+ # Process results
448465 for deal in data .get ("results" , []):
449466 raw_properties = deal .get ("properties" , {})
450467 # Clean properties to remove null/empty values
@@ -467,41 +484,43 @@ async def _generate_deal_entities(
467484 archived = deal .get ("archived" , False ),
468485 )
469486
470- # Handle pagination using 'after' cursor
471- paging = data .get ("paging" , {})
472- after = paging .get ("next" , {}).get ("after" ) if paging else None
473-
474- if not after :
475- break
476-
477487 async def _generate_ticket_entities (
478488 self , client : httpx .AsyncClient
479489 ) -> AsyncGenerator [BaseEntity , None ]:
480490 """Generate Ticket entities from HubSpot.
481491
482- This uses the POST-based search API to avoid URL length limitations
483- when there are many custom properties:
484- POST /crm/v3/objects/tickets/search
492+ This pages through GET /crm/v3/objects/tickets to collect ticket IDs,
493+ then loads all properties via POST /crm/v3/objects/tickets/batch/read
485494 """
486495 # Get all available properties for tickets
487496 all_properties = await self ._get_all_properties (client , "tickets" )
488497
489- url = "https://api.hubapi.com/crm/v3/objects/tickets/search"
490- after = None
491- limit = 100
492-
493- while True :
494- # Build the search request body with properties in the payload
495- search_body = {
496- "properties" : all_properties ,
497- "limit" : limit ,
498- }
499-
500- if after :
501- search_body ["after" ] = after
498+ # Fetch all ticket IDs first (without properties to avoid URI length issues)
499+ url = f"https://api.hubapi.com/crm/v3/objects/tickets?limit={ self .HUBSPOT_API_LIMIT } "
500+ ticket_ids = []
501+ while url :
502+ data = await self ._get_with_auth (client , url )
503+ for ticket in data .get ("results" , []):
504+ ticket_ids .append (ticket ["id" ])
502505
503- data = await self ._post_with_auth (client , url , search_body )
506+ paging = data .get ("paging" , {})
507+ next_link = paging .get ("next" , {}).get ("link" )
508+ url = next_link if next_link else None
509+
510+ # Batch read tickets with all properties
511+ batch_url = "https://api.hubapi.com/crm/v3/objects/tickets/batch/read"
512+ for i in range (0 , len (ticket_ids ), self .HUBSPOT_BATCH_SIZE ):
513+ chunk = ticket_ids [i : i + self .HUBSPOT_BATCH_SIZE ]
514+ data = await self ._post_with_auth (
515+ client ,
516+ batch_url ,
517+ {
518+ "inputs" : [{"id" : ticket_id } for ticket_id in chunk ],
519+ "properties" : all_properties ,
520+ },
521+ )
504522
523+ # Process results
505524 for ticket in data .get ("results" , []):
506525 raw_properties = ticket .get ("properties" , {})
507526 # Clean properties to remove null/empty values
@@ -524,13 +543,6 @@ async def _generate_ticket_entities(
524543 archived = ticket .get ("archived" , False ),
525544 )
526545
527- # Handle pagination using 'after' cursor
528- paging = data .get ("paging" , {})
529- after = paging .get ("next" , {}).get ("after" ) if paging else None
530-
531- if not after :
532- break
533-
534546 async def generate_entities (self ) -> AsyncGenerator [BaseEntity , None ]:
535547 """Generate all entities from HubSpot.
536548
0 commit comments