This article assumes that you have some basic knowledge of using object queues for multi-threaded asynchronous processing and are familiar with the SysQueue’invoke event.
Object queues are a great way to take advantage of multiple threads to run operations against many instances in parallel. These are the slow cookers of the NexJ platform. We add one or more messages to the queue and the work is completed as resources become available. Set and forget.
However, what if some of that work is sensitive to other work from the same queue? How can we ensure that no work of the same category is happening at the time of our sensitive work? The answer is to use a semaphore. Object queues allow us to define one or more semaphores that a message must acquire before it is sent for processing.
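The acquisition rule described above can be modelled outside the platform. The following Python sketch is not NexJ code: `SemaphoreTable`, `try_acquire`, and `release` are hypothetical names, and the sketch assumes a message gets either all of its semaphores or none of them, which is our reading of the behaviour rather than a documented guarantee.

```python
class SemaphoreTable:
    """Toy model of the semaphores held by in-flight messages (hypothetical, not a NexJ API)."""

    def __init__(self):
        self._held = set()  # names of semaphores currently held

    def try_acquire(self, names):
        """Acquire every semaphore in `names`, or none of them (assumed all-or-nothing)."""
        wanted = set(names)
        if wanted & self._held:
            return False  # at least one is taken, so the message must wait
        self._held |= wanted
        return True

    def release(self, names):
        """Give the semaphores back once the message's work is done."""
        self._held -= set(names)

    def held(self):
        return frozenset(self._held)


table = SemaphoreTable()
first = table.try_acquire(["1"])         # a message starts and holds "1"
blocked = table.try_acquire(["1", "2"])  # needs "1" as well, so it waits
other = table.try_acquire(["2"])         # unrelated semaphore, starts at once
```

In this model the second message stays blocked until both "1" and "2" have been released, while the third message is unaffected by the first.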
Before we can use semaphores we need to do a little bit of work. This involves creating a sub-class of the SysMessage class that contains an implementation that identifies the semaphores it will need to acquire.
Asynchronous Parallel Processing Example
You have a process that generates a large volume of data through parallel processing, and occasionally throughout the processing you want to create a snapshot of what has been generated. To ensure accuracy of the snapshot, you want all data generation to stop while the snapshot is generated.
This can be achieved by having the data generation messages acquire different semaphores, thereby allowing parallel processing, and having the snapshot messages acquire all of the semaphores, thereby enforcing synchronous processing.
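One way to see why this assignment works is to treat each message's semaphores as a set: two messages can run together only if their sets do not overlap. The sketch below is plain Python rather than NexJ code, and the names `can_run_together`, `generators`, and `snapshot`, as well as the semaphore names, are made up for illustration.

```python
from itertools import combinations


def can_run_together(a, b):
    """Two messages may overlap in time only if they share no semaphore."""
    return set(a).isdisjoint(b)


# Each data generation message gets its own semaphore name (assumed names).
generators = [{"gen-1"}, {"gen-2"}, {"gen-3"}]

# The snapshot message asks for every semaphore the generators use.
snapshot = set().union(*generators)

# No two generators share a semaphore, so they can all run in parallel.
generators_parallel = all(
    can_run_together(a, b) for a, b in combinations(generators, 2)
)

# The snapshot overlaps with every generator, so it can only run alone.
snapshot_exclusive = not any(can_run_together(snapshot, g) for g in generators)
```

Adding another data generation worker in this scheme means adding one more semaphore name and including it in the snapshot's set.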
First, we need to create our sub-class of SysMessage. This is necessary in order to provide an implementation for the event getSemaphores, which is how the object queue knows which semaphores the message is required to acquire.
This is what our class, SimpleSemaphoreMessage.meta, looks like.
<Class base="SysMessage">
   <Attributes>
      <Attribute name="semaphores" type="any" value="(@ values semaphores)"/>
      <Attribute name="classCode" required="true" type="string" value="&quot;SSMSG&quot;"/>
   </Attributes>
   <Events>
      <Event name="commit">
         <Actions>
            <Action condition="(this'isDirty'semaphores)" name="setValues" type="before"><![CDATA[
(when (null? (@ values))
   (this'values (serializable-property-map))
)
((this'values)'body (@ body))
((this'values)'semaphores (@ semaphores))
]]></Action>
         </Actions>
      </Event>
      <Event name="getSemaphores">
         <Actions>
            <Action name="main" type="main"><![CDATA[
(@ semaphores)
]]></Action>
         </Actions>
      </Event>
      <Event args="class event args queueName semaphores" name="createInstance" static="true">
         <Actions>
            <Action name="main" type="main"><![CDATA[
(let
   (
      (request (nexj.core.rpc.Request'new))
      (msg
         (message
            (: :class "ObjectQueueMessage")
            (: body ())
            (: channel queueName)
            (: delay 0)
            (: user ((user)'loginName))
            (: name (string-append (class'name) "'" (symbol->string event)))
            (: protected #f)
         )
      )
   )
   (request'addInvocation (class'name) (symbol->string event) (list->vector args) ())
   (msg'body request)
   (SimpleSemaphoreMessage'new
      (: name (msg'name))
      (: queue (SysObjectQueue'getQueue (msg'channel)))
      (: delay (msg'delay))
      (: user (msg'user))
      (: body msg)
      (: semaphores semaphores)
   )
)
]]></Action>
         </Actions>
      </Event>
      <Event args="names" name="constructSemaphores" static="true">
         <Actions>
            <Action name="main" type="main"><![CDATA[
(map
   (lambda (name)
      `(,(nexj.core.util.Binary'new (string->utf8 name)) . 1)
   )
   names
)
]]></Action>
         </Actions>
      </Event>
   </Events>
   <PersistenceMapping dataSource="ObjectQueueDatabase">
      <RelationalMapping keyGenerator="KeyGenerator.GUIDGen" primaryTable="Message"/>
   </PersistenceMapping>
</Class>
The main trick here is to add an attribute on the message to store the semaphores. We added the attribute semaphores for this purpose:
<Attribute name="semaphores" type="any" value="(@ values semaphores)"/>
We also added an action to the commit event to ensure that the value of this attribute is serialized with the message instance:
<Action condition="(this'isDirty'semaphores)" name="setValues" type="before"><![CDATA[
(when (null? (@ values))
   (this'values (serializable-property-map))
)
((this'values)'body (@ body))
((this'values)'semaphores (@ semaphores))
]]></Action>
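The pattern in this action (create the property map on first use, then copy the attributes into it before the commit) can be illustrated with a small Python model. This is only an analogy: the `Message` class, `before_commit`, and the use of JSON are hypothetical, the sketch assumes that the `values` map is the part of the message that gets serialized, and it leaves out the `isDirty` condition for brevity.

```python
import json


class Message:
    """Toy message whose `values` map is assumed to be the only part that is serialized."""

    def __init__(self, body, semaphores):
        self.body = body
        self.semaphores = semaphores
        self.values = None  # created lazily, like the property map in the action

    def before_commit(self):
        # Same steps as the setValues action: create the map if it is missing,
        # then copy the attributes that must survive serialization.
        if self.values is None:
            self.values = {}
        self.values["body"] = self.body
        self.values["semaphores"] = self.semaphores

    def serialize(self):
        return json.dumps(self.values)


msg = Message(body="work", semaphores=[["1", 1]])
msg.before_commit()
restored = json.loads(msg.serialize())
```

Without the copy step the semaphores would exist only on the in-memory object, and whatever reads the serialized message later would not see them.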
To make life easier, we added the event constructSemaphores that will create the semaphores that the message needs to acquire. We also added the event createInstance to allow for an easy way to create instances of this message that can be later added to a queue.
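For readers less familiar with the Scheme in constructSemaphores, here is a rough Python equivalent of the data it builds: one pair per name, holding the UTF-8 bytes of the name and the number 1. `construct_semaphores` is a hypothetical stand-in, Python `bytes` takes the place of `nexj.core.util.Binary`, and the sketch does not interpret what the 1 means to the queue.

```python
def construct_semaphores(names):
    """Mirror of the Scheme `map`: one (utf-8 bytes, 1) pair per name."""
    return [(name.encode("utf-8"), 1) for name in names]


sem1 = construct_semaphores(["1"])
sem_both = construct_semaphores(["1", "2"])
```

Because the semaphore is identified by the bytes of its name, two messages refer to the same semaphore whenever they are built from the same name string.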
Now, what’s next? Assuming that we already have a queue defined, we can write some code that adds work to the queue and then test our message. Here is a very simple example:
; className, 'eventName, 'args and queueName stand in for your own
; class, event, arguments and queue name.
(let*
   (
      (sem1 (SimpleSemaphoreMessage'constructSemaphores (list "1")))
      (sem2 (SimpleSemaphoreMessage'constructSemaphores (list "2")))
      (semBoth (SimpleSemaphoreMessage'constructSemaphores (list "1" "2")))
      (msg0 (SimpleSemaphoreMessage'createInstance className 'eventName 'args queueName sem1))
      (msg1 (SimpleSemaphoreMessage'createInstance className 'eventName 'args queueName sem2))
      (msg2 (SimpleSemaphoreMessage'createInstance className 'eventName 'args queueName semBoth))
      (oq (SysObjectQueue'getQueue (SampleDataSeeder'QUEUE_NAME)))
   )
   (oq'send msg0)
   (oq'send msg1)
   (oq'send msg2)
   (commit)
)
In this example, msg0 and msg1 are our data generation messages and msg2 is our snapshot message. msg2 cannot be processed while either msg0 or msg1 is being processed, because it needs to acquire both semaphores, each of which is held by one of the other messages.
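The resulting ordering can be checked with a small tick-based simulation. This is a Python model rather than NexJ code: `simulate` is a hypothetical helper, the durations are invented, and the rule that a waiting message starts as soon as all of its semaphores are free is an assumption about how the queue dispatches work.

```python
def simulate(messages):
    """Return {name: start_tick} for messages given as (name, semaphores, duration)."""
    waiting = list(messages)
    running = []  # (finish_tick, semaphores) for in-flight messages
    held = set()  # semaphores currently held by running messages
    starts = {}
    tick = 0
    while waiting or running:
        # Release the semaphores of messages that finish at this tick.
        for finish, sems in [r for r in running if r[0] <= tick]:
            held -= sems
        running = [r for r in running if r[0] > tick]
        # Start every waiting message whose semaphores are all free.
        still_waiting = []
        for name, sems, duration in waiting:
            sems = set(sems)
            if sems & held:
                still_waiting.append((name, sems, duration))
            else:
                held |= sems
                running.append((tick + duration, sems))
                starts[name] = tick
        waiting = still_waiting
        tick += 1
    return starts


starts = simulate([
    ("msg0", {"1"}, 2),       # data generation, holds "1" for 2 ticks
    ("msg1", {"2"}, 3),       # data generation, holds "2" for 3 ticks
    ("msg2", {"1", "2"}, 1),  # snapshot, needs both semaphores
])
```

With these durations msg0 and msg1 both start at tick 0, and msg2 starts at tick 3, once the slower of the two data generation messages has released its semaphore.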
Play around with the code above to generate even more messages and assign different semaphores to different messages to see how the processing of the messages is affected. You will start to see that using semaphores in NexJ object queues is a good way to programmatically manage concurrency in asynchronous multi-threaded processes.