Skip to content

Commit 2d817a6

Browse files
authored
Merge pull request #463 from memrecakal/master
Adding Thread Safety
2 parents fa4003b + 4230f3c commit 2d817a6

File tree

15 files changed

+305
-44
lines changed

15 files changed

+305
-44
lines changed

.github/ISSUE_TEMPLATE/bug_report.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Author: Dr. Martin Bischoff ([email protected])
1919
-->
2020

2121
* [ ] I am at the right place and my issue is directly related to ROS#. General technical questions I would post e.g. at [ROS Answers](https://answers.ros.org/) or [Stack Overflow](https://stackoverflow.com). For library-specific questions I would look for help in the corresponding library forums.
22-
* [ ] I have thoroughly read [the Contributing Guideline](Contributing.md) and writing this issue is the right thing to do in my case.
22+
* [ ] I have thoroughly read [the Contributing Guideline](Contributing) and writing this issue is the right thing to do in my case.
2323

2424
---
2525

.github/ISSUE_TEMPLATE/feature_request.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Author: Dr. Martin Bischoff ([email protected])
1919
-->
2020

2121
* [ ] I am at the right place and my issue is directly related to ROS#. General technical questions I would post e.g. at [ROS Answers](https://answers.ros.org/) or [Stack Overflow](https://stackoverflow.com). For library-specific questions I would look for help in the corresponding library forums.
22-
* [ ] I have thoroughly read [the Contributing Guideline](Contributing.md) and writing this issue is the right thing to do in my case.
22+
* [ ] I have thoroughly read [the Contributing Guideline](Contributing) and writing this issue is the right thing to do in my case.
2323

2424
---
2525

.github/ISSUE_TEMPLATE/question.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Author: Dr. Martin Bischoff ([email protected])
1818
-->
1919

2020
* [ ] I am at the right place and my issue is directly related to ROS#. General technical questions I would post e.g. at [ROS Answers](https://answers.ros.org/) or [Stack Overflow](https://stackoverflow.com). For library-specific questions I would look for help in the corresponding library forums.
21-
* [ ] I have thoroughly read [the Contributing Guideline](Contributing.md) and writing this issue is the right thing to do in my case.
21+
* [ ] I have thoroughly read [the Contributing Guideline](Contributing) and writing this issue is the right thing to do in my case.
2222

2323
---
2424

Libraries/MessageGeneration/MessageGeneration.csproj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,45 @@
1414
<Copyright>Copyright © 2024</Copyright>
1515
<AssemblyVersion>2.0.0.0</AssemblyVersion>
1616
<FileVersion>2.0.0.0</FileVersion>
17+
<Version>2.0.0</Version>
18+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
19+
<PackageId>RosSharpMessageGeneration</PackageId>
20+
<Title>MessageGeneration</Title>
21+
<Authors>ros-sharp</Authors>
22+
<EnablePackageValidation>true</EnablePackageValidation>
23+
<PackageProjectUrl>https://github.com/siemens/ros-sharp</PackageProjectUrl>
24+
<PackageReadmeFile>README.md</PackageReadmeFile>
25+
<RepositoryUrl>https://github.com/siemens/ros-sharp</RepositoryUrl>
26+
<PackageTags>ROS2;ROS;Robotic;Operating;System;Message;Generation</PackageTags>
27+
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
28+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
29+
<SourceLinkCreate>true</SourceLinkCreate>
30+
<DebugType>portable</DebugType>
31+
<DebugSymbols>true</DebugSymbols>
32+
<IncludeSymbols>true</IncludeSymbols>
33+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
34+
<PackageIcon>images\RosSharpLogoNuget.png</PackageIcon>
1735
</PropertyGroup>
1836
<ItemGroup>
1937
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
2038
<PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0" />
2139
</ItemGroup>
40+
41+
<ItemGroup>
42+
<None Include="..\..\LICENSE.md">
43+
<Pack>True</Pack>
44+
<PackagePath>\</PackagePath>
45+
</None>
46+
<None Include="..\..\README.md">
47+
<Pack>True</Pack>
48+
<PackagePath>\</PackagePath>
49+
</None>
50+
<None Include="..\RosSharpLogoNuget.png">
51+
<Pack>True</Pack>
52+
<PackagePath>\images\</PackagePath>
53+
</None>
54+
</ItemGroup>
55+
2256
<Target Name="CopyToCombinedOutput" AfterTargets="Publish">
2357
<!-- Copy the new DLLs to the combined output folder -->
2458
<ItemGroup>

