Commit 4388e0c

feat: added BigQuery to Memorystore python template (#1033)
* feat: added BigQuery to Memorystore python template
* ci: added integration tests for BigqueryToMemorystore
* test: added test cases for BigQuery To Memorystore template
* test: fixed test cases for BigQuery To Memorystore template
* test: fixed test cases for BigQuery To Memorystore template
1 parent f106b7c commit 4388e0c

File tree

10 files changed: +459 −0 lines changed

README.md

Lines changed: 1 addition & 0 deletions

@@ -60,6 +60,7 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i
 Please refer to the [Dataproc Templates (Python - PySpark) README](/python) for more information
 * [AzureBlobToBigQuery](/python/dataproc_templates/azure#azure-blob-to-bigquery)
 * [BigQueryToGCS](/python/dataproc_templates/bigquery#bigquery-to-gcs) (blogpost [link](https://medium.com/google-cloud/moving-data-from-bigquery-to-gcs-using-gcp-dataproc-serverless-and-pyspark-f6481b86bcd1))
+* [BigQueryToMemorystore](/python/dataproc_templates/bigquery#bigquery-to-memorystore)
 * [CassandraToBigquery](/python/dataproc_templates/cassandra)
 * [CassandraToGCS](/python/dataproc_templates/cassandra) (blogpost [link](https://medium.com/google-cloud/export-data-from-cassandra-to-google-cloud-storage-using-dataproc-serverless-2569a00e17fe))
 * [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq)

python/.ci/Jenkinsfile

Lines changed: 31 additions & 0 deletions

@@ -887,6 +887,37 @@ EOF
 }
 }
 }
+        stage('BigQuery to Memorystore'){
+            steps{
+                retry(count: stageRetryCount) {
+                    sh '''
+                    python3.8 -m pip install --user virtualenv
+
+                    python3.8 -m venv env
+
+                    source env/bin/activate
+
+                    export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
+                    export SKIP_BUILD=true
+                    export JARS="gs://vbhatia_test/jars/spark-redis_2.12-3.0.0-jar-with-dependencies.jar"
+
+                    cd python
+
+                    ./bin/start.sh \
+                    -- --template=BIGQUERYTOMEMORYSTORE \
+                    --bigquery.memorystore.input.table=bigquery-public-data.fcc_political_ads.file_history \
+                    --bigquery.memorystore.output.host=10.0.0.17 \
+                    --bigquery.memorystore.output.port=6379 \
+                    --bigquery.memorystore.output.table=file_history \
+                    --bigquery.memorystore.output.key.column=fileHistoryId \
+                    --bigquery.memorystore.output.model=hash \
+                    --bigquery.memorystore.output.mode=overwrite \
+                    --bigquery.memorystore.output.ttl=3600 \
+                    --bigquery.memorystore.output.dbnum=0
+                    '''
+                }
+            }
+        }
 }
 }
 }

python/README.md

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@
 
 - [AzureBlobStorageToBigQuery](/python/dataproc_templates/azure#azure-blob-storage-to-bigquery)
 - [BigQueryToGCS](/python/dataproc_templates/bigquery#bigquery-to-gcs) (blogpost [link](https://medium.com/google-cloud/moving-data-from-bigquery-to-gcs-using-gcp-dataproc-serverless-and-pyspark-f6481b86bcd1))
+- [BigQueryToMemorystore](/python/dataproc_templates/bigquery#bigquery-to-memorystore)
 - [CassandraToBigquery](/python/dataproc_templates/cassandra#cassandra-to-bigquery)
 - [CassandraToGCS](/python/dataproc_templates/cassandra#cassandra-to-gcs) (blogpost [link](https://medium.com/google-cloud/export-data-from-cassandra-to-google-cloud-storage-using-dataproc-serverless-2569a00e17fe))
 - [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq) (blogpost [link](https://medium.com/@anujrohilla197/exporting-data-from-elasticsearch-to-bigquery-using-pyspark-on-dataproc-serverless-47633f620ce3))

python/dataproc_templates/bigquery/README.md

Lines changed: 84 additions & 0 deletions

@@ -121,3 +121,87 @@ export REGION=us-central1
 --bigquery.gcs.output.header=false \
 --bigquery.gcs.output.timestampntzformat="yyyy-MM-dd HH:mm:ss"
 ```
+
+## BigQuery to Memorystore
+
+Template for exporting data from BigQuery to Google Cloud Memorystore (Redis). This template supports writing data using the hash and binary persistence models, specifying a TTL for the data and a key column, and automatic schema conversion and creation.
+
+It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example) for reading from BigQuery and [Spark-Redis](https://github.com/RedisLabs/spark-redis) for writing to Redis.
+
+
+## Arguments
+
+* `bigquery.memorystore.input.table`: BigQuery input table name (format: `project.dataset.table`)
+* `bigquery.memorystore.output.host`: Redis Memorystore host
+* `bigquery.memorystore.output.table`: Redis Memorystore target table name
+* `bigquery.memorystore.output.key.column`: Redis Memorystore key column for the target table
+
+#### Optional Arguments
+
+* `bigquery.memorystore.output.port`: Redis Memorystore port (defaults to 6379)
+* `bigquery.memorystore.output.model`: Memorystore persistence model for the DataFrame (one of: hash, binary) (defaults to hash)
+* `bigquery.memorystore.output.mode`: Output write mode (one of: append, overwrite, ignore, errorifexists) (defaults to append)
+* `bigquery.memorystore.output.ttl`: Data time to live in seconds; data does not expire if the TTL is less than 1 (defaults to 0)
+* `bigquery.memorystore.output.dbnum`: Database / namespace for logical key separation (defaults to 0)
+
+## Usage
+```
+python main.py --template BIGQUERYTOMEMORYSTORE --help
+
+usage: main.py [-h]
+               --bigquery.memorystore.input.table BIGQUERY.MEMORYSTORE.INPUT.TABLE
+               --bigquery.memorystore.output.host BIGQUERY.MEMORYSTORE.OUTPUT.HOST
+               --bigquery.memorystore.output.table BIGQUERY.MEMORYSTORE.OUTPUT.TABLE
+               --bigquery.memorystore.output.key.column BIGQUERY.MEMORYSTORE.OUTPUT.KEY.COLUMN
+               [--bigquery.memorystore.output.port BIGQUERY.MEMORYSTORE.OUTPUT.PORT]
+               [--bigquery.memorystore.output.model {hash,binary}]
+               [--bigquery.memorystore.output.mode {overwrite,append,ignore,errorifexists}]
+               [--bigquery.memorystore.output.ttl BIGQUERY.MEMORYSTORE.OUTPUT.TTL]
+               [--bigquery.memorystore.output.dbnum BIGQUERY.MEMORYSTORE.OUTPUT.DBNUM]
+
+options:
+  -h, --help            show this help message and exit
+  --bigquery.memorystore.input.table BIGQUERY.MEMORYSTORE.INPUT.TABLE
+                        BigQuery Input table name
+  --bigquery.memorystore.output.host BIGQUERY.MEMORYSTORE.OUTPUT.HOST
+                        Redis Memorystore host
+  --bigquery.memorystore.output.table BIGQUERY.MEMORYSTORE.OUTPUT.TABLE
+                        Redis Memorystore target table name
+  --bigquery.memorystore.output.key.column BIGQUERY.MEMORYSTORE.OUTPUT.KEY.COLUMN
+                        Redis Memorystore key column for target table
+  --bigquery.memorystore.output.port BIGQUERY.MEMORYSTORE.OUTPUT.PORT
+                        Redis Memorystore port. Defaults to 6379
+  --bigquery.memorystore.output.model {hash,binary}
+                        Memorystore persistence model for Dataframe (one of: hash, binary) (Defaults to hash)
+  --bigquery.memorystore.output.mode {overwrite,append,ignore,errorifexists}
+                        Output write mode (one of: append, overwrite, ignore, errorifexists) (Defaults to append)
+  --bigquery.memorystore.output.ttl BIGQUERY.MEMORYSTORE.OUTPUT.TTL
+                        Data time to live in seconds. Data doesn't expire if ttl is less than 1 (Defaults to 0)
+  --bigquery.memorystore.output.dbnum BIGQUERY.MEMORYSTORE.OUTPUT.DBNUM
+                        Database / namespace for logical key separation (Defaults to 0)
+```
+
+## Example submission
+
+```
+export GCP_PROJECT=myprojectid
+export REGION=us-west1
+export SUBNET=projects/myprojectid/regions/us-west1/subnetworks/mysubnetid
+export GCS_STAGING_LOCATION="gs://python-dataproc-templates"
+export JARS="gs://mygcsstagingbkt/jars/spark-redis_2.12-3.0.0-jar-with-dependencies.jar"
+
+./bin/start.sh \
+-- --template=BIGQUERYTOMEMORYSTORE \
+--bigquery.memorystore.input.table=bigquery-public-data.fcc_political_ads.file_history \
+--bigquery.memorystore.output.host=10.0.0.17 \
+--bigquery.memorystore.output.port=6379 \
+--bigquery.memorystore.output.table=file_history \
+--bigquery.memorystore.output.key.column=fileHistoryId \
+--bigquery.memorystore.output.model=hash \
+--bigquery.memorystore.output.mode=overwrite \
+--bigquery.memorystore.output.ttl=360 \
+--bigquery.memorystore.output.dbnum=0
+```
+
+## Known limitations
+With Spark-Redis, the hash model does not support nested fields in the DataFrame. As an alternative, use the binary persistence model, which does support nested fields.
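One practical workaround for the hash-model limitation is to flatten nested records into a single level before writing, so each value maps to one Redis hash field. The sketch below is a hypothetical pure-Python illustration of the idea (`flatten_record` is not part of the template; in Spark you would achieve the equivalent by selecting nested columns into top-level columns):

```python
def flatten_record(record, prefix="", sep="."):
    """Recursively flatten nested dicts into a single-level dict,
    joining keys with `sep`, so each value fits one Redis hash field."""
    flat = {}
    for key, value in record.items():
        full_key = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat

# A row with a nested struct, as it might look before flattening
row = {"fileHistoryId": "12345",
       "dates": {"created": "2022-01-01", "updated": "2022-02-01"}}
print(flatten_record(row))
# {'fileHistoryId': '12345', 'dates.created': '2022-01-01', 'dates.updated': '2022-02-01'}
```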

python/dataproc_templates/bigquery/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -13,3 +13,4 @@
 # limitations under the License.
 
 from .bigquery_to_gcs import BigQueryToGCSTemplate
+from .bigquery_to_memorystore import BigQueryToMemorystoreTemplate
python/dataproc_templates/bigquery/bigquery_to_memorystore.py

Lines changed: 168 additions & 0 deletions (new file)

@@ -0,0 +1,168 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Sequence, Optional, Any
from logging import Logger
import argparse
import pprint

from pyspark.sql import SparkSession

from dataproc_templates import BaseTemplate
import dataproc_templates.util.template_constants as constants


__all__ = ['BigQueryToMemorystoreTemplate']


class BigQueryToMemorystoreTemplate(BaseTemplate):
    """
    Dataproc template implementing exports from BigQuery to Memorystore
    """

    @staticmethod
    def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
        parser: argparse.ArgumentParser = argparse.ArgumentParser()

        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_INPUT_TABLE}',
            dest=constants.BQ_MEMORYSTORE_INPUT_TABLE,
            required=True,
            help='BigQuery Input table name'
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_HOST}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_HOST,
            required=True,
            help='Redis Memorystore host',
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_PORT}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_PORT,
            required=False,
            default=6379,
            help='Redis Memorystore port. Defaults to 6379',
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_TABLE}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_TABLE,
            required=True,
            help='Redis Memorystore target table name',
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN,
            required=True,
            help='Redis Memorystore key column for target table',
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_MODEL}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_MODEL,
            required=False,
            default=constants.BQ_MEMORYSTORE_OUTPUT_MODEL_HASH,
            help=(
                'Memorystore persistence model for Dataframe '
                '(one of: hash, binary) '
                '(Defaults to hash)'
            ),
            choices=[
                constants.BQ_MEMORYSTORE_OUTPUT_MODEL_HASH,
                constants.BQ_MEMORYSTORE_OUTPUT_MODEL_BINARY
            ]
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_MODE}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_MODE,
            required=False,
            default=constants.OUTPUT_MODE_APPEND,
            help=(
                'Output write mode '
                '(one of: append, overwrite, ignore, errorifexists) '
                '(Defaults to append)'
            ),
            choices=[
                constants.OUTPUT_MODE_OVERWRITE,
                constants.OUTPUT_MODE_APPEND,
                constants.OUTPUT_MODE_IGNORE,
                constants.OUTPUT_MODE_ERRORIFEXISTS
            ]
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_TTL}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_TTL,
            required=False,
            default=0,
            help=(
                'Data time to live in seconds. '
                "Data doesn't expire if ttl is less than 1 "
                '(Defaults to 0)'
            )
        )
        parser.add_argument(
            f'--{constants.BQ_MEMORYSTORE_OUTPUT_DBNUM}',
            dest=constants.BQ_MEMORYSTORE_OUTPUT_DBNUM,
            required=False,
            default=0,
            help=(
                'Database / namespace for logical key separation '
                '(Defaults to 0)'
            )
        )

        known_args: argparse.Namespace
        known_args, _ = parser.parse_known_args(args)

        return vars(known_args)

    def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:

        logger: Logger = self.get_logger(spark=spark)

        # Arguments
        input_table: str = args[constants.BQ_MEMORYSTORE_INPUT_TABLE]

        output_table: str = args[constants.BQ_MEMORYSTORE_OUTPUT_TABLE]
        host: str = args[constants.BQ_MEMORYSTORE_OUTPUT_HOST]
        port: str = args[constants.BQ_MEMORYSTORE_OUTPUT_PORT]
        key_column: str = args[constants.BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN]
        model: str = args[constants.BQ_MEMORYSTORE_OUTPUT_MODEL]
        ttl: int = args[constants.BQ_MEMORYSTORE_OUTPUT_TTL]
        dbnum: int = args[constants.BQ_MEMORYSTORE_OUTPUT_DBNUM]
        output_mode: str = args[constants.BQ_MEMORYSTORE_OUTPUT_MODE]

        logger.info(
            "Starting BigQuery to Memorystore Spark job with parameters:\n"
            f"{pprint.pformat(args)}"
        )

        # Read
        input_data = spark.read \
            .format(constants.FORMAT_BIGQUERY) \
            .option(constants.TABLE, input_table) \
            .load()

        # Write
        input_data.write \
            .format(constants.FORMAT_MEMORYSTORE) \
            .option(constants.TABLE, output_table) \
            .option(constants.MEMORYSTORE_KEY_COLUMN, key_column) \
            .option(constants.MEMORYSTORE_MODEL, model) \
            .option(constants.MEMORYSTORE_HOST, host) \
            .option(constants.MEMORYSTORE_PORT, port) \
            .option(constants.MEMORYSTORE_DBNUM, dbnum) \
            .option(constants.MEMORYSTORE_TTL, ttl) \
            .mode(output_mode) \
            .save()
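The template's `parse_known_args()` pattern is worth noting: dotted option names are stored under dotted `dest` keys, and flags belonging to other templates are returned as "unknown" rather than raising an error, so all templates can share one argv. A standalone sketch of that pattern (the flags are real template arguments; `--some.other.templates.flag` is a made-up stand-in for another template's flag):

```python
import argparse

parser = argparse.ArgumentParser()
# Dotted names work because dest is set explicitly; argparse just
# stores the value under that key on the namespace.
parser.add_argument('--bigquery.memorystore.input.table',
                    dest='bigquery.memorystore.input.table', required=True)
parser.add_argument('--bigquery.memorystore.output.ttl',
                    dest='bigquery.memorystore.output.ttl', default=0, type=int)

# parse_known_args() tolerates flags meant for other templates,
# returning them in `unknown` instead of failing.
known, unknown = parser.parse_known_args([
    '--bigquery.memorystore.input.table=project.dataset.table',
    '--some.other.templates.flag=ignored',
])
args = vars(known)
print(args['bigquery.memorystore.input.table'])  # project.dataset.table
print(args['bigquery.memorystore.output.ttl'])   # 0
print(unknown)  # ['--some.other.templates.flag=ignored']
```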

python/dataproc_templates/template_name.py

Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ class TemplateName(Enum):
     GCSTOBIGTABLE = "GCSTOBIGTABLE"
     GCSTOGCS = "GCSTOGCS"
     BIGQUERYTOGCS = "BIGQUERYTOGCS"
+    BIGQUERYTOMEMORYSTORE = "BIGQUERYTOMEMORYSTORE"
     HIVETOBIGQUERY = "HIVETOBIGQUERY"
     HIVETOGCS = "HIVETOGCS"
     TEXTTOBIGQUERY = "TEXTTOBIGQUERY"

python/dataproc_templates/util/template_constants.py

Lines changed: 21 additions & 0 deletions

@@ -34,6 +34,7 @@
 FORMAT_JDBC = "jdbc"
 FORMAT_PUBSUBLITE = "pubsublite"
 FORMAT_REDSHIFT = "io.github.spark_redshift_community.spark.redshift"
+FORMAT_MEMORYSTORE = "org.apache.spark.sql.redis"
 JDBC_URL = "url"
 JDBC_TABLE = "dbtable"
 JDBC_QUERY = "query"

@@ -89,6 +90,12 @@
 FORMAT_MONGO = "com.mongodb.spark.sql.DefaultSource"
 MONGO_DEFAULT_BATCH_SIZE = 512
 MONGO_BATCH_SIZE = "maxBatchSize"
+MEMORYSTORE_KEY_COLUMN = "key.column"
+MEMORYSTORE_MODEL = "model"
+MEMORYSTORE_HOST = "host"
+MEMORYSTORE_PORT = "port"
+MEMORYSTORE_DBNUM = "dbNum"
+MEMORYSTORE_TTL = "ttl"
 FORMAT_SNOWFLAKE = "snowflake"
 REDSHIFT_TEMPDIR = "tempdir"
 REDSHIFT_IAMROLE = "aws_iam_role"

@@ -923,3 +930,17 @@ def get_es_spark_connector_input_options(prefix):
 AZ_BLOB_STORAGE_ACCOUNT = "azure.blob.storage.account"
 AZ_BLOB_CONTAINER_NAME = "azure.blob.container.name"
 AZ_BLOB_SAS_TOKEN = "azure.blob.sas.token"
+
+# BigQuery to Memorystore
+BQ_MEMORYSTORE_INPUT_TABLE = "bigquery.memorystore.input.table"
+BQ_MEMORYSTORE_OUTPUT_HOST = "bigquery.memorystore.output.host"
+BQ_MEMORYSTORE_OUTPUT_PORT = "bigquery.memorystore.output.port"
+BQ_MEMORYSTORE_OUTPUT_TABLE = "bigquery.memorystore.output.table"
+BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN = "bigquery.memorystore.output.key.column"
+BQ_MEMORYSTORE_OUTPUT_MODEL = "bigquery.memorystore.output.model"
+BQ_MEMORYSTORE_OUTPUT_MODE = "bigquery.memorystore.output.mode"
+BQ_MEMORYSTORE_OUTPUT_TTL = "bigquery.memorystore.output.ttl"
+BQ_MEMORYSTORE_OUTPUT_DBNUM = "bigquery.memorystore.output.dbnum"
+
+BQ_MEMORYSTORE_OUTPUT_MODEL_HASH = "hash"
+BQ_MEMORYSTORE_OUTPUT_MODEL_BINARY = "binary"
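The two groups of constants serve different purposes: the `BQ_MEMORYSTORE_*` names are user-facing CLI argument keys, while the `MEMORYSTORE_*` names are the spark-redis `.option()` keys they feed into. A hedged pure-Python sketch of that mapping (`redis_writer_options` is a hypothetical helper, not part of the commit; the string values mirror the constants above):

```python
# spark-redis writer option keys, mirroring template_constants.py
MEMORYSTORE_KEY_COLUMN = "key.column"
MEMORYSTORE_MODEL = "model"
MEMORYSTORE_HOST = "host"
MEMORYSTORE_PORT = "port"
MEMORYSTORE_DBNUM = "dbNum"
MEMORYSTORE_TTL = "ttl"

def redis_writer_options(args: dict) -> dict:
    """Translate parsed CLI arguments into spark-redis .option() pairs."""
    return {
        MEMORYSTORE_KEY_COLUMN: args["bigquery.memorystore.output.key.column"],
        MEMORYSTORE_MODEL: args["bigquery.memorystore.output.model"],
        MEMORYSTORE_HOST: args["bigquery.memorystore.output.host"],
        MEMORYSTORE_PORT: args["bigquery.memorystore.output.port"],
        MEMORYSTORE_DBNUM: args["bigquery.memorystore.output.dbnum"],
        MEMORYSTORE_TTL: args["bigquery.memorystore.output.ttl"],
    }

opts = redis_writer_options({
    "bigquery.memorystore.output.key.column": "fileHistoryId",
    "bigquery.memorystore.output.model": "hash",
    "bigquery.memorystore.output.host": "10.0.0.17",
    "bigquery.memorystore.output.port": 6379,
    "bigquery.memorystore.output.dbnum": 0,
    "bigquery.memorystore.output.ttl": 3600,
})
print(opts["host"], opts["dbNum"])  # 10.0.0.17 0
```

Building the options dict up front like this keeps the mapping in one place and makes it easy to unit-test without a Spark session.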

python/main.py

Lines changed: 2 additions & 0 deletions

@@ -29,6 +29,7 @@
 from dataproc_templates.gcs.gcs_to_mongo import GCSToMONGOTemplate
 from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate
 from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
+from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate
 from dataproc_templates.hive.hive_to_bigquery import HiveToBigQueryTemplate
 from dataproc_templates.hive.hive_to_gcs import HiveToGCSTemplate
 from dataproc_templates.gcs.text_to_bigquery import TextToBigQueryTemplate

@@ -60,6 +61,7 @@
     TemplateName.GCSTOGCS: GCSToGCSTemplate,
     TemplateName.GCSTOBIGTABLE: GCSToBigTableTemplate,
     TemplateName.BIGQUERYTOGCS: BigQueryToGCSTemplate,
+    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
     TemplateName.HIVETOBIGQUERY: HiveToBigQueryTemplate,
     TemplateName.HIVETOGCS: HiveToGCSTemplate,
     TemplateName.TEXTTOBIGQUERY: TextToBigQueryTemplate,
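Registering a new template thus takes exactly two lines in main.py: an import and a dict entry keyed by the Enum member. A minimal hypothetical sketch of that dispatch pattern (the classes here are stand-ins, not the real implementations):

```python
from enum import Enum

# A trimmed-down TemplateName: the --template CLI value is validated
# by Enum lookup, then used to key the implementation registry.
class TemplateName(Enum):
    BIGQUERYTOGCS = "BIGQUERYTOGCS"
    BIGQUERYTOMEMORYSTORE = "BIGQUERYTOMEMORYSTORE"

class BigQueryToMemorystoreTemplate:
    """Stand-in for the real template class."""

TEMPLATE_IMPLS = {
    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
}

# TemplateName("BOGUS") would raise ValueError, rejecting unknown names.
chosen = TemplateName("BIGQUERYTOMEMORYSTORE")
template = TEMPLATE_IMPLS[chosen]()
print(type(template).__name__)  # BigQueryToMemorystoreTemplate
```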
