
Erlang API: action init callback not implemented?

alex-filippov
Beginner

I am writing an action with the Erlang API and want to spawn a new worker in the init() callback for each action invocation.
The action init callback does not seem to be implemented in the Erlang API: the confd_action_cb record in econfd.hrl has no "init" field.
This is with NSO versions 4.7.4.3 and 5.1.


sunge
Cisco Employee

Hi,

 

I guess you want to prevent a long-running action from blocking other user sessions that invoke the action?

With the Erlang API, you do not have to handle that case as you would with the C API.

 

econfd:init_daemon(...):

Starts and links to a gen_server which connects to ConfD/NSO. This gen_server holds two sockets to ConfD/NSO, one so called control socket and one worker socket (See confd_lib_dp(3) for an explanation of those sockets.)

To avoid blocking control socket callback requests due to long-running worker socket callbacks, the control socket callbacks are run in the gen_server, while the worker socket callbacks are run in a separate process that is spawned by the gen_server. This means that applications must not share e.g. MAAPI sockets between transactions, since this could result in simultaneous use of a socket by the gen_server and the spawned process.

...
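The split described in the quoted documentation (quick requests answered inside the gen_server, long-running work handed to a spawned process) can be illustrated with plain OTP. This is only an analogy and not the econfd implementation: the module name `offload_demo` and the `ping`/`slow` requests are made up for the example, and `timer:sleep/1` stands in for a long-running callback.

```erlang
-module(offload_demo).
-behaviour(gen_server).

-export([start_link/0, ping/0, slow/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Quick request, answered directly by the gen_server.
ping() ->
    gen_server:call(?MODULE, ping).

%% Slow request, Ms is the simulated duration in milliseconds.
slow(Ms) ->
    gen_server:call(?MODULE, {slow, Ms}, infinity).

init([]) ->
    {ok, nostate}.

handle_call(ping, _From, State) ->
    {reply, pong, State};
handle_call({slow, Ms}, From, State) ->
    %% Hand the slow work to a separate process and reply from there,
    %% so the gen_server stays free to answer other requests.
    spawn_link(fun() ->
                       timer:sleep(Ms),
                       gen_server:reply(From, done)
               end),
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

While one caller is waiting in `offload_demo:slow(5000)`, another process can still call `offload_demo:ping()` and get `pong` back right away.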

Thank you. Yes, that blocking is what I am trying to avoid. My initial hope was that the Erlang API does not need the init callback for actions because it somehow spawns workers as needed. But so far I am stuck with a single worker, which blocks other users while it executes one action.

The problem seems to be that there is only one worker socket and only one worker process. The daemon does not spawn a new worker for each incoming action and does not maintain a pool of workers (as the Java and Python APIs do, I think). I also cannot spawn workers myself (as in the C API), because the action init callback is not supported.

In more detail, as I understand it:

econfd:init_daemon calls econfd_daemon:init, which opens one control and one worker socket. Dx#confd_daemon_ctx.wint gets set to 1.

After econfd:register_action_cb, econfd:register_done invokes econfd_daemon:handle_call(register_done..), which spawns a single worker process listening on the worker socket. The daemon is ready.

A user invokes an action, ?CONFD_PROTO_NEW_ACTION arrives via the control socket, the relevant clause of econfd_daemon:confd_fd_ready sends back wint=1, thus telling NCS to use worker socket one. NCS sends ?CONFD_CALL_ACTION via the worker socket and blocks while the worker runs the action callback.

Any further actions get processed quickly on the control socket and queue up on the worker socket. On the control socket, the handler of ?CONFD_PROTO_NEW_ACTION does not do any spawning, so there are no new workers and it's always wint=1 back to NCS, and NCS is stuck with one worker socket.
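To make the effect of that queueing concrete without involving NSO at all, here is a minimal stand-alone sketch. It is not econfd code: the module and function names are invented for the illustration, and `timer:sleep/1` stands in for the action callback. One function pushes N simulated actions through a single worker process, the other spawns one worker per action.

```erlang
-module(worker_queue_demo).
-export([single_worker/2, worker_per_request/2]).

%% Run N simulated actions of SleepMs each through ONE worker process.
%% Returns the elapsed wall-clock time in milliseconds.
single_worker(N, SleepMs) ->
    Parent = self(),
    Worker = spawn_link(fun() -> worker_loop(Parent) end),
    T0 = erlang:monotonic_time(millisecond),
    [Worker ! {action, I, SleepMs} || I <- lists:seq(1, N)],
    wait_done(N),
    Worker ! stop,
    erlang:monotonic_time(millisecond) - T0.

%% Run the same N actions, but spawn a fresh worker for each one.
worker_per_request(N, SleepMs) ->
    Parent = self(),
    T0 = erlang:monotonic_time(millisecond),
    [spawn_link(fun() -> run_action(Parent, I, SleepMs) end)
     || I <- lists:seq(1, N)],
    wait_done(N),
    erlang:monotonic_time(millisecond) - T0.

%% The single worker handles its mailbox strictly one message at a time.
worker_loop(Parent) ->
    receive
        {action, I, SleepMs} ->
            run_action(Parent, I, SleepMs),
            worker_loop(Parent);
        stop ->
            ok
    end.

%% Stand-in for a long-running action callback.
run_action(Parent, I, SleepMs) ->
    timer:sleep(SleepMs),
    Parent ! {done, I}.

%% Block until N completion messages have arrived.
wait_done(0) -> ok;
wait_done(N) ->
    receive {done, _} -> wait_done(N - 1) end.
```

With `worker_queue_demo:single_worker(3, 1000)` the elapsed time is at least three seconds, because each action waits for the previous one, while `worker_queue_demo:worker_per_request(3, 1000)` finishes in roughly one second. The first case corresponds to what I see with the single worker socket.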

Hi,

 

I had a talk with the developers responsible for the Erlang API, and they suspect your findings are correct.

That said, I'm not entirely sure how to help you bypass the issue.

 

sunge
Cisco Employee

Quick question.

After calling init_daemon(...), did you run econfd:controlling_process() to make sure the worker process is being called?

No, I did not run that.

The worker process worked well without it. The action was doing its job.

radioman
Beginner

Hi,

 

Sorry for reviving this old thread, but I am running into exactly the same problem with NSO 5.6.2.

 

I can't get action callbacks written with the Erlang API to run concurrently.

 

Is it still the case that there is no way to spawn new workers for action calls?

radioman
Beginner

Hi,

 

It seems there is limited interest in the Erlang API, but I found this thread when I hit the problem, so this might be useful for others.

 

I could not accept that the Erlang API, of all the APIs, has no option for action concurrency at all. So I made a workaround/hack for this problem. It re-implements some internals of the econfd application, so it would be appreciated if this could be fixed properly upstream at some point.

 

This code spawns a new process for every action call. To limit the amount of code that needs to be re-implemented, the process is stopped with timer:exit_after/2.

 

-module(ec_cfs_test_action_server).

-behaviour(gen_server).

%% API
-export([
         start_link/0
        ]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3, call_action/4]).

-include_lib("econfd/include/econfd.hrl").
-include_lib("econfd/include/econfd_errors.hrl").
-include_lib("econfd/src/econfd_proto.hrl").

-define(SERVER, ?MODULE).

% from econfd_internal.hrl
-define(CLIENT_CAPI,         3).

%%%===================================================================
%%% API
%%%===================================================================

start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init([]) ->
    process_flag(trap_exit, true), % Triggers call to terminate/2
    Host = {127,0,0,1},
    {ok, Dx} = econfd_daemon:init([?MODULE,?CONFD_SILENT, user, {}, Host, ?NCS_PORT]),

    % While we are in the init state of the gen_server, we can't use the econfd functions,
    % since they rely on the gen_server being ready.
    {reply, ok, Dx1} = econfd_daemon:handle_call({register_action_cb,
                                                  #confd_action_cb{actionpoint = 'cfs-test-action-erl-ap',
                                                    action = fun call_action/4}
                                                 }, self(), Dx),
    {reply, ok, Dx2} = econfd_daemon:handle_call(register_done, self(), Dx1),
    log(info, "Server started", []),
    {ok, Dx2}.

%%--------------------------------------------------------------------
handle_call(Req, From, State) ->
    econfd_daemon:handle_call(Req, From, State).

%%--------------------------------------------------------------------
handle_cast(Msg, State) ->
    econfd_daemon:handle_cast(Msg, State).

%%--------------------------------------------------------------------
handle_info(State={tcp, _Socket, Data}, Dx) ->
    case econfd_internal:term_get(Data) of
        _ET={_Op=?CONFD_PROTO_NEW_ACTION, _Qref, _Did, _Usid, _CallPoint, _Index, _TH} ->
            {ok, Socket} = econfd_internal:connect(Dx#confd_daemon_ctx.ip, Dx#confd_daemon_ctx.port, ?CLIENT_CAPI, [{active, true}]),
            Int = incr(),
            ok= econfd_internal:term_write(Socket, {?CONFD_PROTO_WORKER, Dx#confd_daemon_ctx.daemon_id, Int}),
            Worker = proc_lib:spawn_link(econfd_daemon, worker, [Dx]),
            % Failsafe so the worker does not hang around forever.
            timer:exit_after(300*1000, Worker,worker_exit),
            Dx1 = Dx#confd_daemon_ctx{ wint=Int, worker=Socket, worker_pid=Worker},
            econfd:controlling_process(Socket, Worker),
            econfd_daemon:handle_info(State, Dx1),
            % Keep using original daemon state since action worker is for one-time usage.
            {noreply, Dx};
        _ ->
            econfd_daemon:handle_info(State, Dx)
    end;
handle_info({'EXIT',_,worker_exit}, State) ->
    {noreply,State};
handle_info(Info, State) ->
    econfd_daemon:handle_info(Info, State).

%%--------------------------------------------------------------------
terminate(Reason, State) ->
    log(info, "Server stopped - ~p", [Reason]),
    econfd_daemon:terminate(Reason, State).

%%--------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
    econfd_daemon:code_change(OldVsn, State, Extra).

%%%===================================================================
%%% Internal functions
%%%===================================================================

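incr() ->
    % The process dictionary has no sock_int entry on the first call, so
    % get/1 returns undefined and X+1 would crash with badarith.
    % Assumption: start at 2, since econfd_daemon:init already uses worker id 1.
    X = case get(sock_int) of
            undefined -> 2;
            N -> N
        end,
    put(sock_int, X+1),
    X.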

%%--------------------------------------------------------------------
call_action(_,[_|'test-action'],_,Args) ->
    log(info,"Args ~p ~p",[self(),Args]),
    % Do some heavy lifting :-)
    timer:sleep(10000),
    timer:exit_after(500,worker_exit),
    ok;

call_action(A1,A2,A3,A4) ->
    log(error,"Unknown call ~p ~p ~p ~p",[A1,A2,A3,A4]),
    timer:exit_after(500,worker_exit),
    ok.

%%--------------------------------------------------------------------
log(error, Format, Args) ->
    econfd:log(?CONFD_LEVEL_ERROR, "~p: " ++ Format, [?SERVER | Args]);
log(info, Format, Args) ->
    econfd:log(?CONFD_LEVEL_INFO,  "~p: " ++ Format, [?SERVER | Args]);
log(trace, Format, Args) ->
    econfd:log(?CONFD_LEVEL_TRACE, "~p: " ++ Format, [?SERVER | Args]).
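
The failsafe used in the module above can be tried in isolation. Below is a minimal sketch in plain Erlang with no econfd involved; the module name `exit_after_demo` is made up for the example. It spawns a worker that would block forever, arms `timer:exit_after/3`, and returns the exit reason that the trapping parent receives.

```erlang
-module(exit_after_demo).
-export([run/0]).

%% Spawn a worker that never finishes on its own, arm a failsafe timer,
%% and return the exit reason observed by the parent.
run() ->
    %% Trap exits so the linked worker's death arrives as a message
    %% instead of killing the caller; remember the previous setting.
    Old = process_flag(trap_exit, true),
    Worker = spawn_link(fun() -> receive never -> ok end end),
    %% After 200 ms the timer server sends exit(Worker, worker_exit).
    {ok, _TRef} = timer:exit_after(200, Worker, worker_exit),
    Result = receive
                 {'EXIT', Worker, Reason} -> Reason
             after 2000 ->
                 timeout
             end,
    process_flag(trap_exit, Old),
    Result.
```

Calling `exit_after_demo:run()` returns `worker_exit`, which is the same reason the `handle_info({'EXIT',_,worker_exit}, State)` clause above matches on.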