Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,23 @@ URL を deal() で受け取り:
- スクレイピングはインプロセス(`@d-zero/beholder`)で実行。各 URL ごとにブラウザを起動・終了
- `push()` で発見した新 URL を動的にキューに追加
- `onPush` コールバックで `withoutHashAndAuth` による重複排除
- `signal` オプションで `AbortSignal` を渡し、中断時に新規ワーカーの起動を停止

### クロール中断メカニズム

```
CLI シグナルハンドラ(SIGINT / SIGHUP 等)
→ CrawlerOrchestrator.abort()
→ Crawler.abort()
→ AbortController.abort()
→ deal() の signal オプション経由で新規ワーカー起動を停止
→ 実行中のワーカーは正常完了まで継続
→ 全ワーカー完了後 deal() が resolve → crawlEnd イベント emit
```

- `Crawler` は内部に `AbortController` を保持し、`signal` getter で `AbortSignal` を公開
- `CrawlerOrchestrator` のコンストラクタで `archive` の `error` イベントを監視し、アーカイブエラー発生時にも `Crawler.abort()` を呼び出す
- CLI の `killed()` ハンドラでは `abort()` 後に `garbageCollect()`(ゾンビ Chromium プロセスの終了)→ `process.exit()` を実行

### 主要定数

Expand Down
8 changes: 5 additions & 3 deletions packages/@nitpicker/cli/src/commands/crawl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ type LogType = 'verbose' | 'normal' | 'silent';
/**
* Sets up signal handlers for graceful shutdown and starts event logging.
*
* Registers SIGINT/SIGBREAK/SIGHUP/SIGABRT handlers that kill zombie
* Chromium processes before exiting, then delegates to {@link eventAssignments}
* for progress output.
* Registers SIGINT/SIGBREAK/SIGHUP/SIGABRT handlers that abort the
* crawl via {@link CrawlerOrchestrator.abort}, then kill zombie Chromium
* processes and exit. The abort signal propagates through the dealer's
* AbortSignal mechanism so no new workers are launched.
* @param trigger - Display label for the crawl (URL or stub file path)
* @param orchestrator - The initialized CrawlerOrchestrator instance
* @param config - The resolved archive configuration
Expand All @@ -155,6 +156,7 @@ function run(
logType: LogType,
) {
const killed = () => {
orchestrator.abort();
orchestrator.garbageCollect();
process.exit();
};
Expand Down
10 changes: 5 additions & 5 deletions packages/@nitpicker/crawler/src/crawler-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ export class CrawlerOrchestrator extends EventEmitter<CrawlEvent> {
}

/**
* Abort the current crawl and archive operations.
* Abort the current crawl operation.
*
* Delegates to the archive's abort method, which stops all in-progress
* database writes and cleans up temporary resources.
* @returns The result of the archive abort operation.
* Delegates to the crawler's AbortController so that the dealer stops
* launching new workers. Currently running workers will finish, after
* which `deal()` resolves and `crawlEnd` is emitted normally.
*/
abort() {
return this.#archive.abort();
this.#crawler.abort();
}

