-
Notifications
You must be signed in to change notification settings - Fork 8
/
bulkIngest.py
411 lines (364 loc) · 18.5 KB
/
bulkIngest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
#!/usr/bin/env python3
import sys
import argparse
import os
import sqlite3
import datetime
import hashlib
import threading
import requests
import boto3
import dropbox # type: ignore
import boxsdk as box # type: ignore
from typing import Callable, Tuple, Union, Optional, Dict, Any
from pathlib import Path
from brightcove.CMS import CMS
from brightcove.OAuth import OAuth
from brightcove.DynamicIngest import DynamicIngest
from brightcove.utils import eprint
from brightcove.utils import load_account_info
# HTTP status codes treated as success for CMS / Dynamic Ingest API calls
success_responses = [200,201,202,203,204]
# module-level API clients; assigned in main() once OAuth credentials are loaded
cms:CMS
di:DynamicIngest
class IngestHistory:
    """SQLite-backed record of past ingests, used for delta ingesting.

    Each row stores a salted SHA-256 hash of (account id + remote path) so
    a file that was already submitted can be detected and skipped.
    """

    def __init__(self, db_name):
        """Connect to (or create) the database file and ensure the table exists.

        Raises sqlite3.Error if the database cannot be opened.
        """
        self.db_name = db_name
        # keep the attribute defined even if the connect below raises,
        # so CloseConnection() is always safe to call
        self.__db_conn = None
        self.__db_conn = self.CreateConnection(db_name)
        self.CreateTable()

    # create a hash from a salt and a value
    @staticmethod
    def CreateHash(salt, value):
        # NOTE: this exact byte construction must stay stable -- hashes are
        # persisted in the database and compared across runs
        raw_bytes = b"%r"%(str(salt)+str(value))
        return hashlib.sha256(raw_bytes).hexdigest()

    # create a database connection to a SQLite database file
    @staticmethod
    def CreateConnection(db_file):
        # sqlite3.connect raises sqlite3.Error on failure; let it propagate
        return sqlite3.connect(db_file)

    # close the database connection
    def CloseConnection(self):
        if self.__db_conn:
            self.__db_conn.close()

    # commit pending database updates
    def Commit(self):
        if self.__db_conn:
            self.__db_conn.commit()

    # commit changes and close the database
    def CommitAndCloseConnection(self):
        if self.__db_conn:
            self.Commit()
            self.CloseConnection()

    # create the history table if it does not exist yet
    def CreateTable(self):
        """Return True if the ingest_history table exists or was created."""
        sql_create_table = """ CREATE TABLE IF NOT EXISTS ingest_history (
                                    id integer PRIMARY KEY,
                                    ingest_hash text NOT NULL,
                                    ingest_date text,
                                    account_id text,
                                    video_id text,
                                    request_id text,
                                    remote_path text
                                ); """
        try:
            self.__db_conn.cursor().execute(sql_create_table)
            return True
        except sqlite3.Error:
            return False

    # delete all data in the history table
    def ResetTable(self):
        """Delete every history row; return True on success."""
        try:
            self.__db_conn.cursor().execute('DELETE FROM ingest_history')
            return True
        except sqlite3.Error:
            return False

    # add an ingest history entry to the database
    def AddIngestHistory(self, account_id, video_id, request_id, remote_url, hash_value=None):
        """Insert one history row and commit; return its rowid, or 0 on failure.

        If hash_value is not given it is derived from (account_id, remote_url).
        """
        hash_value = hash_value or self.CreateHash(account_id, remote_url)
        history = (hash_value, str(datetime.datetime.now()), account_id, video_id, request_id, remote_url)
        sql = 'INSERT INTO ingest_history(ingest_hash,ingest_date,account_id,video_id,request_id,remote_path) VALUES(?,?,?,?,?,?)'
        try:
            cur = self.__db_conn.cursor()
            cur.execute(sql, history)
            self.Commit()
            return cur.lastrowid
        except sqlite3.Error:
            return 0

    # find a hash in the database
    def FindHashInIngestHistory(self, hash_value):
        """Return the first history row matching hash_value, or None."""
        cur = self.__db_conn.cursor()
        cur.execute('SELECT * FROM ingest_history WHERE ingest_hash=?', (hash_value,))
        rows = cur.fetchall()
        return rows[0] if rows else None

    # list the entire ingest history
    def ListIngestHistory(self):
        """Return all history rows, prefixed with a header row of column names."""
        cur = self.__db_conn.cursor()
        cur.execute('SELECT * FROM ingest_history')
        header = ['id','ingest_hash','ingest_date','account_id','video_id','request_id','remote_path']
        return [header] + [list(row) for row in cur.fetchall()]
