Skip to content

R2 Communication Service

A shared service for Cloudflare R2 cloud storage using boto3. Provides a singleton client, generic bucket operations (upload, download, list, delete, copy), and atomic lock primitives.

Table of Contents


Quick Start

from src.shared_services.cloud_com.r2_com.api import (
    initialize_r2,
    BucketService,
)

# Initialize (after R2KeyManager has fetched keys)
initialize_r2()

# Use BucketService for all operations
service = BucketService()

# Upload JSON
service.upload_json("config/settings.json", {"theme": "dark"})

# Download JSON
settings = service.download_json("config/settings.json")

Initialization

The R2 client must be initialized before use. This requires that R2KeyManager has already fetched and decrypted the R2 credentials (happens during OAuth login flow).

Standard Initialization

from src.shared_services.cloud_com.r2_com.api import initialize_r2

# Credentials are fetched from R2KeyManager automatically
initialize_r2()

With Logger

from src.shared_services.cloud_com.r2_com.api import initialize_r2

initialize_r2(logger=my_logger)

Check Initialization Status

from src.shared_services.cloud_com.r2_com.api import is_r2_initialized

if not is_r2_initialized():
    initialize_r2()

Reinitialize After Key Rotation

from src.shared_services.cloud_com.r2_com.api import get_r2_client

client = get_r2_client()
client.reinitialize()  # Fetches fresh credentials from R2KeyManager

Connection Status

Test Connection

from src.shared_services.cloud_com.r2_com.api import test_r2_connection

if test_r2_connection():
    print("R2 storage is reachable")
else:
    print("Cannot reach R2 endpoint")

Quick Online Check

from src.shared_services.cloud_com.r2_com.api import is_r2_online

# Based on last operation status (no new API call)
if is_r2_online():
    # Safe to make R2 calls
    pass

Uploading Data

Upload Raw Bytes

from src.shared_services.cloud_com.r2_com.api import BucketService

service = BucketService()

# Upload binary data
service.upload_bytes(
    "projects/abc/planning.7z",
    compressed_data,
    content_type="application/x-7z-compressed",
)

Upload JSON

service.upload_json("projects/abc/project.meta.json", {
    "name": "Projekt Alpha",
    "creator": "admin",
    "created": "2025-01-15T10:00:00",
})

Downloading Data

Download Raw Bytes

service = BucketService()

data = service.download_bytes("projects/abc/planning.7z")

Download JSON

metadata = service.download_json("projects/abc/project.meta.json")
print(metadata["name"])

Download to Local File

from pathlib import Path

service.download_to_file(
    "projects/abc/planning.7z",
    Path("C:/temp/planning.7z"),
)

Listing Objects

List All Objects Under a Prefix

service = BucketService()

# Returns list of dicts with: key, size, last_modified, etag
objects = service.list_objects("projects/abc/planning/")
for obj in objects:
    print(f"{obj['key']} ({obj['size']} bytes)")

Discover Subdirectories

# List release versions without downloading all objects
prefixes = service.list_common_prefixes("projects/abc/releases/v")
# Returns: ["projects/abc/releases/v1.0/", "projects/abc/releases/v2.0/"]

Delete, Exists, Metadata

Delete an Object

service = BucketService()

# Idempotent - no error if object does not exist
service.delete_object("projects/abc/old_backup.7z")

Check if Object Exists

if service.object_exists("projects/abc/planning.7z"):
    print("Archive exists")

Get Object Metadata

# Returns: etag, content_length, last_modified, content_type
meta = service.get_object_metadata("projects/abc/planning.7z")
print(f"Size: {meta['content_length']} bytes")
print(f"ETag: {meta['etag']}")

Copying Objects

Server-Side Copy

service = BucketService()

# Create a backup copy (no download/upload, server-side only)
service.copy_object(
    "projects/abc/planning/planning.7z",
    "projects/abc/planning/planning_backup.7z",
)

Lock Primitives

Atomic lock operations for concurrent access control. Uses IfNoneMatch="*" for safe lock creation.

Acquire a Lock

from src.shared_services.cloud_com.r2_com.api import BucketService, R2LockError

service = BucketService()

lock_data = {
    "user": "admin",
    "since": "2025-01-15T10:00:00",
    "machine": "WORKSTATION-01",
    "expires": "2025-01-15T22:00:00",
}

try:
    service.acquire_lock("projects/abc/planning/planning.lock.json", lock_data)
    print("Lock acquired")
except R2LockError as e:
    print(f"Lock held by: {e.lock_holder}")

Release a Lock

# Idempotent - no error if lock does not exist
service.release_lock("projects/abc/planning/planning.lock.json")

Check Lock Status

lock = service.check_lock("projects/abc/planning/planning.lock.json")

if lock is None:
    print("No active lock")
else:
    print(f"Locked by {lock['user']} since {lock['since']}")

Exception Handling

All exceptions inherit from R2Error, allowing a single except clause for all R2 errors.

Exception Hierarchy

Exception Description
R2Error Base exception for all R2 operations
R2AuthenticationError Invalid or missing R2 credentials
R2ConnectionError Network or endpoint failures
R2BucketError Bucket does not exist or access denied
R2ObjectNotFoundError Object key does not exist
R2LockError Lock already held (.lock_holder attribute)
R2ClientNotInitializedError Client used before initialization

