Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## Unreleased

### Added

- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488)
- `@dependencyFactory` for static dependency factory type safety
- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394)
Expand Down
148 changes: 93 additions & 55 deletions packages/indexer/src/IndexerNotifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockTriggerBase,
BlockStorage,
Sequencer,
sequencerModule,
SequencerModule,
Expand All @@ -9,10 +10,10 @@ import {
PrivateMempool,
SettlementModule,
} from "@proto-kit/sequencer";
import { log } from "@proto-kit/common";
import { filterNonUndefined, log } from "@proto-kit/common";
import { inject } from "tsyringe";

import { IndexBlockTask } from "./tasks/IndexBlockTask";
import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask";
import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask";
import { IndexSettlementTask } from "./tasks/IndexSettlementTask";
import { IndexBatchTask } from "./tasks/IndexBatchTask";
Expand All @@ -30,6 +31,8 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
public sequencer: Sequencer<NotifierMandatorySequencerModules>,
@inject("TaskQueue")
public taskQueue: TaskQueue,
@inject("BlockStorage")
private readonly blockStorage: BlockStorage,
public indexBlockTask: IndexBlockTask,
public indexPendingTxTask: IndexPendingTxTask,
public indexBatchTask: IndexBatchTask,
Expand All @@ -39,6 +42,65 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
super();
}

private async pushTask(
queueName: string,
name: string,
payload: string
): Promise<void> {
const queue = await this.taskQueue.getQueue(queueName);
await queue.addTask({
name,
payload,
flowId: "",
sequencerId: this.sequencerIdProvider.getSequencerId(),
});
}

private async handleIndexBlockTaskCompleted(
payload: TaskPayload
): Promise<void> {
if (payload.name !== this.indexBlockTask.name) {
return;
}

try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const result = JSON.parse(payload.payload) as IndexBlockResult;

if (
result.status !== "missing-blocks" ||
result.missingHeights.length === 0
) {
return;
}

const blocks = await Promise.all(
result.missingHeights.map((h) =>
this.blockStorage.getBlockWithResultAt(h)
)
);

const filteredBlocks = blocks.filter(filterNonUndefined);

if (filteredBlocks.length === 0) {
log.warn("No blocks found to re-send");
return;
}

const serialized = await this.indexBlockTask
.inputSerializer()
.toJSON(filteredBlocks);

await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
serialized
);
} catch (error) {
Comment thread
rpanic marked this conversation as resolved.
log.error("Failed to handle block task completion result", error);
}
}

public async propagateEventsAsTasks() {
const queue = await this.taskQueue.getQueue(this.indexBlockTask.name);
const inputSerializer = this.indexBlockTask.inputSerializer();
Expand All @@ -47,86 +109,62 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
const settlementInputSerializer =
this.indexSettlementTask.inputSerializer();

await queue.onCompleted(
async (payload) => await this.handleIndexBlockTaskCompleted(payload)
);

this.sequencer.events.on("block-metadata-produced", async (block) => {
log.debug(
"Notifiying the indexer about block",
block.block.height.toBigInt()
);
const payload = await inputSerializer.toJSON(block);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBlockTask.name,
payload,
flowId: "", // empty for now
sequencerId,
};

await queue.addTask(task);
const payload = await inputSerializer.toJSON([block]);
await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
payload
);
});

this.sequencer.events.on("mempool-transaction-added", async (tx) => {
try {
const txQueue = await this.taskQueue.getQueue(
this.indexPendingTxTask.name
);
const payload = await txInputSerializer.toJSON(tx);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexPendingTxTask.name,
payload,
flowId: "",
sequencerId,
};

await txQueue.addTask(task);
await this.pushTask(
this.indexPendingTxTask.name,
this.indexPendingTxTask.name,
payload
);
} catch (err) {
log.error("Failed to add pending-tx task", err);
}
});

this.sequencer.events.on("batch-produced", async (batch) => {
log.debug("Notifiying the indexer about batch", batch?.height);
try {
const batchQueue = await this.taskQueue.getQueue(
this.indexBatchTask.name
);

const payload = await batchInputSerializer.toJSON(batch);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBatchTask.name,
payload,
flowId: "",
sequencerId,
};

await batchQueue.addTask(task);
await this.pushTask(
this.indexBatchTask.name,
this.indexBatchTask.name,
payload
);
} catch (err) {
log.error(`Failed to index batch ${batch?.height} ${err}`);
}
});

