
Commit 6af6cc4 (parent: 8839d15)

feat!: Add Spark BigQuery connector options (#1027)

4 files changed: +190 additions, -23 deletions

python/dataproc_templates/elasticsearch/README.md

Lines changed: 72 additions & 5 deletions
@@ -363,7 +363,8 @@ This template has been tested with the following versions of the above mentioned
 - `es.bq.input.api.key`: API Key for Elasticsearch Authorization
 - `es.bq.output.dataset`: BigQuery dataset id (format: Dataset_id)
 - `es.bq.output.table`: BigQuery table name (format: Table_name)
-- `es.bq.temp.bucket.name`: Temporary bucket for the Spark BigQuery connector
+- `es.bq.output.temporarygcsbucket`: The GCS bucket that temporarily holds the data before it is loaded to BigQuery
+- `es.bq.output.persistentgcsbucket`: The GCS bucket that holds the data before it is loaded to BigQuery. If set, the data is not deleted after it is written to BigQuery.

 #### Optional Arguments

@@ -414,9 +415,26 @@ This template has been tested with the following versions of the above mentioned
 - `es.bq.flatten.struct.fields`: Flatten the struct fields
 - `es.bq.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields; requires the es.bq.flatten.struct.fields option to be passed
 - `es.bq.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)
+- `es.bq.output.bigquerytablelabel`: Adds labels to the table while writing to it. Multiple labels can be set.
+- `es.bq.output.createdisposition`: Specifies whether the job is allowed to create new tables.
+- `es.bq.output.persistentgcspath`: The GCS path that holds the data before it is loaded to BigQuery. Used only with es.bq.output.persistentgcsbucket.
+- `es.bq.output.datepartition`: The date partition the data is going to be written to.
+- `es.bq.output.partitionfield`: If this field is specified, the table is partitioned by this field.
+- `es.bq.output.partitionexpirationms`: Number of milliseconds for which to keep the storage for partitions in the table.
+- `es.bq.output.partitiontype`: Used to specify time partitioning. Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be time partitioned. Defaults to DAY if es.bq.output.partitionfield is specified.
+- `es.bq.output.partitionrangestart`: Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangeend and es.bq.output.partitionrangeinterval along with this option.
+- `es.bq.output.partitionrangeend`: Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeinterval along with this option.
+- `es.bq.output.partitionrangeinterval`: Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeend along with this option.
+- `es.bq.output.clusteredfields`: A comma-separated string of non-repeated, top-level columns.
+- `es.bq.output.allowfieldaddition`: Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. Defaults to false.
+- `es.bq.output.allowfieldrelaxation`: Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
+- `es.bq.output.bignumericdefaultprecision`: An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
+- `es.bq.output.bignumericdefaultscale`: An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than bigNumericFieldsPrecision. This default is used only when the field has an unparameterized BigNumeric type.

 **Note:** Make sure that either ```es.bq.input.api.key``` or both ```es.bq.input.user``` and ```es.bq.input.password``` is provided. Setting all three properties at once, or none of them, will throw an error.

+Pass either ```es.bq.output.temporarygcsbucket``` or ```es.bq.output.persistentgcsbucket```.
+
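The mutual-exclusion rules above can be sketched as a small validation helper. This is a hypothetical `validate_auth`, not part of the template code; it only illustrates the "exactly one of API key or user/password pair" check the note describes:

```python
def validate_auth(api_key, user, password):
    """Require either an API key or a full user/password pair, never both or neither."""
    has_key = api_key is not None
    has_basic = user is not None and password is not None
    # Equal truth values mean either both auth styles were given, or neither.
    if has_key == has_basic:
        raise ValueError(
            "Provide either es.bq.input.api.key or both "
            "es.bq.input.user and es.bq.input.password"
        )
```

The same pattern would apply to the temporary-versus-persistent GCS bucket choice: exactly one of the two must be set.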
 ## Usage

 ```
@@ -429,7 +447,8 @@ usage: main.py [-h]
                --es.bq.input.api.key ES.BQ.INPUT.API.KEY
                --es.bq.output.dataset ES.BQ.OUTPUT.DATASET
                --es.bq.output.table ES.BQ.OUTPUT.TABLE
-               --es.bq.temp.bucket.name ES.BQ.TEMP.BUCKET.NAME
+               --es.bq.output.temporarygcsbucket ES.BQ.OUTPUT.TEMPORARYGCSBUCKET
+               --es.bq.output.persistentgcsbucket ES.BQ.OUTPUT.PERSISTENTGCSBUCKET
                --es.bq.output.mode {overwrite,append,ignore,errorifexists}
                [--es.bq.input.es.nodes.path.prefix ES.BQ.INPUT.ES.NODES.PATH.PREFIX]
                [--es.bq.input.es.query ES.BQ.INPUT.ES.QUERY]
@@ -477,6 +496,21 @@ usage: main.py [-h]
                [--es.bq.input.es.net.proxy.socks.use.system.props ES.BQ.INPUT.ES.NET.PROXY.SOCKS.USE.SYSTEM.PROPS]
                [--es.bq.flatten.struct.fields]
                [--es.bq.flatten.array.fields]
+               [--es.bq.output.bigquerytablelabel ES.BQ.OUTPUT.BIGQUERYTABLELABEL]
+               [--es.bq.output.createdisposition ES.BQ.OUTPUT.CREATEDISPOSITION]
+               [--es.bq.output.persistentgcspath ES.BQ.OUTPUT.PERSISTENTGCSPATH]
+               [--es.bq.output.datepartition ES.BQ.OUTPUT.DATEPARTITION]
+               [--es.bq.output.partitionfield ES.BQ.OUTPUT.PARTITIONFIELD]
+               [--es.bq.output.partitionexpirationms ES.BQ.OUTPUT.PARTITIONEXPIRATIONMS]
+               [--es.bq.output.partitiontype ES.BQ.OUTPUT.PARTITIONTYPE]
+               [--es.bq.output.partitionrangestart ES.BQ.OUTPUT.PARTITIONRANGESTART]
+               [--es.bq.output.partitionrangeend ES.BQ.OUTPUT.PARTITIONRANGEEND]
+               [--es.bq.output.partitionrangeinterval ES.BQ.OUTPUT.PARTITIONRANGEINTERVAL]
+               [--es.bq.output.clusteredfields ES.BQ.OUTPUT.CLUSTEREDFIELDS]
+               [--es.bq.output.allowfieldaddition ES.BQ.OUTPUT.ALLOWFIELDADDITION]
+               [--es.bq.output.allowfieldrelaxation ES.BQ.OUTPUT.ALLOWFIELDRELAXATION]
+               [--es.bq.output.bignumericdefaultprecision ES.BQ.OUTPUT.BIGNUMERICDEFAULTPRECISION]
+               [--es.bq.output.bignumericdefaultscale ES.BQ.OUTPUT.BIGNUMERICDEFAULTSCALE]

 options:
   -h, --help            show this help message and exit
@@ -586,12 +620,45 @@ options:
                        BigQuery Output Dataset Name
   --es.bq.output.table ES.BQ.OUTPUT.TABLE
                        BigQuery Output Table Name
-  --es.bq.temp.bucket.name ES.BIGQUERY.TEMP.BUCKET.NAME
-                       Spark BigQuery connector temporary bucket
   --es.bq.output.mode {overwrite,append,ignore,errorifexists}
                        BigQuery Output write mode (one of:
                        append,overwrite,ignore,errorifexists) (Defaults to
                        append)
+  --es.bq.output.temporarygcsbucket ES.BQ.OUTPUT.TEMPORARYGCSBUCKET
+                       The GCS bucket that temporarily holds the data before it is loaded to BigQuery
+  --es.bq.output.persistentgcsbucket ES.BQ.OUTPUT.PERSISTENTGCSBUCKET
+                       The GCS bucket that holds the data before it is loaded to BigQuery. If set, the data is not deleted after it is written to BigQuery.
+  --es.bq.output.bigquerytablelabel ES.BQ.OUTPUT.BIGQUERYTABLELABEL
+                       Adds labels to the table while writing to it. Multiple labels can be set.
+  --es.bq.output.createdisposition ES.BQ.OUTPUT.CREATEDISPOSITION
+                       Specifies whether the job is allowed to create new tables.
+  --es.bq.output.persistentgcspath ES.BQ.OUTPUT.PERSISTENTGCSPATH
+                       The GCS path that holds the data before it is loaded to BigQuery.
+                       Used only with es.bq.output.persistentgcsbucket
+  --es.bq.output.datepartition ES.BQ.OUTPUT.DATEPARTITION
+                       The date partition the data is going to be written to.
+  --es.bq.output.partitionfield ES.BQ.OUTPUT.PARTITIONFIELD
+                       If this field is specified, the table is partitioned by this field.
+  --es.bq.output.partitionexpirationms ES.BQ.OUTPUT.PARTITIONEXPIRATIONMS
+                       Number of milliseconds for which to keep the storage for partitions in the table.
+  --es.bq.output.partitiontype ES.BQ.OUTPUT.PARTITIONTYPE
+                       Used to specify time partitioning. Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be time partitioned. Defaults to DAY if es.bq.output.partitionfield is specified.
+  --es.bq.output.partitionrangestart ES.BQ.OUTPUT.PARTITIONRANGESTART
+                       Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangeend and es.bq.output.partitionrangeinterval along with this option.
+  --es.bq.output.partitionrangeend ES.BQ.OUTPUT.PARTITIONRANGEEND
+                       Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeinterval along with this option.
+  --es.bq.output.partitionrangeinterval ES.BQ.OUTPUT.PARTITIONRANGEINTERVAL
+                       Used to specify integer-range partitioning. This option is mandatory for a target table to be integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeend along with this option.
+  --es.bq.output.clusteredfields ES.BQ.OUTPUT.CLUSTEREDFIELDS
+                       A comma-separated string of non-repeated, top-level columns.
+  --es.bq.output.allowfieldaddition ES.BQ.OUTPUT.ALLOWFIELDADDITION
+                       Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. Defaults to false.
+  --es.bq.output.allowfieldrelaxation ES.BQ.OUTPUT.ALLOWFIELDRELAXATION
+                       Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
+  --es.bq.output.bignumericdefaultprecision ES.BQ.OUTPUT.BIGNUMERICDEFAULTPRECISION
+                       An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
+  --es.bq.output.bignumericdefaultscale ES.BQ.OUTPUT.BIGNUMERICDEFAULTSCALE
+                       An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than bigNumericFieldsPrecision. This default is used only when the field has an unparameterized BigNumeric type.

 ```
@@ -612,7 +679,7 @@ export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
     --es.bq.input.password="demo" \
     --es.bq.output.dataset="my-project.test_dataset" \
    --es.bq.output.table="dummyusers" \
-    --es.bq.temp.bucket.name="<temp-bq-bucket-name>" \
+    --es.bq.output.temporarygcsbucket="<temp-bq-bucket-name>" \
     --es.bq.output.mode="append"
 ```
 # Elasticsearch To Bigtable

python/dataproc_templates/elasticsearch/elasticsearch_to_bq.py

Lines changed: 7 additions & 9 deletions
@@ -21,7 +21,7 @@
 from pyspark.sql import SparkSession

 from dataproc_templates import BaseTemplate
-from dataproc_templates.util.argument_parsing import add_es_spark_connector_options
+from dataproc_templates.util.argument_parsing import add_spark_options, add_es_spark_connector_options
 from dataproc_templates.util.dataframe_reader_wrappers import ingest_dataframe_from_elasticsearch
 from dataproc_templates.util.elasticsearch_transformations import flatten_struct_fields, flatten_array_fields
 import dataproc_templates.util.template_constants as constants
@@ -97,12 +97,6 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
         required=True,
         help='BigQuery Output Table Name'
     )
-    parser.add_argument(
-        f'--{constants.ES_BQ_LD_TEMP_BUCKET_NAME}',
-        dest=constants.ES_BQ_LD_TEMP_BUCKET_NAME,
-        required=True,
-        help='Spark BigQuery connector temporary bucket'
-    )
     parser.add_argument(
         f'--{constants.ES_BQ_OUTPUT_MODE}',
         dest=constants.ES_BQ_OUTPUT_MODE,
@@ -121,6 +115,8 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
         ]
     )

+    add_spark_options(parser, constants.get_bq_output_spark_options("es.bq.output."))
+
     known_args: argparse.Namespace
     known_args, _ = parser.parse_known_args(args)

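A minimal sketch of what a helper like `add_spark_options` might do, assuming it simply registers one optional CLI flag per entry in the argument-name-to-connector-option mapping. The mapping below is illustrative, standing in for the real `constants.get_bq_output_spark_options`:

```python
import argparse

def add_spark_options(parser, option_map):
    # Register one optional flag per template argument; the connector-side
    # names in the mapping's values are used later, at write time.
    for arg_name in option_map:
        parser.add_argument(f'--{arg_name}', dest=arg_name, required=False)

# Illustrative subset of the mapping (assumed shape, not the real module).
option_map = {
    "es.bq.output.temporarygcsbucket": "temporaryGcsBucket",
    "es.bq.output.partitionfield": "partitionField",
}

parser = argparse.ArgumentParser()
add_spark_options(parser, option_map)
args, _ = parser.parse_known_args(["--es.bq.output.partitionfield", "ts"])
```

Because the argument names contain dots, they are read back with `vars(args)[...]` rather than attribute access.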
@@ -152,7 +148,6 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         es_api_key: str = args[constants.ES_BQ_NODE_API_KEY]
         flatten_struct = args[constants.ES_BQ_FLATTEN_STRUCT]
         flatten_array = args[constants.ES_BQ_FLATTEN_ARRAY]
-        bq_temp_bucket: str = args[constants.ES_BQ_LD_TEMP_BUCKET_NAME]
         output_mode: str = args[constants.ES_BQ_OUTPUT_MODE]
         big_query_output_dataset: str = args[constants.ES_BQ_OUTPUT_DATASET]
         big_query_output_table: str = args[constants.ES_BQ_OUTPUT_TABLE]
@@ -180,12 +175,15 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         if not input_data.head(1):
             logger.info("No records in dataframe, Skipping the BigQuery Load")
             return
+
+        bq_output_constant_options: dict = constants.get_bq_output_spark_options("es.bq.output.")
+        spark_options = {bq_output_constant_options[k]: v for k, v in args.items() if k in bq_output_constant_options and v}

         # Write
         input_data.write \
             .format(constants.FORMAT_BIGQUERY) \
             .option(constants.TABLE, big_query_output_dataset + "." + big_query_output_table) \
-            .option(constants.ES_BQ_TEMP_BUCKET, bq_temp_bucket) \
             .option("enableListInference", True) \
             .mode(output_mode) \
+            .options(**spark_options) \
             .save()
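The dict comprehension above keeps only the CLI arguments that belong to the connector mapping and have a truthy value, renaming them to the connector's option names before they reach `.options(**spark_options)`. A standalone sketch of that filter, with an illustrative mapping in place of the real `constants.get_bq_output_spark_options`:

```python
# Assumed shape of the mapping: template argument name -> Spark BigQuery
# connector option name (the keys/values here are illustrative).
bq_output_constant_options = {
    "es.bq.output.temporarygcsbucket": "temporaryGcsBucket",
    "es.bq.output.persistentgcsbucket": "persistentGcsBucket",
    "es.bq.output.partitionfield": "partitionField",
}

# Parsed args: unset optional flags arrive as None and must not be
# forwarded to the writer; unrelated args are ignored entirely.
args = {
    "es.bq.output.temporarygcsbucket": "my-temp-bucket",
    "es.bq.output.partitionfield": None,
    "es.bq.output.mode": "append",
}

spark_options = {
    bq_output_constant_options[k]: v
    for k, v in args.items()
    if k in bq_output_constant_options and v
}
```

Here `spark_options` comes out as `{"temporaryGcsBucket": "my-temp-bucket"}`: the unset partition field is dropped by the truthiness check, and the output mode is dropped because it is handled by `.mode()` rather than a connector option.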