Basic Error Handling

from src.shared_services.cloud_com.r2_com.api import BucketService, R2Error

service = BucketService()

try:
    data = service.download_json("projects/abc/metadata.json")
except R2Error as e:
    print(f"R2 operation failed: {e}")

Specific Error Handling

from src.shared_services.cloud_com.r2_com.api import (
    BucketService,
    R2ObjectNotFoundError,
    R2AuthenticationError,
    R2ConnectionError,
    R2LockError,
)

service = BucketService()

try:
    data = service.download_json("projects/abc/metadata.json")
except R2ObjectNotFoundError:
    print("Object does not exist")
except R2AuthenticationError:
    print("Invalid credentials - re-authenticate")
except R2ConnectionError:
    print("Network error - check internet connection")

Full Import Reference

from src.shared_services.cloud_com.r2_com.api import (
    # Initialization
    initialize_r2,
    get_r2_client,
    test_r2_connection,
    is_r2_online,
    is_r2_initialized,

    # Services
    BucketService,

    # Exceptions
    R2Error,
    R2AuthenticationError,
    R2BucketError,
    R2ClientNotInitializedError,
    R2ConnectionError,
    R2LockError,
    R2ObjectNotFoundError,
)

Configuration

R2 endpoint and bucket are configured in src/shared_services/constants/cloud.py:

Constant Description
R2_ENDPOINT Cloudflare R2 API endpoint
R2_BUCKET_NAME Active bucket name
R2_PLANT_DESIGN_PROJECTS Base path for plant design projects
R2_PLANT_DESIGN_LIBRARIES Base path for libraries
R2_PROP_DIALOGS Base path for property dialogs

Credentials are managed by R2KeyManager in src/shared_services/security/r2_key_manager.py and are never written to disk.

API Reference

src.shared_services.cloud_com.r2_com.api

Public API for R2 cloud storage communication service.

USAGE

from src.shared_services.cloud_com.r2_com.api import (
    initialize_r2,
    get_r2_client,
    test_r2_connection,
    is_r2_online,
    is_r2_initialized,
    BucketService,
)

Initialize at app startup (after R2KeyManager has fetched keys)

initialize_r2()

Check connection

if is_r2_online(): print("R2 storage is reachable")

Use BucketService for common operations

service = BucketService()
service.upload_json("config/settings.json", {"theme": "dark"})
data = service.download_json("config/settings.json")

BucketService

Stateless service for generic R2 bucket operations.

Uses the R2Client singleton for all cloud access. Each method translates botocore errors into R2-specific exceptions.

Example

service = BucketService()

Upload and download JSON

service.upload_json("config/settings.json", {"theme": "dark"})
settings = service.download_json("config/settings.json")

Lock primitives

