From: Steven Barth Date: Wed, 25 Jun 2008 16:38:48 +0000 (+0000) Subject: * libs/httpd: Introduced keep-alive and pipelining support X-Git-Tag: 0.8.0~775 X-Git-Url: https://git.archive.openwrt.org/?p=project%2Fluci.git;a=commitdiff_plain;h=7a4aa85dd64f72b9edcbf9310d0d95e59960d84e * libs/httpd: Introduced keep-alive and pipelining support --- diff --git a/libs/core/luasrc/sys.lua b/libs/core/luasrc/sys.lua index 17caa7824..54c4e0613 100644 --- a/libs/core/luasrc/sys.lua +++ b/libs/core/luasrc/sys.lua @@ -134,6 +134,23 @@ function syslog() end +-- Generates a random key of length BYTES +function uniqueid(bytes) + local fp = io.open("/dev/urandom") + local chunk = { fp:read(bytes):byte(1, bytes) } + fp:close() + + local hex = "" + + local pattern = "%02X" + for i, byte in ipairs(chunk) do + hex = hex .. pattern:format(byte) + end + + return hex +end + + group = {} group.getgroup = posix.getgroup diff --git a/libs/http/luasrc/http/protocol.lua b/libs/http/luasrc/http/protocol.lua index 318169e0c..205869a2d 100644 --- a/libs/http/luasrc/http/protocol.lua +++ b/libs/http/luasrc/http/protocol.lua @@ -114,9 +114,13 @@ local process_states = { } -- Extract "magic", the first line of a http message. -- Extracts the message type ("get", "post" or "response"), the requested uri -- or the status code if the line descripes a http response. -process_states['magic'] = function( msg, chunk ) +process_states['magic'] = function( msg, chunk, err ) if chunk ~= nil then + -- ignore empty lines before request + if #chunk == 0 then + return true, nil + end -- Is it a request? local method, uri, http_ver = chunk:match("^([A-Z]+) ([^ ]+) HTTP/([01]%.[019])$") @@ -156,7 +160,7 @@ process_states['magic'] = function( msg, chunk ) end end end - + -- Can't handle it return nil, "Invalid HTTP message magic" end @@ -522,6 +526,34 @@ process_states['urldecode-value'] = function( msg, chunk, filecb ) end +-- Creates a header source from a given socket +function header_source( sock ) + return ltn12.source.simplify( function() + + local chunk, err, part = sock:receive("*l") + + -- Line too long + if chunk == nil then + if err ~= "timeout" then + return nil, part + and "Line exceeds maximum allowed length["..part.."]" + or "Unexpected EOF" + else + return nil, err + end + + -- Line ok + elseif chunk ~= nil then + + -- Strip trailing CR + chunk = chunk:gsub("\r$","") + + return chunk, nil + end + end ) +end + + -- Decode MIME encoded data. function mimedecode_message_body( source, msg, filecb ) @@ -617,20 +649,6 @@ function urldecode_message_body( source, msg ) end --- Parse a http message -function parse_message( data, filecb ) - - local reader = _linereader( data, HTTP_MAX_READBUF ) - local message = parse_message_header( reader ) - - if message then - parse_message_body( reader, message, filecb ) - end - - return message -end - - -- Parse a http message header function parse_message_header( source ) @@ -673,7 +691,7 @@ function parse_message_header( source ) REQUEST_URI = msg.request_uri; SCRIPT_NAME = msg.request_uri:gsub("?.+$",""); SCRIPT_FILENAME = ""; -- XXX implement me - SERVER_PROTOCOL = "HTTP/" .. msg.http_version + SERVER_PROTOCOL = "HTTP/" .. string.format("%.1f", msg.http_version) } -- Populate HTTP_* environment variables @@ -702,19 +720,6 @@ end -- Parse a http message body function parse_message_body( source, msg, filecb ) - - -- Install an additional filter if we're operating on chunked transfer - -- coding and client is HTTP/1.1 capable - if msg.http_version == 1.1 and - msg.headers['Transfer-Encoding'] and - msg.headers['Transfer-Encoding']:find("chunked") - then - source = ltn12.source.chain( - source, luci.http.protocol.filter.decode_chunked - ) - end - - -- Is it multipart/mime ? if msg.env.REQUEST_METHOD == "POST" and msg.env.CONTENT_TYPE and msg.env.CONTENT_TYPE:match("^multipart/form%-data") @@ -771,33 +776,14 @@ function parse_message_body( source, msg, filecb ) end end - --- Push a response to a socket -function push_response(request, response, sourceout, sinkout, sinkerr) - local code = response.status - sinkout(request.env.SERVER_PROTOCOL .. " " .. code .. " " .. statusmsg[code] .. "\r\n") - - -- FIXME: Add support for keep-alive - response.headers["Connection"] = "close" - - for k,v in pairs(response.headers) do - sinkout(k .. ": " .. v .. "\r\n") - end - - sinkout("\r\n") - - if sourceout then - ltn12.pump.all(sourceout, sinkout) - end -end - - -- Status codes statusmsg = { [200] = "OK", [400] = "Bad Request", [403] = "Forbidden", [404] = "Not Found", + [405] = "Method Not Allowed", + [411] = "Length Required", [500] = "Internal Server Error", [503] = "Server Unavailable", } diff --git a/libs/httpd/luasrc/httpd.lua b/libs/httpd/luasrc/httpd.lua index 6524bc1de..a9b1ccbb4 100644 --- a/libs/httpd/luasrc/httpd.lua +++ b/libs/httpd/luasrc/httpd.lua @@ -27,13 +27,59 @@ function Socket(ip, port) return sock, err end +Thread = luci.util.class() + +function Thread.__init__(self, socket, func) + self.socket = socket + self.routine = coroutine.create(func) + self.stamp = os.time() + self.waiting = false +end + +function Thread.getidletime(self) + return os.difftime(os.time(), self.stamp) +end + +function Thread.iswaiting(self) + return self.waiting +end + +function Thread.receive(self, ...) + local chunk, err, part + self.waiting = true + + repeat + coroutine.yield() + chunk, err, part = self.socket:receive(...) + until err ~= "timeout" + + self.waiting = false + return chunk, err, part +end + +function Thread.resume(self, ...) + return coroutine.resume(self.routine, self, ...) +end + +function Thread.status(self) + return coroutine.status(self.routine) +end + +function Thread.touch(self) + self.stamp = os.time() +end Daemon = luci.util.class() function Daemon.__init__(self, threadlimit, timeout) self.reading = {} - self.running = {} + self.threads = {} self.handler = {} + self.waiting = {} + self.threadc = 0 + + setmetatable(self.waiting, {__mode = "v"}) + self.debug = false self.threadlimit = threadlimit self.timeout = timeout or 0.1 @@ -58,10 +104,7 @@ end function Daemon.step(self) local input, output, err = socket.select( self.reading, nil, 0 ) - - if err == "timeout" and #self.running == 0 then - socket.sleep(self.timeout) - end + local working = false -- accept new connections for i, connection in ipairs(input) do @@ -70,19 +113,18 @@ function Daemon.step(self) if sock then -- check capacity - if not self.threadlimit or #self.running < self.threadlimit then + if not self.threadlimit or self.threadc < self.threadlimit then if self.debug then self:dprint("Accepted incoming connection from " .. sock:getpeername()) end - - table.insert( self.running, { - coroutine.create( self.handler[connection].clhandler ), - sock - } ) + + local t = Thread(sock, self.handler[connection].clhandler) + self.threads[sock] = t + self.threadc = self.threadc + 1 if self.debug then - self:dprint("Created " .. tostring(self.running[#self.running][1])) + self:dprint("Created " .. tostring(t)) end -- reject client @@ -101,27 +143,62 @@ function Daemon.step(self) end -- create client handler - for i, client in ipairs( self.running ) do + for sock, thread in pairs( self.threads ) do -- reap dead clients - if coroutine.status( client[1] ) == "dead" then + if thread:status() == "dead" then if self.debug then - self:dprint("Completed " .. tostring(client[1])) + self:dprint("Completed " .. tostring(thread)) end - table.remove( self.running, i ) - else + sock:close() + self.threadc = self.threadc - 1 + self.threads[sock] = nil + -- resume working threads + elseif not thread:iswaiting() then if self.debug then - self:dprint("Resuming " .. tostring(client[1])) + self:dprint("Resuming " .. tostring(thread)) end - local stat, err = coroutine.resume( client[1], client[2] ) + local stat, err = thread:resume() + if stat then + thread:touch() + if not thread:iswaiting() then + working = true + else + table.insert(self.waiting, sock) + end + end if self.debug then - self:dprint(tostring(client[1]) .. " returned") + self:dprint(tostring(thread) .. " returned") if not stat then - self:dprint("Error in " .. tostring(client[1]) .. " " .. err) + self:dprint("Error in " .. tostring(thread) .. " " .. err) + end + end + end + end + + -- check for data on waiting threads + input, output, err = socket.select( self.waiting, nil, 0 ) + + for i, sock in ipairs(input) do + self.threads[sock]:resume() + self.threads[sock]:touch() + + if not self.threads[sock]:iswaiting() then + for i, s in ipairs(self.waiting) do + if s == sock then + table.remove(self.waiting, i) + break end end + if not working then + working = true + end end end + + if err == "timeout" and not working then + socket.sleep(self.timeout) + end end diff --git a/libs/httpd/luasrc/httpd/handler/file.lua b/libs/httpd/luasrc/httpd/handler/file.lua index 83549f338..8d75edc9a 100644 --- a/libs/httpd/luasrc/httpd/handler/file.lua +++ b/libs/httpd/luasrc/httpd/handler/file.lua @@ -11,18 +11,28 @@ function Simple.__init__(self, docroot) self.docroot = docroot end -function Simple.handle(self, request, sourcein, sinkerr) - local uri = request.env.PATH_INFO +function Simple.getfile(self, uri) local file = self.docroot .. uri:gsub("%.%./", "") local stat = luci.fs.stat(file) + + return file, stat +end + +function Simple.handle_get(self, request, sourcein, sinkerr) + local file, stat = self:getfile(request.env.PATH_INFO) if stat then if stat.type == "regular" then return Response(200, {["Content-Length"] = stat.size}), ltn12.source.file(io.open(file)) else - return self:failure(403, "Unable to transmit " .. stat.type .. " " .. uri) + return self:failure(403, "Unable to transmit " .. stat.type .. " " .. request.env.PATH_INFO) end else return self:failure(404, "No such file: " .. uri) end +end + +function Simple.handle_head(self, ...) + local response, sourceout = self:handle_get(...) + return response end \ No newline at end of file diff --git a/libs/httpd/luasrc/httpd/handler/luci.lua b/libs/httpd/luasrc/httpd/handler/luci.lua index e4916bd2c..35f832d45 100644 --- a/libs/httpd/luasrc/httpd/handler/luci.lua +++ b/libs/httpd/luasrc/httpd/handler/luci.lua @@ -10,7 +10,16 @@ function Luci.__init__(self) luci.httpd.module.Handler.__init__(self) end -function Luci.handle(self, request, sourcein, sinkerr) +function Luci.handle_head(self, ...) + local response, sourceout = self:handle_get(...) + return response +end + +function Luci.handle_post(self, ...) + return self:handle_get(...) +end + +function Luci.handle_get(self, request, sourcein, sinkerr) local r = luci.http.Request( request.env, sourcein, @@ -22,7 +31,7 @@ function Luci.handle(self, request, sourcein, sinkerr) local status = 200 local x = coroutine.create(luci.dispatcher.httpdispatch) - while id < 3 do + while not id or id < 3 do coroutine.yield() res, id, data1, data2 = coroutine.resume(x, r) @@ -45,6 +54,8 @@ function Luci.handle(self, request, sourcein, sinkerr) local res, id, data = coroutine.resume(x) if not res then return nil, id + elseif not id then + return true elseif id == 5 then return nil else diff --git a/libs/httpd/luasrc/httpd/module.lua b/libs/httpd/luasrc/httpd/module.lua index c321856a8..46cfa54ff 100644 --- a/libs/httpd/luasrc/httpd/module.lua +++ b/libs/httpd/luasrc/httpd/module.lua @@ -24,6 +24,7 @@ Handler = luci.util.class() -- Constructor function Handler.__init__(self) self.filters = {} + self.handler = {} end @@ -41,9 +42,10 @@ function Handler.failure(self, code, message) return response, sourceout end - -- Processes a request -function Handler.process(self, request, sourcein, sinkout, sinkerr) +function Handler.process(self, request, sourcein, sinkerr, ...) + local stat, response, sourceout + -- Process incoming filters for i, f in ipairs(self.filters) do local i = f:get("input") @@ -57,14 +59,20 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr) end end - -- Run the handler - local stat, response, sourceout = luci.util.copcall( - self.handle, self, request, sourcein, sinkerr - ) + -- Detect request Method + local hname = "handle_" .. request.request_method + if self[hname] then + -- Run the handler + stat, response, sourceout = luci.util.copcall( + self[hname], self, request, sourcein, sinkerr, ... + ) - -- Check for any errors - if not stat then - response, sourceout = self:failure(500, response) + -- Check for any errors + if not stat then + response, sourceout = self:failure(500, response) + end + else + response, sourceout = self:failure(405, luci.http.protocol.statusmsg[405]) end -- Check data @@ -85,7 +93,7 @@ function Handler.process(self, request, sourcein, sinkout, sinkerr) end end - luci.http.protocol.push_response(request, response, sourceout, sinkout, sinkerr) + return response, sourceout end diff --git a/libs/httpd/luasrc/httpd/server.lua b/libs/httpd/luasrc/httpd/server.lua index 90fdd7ed3..181ca24a1 100644 --- a/libs/httpd/luasrc/httpd/server.lua +++ b/libs/httpd/luasrc/httpd/server.lua @@ -14,6 +14,8 @@ $Id$ ]]-- module("luci.httpd.server", package.seeall) +require("socket") +require("socket.http") require("luci.util") READ_BUFSIZE = 1024 @@ -26,7 +28,7 @@ function VHost.__init__(self, handler) self.dhandler = {} end -function VHost.process(self, request, sourcein, sinkout, sinkerr) +function VHost.process(self, request, sourcein, sinkerr, ...) local handler = self.handler local uri = request.env.REQUEST_URI:match("^([^?]*)") @@ -47,10 +49,7 @@ function VHost.process(self, request, sourcein, sinkout, sinkerr) end if handler then - handler:process(request, sourcein, sinkout, sinkerr) - return true - else - return false + return handler:process(request, sourcein, sinkerr, ...) end end @@ -69,8 +68,6 @@ end Server = luci.util.class() function Server.__init__(self, host) - self.clhandler = client_handler - self.errhandler = error503 self.host = host self.vhosts = {} end @@ -86,129 +83,153 @@ end function Server.create_daemon_handlers(self) return function(...) return self:process(...) end, - function(...) return self:error503(...) end + function(...) return self:error_overload(...) end end -function Server.create_client_sources(self, client) - -- Create LTN12 block source - local block_source = function() - -- Yielding here may cause chaos in coroutine based modules, be careful - -- coroutine.yield() - - local chunk, err, part = client:receive( READ_BUFSIZE ) - - if chunk == nil and err == "timeout" then - return part - elseif chunk ~= nil then - return chunk - else - return nil, err - end - - end - - - -- Create LTN12 line source - local line_source = ltn12.source.simplify( function() - - coroutine.yield() - - local chunk, err, part = client:receive("*l") - - -- Line too long - if chunk == nil and err ~= "timeout" then - - return nil, part - and "Line exceeds maximum allowed length["..part.."]" - or "Unexpected EOF" - - -- Line ok - elseif chunk ~= nil then - - -- Strip trailing CR - chunk = chunk:gsub("\r$","") - - -- We got end of headers, switch to dummy source - if #chunk == 0 then - return "", function() - return nil - end - else - return chunk, nil - end - end - end ) - - return block_source, line_source -end - - -function Server.error400(self, socket, msg) - socket:send( "HTTP/1.0 400 Bad request\r\n" ) - socket:send( "Content-Type: text/plain\r\n\r\n" ) - - if msg then - socket:send( msg .. "\r\n" ) - end - - socket:close() -end - -function Server.error500(self, socket, msg) - socket:send( "HTTP/1.0 500 Internal Server Error\r\n" ) +function Server.error(self, socket, code, msg) + hcode = tostring(code) + + socket:send( "HTTP/1.1 " .. hcode .. " " .. + luci.http.protocol.statusmsg[code] .. "\r\n" ) + socket:send( "Connection: close\r\n" ) socket:send( "Content-Type: text/plain\r\n\r\n" ) if msg then - socket:send( msg .. "\r\n" ) + socket:send( "HTTP-Error " .. code .. ": " .. msg .. "\r\n" ) end - - socket:close() end -function Server.error503(self, socket) - socket:send( "HTTP/1.0 503 Server unavailable\r\n" ) - socket:send( "Content-Type: text/plain\r\n\r\n" ) - socket:send( "There are too many clients connected, try again later\r\n" ) - socket:close() +function Server.error_overload(self, socket) + self:error(socket, 503, "Too many simultaneous connections") end -function Server.process(self, client) +function Server.process( self, thread ) + -- Setup sockets and sources + local client = thread.socket client:settimeout( 0 ) - local sourcein, sourcehdr = self:create_client_sources(client) - local sinkerr = ltn12.sink.file(io.stderr) - - -- FIXME: Add keep-alive support - local sinkout = socket.sink("close-when-done", client) - - coroutine.yield() - - -- parse headers - local message, err = luci.http.protocol.parse_message_header( sourcehdr ) - - if message then - -- If we have a HTTP/1.1 client and an Expect: 100-continue header then - -- respond with HTTP 100 Continue message - if message.http_version == 1.1 and message.headers['Expect'] and - message.headers['Expect'] == '100-continue' - then - client:send("HTTP/1.1 100 Continue\r\n\r\n") + local sourcein = ltn12.source.empty() + local sourcehdr = luci.http.protocol.header_source( thread ) + local sinkerr = ltn12.sink.file( io.stderr ) + + local close = false + + local reading = { client } + + local message, err + + socket.sleep(5) + + repeat + -- parse headers + message, err = luci.http.protocol.parse_message_header( sourcehdr ) + + if not message then + self:error( client, 400, err ) + break + end + + coroutine.yield() + + -- keep-alive + if message.http_version == 1.1 then + close = (message.env.HTTP_CONNECTION == "close") + else + close = not message.env.HTTP_CONNECTION or message.env.HTTP_CONNECTION == "close" end - - local host = self.vhosts[message.env.HTTP_HOST] or self.host - if host then - if host:process(message, sourcein, sinkout, sinkerr) then - sinkout() + + if message.request_method == "get" or message.request_method == "head" then + -- Be happy + + elseif message.request_method == "post" then + -- If we have a HTTP/1.1 client and an Expect: 100-continue header then + -- respond with HTTP 100 Continue message + if message.http_version == 1.1 and message.headers['Expect'] and + message.headers['Expect'] == '100-continue' + then + client:send("HTTP/1.1 100 Continue\r\n\r\n") + end + + if message.headers['Transfer-Encoding'] and + message.headers['Transfer-Encoding'] ~= "identity" then + sourcein = socket.source("http-chunked", thread) + elseif message.env.CONTENT_LENGTH then + sourcein = socket.source("by-length", thread, + tonumber(message.env.CONTENT_LENGTH)) else - self:error500( client, "No suitable path handler found" ) + self:error( client, 411, luci.http.protocol.statusmsg[411] ) + break; end + else - self:error500( client, "No suitable host handler found" ) + self:error( client, 405, luci.http.protocol.statusmsg[405] ) + break; + end - else - self:error400( client, err ) - return nil - end + + + local host = self.vhosts[message.env.HTTP_HOST] or self.host + if not host then + self:error( client, 500, "Unable to find matching host" ) + break; + end + + coroutine.yield() + + local response, sourceout = host:process( + message, sourcein, sinkerr, + client, io.stderr + ) + if not response then + self:error( client, 500, "Error processing handler" ) + end + + coroutine.yield() + + -- Post process response + local sinkmode = close and "close-when-done" or "keep-open" + + if sourceout then + if not response.headers["Content-Length"] then + if message.http_version == 1.1 then + response.headers["Transfer-Encoding"] = "chunked" + sinkmode = "http-chunked" + else + close = true + sinkmode = "close-when-done" + end + end + end + + if close then + response.headers["Connection"] = "close" + end + + + local sinkout = socket.sink(sinkmode, client) + + local header = + message.env.SERVER_PROTOCOL .. " " .. + tostring(response.status) .. " " .. + luci.http.protocol.statusmsg[response.status] .. "\r\n" + + + for k,v in pairs(response.headers) do + header = header .. k .. ": " .. v .. "\r\n" + end + + client:send(header .. "\r\n") + + if sourceout then + local eof = false + repeat + coroutine.yield() + eof = not ltn12.pump.step(sourceout, sinkout) + until eof + end + until close + + client:close() end diff --git a/libs/web/luasrc/http.lua b/libs/web/luasrc/http.lua index bb05c680b..37050e478 100644 --- a/libs/web/luasrc/http.lua +++ b/libs/web/luasrc/http.lua @@ -7,9 +7,6 @@ HTTP-Header manipulator and form variable preprocessor FileId: $Id$ -ToDo: -- Cookie handling - License: Copyright 2008 Steven Barth @@ -51,22 +48,14 @@ function Request.__init__(self, env, sourcein, sinkerr) params = luci.http.protocol.urldecode_params(env.QUERY_STRING or ""), } - setmetatable(self.message.params, {__index = - function(tbl, key) - setmetatable(tbl, nil) - - luci.http.protocol.parse_message_body( - self.input, - self.message, - self.filehandler - ) - - return rawget(tbl, key) - end - }) + self.parsed_input = false end function Request.formvalue(self, name, default) + if not self.parsed_input then + self:_parse_input() + end + if name then return self.message.params[name] and tostring(self.message.params[name]) or default else @@ -78,6 +67,10 @@ function Request.formvaluetable(self, prefix) local vals = {} prefix = prefix and prefix .. "." or "." + if not self.parsed_input then + self:_parse_input() + end + local void = self.message.params[nil] for k, v in pairs(self.message.params) do if k:find(prefix, 1, true) == 1 then @@ -88,6 +81,13 @@ function Request.formvaluetable(self, prefix) return vals end +function Request.getcookie(self, name) + local c = string.gsub(";" .. (self:getenv("HTTP_COOKIE") or "") .. ";", "%s*;%s*", ";") + local p = ";" .. name .. "=(.-);" + local i, j, value = cookies:find(p) + return value and urldecode(value) +end + function Request.getenv(self, name) if name then return self.message.env[name] @@ -100,6 +100,15 @@ function Request.setfilehandler(self, callback) self.filehandler = callback end +function Request._parse_input(self) + luci.http.protocol.parse_message_body( + self.input, + self.message, + self.filehandler + ) + self.parsed_input = true +end + function close() if not context.eoh then @@ -177,18 +186,6 @@ function write(content) coroutine.yield(4, content) end - -function basic_auth(realm, errorpage) - header("Status", "401 Unauthorized") - header("WWW-Authenticate", string.format('Basic realm="%s"', realm or "")) - - if errorpage then - errorpage() - end - - close() -end - function redirect(url) header("Status", "302 Found") header("Location", url)