class ProgressPercentage(object):
    """Callable progress reporter for file uploads.

    An instance is passed as the upload callback; each call receives the
    number of bytes transferred since the previous call and rewrites a
    single progress line on stdout.
    """
    def __init__(self, filename):
        self._filename = filename
        self._size = int(os.path.getsize(filename))
        self._seen_so_far = 0
        # uploads may invoke the callback from multiple threads
        self._lock = threading.Lock()
    def __call__(self, bytes_amount):
        # To simplify, assume this is hooked up to a single filename
        with self._lock:
            self._seen_so_far += bytes_amount
            # guard against ZeroDivisionError for zero-byte files
            if self._size:
                percentage = (self._seen_so_far / self._size) * 100
            else:
                percentage = 100.0
            sys.stdout.write("\rProgress: %s / %s (%.2f%%)\r" % (self._seen_so_far, self._size, percentage))
            sys.stdout.flush()
#
def ingest_video(account_id, video_id, source_url, priority_queue, callbacks):
    """Submit a Dynamic Ingest request for an existing video object.

    Returns the ingest request ID on success, None on any failure
    (including a 429 queue-full response, which is not retried).
    """
    response = di.SubmitIngest(account_id=account_id, video_id=video_id, source_url=source_url, priority_queue=priority_queue, callbacks=callbacks)
    if response.status_code in success_responses:
        data = response.json()
        print(f'Ingest Call ({priority_queue}) result for video ID {video_id}: {data}')
        return data.get('id')
    if response.status_code == 429:
        # queue is maxed out -- report it instead of failing silently
        # TODO: implement retry/backoff for 429 responses
        eprint(f'Ingest Call ({priority_queue}) throttled (429) for video ID {video_id}; not retried.')
    else:
        eprint(f'Ingest Call ({priority_queue}) failed for video ID {video_id}: {response.status_code}')
        eprint(response.text)
    return None
def create_and_ingest(account_id, filename, source_url, priority, callbacks):
    """Create a new video object titled filename and ingest source_url into it.

    Always returns a (video_id, request_id) 2-tuple; either element may be
    None on failure. (The original fell off the end and returned a bare
    None when the video was created but the ingest call failed, which broke
    tuple unpacking at every call site.)
    """
    video = cms.CreateVideo(account_id=account_id, video_title=filename)
    if video.status_code not in success_responses:
        eprint(f'Create Video failed: {video.status_code}')
        return None, None
    video_id = video.json().get('id')
    request_id = ingest_video(account_id=account_id, video_id=video_id, source_url=source_url, priority_queue=priority, callbacks=callbacks)
    return video_id, request_id
def is_video(filename):
    """Return True if filename ends in a recognized video extension (case-insensitive)."""
    video_extensions = (
        '.m4p', '.m4v', '.avi', '.wmv', '.mov', '.mkv', '.webm', '.mpg',
        '.mp2', '.mpeg', '.mpe', '.mpv', '.mp4', '.qt', '.flv',
    )
    lowered = filename.lower()
    return any(lowered.endswith(ext) for ext in video_extensions)
