Add configurable timeout to BigQuery query jobs #1105

Open · wants to merge 4 commits into main
27 changes: 25 additions & 2 deletions parsons/google/google_bigquery.py
@@ -361,6 +361,7 @@ def copy_from_gcs(
compression_type: str = "gzip",
new_file_extension: str = "csv",
template_table: Optional[str] = None,
max_timeout: int = 30,
**load_kwargs,
):
"""
@@ -421,6 +422,8 @@ def copy_from_gcs(
template_table: str
Table name to be used as the load schema. Load operation will use the same
columns and data types as the template table.
max_timeout: int
The maximum number of seconds to wait for a request before the job fails.
**load_kwargs: kwargs
Other arguments to pass to the underlying load_table_from_uri
call on the BigQuery client.
@@ -463,12 +466,14 @@ def copy_from_gcs(
job_config=job_config,
compression_type=compression_type,
new_file_extension=new_file_extension,
max_timeout=max_timeout,
)
else:
return self._load_table_from_uri(
source_uris=gcs_blob_uri,
destination=table_ref,
job_config=job_config,
max_timeout=max_timeout,
**load_kwargs,
)
except exceptions.BadRequest as e:
@@ -493,6 +498,7 @@ def copy_from_gcs(
job_config=job_config,
compression_type=compression_type,
new_file_extension=new_file_extension,
max_timeout=max_timeout,
)
elif "Schema has no field" in str(e):
logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file")
@@ -501,6 +507,10 @@ def copy_from_gcs(
else:
raise e

except exceptions.DeadlineExceeded as e:
logger.error(f"Max timeout exceeded for {gcs_blob_uri.split('/')[-1]}")
raise e

def copy_large_compressed_file_from_gcs(
self,
gcs_blob_uri: str,
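With these changes, a caller can bound how long a GCS load is allowed to run and handle the timeout explicitly. A minimal usage sketch, assuming GoogleBigQuery is importable from the top-level parsons package and that default application credentials are configured (bucket, dataset, and table names below are placeholders); the except clause mirrors the exception the new handler re-raises:

from google.api_core.exceptions import DeadlineExceeded
from parsons import GoogleBigQuery

bq = GoogleBigQuery()
try:
    bq.copy_from_gcs(
        gcs_blob_uri="gs://my-bucket/exports/contacts.csv",
        table_name="my_dataset.contacts",
        if_exists="append",
        max_timeout=120,  # fail the load if it is still running after 120 seconds
    )
except DeadlineExceeded:
    # the load exceeded max_timeout; retry, raise the limit, or alert here
    ...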
@@ -519,6 +529,7 @@ def copy_large_compressed_file_from_gcs(
compression_type: str = "gzip",
new_file_extension: str = "csv",
template_table: Optional[str] = None,
max_timeout: int = 30,
**load_kwargs,
):
"""
@@ -577,6 +588,8 @@ def copy_large_compressed_file_from_gcs(
template_table: str
Table name to be used as the load schema. Load operation will use the same
columns and data types as the template table.
max_timeout: int
The maximum number of seconds to wait for a request before the job fails.
**load_kwargs: kwargs
Other arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
@@ -621,6 +634,7 @@ def copy_large_compressed_file_from_gcs(
source_uris=uncompressed_gcs_uri,
destination=table_ref,
job_config=job_config,
max_timeout=max_timeout,
**load_kwargs,
)
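Because copy_large_compressed_file_from_gcs first decompresses the blob into a temporary GCS object before loading it, it is usually the slowest path, and the 30 second default may be too tight. A sketch of passing a larger budget (URI and table names are placeholders; bq is the GoogleBigQuery connector from the previous sketch):

bq.copy_large_compressed_file_from_gcs(
    gcs_blob_uri="gs://my-bucket/exports/contacts.csv.gz",
    table_name="my_dataset.contacts",
    compression_type="gzip",
    max_timeout=600,  # large uncompressed files can need far more than the 30s default
)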

@@ -647,6 +661,7 @@ def copy_s3(
tmp_gcs_bucket: Optional[str] = None,
template_table: Optional[str] = None,
job_config: Optional[LoadJobConfig] = None,
max_timeout: int = 30,
**load_kwargs,
):
"""
Expand Down Expand Up @@ -692,6 +707,8 @@ def copy_s3(
on the BigQuery client. The function will create its own if not provided. Note
if there are any conflicts between the job_config and other parameters, the
job_config values are preferred.
max_timeout: int
The maximum number of seconds to wait for a request before the job fails.

`Returns`
Parsons Table or ``None``
@@ -724,6 +741,7 @@ def copy_s3(
nullas=nullas,
job_config=job_config,
template_table=template_table,
max_timeout=max_timeout,
**load_kwargs,
)
finally:
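copy_s3 simply stages the S3 object in a temporary GCS bucket and forwards max_timeout on to copy_from_gcs. Assuming its leading arguments name the destination table and the S3 bucket and key (they are not shown in this hunk), a call might look like:

bq.copy_s3(
    table_name="my_dataset.contacts",
    bucket="my-s3-bucket",
    key="exports/contacts.csv",
    tmp_gcs_bucket="my-temp-gcs-bucket",
    max_timeout=120,
)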
@@ -745,6 +763,7 @@ def copy(
allow_jagged_rows: bool = True,
quote: Optional[str] = None,
schema: Optional[List[dict]] = None,
max_timeout: int = 30,
**load_kwargs,
):
"""
@@ -774,6 +793,8 @@ def copy(
template_table: str
Table name to be used as the load schema. Load operation will use the same
columns and data types as the template table.
max_timeout: int
The maximum number of seconds to wait for a request before the job fails.
**load_kwargs: kwargs
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
@@ -823,6 +844,7 @@ def copy(
source_uris=temp_blob_uri,
destination=self.get_table_ref(table_name=table_name),
job_config=job_config,
max_timeout=max_timeout,
**load_kwargs,
)
finally:
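copy() is the Parsons Table entry point, so most callers will set the timeout there. A minimal sketch, assuming copy takes the table object and destination name as its leading arguments as in the other Parsons database connectors:

from parsons import Table

tbl = Table([{"id": 1, "name": "Jane"}, {"id": 2, "name": "Ali"}])
bq.copy(tbl, "my_dataset.people", if_exists="drop", max_timeout=60)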
@@ -1339,7 +1361,8 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str):
if data_type not in ["csv", "json"]:
raise ValueError(f"Only supports csv or json files [data_type = {data_type}]")

def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs):
def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout,
**load_kwargs):
load_job = self.client.load_table_from_uri(
source_uris=source_uris,
destination=destination,
@@ -1348,7 +1371,7 @@ def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwar
)

try:
load_job.result()
load_job.result(timeout=max_timeout)
return load_job
except exceptions.BadRequest as e:
for idx, error_ in enumerate(load_job.errors):
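Under the hood the feature reduces to forwarding the budget to LoadJob.result. A standalone sketch of that pattern against the google-cloud-bigquery client (project, dataset, and bucket names are placeholders); depending on the google-api-core version, an expired budget can surface as DeadlineExceeded or as concurrent.futures.TimeoutError, so the sketch catches both:

import concurrent.futures

from google.api_core import exceptions
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)
load_job = client.load_table_from_uri(
    source_uris="gs://my-bucket/exports/contacts.csv",
    destination="my-project.my_dataset.contacts",
    job_config=job_config,
)
try:
    load_job.result(timeout=30)  # block at most ~30 seconds for the job to finish
except (exceptions.DeadlineExceeded, concurrent.futures.TimeoutError):
    print(f"Load job {load_job.job_id} did not finish within the timeout")
    raise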