sfm2.utils package¶
Submodules¶
sfm2.utils.csv_writer module¶
Can be used to log SFM2 data to CSV files. See write_to_csv()
-
class
sfm2.utils.csv_writer.
CsvWriterOptions
(columns_separator: str = ',', float_format: str = '.3f', add_header: bool = True, include_units: bool = True, timestamp_column_header: str = 'ts', overwrite_file_if_exists: bool = True, create_new_thread: bool = True)¶ Bases:
object
-
columns_separator
: str = ','¶
-
float_format
: str = '.3f'¶
-
add_header
: bool = True¶
-
include_units
: bool = True¶
-
skip_samples_missing_timestamp
= True¶
-
timestamp_column_header
: str = 'ts'¶
-
overwrite_file_if_exists
: bool = True¶
-
create_new_thread
: bool = True¶
-
-
sfm2.utils.csv_writer.
write_to_csv
(data_observable: rx.core.typing.Observable[sfm2.interface.sample_types.Sample], filepath: str, options: sfm2.utils.csv_writer.CsvWriterOptions = CsvWriterOptions(columns_separator=',', float_format='.3f', add_header=True, include_units=True, timestamp_column_header='ts', overwrite_file_if_exists=True, create_new_thread=True)) → rx.core.abc.disposable.Disposable¶ Start writing the data to CSV.
- Returns
A disposable that can be used to stop writing and close the CSV.
- Return type
Disposable
-
class
sfm2.utils.csv_writer.
CsvWriter
(*args, **kwds)¶ Bases:
rx.core.typing.Observer
,rx.core.abc.disposable.Disposable
-
on_next
(value: sfm2.interface.sample_types.Sample) → None¶ Notifies the observer of a new element in the sequence.
- Args:
value: The received element.
-
on_error
(error: Exception) → None¶ Notifies the observer that an exception has occurred.
- Args:
error: The error that has occurred.
-
on_completed
() → None¶ Notifies the observer of the end of the sequence.
-
dispose
()¶
-
sfm2.utils.throughput module¶
Can be used to calculate the actual incoming SFM2 data rates. See calculate_throughput()
-
class
sfm2.utils.throughput.
ThroughputInfo
(samples_count: ‘int’, time_period_ns: ‘int’, min_device_ts_delta: ‘timedelta’, max_device_ts_delta: ‘timedelta’)¶ Bases:
object
-
samples_count
: int¶
-
time_period_ns
: int¶
-
min_device_ts_delta
: timedelta¶
-
max_device_ts_delta
: timedelta¶
-
static
zero
() → sfm2.utils.throughput.ThroughputInfo¶
-
property
rate_hz
¶
-
-
sfm2.utils.throughput.
calculate_throughput
(data: Union[rx.core.typing.Observable[sfm2.interface.sample_types.Sample], Sequence[rx.core.typing.Observable[sfm2.interface.sample_types.Sample]]], interval_s: float = 1) → Union[rx.core.typing.Observable[sfm2.utils.throughput.ThroughputInfo], rx.core.typing.Observable[Tuple[sfm2.utils.throughput.ThroughputInfo]]]¶ Utility used to calculate the rate of SFM2 data.
Transforms the passed data stream into a stream of throughput measurement results. Besides calculating data rate, the utility also examines timestamps to detect missing samples.
If a collection of data streams is passed instead of a single data stream, then the result will be a single stream where each element is a tuple. Each element of that tuple is the throughput measurement result of its corresponding input data stream.
The function won’t have any effect until the returned stream is actually used.
- Parameters
- Returns
A stream of throughput results, update at the requested interval.
- Return type
Union[Observable[ThroughputInfo], Observable[Tuple[ThroughputInfo,]]]
sfm2.utils.time_sync module¶
-
class
sfm2.utils.time_sync.
TimeSyncOptions
(time_reports_rate_hz: ‘float’ = 10, time_reports_before_offset: ‘int’ = 40, max_allowed_offset_ticks: ‘int’ = 30, min_offsets_before_changing_sign: ‘int’ = 3, offsets_before_trim: ‘int’ = 5, trim_scale: ‘float’ = 0.8)¶ Bases:
object
-
time_reports_rate_hz
: float = 10¶
-
time_reports_before_offset
: int = 40¶
-
max_allowed_offset_ticks
: int = 30¶
-
min_offsets_before_changing_sign
: int = 3¶
-
offsets_before_trim
: int = 5¶
-
trim_scale
: float = 0.8¶
-
-
class
sfm2.utils.time_sync.
TimeSyncStatus
¶ Bases:
rx.core.abc.disposable.Disposable
Represents the status of an ongoing synchronization.
Disposing it stops the synchronization.
-
abstract
wait_for_sync
(timeout: datetime.timedelta) → None¶ Blocks until the clocks are properly synchronized.
- Parameters
timeout – Maximum time to wait for the synchronization.
-
abstract property
offset
¶
-
abstract
-
class
sfm2.utils.time_sync.
TimeSync
(options: sfm2.utils.time_sync.TimeSyncOptions = TimeSyncOptions(time_reports_rate_hz=10, time_reports_before_offset=40, max_allowed_offset_ticks=30, min_offsets_before_changing_sign=3, offsets_before_trim=5, trim_scale=0.8))¶ Bases:
rx.core.abc.disposable.Disposable
Utility used to synchronize SFM2 clocks to the PC time.
Creating a TimeSync starts a ‘PC clock’, that measures time elapsed on PC.
-
sync
(sfm2_or_collection: Union[sfm2.interface.sfm2.Sfm2, sfm2.utils.sfm2_collection.Sfm2Collection]) → sfm2.utils.time_sync.TimeSyncStatus¶ Starts synchronizing clocks on one or more SFM2s.
This method can be called multiple times at any time. The SFM2 clock will always be synchronized to the same time (with t0 at zero_timestamp).
- Parameters
sfm2_or_collection (Union[Sfm2, Sfm2Collection]) – One sfm2 or a collection.
- Returns
object representing the status of the synchronization
- Return type
-
property
zero_timestamp
¶ Returns system time corresponding to t=0.
- Return type
datetime
-
get_time
() → datetime.timedelta¶ Returns current clock reading.
Can be used to synchronize other events to the SFM2 measurements.
- Return type
timedelta
-
dispose
()¶
-