Libraries/RosBridgeClient/Communicators.cs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,57 @@ internal class Subscriber<T> : Subscriber where T : Message
8181

8282
internal SubscriptionHandler<T> SubscriptionHandler { get; }
8383

84+
private bool _doEnsureThreadSafety = false;
85+
public bool DoEnsureThreadSafety
86+
{
87+
get => _doEnsureThreadSafety;
88+
set
89+
{
90+
_doEnsureThreadSafety= value;
91+
SetReceiveMethod();
92+
}
93+
}
94+
95+
private readonly object _lock = new object();
96+
private Action<string, ISerializer> _receiveMethod;
97+
8498
internal Subscriber(string id, string topic, SubscriptionHandler<T> subscriptionHandler, out Subscription subscription, int throttle_rate = 0, int queue_length = 1, int fragment_size = int.MaxValue, string compression = "none")
8599
{
86100
Id = id;
87101
Topic = topic;
88102
SubscriptionHandler = subscriptionHandler;
89103
subscription = new Subscription(id, Topic, GetRosName<T>(), throttle_rate, queue_length, fragment_size, compression);
104+
105+
SetReceiveMethod();
106+
}
107+
108+
private void SetReceiveMethod()
109+
{
110+
if (_doEnsureThreadSafety)
111+
{
112+
_receiveMethod = ReceiveThreadSafe;
113+
}
114+
else
115+
{
116+
_receiveMethod = ReceiveNonThreadSafe;
117+
}
90118
}
91119

92120
internal override void Receive(string message, ISerializer serializer)
93121
{
94-
//string replacedString = message.Replace("null", "0.0");
95-
//SubscriptionHandler.Invoke(serializer.Deserialize<T>(replacedString));
122+
_receiveMethod(message, serializer);
123+
}
124+
125+
private void ReceiveThreadSafe(string message, ISerializer serializer)
126+
{
127+
lock (_lock)
128+
{
129+
SubscriptionHandler.Invoke(serializer.Deserialize<T>(message));
130+
}
131+
}
132+
133+
private void ReceiveNonThreadSafe(string message, ISerializer serializer)
134+
{
96135
SubscriptionHandler.Invoke(serializer.Deserialize<T>(message));
97136
}
98137
}

