semantic_digital_twin.adapters.ros.world_synchronizer#

Classes#

Synchronizer

Abstract Synchronizer class to manage world synchronizations between processes running semantic digital twin.

SynchronizerOnCallback

Synchronizer that does something on callbacks by the world.

StateSynchronizer

Synchronizes the state (values of free variables) of the semantic digital twin with the associated ROS topic.

ModelSynchronizer

Synchronizes the model (addition/removal of bodies/DOFs/connections) with the associated ROS topic.

ModelReloadSynchronizer

Synchronizes the model reloading process across different systems using ROS messaging.

Module Contents#

class semantic_digital_twin.adapters.ros.world_synchronizer.Synchronizer#

Bases: abc.ABC

Abstract Synchronizer class to manage world synchronizations between processes running semantic digital twin. It manages publishers and subscribers, ensuring proper cleanup after use. The communication is JSON string based.

node: rclpy.node.Node#

The rclpy node used to create the publishers and subscribers.

world: semantic_digital_twin.world.World#

The world to synchronize.

topic_name: str | None = None#

The topic name of the publisher and subscriber.

publisher: rclpy.publisher.Publisher | None = None#

The publisher used to publish the world state.

subscriber: rclpy.subscription.Subscription | None = None#

The subscriber to the world state.

message_type: ClassVar[Type[random_events.utils.SubclassJSONSerializer] | None] = None#

The type of the message that is sent and received.

property meta_data: semantic_digital_twin.adapters.ros.messages.MetaData#

The metadata of the synchronizer which can be used to compare origins of messages.

subscription_callback(msg: std_msgs.msg.String)#

Wrap the origin subscription callback by self-skipping and disabling the next world callback.

publish(msg: semantic_digital_twin.adapters.ros.messages.Message)#
close()#

Clean up publishers and subscribers.

class semantic_digital_twin.adapters.ros.world_synchronizer.SynchronizerOnCallback#

Bases: Synchronizer, semantic_digital_twin.callbacks.callback.Callback, abc.ABC

Synchronizer that does something on callbacks by the world. Additionally, ensures that the callback is cleaned up on close.

missed_messages: List = []#

The messages that the callback did not trigger due to being paused.

abstract apply_message(msg)#

Apply the received message to the world.

abstract world_callback()#

Called when the world notifies and update that is not caused by this synchronizer.

apply_missed_messages()#

Applies the missed messages to the world.

class semantic_digital_twin.adapters.ros.world_synchronizer.StateSynchronizer#

Bases: semantic_digital_twin.callbacks.callback.StateChangeCallback, SynchronizerOnCallback

Synchronizes the state (values of free variables) of the semantic digital twin with the associated ROS topic.

message_type: ClassVar[Type[random_events.utils.SubclassJSONSerializer] | None]#

The type of the message that is sent and received.

topic_name: str = '/semantic_digital_twin/world_state'#

The topic name of the publisher and subscriber.

apply_message(msg: semantic_digital_twin.adapters.ros.messages.WorldStateUpdate)#

Update the world state with the provided message.

Parameters:

msg – The message containing the new state information.

world_callback()#

Publish the current world state to the ROS topic.

class semantic_digital_twin.adapters.ros.world_synchronizer.ModelSynchronizer#

Bases: semantic_digital_twin.callbacks.callback.ModelChangeCallback, SynchronizerOnCallback

Synchronizes the model (addition/removal of bodies/DOFs/connections) with the associated ROS topic.

message_type: ClassVar[Type[random_events.utils.SubclassJSONSerializer]]#

The type of the message that is sent and received.

topic_name: str = '/semantic_digital_twin/world_model'#

The topic name of the publisher and subscriber.

apply_message(msg: semantic_digital_twin.adapters.ros.messages.ModificationBlock)#

Apply the received message to the world.

world_callback()#

Called when the world notifies and update that is not caused by this synchronizer.

class semantic_digital_twin.adapters.ros.world_synchronizer.ModelReloadSynchronizer#

Bases: Synchronizer

Synchronizes the model reloading process across different systems using ROS messaging. The database must be the same across the different processes, otherwise the synchronizer will fail.

Use this when you did changes to the model that cannot be communicated via the ModelSynchronizer and hence need to force all processes to load your world model. Note that this may take a couple of seconds.

message_type: ClassVar[Type[random_events.utils.SubclassJSONSerializer]]#

The type of the message that is sent and received.

session: sqlalchemy.orm.Session = None#

The session used to perform persistence interaction.

topic_name: str = '/semantic_digital_twin/reload_model'#

The topic name of the publisher and subscriber.

publish_reload_model()#

Save the current world model to the database and publish the primary key to the ROS topic such that other processes can subscribe to the model changes and update their worlds.