lock_data = {"user": "admin", "since": "2025-01-01T00:00:00"}
service.acquire_lock("project/planning.lock.json", lock_data)
service.release_lock("project/planning.lock.json")

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
class BucketService:
    """
    Stateless service for generic R2 bucket operations.

    Uses the R2Client singleton for all cloud access. Each method
    translates botocore errors into R2-specific exceptions.

    Example:
        service = BucketService()

        # Upload and download JSON
        service.upload_json("config/settings.json", {"theme": "dark"})
        settings = service.download_json("config/settings.json")

        # Lock primitives
        lock_data = {"user": "admin", "since": "2025-01-01T00:00:00"}
        service.acquire_lock("project/planning.lock.json", lock_data)
        service.release_lock("project/planning.lock.json")
    """

    @staticmethod
    def _client() -> R2Client:
        """Get the R2Client singleton (ensures initialized)."""
        client = R2Client.instance()
        if not client.is_initialized:
            raise R2ClientNotInitializedError(
                "R2 client not initialized. Call initialize_r2() first."
            )
        return client

    # =========================================================================
    # UPLOAD OPERATIONS
    # =========================================================================

    def upload_bytes(
        self,
        key: str,
        data: bytes,
        content_type: str = "application/octet-stream",
    ) -> None:
        """
        Upload raw bytes to an R2 key.

        Args:
            key: The R2 object key (path).
            data: The bytes to upload.
            content_type: MIME type for the object.

        Raises:
            R2Error: If the upload fails.
        """
        client = self._client()
        try:
            client.boto3_client.put_object(
                Bucket=client.bucket_name,
                Key=key,
                Body=data,
                ContentType=content_type,
            )
            client.mark_online()
        except ClientError as e:
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during upload: {e}") from e

    def upload_json(self, key: str, data: Any) -> None:
        """
        Serialize data as JSON and upload to an R2 key.

        Args:
            key: The R2 object key (path).
            data: JSON-serializable data.

        Raises:
            R2Error: If the upload fails.
        """
        json_bytes = json.dumps(data, indent=2, ensure_ascii=False).encode("utf-8")
        self.upload_bytes(key, json_bytes, content_type="application/json")

    # =========================================================================
    # DOWNLOAD OPERATIONS
    # =========================================================================

    def download_bytes(self, key: str) -> bytes:
        """
        Download raw bytes from an R2 key.

        Args:
            key: The R2 object key (path).

        Returns:
            The object contents as bytes.

        Raises:
            R2ObjectNotFoundError: If the key does not exist.
            R2Error: If the download fails.
        """
        client = self._client()
        try:
            response = client.boto3_client.get_object(
                Bucket=client.bucket_name,
                Key=key,
            )
            data = response["Body"].read()
            client.mark_online()
            return data
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("NoSuchKey", "404"):
                raise R2ObjectNotFoundError(f"Object not found: {key}") from e
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during download: {e}") from e

    def download_json(self, key: str) -> Any:
        """
        Download and deserialize JSON from an R2 key.

        Args:
            key: The R2 object key (path).

        Returns:
            The deserialized JSON data.

        Raises:
            R2ObjectNotFoundError: If the key does not exist.
            R2Error: If the download or deserialization fails.
        """
        data = self.download_bytes(key)
        return json.loads(data.decode("utf-8"))

    def download_to_file(self, key: str, local_path: Path) -> None:
        """
        Download an R2 object directly to a local file.

        Creates parent directories if they do not exist.

        Args:
            key: The R2 object key (path).
            local_path: Local file path to write to.

        Raises:
            R2ObjectNotFoundError: If the key does not exist.
            R2Error: If the download fails.
        """
        client = self._client()
        local_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            client.boto3_client.download_file(
                Bucket=client.bucket_name,
                Key=key,
                Filename=str(local_path),
            )
            client.mark_online()
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("NoSuchKey", "404"):
                raise R2ObjectNotFoundError(f"Object not found: {key}") from e
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during download: {e}") from e

    # =========================================================================
    # DELETE / EXISTS / METADATA
    # =========================================================================

    def delete_object(self, key: str) -> None:
        """
        Delete an object from R2.

        Note: R2/S3 delete is idempotent - deleting a non-existent key
        does not raise an error.

        Args:
            key: The R2 object key (path).

        Raises:
            R2Error: If the delete fails.
        """
        client = self._client()
        try:
            client.boto3_client.delete_object(
                Bucket=client.bucket_name,
                Key=key,
            )
            client.mark_online()
        except ClientError as e:
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during delete: {e}") from e

    def object_exists(self, key: str) -> bool:
        """
        Check if an object exists in R2.

        Args:
            key: The R2 object key (path).

        Returns:
            True if the object exists.

        Raises:
            R2Error: If the check fails (network error, auth error).
        """
        client = self._client()
        try:
            client.boto3_client.head_object(
                Bucket=client.bucket_name,
                Key=key,
            )
            client.mark_online()
            return True
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchKey"):
                client.mark_online()
                return False
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during existence check: {e}") from e

    def get_object_metadata(self, key: str) -> Dict[str, Any]:
        """
        Get metadata for an R2 object (ETag, ContentLength, LastModified).

        Args:
            key: The R2 object key (path).

        Returns:
            Dict with keys: etag, content_length, last_modified, content_type.

        Raises:
            R2ObjectNotFoundError: If the key does not exist.
            R2Error: If the request fails.
        """
        client = self._client()
        try:
            response = client.boto3_client.head_object(
                Bucket=client.bucket_name,
                Key=key,
            )
            client.mark_online()
            return {
                "etag": response.get("ETag", "").strip('"'),
                "content_length": response.get("ContentLength", 0),
                "last_modified": response.get("LastModified"),
                "content_type": response.get("ContentType", ""),
            }
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("404", "NoSuchKey"):
                raise R2ObjectNotFoundError(f"Object not found: {key}") from e
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during metadata fetch: {e}") from e

    # =========================================================================
    # LIST OPERATIONS
    # =========================================================================

    def list_objects(
        self,
        prefix: str,
        delimiter: str = "",
    ) -> List[Dict[str, Any]]:
        """
        List all objects under a prefix, handling pagination automatically.

        Args:
            prefix: The key prefix to list under.
            delimiter: Optional delimiter for grouping (e.g. "/" for directory-like listing).

        Returns:
            List of dicts, each with: key, size, last_modified, etag.

        Raises:
            R2Error: If the list operation fails.
        """
        client = self._client()
        results = []

        try:
            paginator = client.boto3_client.get_paginator("list_objects_v2")
            paginate_kwargs = {
                "Bucket": client.bucket_name,
                "Prefix": prefix,
            }
            if delimiter:
                paginate_kwargs["Delimiter"] = delimiter

            for page in paginator.paginate(**paginate_kwargs):
                for obj in page.get("Contents", []):
                    results.append({
                        "key": obj["Key"],
                        "size": obj.get("Size", 0),
                        "last_modified": obj.get("LastModified"),
                        "etag": obj.get("ETag", "").strip('"'),
                    })

            client.mark_online()
            return results

        except ClientError as e:
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during list: {e}") from e

    def list_common_prefixes(
        self,
        prefix: str,
        delimiter: str = "/",
    ) -> List[str]:
        """
        List common prefixes (directory-like groupings) under a prefix.

        Useful for discovering subdirectories without listing all objects.
        For example, listing releases/v returns ["releases/v1.0/", "releases/v2.0/"].

        Args:
            prefix: The key prefix to list under.
            delimiter: The delimiter for grouping (default "/").

        Returns:
            List of common prefix strings.

        Raises:
            R2Error: If the list operation fails.
        """
        client = self._client()
        prefixes = []

        try:
            paginator = client.boto3_client.get_paginator("list_objects_v2")
            for page in paginator.paginate(
                Bucket=client.bucket_name,
                Prefix=prefix,
                Delimiter=delimiter,
            ):
                for cp in page.get("CommonPrefixes", []):
                    prefixes.append(cp["Prefix"])

            client.mark_online()
            return prefixes

        except ClientError as e:
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during prefix list: {e}") from e

    # =========================================================================
    # COPY OPERATIONS
    # =========================================================================

    def copy_object(self, source_key: str, dest_key: str) -> None:
        """
        Copy an object within the same bucket (server-side).

        Args:
            source_key: Source object key.
            dest_key: Destination object key.

        Raises:
            R2ObjectNotFoundError: If the source key does not exist.
            R2Error: If the copy fails.
        """
        client = self._client()
        try:
            client.boto3_client.copy_object(
                Bucket=client.bucket_name,
                CopySource=f"{client.bucket_name}/{source_key}",
                Key=dest_key,
            )
            client.mark_online()
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code in ("NoSuchKey", "404"):
                raise R2ObjectNotFoundError(f"Source object not found: {source_key}") from e
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during copy: {e}") from e

    # =========================================================================
    # LOCK PRIMITIVES
    # =========================================================================

    def acquire_lock(
        self,
        lock_key: str,
        lock_data: Dict[str, Any],
    ) -> None:
        """
        Atomically create a lock file in R2.

        Uses ``IfNoneMatch="*"`` to ensure the lock only succeeds if no lock
        file exists yet. This is an atomic compare-and-create operation.

        Args:
            lock_key: The R2 key for the lock file (e.g. "projects/abc/planning.lock.json").
            lock_data: Lock metadata to store (user, since, machine, expires, etc.).

        Raises:
            R2LockError: If the lock is already held by another user.
            R2Error: If the operation fails for other reasons.
        """
        client = self._client()
        json_body = json.dumps(lock_data, indent=2, ensure_ascii=False).encode("utf-8")

        try:
            client.boto3_client.put_object(
                Bucket=client.bucket_name,
                Key=lock_key,
                Body=json_body,
                ContentType="application/json",
                IfNoneMatch="*",
            )
            client.mark_online()

        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code == "PreconditionFailed":
                # Lock already exists - try to read who holds it
                try:
                    existing_lock = self._read_existing_lock(lock_key)
                except R2Error:
                    existing_lock = None
                lock_holder = existing_lock.get("locked_by") if existing_lock else None
                raise R2LockError(
                    message=f"Lock already held: {lock_key}",
                    lock_holder=lock_holder,
                ) from e
            raise client.translate_client_error(e) from e
        except (EndpointConnectionError, ConnectionError) as e:
            client.mark_offline(str(e))
            raise R2ConnectionError(f"Connection failed during lock acquire: {e}") from e

    def release_lock(self, lock_key: str) -> None:
        """
        Release a lock by deleting the lock file from R2.

        This is idempotent - releasing a non-existent lock does not raise.

        Args:
            lock_key: The R2 key for the lock file.

        Raises:
            R2Error: If the delete fails.
        """
        self.delete_object(lock_key)

    def check_lock(self, lock_key: str) -> Optional[Dict[str, Any]]:
        """
        Check if a lock exists and return its contents.

        Args:
            lock_key: The R2 key for the lock file.

        Returns:
            Lock data dict if lock exists, None if no lock.

        Raises:
            R2Error: If the check fails (network error, auth error).
        """
        return self._read_existing_lock(lock_key)

    def _read_existing_lock(self, lock_key: str) -> Optional[Dict[str, Any]]:
        """Read an existing lock file, returning None if it does not exist.

        Raises:
            R2ConnectionError: If the read fails due to a network error.
            R2Error: If the read fails for non-404 reasons.
        """
        try:
            return self.download_json(lock_key)
        except R2ObjectNotFoundError:
            return None
