]> gitweb.factorcode.org Git - factor.git/commitdiff
multitasking httpd
authorSlava Pestov <slava@factorcode.org>
Tue, 24 Aug 2004 00:44:58 +0000 (00:44 +0000)
committerSlava Pestov <slava@factorcode.org>
Tue, 24 Aug 2004 00:44:58 +0000 (00:44 +0000)
TODO.FACTOR.txt
library/httpd/httpd.factor
library/test/threads.factor
native/io.c
native/io.h
native/write.c
native/write.h

index 6797f305f183f4ba76e6438ec8b33e9f14a42be0..65622b5d7068448f2e70d1f0fc7f122804def846 100644 (file)
@@ -32,7 +32,7 @@
 \r
 - add a socket timeout\r
 - don't allow multiple reads on the same port\r
-- multiple tasks should be able to write to the same port\r
+  (and don't hang when this happends!)\r
 - is the profiler using correct stack depth?\r
 - bignums\r
 - >lower, >upper for strings\r
@@ -41,7 +41,6 @@
 - sbuf-hashcode\r
 - vector-hashcode\r
 - irc: stack underflow?\r
-- ignore SIGPIPE\r
 - accept multi-line input in listener\r
 - gc call in the middle of some ops might affect callstack\r
 - better i/o scheduler\r
@@ -76,6 +75,8 @@
 \r
 + httpd:\r
 \r
+- quit responder breaks with multithreading\r
+- 'default responder' for when we go to root\r
 - file responder:\r
   - port to native\r
   - if a directory is requested and URL does not end with /, redirect\r
index 77407bdfca07dac9033570ebb2196709a4cea896..6e57dc3a2fd35563d53f8e05d720fe1ab7431072 100644 (file)
@@ -29,6 +29,7 @@ IN: httpd
 USE: arithmetic
 USE: combinators
 USE: errors
+USE: httpd-responder
 USE: kernel
 USE: lists
 USE: logging
@@ -38,8 +39,7 @@ USE: stack
 USE: stdio
 USE: streams
 USE: strings
-
-USE: httpd-responder
+USE: threads
 USE: url-encoding
 
 : httpd-log-stream ( -- stream )
@@ -92,8 +92,16 @@ USE: url-encoding
         [ default-error-handler drop ] when*
     ] catch ;
 
-: quit-flag ( -- ? )
-    global [ "httpd-quit" get ] bind ;
+: httpd-connection ( socket -- )
+    #! We're single-threaded in Java Factor, and
+    #! multi-threaded in CFactor.
+    java? [
+        httpd-client
+    ] [
+        [
+            httpd-client
+        ] in-thread drop
+    ] ifte ;
 
 : clear-quit-flag ( -- )
     global [ "httpd-quit" off ] bind ;
@@ -102,7 +110,7 @@ USE: url-encoding
     [
         quit-flag not
     ] [
-        dup accept httpd-client
+        dup dup accept httpd-connection
     ] while ;
 
 : (httpd) ( port -- )
index da6160df270ea6ce43ac491554a8b703c838311e..7a666ab49e4795b840a98686b1cd6ecea5163277 100644 (file)
@@ -12,5 +12,4 @@ USE: threads
 3 "x" set
 [ yield 2 "x" set ] in-thread
 [ 2 ] [ yield "x" get ] unit-test
-
-! [ flush ] in-thread flush
+[ ] [ [ flush ] in-thread flush ] unit-test
index 4d9eb02aa8e087307961fab3beb2ad45c730c157..e5ec21d0869622b7117cbe647cc2109461d38dae 100644 (file)
@@ -8,7 +8,7 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks)
        for(i = 0; i < FD_SETSIZE; i++)
        {
                io_tasks[i].port = F;
-               io_tasks[i].callback = F;
+               io_tasks[i].callbacks = F;
        }
 }
 