Libraries/RosBridgeClient/RosBridgeClient.csproj

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,25 @@
1414
<Copyright>Copyright © 2024</Copyright>
1515
<AssemblyVersion>2.0.0.0</AssemblyVersion>
1616
<FileVersion>2.0.0.0</FileVersion>
17+
<Version>2.0.0</Version>
18+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
19+
<PackageId Condition="'$(Configuration)' == 'ROS2'">RosSharpRosBridgeClient.ROS2</PackageId>
20+
<PackageId Condition="'$(Configuration)' == 'Release'">RosSharpRosBridgeClient.ROS1</PackageId>
21+
<Title>RosBridgeClient</Title>
22+
<Authors>ros-sharp</Authors>
23+
<EnablePackageValidation>true</EnablePackageValidation>
24+
<PackageProjectUrl>https://github.com/siemens/ros-sharp</PackageProjectUrl>
25+
<PackageReadmeFile>README.md</PackageReadmeFile>
26+
<RepositoryUrl>https://github.com/siemens/ros-sharp</RepositoryUrl>
27+
<PackageTags>ROS2;ROS;Robotic;Operating;System</PackageTags>
28+
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
29+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
30+
<SourceLinkCreate>true</SourceLinkCreate>
31+
<DebugType>portable</DebugType>
32+
<DebugSymbols>true</DebugSymbols>
33+
<IncludeSymbols>true</IncludeSymbols>
34+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
35+
<PackageIcon>images\RosSharpLogoNuget.png</PackageIcon>
1736
</PropertyGroup>
1837
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='ROS2|AnyCPU'">
1938
<Optimize>True</Optimize>
@@ -33,8 +52,22 @@
3352
<ItemGroup>
3453
<Folder Include="MessageTypes\ROS2\" />
3554
</ItemGroup>
36-
37-
<Target Name="CopyToCombinedOutput" AfterTargets="Publish">
55+
<ItemGroup>
56+
<None Include="..\..\LICENSE.md">
57+
<Pack>True</Pack>
58+
<PackagePath>\</PackagePath>
59+
</None>
60+
<None Include="..\..\README.md">
61+
<Pack>True</Pack>
62+
<PackagePath>\</PackagePath>
63+
</None>
64+
<None Include="..\RosSharpLogoNuget.png">
65+
<Pack>True</Pack>
66+
<PackagePath>\images\</PackagePath>
67+
</None>
68+
</ItemGroup>
69+
70+
<Target Name="CopyToCombinedOutput" AfterTargets="Publish">
3871
<!-- Copy the new DLLs to the combined output folder -->
3972
<ItemGroup>
4073
<NewDllFiles Include="$(OutputPath)\publish\*.dll" />

Libraries/RosBridgeClient/RosSocket.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,20 @@ public void Unadvertise(string id)
111111

112112
#region Subscribers
113113

