Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions tests/unit/cache/local.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,70 @@ describe("LocalCache", () => {

expect((testCache as any).cache.size).toBe(maxItems);
});

it("should support custom TTL override per item", async () => {
await cache.set("shortTTL", { data: "short" }, 500); // Custom short TTL
await cache.set("defaultTTL", { data: "default" }); // Uses default 5000ms TTL

const shortBefore = await cache.get("shortTTL");
expect(shortBefore).toEqual({ data: "short" });

jest.advanceTimersByTime(501);

const shortAfter = await cache.get("shortTTL");
expect(shortAfter).toBeNull();

const defaultAfter = await cache.get("defaultTTL");
expect(defaultAfter).toEqual({ data: "default" });
});

it("should delete an item", async () => {
await cache.set("key1", { data: "value1" });

const valueBefore = await cache.get("key1");
expect(valueBefore).toEqual({ data: "value1" });

await cache.delete("key1");

const valueAfter = await cache.get("key1");
expect(valueAfter).toBeNull();
});

it("should remove unlisted keys with deleteMissing", async () => {
await cache.set("keep1", { data: "keep1" });
await cache.set("keep2", { data: "keep2" });
await cache.set("remove1", { data: "remove1" });
await cache.set("remove2", { data: "remove2" });

await cache.deleteMissing(["keep1", "keep2"]);

const keep1 = await cache.get("keep1");
const keep2 = await cache.get("keep2");
const remove1 = await cache.get("remove1");
const remove2 = await cache.get("remove2");

expect(keep1).toEqual({ data: "keep1" });
expect(keep2).toEqual({ data: "keep2" });
expect(remove1).toBeNull();
expect(remove2).toBeNull();
});

it("should handle concurrent read/write safely", async () => {
const operations = [];
for (let i = 0; i < 50; i++) {
operations.push(cache.set(`concurrent${i}`, { data: `value${i}` }));
operations.push(cache.get(`concurrent${i}`));
}

await expect(Promise.all(operations)).resolves.not.toThrow();

// Verify that the last items written are consistent
for (let i = 0; i < 10; i++) {
const key = `concurrent${i}`;
const value = await cache.get(key);
if (value !== null) {
expect(value).toEqual({ data: `value${i}` });
}
}
});
});
121 changes: 121 additions & 0 deletions tests/unit/datastream/datastream-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,42 @@ describe('DataStreamClient', () => {
expect(errorSpy).toHaveBeenCalledWith(new Error('test error'));
});

test('should handle reconnection state', async () => {
const connectSpy = jest.fn();
const disconnectSpy = jest.fn();

client.on('connected', connectSpy);
client.on('disconnected', disconnectSpy);

await client.start();

// Get the event handlers registered on the WebSocket client mock
const onCalls = mockDatastreamWSClientInstance.on.mock.calls;
const connectedHandler = onCalls.find((call: [string, Function]) => call[0] === 'connected')?.[1];
const disconnectedHandler = onCalls.find((call: [string, Function]) => call[0] === 'disconnected')?.[1];

// Simulate initial connected state
mockDatastreamWSClientInstance.isConnected.mockReturnValue(true);
if (connectedHandler) connectedHandler();

expect(connectSpy).toHaveBeenCalledTimes(1);
expect(client.isConnected()).toBe(true);

// Simulate disconnect
mockDatastreamWSClientInstance.isConnected.mockReturnValue(false);
if (disconnectedHandler) disconnectedHandler();

expect(disconnectSpy).toHaveBeenCalledTimes(1);
expect(client.isConnected()).toBe(false);

// Simulate reconnection (connected event fires again)
mockDatastreamWSClientInstance.isConnected.mockReturnValue(true);
if (connectedHandler) connectedHandler();

expect(connectSpy).toHaveBeenCalledTimes(2);
expect(client.isConnected()).toBe(true);
});