@@ -35,12 +35,13 @@ IO_TASK* add_io_task(
 {
        int fd = port->fd;
 
-       if(io_tasks[fd].port != F)
+       if(io_tasks[fd].callbacks != F && type != IO_TASK_WRITE)
                general_error(ERROR_IO_TASK_TWICE,tag_object(port));
 
        io_tasks[fd].type = type;
        io_tasks[fd].port = tag_object(port);
-       io_tasks[fd].callback = callback;
+       io_tasks[fd].callbacks = tag_cons(cons(callback,
+               io_tasks[fd].callbacks));
 
        if(fd >= *fd_count)
                *fd_count = fd + 1;
@@ -65,7 +66,7 @@ void remove_io_task(
        int fd = port->fd;
 
        io_tasks[fd].port = F;
-       io_tasks[fd].callback = F;
+       io_tasks[fd].callbacks = F;
 
        if(fd == *fd_count - 1)
                *fd_count = *fd_count - 1;
@@ -98,73 +99,62 @@ bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
        return retval;
 }
 
-CELL perform_io_task(IO_TASK* task)
+CELL pop_io_task_callback(
+       IO_TASK_TYPE type,
+       PORT* port,
+       IO_TASK* io_tasks,
+       int* fd_count)
 {
-       PORT* port = untag_port(task->port);
-       CELL callback = task->callback;
+       int fd = port->fd;
+       CONS* callbacks = untag_cons(io_tasks[fd].callbacks);
+       CELL callback = callbacks->car;
+       if(callbacks->cdr == F)
+               remove_io_task(type,port,io_tasks,fd_count);
+       else
+               io_tasks[fd].callbacks = callbacks->cdr;
+       return callback;
+}
 
-       switch(task->type)
+CELL perform_io_task(IO_TASK* io_task, IO_TASK* io_tasks, int* fd_count)
+{
+       bool success;
+       PORT* port = untag_port(io_task->port);
+
+       switch(io_task->type)
        {
        case IO_TASK_READ_LINE:
-               remove_io_task(IO_TASK_READ_LINE,port,
-                       read_io_tasks,&read_fd_count);
-               if(perform_read_line_io_task(port))
-                       return callback;
-               else
-               {
-                       add_io_task(IO_TASK_READ_LINE,port,
-                               callback,read_io_tasks,
-                               &read_fd_count);
-                       return F;
-               }
+               success = perform_read_line_io_task(port);
+               break;
        case IO_TASK_READ_COUNT:
-               remove_io_task(IO_TASK_READ_COUNT,port,
-                       read_io_tasks,&read_fd_count);
-               if(perform_read_count_io_task(port))
-                       return callback;
-               else
-               {
-                       add_io_task(IO_TASK_READ_COUNT,port,
-                               callback,read_io_tasks,
-                               &read_fd_count);
-                       return F;
-               }
+               success = perform_read_count_io_task(port);
+               break;
        case IO_TASK_WRITE:
-               remove_io_task(IO_TASK_WRITE,port,
-                       write_io_tasks,&write_fd_count);
-               if(perform_write_io_task(port))
-                       return callback;
-               else
-               {
-                       add_io_task(IO_TASK_WRITE,port,
-                               callback,write_io_tasks,
-                               &write_fd_count);
-                       return F;
-               }
+               success = perform_write_io_task(port);
+               break;
        case IO_TASK_ACCEPT:
-               remove_io_task(IO_TASK_ACCEPT,port,
-                       read_io_tasks,&read_fd_count);
-               if(accept_connection(port))
-                       return callback;
-               else
-               {
-                       add_io_task(IO_TASK_ACCEPT,port,
-                               callback,read_io_tasks,
-                               &read_fd_count);
-                       return F;
-               }
+               success = accept_connection(port);
+               break;
        default:
-               critical_error("Bad I/O task",task->type);
-               return F;
+               critical_error("Bad I/O task",io_task->type);
+               success = false;
+               break;
        }
+
+       if(success)
+       {
+               return pop_io_task_callback(io_task->type,port,
+                       io_tasks,fd_count);
+       }
+       else
+               return F;
 }
 
-CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
+CELL perform_io_tasks(fd_set* fdset, IO_TASK* io_tasks, int* fd_count)
 {
        int i;
        CELL callback;
 
-       for(i = 0; i < fd_count; i++)
+       for(i = 0; i < *fd_count; i++)
        {
                if(FD_ISSET(i,fdset))
                {
@@ -172,7 +162,8 @@ CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
                                critical_error("select() returned fd for non-existent task",i);
                        else
                        {
-                               callback = perform_io_task(&io_tasks[i]);
+                               callback = perform_io_task(&io_tasks[i],
+                                       io_tasks,fd_count);
                                if(callback != F)
                                        return callback;
                        }
@@ -203,11 +194,11 @@ CELL next_io_task(void)
        select(read_fd_count > write_fd_count ? read_fd_count : write_fd_count,
                &read_fd_set,&write_fd_set,&except_fd_set,NULL);
        
-       callback = perform_io_tasks(&read_fd_set,read_fd_count,read_io_tasks);
+       callback = perform_io_tasks(&read_fd_set,read_io_tasks,&read_fd_count);
        if(callback != F)
                return callback;
 
-       return perform_io_tasks(&write_fd_set,write_fd_count,write_io_tasks);
+       return perform_io_tasks(&write_fd_set,write_io_tasks,&write_fd_count);
 }
 
 void primitive_next_io_task(void)
@@ -229,12 +220,12 @@ void collect_io_tasks(void)
        for(i = 0; i < FD_SETSIZE; i++)
        {
                copy_object(&read_io_tasks[i].port);
-               copy_object(&read_io_tasks[i].callback);
+               copy_object(&read_io_tasks[i].callbacks);
        }
 
        for(i = 0; i < FD_SETSIZE; i++)
        {
                copy_object(&write_io_tasks[i].port);
-               copy_object(&write_io_tasks[i].callback);
+               copy_object(&write_io_tasks[i].callbacks);
        }
 }
index 462fa84b55f6c9fddf760c461d6cf37f5d65aca4..089e63ff600447afeb64070c920176f35c28bc4a 100644 (file)
@@ -8,7 +8,9 @@ typedef enum {
 typedef struct {
        IO_TASK_TYPE type;
        CELL port;
-       CELL callback;
+       /* TAGGED list of callbacks, or F */
+       /* Multiple callbacks per port are only permitted for IO_TASK_WRITE. */
+       CELL callbacks;
 } IO_TASK;
 
 fd_set read_fd_set;
@@ -36,9 +38,14 @@ void remove_io_task(
        IO_TASK* io_tasks,
        int* fd_count);
 void remove_io_tasks(PORT* port);
+CELL pop_io_task_callback(
+       IO_TASK_TYPE type,
+       PORT* port,
+       IO_TASK* io_tasks,
+       int* fd_count);
 bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
-CELL perform_io_task(IO_TASK* task);
-CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
+CELL perform_io_task(IO_TASK* io_task, IO_TASK* io_tasks, int* fd_count);
+CELL perform_io_tasks(fd_set* fdset, IO_TASK* io_tasks, int* fd_count);
 CELL next_io_task(void);
 void primitive_next_io_task(void);
 void primitive_close(void);
index 8b7a47c8fe033bcea162d0c158beeb4319b361a9..3ce8116e7b5d431164c565a53c6d59834fe4a131 100644 (file)
@@ -1,7 +1,7 @@
 #include "factor.h"
 
 /* Return true if write was done */
-bool write_step(PORT* port)
+void write_step(PORT* port)
 {
        char* chars = (char*)port->buffer + sizeof(STRING);
 
@@ -11,18 +11,10 @@ bool write_step(PORT* port)
        if(amount == -1)
        {
                if(errno != EAGAIN)
-               {
                        postpone_io_error(port,__FUNCTION__);
-                       return true;
-               }
-               else
-                       return false;
        }
        else
-       {
                port->buf_pos += amount;
-               return true;
-       }
 }
 
 bool can_write(PORT* port, FIXNUM len)
@@ -69,18 +61,18 @@ void primitive_add_write_io_task(void)
 
 bool perform_write_io_task(PORT* port)
 {
-       if(write_step(port))
+       if(port->buf_pos == port->buf_fill || port->io_error != F)
        {
-               if(port->buf_pos == port->buf_fill || port->io_error != F)
-               {
-                       /* All written, or I/O error is preventing further
-                       transaction */
-                       port->buf_pos = 0;
-                       port->buf_fill = 0;
-                       return true;
-               }
+               /* Nothing to write */
+               port->buf_pos = 0;
+               port->buf_fill = 0;
+               return true;
+       }
+       else
+       {
+               write_step(port);
+               return false;
        }
-       return false;
 }
 
 void write_char_8(PORT* port, FIXNUM ch)
index ada141610777815ddc0f5518d6a1b2177584e41a..0efacb086ab207e4d41ae999e64f72d3c90127f6 100644 (file)
@@ -1,4 +1,4 @@
-bool write_step(PORT* port);
+void write_step(PORT* port);
 bool can_write(PORT* port, FIXNUM len);
 void primitive_can_write(void);
 void primitive_add_write_io_task(void);