1 -------------------------------------------------------------------------------
2 -- Copas - Coroutine Oriented Portable Asynchronous Services
4 -- Offers a dispatcher and socket operations based on coroutines.
6 -- copas.addserver(server, handler, timeout)
7 -- copas.addthread(thread, ...) Create a new coroutine thread and run it with args
8 -- copas.loop(timeout) - listens infinetely
9 -- copas.step(timeout) - executes one listening step
10 -- copas.receive(pattern or number) - receives data from a socket
11 -- copas.settimeout(client, time) if time=0 copas.receive(bufferSize) - receives partial data from a socket were data<=bufferSize
12 -- copas.send - sends data through a socket
13 -- copas.wrap - wraps a LuaSocket socket with Copas methods
14 -- copas.connect - blocks only the thread until connection completes
15 -- copas.flush - *deprecated* do nothing
17 -- Authors: Andre Carregal and Javier Guerra
18 -- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
19 -- Thomas Harning Jr. and Gary NG
21 -- Copyright 2005 - Kepler Project (www.keplerproject.org)
23 -- $Id: copas.lua,v 1.31 2008/05/19 18:57:13 carregal Exp $
24 -------------------------------------------------------------------------------
25 local socket = require "socket"
28 local copcall = luci.util.copcall
31 local WATCH_DOG_TIMEOUT = 120
33 -- Redefines LuaSocket functions with coroutine safe versions
34 -- (this allows the use of socket.http from within copas)
35 local function statusHandler(status, ...)
36 if status then return ... end
39 function socket.protect(func)
41 return statusHandler(copcall(func, ...))
45 function socket.newtry(finalizer)
47 local status = (...) or false
48 if (status==false)then
49 copcall(finalizer, select(2, ...) )
50 error((select(2, ...)), 0)
55 -- end of LuaSocket redefinitions
58 module ("copas", package.seeall)
60 -- Meta information is public even if begining with an "_"
61 _COPYRIGHT = "Copyright (C) 2005 Kepler Project"
62 _DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
63 _VERSION = "Copas 1.1.3"
65 -------------------------------------------------------------------------------
66 -- Simple set implementation based on LuaSocket's tinyirc.lua example
67 -- adds a FIFO queue for each value in the set
68 -------------------------------------------------------------------------------
69 local function newset()
73 setmetatable(set, { __index = {
74 insert = function(set, value)
75 if not reverse[value] then
81 remove = function(set, value)
82 local index = reverse[value]
94 push = function (set, key, itm)
103 pop = function (set, key)
106 local ret = table.remove (t, 1)
117 local _servers = newset() -- servers being handled
118 local _reading_log = {}
119 local _writing_log = {}
121 local _reading = newset() -- sockets currently being read
122 local _writing = newset() -- sockets currently being written
124 -------------------------------------------------------------------------------
125 -- Coroutine based socket I/O functions.
126 -------------------------------------------------------------------------------
127 -- reads a pattern from a client and yields to the reading set on timeouts
128 function receive(client, pattern, part)
130 pattern = pattern or "*l"
132 s, err, part = client:receive(pattern, part)
133 if s or err ~= "timeout" then
134 _reading_log[client] = nil
137 _reading_log[client] = os.time()
138 coroutine.yield(client, _reading)
142 -- same as above but with special treatment when reading chunks,
143 -- unblocks on any data received.
144 function receivePartial(client, pattern)
146 pattern = pattern or "*l"
148 s, err, part = client:receive(pattern)
149 if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
150 err ~= "timeout" then
151 _reading_log[client] = nil
154 _reading_log[client] = os.time()
155 coroutine.yield(client, _reading)
159 -- sends data to a client. The operation is buffered and
160 -- yields to the writing set on timeouts
161 function send(client,data, from, to)
164 local lastIndex = from - 1
167 s, err, lastIndex = client:send(data, lastIndex + 1, to)
168 -- adds extra corrotine swap
169 -- garantees that high throuput dont take other threads to starvation
170 if (math.random(100) > 90) then
171 _writing_log[client] = os.time()
172 coroutine.yield(client, _writing)
174 if s or err ~= "timeout" then
175 _writing_log[client] = nil
176 return s, err,lastIndex
178 _writing_log[client] = os.time()
179 coroutine.yield(client, _writing)
183 -- waits until connection is completed
184 function connect(skt,host, port)
186 local ret,err = skt:connect (host, port)
187 if ret or err ~= "timeout" then
190 _writing_log[skt] = os.time()
191 coroutine.yield(skt, _writing)
192 ret,err = skt:connect (host, port)
193 _writing_log[skt] = nil
194 if (err=="already connected") then
200 -- flushes a client write buffer (deprecated)
201 function flush(client)
204 -- wraps a socket to use Copas methods (send, receive, flush and settimeout)
205 local _skt_mt = {__index = {
206 send = function (self, data, from, to)
207 return send (self.socket, data, from, to)
210 receive = function (self, pattern)
211 if (self.timeout==0) then
212 return receivePartial(self.socket, pattern)
214 return receive (self.socket, pattern)
217 flush = function (self)
218 return flush (self.socket)
221 settimeout = function (self,time)
228 return setmetatable ({socket = skt}, _skt_mt)
231 --------------------------------------------------
233 --------------------------------------------------
235 local _errhandlers = {} -- error handler per coroutine
237 function setErrorHandler (err)
238 local co = coroutine.running()
240 _errhandlers [co] = err
244 local function _deferror (msg, co, skt)
248 -------------------------------------------------------------------------------
250 -------------------------------------------------------------------------------
252 local function _doTick (co, skt, ...)
253 if not co then return end
255 local ok, res, new_q = coroutine.resume(co, skt, ...)
257 if ok and res and new_q then
261 if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end
262 if skt then skt:close() end
263 _errhandlers [co] = nil
267 -- accepts a connection on socket input
268 local function _accept(input, handler)
269 local client = input:accept()
272 local co = coroutine.create(handler)
274 --_reading:insert(client)
279 -- handle threads on a queue
280 local function _tickRead (skt)
281 _doTick (_reading:pop (skt), skt)
284 local function _tickWrite (skt)
285 _doTick (_writing:pop (skt), skt)
288 -------------------------------------------------------------------------------
289 -- Adds a server/handler pair to Copas dispatcher
290 -------------------------------------------------------------------------------
291 function addserver(server, handler, timeout)
292 server:settimeout(timeout or 0.1)
293 _servers[server] = handler
294 _reading:insert(server)
297 -------------------------------------------------------------------------------
298 -- Adds an new courotine thread to Copas dispatcher
299 -------------------------------------------------------------------------------
300 function addthread(thread, ...)
301 local co = coroutine.create(thread)
302 _doTick (co, nil, ...)
305 -------------------------------------------------------------------------------
307 -------------------------------------------------------------------------------
311 local function addtaskRead (tsk)
312 -- lets tasks call the default _tick()
313 tsk.def_tick = _tickRead
318 local function addtaskWrite (tsk)
319 -- lets tasks call the default _tick()
320 tsk.def_tick = _tickWrite
325 local function tasks ()
329 -------------------------------------------------------------------------------
330 -- main tasks: manage readable and writable socket sets
331 -------------------------------------------------------------------------------
332 -- a task to check ready to read events
333 local _readable_t = {
334 events = function(self)
342 tick = function (self, input)
343 local handler = _servers[input]
345 input = _accept(input, handler)
347 _reading:remove (input)
348 self.def_tick (input)
353 addtaskRead (_readable_t)
356 -- a task to check ready to write events
357 local _writable_t = {
358 events = function (self)
366 tick = function (self, output)
367 _writing:remove (output)
368 self.def_tick (output)
372 addtaskWrite (_writable_t)
374 local last_cleansing = 0
375 local function _select (timeout)
381 local now = os.time()
382 local duration = os.difftime
385 _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout)
386 local r_evs, w_evs = _readable_t._evs, _writable_t._evs
388 if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
390 for k,v in pairs(_reading_log) do
391 if not r_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
392 _reading_log[k] = nil
393 r_evs[#r_evs + 1] = k
398 for k,v in pairs(_writing_log) do
399 if not w_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
400 _writing_log[k] = nil
401 w_evs[#w_evs + 1] = k
407 if err == "timeout" and #r_evs + #w_evs > 0 then return nil
412 -------------------------------------------------------------------------------
413 -- Dispatcher loop step.
414 -- Listen to client requests and handles them
415 -------------------------------------------------------------------------------
416 function step(timeout)
417 local err = _select (timeout)
418 if err == "timeout" then return end
424 for tsk in tasks() do
425 for ev in tsk:events () do
431 -------------------------------------------------------------------------------
432 -- Dispatcher endless loop.
433 -- Listen to client requests and handles them forever
434 -------------------------------------------------------------------------------
435 function loop(timeout)