sfm2.utils package

Submodules

sfm2.utils.csv_writer module

Can be used to log SFM2 data to CSV files. See write_to_csv().

class sfm2.utils.csv_writer.CsvWriterOptions(columns_separator: str = ',', float_format: str = '.3f', add_header: bool = True, include_units: bool = True, timestamp_column_header: str = 'ts', overwrite_file_if_exists: bool = True, create_new_thread: bool = True)

Bases: object

columns_separator: str = ','
float_format: str = '.3f'
add_header: bool = True
include_units: bool = True
skip_samples_missing_timestamp = True
timestamp_column_header: str = 'ts'
overwrite_file_if_exists: bool = True
create_new_thread: bool = True
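
As a rough illustration of how columns_separator and float_format could shape a row, the sketch below uses only Python's built-in format(). It assumes that float_format is a standard Python format specification; format_row is a hypothetical helper and not part of sfm2.

```python
# Stand-in values mirroring the documented CsvWriterOptions defaults.
columns_separator = ","
float_format = ".3f"


def format_row(values):
    """Hypothetical helper: render floats with float_format and join them."""
    return columns_separator.join(format(v, float_format) for v in values)


row = format_row([1.23456, -2.0])
print(row)
```

With a different separator (for example ';') or precision (for example '.6f'), only the two variables at the top would change.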
sfm2.utils.csv_writer.write_to_csv(data_observable: rx.core.typing.Observable[sfm2.interface.sample_types.Sample], filepath: str, options: sfm2.utils.csv_writer.CsvWriterOptions = CsvWriterOptions(columns_separator=',', float_format='.3f', add_header=True, include_units=True, timestamp_column_header='ts', overwrite_file_if_exists=True, create_new_thread=True)) → rx.core.abc.disposable.Disposable

Start writing the data to CSV.

Returns

A disposable that can be used to stop writing and close the CSV.

Return type

Disposable

class sfm2.utils.csv_writer.CsvWriter(*args, **kwds)

Bases: rx.core.typing.Observer, rx.core.abc.disposable.Disposable

on_next(value: sfm2.interface.sample_types.Sample) → None

Notifies the observer of a new element in the sequence.

Args:

value: The received element.

on_error(error: Exception) → None

Notifies the observer that an exception has occurred.

Args:

error: The error that has occurred.

on_completed() → None

Notifies the observer of the end of the sequence.

dispose()
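
The observer methods above follow the usual on_next / on_error / on_completed / dispose lifecycle. A minimal sketch of that lifecycle, using a hypothetical stand-in class (CsvSinkSketch is not sfm2's CsvWriter, and its behavior after disposal is an assumption):

```python
import io


class CsvSinkSketch:
    """Hypothetical stand-in that writes each received tuple as one line."""

    def __init__(self, stream, separator=","):
        self._stream = stream
        self._separator = separator
        self._closed = False

    def on_next(self, value):
        # Each received element becomes one line; ignored after disposal.
        if not self._closed:
            line = self._separator.join(str(v) for v in value)
            self._stream.write(line + "\n")

    def on_error(self, error):
        # An error ends the sequence, so stop accepting values.
        self.dispose()

    def on_completed(self):
        # Normal end of the sequence also stops the sink.
        self.dispose()

    def dispose(self):
        # Idempotent: safe to call more than once.
        self._closed = True


buffer = io.StringIO()
sink = CsvSinkSketch(buffer)
sink.on_next((0, 1.5))
sink.on_next((1, 2.5))
sink.on_completed()
sink.on_next((2, 3.5))  # dropped: the sink is already disposed
output = buffer.getvalue()
```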

sfm2.utils.throughput module

Can be used to calculate the actual incoming SFM2 data rates. See calculate_throughput().

class sfm2.utils.throughput.ThroughputInfo(samples_count: int, time_period_ns: int, min_device_ts_delta: timedelta, max_device_ts_delta: timedelta)

Bases: object

samples_count: int
time_period_ns: int
min_device_ts_delta: timedelta
max_device_ts_delta: timedelta
static zero() → sfm2.utils.throughput.ThroughputInfo
property rate_hz
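
The fields above are enough to derive a rate. The sketch below is a stand-in dataclass, not the library's class; it assumes that rate_hz is samples_count divided by the period in seconds and that zero() returns an all-zero record.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class ThroughputInfoSketch:
    """Hypothetical stand-in with the same fields as ThroughputInfo."""

    samples_count: int
    time_period_ns: int
    min_device_ts_delta: timedelta
    max_device_ts_delta: timedelta

    @staticmethod
    def zero():
        # Assumed meaning: an empty measurement.
        return ThroughputInfoSketch(0, 0, timedelta(0), timedelta(0))

    @property
    def rate_hz(self):
        # Assumed definition: samples per second over the measured period.
        if self.time_period_ns == 0:
            return 0.0
        return self.samples_count / (self.time_period_ns / 1e9)


info = ThroughputInfoSketch(
    samples_count=500,
    time_period_ns=1_000_000_000,
    min_device_ts_delta=timedelta(milliseconds=2),
    max_device_ts_delta=timedelta(milliseconds=2),
)
print(info.rate_hz)
```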
sfm2.utils.throughput.calculate_throughput(data: Union[rx.core.typing.Observable[sfm2.interface.sample_types.Sample], Sequence[rx.core.typing.Observable[sfm2.interface.sample_types.Sample]]], interval_s: float = 1) → Union[rx.core.typing.Observable[sfm2.utils.throughput.ThroughputInfo], rx.core.typing.Observable[Tuple[sfm2.utils.throughput.ThroughputInfo]]]

Utility used to calculate the rate of SFM2 data.

Transforms the passed data stream into a stream of throughput measurement results. Besides calculating data rate, the utility also examines timestamps to detect missing samples.

If a collection of data streams is passed instead of a single data stream, then the result will be a single stream where each element is a tuple. Each element of that tuple is the throughput measurement result of its corresponding input data stream.

The function has no effect until the returned stream is actually used (for example, subscribed to).

Parameters
  • data (Union[Observable[Sample], Sequence[Observable[Sample]]]) – Data stream from the SFM2 or a sequence of data streams.

  • interval_s (float) – Interval at which the rate should be evaluated, in seconds. Defaults to 1 second.

Returns

A stream of throughput results, updated at the requested interval.

Return type

Union[Observable[ThroughputInfo], Observable[Tuple[ThroughputInfo,]]]
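
One plausible way to spot missing samples from timestamps is to compare the largest gap between consecutive device timestamps with the expected sample period. The sketch below shows that idea only; device_ts_deltas, the 10 ms period and the 1.5 threshold are all assumptions, not the library's algorithm.

```python
from datetime import timedelta


def device_ts_deltas(timestamps):
    """Return (min, max) spacing between consecutive device timestamps."""
    deltas = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    return min(deltas), max(deltas)


# Five samples nominally 10 ms apart, with the 30 ms sample missing.
timestamps = [timedelta(milliseconds=ms) for ms in (0, 10, 20, 40, 50)]
min_delta, max_delta = device_ts_deltas(timestamps)

# Assumed rule: a gap well above the nominal period suggests a lost sample.
expected_period = timedelta(milliseconds=10)
gap_detected = max_delta > 1.5 * expected_period
```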

sfm2.utils.time_sync module

class sfm2.utils.time_sync.TimeSyncOptions(time_reports_rate_hz: float = 10, time_reports_before_offset: int = 40, max_allowed_offset_ticks: int = 30, min_offsets_before_changing_sign: int = 3, offsets_before_trim: int = 5, trim_scale: float = 0.8)

Bases: object

time_reports_rate_hz: float = 10
time_reports_before_offset: int = 40
max_allowed_offset_ticks: int = 30
min_offsets_before_changing_sign: int = 3
offsets_before_trim: int = 5
trim_scale: float = 0.8
class sfm2.utils.time_sync.TimeSyncStatus

Bases: rx.core.abc.disposable.Disposable

Represents the status of an ongoing synchronization.

Disposing it stops the synchronization.

abstract wait_for_sync(timeout: datetime.timedelta) → None

Blocks until the clocks are properly synchronized.

Parameters

timeout – Maximum time to wait for the synchronization.
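
A blocking wait with a timedelta timeout can be sketched with threading.Event. SyncStatusSketch and mark_synced are hypothetical names, and raising TimeoutError when the timeout expires is an assumption; the documentation above does not say what happens in that case.

```python
import threading
from datetime import timedelta


class SyncStatusSketch:
    """Hypothetical stand-in for a status object with a blocking wait."""

    def __init__(self):
        self._synced = threading.Event()

    def mark_synced(self):
        # Called by whatever performs the synchronization.
        self._synced.set()

    def wait_for_sync(self, timeout):
        # Event.wait takes seconds, so convert the timedelta.
        if not self._synced.wait(timeout.total_seconds()):
            raise TimeoutError("clocks not synchronized in time")  # assumed behavior


status = SyncStatusSketch()
threading.Timer(0.05, status.mark_synced).start()
status.wait_for_sync(timedelta(seconds=2))  # returns once mark_synced has run
synced_in_time = True

never_synced = SyncStatusSketch()
try:
    never_synced.wait_for_sync(timedelta(milliseconds=10))
    timed_out = False
except TimeoutError:
    timed_out = True
```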

abstract property offset
class sfm2.utils.time_sync.TimeSync(options: sfm2.utils.time_sync.TimeSyncOptions = TimeSyncOptions(time_reports_rate_hz=10, time_reports_before_offset=40, max_allowed_offset_ticks=30, min_offsets_before_changing_sign=3, offsets_before_trim=5, trim_scale=0.8))

Bases: rx.core.abc.disposable.Disposable

Utility used to synchronize SFM2 clocks to the PC time.

Creating a TimeSync starts a ‘PC clock’ that measures the time elapsed on the PC.

sync(sfm2_or_collection: Union[sfm2.interface.sfm2.Sfm2, sfm2.utils.sfm2_collection.Sfm2Collection]) → sfm2.utils.time_sync.TimeSyncStatus

Starts synchronizing clocks on one or more SFM2s.

This method can be called multiple times at any time. The SFM2 clock will always be synchronized to the same time (with t0 at zero_timestamp).

Parameters

sfm2_or_collection (Union[Sfm2, Sfm2Collection]) – One SFM2 or a collection of SFM2s.

Returns

An object representing the status of the synchronization.

Return type

TimeSyncStatus

property zero_timestamp

Returns the system time corresponding to t=0.

Return type

datetime

get_time() → datetime.timedelta

Returns the current clock reading.

Can be used to synchronize other events to the SFM2 measurements.

Return type

timedelta
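
The relationship between zero_timestamp and get_time() can be sketched with a small stand-in clock. PcClockSketch is hypothetical and uses time.monotonic_ns() for elapsed time; how TimeSync measures time internally is not stated here, so treat this as one possible approach.

```python
import time
from datetime import datetime, timedelta


class PcClockSketch:
    """Hypothetical stand-in: elapsed time since creation, plus its wall-clock origin."""

    def __init__(self):
        self._zero_timestamp = datetime.now()  # system time at t=0
        self._start_ns = time.monotonic_ns()   # monotonic reference for elapsed time

    @property
    def zero_timestamp(self):
        return self._zero_timestamp

    def get_time(self):
        elapsed_ns = time.monotonic_ns() - self._start_ns
        return timedelta(microseconds=elapsed_ns / 1000)


clock = PcClockSketch()
first = clock.get_time()
second = clock.get_time()

# Map a clock reading back to wall-clock time, e.g. to tag an external event.
event_wall_time = clock.zero_timestamp + second
```

Adding a reading to zero_timestamp gives a wall-clock time, which is one way to line up other events with the measurements.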

dispose()

Module contents