Skip to content

Commit 356b031

Browse files
committed
Commit initial changes to upload path
1 parent 089bbc5 commit 356b031

22 files changed

+826
-748
lines changed

config_schema.json

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,18 @@
44
"properties": {
55
"max_concurrent_downloads": {
66
"default": 5,
7-
"description": "Number of parallel downloader tasks for file parts.",
7+
"description": "Number of parallel download tasks for file parts.",
88
"exclusiveMinimum": 0,
99
"title": "Max Concurrent Downloads",
1010
"type": "integer"
1111
},
12+
"max_concurrent_uploads": {
13+
"default": 5,
14+
"description": "Number of parallel upload tasks for file parts.",
15+
"exclusiveMinimum": 0,
16+
"title": "Max Concurrent Uploads",
17+
"type": "integer"
18+
},
1219
"max_retries": {
1320
"default": 5,
1421
"description": "Number of times to retry failed API calls.",
@@ -24,7 +31,7 @@
2431
"type": "integer"
2532
},
2633
"part_size": {
27-
"default": 16777216,
34+
"default": 67108864,
2835
"description": "The part size to use for download.",
2936
"exclusiveMinimum": 0,
3037
"title": "Part Size",

example_config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
exponential_backoff_max: 60
22
max_concurrent_downloads: 5
3+
max_concurrent_uploads: 5
34
max_retries: 2
45
max_wait_time: 3600
5-
part_size: 16777216
6+
part_size: 67108864
67
retry_status_codes:
78
- 408
89
- 500

src/ghga_connector/cli.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@
2222
import typer
2323

2424
from ghga_connector import exceptions
25-
from ghga_connector.config import CONFIG, set_runtime_config
2625
from ghga_connector.constants import C4GH
27-
from ghga_connector.core import CLIMessageDisplay, async_client
28-
from ghga_connector.core.main import async_download, decrypt_file, upload_file
26+
from ghga_connector.core import CLIMessageDisplay
27+
from ghga_connector.core.main import async_download, async_upload, decrypt_file
2928
from ghga_connector.core.utils import modify_for_debug, strtobool
3029

3130
cli = typer.Typer(no_args_is_help=True)
3231

3332

33+
@cli.command(no_args_is_help=True)
3434
def upload( # noqa: PLR0913
3535
*,
36-
file_id: str = typer.Option(..., help="The id of the file to upload"),
36+
file_alias: str = typer.Option(..., help="The alias of the file to upload"),
3737
file_path: Path = typer.Option(..., help="The path to the file to upload"),
3838
my_public_key_path: Path = typer.Option(
3939
"./key.pub",
@@ -58,7 +58,7 @@ def upload( # noqa: PLR0913
5858
modify_for_debug(debug)
5959
asyncio.run(
6060
async_upload(
61-
file_id=file_id,
61+
file_alias=file_alias,
6262
file_path=file_path,
6363
my_public_key_path=my_public_key_path,
6464
my_private_key_path=my_private_key_path,
@@ -67,26 +67,6 @@ def upload( # noqa: PLR0913
6767
)
6868

6969

70-
async def async_upload(
71-
file_id: str,
72-
file_path: Path,
73-
my_public_key_path: Path,
74-
my_private_key_path: Path,
75-
passphrase: str | None = None,
76-
):
77-
"""Upload a file asynchronously"""
78-
async with async_client() as client, set_runtime_config(client=client):
79-
await upload_file(
80-
client=client,
81-
file_id=file_id,
82-
file_path=file_path,
83-
my_public_key_path=my_public_key_path,
84-
my_private_key_path=my_private_key_path,
85-
passphrase=passphrase,
86-
part_size=CONFIG.part_size,
87-
)
88-
89-
9070
if strtobool(os.getenv("UPLOAD_ENABLED") or "false"):
9171
cli.command(no_args_is_help=True)(upload)
9272

src/ghga_connector/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ class Config(BaseSettings):
7979
"""Global Config Parameters"""
8080

8181
max_concurrent_downloads: PositiveInt = Field(
82-
default=5, description="Number of parallel downloader tasks for file parts."
82+
default=5, description="Number of parallel download tasks for file parts."
83+
)
84+
max_concurrent_uploads: PositiveInt = Field(
85+
default=5, description="Number of parallel upload tasks for file parts."
8386
)
8487
max_retries: NonNegativeInt = Field(
8588
default=MAX_RETRIES, description="Number of times to retry failed API calls."

src/ghga_connector/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
"""Constants used throughout the core."""
1818

19-
DEFAULT_PART_SIZE = 16 * 1024 * 1024
19+
DEFAULT_PART_SIZE = 64 * (1024**2) # 64 MiB
2020
TIMEOUT = 60.0
2121
TIMEOUT_LONG = 5 * TIMEOUT + 10
2222
MAX_PART_NUMBER = 10000

src/ghga_connector/core/crypt/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#
1616
"""Subpackage containing all encryption/decryption related functionality"""
1717

18-
from .abstract_bases import Decryptor, Encryptor # noqa: F401
1918
from .checksums import Checksums # noqa: F401
2019
from .decryption import Crypt4GHDecryptor # noqa: F401
2120
from .encryption import Crypt4GHEncryptor # noqa: F401

src/ghga_connector/core/crypt/abstract_bases.py

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/ghga_connector/core/crypt/checksums.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,10 @@ def update_encrypted(self, part: bytes):
5454
"""Update encrypted part checksums"""
5555
self._encrypted_md5.append(hashlib.md5(part, usedforsecurity=False).hexdigest())
5656
self._encrypted_sha256.append(hashlib.sha256(part).hexdigest())
57+
58+
def encrypted_checksum_for_s3(self) -> str:
59+
"""Formulate the expected encrypted checksum str (etag) stored by S3."""
60+
concatenated_md5s = b"".join(bytes.fromhex(md5) for md5 in self._encrypted_md5)
61+
object_md5 = hashlib.md5(concatenated_md5s, usedforsecurity=False).hexdigest()
62+
num_parts = len(self._encrypted_md5)
63+
return object_md5 + f"-{num_parts}"

src/ghga_connector/core/crypt/decryption.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import crypt4gh.keys
2121
import crypt4gh.lib
2222

23-
from .abstract_bases import Decryptor
2423

25-
26-
class Crypt4GHDecryptor(Decryptor):
24+
class Crypt4GHDecryptor:
2725
"""Convenience class to deal with Crypt4GH decryption"""
2826

2927
def __init__(self, decryption_key_path: Path, passphrase: str | None):

src/ghga_connector/core/crypt/encryption.py

Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,46 +13,34 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16-
"""This module contains Crypt4GH based encryption functionality"""
16+
"""Functionality to encrypt files in chunks with Crypt4GH before upload."""
1717

1818
import base64
1919
import os
2020
from collections.abc import Generator
2121
from io import BufferedReader
22-
from pathlib import Path
22+
from typing import Any
2323

2424
import crypt4gh.header
25-
import crypt4gh.keys
2625
import crypt4gh.lib
2726
from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
27+
from pydantic import SecretBytes
2828

2929
from ghga_connector.config import get_ghga_pubkey
30-
from ghga_connector.core import get_segments, read_file_parts
30+
from ghga_connector.core.crypt.checksums import Checksums
31+
from ghga_connector.core.file_operations import get_segments, read_file_parts
3132

32-
from .abstract_bases import Encryptor
33-
from .checksums import Checksums
3433

35-
36-
class Crypt4GHEncryptor(Encryptor):
34+
class Crypt4GHEncryptor:
3735
"""Handles on the fly encryption and checksum calculation"""
3836

39-
def __init__(
40-
self,
41-
part_size: int,
42-
private_key_path: Path,
43-
passphrase: str | None,
44-
checksums: Checksums = Checksums(),
45-
file_secret: bytes | None = None,
46-
):
47-
self._encrypted_file_size = 0
48-
self._checksums = checksums
37+
def __init__(self, part_size: int, my_private_key: SecretBytes):
4938
self._part_size = part_size
50-
self._private_key_path = private_key_path
39+
self._my_private_key = my_private_key
5140
self._server_public_key = base64.b64decode(get_ghga_pubkey())
52-
self._passphrase = passphrase
53-
if file_secret is None:
54-
file_secret = os.urandom(32)
55-
self._file_secret = file_secret
41+
self._file_secret = os.urandom(32)
42+
self.checksums = Checksums() # Updated as encryption takes place
43+
self._encrypted_file_size = 0 # Updated as encryption takes place
5644

5745
def _encrypt(self, part: bytes):
5846
"""Encrypt file part using secret"""
@@ -64,7 +52,7 @@ def _encrypt(self, part: bytes):
6452

6553
return b"".join(encrypted_segments), incomplete_segment
6654

67-
def _encrypt_segment(self, segment: bytes):
55+
def _encrypt_segment(self, segment: bytes) -> bytes:
6856
"""Encrypt one single segment"""
6957
nonce = os.urandom(12)
7058
encrypted_data = crypto_aead_chacha20poly1305_ietf_encrypt(
@@ -77,38 +65,31 @@ def _create_envelope(self) -> bytes:
7765
Gather file encryption/decryption secret and assemble a crypt4gh envelope using the
7866
server's private and the client's public key
7967
"""
80-
if self._passphrase:
81-
private_key = crypt4gh.keys.get_private_key(
82-
filepath=self._private_key_path, callback=lambda: self._passphrase
83-
)
84-
else:
85-
private_key = crypt4gh.keys.get_private_key(
86-
filepath=self._private_key_path, callback=None
87-
)
88-
89-
keys = [(0, private_key, self._server_public_key)]
68+
keys = [(0, self._my_private_key.get_secret_value(), self._server_public_key)]
9069
header_content = crypt4gh.header.make_packet_data_enc(0, self._file_secret)
9170
header_packets = crypt4gh.header.encrypt(header_content, keys)
9271
header_bytes = crypt4gh.header.serialize(header_packets)
93-
9472
return header_bytes
9573

9674
def get_encrypted_size(self) -> int:
9775
"""Get file size after encryption, excluding envelope"""
9876
return self._encrypted_file_size
9977

100-
def process_file(self, file: BufferedReader) -> Generator[bytes, None, None]:
101-
"""Encrypt file parts and prepare for upload."""
78+
def process_file(
79+
self, file: BufferedReader
80+
) -> Generator[tuple[int, bytes], Any, None]:
81+
"""Encrypt file parts for upload, yielding a tuple of the part number and content."""
10282
unprocessed_bytes = b""
10383
upload_buffer = self._create_envelope()
104-
update_encrypted = self._checksums.update_encrypted
10584

10685
# get envelope size to adjust checksum buffers and encrypted content size
10786
envelope_size = len(upload_buffer)
10887

109-
for file_part in read_file_parts(file=file, part_size=self._part_size):
88+
for part_number, file_part in enumerate(
89+
read_file_parts(file=file, part_size=self._part_size), start=1
90+
):
11091
# process unencrypted
111-
self._checksums.update_unencrypted(file_part)
92+
self.checksums.update_unencrypted(file_part)
11293
unprocessed_bytes += file_part
11394

11495
# encrypt in chunks
@@ -118,12 +99,12 @@ def process_file(self, file: BufferedReader) -> Generator[bytes, None, None]:
11899
# update checksums and yield if part size
119100
if len(upload_buffer) >= self._part_size:
120101
current_part = upload_buffer[: self._part_size]
121-
if self._checksums.encrypted_is_empty():
122-
update_encrypted(current_part[envelope_size:])
102+
if self.checksums.encrypted_is_empty():
103+
self.checksums.update_encrypted(current_part[envelope_size:])
123104
else:
124-
update_encrypted(current_part)
105+
self.checksums.update_encrypted(current_part)
125106
self._encrypted_file_size += self._part_size
126-
yield current_part
107+
yield part_number, current_part
127108
upload_buffer = upload_buffer[self._part_size :]
128109

129110
# process dangling bytes
@@ -132,14 +113,16 @@ def process_file(self, file: BufferedReader) -> Generator[bytes, None, None]:
132113

133114
while len(upload_buffer) >= self._part_size:
134115
current_part = upload_buffer[: self._part_size]
135-
update_encrypted(current_part)
116+
self.checksums.update_encrypted(current_part)
136117
self._encrypted_file_size += self._part_size
137-
yield current_part
118+
part_number += 1
119+
yield part_number, current_part
138120
upload_buffer = upload_buffer[self._part_size :]
139121

140122
if upload_buffer:
141-
update_encrypted(upload_buffer)
123+
self.checksums.update_encrypted(upload_buffer)
142124
self._encrypted_file_size += len(upload_buffer)
143-
yield upload_buffer
125+
part_number += 1
126+
yield part_number, upload_buffer
144127

145128
self._encrypted_file_size -= envelope_size

0 commit comments

Comments
 (0)