acquire_lock(lock_key, lock_data)

Atomically create a lock file in R2.

Uses IfNoneMatch="*" to ensure the lock only succeeds if no lock file exists yet. This is an atomic compare-and-create operation.

Parameters:

Name Type Description Default
lock_key str

The R2 key for the lock file (e.g. "projects/abc/planning.lock.json").

required
lock_data Dict[str, Any]

Lock metadata to store (user, since, machine, expires, etc.).

required

Raises:

Type Description
R2LockError

If the lock is already held by another user.

R2Error

If the operation fails for other reasons.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def acquire_lock(
    self,
    lock_key: str,
    lock_data: Dict[str, Any],
) -> None:
    """
    Atomically create a lock file in R2.

    Uses ``IfNoneMatch="*"`` to ensure the lock only succeeds if no lock
    file exists yet. This is an atomic compare-and-create operation.

    Args:
        lock_key: The R2 key for the lock file (e.g. "projects/abc/planning.lock.json").
        lock_data: Lock metadata to store (user, since, machine, expires, etc.).

    Raises:
        R2LockError: If the lock is already held by another user.
        R2Error: If the operation fails for other reasons.
    """
    client = self._client()
    json_body = json.dumps(lock_data, indent=2, ensure_ascii=False).encode("utf-8")

    try:
        client.boto3_client.put_object(
            Bucket=client.bucket_name,
            Key=lock_key,
            Body=json_body,
            ContentType="application/json",
            IfNoneMatch="*",
        )
        client.mark_online()

    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code", "")
        if error_code == "PreconditionFailed":
            # Lock already exists - try to read who holds it
            try:
                existing_lock = self._read_existing_lock(lock_key)
            except R2Error:
                existing_lock = None
            # Lock payloads use the "user" key (see Lock Primitives docs);
            # keep "locked_by" as a legacy fallback for older lock files.
            lock_holder = None
            if existing_lock:
                lock_holder = existing_lock.get("user") or existing_lock.get("locked_by")
            raise R2LockError(
                message=f"Lock already held: {lock_key}",
                lock_holder=lock_holder,
            ) from e
        raise client.translate_client_error(e) from e
    except (EndpointConnectionError, ConnectionError) as e:
        client.mark_offline(str(e))
        raise R2ConnectionError(f"Connection failed during lock acquire: {e}") from e
