@@ -15,6 +15,7 @@ import {
1515 coerceError ,
1616 RETRYABLE_ERROR_CODES ,
1717 RetryOptions ,
18+ safeEmit ,
1819 sleep ,
1920} from "./lib" ;
2021import { batchGetJobs } from "./sql/getJobs" ;
@@ -231,7 +232,7 @@ export class LocalQueue {
231232 private refetchDelayAbortThreshold : number = Infinity ;
232233
233234 constructor (
234- private readonly compiledSharedOptions : CompiledSharedOptions < WorkerPoolOptions > ,
235+ private readonly ctx : CompiledSharedOptions < WorkerPoolOptions > ,
235236 private readonly tasks : TaskList ,
236237 private readonly withPgClient : EnhancedWithPgClient ,
237238 public readonly workerPool : WorkerPool ,
@@ -244,13 +245,10 @@ export class LocalQueue {
244245 private readonly continuous : boolean ,
245246 private readonly onMajorError : ( e : unknown ) => void ,
246247 ) {
247- this . ttl =
248- compiledSharedOptions . resolvedPreset . worker . localQueue ?. ttl ?? 5 * MINUTE ;
249- this . pollInterval =
250- compiledSharedOptions . resolvedPreset . worker . pollInterval ?? 2 * SECOND ;
248+ this . ttl = ctx . resolvedPreset . worker . localQueue ?. ttl ?? 5 * MINUTE ;
249+ this . pollInterval = ctx . resolvedPreset . worker . pollInterval ?? 2 * SECOND ;
251250 const localQueueRefetchDelayDuration =
252- compiledSharedOptions . resolvedPreset . worker . localQueue ?. refetchDelay
253- ?. durationMs ;
251+ ctx . resolvedPreset . worker . localQueue ?. refetchDelay ?. durationMs ;
254252 if (
255253 localQueueRefetchDelayDuration != null &&
256254 localQueueRefetchDelayDuration > this . pollInterval
@@ -259,8 +257,8 @@ export class LocalQueue {
259257 `Invalid configuration; 'preset.worker.localQueue.refetchDelay.durationMs' (${ localQueueRefetchDelayDuration } ) must not be larger than 'preset.worker.pollInterval' (${ this . pollInterval } )` ,
260258 ) ;
261259 }
262- compiledSharedOptions . events . emit ( "localQueue:init" , {
263- ctx : compiledSharedOptions ,
260+ safeEmit ( ctx , "localQueue:init" , {
261+ ctx : ctx ,
264262 localQueue : this ,
265263 } ) ;
266264 // Immediately enter polling mode.
@@ -276,8 +274,8 @@ export class LocalQueue {
276274 const oldMode = this . mode ;
277275 // Override the 'readonly'
278276 ( this . mode as LocalQueueMode ) = newMode ;
279- this . compiledSharedOptions . events . emit ( "localQueue:setMode" , {
280- ctx : this . compiledSharedOptions ,
277+ safeEmit ( this . ctx , "localQueue:setMode" , {
278+ ctx : this . ctx ,
281279 localQueue : this ,
282280 oldMode,
283281 newMode,
@@ -321,7 +319,7 @@ export class LocalQueue {
321319 } else {
322320 // If we're not shutting down, view this as a temporary error (but give
323321 // Benjie a wrist slap anyway).
324- this . compiledSharedOptions . logger . error (
322+ this . ctx . logger . error (
325323 `GraphileWorkerInternalError<cd483429-3372-42f0-bcf6-c78e045c760d>: Backgrounding should never yield errors when the queue is not RELEASED` ,
326324 { error : e } ,
327325 ) ;
@@ -444,8 +442,8 @@ export class LocalQueue {
444442 }
445443 const jobsToReturn = this . jobQueue . splice ( 0 , l ) ;
446444
447- this . compiledSharedOptions . events . emit ( "localQueue:returnJobs" , {
448- ctx : this . compiledSharedOptions ,
445+ safeEmit ( this . ctx , "localQueue:returnJobs" , {
446+ ctx : this . ctx ,
449447 localQueue : this ,
450448 jobs : jobsToReturn ,
451449 } ) ;
@@ -459,7 +457,7 @@ export class LocalQueue {
459457 initialError = lastError ;
460458 }
461459
462- this . compiledSharedOptions . logger . error (
460+ this . ctx . logger . error (
463461 `Failed to return jobs from local queue to database queue (attempt ${ attempts } /${ maxAttempts } )` ,
464462 {
465463 error : e ,
@@ -502,7 +500,7 @@ export class LocalQueue {
502500 ++ attempts ;
503501 return sleep ( delay ) . then ( ( ) =>
504502 returnJobs (
505- this . compiledSharedOptions ,
503+ this . ctx ,
506504 this . withPgClient , // We'll handle the retries via onError
507505 this . workerPool . id ,
508506 jobsToReturn ,
@@ -517,7 +515,7 @@ export class LocalQueue {
517515 // `onError` above, since `onError` returns the next promise each time.
518516 this . background (
519517 returnJobs (
520- this . compiledSharedOptions ,
518+ this . ctx ,
521519 this . withPgClient , // We'll handle the retries via onError
522520 this . workerPool . id ,
523521 jobsToReturn ,
@@ -554,7 +552,7 @@ export class LocalQueue {
554552 this . background (
555553 this . _fetch ( ) . catch ( ( e ) => {
556554 // This should not happen
557- this . compiledSharedOptions . logger . error ( `Error occurred during fetch` , {
555+ this . ctx . logger . error ( `Error occurred during fetch` , {
558556 error : e ,
559557 } ) ;
560558 } ) ,
@@ -578,7 +576,7 @@ export class LocalQueue {
578576 /** How many jobs did we fetch? (Initialize to zero in case of error.) */
579577 let jobCount = 0 ;
580578 const refetchDelayOptions =
581- this . compiledSharedOptions . resolvedPreset . worker . localQueue ?. refetchDelay ;
579+ this . ctx . resolvedPreset . worker . localQueue ?. refetchDelay ;
582580 try {
583581 assert . equal ( this . mode , POLLING , "Can only fetch when in polling mode" ) ;
584582 assert . equal (
@@ -605,16 +603,16 @@ export class LocalQueue {
605603
606604 // The ONLY await in this function.
607605 const jobs = await batchGetJobs (
608- this . compiledSharedOptions ,
606+ this . ctx ,
609607 this . withPgClient ,
610608 this . tasks ,
611609 this . workerPool . id ,
612610 null , // `flagsToSkip` is not set, see `LocalQueue.getJob`
613611 this . getJobBatchSize ,
614612 ) ;
615613
616- this . compiledSharedOptions . events . emit ( "localQueue:getJobs:complete" , {
617- ctx : this . compiledSharedOptions ,
614+ safeEmit ( this . ctx , "localQueue:getJobs:complete" , {
615+ ctx : this . ctx ,
618616 localQueue : this ,
619617 jobs,
620618 } ) ;
@@ -634,7 +632,7 @@ export class LocalQueue {
634632 this . receivedJobs ( jobs ) ;
635633 } catch ( e ) {
636634 // Error happened; rely on poll interval.
637- this . compiledSharedOptions . logger . error (
635+ this . ctx . logger . error (
638636 `Error occurred fetching jobs; will try again on next poll interval. Error: ${ e } ` ,
639637 { error : e } ,
640638 ) ;
@@ -674,8 +672,8 @@ export class LocalQueue {
674672 this . refetchDelayCompleteOrAbort ,
675673 refetchDelayMs ,
676674 ) ;
677- this . compiledSharedOptions . events . emit ( "localQueue:refetchDelay:start" , {
678- ctx : this . compiledSharedOptions ,
675+ safeEmit ( this . ctx , "localQueue:refetchDelay:start" , {
676+ ctx : this . ctx ,
679677 localQueue : this ,
680678 jobCount,
681679 threshold : refetchDelayOptions ?. threshold ?? 0 ,
@@ -720,20 +718,17 @@ export class LocalQueue {
720718 // Force refetch because we've been notified of so many jobs!
721719 this . refetchDelayFetchOnComplete = true ;
722720
723- this . compiledSharedOptions . events . emit ( "localQueue:refetchDelay:abort" , {
724- ctx : this . compiledSharedOptions ,
721+ safeEmit ( this . ctx , "localQueue:refetchDelay:abort" , {
722+ ctx : this . ctx ,
725723 localQueue : this ,
726724 count : this . refetchDelayCounter ,
727725 abortThreshold : this . refetchDelayAbortThreshold ,
728726 } ) ;
729727 } else {
730- this . compiledSharedOptions . events . emit (
731- "localQueue:refetchDelay:expired" ,
732- {
733- ctx : this . compiledSharedOptions ,
734- localQueue : this ,
735- } ,
736- ) ;
728+ safeEmit ( this . ctx , "localQueue:refetchDelay:expired" , {
729+ ctx : this . ctx ,
730+ localQueue : this ,
731+ } ) ;
737732 }
738733
739734 if ( this . mode === POLLING && this . refetchDelayFetchOnComplete ) {
@@ -789,7 +784,7 @@ export class LocalQueue {
789784 if ( flagsToSkip !== null ) {
790785 // PERF: we could actually batch for similar flags, I guess.
791786 const jobsPromise = batchGetJobs (
792- this . compiledSharedOptions ,
787+ this . ctx ,
793788 this . withPgClient ,
794789 this . tasks ,
795790 this . workerPool . id ,
0 commit comments