* Added preliminary HTTPD construct
[project/luci.git] / libs / httpd / luasrc / copas.lua
1 -------------------------------------------------------------------------------
2 -- Copas - Coroutine Oriented Portable Asynchronous Services
3 --
4 -- Offers a dispatcher and socket operations based on coroutines.
5 -- Usage:
6 --    copas.addserver(server, handler, timeout)
7 --    copas.addthread(thread, ...) Create a new coroutine thread and run it with args
8 --    copas.loop(timeout) - listens infinetely
9 --    copas.step(timeout) - executes one listening step
10 --    copas.receive(pattern or number) - receives data from a socket
11 --    copas.settimeout(client, time) if time=0 copas.receive(bufferSize) - receives partial data from a socket were data<=bufferSize
12 --    copas.send  - sends data through a socket
13 --    copas.wrap  - wraps a LuaSocket socket with Copas methods
14 --    copas.connect - blocks only the thread until connection completes
15 --    copas.flush - *deprecated* do nothing
16 --
17 -- Authors: Andre Carregal and Javier Guerra
18 -- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
19 --               Thomas Harning Jr. and Gary NG
20 --
21 -- Copyright 2005 - Kepler Project (www.keplerproject.org)
22 --
23 -- $Id: copas.lua,v 1.31 2008/05/19 18:57:13 carregal Exp $
24 -------------------------------------------------------------------------------
25 local socket = require "socket"
26
27 require"luci.util"
28 local copcall = luci.util.copcall
29
30
31 local WATCH_DOG_TIMEOUT = 120
32
33 -- Redefines LuaSocket functions with coroutine safe versions
34 -- (this allows the use of socket.http from within copas)
35 local function statusHandler(status, ...)
36  if status then return ... end
37  return nil, ...
38 end
39 function socket.protect(func)
40  return function (...)
41                return statusHandler(copcall(func, ...))
42        end
43 end
44
45 function socket.newtry(finalizer)
46        return function (...)
47                local status = (...) or false
48                if (status==false)then
49                        copcall(finalizer, select(2, ...) )
50                        error((select(2, ...)), 0)
51                end
52                return ...
53        end
54 end
55 -- end of LuaSocket redefinitions
56
57
58 module ("copas", package.seeall)
59
60 -- Meta information is public even if begining with an "_"
61 _COPYRIGHT   = "Copyright (C) 2005 Kepler Project"
62 _DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
63 _VERSION     = "Copas 1.1.3"
64
65 -------------------------------------------------------------------------------
66 -- Simple set implementation based on LuaSocket's tinyirc.lua example
67 -- adds a FIFO queue for each value in the set
68 -------------------------------------------------------------------------------
69 local function newset()
70    local reverse = {}
71    local set = {}
72    local q = {}
73    setmetatable(set, { __index = {
74        insert = function(set, value)
75            if not reverse[value] then
76                set[#set + 1] = value
77                reverse[value] = #set
78            end
79        end,
80
81        remove = function(set, value)
82            local index = reverse[value]
83            if index then
84                reverse[value] = nil
85                local top = set[#set]
86                set[#set] = nil
87                if top ~= value then
88                    reverse[top] = index
89                    set[index] = top
90                end
91            end
92        end,
93
94                push = function (set, key, itm)
95                        local qKey = q[key]
96                        if qKey == nil then
97                                q[key] = {itm}
98                        else
99                                qKey[#qKey + 1] = itm
100                        end
101                end,
102
103        pop = function (set, key)
104          local t = q[key]
105          if t ~= nil then
106            local ret = table.remove (t, 1)
107            if t[1] == nil then
108              q[key] = nil
109            end
110            return ret
111          end
112        end
113    }})
114    return set
115 end
116
117 local _servers = newset() -- servers being handled
118 local _reading_log = {}
119 local _writing_log = {}
120
121 local _reading = newset() -- sockets currently being read
122 local _writing = newset() -- sockets currently being written
123
124 -------------------------------------------------------------------------------
125 -- Coroutine based socket I/O functions.
126 -------------------------------------------------------------------------------
127 -- reads a pattern from a client and yields to the reading set on timeouts
128 function receive(client, pattern, part)
129  local s, err
130  pattern = pattern or "*l"
131  repeat
132    s, err, part = client:receive(pattern, part)
133    if s or err ~= "timeout" then
134        _reading_log[client] = nil
135        return s, err, part
136    end
137    _reading_log[client] = os.time()
138    coroutine.yield(client, _reading)
139  until false
140 end
141
142 -- same as above but with special treatment when reading chunks,
143 -- unblocks on any data received.
144 function receivePartial(client, pattern)
145  local s, err, part
146  pattern = pattern or "*l"
147  repeat
148    s, err, part = client:receive(pattern)
149    if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
150       err ~= "timeout" then
151        _reading_log[client] = nil
152        return s, err, part
153    end
154    _reading_log[client] = os.time()
155    coroutine.yield(client, _reading)
156  until false
157 end
158
159 -- sends data to a client. The operation is buffered and
160 -- yields to the writing set on timeouts
161 function send(client,data, from, to)
162  local s, err,sent
163  from = from or 1
164  local lastIndex = from - 1
165
166  repeat
167    s, err, lastIndex = client:send(data, lastIndex + 1, to)
168    -- adds extra corrotine swap
169    -- garantees that high throuput dont take other threads to starvation
170    if (math.random(100) > 90) then
171        _writing_log[client] = os.time()
172        coroutine.yield(client, _writing)
173    end
174    if s or err ~= "timeout" then
175        _writing_log[client] = nil
176        return s, err,lastIndex
177    end
178    _writing_log[client] = os.time()
179    coroutine.yield(client, _writing)
180  until false
181 end
182
183 -- waits until connection is completed
184 function connect(skt,host, port)
185        skt:settimeout(0)
186        local ret,err = skt:connect (host, port)
187        if ret or err ~= "timeout" then
188        return ret, err
189    end
190    _writing_log[skt] = os.time()
191        coroutine.yield(skt, _writing)
192        ret,err = skt:connect (host, port)
193    _writing_log[skt] = nil
194        if (err=="already connected") then
195                return 1
196        end
197        return ret, err
198 end
199
200 -- flushes a client write buffer (deprecated)
201 function flush(client)
202 end
203
204 -- wraps a socket to use Copas methods (send, receive, flush and settimeout)
205 local _skt_mt = {__index = {
206        send = function (self, data, from, to)
207                        return send (self.socket, data, from, to)
208        end,
209
210        receive = function (self, pattern)
211                        if (self.timeout==0) then
212                                return receivePartial(self.socket, pattern)
213                        end
214                        return receive (self.socket, pattern)
215        end,
216
217        flush = function (self)
218                        return flush (self.socket)
219        end,
220
221        settimeout = function (self,time)
222                        self.timeout=time
223                        return
224        end,
225 }}
226
227 function wrap (skt)
228        return  setmetatable ({socket = skt}, _skt_mt)
229 end
230
231 --------------------------------------------------
232 -- Error handling
233 --------------------------------------------------
234
235 local _errhandlers = {}   -- error handler per coroutine
236
237 function setErrorHandler (err)
238        local co = coroutine.running()
239        if co then
240                _errhandlers [co] = err
241        end
242 end
243
244 local function _deferror (msg, co, skt)
245        print (msg, co, skt)
246 end
247
248 -------------------------------------------------------------------------------
249 -- Thread handling
250 -------------------------------------------------------------------------------
251
252 local function _doTick (co, skt, ...)
253        if not co then return end
254
255        local ok, res, new_q = coroutine.resume(co, skt, ...)
256
257        if ok and res and new_q then
258                new_q:insert (res)
259                new_q:push (res, co)
260        else
261                if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end
262                if skt then skt:close() end
263                _errhandlers [co] = nil
264        end
265 end
266
267 -- accepts a connection on socket input
268 local function _accept(input, handler)
269        local client = input:accept()
270        if client then
271                client:settimeout(0)
272                local co = coroutine.create(handler)
273                _doTick (co, client)
274                --_reading:insert(client)
275        end
276        return client
277 end
278
279 -- handle threads on a queue
280 local function _tickRead (skt)
281        _doTick (_reading:pop (skt), skt)
282 end
283
284 local function _tickWrite (skt)
285        _doTick (_writing:pop (skt), skt)
286 end
287
288 -------------------------------------------------------------------------------
289 -- Adds a server/handler pair to Copas dispatcher
290 -------------------------------------------------------------------------------
291 function addserver(server, handler, timeout)
292  server:settimeout(timeout or 0.1)
293  _servers[server] = handler
294  _reading:insert(server)
295 end
296
297 -------------------------------------------------------------------------------
298 -- Adds an new courotine thread to Copas dispatcher
299 -------------------------------------------------------------------------------
300 function addthread(thread, ...)
301        local co = coroutine.create(thread)
302        _doTick (co, nil, ...)
303 end
304
305 -------------------------------------------------------------------------------
306 -- tasks registering
307 -------------------------------------------------------------------------------
308
309 local _tasks = {}
310
311 local function addtaskRead (tsk)
312        -- lets tasks call the default _tick()
313        tsk.def_tick = _tickRead
314
315        _tasks [tsk] = true
316 end
317
318 local function addtaskWrite (tsk)
319        -- lets tasks call the default _tick()
320        tsk.def_tick = _tickWrite
321
322        _tasks [tsk] = true
323 end
324
325 local function tasks ()
326        return next, _tasks
327 end
328
329 -------------------------------------------------------------------------------
330 -- main tasks: manage readable and writable socket sets
331 -------------------------------------------------------------------------------
332 -- a task to check ready to read events
333 local _readable_t = {
334  events = function(self)
335        local i = 0
336        return function ()
337                i = i + 1
338                return self._evs [i]
339        end
340  end,
341
342  tick = function (self, input)
343        local handler = _servers[input]
344        if handler then
345                input = _accept(input, handler)
346        else
347                _reading:remove (input)
348                self.def_tick (input)
349        end
350  end
351 }
352
353 addtaskRead (_readable_t)
354
355
356 -- a task to check ready to write events
357 local _writable_t = {
358  events = function (self)
359        local i = 0
360        return function ()
361                i = i+1
362                return self._evs [i]
363        end
364  end,
365
366  tick = function (self, output)
367        _writing:remove (output)
368        self.def_tick (output)
369  end
370 }
371
372 addtaskWrite (_writable_t)
373
374 local last_cleansing = 0
375 local function _select (timeout)
376        local err
377    local readable={}
378    local writable={}
379    local r={}
380    local w={}
381    local now = os.time()
382    local duration = os.difftime
383
384
385        _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout)
386        local r_evs, w_evs = _readable_t._evs, _writable_t._evs
387
388    if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
389        last_cleansing = now
390        for k,v in pairs(_reading_log) do
391            if not r_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
392                _reading_log[k] = nil
393                r_evs[#r_evs + 1] = k
394                r_evs[k] = #r_evs
395            end
396        end
397
398        for k,v in pairs(_writing_log) do
399            if not w_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
400                _writing_log[k] = nil
401                w_evs[#w_evs + 1] = k
402                w_evs[k] = #w_evs
403            end
404        end
405    end
406
407    if err == "timeout" and #r_evs + #w_evs > 0 then return nil
408    else return err end
409 end
410
411
412 -------------------------------------------------------------------------------
413 -- Dispatcher loop step.
414 -- Listen to client requests and handles them
415 -------------------------------------------------------------------------------
416 function step(timeout)
417  local err = _select (timeout)
418  if err == "timeout" then return end
419
420        if err then
421                error(err)
422        end
423
424        for tsk in tasks() do
425                for ev in tsk:events () do
426                        tsk:tick (ev)
427                end
428        end
429 end
430
431 -------------------------------------------------------------------------------
432 -- Dispatcher endless loop.
433 -- Listen to client requests and handles them forever
434 -------------------------------------------------------------------------------
435 function loop(timeout)
436        while true do
437                step(timeout)
438        end
439 end