First let me thank Matthew for his Christmasy donation, thanks mate!

Please Get us some Threaded Extension Tutorials with callbacks / unique ids per request. Example like MySQL 🙂

Usually I do my own thing and put some of it out on the blog, but this time my current interests coincided with this request. Also, I have had a few more requests about threaded extensions in the past, so I guess I am just gonna cave in 🙂

I assume you already have VS2013 (if you don’t, download VS2013 Express, it is free). Bohemia already ships the VS2013 redistributables with Arma 3, so no extras are needed. Create your own project (I called mine “threaded_example”) as explained in Part 1 and paste the following into it:

// threaded_example.cpp : Defines the exported functions for the DLL application.
//

#include "stdafx.h"
#include <cstdlib>
#include <cstring>
#include <string>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <atomic>

using namespace std;

struct Data
{
    bool ready = false;
    string params = "";
    string result = "";
};

unordered_map<long int, Data> tickets;
mutex mtx;
atomic<bool> worker_working(false);
long int id = 0; // global ticket id
long int cur_id = 0; // current ticket id

extern "C"
{
    __declspec (dllexport) void __stdcall RVExtension(char *output, int outputSize, const char *function);
}

void worker()
{
    while (worker_working = id > cur_id) // next ticket exists?
    {
        mtx.lock();
        Data ticket = tickets[++cur_id]; // copy ticket
        mtx.unlock();

        string input = ticket.params; // get input
        string output = "output: " + input; // process input
        Sleep(10); // sleep for 0.01 sec (FOR TESTING PURPOSES ONLY)
        ticket.result = output; // prepare result
        ticket.ready = true; // notify about result

        mtx.lock();
        tickets[cur_id] = ticket; // copy back the result
        mtx.unlock();
    }
}

void __stdcall RVExtension(char *output, int outputSize, const char *function)
{
    if (!strncmp(function, "r:", 2)) // detect checking for result
    {
        long int num = atol(&function[2]); // ticket number or 0
        mtx.lock(); // lock before any access to the map, including the lookup
        if (tickets.find(num) != tickets.end()) // ticket exists
        {
            if (tickets[num].ready) // result is ready
            {
                strncpy_s(output, outputSize, tickets[num].result.c_str(), _TRUNCATE); // result
                tickets.erase(num); // get rid of the read ticket
                mtx.unlock();
                return;
            }
            mtx.unlock();
            strncpy_s(output, outputSize, "WAIT", _TRUNCATE); // result is not ready
            return;
        }
        mtx.unlock();
        strncpy_s(output, outputSize, "EMPTY", _TRUNCATE); // no such ticket
    }
    else if (!strncmp(function, "s:", 2)) // detect ticket submission
    {
        Data data;
        data.params = string(&function[2]); // extract params
        mtx.lock();
        tickets.insert(pair<long int, Data>(++id, data)); // add ticket to the queue
        mtx.unlock();
        if (!worker_working) // if worker thread is finished, start another one
        {
            worker_working = true;
            thread t(worker); // named t so the variable does not shadow worker()
            t.detach(); // start parallel process
        }
        strncpy_s(output, outputSize, to_string(id).c_str(), _TRUNCATE); // ticket number
    }
    else
    {
        strncpy_s(output, outputSize, "INVALID COMMAND", _TRUNCATE); // other input
    }
}

What’s going on here? When the extension is called with “s:whateverparamyousend”, it returns a ticket number. When this ticket number is queried back with “r:ticketnumber”, the extension checks whether the ticket is ready to be collected. At this point there are 3 possible returns:

  • “WAIT” – indicates that the ticket is still in the queue
  • “EMPTY” – if ticket does not exist
  • “whateverresultyouassigned” – the actual result

After the first successful retrieval of the result, the ticket gets erased, so the next call with the same ticket number will result in “EMPTY”.
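The three replies and the erase-on-collect behaviour can be sketched on their own, without any threading. This is only an illustration of the idea, not the extension code: `Ticket`, `submit`, `complete`, `retrieve` and `demoLifecycle` are names made up for this sketch, and `std::lock_guard` is used instead of manual lock/unlock calls.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical names, for illustration only.
struct Ticket
{
    bool ready = false;
    std::string result;
};

std::unordered_map<long, Ticket> ticketStore;
std::mutex storeMutex;
long lastId = 0;

// Register a new ticket and return its number.
long submit()
{
    std::lock_guard<std::mutex> lock(storeMutex);
    ticketStore[++lastId] = Ticket();
    return lastId;
}

// Mark a ticket as finished (the worker thread's job in the extension).
void complete(long id, const std::string &result)
{
    std::lock_guard<std::mutex> lock(storeMutex);
    auto it = ticketStore.find(id);
    if (it == ticketStore.end()) return; // unknown ticket, nothing to do
    it->second.result = result;
    it->second.ready = true;
}

// Returns "EMPTY", "WAIT" or the result; a collected ticket is erased.
std::string retrieve(long id)
{
    std::lock_guard<std::mutex> lock(storeMutex);
    auto it = ticketStore.find(id);
    if (it == ticketStore.end()) return "EMPTY";
    if (!it->second.ready) return "WAIT";
    std::string result = it->second.result;
    ticketStore.erase(it);
    return result;
}

// Usage: walk one ticket through its whole life cycle.
void demoLifecycle()
{
    const long id = submit();
    assert(retrieve(id) == "WAIT");       // not processed yet
    complete(id, "output: myparam");
    assert(retrieve(id) == "output: myparam");
    assert(retrieve(id) == "EMPTY");      // erased after first retrieval
}
```

The lock guard releases the mutex on every return path, which is why `retrieve` needs no explicit unlock calls.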

As you can see, I’ve added 2 commands: “s:” (send) and “r:” (retrieve). You can make your own, it is just a matter of being able to parse them. I used strncmp, which compares the first n chars of the input. If there is a match, the rest of the string is treated as the param. If the param needs to be parsed further, you can do it later, in the worker thread, in a similar manner.
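If you end up with more than two commands, the prefix check can be pulled into a small helper. Below is a minimal sketch of that idea; `matchCommand` is a hypothetical name and not part of the extension above.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Hypothetical helper: returns true and stores the rest of the string in
// `param` when `input` starts with `prefix`; leaves `param` untouched otherwise.
bool matchCommand(const char *input, const char *prefix, std::string &param)
{
    assert(input != nullptr && prefix != nullptr);
    const std::size_t len = std::strlen(prefix);
    if (std::strncmp(input, prefix, len) != 0) return false;
    param = input + len; // everything after the prefix is the param
    return true;
}
```

A third command of your own, say a made-up “d:” for discarding a ticket, would then be one more `matchCommand` call in the same if / else if chain.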

If the command is “s:”, a new element is added to the map tickets. Each element of tickets is a pair: a long int, which is the id of the ticket, and a struct that contains the data about the submission. You can add more members to the struct if you want; for example, instead of having just one member params, you can expand it to multiple members like param1 and param2 and parse the params on the spot. However, if you want the extension to return as soon as possible, it is better to do all the heavy lifting in the worker thread and keep main thread operations to a minimum.
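As an example of parsing that could be moved into the worker thread, here is a sketch that splits one params string into several parts. The “|” separator and the name `splitParams` are assumptions made for this example; use whatever format suits your data.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: splits "a|b|c" into {"a", "b", "c"}.
// Empty fields are kept, so "a||b" gives {"a", "", "b"}.
std::vector<std::string> splitParams(const std::string &params, char delimiter)
{
    assert(delimiter != '\0'); // params come from a C string, so NUL cannot separate anything
    std::vector<std::string> parts;
    std::string::size_type start = 0;
    for (;;)
    {
        const std::string::size_type end = params.find(delimiter, start);
        if (end == std::string::npos)
        {
            parts.push_back(params.substr(start)); // last (or only) field
            break;
        }
        parts.push_back(params.substr(start, end - start));
        start = end + 1; // continue after the delimiter
    }
    return parts;
}
```

Calling this on ticket.params inside the worker keeps the main thread down to a single string copy per submission.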

The global id is increased by 1 and a new ticket with this id is added to tickets. If the worker thread is not running at this point, it is started. If the worker thread is running, then when it finally gets to this new ticket it will process its params, put the result into the struct member result and change the flag ready to true. If no more tickets are detected, the thread quits until the next submission. I have added a 10 ms (0.01 second) delay with Sleep(10) to emulate a possible delay in ticket processing. It is not needed, it is just for testing.
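For comparison, here is roughly the same worker loop written in portable C++, with `std::lock_guard` and `std::this_thread::sleep_for` instead of manual locking and `Sleep`. It is a sketch under my own assumptions, not a drop-in replacement: the names `Job`, `addJob`, `drainJobs` and `demoWorker` are made up, both counters are only touched while the mutex is held, and the demo joins the thread instead of detaching it so that the outcome is deterministic.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>

// Hypothetical names, for illustration only.
struct Job
{
    bool ready = false;
    std::string params;
    std::string result;
};

std::unordered_map<long, Job> jobs;
std::mutex jobsMutex;
long newestJob = 0;  // last id handed out, guarded by jobsMutex
long currentJob = 0; // last id picked up by the worker, guarded by jobsMutex

// Add a job to the queue and return its id.
long addJob(const std::string &params)
{
    std::lock_guard<std::mutex> lock(jobsMutex);
    Job job;
    job.params = params;
    jobs[++newestJob] = job;
    return newestJob;
}

// Process jobs one by one until none are left, then return.
void drainJobs()
{
    for (;;)
    {
        Job job;
        long id = 0;
        {
            std::lock_guard<std::mutex> lock(jobsMutex);
            if (currentJob >= newestJob) return; // no next job, quit
            id = ++currentJob;
            job = jobs[id]; // copy the job out so work happens without the lock
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulated work
        job.result = "output: " + job.params;
        job.ready = true;
        {
            std::lock_guard<std::mutex> lock(jobsMutex);
            jobs[id] = job; // copy the result back
        }
    }
}

// Usage: queue two jobs, run the worker to completion, check the results.
void demoWorker()
{
    const long a = addJob("one");
    const long b = addJob("two");
    std::thread t(drainJobs);
    t.join();
    assert(jobs[a].ready && jobs[a].result == "output: one");
    assert(jobs[b].ready && jobs[b].result == "output: two");
}
```

The slow part runs on a private copy of the job, so the main thread is only ever blocked for the duration of a map lookup or a copy.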

If the command is “r:”, the rest of the string is converted to long int. The map tickets is then searched for a ticket with the matching id. If such a ticket exists, it is then checked for readiness, and if it is ready the result is sent back. If the string passed to the extension starts with neither “s:” nor “r:”, the “INVALID COMMAND” message is sent back. As you can see there is not much to it.

One more thing. As I have mentioned before, there is a limit on how much you can return in one go from an extension. This limit is defined by outputSize. If the result you’re sending back is longer than this, you will need to chop it up and send it in pieces. I will also leave it to you to figure this out on your own (if you ever come across it). Now for the SQF part:

KK_fnc_callExtensionAsync = {
    [
        format [
            "r:%1",
            "threaded_example" callExtension format [
                "s:%1",
                _this select 0
            ]
        ],
        _this select 1
    ] spawn {
        waitUntil {
            _res = "threaded_example" callExtension (_this select 0);
            if (_res != "WAIT") exitWith {
                _res call (_this select 1);
                true
            };
            false
        };
    };
};

KK_fnc_callBackFunc = {
    hint _this; // output: myparam
};

_id = ["myparam", KK_fnc_callBackFunc] call KK_fnc_callExtensionAsync;

KK_fnc_callExtensionAsync is the SQF function that handles communication with the extension. The 1st element of the array passed to this function is the bare param you send to the extension. The 2nd element is the variable containing the callback function, in this case KK_fnc_callBackFunc. The return of KK_fnc_callExtensionAsync is the script handle of the spawn. KK_fnc_callBackFunc will receive the extension output in the _this variable after myparam is processed by the extension.

This example will mostly run in the scheduled environment even if started in unscheduled, because of the use of waitUntil. The callback function will also be executed in the scheduled environment. If you need unscheduled execution, consider writing the whole routine in FSM.
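Coming back to the outputSize limit mentioned earlier: one possible way to chop a long result into pieces on the C++ side is sketched below. `chunkResult` is a hypothetical helper, and how the SQF side asks for the next piece (an extra command, an end marker and so on) is left open.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: cuts `result` into pieces that each fit a buffer of
// `outputSize` bytes, leaving one byte for the terminating '\0'.
std::vector<std::string> chunkResult(const std::string &result, int outputSize)
{
    assert(outputSize > 1); // need room for at least one char plus '\0'
    const std::string::size_type chunkLen =
        static_cast<std::string::size_type>(outputSize - 1);
    std::vector<std::string> chunks;
    for (std::string::size_type pos = 0; pos < result.size(); pos += chunkLen)
        chunks.push_back(result.substr(pos, chunkLen)); // last piece may be shorter
    return chunks;
}
```

Note that this cuts purely by byte count, so a multi-byte UTF-8 character can end up split across two pieces; the pieces are only meaningful again once they are joined back together.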

Enjoy,
KK

EDIT: OK, thanks to MaHuJa I revised the code and made some amendments. I changed map to unordered_map and added a mutex. I thought I could get away without one, but after thorough testing I managed to break it. Also, there was an unintentional error in the code; I have no idea how it got there. Anyway, the updated code is just a tad more complicated because of the mutex, but it is more robust.

EDIT: Changed volatile to atomic on MaHuJa’s advice and after reading about it a bit more.