test('should handle company messages and update cache', async () => {
await client.start();

Expand Down Expand Up @@ -262,6 +298,65 @@ describe('DataStreamClient', () => {
expect(retrievedFlag).toEqual(mockFlag);
});

test('should handle partial entity message merging', async () => {
await client.start();

// Get message handler
const DatastreamWSClientMock = DatastreamWSClient as jest.MockedClass<typeof DatastreamWSClient>;
const messageHandler = DatastreamWSClientMock.mock.calls[0][0].messageHandler;

// Send a FULL company message with all fields
const fullCompany = {
id: 'company-partial',
account_id: 'account-123',
environment_id: 'env-123',
keys: { name: 'Partial Corp' },
traits: [{ key: 'tier', value: 'free' }],
rules: [],
metrics: [],
plan_ids: ['plan-1'],
billing_product_ids: [],
crm_product_ids: [],
credit_balances: {},
} as unknown as Schematic.RulesengineCompany;

await messageHandler({
entity_type: EntityType.COMPANY,
message_type: MessageType.FULL,
data: fullCompany,
});

// Verify the full company is cached
const cachedFull = await client.getCompany({ name: 'Partial Corp' });
expect(cachedFull).toEqual(fullCompany);

// Send a PARTIAL company message that updates only some fields
const partialCompany = {
id: 'company-partial',
keys: { name: 'Partial Corp' },
traits: [{ key: 'tier', value: 'enterprise' }],
plan_ids: ['plan-2'],
} as unknown as Schematic.RulesengineCompany;

await messageHandler({
entity_type: EntityType.COMPANY,
message_type: MessageType.PARTIAL,
data: partialCompany,
});

// NOTE: The current implementation does not merge partial messages with
// existing cached data. Both FULL and PARTIAL message types overwrite
// the cache entirely. This test documents that behavior: after a PARTIAL
// message, only the fields present in the partial payload are retained.
const cachedAfterPartial = await client.getCompany({ name: 'Partial Corp' });
expect(cachedAfterPartial.id).toBe('company-partial');
expect((cachedAfterPartial as any).traits).toEqual([{ key: 'tier', value: 'enterprise' }]);
expect((cachedAfterPartial as any).plan_ids).toEqual(['plan-2']);
// Original fields not present in the partial message are lost (overwritten)
expect((cachedAfterPartial as any).metrics).toBeUndefined();
expect((cachedAfterPartial as any).rules).toBeUndefined();
}, 10000);

test('should request data from datastream when not in cache', async () => {
// Set up connected state
mockDatastreamWSClientInstance.isConnected.mockReturnValue(true);
Expand Down Expand Up @@ -812,6 +907,32 @@ describe('DataStreamClient', () => {
expect(bySlug).toEqual(updatedCompany);
});

test('should handle deep copy to prevent mutation of cached entities', async () => {
// Cache a company
await messageHandler({
entity_type: EntityType.COMPANY,
message_type: MessageType.FULL,
data: multiKeyCompany,
});

// Retrieve the company from cache
const firstRetrieval = await client.getCompany({ name: 'acme' });
expect(firstRetrieval).toEqual(multiKeyCompany);

// Mutate a field on the returned object
(firstRetrieval as any).traits = [{ key: 'mutated', value: 'yes' }];

// Retrieve the company again from cache
const secondRetrieval = await client.getCompany({ name: 'acme' });

// NOTE: The LocalCache returns references, not copies, so mutation of
// a retrieved object DOES affect subsequent cache reads. This test
// documents the current behavior: the cache does not deep-copy on get.
// If deep-copy-on-read were implemented, secondRetrieval.traits would
// still equal the original (empty array).
expect((secondRetrieval as any).traits).toEqual([{ key: 'mutated', value: 'yes' }]);
});

test('should update company metrics and reflect via all keys', async () => {
const companyWithMetrics = {
...multiKeyCompany,
Expand Down
Loading