114-
public string Subscribe<T>(string topic, SubscriptionHandler<T> subscriptionHandler, int throttle_rate = 0, int queue_length = 1, int fragment_size = int.MaxValue, string compression = "none") where T : Message
114+
public string Subscribe<T>(string topic, SubscriptionHandler<T> subscriptionHandler, int throttle_rate = 0, int queue_length = 1, int fragment_size = int.MaxValue, string compression = "none", bool ensureThreadSafety = false) where T : Message
115115
{
116116
string id;
117117
lock (SubscriberLock)
118118
{
119119
id = GetUnusedCounterID(Subscribers, topic);
120120
Subscription subscription;
121-
Subscribers.Add(id, new Subscriber<T>(id, topic, subscriptionHandler, out subscription, throttle_rate, queue_length, fragment_size, compression));
121+
122+
var subscriber = new Subscriber<T>(id, topic, subscriptionHandler, out subscription, throttle_rate, queue_length, fragment_size, compression)
123+
{
124+
DoEnsureThreadSafety = ensureThreadSafety
125+
};
126+
127+
Subscribers.Add(id, subscriber);
122128
Send(subscription);
123129
}
124130

Libraries/RosBridgeClientTest/RosSocketConsoleExample.cs

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ limitations under the License.
1818
using std_msgs = RosSharp.RosBridgeClient.MessageTypes.Std;
1919
using std_srvs = RosSharp.RosBridgeClient.MessageTypes.Std;
2020
using rosapi = RosSharp.RosBridgeClient.MessageTypes.Rosapi;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using System.Text;
24+
using System.Collections.Generic;
25+
using System.Reflection;
2126

2227

2328
// commands on ROS system:
@@ -46,48 +51,105 @@ public static void Main(string[] args)
4651
//RosSocket rosSocket = new RosSocket(new RosBridgeClient.Protocols.WebSocketSharpProtocol(uri));
4752
RosSocket rosSocket = new RosSocket(new RosBridgeClient.Protocols.WebSocketNetProtocol(uri));
4853

49-
// Publication:
50-
std_msgs.String message = new std_msgs.String
51-
{
52-
data = "publication test masdasdessage data"
53-
};
54-
55-
string publication_id = rosSocket.Advertise<std_msgs.String>("pub_test");
56-
rosSocket.Publish(publication_id, message);
54+
// Publication
55+
SimpleMessagePub(rosSocket, "/pub_test");
5756

5857
// Subscription:
59-
string subscription_id = rosSocket.Subscribe<std_msgs.String>("/sub_test", SubscriptionHandler);
58+
SimpleMessageSub(rosSocket, "/sub_test");
59+
60+
// Thread Safety Test:
61+
SimulateParallelMessageReception(rosSocket, 8);
6062

6163
// Service Call:
62-
rosSocket.CallService<rosapi.GetParamRequest, rosapi.GetParamResponse>("/rosapi/get_param", ServiceCallHandler, new rosapi.GetParamRequest("/rosdistro", "defaut_value")); // Just "default" for ROS1
64+
//rosSocket.CallService<rosapi.GetParamRequest, rosapi.GetParamResponse>("/rosapi/get_param", ServiceCallHandler, new rosapi.GetParamRequest("/rosdistro", "default")); // ROS1
65+
rosSocket.CallService<rosapi.GetROSVersionRequest, rosapi.GetROSVersionResponse>("/rosapi/get_ros_version", ServiceCallHandlerROS2, new rosapi.GetROSVersionRequest());
6366

6467
// Service Response:
6568
string service_id = rosSocket.AdvertiseService<std_srvs.TriggerRequest, std_srvs.TriggerResponse>("/service_response_test", ServiceResponseHandler);
66-
67-
Console.WriteLine("Press any key to unsubscribe...");
69+
Console.WriteLine("Service id: " + service_id);
70+
Console.WriteLine("Press any key to close unadvertise service...");
6871
Console.ReadKey(true);
69-
rosSocket.Unadvertise(publication_id);
70-
rosSocket.Unsubscribe(subscription_id);
7172
rosSocket.UnadvertiseService(service_id);
7273

73-
Console.WriteLine("Press any key to close...");
74+
Console.WriteLine("Press any key to close RosSocket...");
7475
Console.ReadKey(true);
7576
rosSocket.Close();
7677
}
7778
private static void SubscriptionHandler(std_msgs.String message)
7879
{
79-
Console.WriteLine((message).data);
80+
Console.WriteLine("processing message: " + (message).data);
81+
Thread.Sleep(100); // simulate some work
82+
Console.WriteLine("done processing message: " + (message).data);
8083
}
8184

8285
private static void ServiceCallHandler(rosapi.GetParamResponse message)
8386
{
84-
Console.WriteLine("ROS distro: " + message.value);
87+
Console.WriteLine("Response value: " + message.value);
88+
}
89+
90+
private static void ServiceCallHandlerROS2(rosapi.GetROSVersionResponse message)
91+
{
92+
Console.WriteLine("Response value: " + message.distro);
8593
}
8694

