My goal / TLDR

My goal with this blog post is to explain how to set custom mountpoints for VerneMQ by modifying the shipped MongoDB auth Lua script  (lua/auth/mongodb.lua).

Setting a custom mountpoint is possible with VerneMQ not only by setting mountpoints manually for specific listeners (e.g. ports), but also programmatically during authorization in your scripts.

I had a hard time wrapping my head around it, as there are no examples, and I had not programmed in a functional language (as Erlang) before. Additionally, I had not touched Lua before – but Lua is easier to understand than Erlang IMHO.

why custom mountpoints?

The idea is to isolate different users against each other (multi-tenancy). Each user will have their very own topic tree, there will be no need to check for collisions or demand of users to add an additional prefix.

The total isolation also increases security by accidentally setting the ACLs incorrectly.

From my perspective therefore a must!

necessary script modifications

The necessary modification of the script is as follows (pasted as code & again as screenshot, so you can see where WordPress messes up formatting, etc.):

function auth_on_register(reg)
     if reg.username ~= nil and reg.password ~= nil then
         doc = mongodb.find_one(pool, „vmq_acl_auth“,
                                 {client_id = reg.client_id,
                                  username = reg.username})
         if doc ~= false then
             if doc.active then
                 if doc.passhash == bcrypt.hashpw(reg.password, doc.passhash) then
                     cache_insert(
                         doc.mountpoint,
                         reg.client_id,
                         reg.username,
                         doc.publish_acl,
                         doc.subscribe_acl
                         )
                     reg.mountpoint = doc.mountpoint
                     — alternatively return just true, but then no modifiers can be set
                     return {
                         subscriber_id = {
                                 mountpoint = doc.mountpoint,
                                 client_id = reg.client_id
                             }
                         }

                 end
             end
         end
     end
     return false
end

image

Of course, you can also return other modifiers. Here’s a more exhaustive list for auth_on_register from the VerneMQ documentation:

image

Note: it’s important to provide the correct types:

image

the subscriber_id is a more complex type consisting of a Tuple (from Erlang’s point of view) of mountpoint and client_id.

That is, why I pass in a table of a table (in Lua’s terminology):

                    return {
                         subscriber_id = {
                                 mountpoint = doc.mountpoint,
                                 client_id = reg.client_id
                             }
                         }

Note: the formatting is quite unimportant for Lua, I have just set it up this way for better readability.

Several parameters can be modified at once. For instance you could throttle the message rate, change the clean_session flag, etc.

Elaboration of the Lua code

Note that I have updated the initial checks to read:

doc = mongodb.find_one(pool, „vmq_acl_auth“,
                         {client_id = reg.client_id,
                          username = reg.username})

Thus omitting the check against the mountpoint. As we are going to set the mountpoint from the database, we do not care about the initial mountpoint of the client (which will be “” an empty string) most probably.

I return the mountpoint as read from the database, and set the client_id as the original one passed to us during the authentication request. At this point the user is already authenticated against the database.

Reloading the script

You can simply reload the script after updating it run-time, using the following command line:

vmq-admin script reload path=./share/lua/auth/mongodb.lua

VerneMQ does not need to be shut down and restarted for this. This is good, as it speeds up development and testing immensely!

Debugging (showing sessions)

Simply use

vmq-admin session show

image

As you can see, the mountpoints are also displayed.

This corresponds to the expected database information I have:

image

The client_id shahrukh is supposed to have an empty mountpoint, and the client_id goat is supposed to have the mountpoint beardedgoat.

Note that the ACLs at this point are very permissive to allow for easier debugging:

image

Bonus: mosquitto_sub commands for testing

mosquitto_sub -t „#“ -u „o2PTwBb“ -P „Dis8gJ2yhdSYmkQBH1mCosxgJmAxCQm3698tg7Mh8mNFAHXDf4“ –host 192.168.1.2 –port 1883 –id „shahrukh“ -q 2 –verbose

mosquitto_sub -t „#“ -u „o2PTwBb“ -P „Dis8gJ2yhdSYmkQBH1mCosxgJmAxCQm3698tg7Mh8mNFAHXDf4“ –host 192.168.1.2 –port 1883 –id „goat“ -q 2 –verbose

image

About Erlang

Erlang is a language which was developed at Ericsson specifically for high-availability fault-tolerant telecommunication systems.

It has some interesting features, if you are interested, read more on Wikipedia about it.

The main challenge to understand Erlang code is that it is quite different from anything I have encountered so far.

Erlang is a functional language. This means, that you do not write code which says “do this, do that, look at the variable, then do this, do that”, but that everything is called as a function with a return value.

e.g. instead of loops you will have functions calling each other recursively.

Also, the Erlang run-time matches the correct function it needs to call depending on the parameters.

