Skip to content

Commit 2ab6743

Browse files
committed
Fixed passing resume token to MongoDB change stream subscription as resumeAfter
Probably accidentally during refactoring this was removed
1 parent 711aadb commit 2ab6743

File tree

2 files changed

+57
-21
lines changed

2 files changed

+57
-21
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.inMemory.projections.int.spec.ts

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,10 @@ void describe('mongoDB event store started consumer', () => {
179179
// { type: 'ProductItemAdded', data: { productItem } },
180180
// { type: 'ProductItemAdded', data: { productItem } },
181181
// ];
182-
// const { lastEventGlobalPosition: startPosition } =
183-
// await eventStore.appendToStream(streamName, initialEvents);
182+
// const { nextExpectedStreamVersion } = await eventStore.appendToStream(
183+
// streamName,
184+
// initialEvents,
185+
// );
184186

185187
// const events: ShoppingCartSummaryEvent[] = [
186188
// { type: 'ProductItemAdded', data: { productItem } },
@@ -190,31 +192,64 @@ void describe('mongoDB event store started consumer', () => {
190192
// },
191193
// ];
192194

193-
// let stopAfterPosition: bigint | undefined = undefined;
195+
// let stopAfterPosition: bigint | undefined = nextExpectedStreamVersion;
196+
// let checkpoint: MongoDBCheckpoint | null | undefined = undefined;
194197

195-
// const inMemoryProcessor = inMemoryProjector<ShoppingCartSummaryEvent>({
196-
// processorId: uuid(),
197-
// projection: shoppingCartsSummaryProjection,
198-
// connectionOptions: { database },
199-
// startFrom: { lastCheckpoint: startPosition },
200-
// stopAfter: (event) =>
201-
// event.metadata.globalPosition === stopAfterPosition,
202-
// });
198+
// const projectorOptions: InMemoryProjectorOptions<ShoppingCartSummaryEvent> =
199+
// {
200+
// processorId: uuid(),
201+
// projection: shoppingCartsSummaryProjection,
202+
// connectionOptions: { database },
203+
// stopAfter: (event) => {
204+
// checkpoint = getCheckpoint(event);
205+
// return (
206+
// event.metadata.streamName === streamName &&
207+
// event.metadata.streamPosition === stopAfterPosition
208+
// );
209+
// },
210+
// };
211+
212+
// let eartlierConsumer: MongoDBEventStoreConsumer<ShoppingCartSummaryEvent> =
213+
// undefined!;
203214

204-
// const consumer = mongoDBEventStoreConsumer<ShoppingCartSummaryEvent>({
205-
// connectionString,
206-
// processors: [inMemoryProcessor],
207-
// });
215+
// try {
216+
// eartlierConsumer =
217+
// mongoDBEventStoreConsumer<ShoppingCartSummaryEvent>({
218+
// connectionString,
219+
// clientOptions: { directConnection: true },
220+
// processors: [
221+
// inMemoryProjector<ShoppingCartSummaryEvent>(projectorOptions),
222+
// ],
223+
// });
224+
225+
// await eartlierConsumer.start();
226+
// } finally {
227+
// await eartlierConsumer.close();
228+
// }
208229

209230
// // When
231+
// let consumer: MongoDBEventStoreConsumer<ShoppingCartSummaryEvent> =
232+
// undefined!;
233+
210234
// try {
235+
// consumer = mongoDBEventStoreConsumer<ShoppingCartSummaryEvent>({
236+
// connectionString,
237+
// clientOptions: { directConnection: true },
238+
// processors: [
239+
// inMemoryProjector<ShoppingCartSummaryEvent>({
240+
// ...projectorOptions,
241+
// startFrom: checkpoint,
242+
// }),
243+
// ],
244+
// });
245+
// stopAfterPosition = undefined;
211246
// const consumerPromise = consumer.start();
212247

213248
// const appendResult = await eventStore.appendToStream(
214249
// streamName,
215250
// events,
216251
// );
217-
// stopAfterPosition = appendResult.lastEventGlobalPosition;
252+
// stopAfterPosition = appendResult.nextExpectedStreamVersion;
218253

219254
// await consumerPromise;
220255

@@ -223,7 +258,7 @@ void describe('mongoDB event store started consumer', () => {
223258
// assertMatches(summary, {
224259
// _id: streamName,
225260
// status: 'confirmed',
226-
// _version: 2n,
261+
// _version: 3n,
227262
// productItemsCount: productItem.quantity,
228263
// });
229264
// } finally {
@@ -350,13 +385,14 @@ void describe('mongoDB event store started consumer', () => {
350385
stopAfterPosition = undefined;
351386

352387
try {
353-
const consumerPromise = consumer.start();
354-
355388
const appendResult = await eventStore.appendToStream(
356389
streamName,
357390
events,
358391
);
359392
stopAfterPosition = appendResult.nextExpectedStreamVersion;
393+
console.log('stopAfterPosition', stopAfterPosition);
394+
395+
const consumerPromise = consumer.start();
360396

361397
await consumerPromise;
362398

src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ const createChangeStream = <EventType extends Message = AnyMessage>(
325325
}
326326
: resumeToken === 'END'
327327
? void 0
328-
: toMongoDBResumeToken(resumeToken.lastCheckpoint)),
328+
: { resumeAfter: toMongoDBResumeToken(resumeToken.lastCheckpoint) }),
329329
});
330330
};
331331

@@ -404,7 +404,7 @@ export const mongoDBSubscription = <MessageType extends Message = AnyMessage>({
404404
}
405405

406406
console.info(
407-
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify(
407+
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from)}, Start from: ${JSONParser.stringify(
408408
options.startFrom,
409409
)}`,
410410
);

0 commit comments

Comments
 (0)