8795
private static bool ServiceResponseHandler(std_srvs.TriggerRequest arguments, out std_srvs.TriggerResponse result)
8896
{
8997
result = new std_srvs.TriggerResponse(true, "service response message");
9098
return true;
9199
}
100+
101+
// Simple message pub
102+
private static void SimpleMessagePub(RosSocket rosSocket, String topic)
103+
{
104+
std_msgs.String message = new std_msgs.String
105+
{
106+
data = "single pub test msg"
107+
};
108+
109+
string publication_id = rosSocket.Advertise<std_msgs.String>(topic);
110+
rosSocket.Publish(publication_id, message);
111+
112+
Console.WriteLine("Press any key to unadvertise...");
113+
Console.ReadKey(true);
114+
115+
rosSocket.Unadvertise(publication_id);
116+
}
117+
118+
// Simple message sub
119+
private static void SimpleMessageSub(RosSocket rosSocket, String topic)
120+
{
121+
string subscription_id = rosSocket.Subscribe<std_msgs.String>(topic, SubscriptionHandler, ensureThreadSafety: false);
122+
123+
Console.WriteLine("Press any key to unsubscribe...");
124+
Console.ReadKey(true);
125+
126+
rosSocket.Unsubscribe(subscription_id);
127+
}
128+
129+
// Simulate receiving messages from multiple threads to test thread safety
130+
private static void SimulateParallelMessageReception(RosSocket rosSocket, int numberOfMessages)
131+
{
132+
string subscription_id = rosSocket.Subscribe<std_msgs.String>("/thread_test", SubscriptionHandler, ensureThreadSafety: true);
133+
var rosSocketType = rosSocket.GetType();
134+
var subscribersField = rosSocketType.GetField("Subscribers", BindingFlags.NonPublic | BindingFlags.Instance);
135+
var subscribers = (Dictionary<string, Subscriber>)subscribersField.GetValue(rosSocket);
136+
var subscriber = subscribers[subscription_id];
137+
138+
Parallel.For(0, numberOfMessages, i =>
139+
{
140+
// Fake message data
141+
var message = new std_msgs.String { data = $"message {i}" };
142+
143+
byte[] serializedBytes = rosSocket.Serializer.Serialize(message);
144+
string serializedMessage = Encoding.UTF8.GetString(serializedBytes);
145+
var deserializedMessage = rosSocket.Serializer.Deserialize<std_msgs.String>(serializedMessage);
146+
147+
Console.WriteLine($"Processing thread: {i}");
148+
149+
subscriber.Receive(serializedMessage, rosSocket.Serializer);
150+
});
151+
152+
rosSocket.Unsubscribe(subscription_id);
153+
}
92154
}
93-
}
155+
}

Libraries/Urdf/Urdf.csproj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,45 @@
1515
<Copyright>Copyright © 2024</Copyright>
1616
<AssemblyVersion>2.0.0.0</AssemblyVersion>
1717
<FileVersion>2.0.0.0</FileVersion>
18+
<Version>2.0.0</Version>
19+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
20+
<PackageId>RosSharpUrdf</PackageId>
21+
<Title>Urdf</Title>
22+
<Authors>ros-sharp</Authors>
23+
<EnablePackageValidation>true</EnablePackageValidation>
24+
<PackageProjectUrl>https://github.com/siemens/ros-sharp</PackageProjectUrl>
25+
<PackageReadmeFile>README.md</PackageReadmeFile>
26+
<RepositoryUrl>https://github.com/siemens/ros-sharp</RepositoryUrl>
27+
<PackageTags>ROS2;ROS;Robotic;Operating;System;URDF</PackageTags>
28+
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
29+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
30+
<SourceLinkCreate>true</SourceLinkCreate>
31+
<DebugType>portable</DebugType>
32+
<DebugSymbols>true</DebugSymbols>
33+
<IncludeSymbols>true</IncludeSymbols>
34+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
35+
<PackageIcon>images\RosSharpLogoNuget.png</PackageIcon>
1836
</PropertyGroup>
1937
<ItemGroup>
2038
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
2139
<PackageReference Include="System.Data.DataSetExtensions" Version="4.5.0" />
2240
</ItemGroup>
41+
42+
<ItemGroup>
43+
<None Include="..\..\LICENSE.md">
44+
<Pack>True</Pack>
45+
<PackagePath>\</PackagePath>
46+
</None>
47+
<None Include="..\..\README.md">
48+
<Pack>True</Pack>
49+
<PackagePath>\</PackagePath>
50+
</None>
51+
<None Include="..\RosSharpLogoNuget.png">
52+
<Pack>True</Pack>
53+
<PackagePath>\images\</PackagePath>
54+
</None>
55+
</ItemGroup>
56+
2357
<Target Name="CopyToCombinedOutput" AfterTargets="Publish">
2458
<!-- Copy the new DLLs to the combined output folder -->
2559
<ItemGroup>

0 commit comments

Comments
 (0)