Skip to content

Commit 711aadb

Browse files
committed
Fixed restarting stopped MongoDB Consumer
It appeared that when all processors was stopped then it was still showing that's is running, as currently there's no notification from processors (probably something to have eventually). For now, ensured that flag is set to false when stopping.
1 parent 29c483d commit 711aadb

File tree

3 files changed

+68
-68
lines changed

3 files changed

+68
-68
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.inMemory.projections.int.spec.ts

Lines changed: 61 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -300,79 +300,79 @@ void describe('mongoDB event store started consumer', () => {
300300
},
301301
);
302302

303-
// void it(
304-
// 'handles only new events when CURRENT position is stored for restarted consumer',
305-
// withDeadline,
306-
// async () => {
307-
// // Given
308-
// const shoppingCartId = `shoppingCart:${uuid()}`;
309-
// const streamName = `shopping_cart-${shoppingCartId}`;
303+
void it(
304+
'handles only new events when CURRENT position is stored for restarted consumer',
305+
withDeadline,
306+
async () => {
307+
// Given
308+
const shoppingCartId = `shoppingCart:${uuid()}`;
309+
const streamName = `shopping_cart-${shoppingCartId}`;
310310

311-
// const initialEvents: ShoppingCartSummaryEvent[] = [
312-
// { type: 'ProductItemAdded', data: { productItem } },
313-
// { type: 'ProductItemAdded', data: { productItem } },
314-
// ];
315-
// const { nextExpectedStreamVersion } = await eventStore.appendToStream(
316-
// streamName,
317-
// initialEvents,
318-
// );
311+
const initialEvents: ShoppingCartSummaryEvent[] = [
312+
{ type: 'ProductItemAdded', data: { productItem } },
313+
{ type: 'ProductItemAdded', data: { productItem } },
314+
];
315+
const { nextExpectedStreamVersion } = await eventStore.appendToStream(
316+
streamName,
317+
initialEvents,
318+
);
319319

320-
// const events: ShoppingCartSummaryEvent[] = [
321-
// { type: 'ProductItemAdded', data: { productItem } },
322-
// {
323-
// type: 'ShoppingCartConfirmed',
324-
// data: { confirmedAt },
325-
// },
326-
// ];
320+
const events: ShoppingCartSummaryEvent[] = [
321+
{ type: 'ProductItemAdded', data: { productItem } },
322+
{
323+
type: 'ShoppingCartConfirmed',
324+
data: { confirmedAt },
325+
},
326+
];
327327

328-
// let stopAfterPosition: bigint | undefined = nextExpectedStreamVersion;
328+
let stopAfterPosition: bigint | undefined = nextExpectedStreamVersion;
329329

330-
// const inMemoryProcessor = inMemoryProjector<ShoppingCartSummaryEvent>({
331-
// processorId: uuid(),
332-
// projection: shoppingCartsSummaryProjection,
333-
// connectionOptions: { database },
334-
// startFrom: 'CURRENT',
335-
// stopAfter: (event) =>
336-
// event.metadata.streamName === streamName &&
337-
// event.metadata.streamPosition === stopAfterPosition,
338-
// });
330+
const inMemoryProcessor = inMemoryProjector<ShoppingCartSummaryEvent>({
331+
processorId: uuid(),
332+
projection: shoppingCartsSummaryProjection,
333+
connectionOptions: { database },
334+
startFrom: 'CURRENT',
335+
stopAfter: (event) =>
336+
event.metadata.streamName === streamName &&
337+
event.metadata.streamPosition === stopAfterPosition,
338+
});
339339

340-
// const consumer = mongoDBEventStoreConsumer<ShoppingCartSummaryEvent>({
341-
// connectionString,
342-
// clientOptions: { directConnection: true },
343-
// processors: [inMemoryProcessor],
344-
// });
340+
const consumer = mongoDBEventStoreConsumer<ShoppingCartSummaryEvent>({
341+
connectionString,
342+
clientOptions: { directConnection: true },
343+
processors: [inMemoryProcessor],
344+
});
345345

346-
// // When
347-
// await consumer.start();
348-
// await consumer.stop();
346+
// When
347+
await consumer.start();
348+
await consumer.stop();
349349

350-
// stopAfterPosition = undefined;
350+
stopAfterPosition = undefined;
351351

352-
// try {
353-
// const consumerPromise = consumer.start();
352+
try {
353+
const consumerPromise = consumer.start();
354354

355-
// const appendResult = await eventStore.appendToStream(
356-
// streamName,
357-
// events,
358-
// );
359-
// stopAfterPosition = appendResult.nextExpectedStreamVersion;
355+
const appendResult = await eventStore.appendToStream(
356+
streamName,
357+
events,
358+
);
359+
stopAfterPosition = appendResult.nextExpectedStreamVersion;
360360

361-
// await consumerPromise;
361+
await consumerPromise;
362362

363-
// const summary = await summaries.findOne((d) => d._id === streamName);
363+
const summary = await summaries.findOne((d) => d._id === streamName);
364364

365-
// assertMatches(summary, {
366-
// _id: streamName,
367-
// status: 'confirmed',
368-
// //_version: 4n,
369-
// productItemsCount: productItem.quantity * 3,
370-
// });
371-
// } finally {
372-
// await consumer.close();
373-
// }
374-
// },
375-
// );
365+
assertMatches(summary, {
366+
_id: streamName,
367+
status: 'confirmed',
368+
//_version: 4n,
369+
productItemsCount: productItem.quantity * 3,
370+
});
371+
} finally {
372+
await consumer.close();
373+
}
374+
},
375+
);
376376

377377
// void it(
378378
// 'handles only new events when CURRENT position is stored for a new consumer',

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.int.spec.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ void describe('mongoDB event store consumer', () => {
101101
const connectionStringToNotExistingDB = 'mongodb://not-existing:2113';
102102
const consumerToNotExistingServer = mongoDBEventStoreConsumer({
103103
connectionString: connectionStringToNotExistingDB,
104-
clientOptions: { directConnection: true },
104+
clientOptions: {
105+
directConnection: true,
106+
serverSelectionTimeoutMS: 1000,
107+
},
105108
processors: [dummyProcessor],
106109
});
107110
await assertThrowsAsync(

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import {
1414
} from '@event-driven-io/emmett';
1515
import { MongoClient, type MongoClientOptions } from 'mongodb';
1616
import { v4 as uuid } from 'uuid';
17-
import { CancellationPromise } from './CancellablePromise';
1817
import {
1918
changeStreamReactor,
2019
mongoDBProjector,
@@ -103,7 +102,6 @@ export const mongoDBEventStoreConsumer = <
103102
let start: Promise<void>;
104103
let stream: MongoDBSubscription | undefined;
105104
let isRunning = false;
106-
let runningPromise = new CancellationPromise<null>();
107105
const client =
108106
'client' in options && options.client
109107
? options.client
@@ -144,10 +142,11 @@ export const mongoDBEventStoreConsumer = <
144142
};
145143

146144
const stop = async () => {
145+
if (!isRunning) return;
146+
isRunning = false;
147+
147148
if (stream?.isRunning !== true) return;
148149
await stream.stop();
149-
isRunning = false;
150-
runningPromise.resolve(null);
151150
};
152151

153152
return {
@@ -201,8 +200,6 @@ export const mongoDBEventStoreConsumer = <
201200

202201
isRunning = true;
203202

204-
runningPromise = new CancellationPromise<null>();
205-
206203
const positions = await Promise.all(
207204
processors.map((o) => o.start({ client })),
208205
);

0 commit comments

Comments
 (0)