Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
97 changes: 97 additions & 0 deletions examples/example_measure_mw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import time
import threading

from src.ppk2_api.ppk2_api import PPK2_API


class PowerProfiler:
conversion_factor = 1_000_000 # uA to mW

def __init__(self, voltage_mv=3700):
self.voltage_mv = voltage_mv
self.total_power_mW = 0
self.total_samples = 0

self.lock = threading.Lock()
self.ppk2 = None
self.sampling_thread = None
self.sampling_enabled = False


def _setup_ppk2(self):
ppk2s_connected = PPK2_API.list_devices()

# Check if we have at least one PPK2 device
if not ppk2s_connected:
raise ConnectionError("No PPK2 devices found!")

print(ppk2s_connected)
# Just select the first available PPK2 device
for ppk2_port_tuple in ppk2s_connected:
ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple
print(f"Connecting to {ppk2_port}")

self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True)

ret = self.ppk2.get_modifiers()
if ret is not None:
break

print(f"Failed to connect to {ppk2_port}")

self.ppk2.set_source_voltage(self.voltage_mv)
self.ppk2.use_source_meter()
self.ppk2.toggle_DUT_power("ON")

self.ppk2.start_measuring()
print("Initialized Power Profiler")


def _run_sampling(self):
try:
self._setup_ppk2()
while self.sampling_enabled:
time.sleep(0.01)
read_data = self.ppk2.get_data()

if read_data == b"":
continue

samples, raw_digital = self.ppk2.get_samples(read_data)
if not samples:
continue

average_current_uA = sum(samples) / len(samples)
average_power_mW = (average_current_uA * self.voltage_mv) / self.conversion_factor
formatted_power = round(average_power_mW, 2)

with self.lock:
self.total_power_mW += formatted_power
self.total_samples += 1
average_of_averages_mW = self.total_power_mW / self.total_samples

print(f"{formatted_power} mW, Avg: {average_of_averages_mW:.2f} mW")

except Exception as e:
self.sampling_enabled = False
print(f"An error occurred: {e}")

def start_sampling(self):
self.sampling_enabled = True
self.sampling_thread = threading.Thread(target=self._run_sampling, daemon=True)
self.sampling_thread.start()

def stop_sampling(self):
self.sampling_enabled = False
self.sampling_thread.join()


def main():
sampler = PowerProfiler(voltage_mv=3800)
sampler.start_sampling()
input("Press Enter to exit...\n")
sampler.stop_sampling()


if __name__ == "__main__":
main()
File renamed without changes.
55 changes: 45 additions & 10 deletions src/ppk2_api/ppk2_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,25 @@ def _read_metadata(self):
"""Read metadata"""
# try to get metadata from device
for _ in range(0, 5):
# it appears the second reading is the metadata
read = self.ser.read(self.ser.in_waiting)
time.sleep(0.1)

# TODO add a read_until serial read function with a timeout
if read != b'' and "END" in read.decode("utf-8"):
return read.decode("utf-8")
if not read:
continue # No data, try again

# Try decoding the data
try:
metadata = read.decode("utf-8")
except UnicodeDecodeError:
# If decoding fails, try again in next iteration
continue

# Check if the metadata is valid (i.e., contains "END")
if "END" in metadata:
return metadata

# If we exit the loop, it means we couldn't get valid metadata
raise ValueError("Could not retrieve valid metadata from the device.")

def _parse_metadata(self, metadata):
"""Parse metadata and store it to modifiers"""
Expand Down Expand Up @@ -229,17 +241,40 @@ def list_devices():
]
return devices


def get_data(self):
"""Return readings of one sampling period"""
sampling_data = self.ser.read(self.ser.in_waiting)
return sampling_data

def get_modifiers(self):
"""Gets and sets modifiers from device memory"""
self._write_serial((PPK2_Command.GET_META_DATA, ))
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
return ret
def get_modifiers(self, retries=2):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def get_modifiers(self, retries=2):
def get_modifiers(self, retries=4):

I have seen this occurring in our devices, maybe a bit of extra time would help?

"""
Retrieve and parse modifiers from the device memory, with optional retries.

In cases where the PPK2 tool did not shut down gracefully, the device may still
hold residual data from the previous session. The first GET_META_DATA command
may return a mix of valid metadata and garbage. Rather than parsing
and filtering out this garbage on the first try, issuing the GET_META_DATA command
again often yields clean data. This function will retry up to the
specified number of times before giving up.
"""

for attempt in range(1, retries + 1):
# Send command to request metadata
self._write_serial((PPK2_Command.GET_META_DATA, ))
try:
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
print(f"Attempt {attempt}/{retries} - Got metadata from PPK2")

return ret
except ValueError as e:
print(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}")
# If this wasn't the last attempt, we try again by sending GET_META_DATA again.


print("Failed to get modifiers after multiple attempts.")
return None
Comment on lines +262 to +277
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for attempt in range(1, retries + 1):
# Send command to request metadata
self._write_serial((PPK2_Command.GET_META_DATA, ))
try:
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
print(f"Attempt {attempt}/{retries} - Got metadata from PPK2")
return ret
except ValueError as e:
print(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}")
# If this wasn't the last attempt, we try again by sending GET_META_DATA again.
print("Failed to get modifiers after multiple attempts.")
return None
for attempt in range(1, retries + 1):
# Send command to request metadata
self._write_serial((PPK2_Command.GET_META_DATA, ))
try:
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
logging.info(f"Attempt {attempt}/{retries} - Got metadata from PPK2")
return ret
except ValueError as e:
logging.warning(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}")
# Wait a bit in case device was previously running
time.sleep(0.5)
# Clear anything in the buffer
self.ser.read(self.ser.in_waiting)
# If this wasn't the last attempt, we try again by sending GET_META_DATA again.
raise IOError("Failed to get modifiers after multiple attempts. Reconnect device if this persists.")

I would try not to use prints here and rather logging to keep the stdout clean, logging allows a bit more control from the application side.

I think also throwing a proper exception if the modifiers are not there is better, then the applications would run uncalibrated, causing worse problems in the future... I have seen it before.

I also suggested (but didn't test) a way to hopefully harden the recovery process...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, in hindsight, that the best way to solve this problem is to parse the metadata out from the garbage data (as it is in there). It feels like its the proper way to do it anyways, even on "clean" runs + it becomes more robust. Then, i think we can also remove the retries.

If it fails, raising an IOError is a very good suggestion!

I dont have time to look this before the weekend, but i would like to hear your thoughts on the suggestion above before continuing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't closely looked into the output or the complexity of the parsing... If you think it is easy enough (and robust across the various PPK versions) then we can give it a try. Solving hardware problems at the root is always better than adding retries and timeouts (though not always possible).

I am happy to test across firmware versions if you think you have it. Otherwise, I have implemented this patch (or close enough to this patch) and am running tests with it.


def start_measuring(self):
"""Start continuous measurement"""
Expand Down