/**
Expand Down
69 changes: 69 additions & 0 deletions packages/@nitpicker/crawler/src/crawler/crawler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,75 @@ describe('Crawler', () => {
});
});

// Tests for Crawler.abort(): cancellation is driven by an internal
// AbortController whose signal is forwarded to deal() via its options,
// abort is idempotent, and completion still flows through the normal
// crawlEnd path rather than an early synthetic emit.
describe('abort()', () => {
// Verifies the signal handed to deal() is the crawler's own signal:
// not aborted before abort(), aborted synchronously after.
it('abort() 後に deal() の signal オプションに渡された AbortSignal が aborted になる', async () => {
const { deal } = await import('@d-zero/dealer');
const { default: Crawler } = await import('./crawler.js');

let receivedSignal: AbortSignal | undefined;

// Capture the options.signal that Crawler passes into deal().
vi.mocked(deal).mockImplementation((_items, _factory, options) => {
receivedSignal = options?.signal;
return Promise.resolve();
});

const crawler = new Crawler(defaultOptions);
crawler.start(parseUrl('https://example.com/')!);

// deal() is invoked asynchronously from start(); wait for the mock to run.
await vi.waitFor(() => {
expect(receivedSignal).toBeDefined();
});

expect(receivedSignal!.aborted).toBe(false);
crawler.abort();
expect(receivedSignal!.aborted).toBe(true);
});

// Verifies that when deal() resolves (including after an abort),
// crawlEnd is emitted by the normal completion path.
it('deal 正常完了時に crawlEnd イベントが emit される', async () => {
const { deal } = await import('@d-zero/dealer');
const { default: Crawler } = await import('./crawler.js');

vi.mocked(deal).mockImplementation((_items, _factory, options) => {
// Simulate: abort is called, deal checks signal and resolves normally
expect(options?.signal).toBeInstanceOf(AbortSignal);
return Promise.resolve();
});

const crawler = new Crawler(defaultOptions);
let crawlEndEmitted = false;
crawler.on('crawlEnd', () => {
crawlEndEmitted = true;
});

crawler.start(parseUrl('https://example.com/')!);

await vi.waitFor(() => {
expect(crawlEndEmitted).toBe(true);
});
});

// AbortController.abort() is a no-op when already aborted, so a second
// abort() must neither throw nor reset the signal.
it('二重 abort でもエラーにならない', async () => {
const { deal } = await import('@d-zero/dealer');
const { default: Crawler } = await import('./crawler.js');

vi.mocked(deal).mockResolvedValue();

const crawler = new Crawler(defaultOptions);
crawler.start(parseUrl('https://example.com/')!);

crawler.abort();
expect(() => crawler.abort()).not.toThrow();
expect(crawler.signal.aborted).toBe(true);
});

// Sanity check of the public signal getter on a freshly constructed crawler.
it('signal getter が AbortSignal を返す', async () => {
const { default: Crawler } = await import('./crawler.js');
const crawler = new Crawler(defaultOptions);
expect(crawler.signal).toBeInstanceOf(AbortSignal);
expect(crawler.signal.aborted).toBe(false);
});
});

describe('worker-level error handling', () => {
it('ワーカー内の例外が error イベントとして emit され処理が継続する', async () => {
const { deal } = await import('@d-zero/dealer');
Expand Down
25 changes: 18 additions & 7 deletions packages/@nitpicker/crawler/src/crawler/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ export type { CrawlerOptions } from './types.js';
* configurable parallelism up to {@link Crawler.MAX_PROCESS_LENGTH}.
*/
export default class Crawler extends EventEmitter<CrawlerEventTypes> {
/** Flag set by `abort()` to signal in-progress tasks to exit early. */
#aborted = false;
/** Controller used to cancel the deal-based crawl via its AbortSignal. */
readonly #abortController = new AbortController();
/** Tracks discovered URLs, their scrape status, and deduplication. */
readonly #linkList = new LinkList();
/** Merged crawler configuration (user overrides + defaults). */
Expand All @@ -69,6 +69,16 @@ export default class Crawler extends EventEmitter<CrawlerEventTypes> {
/** Maps hostnames to their scope URLs. Defines the crawl boundary for internal/external classification. */
readonly #scope = new Map<string /* hostname */, ExURL[]>();

/**
 * The AbortSignal associated with this crawler's AbortController.
 *
 * The underlying controller is a `readonly` field created at construction,
 * so this getter always returns the same signal instance for the lifetime
 * of the crawler.
 *
 * Passed to `deal()` so that it stops launching new workers after abort.
 * Also available to the orchestrator for forwarding to other subsystems.
 */
get signal(): AbortSignal {
return this.#abortController.signal;
}

/**
* Create a new Crawler instance.
* @param options - Configuration options for crawling behavior. All fields have
Expand Down Expand Up @@ -113,12 +123,13 @@ export default class Crawler extends EventEmitter<CrawlerEventTypes> {
/**
* Abort the current crawl operation.
*
* Sets the aborted flag and immediately emits a `crawlEnd` event.
* In-progress scrape tasks will check the flag and exit early.
* Signals the AbortController so that the dealer stops launching new
* workers. Currently running workers will finish, after which `deal()`
* resolves and `crawlEnd` is emitted by the normal completion path in
* {@link #runDeal}.
*/
abort() {
this.#aborted = true;
void this.emit('crawlEnd', {});
this.#abortController.abort();
}

/**
Expand Down Expand Up @@ -525,7 +536,6 @@ export default class Crawler extends EventEmitter<CrawlerEventTypes> {
this.#linkList.progress(url);

return async () => {
if (this.#aborted) return;
const log = createTimedUpdate(update, this.#options.verbose);

try {
Expand Down Expand Up @@ -617,6 +627,7 @@ export default class Crawler extends EventEmitter<CrawlerEventTypes> {
limit: concurrency,
interval: this.#options.interval,
verbose: this.#options.verbose || !process.stdout.isTTY,
signal: this.#abortController.signal,
header: (_progress, done, total, limit) => {
return formatCrawlProgress({
done,
Expand Down
Loading