E.g. for a recursive function / loop, you will continue calling the function until a certain point is reached. For instance, you have processed the last item of the list, and the list is empty – here, you can respond differently, instead of continuing to recurse, give back the final result.

Understanding VerneMQ Erlang Code

Note: I have reproduced the code only for the express purpose to explain what it does, all code is copyright by Octavo Labs AG.

Understanding the erl code a bit, for vmq_diversity_plugin.erl:

-module(vmq_diversity_plugin). 

the module name here must match the filename of the module

-export([auth_on_register/5, … etc]).

which functions can be called in this module, and the number of parameters they expect.

%%%===================================================================

%%% Hook functions

%%%===================================================================

%% called as an all_till_ok hook

auth_on_register(Peer, SubscriberId, UserName, Password, CleanSession) ->

{PPeer, Port} = peer(Peer),

{MP, ClientId} = subscriber_id(SubscriberId),

Res = all_till_ok(auth_on_register, [{addr, PPeer},

{port, Port},

{mountpoint, MP},

{client_id, ClientId},

{username, nilify(UserName)},

{password, nilify(Password)},

{clean_session, CleanSession}]),

conv_res(auth_on_reg, Res).

all_till_ok will call all available authentication „backends“ (hooks) until one returns ok in turn, to give each a chance to authenticate the user.