check_lock(lock_key)

Check if a lock exists and return its contents.

Parameters:

Name Type Description Default
lock_key str

The R2 key for the lock file.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Lock data dict if lock exists, None if no lock.

Raises:

Type Description
R2Error

If the check fails (network error, auth error).

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def check_lock(self, lock_key: str) -> Optional[Dict[str, Any]]:
    """Return the current lock contents, or None when no lock is held.

    Args:
        lock_key: The R2 key for the lock file.

    Returns:
        The deserialized lock data dict, or None if the lock file is absent.

    Raises:
        R2Error: If the lookup fails (network error, auth error).
    """
    existing = self._read_existing_lock(lock_key)
    return existing
copy_object(source_key, dest_key)

Copy an object within the same bucket (server-side).

Parameters:

Name Type Description Default
source_key str

Source object key.

required
dest_key str

Destination object key.

required

Raises:

Type Description
R2ObjectNotFoundError

If the source key does not exist.

R2Error

If the copy fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def copy_object(self, source_key: str, dest_key: str) -> None:
    """Perform a server-side copy of an object inside the same bucket.

    No bytes are downloaded or re-uploaded; R2 duplicates the object
    entirely on the server.

    Args:
        source_key: Source object key.
        dest_key: Destination object key.

    Raises:
        R2ObjectNotFoundError: If the source key does not exist.
        R2Error: If the copy fails.
    """
    r2 = self._client()
    try:
        r2.boto3_client.copy_object(
            Bucket=r2.bucket_name,
            CopySource=f"{r2.bucket_name}/{source_key}",
            Key=dest_key,
        )
        r2.mark_online()
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code not in ("NoSuchKey", "404"):
            raise r2.translate_client_error(exc) from exc
        raise R2ObjectNotFoundError(f"Source object not found: {source_key}") from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during copy: {exc}") from exc
delete_object(key)

Delete an object from R2.

Note: R2/S3 delete is idempotent - deleting a non-existent key does not raise an error.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required

Raises:

Type Description
R2Error

If the delete fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def delete_object(self, key: str) -> None:
    """
    Remove a single object from the R2 bucket.

    R2/S3 deletes are idempotent: deleting a key that does not exist
    succeeds silently rather than raising.

    Args:
        key: The R2 object key (path).

    Raises:
        R2Error: If the delete fails.
    """
    r2 = self._client()
    try:
        r2.boto3_client.delete_object(Bucket=r2.bucket_name, Key=key)
        r2.mark_online()
    except ClientError as exc:
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during delete: {exc}") from exc
download_bytes(key)

Download raw bytes from an R2 key.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required

Returns:

Type Description
bytes

The object contents as bytes.

Raises:

Type Description
R2ObjectNotFoundError

If the key does not exist.

R2Error

If the download fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def download_bytes(self, key: str) -> bytes:
    """
    Fetch an object's full contents from R2 as raw bytes.

    Args:
        key: The R2 object key (path).

    Returns:
        The object contents as bytes.

    Raises:
        R2ObjectNotFoundError: If the key does not exist.
        R2Error: If the download fails.
    """
    r2 = self._client()
    try:
        payload = r2.boto3_client.get_object(Bucket=r2.bucket_name, Key=key)
        body = payload["Body"].read()
        r2.mark_online()
        return body
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code in ("NoSuchKey", "404"):
            raise R2ObjectNotFoundError(f"Object not found: {key}") from exc
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during download: {exc}") from exc
download_json(key)

Download and deserialize JSON from an R2 key.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required

Returns:

Type Description
Any

The deserialized JSON data.

Raises:

Type Description
R2ObjectNotFoundError

If the key does not exist.

R2Error

If the download or deserialization fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def download_json(self, key: str) -> Any:
    """
    Fetch an object from R2 and parse its contents as JSON.

    Args:
        key: The R2 object key (path).

    Returns:
        The deserialized JSON data.

    Raises:
        R2ObjectNotFoundError: If the key does not exist.
        R2Error: If the download or deserialization fails.
    """
    raw = self.download_bytes(key)
    text = raw.decode("utf-8")
    return json.loads(text)
download_to_file(key, local_path)

Download an R2 object directly to a local file.

Creates parent directories if they do not exist.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required
local_path Path

