Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/remote/query-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class QueryLoader extends EventEmitter<QueryLoaderEvents> {
private stopped = false;
private readonly interval: number;
private readonly maxErrors: number;
private timer?: ReturnType<typeof setTimeout>;

constructor(
private readonly sourceManager: ConnectionManager,
Expand Down Expand Up @@ -62,14 +63,19 @@ export class QueryLoader extends EventEmitter<QueryLoaderEvents> {
*/
stop() {
this.stopped = true;
if (this.timer !== undefined) {
clearTimeout(this.timer);
this.timer = undefined;
}
}

private scheduleNextPoll() {
if (this.stopped) {
return;
}

setTimeout(() => {
this.timer = setTimeout(() => {
this.timer = undefined;
if (this.stopped) {
return;
}
Expand Down
6 changes: 5 additions & 1 deletion src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class QueryOptimizer extends EventEmitter<EventMap> {

constructor(
private readonly manager: ConnectionManager,
private readonly connectable: Connectable,
private connectable: Connectable,
config?: {
maxRetries?: number;
queryTimeoutMs?: number;
Expand Down Expand Up @@ -136,6 +136,10 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
this.target = { optimizer, statistics };
}

updateConnectable(connectable: Connectable) {
this.connectable = connectable;
}

stop() {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
this.queries.clear();
Expand Down
17 changes: 5 additions & 12 deletions src/remote/remote-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { Pool } from "pg";

import { RemoteController } from "./remote-controller.ts";
import { ConnectionManager } from "../sync/connection-manager.ts";
import { RemoteSyncRequest } from "./remote.dto.ts";

test("controller syncs correctly", async () => {
const [sourceDb, targetDb] = await Promise.all([
Expand Down Expand Up @@ -36,14 +35,12 @@ test("controller syncs correctly", async () => {
const remote = new RemoteController(innerRemote);

try {
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);

expect(syncResult.status).toEqual(200);

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: innerRemote.optimizingDb.toString(),
});
const tablesAfter =
await pool.query("select tablename from pg_tables where schemaname = 'public'");
Expand Down Expand Up @@ -89,13 +86,11 @@ test("creating an index via endpoint adds it to the optimizing db", async () =>

try {
// First sync the database
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);
expect(syncResult.status).toEqual(200);

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: innerRemote.optimizingDb.toString(),
});

// Verify no indexes exist initially
Expand Down Expand Up @@ -152,9 +147,7 @@ test("controller returns extension error when pg_stat_statements is not installe
const remote = new RemoteController(innerRemote);

try {
const syncResult = await remote.onFullSync(
RemoteSyncRequest.encode({ db: source }),
);
const syncResult = await remote.onFullSync(source);

expect(syncResult.status).toEqual(200);

Expand Down
23 changes: 15 additions & 8 deletions src/remote/remote-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "./remote-controller.dto.ts";
import { ZodError } from "zod";
import { ExportedStats, Statistics } from "@query-doctor/core";
import type { Connectable } from "../sync/connectable.ts";

const SyncStatus = {
NOT_STARTED: "notStarted",
Expand All @@ -35,6 +36,7 @@ export class RemoteController {
private socket?: WebSocket;
private syncResponse?: Awaited<ReturnType<Remote["syncFrom"]>>;
private syncStatus: SyncStatus = SyncStatus.NOT_STARTED;
private lastSourceDb?: Connectable;

constructor(
private readonly remote: Remote,
Expand Down Expand Up @@ -84,7 +86,7 @@ export class RemoteController {

// TODO: type return (Site#2402)
async getStatus(): Promise<unknown> {
if (!this.syncResponse || this.syncStatus !== SyncStatus.COMPLETED) {
if (!this.syncResponse) {
return { status: this.syncStatus };
}
const { schema, meta } = this.syncResponse;
Expand Down Expand Up @@ -117,13 +119,8 @@ export class RemoteController {
} as const;
}

async onFullSync(rawBody: string): Promise<HandlerResult> {
const body = RemoteSyncRequest.safeDecode(rawBody);
if (!body.success) {
return { status: 400, body: body.error };
}

const { db } = body.data;
async onFullSync(db: Connectable): Promise<HandlerResult> {
this.lastSourceDb = db;
try {
this.syncStatus = SyncStatus.IN_PROGRESS;
this.syncResponse = await this.remote.syncFrom(db, {
Expand Down Expand Up @@ -158,6 +155,16 @@ export class RemoteController {
}
}

async redump(): Promise<HandlerResult> {
if (!this.lastSourceDb) {
return {
status: 400,
body: { type: "error", error: "no_source_db", message: "No source database has been synced yet" },
};
}
return this.onFullSync(this.lastSourceDb);
}

async onImportStats(body: unknown): Promise<HandlerResult> {
let stats: ExportedStats[];
try {
Expand Down
11 changes: 3 additions & 8 deletions src/remote/remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { Pool } from "pg";
import { ConnectionManager } from "../sync/connection-manager.ts";
import { normalizeQuery } from "./test-utils.ts";

import { PgIdentifier } from "@query-doctor/core";
import { type Op } from "jsondiffpatch/formatters/jsonpatch";

const TEST_TARGET_CONTAINER_NAME = "postgres:17";
Expand Down Expand Up @@ -94,7 +93,7 @@ test("syncs correctly", async () => {
expect(indexNames).toEqual(expect.arrayContaining(["testing_1234"]));

const pool = new Pool({
connectionString: target.withDatabaseName(Remote.optimizingDbName).toString(),
connectionString: remote.optimizingDb.toString(),
});

const indexesAfter =
Expand Down Expand Up @@ -203,10 +202,8 @@ test("raw timescaledb syncs correctly", async () => {
try {
await using remote = new Remote(targetConn, manager);

const t = manager.getOrCreateConnection(
targetConn.withDatabaseName(PgIdentifier.fromString("optimizing_db")),
);
await remote.syncFrom(sourceConn);
const t = manager.getOrCreateConnection(remote.optimizingDb);
const indexesAfter = await t.exec(
"select indexname from pg_indexes where schemaname = 'public'",
);
Expand Down Expand Up @@ -349,10 +346,8 @@ test("timescaledb with continuous aggregates sync correctly", async () => {
try {
await using remote = new Remote(targetConn, manager);

const t = manager.getOrCreateConnection(
targetConn.withDatabaseName(PgIdentifier.fromString("optimizing_db")),
);
await remote.syncFrom(sourceConn);
const t = manager.getOrCreateConnection(remote.optimizingDb);
const queries = remote.optimizer.getQueries();
const queryStrings = queries.map((q) => normalizeQuery(q.query));

Expand Down
45 changes: 29 additions & 16 deletions src/remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ type RemoteEvents = {
*/
export class Remote extends EventEmitter<RemoteEvents> {
static readonly baseDbName = PgIdentifier.fromString("postgres");
static readonly optimizingDbName = PgIdentifier.fromString(
"optimizing_db",
);
private static readonly optimizingDbPrefix = "optimizing_db";
static defaultOptimizingDbPrefix = PgIdentifier.fromString(Remote.optimizingDbPrefix)

/* Threshold that we determine is "too few rows" for Postgres to start using indexes
* and not defaulting to table scan.
*/
Expand All @@ -55,8 +55,13 @@ export class Remote extends EventEmitter<RemoteEvents> {
* destroyed and re-created on each successful sync along with the db itself
*/
private baseDbURL: Connectable;
/** The URL of the optimizing db */
private readonly optimizingDbUDRL: Connectable;
private generation = 0;
/** The URL of the current generation optimizing db */
private optimizingDbUDRL: Connectable;

get optimizingDb(): Connectable {
return this.optimizingDbUDRL;
}

private isPolling = false;
private queryLoader?: QueryLoader;
Expand All @@ -74,7 +79,9 @@ export class Remote extends EventEmitter<RemoteEvents> {
) {
super();
this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName);
this.optimizingDbUDRL = targetURL.withDatabaseName(Remote.optimizingDbName);
this.optimizingDbUDRL = targetURL.withDatabaseName(
Remote.defaultOptimizingDbPrefix,
);
this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL);
}

Expand Down Expand Up @@ -210,21 +217,27 @@ export class Remote extends EventEmitter<RemoteEvents> {
}
}

/**
* Drops and recreates the {@link Remote.optimizingDbName} db.
*
* TODO: allow juggling multiple databases in the future
*/
private generationDbName(generation: number): PgIdentifier {
return generation === 0
? Remote.defaultOptimizingDbPrefix
: PgIdentifier.fromString(`${Remote.optimizingDbPrefix}_${generation}`);
}

private async resetDatabase(): Promise<void> {
const databaseName = Remote.optimizingDbName;
log.info(`Resetting internal database: ${databaseName}`, "remote");
const prevGeneration = this.generation;
const nextGeneration = prevGeneration + 1;
const nextDbName = this.generationDbName(nextGeneration);
log.info(`Creating new generation database: ${nextDbName}`, "remote");
const baseDb = this.manager.getOrCreateConnection(this.baseDbURL);
await baseDb.exec(`create database ${nextDbName};`);
const prevDbName = this.generationDbName(prevGeneration);
this.generation = nextGeneration;
this.optimizingDbUDRL = this.optimizingDbUDRL.withDatabaseName(nextDbName);
this.optimizer.updateConnectable(this.optimizingDbUDRL);
// these cannot be run in the same `exec` block as that implicitly creates transactions
await baseDb.exec(
// drop database does not allow parameterization
`drop database if exists ${databaseName} with (force);`,
`drop database if exists ${prevDbName} with (force);`,
);
await baseDb.exec(`create database ${databaseName};`);
}

private async pipeSchema(
Expand Down
17 changes: 13 additions & 4 deletions src/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { SyncResult } from "../sync/syncer.ts";
import * as errors from "../sync/errors.ts";
import { RemoteController } from "../remote/remote-controller.ts";
import { Connectable } from "../sync/connectable.ts";
import { RemoteSyncRequest } from "../remote/remote.dto.ts";
import { ConnectionManager } from "../sync/connection-manager.ts";
import { Remote } from "../remote/remote.ts";

Expand Down Expand Up @@ -175,9 +176,11 @@ export async function createServer(
if (!sourceDb) {
fastify.post("/postgres", async (request, reply) => {
log.info(`[POST] /postgres`, "http");
const result = await remoteController.onFullSync(
JSON.stringify(request.body),
);
const body = RemoteSyncRequest.safeDecode(JSON.stringify(request.body));
if (!body.success) {
return reply.status(400).send(body.error);
}
const result = await remoteController.onFullSync(body.data.db);
return reply.status(result.status).send(result.body);
});
}
Expand Down Expand Up @@ -223,13 +226,19 @@ export async function createServer(
const result = await remoteController.onImportStats(request.body);
return reply.status(result.status).send(result.body);
});

fastify.post("/postgres/dump", async (request, reply) => {
log.info(`[POST] /postgres/dump`, "http");
const result = await remoteController.redump();
return reply.status(result.status).send(result.body);
});
}

await fastify.listen({ host: hostname, port });

if (remoteController && sourceDb) {
log.info(`SOURCE_DATABASE_URL set, triggering initial sync`, "http");
remoteController.onFullSync(JSON.stringify({ db: sourceDb.toString() })).then((result) => {
remoteController.onFullSync(sourceDb).then((result) => {
if (result.status >= 400) {
log.error(`Initial sync failed: ${JSON.stringify(result.body)}`, "http");
process.exit(1);
Expand Down
2 changes: 2 additions & 0 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ ORDER BY
-- excluding this makes sure we can use analyzer
-- in multi-tenant environments
and query != '<insufficient privilege>'
and query not ilike 'explain%'
-- and pg_user.usename not in (/* supabase */ 'supabase_admin', 'supabase_auth_admin', /* neon */ 'cloud_admin'); -- @qd_introspection
`); // we're excluding `pg_stat_statements` from the results since it's almost certainly unrelated

Expand All @@ -536,6 +537,7 @@ ORDER BY
FROM ${source.schema}.pg_stat_monitor
WHERE query not like '%pg_stat_monitor%'
and query not like '%@qd_introspection%'
and query not ilike 'explain%'
`); // we're excluding `pg_stat_monitor` from the results since it's almost certainly unrelated

return await this.segmentedQueryCache.sync(
Expand Down
Loading