feat: implement background payment sync worker and admin trigger UI
This commit is contained in:
152
lib/sync-worker.ts
Normal file
152
lib/sync-worker.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
|
||||
import { CryptoEngine } from './crypto-engine';
|
||||
import { db } from './db';
|
||||
|
||||
/**
|
||||
* Background Sync Worker
|
||||
* Checks blockchain for pending payments and completes them if detected.
|
||||
*/
|
||||
export async function syncPendingPayments() {
|
||||
console.log("[SyncWorker] Starting manual sync...");
|
||||
|
||||
// 1. Fetch pending transactions that have an intent (or check all crypto transactions)
|
||||
// We filter for transactions with statuses that indicate they are waiting for payment
|
||||
// and have a provider type that suggests they are crypto (or just check metadata.wallets)
|
||||
const result = await db.query(`
|
||||
SELECT * FROM transactions
|
||||
WHERE status IN ('waiting', 'pending')
|
||||
AND (metadata->>'wallets' IS NOT NULL)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 20
|
||||
`);
|
||||
|
||||
const pendingTxs = result.rows;
|
||||
console.log(`[SyncWorker] Found ${pendingTxs.length} pending crypto transactions.`);
|
||||
|
||||
const results = [];
|
||||
|
||||
// 2. Fetch System Settings once
|
||||
const settingsResult = await db.query('SELECT key, value FROM system_settings WHERE key IN (\'sol_platform_address\', \'evm_platform_address\', \'tron_platform_address\', \'btc_platform_address\', \'default_fee_percent\')');
|
||||
const systemMap: Record<string, string> = {};
|
||||
settingsResult.rows.forEach(r => systemMap[r.key] = r.value);
|
||||
|
||||
const sysSettings = {
|
||||
sol: systemMap.sol_platform_address || process.env.SOL_PLATFORM_ADDRESS || "Ajr4nKieZJVu9q2d1eVF9pQPRCLoZ6v4tapB3iQn2SyQ",
|
||||
evm: systemMap.evm_platform_address || process.env.EVM_PLATFORM_ADDRESS || "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
|
||||
tron: systemMap.tron_platform_address || process.env.TRON_PLATFORM_ADDRESS || "TLYpfG6rre8Gv9m8pYjR7yvX7S9rK6G1P",
|
||||
btc: systemMap.btc_platform_address || process.env.BTC_PLATFORM_ADDRESS || "bc1qxy2kgdygjrsqtzq2n0yrf2493p83kkfJH",
|
||||
fee: parseFloat(systemMap.default_fee_percent || '1.0')
|
||||
};
|
||||
|
||||
for (const tx of pendingTxs) {
|
||||
try {
|
||||
const metadata = tx.metadata || {};
|
||||
const intentNetwork = metadata.intent_network || 'POLYGON';
|
||||
const intentToken = metadata.intent_token || 'USDT';
|
||||
|
||||
console.log(`[SyncWorker] Syncing TX ${tx.id} | Intent: ${intentNetwork}/${intentToken}`);
|
||||
|
||||
// Re-use logic from crypto-sweep but optimized for background
|
||||
const wallets = metadata.wallets || {};
|
||||
const tempWalletConfig = wallets[intentNetwork] || wallets['EVM'];
|
||||
|
||||
if (!tempWalletConfig) continue;
|
||||
|
||||
const depositAddress = typeof tempWalletConfig === 'string' ? tempWalletConfig : tempWalletConfig.address;
|
||||
const depositPrivateKey = tempWalletConfig.privateKey;
|
||||
|
||||
if (!depositPrivateKey) continue;
|
||||
|
||||
const cryptoEngine = new CryptoEngine(intentNetwork);
|
||||
|
||||
// Get expected amount logic
|
||||
let expectedCryptoAmount = tx.amount.toString();
|
||||
try {
|
||||
const coinIdMap: Record<string, string> = {
|
||||
'SOL': 'solana', 'USDC': 'usd-coin', 'USDT': 'tether', 'TRX': 'tron', 'BTC': 'bitcoin'
|
||||
};
|
||||
const coinId = coinIdMap[intentToken] || 'solana';
|
||||
const priceUrl = `https://api.coingecko.com/api/v3/simple/price?ids=${coinId}&vs_currencies=usd,try`;
|
||||
const priceRes = await fetch(priceUrl);
|
||||
const priceData = await priceRes.json();
|
||||
const currencyKey = (tx.currency || 'TRY').toLowerCase();
|
||||
const priceInCurrency = priceData[coinId][currencyKey] || priceData[coinId]['usd'];
|
||||
|
||||
if (priceInCurrency) {
|
||||
const rawExpected = parseFloat(tx.amount) / priceInCurrency;
|
||||
expectedCryptoAmount = (rawExpected * 0.98).toFixed(6);
|
||||
}
|
||||
} catch (e) {}
|
||||
|
||||
// Verify
|
||||
const verification = await cryptoEngine.verifyPayment(depositAddress, expectedCryptoAmount, intentToken);
|
||||
|
||||
if (verification.success) {
|
||||
console.log(`[SyncWorker] Payment DETECTED for TX ${tx.id}. Sweeping...`);
|
||||
|
||||
let platformAddress = sysSettings.evm;
|
||||
if (intentNetwork === 'SOLANA') platformAddress = sysSettings.sol;
|
||||
else if (intentNetwork === 'TRON') platformAddress = sysSettings.tron;
|
||||
else if (intentNetwork === 'BITCOIN') platformAddress = sysSettings.btc;
|
||||
|
||||
// Sweep
|
||||
const sweepResult = await cryptoEngine.sweepFunds(depositPrivateKey, platformAddress, intentToken);
|
||||
|
||||
if (sweepResult.success) {
|
||||
// Update Balances & Transaction (Same as crypto-sweep logic)
|
||||
const merchantResult = await db.query('SELECT * FROM merchants WHERE id = $1', [tx.merchant_id]);
|
||||
const merchant = merchantResult.rows[0];
|
||||
const feePercent = merchant?.fee_percent !== undefined && merchant?.fee_percent !== null ? parseFloat(merchant.fee_percent) : sysSettings.fee;
|
||||
|
||||
const grossAmount = parseFloat(tx.amount);
|
||||
const feeAmount = (grossAmount * feePercent) / 100;
|
||||
const merchantNetCredit = grossAmount - feeAmount;
|
||||
|
||||
const cryptoAmount = parseFloat(expectedCryptoAmount);
|
||||
const cryptoFee = (cryptoAmount * feePercent) / 100;
|
||||
const cryptoNetCredit = cryptoAmount - cryptoFee;
|
||||
|
||||
// Execute DB updates in transaction if possible, or sequential
|
||||
await db.query(`UPDATE merchants SET available_balance = available_balance + $1 WHERE id = $2`, [merchantNetCredit, tx.merchant_id]);
|
||||
|
||||
await db.query(`
|
||||
INSERT INTO merchant_balances (merchant_id, network, token, balance, total_gross)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (merchant_id, network, token)
|
||||
DO UPDATE SET
|
||||
balance = merchant_balances.balance + $4,
|
||||
total_gross = merchant_balances.total_gross + $5
|
||||
`, [tx.merchant_id, intentNetwork, intentToken, cryptoNetCredit, cryptoAmount]);
|
||||
|
||||
await db.query(`
|
||||
UPDATE transactions
|
||||
SET status = 'succeeded', paid_network = $2, paid_token = $3, paid_amount_crypto = $4
|
||||
WHERE id = $1`, [tx.id, intentNetwork, intentToken, expectedCryptoAmount]);
|
||||
|
||||
// Webhook
|
||||
if (tx.callback_url) {
|
||||
fetch(tx.callback_url, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
status: 'success',
|
||||
txId: tx.stripe_pi_id,
|
||||
orderRef: tx.source_ref_id,
|
||||
hashes: { txHash: sweepResult.txHash }
|
||||
})
|
||||
}).catch(() => {});
|
||||
}
|
||||
|
||||
results.push({ id: tx.id, status: 'synced', txHash: sweepResult.txHash });
|
||||
}
|
||||
} else {
|
||||
results.push({ id: tx.id, status: 'no_payment' });
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.error(`[SyncWorker] Error processing TX ${tx.id}:`, err.message);
|
||||
results.push({ id: tx.id, status: 'error', error: err.message });
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
Reference in New Issue
Block a user