Local file path to write to.

required

Raises:

Type Description
R2ObjectNotFoundError

If the key does not exist.

R2Error

If the download fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def download_to_file(self, key: str, local_path: Path) -> None:
    """
    Stream an R2 object straight into a local file.

    Any missing parent directories of ``local_path`` are created first.

    Args:
        key: The R2 object key (path).
        local_path: Local file path to write to.

    Raises:
        R2ObjectNotFoundError: If the key does not exist.
        R2Error: If the download fails.
    """
    r2 = self._client()
    # Make sure the destination directory tree exists before boto3 writes.
    local_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        r2.boto3_client.download_file(
            Bucket=r2.bucket_name,
            Key=key,
            Filename=str(local_path),
        )
        r2.mark_online()
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code in ("NoSuchKey", "404"):
            raise R2ObjectNotFoundError(f"Object not found: {key}") from exc
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during download: {exc}") from exc
get_object_metadata(key)

Get metadata for an R2 object (ETag, ContentLength, LastModified).

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required

Returns:

Type Description
Dict[str, Any]

Dict with keys: etag, content_length, last_modified, content_type.

Raises:

Type Description
R2ObjectNotFoundError

If the key does not exist.

R2Error

If the request fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def get_object_metadata(self, key: str) -> Dict[str, Any]:
    """
    Fetch object metadata via a HEAD request (no body transfer).

    Args:
        key: The R2 object key (path).

    Returns:
        Dict with keys: etag, content_length, last_modified, content_type.

    Raises:
        R2ObjectNotFoundError: If the key does not exist.
        R2Error: If the request fails.
    """
    r2 = self._client()
    try:
        head = r2.boto3_client.head_object(Bucket=r2.bucket_name, Key=key)
        r2.mark_online()
        # Normalize the boto3 response; the ETag arrives wrapped in quotes.
        return {
            "etag": head.get("ETag", "").strip('"'),
            "content_length": head.get("ContentLength", 0),
            "last_modified": head.get("LastModified"),
            "content_type": head.get("ContentType", ""),
        }
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code in ("404", "NoSuchKey"):
            raise R2ObjectNotFoundError(f"Object not found: {key}") from exc
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during metadata fetch: {exc}") from exc
list_common_prefixes(prefix, delimiter='/')

List common prefixes (directory-like groupings) under a prefix.

Useful for discovering subdirectories without listing all objects. For example, listing releases/v returns ["releases/v1.0/", "releases/v2.0/"].

Parameters:

Name Type Description Default
prefix str

The key prefix to list under.

required
delimiter str

The delimiter for grouping (default "/").

'/'

Returns:

Type Description
List[str]

List of common prefix strings.

Raises:

Type Description
R2Error

If the list operation fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def list_common_prefixes(
    self,
    prefix: str,
    delimiter: str = "/",
) -> List[str]:
    """
    Enumerate the delimiter-grouped "subdirectories" under a prefix.

    Lets callers discover directory-like groupings without pulling every
    object key; e.g. listing "releases/v" may yield
    ["releases/v1.0/", "releases/v2.0/"].

    Args:
        prefix: The key prefix to list under.
        delimiter: The delimiter for grouping (default "/").

    Returns:
        List of common prefix strings.

    Raises:
        R2Error: If the list operation fails.
    """
    r2 = self._client()
    try:
        pager = r2.boto3_client.get_paginator("list_objects_v2")
        pages = pager.paginate(
            Bucket=r2.bucket_name,
            Prefix=prefix,
            Delimiter=delimiter,
        )
        found = [
            entry["Prefix"]
            for page in pages
            for entry in page.get("CommonPrefixes", [])
        ]
        r2.mark_online()
        return found
    except ClientError as exc:
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during prefix list: {exc}") from exc
list_objects(prefix, delimiter='')

List all objects under a prefix, handling pagination automatically.

Parameters:

Name Type Description Default
prefix str

The key prefix to list under.

required
delimiter str

Optional delimiter for grouping (e.g. "/" for directory-like listing).

''

Returns:

Type Description
List[Dict[str, Any]]

List of dicts, each with: key, size, last_modified, etag.

Raises:

Type Description
R2Error

If the list operation fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def list_objects(
    self,
    prefix: str,
    delimiter: str = "",
) -> List[Dict[str, Any]]:
    """
    List every object under a prefix, transparently following pagination.

    Args:
        prefix: The key prefix to list under.
        delimiter: Optional delimiter for grouping (e.g. "/" for directory-like listing).

    Returns:
        List of dicts, each with: key, size, last_modified, etag.

    Raises:
        R2Error: If the list operation fails.
    """
    r2 = self._client()
    try:
        kwargs: Dict[str, Any] = {"Bucket": r2.bucket_name, "Prefix": prefix}
        # Only pass Delimiter when the caller actually wants grouping.
        if delimiter:
            kwargs["Delimiter"] = delimiter

        pager = r2.boto3_client.get_paginator("list_objects_v2")
        found = [
            {
                "key": obj["Key"],
                "size": obj.get("Size", 0),
                "last_modified": obj.get("LastModified"),
                "etag": obj.get("ETag", "").strip('"'),
            }
            for page in pager.paginate(**kwargs)
            for obj in page.get("Contents", [])
        ]
        r2.mark_online()
        return found
    except ClientError as exc:
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during list: {exc}") from exc
object_exists(key)

