|
183 | 183 | "# Define the date range of transactions for feature engineering (last 10 days up until yesterday)\n", |
184 | 184 | "YESTERDAY = datetime.today() - timedelta(days=1)\n", |
185 | 185 | "YEAR_MONTH_PREFIX = YESTERDAY.strftime(\"%Y-%m\")\n", |
186 | | - "DATAPROCESSING_START_DATE = (YESTERDAY - timedelta(days=10)).strftime(\n", |
187 | | - " \"%Y-%m-%d\"\n", |
188 | | - ")\n", |
| 186 | + "DATAPROCESSING_START_DATE = (YESTERDAY - timedelta(days=10)).strftime(\"%Y-%m-%d\")\n", |
189 | 187 | "DATAPROCESSING_END_DATE = YESTERDAY.strftime(\"%Y-%m-%d\")\n", |
190 | 188 | "\n", |
191 | 189 | "# Define BiqQuery dataset and tables to calculate features.\n", |
|
208 | 206 | "\n", |
209 | 207 | "TERMINALS_FE_BQ_VIEW_URI = f\"{PROJECT_ID}.tx.v_terminals_features\"\n", |
210 | 208 | "\n", |
211 | | - "CUSTOMERS_STREAMING_FE_TABLE_URI = (\n", |
212 | | - " f\"{PROJECT_ID}.tx.t_customers_streaming_features\"\n", |
213 | | - ")\n", |
214 | | - "TERMINALS_STREAMING_FE_TABLE_URI = (\n", |
215 | | - " f\"{PROJECT_ID}.tx.t_terminals_streaming_features\"\n", |
216 | | - ")\n", |
| 209 | + "CUSTOMERS_STREAMING_FE_TABLE_URI = f\"{PROJECT_ID}.tx.t_customers_streaming_features\"\n", |
| 210 | + "TERMINALS_STREAMING_FE_TABLE_URI = f\"{PROJECT_ID}.tx.t_terminals_streaming_features\"\n", |
217 | 211 | "\n", |
218 | 212 | "ONLINE_STORAGE_NODES = 1\n", |
219 | 213 | "FEATURE_TIME = \"feature_ts\"\n", |
|
789 | 783 | "tags": [] |
790 | 784 | }, |
791 | 785 | "source": [ |
792 | | - "run_bq_query(\n", |
793 | | - " f\"SELECT * FROM `{CUSTOMERS_STREAMING_FE_TABLE_URI}` LIMIT 5\", show=True\n", |
794 | | - ")" |
| 786 | + "run_bq_query(f\"SELECT * FROM `{CUSTOMERS_STREAMING_FE_TABLE_URI}` LIMIT 5\", show=True)" |
795 | 787 | ], |
796 | 788 | "outputs": [], |
797 | 789 | "execution_count": null |
|
802 | 794 | "tags": [] |
803 | 795 | }, |
804 | 796 | "source": [ |
805 | | - "run_bq_query(\n", |
806 | | - " f\"SELECT * FROM `{TERMINALS_STREAMING_FE_TABLE_URI}` LIMIT 5\", show=True\n", |
807 | | - ")" |
| 797 | + "run_bq_query(f\"SELECT * FROM `{TERMINALS_STREAMING_FE_TABLE_URI}` LIMIT 5\", show=True)" |
808 | 798 | ], |
809 | 799 | "outputs": [], |
810 | 800 | "execution_count": null |
|
1074 | 1064 | "metadata": {}, |
1075 | 1065 | "source": [ |
1076 | 1066 | "def create_fs_feature_group(\n", |
1077 | | - " bq_source_uri, entity_id_column, feature_group_id, feature_ids_list):\n", |
1078 | | - " \n", |
| 1067 | + " bq_source_uri, entity_id_column, feature_group_id, feature_ids_list\n", |
| 1068 | + "):\n", |
| 1069 | + "\n", |
1079 | 1070 | " # Now, create the featureGroup\n", |
1080 | 1071 | " feature_group_config = feature_group_pb2.FeatureGroup(\n", |
1081 | 1072 | " big_query=feature_group_pb2.FeatureGroup.BigQuery(\n", |
1082 | | - " big_query_source=io_pb2.BigQuerySource(\n", |
1083 | | - " input_uri=f\"bq://{bq_source_uri}\"\n", |
1084 | | - " ),\n", |
| 1073 | + " big_query_source=io_pb2.BigQuerySource(input_uri=f\"bq://{bq_source_uri}\"),\n", |
1085 | 1074 | " # Add the entity_id_columns parameter here\n", |
1086 | 1075 | " entity_id_columns=[entity_id_column],\n", |
1087 | 1076 | " )\n", |
|
1093 | 1082 | " feature_group=feature_group_config,\n", |
1094 | 1083 | " )\n", |
1095 | 1084 | " )\n", |
1096 | | - " \n", |
| 1085 | + "\n", |
1097 | 1086 | " # After the long-running operation (LRO) is complete, show the result.\n", |
1098 | 1087 | " print(create_group_lro.result())\n", |
1099 | 1088 | "\n", |
|
1123 | 1112 | "tags": [] |
1124 | 1113 | }, |
1125 | 1114 | "source": [ |
1126 | | - "CUSTOMER_ID_COLUMN = \"customer_id\" #entity_id\n", |
| 1115 | + "CUSTOMER_ID_COLUMN = \"customer_id\" # entity_id\n", |
1127 | 1116 | "\n", |
1128 | 1117 | "CUSTOMER_BATCH_FEATURES_GROUP_ID = \"fraudfinder_customers_batch\"\n", |
1129 | 1118 | "\n", |
|
1135 | 1124 | " \"customer_id_avg_amount_14day_window\",\n", |
1136 | 1125 | " \"customer_id_nb_tx_7day_window\",\n", |
1137 | 1126 | "]\n", |
1138 | | - " \n", |
| 1127 | + "\n", |
1139 | 1128 | "# Creating feature Group for batch for customers\n", |
1140 | 1129 | "create_fs_feature_group(\n", |
1141 | 1130 | " bq_source_uri=CUSTOMERS_FE_BQ_VIEW_URI,\n", |
1142 | 1131 | " entity_id_column=CUSTOMER_ID_COLUMN,\n", |
1143 | 1132 | " feature_group_id=CUSTOMER_BATCH_FEATURES_GROUP_ID,\n", |
1144 | | - " feature_ids_list=CUSTOMER_BATCH_FEATURE_IDS\n", |
| 1133 | + " feature_ids_list=CUSTOMER_BATCH_FEATURE_IDS,\n", |
1145 | 1134 | ")" |
1146 | 1135 | ], |
1147 | 1136 | "outputs": [], |
|
1162 | 1151 | " \"customer_id_avg_amount_30min_window\",\n", |
1163 | 1152 | " \"customer_id_avg_amount_60min_window\",\n", |
1164 | 1153 | "]\n", |
1165 | | - " \n", |
| 1154 | + "\n", |
1166 | 1155 | "# Creating feature Group for streaming for customers\n", |
1167 | 1156 | "create_fs_feature_group(\n", |
1168 | 1157 | " bq_source_uri=CUSTOMERS_STREAMING_FE_TABLE_URI,\n", |
1169 | 1158 | " entity_id_column=CUSTOMER_ID_COLUMN,\n", |
1170 | 1159 | " feature_group_id=CUSTOMER_STREAMING_FEATURES_GROUP_ID,\n", |
1171 | | - " feature_ids_list=CUSTOMER_STREAMING_FEATURE_IDS\n", |
| 1160 | + " feature_ids_list=CUSTOMER_STREAMING_FEATURE_IDS,\n", |
1172 | 1161 | ")" |
1173 | 1162 | ], |
1174 | 1163 | "outputs": [], |
|
1192 | 1181 | " \"terminal_id_risk_7day_window\",\n", |
1193 | 1182 | " \"terminal_id_risk_14day_window\",\n", |
1194 | 1183 | "]\n", |
1195 | | - " \n", |
| 1184 | + "\n", |
1196 | 1185 | "# Creating feature Group for batch for customers\n", |
1197 | 1186 | "create_fs_feature_group(\n", |
1198 | 1187 | " bq_source_uri=TERMINALS_FE_BQ_VIEW_URI,\n", |
1199 | 1188 | " entity_id_column=TERMINAL_ID_COLUMN,\n", |
1200 | 1189 | " feature_group_id=TERMINAL_BATCH_FEATURES_GROUP_ID,\n", |
1201 | | - " feature_ids_list=TERMINAL_BATCH_FEATURE_IDS\n", |
1202 | | - ")\n" |
| 1190 | + " feature_ids_list=TERMINAL_BATCH_FEATURE_IDS,\n", |
| 1191 | + ")" |
1203 | 1192 | ], |
1204 | 1193 | "outputs": [], |
1205 | 1194 | "execution_count": null |
|
1220 | 1209 | " \"terminal_id_avg_amount_30min_window\",\n", |
1221 | 1210 | " \"terminal_id_avg_amount_60min_window\",\n", |
1222 | 1211 | "]\n", |
1223 | | - " \n", |
| 1212 | + "\n", |
1224 | 1213 | "# Creating feature Group for batch for customers\n", |
1225 | 1214 | "create_fs_feature_group(\n", |
1226 | 1215 | " bq_source_uri=TERMINALS_STREAMING_FE_TABLE_URI,\n", |
1227 | 1216 | " entity_id_column=TERMINAL_ID_COLUMN,\n", |
1228 | 1217 | " feature_group_id=TERMINAL_STREAMING_FEATURES_GROUP_ID,\n", |
1229 | | - " feature_ids_list=TERMINAL_STREAMING_FEATURE_IDS\n", |
| 1218 | + " feature_ids_list=TERMINAL_STREAMING_FEATURE_IDS,\n", |
1230 | 1219 | ")" |
1231 | 1220 | ], |
1232 | 1221 | "outputs": [], |
|
1246 | 1235 | "tags": [] |
1247 | 1236 | }, |
1248 | 1237 | "source": [ |
1249 | | - "def create_online_fs_view(fs_view_id, fs_online_store_id, feature_group_id, feature_ids_list, continuous, cron_schedule=None):\n", |
| 1238 | + "def create_online_fs_view(\n", |
| 1239 | + " fs_view_id,\n", |
| 1240 | + " fs_online_store_id,\n", |
| 1241 | + " feature_group_id,\n", |
| 1242 | + " feature_ids_list,\n", |
| 1243 | + " continuous,\n", |
| 1244 | + " cron_schedule=None,\n", |
| 1245 | + "):\n", |
1250 | 1246 | "\n", |
1251 | 1247 | " feature_registry_source = feature_view_pb2.FeatureView.FeatureRegistrySource(\n", |
1252 | 1248 | " feature_groups=[\n", |
|
1288 | 1284 | "source": [ |
1289 | 1285 | "CUSTOMER_BATCH_FEATURE_VIEW_ID = \"fv_fraudfinder_customers_batch\"\n", |
1290 | 1286 | "\n", |
1291 | | - "CRON_SCHEDULE = \"TZ=America/Los_Angeles */15 * * * *\" # Each 15min\n", |
| 1287 | + "CRON_SCHEDULE = \"TZ=America/Los_Angeles */15 * * * *\" # Each 15min\n", |
1292 | 1288 | "\n", |
1293 | 1289 | "create_online_fs_view(\n", |
1294 | 1290 | " fs_view_id=CUSTOMER_BATCH_FEATURE_VIEW_ID,\n", |
|
1316 | 1312 | " fs_online_store_id=FEATURE_ONLINE_STORE_ID,\n", |
1317 | 1313 | " feature_group_id=CUSTOMER_STREAMING_FEATURES_GROUP_ID,\n", |
1318 | 1314 | " feature_ids_list=CUSTOMER_STREAMING_FEATURE_IDS,\n", |
1319 | | - " continuous=True\n", |
| 1315 | + " continuous=True,\n", |
1320 | 1316 | ")" |
1321 | 1317 | ], |
1322 | 1318 | "outputs": [], |
|
1357 | 1353 | " fs_online_store_id=FEATURE_ONLINE_STORE_ID,\n", |
1358 | 1354 | " feature_group_id=TERMINAL_STREAMING_FEATURES_GROUP_ID,\n", |
1359 | 1355 | " feature_ids_list=TERMINAL_STREAMING_FEATURE_IDS,\n", |
1360 | | - " continuous=True\n", |
| 1356 | + " continuous=True,\n", |
1361 | 1357 | ")" |
1362 | 1358 | ], |
1363 | 1359 | "outputs": [], |
|
1430 | 1426 | " name=sync_response.feature_view_sync\n", |
1431 | 1427 | " )\n", |
1432 | 1428 | " if feature_view_sync.run_time.end_time.seconds > 0:\n", |
1433 | | - " status = (\n", |
1434 | | - " \"Succeed\" if feature_view_sync.final_status.code == 0 else \"Failed\"\n", |
1435 | | - " )\n", |
| 1429 | + " status = \"Succeed\" if feature_view_sync.final_status.code == 0 else \"Failed\"\n", |
1436 | 1430 | " print(f\"Sync {status} for {feature_view_sync.name}.\")\n", |
1437 | 1431 | " break\n", |
1438 | 1432 | " else:\n", |
|
1481 | 1475 | " name=sync_response.feature_view_sync\n", |
1482 | 1476 | " )\n", |
1483 | 1477 | " if feature_view_sync.run_time.end_time.seconds > 0:\n", |
1484 | | - " status = (\n", |
1485 | | - " \"Succeed\" if feature_view_sync.final_status.code == 0 else \"Failed\"\n", |
1486 | | - " )\n", |
| 1478 | + " status = \"Succeed\" if feature_view_sync.final_status.code == 0 else \"Failed\"\n", |
1487 | 1479 | " print(f\"Sync {status} for {feature_view_sync.name}.\")\n", |
1488 | 1480 | " break\n", |
1489 | 1481 | " else:\n", |
|
1546 | 1538 | "print(FEATURE_ONLINE_STORE_ID)\n", |
1547 | 1539 | "print(CUSTOMER_BATCH_FEATURE_VIEW_ID)\n", |
1548 | 1540 | "\n", |
1549 | | - "customer_key = \"0001071169708317\" # Put known id here\n", |
| 1541 | + "customer_key = \"0001071169708317\" # Put known id here\n", |
1550 | 1542 | "\n", |
1551 | 1543 | "FEATURE_VIEW_FULL_ID = f\"projects/{PROJECT_ID}/locations/{REGION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{CUSTOMER_BATCH_FEATURE_VIEW_ID}\"\n", |
1552 | 1544 | "\n", |
|
0 commit comments