Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sequential writes #87

Merged
merged 10 commits into from
Nov 18, 2024
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module.exports = {
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/explicit-module-boundary-types": "off",
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/no-floating-promises": "error",
"@typescript-eslint/no-unused-vars": ["error", { argsIgnorePattern: "_+" }],
},
};
40 changes: 19 additions & 21 deletions apps/agent/test/e2e/utils/prophet-e2e-scaffold/eboCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,29 +414,27 @@ async function addEboRequestCreatorChains(
value: parseEther("1"),
});

await Promise.all(
chainsToAdd.map(async (chainId) => {
console.log(`Adding ${chainId} to EBORequestCreator...`);

const addChainTxHash = await client.sendTransaction({
account: arbitratorAddress,
from: arbitratorAddress,
to: deployedContracts["EBORequestCreator"],
data: encodeFunctionData({
abi: parseAbi(["function addChain(string calldata _chainId)"]),
args: [chainId],
functionName: "addChain",
}),
});
for (const chainId of chainsToAdd) {
console.log(`Adding ${chainId} to EBORequestCreator...`);

await client.waitForTransactionReceipt({
hash: addChainTxHash,
confirmations: 1,
});
const addChainTxHash = await client.sendTransaction({
account: arbitratorAddress,
from: arbitratorAddress,
to: deployedContracts["EBORequestCreator"],
data: encodeFunctionData({
abi: parseAbi(["function addChain(string calldata _chainId)"]),
args: [chainId],
functionName: "addChain",
}),
});

console.log(`${chainId} added.`);
}),
);
await client.waitForTransactionReceipt({
hash: addChainTxHash,
confirmations: 1,
});

console.log(`${chainId} added.`);
}

await client.stopImpersonatingAccount({ address: arbitratorAddress });
}
126 changes: 69 additions & 57 deletions packages/automated-dispute/src/services/eboActor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export class EboActor {
registry: this.registry,
});

this.errorHandler.handle(err);
await this.errorHandler.handle(err);
} else {
throw err;
}
Expand Down Expand Up @@ -870,28 +870,8 @@ export class EboActor {

const isValidDispute = await this.isValidDispute(request, proposedResponse);

if (isValidDispute) {
await Promise.all([
this.pledgeFor(request, dispute),
(async () => {
try {
const { chainId } = request.decodedData.requestModuleData;

await this.proposeResponse(chainId);
} catch (err) {
if (err instanceof ResponseAlreadyProposed) {
this.logger.warn(err.message);
} else {
this.logger.error(
`Could not propose a new response after response ${proposedResponse.id} disputal.`,
);

throw err;
}
}
})(),
]);
} else await this.pledgeAgainst(request, dispute);
if (isValidDispute) await this.pledgeForAndPropose(request, proposedResponse, dispute);
else await this.pledge("against", request, proposedResponse, dispute);
}