#===========================================
# main program starts here
#===========================================
def main(db_history:IngestHistory):
    """Parse command-line arguments and bulk-ingest videos into Brightcove.

    Sources: an S3 bucket, a Dropbox folder, a Box folder, a local folder,
    or a single local file. Successful ingests are recorded in db_history
    so subsequent runs skip already-ingested files, unless --dbignore is
    given.
    """
    # disable certificate warnings
    requests.urllib3.disable_warnings() # type: ignore
    # init the argument parsing
    parser = argparse.ArgumentParser(prog=sys.argv[0])
    # help text matches the values actually accepted below: low, normal, high
    parser.add_argument('--priority', metavar='<ingest priority>', type=str, default='normal', help='Set ingest queue priority (low, normal, high)')
    parser.add_argument('--s3bucket', metavar='<S3 bucket name>', type=str, help='Name of the S3 bucket to ingest from')
    parser.add_argument('--s3profile', metavar='<AWS profile name>', type=str, help='Name of the AWS profile to use if other than default', default='default')
    parser.add_argument('--dbxfolder', metavar='<Dropbox folder>', type=str, help='Name of the Dropbox folder to ingest from')
    parser.add_argument('--dbxtoken', metavar='<Dropbox API token>', type=str, help='Token for Dropbox API access')
    parser.add_argument('--boxfolder', metavar='<Box folder>', type=str, help='Name of the Box folder to ingest from')
    parser.add_argument('--boxtokens', metavar='<Box API token>', type=str, help='Tokens for Box API access')
    parser.add_argument('--folder', metavar='<path to folder>', type=str, help='Name and path of local folder to ingest from (use / or \\\\)')
    parser.add_argument('--file', metavar='<path to file>', type=str, help='Name and path of local file to ingest from (use / or \\\\)')
    parser.add_argument('--callback', metavar='<callbackURL>', type=str, help='URL for ingest callbacks')
    parser.add_argument('--account', metavar='<account ID>', type=str, help='Brightcove Account ID to ingest videos into')
    parser.add_argument('--profile', metavar='<ingest profile name>', type=str, help='Brightcove ingest profile name to use to ingest videos')
    parser.add_argument('--config', metavar='<path to config file>', type=str, help='Name and path of account config information file')
    parser.add_argument('--dbreset', action='store_true', help='Resets and clears the ingest history database')
    parser.add_argument('--dbignore', action='store_true', help='Ignores the ingest history database (no delta ingest and no record keeping)')
    parser.add_argument('--history', action='store_true', help='Displays the ingest history')
    # parse the args
    args = parser.parse_args()
    if args.dbreset:
        print('Resetting ingest history database.')
        db_history.ResetTable()
    if args.history:
        for row in db_history.ListIngestHistory():
            print(*row, sep=', ')
        return
    # get S3 info if available
    s3_bucket_name = args.s3bucket
    s3_profile_name = args.s3profile
    # get Dropbox info if available
    dbx_folder = args.dbxfolder
    dbx_token = args.dbxtoken
    # get Box info if available
    box_folder = args.boxfolder
    box_tokens = args.boxtokens
    # get local folder if available
    local_folder = args.folder
    callback = [args.callback] if args.callback else []
    # error out if no ingest source was specified at all
    if not any([s3_bucket_name, dbx_folder, box_folder, local_folder, args.file]):
        eprint('Error: no S3 bucket, Dropbox folder, local folder, file or tokens specified.\n')
        return
    # get ingest priority
    if args.priority in ['low', 'normal', 'high']:
        ingest_priority = args.priority
    else:
        eprint('Error: invalid ingest queue priority specified.\n')
        return
    try:
        account_id, client_id, client_secret, _ = load_account_info(args.config)
    except Exception as e:
        print(e)
        return
    # command-line account overrides the one from the config file
    account_id = args.account or account_id
    ingest_profile = args.profile
    global cms
    global di
    # create the OAuth token from the account config info file
    oauth = OAuth(account_id=account_id,client_id=client_id, client_secret=client_secret)
    cms = CMS(oauth)
    di = DynamicIngest(oauth=oauth, ingest_profile=ingest_profile, priority_queue=ingest_priority)
    # this needs moving outside, but for now I'm whatever about it
    def ingest_single_file(file_path:str):
        """Create a video object and upload/ingest one local file, unless
        its (account, path) hash is already in the ingest history."""
        file_name = os.path.basename(file_path)
        video = cms.CreateVideo(account_id=account_id, video_title=file_name)
        if video.status_code in success_responses:
            video_id = video.json().get('id')
            hash_value = db_history.CreateHash(account_id, file_path)
            ingest_record = db_history.FindHashInIngestHistory(hash_value)
            if ingest_record is None or args.dbignore:
                print(f'Uploading file "{file_path}" to temporary S3 bucket.')
                upload_url = di.UploadFile(account_id=account_id, video_id=video_id, file_name=file_path,callback=ProgressPercentage(file_path))
                if upload_url:
                    request_id = ingest_video(account_id=account_id, video_id=video_id, source_url=upload_url['api_request_url'], priority_queue=ingest_priority, callbacks=callback)
                    if request_id and not args.dbignore:
                        db_history.AddIngestHistory(account_id=account_id, video_id=video_id, request_id=request_id, remote_url=file_path)
                else:
                    eprint(f'Error: failed to upload "{file_path}" to temporary S3 bucket.')
            else:
                print(f'Already ingested on {ingest_record[2]}')
    #===========================================
    # do a single file ingest
    #===========================================
    if args.file:
        ingest_single_file(args.file)
    #===========================================
    # do the S3 bulk ingest
    #===========================================
    if s3_bucket_name:
        # verify that AWS credentials exist for the requested profile
        try:
            boto3.Session(profile_name=s3_profile_name) # type: ignore
        except Exception:
            print(f'Error: no AWS credentials found for profile "{s3_profile_name}"')
        else:
            try:
                s3 = boto3.resource('s3')
                bucket = s3.Bucket(s3_bucket_name).objects.all()
                filenames = [obj.key for obj in bucket if is_video(obj.key)]
            except Exception as e:
                # quote placement fixed: the profile name is quoted, not the exception
                eprint(f'Error accessing bucket "{s3_bucket_name}" for profile "{s3_profile_name}": {e}\n')
            else:
                for filename in filenames:
                    s3_url = f's3://{s3_bucket_name}.s3.amazonaws.com/'+((filename).replace(' ', '%20'))
                    hash_value = db_history.CreateHash(account_id, s3_url)
                    ingest_record = db_history.FindHashInIngestHistory(hash_value)
                    if ingest_record is None or args.dbignore:
                        print(f'Ingesting: "{s3_url}"')
                        video_id, request_id = create_and_ingest(account_id, filename, s3_url, ingest_priority, callbacks=callback)
                        if request_id and not args.dbignore:
                            db_history.AddIngestHistory(account_id=account_id, video_id=video_id, request_id=request_id, remote_url=s3_url)
                    else:
                        print(f'Already ingested on {ingest_record[2]}: "{s3_url}"')
    #===========================================
    # do the Dropbox bulk ingest
    #===========================================
    if dbx_folder:
        try:
            dbx = dropbox.Dropbox(dbx_token)
        except Exception:
            eprint('Error: invalid Dropbox API token.')
        else:
            try:
                dbx_folder = f'/{dbx_folder}'
                filenames = [entry.name for entry in dbx.files_list_folder(path=dbx_folder, include_non_downloadable_files=False).entries if is_video(entry.name)]
            except Exception:
                eprint(f'Error: folder "{dbx_folder}" not found in Dropbox.\n')
            else:
                for filename in filenames:
                    dbx_path = f'{dbx_folder}/{filename}'
                    # create a shared link and force it to be a direct download (?dl=1)
                    source_url = str(dbx.sharing_create_shared_link(path=dbx_path).url).replace('?dl=0','?dl=1')
                    hash_value = db_history.CreateHash(account_id, source_url)
                    ingest_record = db_history.FindHashInIngestHistory(hash_value)
                    if ingest_record is None or args.dbignore:
                        print(f'Ingesting: "{dbx_path}"')
                        video_id, request_id = create_and_ingest(account_id, filename, source_url, ingest_priority, callbacks=callback)
                        if request_id and not args.dbignore:
                            db_history.AddIngestHistory(account_id=account_id, video_id=video_id, request_id=request_id, remote_url=source_url)
                    else:
                        print(f'Already ingested on {ingest_record[2]}: "{dbx_path}"')
    #===========================================
    # do the Box bulk ingest
    #===========================================
    if box_folder:
        def box_get_files_in_folder(client:box.Client, folder_id:str='0') -> list:
            """
            Returns a list of [name, id] pairs for video files in a Box folder.
            """
            items = client.folder(folder_id=folder_id).get_items()
            return [ [item.name, item.id] for item in items if item.type == 'file' and is_video(item.name) ]
        try:
            # expects three comma-separated values: client id, client secret, developer token
            box_client_id, box_client_secret, box_dev_token = str(box_tokens).split(sep=',', maxsplit=3)
            box_oauth = box.OAuth2(client_id=box_client_id, client_secret=box_client_secret, access_token=box_dev_token)
            box_client = box.Client(box_oauth)
        except ValueError:
            eprint('Error: unable to parse Box tokens.\n')
        except Exception:
            eprint('Error: unable to use provided credentials.\n')
        else:
            filenames = []
            if box_folder == '.':
                # '.' means the account root folder
                filenames = box_get_files_in_folder(box_client)
            else:
                items = box_client.folder(folder_id='0').get_items()
                for item in items:
                    if item.type == 'folder' and item.name == box_folder:
                        filenames = box_get_files_in_folder(box_client, item.id)
                        break
                else:
                    eprint(f'Error: folder "{box_folder}" not found in Box account root.')
            for filename, box_id in filenames:
                box_path = f'{box_folder}/{filename}'
                source_url = box_client.file(box_id).get_download_url()
                # hash the stable folder/file path (download URLs are transient),
                # and store that same hash with the history record below
                hash_value = db_history.CreateHash(account_id, box_path)
                ingest_record = db_history.FindHashInIngestHistory(hash_value)
                if ingest_record is None or args.dbignore:
                    print(f'Ingesting: "{box_path}"')
                    video_id, request_id = create_and_ingest(account_id, filename, source_url, ingest_priority, callbacks=callback)
                    if request_id and not args.dbignore:
                        db_history.AddIngestHistory(account_id=account_id, video_id=video_id, request_id=request_id, remote_url=source_url, hash_value=hash_value)
                else:
                    print(f'Already ingested on {ingest_record[2]}: "{box_path}"')
    #===========================================
    # do the local bulk ingest
    #===========================================
    if local_folder:
        def get_list_of_files_in_dir(directory: str, file_types: str ='*') -> list:
            """Return full paths of all files directly inside directory."""
            return [str(f) for f in Path(directory).glob(file_types) if f.is_file()]
        try:
            file_list = get_list_of_files_in_dir(local_folder)
        except OSError:
            eprint(f'Error: unable to access folder "{local_folder}"')
        else:
            for file_path in [file for file in file_list if is_video(file)]:
                ingest_single_file(file_path)
#===========================================
# only run code if it's not imported
#===========================================
if __name__ == '__main__':
    # history database lives in the user's home directory
    try:
        db_history = IngestHistory(os.path.expanduser('~')+'/bulkingest.sqlite')
    except sqlite3.Error:
        # IngestHistory.__init__ only raises sqlite3.Error; anything else
        # (e.g. KeyboardInterrupt) should propagate, not be swallowed
        eprint('Error: can not connect to ingest history database.')
    else:
        main(db_history)
        db_history.CommitAndCloseConnection()