Check if an object exists in R2.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required

Returns:

Type Description
bool

True if the object exists.

Raises:

Type Description
R2Error

If the check fails (network error, auth error).

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def object_exists(self, key: str) -> bool:
    """
    Probe whether a key is present in the bucket via a HEAD request.

    Args:
        key: The R2 object key (path).

    Returns:
        True if the object exists.

    Raises:
        R2Error: If the check fails (network error, auth error).
    """
    r2 = self._client()
    try:
        r2.boto3_client.head_object(Bucket=r2.bucket_name, Key=key)
        r2.mark_online()
        return True
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code not in ("404", "NoSuchKey"):
            raise r2.translate_client_error(exc) from exc
        # A missing key is a definitive (negative) answer, not an outage.
        r2.mark_online()
        return False
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during existence check: {exc}") from exc
release_lock(lock_key)

Release a lock by deleting the lock file from R2.

This is idempotent - releasing a non-existent lock does not raise.

Parameters:

Name Type Description Default
lock_key str

The R2 key for the lock file.

required

Raises:

Type Description
R2Error

If the delete fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def release_lock(self, lock_key: str) -> None:
    """
    Drop a lock by removing its lock file from the bucket.

    Idempotent: releasing a lock that was never acquired (or was already
    released) is not an error, because R2/S3 deletes are idempotent.

    Args:
        lock_key: The R2 key for the lock file.

    Raises:
        R2Error: If the delete fails.
    """
    # Deleting the lock object is all a release amounts to.
    self.delete_object(lock_key)
upload_bytes(key, data, content_type='application/octet-stream')

Upload raw bytes to an R2 key.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required
data bytes

The bytes to upload.

required
content_type str

MIME type for the object.

'application/octet-stream'

Raises:

Type Description
R2Error

If the upload fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def upload_bytes(
    self,
    key: str,
    data: bytes,
    content_type: str = "application/octet-stream",
) -> None:
    """
    Write a bytes payload to the bucket under the given key.

    Args:
        key: The R2 object key (path).
        data: The bytes to upload.
        content_type: MIME type for the object.

    Raises:
        R2Error: If the upload fails.
    """
    r2 = self._client()
    try:
        r2.boto3_client.put_object(
            Bucket=r2.bucket_name,
            Key=key,
            Body=data,
            ContentType=content_type,
        )
        r2.mark_online()
    except ClientError as exc:
        raise r2.translate_client_error(exc) from exc
    except (EndpointConnectionError, ConnectionError) as exc:
        r2.mark_offline(str(exc))
        raise R2ConnectionError(f"Connection failed during upload: {exc}") from exc
upload_json(key, data)

Serialize data as JSON and upload to an R2 key.

Parameters:

Name Type Description Default
key str

The R2 object key (path).

required
data Any

JSON-serializable data.

required

Raises:

Type Description
R2Error

If the upload fails.

Source code in src\shared_services\cloud_com\r2_com\bucket_service.py
def upload_json(self, key: str, data: Any) -> None:
    """
    Encode data as pretty-printed UTF-8 JSON and store it under key.

    Args:
        key: The R2 object key (path).
        data: JSON-serializable data.

    Raises:
        R2Error: If the upload fails.
    """
    text = json.dumps(data, indent=2, ensure_ascii=False)
    self.upload_bytes(key, text.encode("utf-8"), content_type="application/json")

R2AuthenticationError

Bases: R2Error

Raised when R2 credentials are missing or invalid.

This occurs when:

- R2KeyManager has no keys loaded
- Access key or secret key is invalid
- Credentials have been revoked

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2AuthenticationError(R2Error):
    """
    Signals missing or invalid R2 credentials.

    Typical causes:
    - R2KeyManager has no keys loaded
    - Access key or secret key is invalid
    - Credentials have been revoked
    """

R2BucketError

Bases: R2Error

Raised for bucket-level errors.

This occurs when:

- Bucket does not exist
- Access to bucket is denied

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2BucketError(R2Error):
    """
    Signals a bucket-level failure.

    Typical causes:
    - Bucket does not exist
    - Access to bucket is denied
    """

R2ClientNotInitializedError

Bases: R2Error

Raised when R2 client is used before initialization.

Call initialize_r2() before using the client.

Example

from src.shared_services.cloud_com.r2_com.api import initialize_r2

initialize_r2()
client = get_r2_client()

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2ClientNotInitializedError(R2Error):
    """
    Signals that the R2 client was used before it was initialized.

    Call initialize_r2() before using the client.

    Example:
        from src.shared_services.cloud_com.r2_com.api import initialize_r2

        initialize_r2()
        client = get_r2_client()
    """

R2ConnectionError

Bases: R2Error

Raised when R2 endpoint is unreachable.

This occurs when:

- No internet connection
- R2 endpoint is down
- Request times out

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2ConnectionError(R2Error):
    """
    Signals that the R2 endpoint could not be reached.

    Typical causes:
    - No internet connection
    - R2 endpoint is down
    - Request times out
    """

R2Error

Bases: Exception

Base exception for R2 operations.

All R2-related exceptions inherit from this class, allowing callers to catch all R2 errors with a single except clause.