this.sequencer.events.on("settlement-submitted", async (settlement) => {
log.debug(
"Notifiying the indexer about settlement",
"Notifying the indexer about settlement",
settlement.transactionHash
);
try {
const settlementQueue = await this.taskQueue.getQueue(
this.indexSettlementTask.name
);

const payload = await settlementInputSerializer.toJSON(settlement);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexSettlementTask.name,
payload,
flowId: "",
sequencerId,
};

await settlementQueue.addTask(task);
await this.pushTask(
this.indexSettlementTask.name,
this.indexSettlementTask.name,
payload
);
} catch (err) {
log.error(
`Failed to add index settlement: ${settlement.transactionHash} ${err}`
Expand Down
81 changes: 65 additions & 16 deletions packages/indexer/src/tasks/IndexBlockTask.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
Block,
BlockQueue,
BlockStorage,
task,
Task,
TaskSerializer,
Expand All @@ -15,18 +16,27 @@ import {
IndexBlockTaskParametersSerializer,
} from "./IndexBlockTaskParameters";

export type IndexBlockResult =
| { status: "ok" }
| {
status: "missing-blocks";
missingHeights: number[];
};

@injectable()
@task()
export class IndexBlockTask
extends TaskWorkerModule
implements Task<IndexBlockTaskParameters, string | void>
implements Task<IndexBlockTaskParameters[], IndexBlockResult>
{
public name = "index-block";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue,
@inject("BlockStorage")
private readonly blockRepository: BlockStorage,
@inject("TransactionStorage")
public transactionStorage: TransactionStorage
) {
Expand Down Expand Up @@ -63,29 +73,68 @@ export class IndexBlockTask
}

public async compute(
input: IndexBlockTaskParameters
): Promise<string | void> {
input: IndexBlockTaskParameters[]
): Promise<IndexBlockResult> {
// We have two scenarios here:
// - In normal indexing, we only receive a single block.
// - If we receive multiple blocks, it means we’re indexing missing blocks
// that were generated using Array.from()
// Therefore, the incoming input array will always be in-order
const firstBlockHeight = Number(input[0].block.height.toBigInt());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here - do we have the gurantee that those are in-order?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two scenarios here:

  • In normal indexing, we only receive a single block.
  • If we receive multiple blocks, it means we’re indexing missing blocks that were generated using Array.from()


try {
await this.syncTransactions(input.block);
await this.blockStorage.pushBlock(input.block);
await this.blockStorage.pushResult(input.result);
const currentHeight = await this.blockRepository.getCurrentBlockHeight();

// We rely on the block storage to enforce some sort of internal consistency
// i.e. it throws an error when we try to insert a block whose parent isn't
// stored yet. Therefore, we can rely on the height indicating that all
// previous blocks are existent - so we only check for that here
if (firstBlockHeight > currentHeight) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, I'd like a few comments here on why this works (internal consistency given by the DB) bcs that fact is not immediately apparent

const missingHeights = Array.from(
{ length: firstBlockHeight - currentHeight + 1 },
(_, i) => currentHeight + i
);
return { status: "missing-blocks", missingHeights };
}

for (const blockWithResult of input) {
const height = Number(blockWithResult.block.height.toBigInt());
// eslint-disable-next-line no-await-in-loop
await this.syncTransactions(blockWithResult.block);
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushBlock(blockWithResult.block);
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushResult(blockWithResult.result);
log.info(`Block ${height} indexed successfully`);
}

return { status: "ok" };
} catch (error) {
log.error("Failed to index block", input.block.height.toBigInt(), error);
return undefined;
log.error("Failed to index block", firstBlockHeight, error);
return { status: "ok" };
}

log.info(`Block ${input.block.height.toBigInt()} indexed successfully`);
return "";
}

public inputSerializer(): TaskSerializer<IndexBlockTaskParameters> {
return this.taskSerializer;
public inputSerializer(): TaskSerializer<IndexBlockTaskParameters[]> {
return {
toJSON: (blocks: IndexBlockTaskParameters[]): string =>
JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))),

fromJSON: (json: string): IndexBlockTaskParameters[] => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const items = JSON.parse(json) as string[];
return items.map((item) => this.taskSerializer.fromJSON(item));
},
};
}

public resultSerializer(): TaskSerializer<string | void> {
public resultSerializer(): TaskSerializer<IndexBlockResult> {
return {
fromJSON: async () => {},
toJSON: async () => "",
toJSON: async (input: IndexBlockResult) => JSON.stringify(input),

fromJSON: async (json: string) =>
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
JSON.parse(json) as IndexBlockResult,
};
}
}
2 changes: 1 addition & 1 deletion packages/indexer/test/IndexBlockTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("IndexBlockTask", () => {
const queue = await taskQueue.getQueue(indexBlockTask.name);
const block = BlockWithResult.createEmpty();

const payload = await indexBlockTask.inputSerializer().toJSON(block);
const payload = await indexBlockTask.inputSerializer().toJSON([block]);

const task: TaskPayload = {
name: indexBlockTask.name,
Expand Down
10 changes: 10 additions & 0 deletions packages/persistance/src/services/prisma/PrismaBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ export class PrismaBlockStorage implements BlockQueue, BlockStorage {
return (await this.getBlockByQuery({ height }))?.block;
}

public async getBlockWithResultAt(
height: number
): Promise<BlockWithResult | undefined> {
const data = await this.getBlockByQuery({ height });
if (data === undefined || data.result === undefined) {
return undefined;
}
return { block: data.block, result: data.result };
}

public async getBlock(hash: string): Promise<Block | undefined> {
return (await this.getBlockByQuery({ hash }))?.block;
}
Expand Down
11 changes: 11 additions & 0 deletions packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ export class InMemoryBlockStorage implements BlockStorage, BlockQueue {
return this.blocks.at(height);
}

public async getBlockWithResultAt(
height: number
): Promise<BlockWithResult | undefined> {
const block = this.blocks.at(height);
const result = this.results.at(height);
if (block === undefined || result === undefined) {
return undefined;
}
return { block, result };
}

public async getCurrentBlockHeight(): Promise<number> {
return this.blocks.length;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/sequencer/src/storage/repositories/BlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ export interface BlockStorage {
pushBlock: (block: Block) => Promise<void>;

getBlockAt: (height: number) => Promise<Block | undefined>;
getBlockWithResultAt: (
height: number
) => Promise<BlockWithResult | undefined>;
getBlock: (hash: string) => Promise<Block | undefined>;
}
Loading