Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const DB_DUPLICATE_KEY_ERROR = '11000';
*/
const MAX_CODE_LINE_LENGTH = 140;

/**
* TTL for repetition cache lookups in seconds
*/
const REPETITION_CACHE_TTL = 300;

/**
* Worker for handling Javascript events
*/
Expand Down Expand Up @@ -87,6 +92,11 @@ export default class GrouperWorker extends Worker {
*/
private cacheCleanupInterval: NodeJS.Timeout | null = null;

/**
* Cache for compiled RegExp patterns to avoid repeated compilation
*/
private regexpCache = new Map<string, RegExp>();

/**
* Start consuming messages
*/
Expand Down Expand Up @@ -131,6 +141,35 @@ export default class GrouperWorker extends Worker {
await this.redis.close();
}

/**
* Override clearCache to also clear memoization caches and RegExp cache
* This prevents memory leaks from decorator-based LRU caches
*/
public clearCache(): void {
super.clearCache();

/**
* Clear RegExp cache
*/
this.regexpCache.clear();
Comment on lines +152 to +154
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In clearCache(), for (const key in this) iterates enumerable properties including any enumerable properties on the prototype chain. Using Object.keys(this) / Object.getOwnPropertyNames(this) would avoid unexpected prototype enumeration and make the cache-clearing logic more predictable.

Copilot uses AI. Check for mistakes.

/**
* Clear memoization caches from decorators
* These are stored as properties on the instance
*/
const memoizeCachePrefix = 'memoizeCache:';
Copy link
Member

@neSpecc neSpecc Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems hardcoded, its better to import it from memoize.ts.

Anyway, I don't see any reason to clear memoization cache manually:

  • it has limited length (with LRU algo)
  • it has ttl


Object.keys(this).forEach(key => {
if (key.startsWith(memoizeCachePrefix)) {
const cache = this[key] as any;

if (cache && typeof cache.reset === 'function') {
cache.reset();
}
}
});
}

/**
* Task handling function
*
Expand All @@ -156,7 +195,7 @@ export default class GrouperWorker extends Worker {
const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);

if (similarEvent) {
this.logger.info(`similar event: ${JSON.stringify(similarEvent)}`);
this.logger.info(`similar event found: groupHash=${similarEvent.groupHash}, totalCount=${similarEvent.totalCount}`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets also print projectId and event payload.title.


/**
* Override group hash with found event's group hash
Expand Down Expand Up @@ -386,7 +425,7 @@ export default class GrouperWorker extends Worker {
try {
const originalEvent = await this.findFirstEventByPattern(matchingPattern.pattern, projectId);

this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`);
this.logger.info(`original event for pattern found: groupHash=${originalEvent?.groupHash || 'none'}`);

if (originalEvent) {
return originalEvent;
Expand Down Expand Up @@ -416,7 +455,12 @@ export default class GrouperWorker extends Worker {
}

return patterns.filter(pattern => {
const patternRegExp = new RegExp(pattern.pattern);
let patternRegExp = this.regexpCache.get(pattern.pattern);

if (!patternRegExp) {
patternRegExp = new RegExp(pattern.pattern);
this.regexpCache.set(pattern.pattern, patternRegExp);
}

return title.match(patternRegExp);
}).pop() || null;
Expand All @@ -428,6 +472,7 @@ export default class GrouperWorker extends Worker {
* @param projectId - id of the project to find related event patterns
* @returns {ProjectEventGroupingPatternsDBScheme[]} EventPatterns object with projectId and list of patterns
*/
@memoize({ max: 100, ttl: MEMOIZATION_TTL, strategy: 'hash' })
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding @memoize to getProjectPatterns means grouping-pattern updates in the accounts DB (or in tests where the mock changes) won’t be observed until the memoize cache expires or clearCache runs. This changes behavior for “dynamic pattern addition” scenarios and can lead to incorrect grouping for minutes after a pattern update. Consider removing memoization here, using a much shorter TTL, or incorporating a pattern-version/updatedAt value into the cache key so changes invalidate immediately.

Suggested change
@memoize({ max: 100, ttl: MEMOIZATION_TTL, strategy: 'hash' })

Copilot uses AI. Check for mistakes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any reason to add @memoize to getProjectPatterns() since it is called only inside findSimilarEvent() which is memoized itself.

private async getProjectPatterns(projectId: string): Promise<ProjectEventGroupingPatternsDBScheme[]> {
const project = await this.accountsDb.getConnection()
.collection('projects')
Expand Down Expand Up @@ -478,11 +523,14 @@ export default class GrouperWorker extends Worker {
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
const repetition = await this.cache.get(repetitionCacheKey, async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
});
});
.findOne(
{
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
},
{ projection: { _id: 1 } }
);
}, REPETITION_CACHE_TTL);

if (repetition) {
shouldIncrementRepetitionAffectedUsers = false;
Expand Down Expand Up @@ -512,15 +560,18 @@ export default class GrouperWorker extends Worker {
const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
timestamp: {
$gte: eventMidnight,
$lt: eventNextMidnight,
.findOne(
{
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
timestamp: {
$gte: eventMidnight,
$lt: eventNextMidnight,
},
},
});
});
{ projection: { _id: 1 } }
);
}, REPETITION_CACHE_TTL);

/**
* If daily repetition exists, don't increment daily affected users
Expand Down Expand Up @@ -575,7 +626,7 @@ export default class GrouperWorker extends Worker {
* @returns {string} cache key for event
*/
private async getEventCacheKey(projectId: string, groupHash: string): Promise<string> {
return `${projectId}:${JSON.stringify({ groupHash })}`;
return `event:${projectId}:${groupHash}`;
}

/**
Expand Down
Loading