Example

try:
    client.upload(...)
except R2Error as e:
    print(f"R2 operation failed: {e}")

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2Error(Exception):
    """
    Root of the R2 exception hierarchy.

    Every R2-specific exception derives from this class, so callers can
    trap any storage failure with a single except clause.

    Example:
        try:
            client.upload(...)
        except R2Error as e:
            print(f"R2 operation failed: {e}")
    """

R2LockError

Bases: R2Error

Raised when a lock operation fails.

This occurs when:

- Lock is already held by another user
- Lock acquisition times out
- Lock file is corrupted

Attributes:

Name Type Description
lock_holder

Username of the current lock holder, if known.

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2LockError(R2Error):
    """
    Signals a failed lock operation.

    Typical causes:
    - Lock is already held by another user
    - Lock acquisition times out
    - Lock file is corrupted

    Attributes:
        lock_holder: Username of the current lock holder, if known.
    """

    def __init__(
        self,
        message: str = "Lock operation failed",
        lock_holder: Optional[str] = None,
    ) -> None:
        # Preserve the standard Exception message, then record who holds
        # the lock so callers can surface it to the user.
        super().__init__(message)
        self.lock_holder = lock_holder

R2ObjectNotFoundError

Bases: R2Error

Raised when an object key does not exist in the bucket.

Example

try:
    client.head_object(key)
except R2ObjectNotFoundError:
    print("Object not found")

Source code in src\shared_services\cloud_com\r2_com\exceptions.py
class R2ObjectNotFoundError(R2Error):
    """
    Signals that the requested key is absent from the bucket.

    Example:
        try:
            client.head_object(key)
        except R2ObjectNotFoundError:
            print("Object not found")
    """

get_r2_client()

Get the R2Client singleton instance.

Returns:

Type Description
R2Client

The R2Client singleton.

Raises:

Type Description
R2ClientNotInitializedError

If not initialized.

Example

client = get_r2_client()
s3 = client.boto3_client

Source code in src\shared_services\cloud_com\r2_com\api.py
def get_r2_client() -> R2Client:
    """
    Return the process-wide R2Client singleton.

    Returns:
        The R2Client singleton.

    Raises:
        R2ClientNotInitializedError: If not initialized.

    Example:
        client = get_r2_client()
        s3 = client.boto3_client
    """
    singleton = R2Client.instance()
    if singleton.is_initialized:
        return singleton
    raise R2ClientNotInitializedError(
        "R2 client not initialized. Call initialize_r2() first."
    )

initialize_r2(logger=None)

Initialize the R2 client.

Call once at application startup, after R2KeyManager has fetched keys. Credentials are fetched from R2KeyManager automatically.

Parameters:

Name Type Description Default
logger Optional[AsyncAppLogger]

Optional logger instance for debugging.

None

Raises:

Type Description
R2AuthenticationError

If R2 credentials are not available.

Example

from src.shared_services.cloud_com.r2_com.api import initialize_r2

After successful key fetch

initialize_r2()

Source code in src\shared_services\cloud_com\r2_com\api.py
def initialize_r2(
    logger: Optional["AsyncAppLogger"] = None,
) -> None:
    """
    Initialize the R2 client singleton.

    Call once at application startup, after R2KeyManager has fetched keys.
    Credentials are fetched from R2KeyManager automatically.

    Args:
        logger: Optional logger instance for debugging.

    Raises:
        R2AuthenticationError: If R2 credentials are not available.

    Example:
        from src.shared_services.cloud_com.r2_com.api import initialize_r2

        # After successful key fetch
        initialize_r2()
    """
    R2Client.instance().initialize(logger=logger)

is_r2_initialized()

Check if R2 client has been initialized.

Returns:

Type Description
bool

True if initialize_r2() has been called.

Source code in src\shared_services\cloud_com\r2_com\api.py
def is_r2_initialized() -> bool:
    """
    Report whether initialize_r2() has already been called.

    Returns:
        True if initialize_r2() has been called.
    """
    return R2Client.instance().is_initialized

is_r2_online()

Quick check if R2 is online.

Based on last operation status. Does not make a new API call. Use test_r2_connection() for a fresh connectivity check.

Returns:

Type Description
bool

True if last R2 operation succeeded.

Source code in src\shared_services\cloud_com\r2_com\api.py
def is_r2_online() -> bool:
    """
    Cheap online check based on the most recent R2 operation.

    No new API call is made; use test_r2_connection() for a fresh
    connectivity probe.

    Returns:
        True if last R2 operation succeeded.
    """
    return R2Client.instance().is_online

test_r2_connection()

Test if R2 endpoint is accessible.

Returns:

Type Description
bool

True if connection successful.

Example

if test_r2_connection():
    print("R2 storage is reachable")

Source code in src\shared_services\cloud_com\r2_com\api.py
def test_r2_connection() -> bool:
    """
    Actively probe whether the R2 endpoint is accessible.

    Returns:
        True if connection successful.

    Example:
        if test_r2_connection():
            print("R2 storage is reachable")
    """
    try:
        singleton = R2Client.instance()
        if not singleton.is_initialized:
            # An uninitialized client cannot be reachable by definition.
            return False
        return singleton.test_connection()
    except (R2Error, ImportError):
        return False