-
Notifications
You must be signed in to change notification settings - Fork 0
/
address.py
153 lines (148 loc) · 5.33 KB
/
address.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import pymysql as mdb
con = mdb.connect('localhost','root','','bitcoin')
cur = con.cursor()
class Address(str):
def outgoingTxs(self):
command = 'SELECT head.transactionHash FROM outputs tail, outputs head '
command += 'WHERE tail.address = "%s" ' % self
command += 'AND tail.spendHash = head.transactionHash'
cur.execute(command)
return [Transaction(tx[0]) for tx in cur.fetchall()]
def incomingTxs(self):
command = 'SELECT * FROM outputs '
command += 'WHERE address = "%s"' % self
cur.execute(command)
inputs = [Coins(entry) for entry in cur.fetchall()]
return inputs
def getOutputs(self, height):
command = 'SELECT b.* '
command += 'FROM outputs a, outputs b '
command += 'WHERE a.address = "%s" ' % self
command += 'AND a.spendHash = b.transactionHash '
command += 'AND b.blockNumber = %i' % block_number
cur.execute(command)
return [Coins(entry) for entry in cur.fetchall()]
def nextTxs(self,block_number):
command = 'SELECT MIN(b.blockNumber) FROM outputs a, outputs b '
command += 'WHERE a.address = "%s" ' % self
command += 'AND a.spendHash = b.transactionHash '
command += 'AND b.blockNumber >= %i' % block_number
cur.execute(command)
try:
next_block = int(cur.fetchone()[0])
except TypeError:
next_block = 1e9 # this is not a long term solution.
command = 'SELECT DISTINCT b.transactionHash, b.blockNumber FROM outputs a, outputs b '
command += 'WHERE a.address = "%s" ' % self
command += 'AND a.spendHash = b.transactionHash '
command += 'AND b.blockNumber = %i' % next_block
cur.execute(command)
return [(Transaction(row[0]),row[1]) for row in cur.fetchall()]
def value(self,block_number):
command = 'SELECT SUM(value) '
command += 'FROM outputs WHERE address = "%s" ' % self
command += 'AND blockNumber <= %i' % block_number
cur.execute(command)
try:
total_in = int(cur.fetchone()[0])
except TypeError:
total_in = 0
command = 'SELECT SUM(b.value) '
command += 'FROM outputs a, outputs b '
command += 'WHERE a.address = "%s" ' % self
command += 'AND a.spendHash = b.transactionHash '
command += 'AND b.blockNumber <= %i' %block_number
cur.execute(command)
try:
total_out = int(cur.fetchone()[0])
except TypeError:
total_out = 0
return total_in - total_out
def getTxs(self,block_number = -1):
command = 'SELECT transactionHash FROM outputs '
command += 'WHERE address = "%s"' % self
cur.execute()
return [Transaction(tx_string) for tx_string in cur.fetchall()]
class Transaction(str):
def inputAddresses(self):
command = 'SELECT address, value FROM outputs WHERE spendHash = "%s"'
command = command % self
cur.execute(command)
return [(Address(row[0]), row[1]) for row in cur.fetchall()]
def inputValue(self):
command = 'SELECT SUM(value) FROM outputs WHERE spendHash = "%s"'
command = command % self
cur.execute(command)
try:
return cur.fetchone()[0]
except TypeError:
return 0
def outputAddresses(self):
command = 'SELECT address, value FROM outputs WHERE transactionHash = "%s"'
command = command % self
cur.execute(command)
return [(Address(row[0]), row[1]) for row in cur.fetchall()]
def outputValue(self):
command = 'SELECT SUM(value) FROM outputs WHERE transactionHash = "%s"'
command = command % self
cur.execute(command)
try:
return int(cur.fetchone()[0])
except TypeError:
return 0
def outputs(self):
command = 'SELECT * FROM outputs WHERE transactionHash = "%s"'
command = command % self
cur.execute(command)
return [Coins(entry) for entry in cur.fetchall()]
def height(self):
command = 'SELECT blockNumber FROM outputs '
command += 'WHERE transactionHash = "%s" LIMIT 1' % self
cur.execute(command)
try:
return int(cur.fetchone()[0])
except TypeError:
return None
class Coins:
def __init__(self,row):
self.transaction = Transaction(row[0])
self.output_index = row[1]
self.value = row[2]
self.height = int(row[3])
self.spend_transaction = Transaction(row[4])
self.address = Address(row[5])
def __hash__(self):
return hash((self.output_index,self.transaction))
def __eq__(self,other):
return hash(self) == hash(other)
def nextOutputs(self):
command = 'SELECT * FROM outputs '
command += 'WHERE transactionHash = "%s" ' % self.spend_transaction
cur.execute(command)
return [Coins(row) for row in cur.fetchall()]
def previousInputs(self):
command = 'SELECT * FROM outputs '
command += 'WHERE spendHash = "%s" ' % self.transaction
cur.execute(command)
return [Coins(row) for row in cur.fetchall()]
def __str__(self):
string = 'height: %s ' % self.height
string += 'contamination: %e ' % self.contamination
string += 'taint: %.2f' % (self.contamination / self.value)
return string
def taint(self):
return self.contamination / self.value
if __name__ == '__main__':
pizza_address = Address('17SkEw2md5avVNyYgj6RiXuQKNwkXaxFyQ')
assert pizza_address.value(57042) == 0
assert pizza_address.value(57043) == int(1e12)
assert pizza_address.value(57044) == 0
assert len(pizza_address.getInputs()) == 1
assert len(pizza_address.getOutputs()) == 2
assert len(pizza_address.nextTxs(57041)) == 1
assert len(pizza_address.nextTxs(57042)) == 1
assert len(pizza_address.nextTxs(57043)) == 1
assert len(pizza_address.nextTxs(57044)) == 1
assert len(pizza_address.nextTxs(57045)) == 0
inout_address = Address('1PrwYMffhu1XJyVXs6Ba67NidGZVjbGHCq')
assert len(inout_address.nextTxs(64683)) == 1