auth_on_publish(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->

{MP, ClientId} = subscriber_id(SubscriberId),

case vmq_diversity_cache:match_publish_acl(MP, ClientId, QoS, Topic, Payload, IsRetain) of

true ->

%% Found a valid cache entry which grants this publish

ok;

Modifiers when is_list(Modifiers) ->

%% Found a valid cache entry containing modifiers

{ok, Modifiers};

false ->

%% Found a valid cache entry which rejects this publish

{error, not_authorized};

no_cache ->

Res = all_till_ok(auth_on_publish, [{username, nilify(UserName)},

{mountpoint, MP},

{client_id, ClientId},

{qos, QoS},

{topic, unword(Topic)},

{payload, Payload},

{retain, IsRetain}]),

conv_res(auth_on_pub, Res)

end.

note:

SubscriberId contains both mountpoint and client_id:

clip_image002

it is unpacked into mountpoint and client_id in the first statement:

{MP, ClientId} = subscriber_id(SubscriberId),

note that variables start with a capital letter in Erlang. MP and ClientId are therefore variables.

case vmq_diversity_cache:match_publish_acl(MP, ClientId, QoS, Topic, Payload, IsRetain) of

true ->

%% Found a valid cache entry which grants this publish

ok;

the vmq_diversity_cache module is called with the function match_publish_acl.

the mountpoint (MP), ClientId, Quality of Service (QoS), Topic, Payload and IsRetain are passed into it.

If this function returns true, the return value of the Erlang function auth_on_publish is ok.

note that in Erlang, since „ok“ starts with a small letter, it is just a name – not a variable (it is, specifically, an instance of the datatype Atom). The equivalent in Crystal Lang would probably be Symbols.

ok;

note that the „;“ is not a termination of the statement, but should be read as an „else“.

Modifiers when is_list(Modifiers) ->

%% Found a valid cache entry containing modifiers

{ok, Modifiers};

when the returned value is a list , it is passed as a return value with Modifiers – in this case as an Erlang tuple {ok, Modifiers} (grouping the Atom „ok“ and the variable Modifiers together and returning them).

Note that is_list is a built in function (BIF) of Erlang, and not something Lua /Luerl specific.

false ->

%% Found a valid cache entry which rejects this publish

{error, not_authorized};

here instead of „ok“ „error“ is passed, along with „not_authorized“. These are all Atoms, not variables – as is false.

no_cache ->

Res = all_till_ok(auth_on_publish, [{username, nilify(UserName)},

{mountpoint, MP},

{client_id, ClientId},

{qos, QoS},

{topic, unword(Topic)},

{payload, Payload},

{retain, IsRetain}]),

conv_res(auth_on_pub, Res)

finally, if the cache returns „no_cache“, we call the all_till_ok function, with „auth_on_publish“, passing in an array of tuples, looking whether any hook can authenticate the publishing of this message.

all_till_ok([Pid|Rest], HookName, Args) ->

case vmq_diversity_script:call_function(Pid, HookName, Args) of

true ->

ok;

Mods0 when is_list(Mods0) ->

Mods1 = convert_modifiers(HookName, Mods0),

case vmq_plugin_util:check_modifiers(HookName, Mods1) of

error ->

{error, {invalid_modifiers, Mods1}};

CheckedModifiers ->

{ok, CheckedModifiers}

end;

false ->

{error, lua_script_returned_false};

error ->

{error, lua_script_error};

{error, Reason} ->

{error, Reason};

_ ->

all_till_ok(Rest, HookName, Args)

end;

all_till_ok([], _, _) -> next.

here the function all_till_ok calls the function vmq_diversity_script:call_function, also passing in the HookName (which is set to e.g. auth_on_publish or auth_on_register), and the arguments to the Hook.

If the hook returns „true“, then the value to return is „ok“.

Otherwise if the Hook returns a list of modifiers,

the modifiers are run through convert_modifiers

Since variables are immutable in Erlang – that is, once you assign something to a variable, you can’t reassign to the variable, we use a new variable for the converted modifiers, Mods1.

convert_modifiers(Hook, Mods0) ->

vmq_diversity_utils:convert_modifiers(Hook, Mods0).

this just wraps the vmq_diversity_utils:convert_modifiers function.

It is defined in here:

https://github.com/vernemq/vernemq/blob/c8b92f398e76d6ce4b8cca5e438e8ae1e717d71c/apps/vmq_diversity/src/vmq_diversity_utils.erl

convert_modifiers(auth_on_subscribe, Mods0) ->

normalize_subscribe_topics(convert(Mods0));

convert_modifiers(on_unsubscribe, Mods0) ->

convert(Mods0);

convert_modifiers(Hook, Mods0) ->

Mods1 = atomize_keys(Mods0),

Converted = lists:map(

fun(Mod) ->

convert_modifier(Hook, Mod)

end,

Mods1),

case lists:member(Hook, [auth_on_register_m5,

auth_on_subscribe_m5,

auth_on_unsubscribe_m5,

on_unsubscribe_m5,

auth_on_publish_m5,

on_deliver_m5,

on_auth_m5]) of

true ->

maps:from_list(Converted);

_ ->

Converted

end.

This shows Erlang’s capabilities of matching. Depending with which Atom as first part the function is called, a different function is executed.

If called with auth_on_subscribe, it will call normalize_subscribe_topics, passing in a converted version of Mods0.

normalize_subscribe_topics(convert(Mods0));

convert is defined and explained further down in the same file:

%% @doc recursively converts a value returned from lua to an erlang

%% data structure.

convert(Val) when is_list(Val) ->

convert_list(Val, []);

convert(Val) when is_number(Val) ->

case round(Val) of

RVal when RVal == Val -> RVal;

_ -> Val

end;

convert(Val) when is_binary(Val) -> Val;

convert(Val) when is_boolean(Val) -> Val;

convert(nil) -> undefined.

if Val (Mods0 in our case) is a list, convert_list is called:

convert_list([ListItem|Rest], Acc) ->

convert_list(Rest, [convert_list_item(ListItem)|Acc]);

convert_list([], Acc) -> lists:reverse(Acc).

convert_list_item({Idx, Val}) when is_integer(Idx) ->

%% lua array

convert(Val);

convert_list_item({BinKey, Val}) when is_binary(BinKey) ->

try list_to_existing_atom(binary_to_list(BinKey)) of

Key -> {Key, convert(Val)}

catch

_:_ ->

{BinKey, convert(Val)}

end.

convert_list([ListItem|Rest], Acc) ->

convert_list(Rest, [convert_list_item(ListItem)|Acc]);

convert_list([], Acc) -> lists:reverse(Acc).

This uses Erlang’s recursion. The list is converted one item at a time, by recursively calling itself (and processing each list item in turn, using convert_list_item). The list item is transferred from the left to the right when it has been processed, so that the left variable ends up as an empty list. Once it does, the second part will match:

convert_list([], Acc) -> lists:reverse(Acc).

and the result of the function will be lists:reverse(Acc) (the right hand side).

convert_list_item is using some Erlang functions, currently I do not understand that part completely. I do undestand the first part however:

convert_list_item({Idx, Val}) when is_integer(Idx) ->

%% lua array

convert(Val);

For a Lua array (table), the array is unpacked and the Array index of the corresponding item is dropped.

Note, that in Lua the table is the only associative array / hash / … type. There is no specific array type.

Back to the all_till_ok function:

case vmq_plugin_util:check_modifiers(HookName, Mods1) of

error ->

{error, {invalid_modifiers, Mods1}};

CheckedModifiers ->

{ok, CheckedModifiers}

end;

The converted Modifiers are passed into vmq_plugin_util:check_modifiers (that is, the check_modifiers function in the vmq_plugin_util module).

If this function returns an error, the return value is a tuple of {error, {invalid_modifiers, Mods1}};

error and invalid_modifiers are, remember, just names. Additionally the modifiers are passed along for our inspection. (Again, note the semicolon at the end of the first part of the statement, indicating an „else“)

if the function returns a variable instead, we return a tuple of {ok, CheckedModifiers}.

This function check_modifiers is implemented here:

https://github.com/vernemq/vernemq/blob/cd6666a2a57e16eb04011d0628359ad6a4883b34/apps/vmq_plugin/src/vmq_plugin_util.erl

-spec check_modifiers(atom(), list() | map()) -> list() | map() | error.

This line tells us, that the check_modifiers function expects an atom (i.e. auth_on_subscribe) as first parameter, and a list() or a map() as a second parameter. It returns a list(), or a map(), or error (an atom()).

clip_image004

An intimidating function, I will honestly admit it. Let’s step through it:

AllowedModifiers = modifiers(Hook),

AllowedModifiers is a variable. It calls the function modifiers, with the Hook variable being passed in.

For instance, for auth_on_register, it will match the following function:

modifiers(auth_on_register) ->

[{allow_register, fun val_bool/1},

{allow_publish, fun val_bool/1},

{allow_subscribe, fun val_bool/1},

{allow_unsubscribe, fun val_bool/1},

{max_message_size, fun val_int/1},

{subscriber_id, fun val_subscriber_id/1},

{clean_session, fun val_bool/1},

{max_message_rate, fun val_int/1},

{max_inflight_messages, fun val_int/1},

{shared_subscription_policy, fun val_atom/1},

{retry_interval, fun val_int/1},

{upgrade_qos, fun val_bool/1},

{allow_multiple_sessions, fun val_bool/1},

{max_online_messages, fun val_int/1},

{max_offline_messages, fun val_int/1},

{queue_deliver_mode, fun val_atom/1},

{queue_type, fun val_atom/1},

{max_drain_time, fun val_int/1},

{max_msgs_per_drain_step, fun val_int/1}];

so it will return an array of tuples. In these tuples, the allowed names (which can be modified) are defined, e.g. the subscriber_id. (remember, the subscriber_id contains both the mountpoint and the client_id!) , and the function to check the value against. e.g. fun val_subscriber_id/1 means the function val_subscriber_id is to be checked, with 1 parameter to be passed to it.

To understand the next statements, we have to look at a bit of Erlang documentation:

lists:foldl(fun (_, error) -> error;

http://erlang.org/doc/man/lists.html

foldl will call a function on successive elements of a list

This is how it is defined.

foldl(Fun, Acc0, List) -> Acc1

so we pass in a function, an empty List, and our modifiers.

explanation for the empty list: „Acc0 is returned if the list is empty“ – that is if our initial list of modifiers is empty, we return an empty list.

The „_“ signifies an anonymous variable. That means, that the variable is required, but its value can be ignored.

See http://erlang.org/doc/reference_manual/expressions.html for details.

thus, if the function is called with something, and error as the second variable passed in, the result is error.

otherwise, if called with a tuple {ModKey, ModVal} and Acc, the result value depends whether the key is found in the list of AllowedModifiers. If it is not found (false) , then the result is error.

keyfind will return a tuple if the key is found (including the key), otherwise false.

Since we have already determined that we know the key, it is in the list, we can ignore it using the anonymous variable „_“, and focus on the ValidatorFun (validator function).

Here, then ModVal is run through the validator function (which is defined in the appropriate modifiers function which we matched).

If the function returns true, then the tuple of ModKey and ModVal is returned (it is ok and has been checked) along with the rest of Acc.

If it is false, an error will be logged (can’t validate modifier), and error will be returned.

If it is a tuple with ok and NewModVal, then ModKey and NewModVal will be used.

Let’s have a look at val_subscriber_id, which allows us to modify the subscriber, and therefore to change the mountpoint:

clip_image006

if we pass in a list, then further checks are done. Otherwise, false is returned.

The list has to contain both „client_id“ and „mountpoint“. The remainder of the code is poorly understood by me at the moment.

If this first statement does not match, we also return false.

The result

See Lua code in the introduction, what we set out to achieve has been achieved, the mountpoint is now a custom one for each client:

clip_image007

References:

https://github.com/vernemq/vernemq/issues/533

https://docs.vernemq.com/configuring-vernemq/db-auth

http://erlang.org/doc/getting_started/users_guide.html

https://docs.vernemq.com/plugin-development/luaplugins

https://docs.vernemq.com/plugin-development/sessionlifecycle

https://www.erlang.org/docs

http://erlang.org/doc/man/erlang.html

https://en.wikipedia.org/wiki/Erlang_(programming_language)

https://github.com/vernemq/vernemq/issues/312 (this issue pointed me in the right direction, thank you very much!)

https://github.com/vernemq/vernemq/blob/cd6666a2a57e16eb04011d0628359ad6a4883b34/apps/vmq_plugin/src/vmq_plugin_util.erl

https://github.com/vernemq/vernemq/blob/c8b92f398e76d6ce4b8cca5e438e8ae1e717d71c/apps/vmq_diversity/src/vmq_diversity_plugin.erl