Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions src/datastream/datastream-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DataStreamResp, DataStreamReq, DataStreamError, EntityType, MessageType
import { RulesEngineClient } from '../rules-engine';
import { Logger } from '../logger';
import { LazyEmitter } from './emitter';
import { partialCompany, partialUser, extractIdFromData, deepCopyCompany as deepCopyCompanyFn } from './merge';

// Import cache providers from the cache module
import type { CacheProvider } from '../cache/types';
Expand Down Expand Up @@ -681,8 +682,35 @@ export class DataStreamClient extends LazyEmitter {
* handleCompanyMessage processes company-specific datastream messages
*/
private async handleCompanyMessage(message: DataStreamResp): Promise<void> {
const company = message.data as Schematic.RulesengineCompany;

let company: Schematic.RulesengineCompany;

if (message.message_type === MessageType.PARTIAL) {
const data = message.data as Record<string, unknown>;
let id: string;
try {
id = extractIdFromData(data);
} catch (error) {
this.logger.error(`Failed to extract company ID from partial message: ${error}`);
return;
}

const resourceKey = this.resourceIdCacheKey(CACHE_KEY_PREFIX_COMPANY, id);
const existing = await this.companyCacheProvider.get(resourceKey);
if (!existing) {
this.logger.warn(`Cache miss for partial company '${id}', skipping`);
return;
}

try {
company = partialCompany(existing, data);
} catch (error) {
this.logger.error(`Failed to merge partial company: ${error}`);
return;
}
} else {
company = message.data as Schematic.RulesengineCompany;
}

if (!company) {
return;
}
Expand Down Expand Up @@ -720,8 +748,35 @@ export class DataStreamClient extends LazyEmitter {
* handleUserMessage processes user-specific datastream messages
*/
private async handleUserMessage(message: DataStreamResp): Promise<void> {
const user = message.data as Schematic.RulesengineUser;

let user: Schematic.RulesengineUser;

if (message.message_type === MessageType.PARTIAL) {
const data = message.data as Record<string, unknown>;
let id: string;
try {
id = extractIdFromData(data);
} catch (error) {
this.logger.error(`Failed to extract user ID from partial message: ${error}`);
return;
}

const resourceKey = this.resourceIdCacheKey(CACHE_KEY_PREFIX_USER, id);
const existing = await this.userCacheProvider.get(resourceKey);
if (!existing) {
this.logger.warn(`Cache miss for partial user '${id}', skipping`);
return;
}

try {
user = partialUser(existing, data);
} catch (error) {
this.logger.error(`Failed to merge partial user: ${error}`);
return;
}
} else {
user = message.data as Schematic.RulesengineUser;
}

if (!user) {
return;
}
Expand Down Expand Up @@ -1226,8 +1281,7 @@ export class DataStreamClient extends LazyEmitter {
* deepCopyCompany creates a complete deep copy of a Company struct
*/
private deepCopyCompany(company: Schematic.RulesengineCompany): Schematic.RulesengineCompany {
// Use JSON parsing for a deep copy - this matches the Go implementation approach
return JSON.parse(JSON.stringify(company));
return deepCopyCompanyFn(company);
}

private async evaluateFlag(
Expand Down
196 changes: 196 additions & 0 deletions src/datastream/merge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import type * as Schematic from "../api/types";

/**
 * Looks up a property that may be stored under a camelCase or a snake_case name.
 * Wire data from WebSocket uses snake_case; Fern-generated types use camelCase.
 *
 * The camelCase entry wins unless it is `null` or `undefined`; other falsy
 * values (`0`, `""`, `false`) are returned as-is.
 *
 * @param obj - Object to read from.
 * @param camel - camelCase property name, tried first.
 * @param snake - snake_case property name, used when the camelCase value is nullish.
 * @returns The first non-nullish value, or whatever is stored under `snake`.
 */
function getProp(obj: Record<string, unknown>, camel: string, snake: string): unknown {
  const camelValue = obj[camel];
  if (camelValue !== null && camelValue !== undefined) {
    return camelValue;
  }
  return obj[snake];
}

/**
 * Returns an independent deep clone of a Company object.
 *
 * The clone is produced by a JSON round trip, so only JSON-serializable
 * data is carried over (e.g. properties whose value is `undefined` are dropped).
 *
 * @param c - Company to clone; it is not modified.
 * @returns A new object sharing no references with `c`.
 */
export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany {
  const serialized = JSON.stringify(c);
  const clone: Schematic.RulesengineCompany = JSON.parse(serialized);
  return clone;
}

/**
 * Returns an independent deep clone of a User object.
 *
 * The clone is produced by a JSON round trip, so only JSON-serializable
 * data is carried over (e.g. properties whose value is `undefined` are dropped).
 *
 * @param u - User to clone; it is not modified.
 * @returns A new object sharing no references with `u`.
 */
export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser {
  const serialized = JSON.stringify(u);
  const clone: Schematic.RulesengineUser = JSON.parse(serialized);
  return clone;
}

/**
 * Extracts the "id" field from a parsed datastream message data object.
 *
 * The payload is untyped wire data, so the value is validated at runtime
 * instead of being trusted through a type assertion.
 *
 * @param data - Parsed message payload. A `null`/`undefined` payload is
 *   treated the same as a payload without an id.
 * @returns The id as a non-empty string.
 * @throws Error if the payload or its id is missing, `null`, or empty.
 * @throws Error if the id is present but is not a string.
 */
export function extractIdFromData(data: Record<string, unknown>): string {
  // Optional chaining: callers hand in a cast of the raw message payload,
  // which can be null/undefined at runtime despite the declared type.
  const id: unknown = data?.id;

  if (id === undefined || id === null || id === "") {
    throw new Error("partial message missing required field: id");
  }
  if (typeof id !== "string") {
    // Previously a non-string id was returned as-is while typed as `string`.
    throw new Error(`partial message field id must be a string, got ${typeof id}`);
  }

  return id;
}

/**
 * Merges a partial update into an existing Company.
 * Deep-copies the existing company, then applies only the fields
 * present in the partial object. The "id" field must be present.
 *
 * Wire format uses snake_case keys. The existing company from cache
 * may have either camelCase or snake_case keys depending on how it
 * was stored. Every applied field is written under its snake_case wire
 * key; when the cached entity already carries the camelCase spelling of
 * that field, the camelCase entry is refreshed too so both stay in sync.
 *
 * Field handling:
 * - id, account_id, environment_id, billing_product_ids, plan_ids,
 *   plan_version_ids, entitlements, rules, traits, subscription: replaced.
 * - base_plan_id: replaced; a nullish incoming value is stored as `null`.
 * - keys, credit_balances: shallow-merged, incoming entries win.
 * - metrics: upserted via `upsertMetrics`.
 * - any other key: ignored.
 *
 * @param existing - Cached company; it is not modified.
 * @param partial - Partial payload from the wire.
 * @returns A new company object with the partial applied.
 * @throws Error if `partial` has no "id" key.
 */
export function partialCompany(
  existing: Schematic.RulesengineCompany,
  partial: Record<string, unknown>,
): Schematic.RulesengineCompany {
  if (!("id" in partial)) {
    throw new Error("partial company message missing required field: id");
  }

  const merged = deepCopyCompany(existing) as unknown as Record<string, unknown>;

  // Stores a value under its wire key and refreshes an already-present
  // camelCase twin. Without this, a camelCase-cached entity keeps a stale
  // `creditBalances` (etc.) that camelCase-first lookups pick up again,
  // so a later partial would merge on top of outdated data.
  const assign = (wireKey: string, value: unknown): void => {
    merged[wireKey] = value;
    const camelKey = wireKey.replace(/_([a-z])/g, (_match: string, letter: string) =>
      letter.toUpperCase(),
    );
    if (camelKey !== wireKey && Object.prototype.hasOwnProperty.call(merged, camelKey)) {
      merged[camelKey] = value;
    }
  };

  for (const key of Object.keys(partial)) {
    switch (key) {
      case "id":
      case "account_id":
      case "environment_id":
      case "billing_product_ids":
      case "plan_ids":
      case "plan_version_ids":
      case "entitlements":
      case "rules":
      case "traits":
      case "subscription":
        assign(key, partial[key]);
        break;
      case "base_plan_id":
        // Normalize an absent base plan to an explicit null.
        assign(key, partial[key] ?? null);
        break;
      case "keys": {
        const existingKeys = (merged.keys ?? {}) as Record<string, string>;
        const incomingKeys = partial[key] as Record<string, string>;
        assign(key, { ...existingKeys, ...incomingKeys });
        break;
      }
      case "credit_balances": {
        const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record<
          string,
          number
        >;
        const incomingCB = partial[key] as Record<string, number>;
        assign(key, { ...existingCB, ...incomingCB });
        break;
      }
      case "metrics": {
        const existingMetrics = ((merged.metrics as unknown[]) ??
          []) as Schematic.RulesengineCompanyMetric[];
        const incomingMetrics = partial[key] as Schematic.RulesengineCompanyMetric[];
        assign(key, upsertMetrics(existingMetrics, incomingMetrics));
        break;
      }
      default:
        // Ignore unknown keys silently
        break;
    }
  }

  return merged as unknown as Schematic.RulesengineCompany;
}

/**
 * Applies a partial update to an existing User and returns the result.
 *
 * The existing user is deep-copied first, so the input is left untouched.
 * Only fields present in `partial` are applied:
 * - id, account_id, environment_id, traits, rules: replaced.
 * - keys: shallow-merged with the existing keys, incoming entries win.
 * - any other key: ignored.
 *
 * @param existing - Cached user; it is not modified.
 * @param partial - Partial payload; must contain an "id" key.
 * @returns A new user object with the partial applied.
 * @throws Error if `partial` has no "id" key.
 */
export function partialUser(
  existing: Schematic.RulesengineUser,
  partial: Record<string, unknown>,
): Schematic.RulesengineUser {
  const hasId = "id" in partial;
  if (!hasId) {
    throw new Error("partial user message missing required field: id");
  }

  const result = deepCopyUser(existing) as unknown as Record<string, unknown>;

  // Fields whose incoming value simply replaces the stored one.
  const replacedFields = new Set(["id", "account_id", "environment_id", "traits", "rules"]);

  for (const field of Object.keys(partial)) {
    if (field === "keys") {
      const currentKeys = (getProp(result, "keys", "keys") ?? {}) as Record<string, string>;
      const incomingKeys = partial[field] as Record<string, string>;
      result[field] = { ...currentKeys, ...incomingKeys };
      continue;
    }

    if (replacedFields.has(field)) {
      result[field] = partial[field];
    }
    // Anything else is an unknown key and is ignored silently.
  }

  return result as unknown as Schematic.RulesengineUser;
}

/**
 * Metric key used for deduplication during upsert.
 *
 * Two metrics are treated as the same entry when all three fields match.
 * Instances are built by `getMetricKey`, which substitutes an empty string
 * for any field that is missing or falsy on the metric.
 */
interface MetricKey {
// Read from the metric's `eventSubtype` (or `event_subtype`) property.
eventSubtype: string;
// Read from the metric's `period` property.
period: string;
// Read from the metric's `monthReset` (or `month_reset`) property.
monthReset: string;
}

/**
 * Serializes a metric key into a single string usable as a Map key.
 * The three fields are joined with "|" in the order
 * eventSubtype, period, monthReset.
 */
function metricKeyString(m: MetricKey): string {
  const { eventSubtype, period, monthReset } = m;
  const separator = "|";
  return `${eventSubtype}${separator}${period}${separator}${monthReset}`;
}

/**
 * Builds the deduplication key for a metric object.
 *
 * `eventSubtype` and `monthReset` are read from the camelCase property,
 * falling back to the snake_case one only when the camelCase value is
 * nullish; `period` has a single spelling. Any falsy result becomes "".
 */
function getMetricKey(m: Record<string, unknown>): MetricKey {
  // Falsy values (missing, null, "") collapse to the empty string.
  const orEmpty = (value: unknown): string => (value ? (value as string) : "");

  const rawSubtype = m.eventSubtype ?? m.event_subtype;
  const rawPeriod = m.period;
  const rawReset = m.monthReset ?? m.month_reset;

  return {
    eventSubtype: orEmpty(rawSubtype),
    period: orEmpty(rawPeriod),
    monthReset: orEmpty(rawReset),
  };
}

/**
 * Merges incoming metrics into existing ones. Metrics are matched by
 * (eventSubtype, period, monthReset); matches are replaced, new metrics
 * are appended.
 *
 * Neither input array is mutated. Falsy entries in `incoming` are skipped.
 * If `incoming` is not an array (e.g. the wire sent `null`), it is treated
 * as "no changes" and a copy of `existing` is returned.
 * When `incoming` contains several metrics with the same key, the last one
 * wins and the key occupies a single slot in the result.
 *
 * @param existing - Metrics currently stored on the company.
 * @param incoming - Metrics carried by the partial update.
 * @returns A new array with the incoming metrics applied.
 */
function upsertMetrics(
  existing: Schematic.RulesengineCompanyMetric[],
  incoming: Schematic.RulesengineCompanyMetric[],
): Schematic.RulesengineCompanyMetric[] {
  const result = [...existing];

  // The payload is untyped wire data; iterating a non-array would throw
  // and cause the whole partial message to be discarded.
  if (!Array.isArray(incoming)) {
    return result;
  }

  const keyOf = (metric: Schematic.RulesengineCompanyMetric): string =>
    metricKeyString(getMetricKey(metric as unknown as Record<string, unknown>));

  // Map each key to its position in `result`.
  const index = new Map<string, number>();
  result.forEach((metric, position) => {
    if (metric) {
      index.set(keyOf(metric), position);
    }
  });

  for (const metric of incoming) {
    if (!metric) continue;
    const key = keyOf(metric);
    const position = index.get(key);
    if (position !== undefined) {
      result[position] = metric;
    } else {
      // Record the new slot so a later incoming metric with the same key
      // replaces this one instead of being appended as a duplicate.
      index.set(key, result.length);
      result.push(metric);
    }
  }

  return result;
}
70 changes: 63 additions & 7 deletions tests/unit/datastream/datastream-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,73 @@ describe('DataStreamClient', () => {
data: partialCompany,
});

// NOTE: The current implementation does not merge partial messages with
// existing cached data. Both FULL and PARTIAL message types overwrite
// the cache entirely. This test documents that behavior: after a PARTIAL
// message, only the fields present in the partial payload are retained.
// Partial messages are now properly merged: fields in the partial update
// the cached entity, while fields not present in the partial are preserved.
const cachedAfterPartial = await client.getCompany({ name: 'Partial Corp' });
expect(cachedAfterPartial.id).toBe('company-partial');
expect((cachedAfterPartial as any).traits).toEqual([{ key: 'tier', value: 'enterprise' }]);
expect((cachedAfterPartial as any).plan_ids).toEqual(['plan-2']);
// Original fields not present in the partial message are lost (overwritten)
expect((cachedAfterPartial as any).metrics).toBeUndefined();
expect((cachedAfterPartial as any).rules).toBeUndefined();
// Original fields not present in the partial message are preserved
expect((cachedAfterPartial as any).metrics).toEqual([]);
expect((cachedAfterPartial as any).rules).toEqual([]);
expect((cachedAfterPartial as any).account_id).toBe('account-123');
expect((cachedAfterPartial as any).billing_product_ids).toEqual([]);
}, 10000);

// Scenario: a PARTIAL company message arrives for an id that has no cached
// entity. The handler is expected to log a cache-miss warning and not cache
// anything. The trailing 10000 is the third argument to `test`
// (presumably the per-test timeout in ms).
test('should skip partial company message when entity is not in cache', async () => {
await client.start();

// Pull the messageHandler option out of the first argument of the first
// recorded DatastreamWSClient mock call, so messages can be injected directly.
const DatastreamWSClientMock = DatastreamWSClient as jest.MockedClass<typeof DatastreamWSClient>;
const messageHandler = DatastreamWSClientMock.mock.calls[0][0].messageHandler;

// Send a PARTIAL for a company that was never cached via FULL
await messageHandler({
entity_type: EntityType.COMPANY,
message_type: MessageType.PARTIAL,
data: {
id: 'company-unknown',
keys: { name: 'Ghost Corp' },
traits: [{ key: 'tier', value: 'enterprise' }],
},
});

// Warn should be logged about the cache miss
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining("Cache miss for partial company 'company-unknown'")
);

// No company should have been cached (no cacheCompanyForKeys call for this entity)
// NOTE(review): this is verified indirectly — it only asserts that no debug
// log line mentions 'Ghost Corp'; presumably caching emits such a line — confirm.
expect(mockLogger.debug).not.toHaveBeenCalledWith(
expect.stringContaining('Ghost Corp')
);
}, 10000);

// Scenario: a PARTIAL user message arrives for an id that has no cached
// entity. The handler is expected to log a cache-miss warning and not cache
// anything. The trailing 10000 is the third argument to `test`
// (presumably the per-test timeout in ms).
test('should skip partial user message when entity is not in cache', async () => {
await client.start();

// Pull the messageHandler option out of the first argument of the first
// recorded DatastreamWSClient mock call, so messages can be injected directly.
const DatastreamWSClientMock = DatastreamWSClient as jest.MockedClass<typeof DatastreamWSClient>;
const messageHandler = DatastreamWSClientMock.mock.calls[0][0].messageHandler;

// Send a PARTIAL for a user that was never cached via FULL
await messageHandler({
entity_type: EntityType.USER,
message_type: MessageType.PARTIAL,
data: {
id: 'user-unknown',
keys: { email: 'ghost@example.com' },
traits: [{ key: 'tier', value: 'enterprise' }],
},
});

// Warn should be logged about the cache miss
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining("Cache miss for partial user 'user-unknown'")
);

// No user should have been cached
// NOTE(review): this is verified indirectly — it only asserts that no debug
// log line mentions 'ghost@example.com'; presumably caching emits such a line — confirm.
expect(mockLogger.debug).not.toHaveBeenCalledWith(
expect.stringContaining('ghost@example.com')
);
}, 10000);

test('should request data from datastream when not in cache', async () => {
Expand Down
Loading