/**
Expand All @@ -916,20 +896,61 @@ export class EboActor {
}

/**
* Pledge in favor of the dispute.
* Pledge for or against a dispute.
*
* @param request the dispute's `Request`
* @param dispute the `Dispute`
* @param side for or against
* @param request the request
* @param response the response being disputed
* @param dispute the dispute being pledged
*/
private async pledgeFor(request: Request, dispute: Dispute) {
private async pledge(
side: "for" | "against",
request: Request,
response: Response,
dispute: Dispute,
) {
try {
this.logger.info(`Pledging for dispute ${dispute.id}`);

await this.protocolProvider.pledgeForDispute(request.prophetData, dispute.prophetData);
const { maxNumberOfEscalations } = request.decodedData.disputeModuleData;
const escalation = await this.protocolProvider.getEscalation(request.id);
const sidePledges =
side === "for"
? escalation.amountOfPledgesForDispute
: escalation.amountOfPledgesAgainstDispute;

if (sidePledges < maxNumberOfEscalations) {
this.logger.info(`Pledging ${side} dispute...`);
this.logger.debug(stringify({ request, response, dispute }));

if (side === "for") {
await this.protocolProvider.pledgeForDispute(
request["prophetData"],
dispute["prophetData"],
);
} else {
await this.protocolProvider.pledgeAgainstDispute(
request["prophetData"],
dispute["prophetData"],
);
}
} else {
this.logger.warn(
`Skipping pledge ${side} dispute. Max number of escalations reached`,
);
this.logger.debug(
stringify({
request,
response,
dispute,
escalation,
}),
);
}
} catch (err) {
if (err instanceof ContractFunctionRevertedError) {
const errorName = err.data?.errorName || err.name;
this.logger.warn(`Pledge for dispute ${dispute.id} reverted due to: ${errorName}`);
this.logger.warn(
`Pledge ${side} dispute ${dispute.id} reverted due to: ${errorName}`,
);

const customError = ErrorFactory.createError(errorName);
const context: ErrorContext = {
Expand All @@ -948,40 +969,31 @@ export class EboActor {
}
}
}

/**
* Pledge against the dispute.
* Pledge for a dispute and propose a new response if there are no responses proposed yet.
*
* The response proposal is skipped if the same response has already been proposed before.
*
* @param request the dispute's `Request`
* @param dispute the `Dispute`
* @param request the request
* @param response the response being disputed
* @param dispute the dispute
*/
private async pledgeAgainst(request: Request, dispute: Dispute) {
private async pledgeForAndPropose(request: Request, response: Response, dispute: Dispute) {
await this.pledge("for", request, response, dispute);

try {
this.logger.info(`Pledging against dispute ${dispute.id}`);
const { chainId } = request.decodedData.requestModuleData;

await this.protocolProvider.pledgeAgainstDispute(
request.prophetData,
dispute.prophetData,
);
await this.proposeResponse(chainId);
} catch (err) {
if (err instanceof ContractFunctionRevertedError) {
const errorName = err.data?.errorName || err.name;
this.logger.warn(
`Pledge against dispute ${dispute.id} reverted due to: ${errorName}`,
if (err instanceof ResponseAlreadyProposed) {
this.logger.warn(err.message);
} else {
this.logger.error(
`Could not propose a new response after response ${response.id} disputal.`,
);

const customError = ErrorFactory.createError(errorName);
const context: ErrorContext = {
request,
dispute,
registry: this.registry,
terminateActor: () => {
throw customError;
},
};
customError.setContext(context);

await this.errorHandler.handle(customError);
} else {
throw err;
}
}
Expand Down
18 changes: 7 additions & 11 deletions packages/automated-dispute/src/services/eboProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,15 @@ export class EboProcessor {
`Reading events for the following requests:\n${synchableRequests.join(", ")}`,
);

const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => {
for (const requestId of synchableRequests) {
try {
const events = eventsByRequestId.get(requestId) ?? [];

await this.syncRequest(requestId, events, currentEpoch.number, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
await this.onActorError(requestId, err as Error);
}
});

await Promise.all(synchedRequests);
}

this.logger.info(`Consumed events up to block ${lastBlock.number}.`);

Expand Down Expand Up @@ -290,7 +288,7 @@ export class EboProcessor {
await actor.onLastBlockUpdated(lastBlockTimestamp);

if (actor.canBeTerminated(currentEpoch, lastBlockTimestamp)) {
this.terminateActor(requestId);
await this.terminateActor(requestId);
}
}

Expand Down Expand Up @@ -390,7 +388,7 @@ export class EboProcessor {

await this.notifier.sendError(`Actor error for request ${requestId}`, { requestId }, error);

this.terminateActor(requestId);
await this.terminateActor(requestId);
}

/**
Expand Down Expand Up @@ -422,7 +420,7 @@ export class EboProcessor {

this.logger.info("Creating missing requests...");

const epochChainRequests = unhandledSupportedChains.map(async (chain) => {
for (const chain of unhandledSupportedChains) {
try {
this.logger.info(`Creating request for chain ${chain} and epoch ${epoch}...`);

Expand Down Expand Up @@ -452,9 +450,7 @@ export class EboProcessor {
err,
);
}
});

await Promise.all(epochChainRequests);
}

this.logger.info("Missing requests created.");
} catch (err) {
Expand Down
10 changes: 5 additions & 5 deletions packages/automated-dispute/tests/services/eboActor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ describe("EboActor", () => {
actor.enqueue(malformedEvent);
actor.enqueue(correctEvent);

expect(actor.processEvents()).resolves.not.toThrow();
await expect(actor.processEvents()).resolves.not.toThrow();
});

it("enqueues again an event if its processing throws", async () => {
Expand Down Expand Up @@ -256,16 +256,16 @@ describe("EboActor", () => {
.mockImplementationOnce(onLastEventDelay20)
.mockImplementationOnce(onLastEventDelay1);

setTimeout(() => {
setTimeout(async () => {
actor.enqueue(firstEvent);

actor.processEvents();
await actor.processEvents();
}, 5);

setTimeout(() => {
setTimeout(async () => {
actor.enqueue(secondEvent);

actor.processEvents();
await actor.processEvents();
}, 10);

await vi.advanceTimersByTimeAsync(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ describe("EboActor", () => {

const newBlockNumber = disputeDeadline + 1n;

expect(actor.onLastBlockUpdated(newBlockNumber as UnixTimestamp)).rejects.toThrow(
await expect(actor.onLastBlockUpdated(newBlockNumber as UnixTimestamp)).rejects.toThrow(
DisputeWithoutResponse,
);
});
Expand Down
Loading