-
Notifications
You must be signed in to change notification settings - Fork 14
/
ledgerInfo.js
122 lines (106 loc) · 3.88 KB
/
ledgerInfo.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
const {
PROJECT_ID,
DATASET_NAME,
LEDGER_TABLE_NAME,
} = require('./schema')
const XrplClient = require('xrpl-client').XrplClient
const BigQuery = require('@google-cloud/bigquery')
const bigquery = new BigQuery({ projectId: PROJECT_ID })
const XRPLNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim()
const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)
console.log('Fetch XRPL Ledger Info into Google BigQuery')
const Client = new XrplClient(XRPLNodeUrl)
Client.ready().then(Connection => {
let Stopped = false
let LastLedger = 0
console.log('Connected to the XRPL')
let retryTimeout = 60 * 60 * 12
const fetchLedger = (ledger_index) => {
return new Promise((resolve, reject) => {
return Connection.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: false,
expand: false
}).then(Result => {
resolve(Result)
return
}).catch(reject)
})
}
const run = (ledger_index) => {
return fetchLedger(ledger_index).then(Result => {
console.log(`${Result.ledger_index}`)
// console.log(Result)
bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(Result.ledger.ledger_index),
hash: Result.ledger.hash,
CloseTime: bigquery.timestamp(new Date(Date.parse(Result.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
CloseTimeTimestamp: Result.ledger.close_time,
CloseTimeHuman: Result.ledger.close_time_human,
TotalCoins: parseInt(Result.ledger.totalCoins),
ParentHash: Result.ledger.parent_hash,
AccountHash: Result.ledger.account_hash,
TransactionHash: Result.ledger.transaction_hash,
_InsertedAt: bigquery.timestamp(new Date()),
}])
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
// process.exit(0)
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
process.exit(1)
}
} else {
console.error('ERROR:', err)
process.exit(1)
}
})
// retryTimeout = 0
if (Stopped) {
return
}
return run(ledger_index + 1)
}).catch(e => {
console.log(e)
process.exit(1)
// retryTimeout += 500
// if (retryTimeout > 5000) retryTimeout = 5000
console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
setTimeout(() => {
return run(ledger_index)
}, retryTimeout * 1000)
})
}
console.log(`Starting at ledger [ ${StartLedger} ], \n Checking last ledger in BigQuery...`)
bigquery.query({
query: `SELECT
MAX(LedgerIndex) as MaxLedger
FROM
${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
useLegacySql: false, // Use standard SQL syntax for queries.
}).then(r => {
if (r[0][0].MaxLedger > StartLedger) {
console.log(`BigQuery History at ledger [ ${r[0][0].MaxLedger} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${r[0][0].MaxLedger+1}\n\n`)
run(r[0][0].MaxLedger + 1)
} else{
run(StartLedger)
}
}).catch(e => {
console.log('Google BigQuery Error', e)
process.exit(1)
})
process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);
Stopped = true
Connection.close()
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
})
})