Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/conformance/_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import requests


def start_grpc_server(grpc_endpoint, http_endpoint):
"""Starts the testbench gRPC server if it's not already running.

Expand Down
17 changes: 9 additions & 8 deletions tests/perf/microbenchmarks/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import socket
import psutil

_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show


def publish_benchmark_extra_info(
benchmark: Any,
Expand All @@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
download_bytes_list: Optional[List[int]] = None,
duration: Optional[int] = None,
) -> None:

"""
Helper function to publish benchmark parameters to the extra_info property.
"""
Expand All @@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
assert (
duration is not None
), "Duration must be provided if total_bytes_transferred is provided."
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
median_throughput = statistics.median(throughputs_list)


else:
object_size = params.file_size_bytes
num_files = params.num_files
Expand Down Expand Up @@ -211,13 +212,13 @@ def get_affinity(irq):

def get_primary_interface_name():
primary_ip = None

# 1. Determine the Local IP used for internet access
# We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# connect() to a public IP (Google DNS) to force route resolution
s.connect(('8.8.8.8', 80))
s.connect(("8.8.8.8", 80))
primary_ip = s.getsockname()[0]
except Exception:
# Fallback if no internet
Expand Down Expand Up @@ -248,7 +249,7 @@ def get_irq_affinity():
for irq in irqs:
affinity_str = get_affinity(irq)
if affinity_str != "N/A":
for part in affinity_str.split(','):
if '-' not in part:
for part in affinity_str.split(","):
if "-" not in part:
cpus.add(int(part))
return cpus
2 changes: 1 addition & 1 deletion tests/perf/microbenchmarks/time_based/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
@pytest.fixture
def workload_params(request):
params = request.param
files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
return params, files_names
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):


def _download_files_worker(process_idx, filename, params, bucket_type):

if bucket_type == "zonal":
return worker_loop.run_until_complete(
_download_time_based_async(worker_client, filename, params)
Expand Down
39 changes: 23 additions & 16 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer):
writer._is_stream_open = True
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=100)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100)

size = await writer.state_lookup()

Expand Down Expand Up @@ -246,9 +246,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer
],
)
@pytest.mark.asyncio
async def test_append(
self, data_len, mock_appendable_writer
):
async def test_append(self, data_len, mock_appendable_writer):
"""Verify append orchestrates manager and drives the internal generator."""
# Arrange
writer = self._make_one(mock_appendable_writer["mock_client"])
Expand All @@ -272,10 +270,19 @@ async def test_append(
# Assert
expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.offset == data_len
assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks
assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval
assert (
writer.bytes_appended_since_last_flush
== data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
)
assert (
writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES
)
assert writer.write_obj_stream.send.await_count == -(
-data_len // _MAX_CHUNK_SIZE_BYTES
) # Ceiling division for number of chunks
assert (
writer.write_obj_stream.recv.await_count == expected_recv_count
) # Expect 1 recv per flush interval

@pytest.mark.asyncio
async def test_append_recovery_reopens_stream(self, mock_appendable_writer):
Expand Down Expand Up @@ -339,9 +346,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
writer.bytes_appended_since_last_flush = 100

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=200)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200)

await writer.flush()

Expand Down Expand Up @@ -382,9 +389,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

resource = storage_type.Object(size=999)
mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(resource=resource)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource)

res = await writer.finalize()

Expand Down