]> gitweb.factorcode.org Git - factor.git/commitdiff
i/o refactoring continues
authorSlava Pestov <slava@factorcode.org>
Mon, 16 Aug 2004 01:50:44 +0000 (01:50 +0000)
committerSlava Pestov <slava@factorcode.org>
Mon, 16 Aug 2004 01:50:44 +0000 (01:50 +0000)
17 files changed:
build.sh
library/cross-compiler.factor
library/platform/native/io-internals.factor
library/platform/native/stream.factor
native/error.c
native/error.h
native/factor.c
native/fd.c
native/fd.h
native/file.c
native/iomux.c
native/iomux.h
native/port.c
native/port.h
native/primitives.c
native/primitives.h
native/socket.c

index 2980a4bbedbef142d710dc36295764963d7f5f61..c81edfb0f4eab3977a5d789a9ecd86f2c2c77916 100644 (file)
--- a/build.sh
+++ b/build.sh
@@ -1,6 +1,4 @@
-export CC=gcc34
-export CFLAGS="-lm -pedantic -Wall -Winline -O3 -march=pentium4 -fomit-frame-pointer"
+export CC=gcc
+export CFLAGS="-lm -g -Wall -Wno-long-long -Wno-inline"
 
 $CC $CFLAGS -o f native/*.c
-
-strip f
index 4113a6087876aef91063ea841268fb4bfbd47259..ba65d5c6d765f0f9eb142fb42eebe07f98b38c7d 100644 (file)
@@ -67,9 +67,13 @@ DEFER: open-file
 DEFER: server-socket
 DEFER: close-fd
 DEFER: accept-fd
+DEFER: can-read-line?
+DEFER: add-read-line-io-task
 DEFER: read-line-fd-8
+DEFER: can-write?
+DEFER: add-write-io-task
 DEFER: write-fd-8
-DEFER: flush-fd
+DEFER: next-io-task
 
 IN: parser
 DEFER: str>float
@@ -211,9 +215,13 @@ IN: cross-compiler
         server-socket
         close-fd
         accept-fd
+        can-read-line?
+        add-read-line-io-task
         read-line-fd-8
+        can-write?
+        add-write-io-task
         write-fd-8
-        flush-fd
+        next-io-task
         room
         os-env
         millis
index 45483ba85de7e1a5662b0ae0c45aec9c5e2f438b..10752b266b36f4ad90c299e66348bca57eb6ac8a 100644 (file)
 ! ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 IN: io-internals
+USE: combinators
+USE: continuations
 USE: kernel
 USE: namespaces
-USE: combinators
+USE: stack
+USE: strings
 
 : stdin 0 getenv ;
 : stdout 1 getenv ;
 : stderr 2 getenv ;
+
+: flush-fd ( port -- )
+    [ swap add-write-io-task next-io-task drop ( call ) ] callcc0 ;
+
+: wait-to-write ( len port -- )
+    tuck can-write? [ drop ] [ flush-fd ] ifte ;
+
+: blocking-write ( str port -- )
+    over
+    dup string? [ str-length ] [ drop 1 ] ifte
+    over wait-to-write write-fd-8 ;
+
+: fill-fd ( port -- )
+    [ swap add-read-line-io-task next-io-task drop ( call ) ] callcc0 ;
+
+: wait-to-read-line ( port -- )
+    dup can-read-line? [ drop ] [ fill-fd ] ifte ;
+
+: blocking-read-line ( port -- line )
+    dup wait-to-read-line read-line-fd-8 dup [ sbuf>str ] when ;
index 0863eef01eb530356a859a8c5ae56fda04417b87..563a279804aae425adefe2b8168654c4898700dc 100644 (file)
@@ -27,6 +27,7 @@
 
 IN: streams
 USE: combinators
+USE: continuations
 USE: io-internals
 USE: errors
 USE: kernel
@@ -44,39 +45,32 @@ USE: namespaces
         "in" set
 
         ( str -- )
-        [ "out" get write-fd-8 ] "fwrite" set
+        [ "out" get blocking-write ] "fwrite" set
         
         ( -- str )
-        [
-            "in" get read-line-fd-8
-        ] "freadln" set
+        [ "in" get dup [ blocking-read-line ] when ] "freadln" set
         
         ( -- )
-        [
-            "out" get [ flush-fd ] when*
-        ] "fflush" set
+        [ "out" get [ flush-fd ] when* ] "fflush" set
         
         ( -- )
         [
             "in" get [ close-fd ] when*
-            "out" get [ close-fd ] when*
+            "out" get [ dup flush-fd close-fd ] when*
         ] "fclose" set
     ] extend ;
 
-: <file-stream> ( path read? write? -- stream )
-    open-file dup <fd-stream> ;
-
 : <filecr> ( path -- stream )
-    t f <file-stream> ;
+    t f open-file f <fd-stream> ;
 
 : <filecw> ( path -- stream )
-    f t <file-stream> ;
+    f t open-file f swap <fd-stream> ;
 
 : <filebr> ( path -- stream )
-    t f <file-stream> ;
+    <filecr> ;
 
 : <filebw> ( path -- stream )
-    f t <file-stream> ;
+    <filecw> ;
 
 : <server> ( port -- stream )
     #! Starts listening on localhost:port. Returns a stream that
index 7166a532485411b3b36706b9db225379e493e29f..e8d1b23eea25e55db1bbf258e7164bf5479848dd 100644 (file)
@@ -2,13 +2,13 @@
 
 void fatal_error(char* msg, CELL tagged)
 {
-       printf("Fatal error: %s %ld\n",msg,tagged);
+       fprintf(stderr,"Fatal error: %s %ld\n",msg,tagged);
        exit(1);
 }
 
 void critical_error(char* msg, CELL tagged)
 {
-       printf("Critical error: %s %ld\n",msg,tagged);
+       fprintf(stderr,"Critical error: %s %ld\n",msg,tagged);
        save_image("factor.crash.image");
        exit(1);
 }
@@ -59,13 +59,3 @@ void range_error(CELL tagged, CELL index, CELL max)
                tag_cons(cons(tag_fixnum(max),F)))));
        general_error(ERROR_RANGE,tag_cons(c));
 }
-
-void io_error(const char* func)
-{
-       STRING* function = from_c_string(func);
-       STRING* error = from_c_string(strerror(errno));
-
-       CONS* c = cons(tag_object(function),tag_cons(
-               cons(tag_object(error),F)));
-       general_error(ERROR_IO,tag_cons(c));
-}
index a8e779e54877c555a8cb6e73f298b7430203c7db..a86ad3dab46b24b346479b3903526c0f7fa9e290 100644 (file)
@@ -15,4 +15,3 @@ void throw_error(CELL object);
 void general_error(CELL error, CELL tagged);
 void type_error(CELL type, CELL tagged);
 void range_error(CELL tagged, CELL index, CELL max);
-void io_error(const char* func);
index eaa9a037094196f069586b7a11775ebaf923569e..a7944e418e6aa014eb7d7dee53d5895d43811c58 100644 (file)
@@ -13,6 +13,7 @@ int main(int argc, char** argv)
        init_arena(DEFAULT_ARENA);
        load_image(argv[1]);
        init_stacks();
+       init_iomux();
        init_io();
 
        run();
index 0316acb192ef5a27d1dae60b187f17437996f998..8fab55b72f98d0cdb7a81177d8ae34281d834dec 100644 (file)
@@ -2,12 +2,20 @@
 
 void init_io(void)
 {
-       env.user[STDIN_ENV]  = port(0);
-       set_nonblocking(0);
-       env.user[STDOUT_ENV] = port(1);
-       set_nonblocking(1);
-       env.user[STDERR_ENV] = port(2);
-       /* set_nonblocking(2); */
+       env.user[STDIN_ENV]  = tag_object(port(0));
+       env.user[STDOUT_ENV] = tag_object(port(1));
+       env.user[STDERR_ENV] = tag_object(port(2));
+}
+
+bool can_read_line(PORT* port)
+{
+       return false;
+}
+
+void primitive_can_read_line(void)
+{
+       PORT* port = untag_port(dpop());
+       dpush(tag_boolean(can_read_line(port)));
 }
 
 /* Return true if something was read */
@@ -20,7 +28,7 @@ bool read_step(PORT* port)
        if(amount == -1)
        {
                if(errno != EAGAIN)
-                       io_error(__FUNCTION__);
+                       io_error(port,__FUNCTION__);
                return false;
        }
        else
@@ -31,21 +39,12 @@ bool read_step(PORT* port)
        }
 }
 
-READLINE_STAT read_line_step(PORT* port)
+bool read_line_step(PORT* port)
 {
        int i;
        char ch;
 
-       SBUF* line = port->line;
-
-       if(port->buf_pos >= port->buf_fill)
-       {
-               if(!read_step(port))
-                       return READLINE_WAIT;
-
-               if(port->buf_fill == 0)
-                       return READLINE_EOF;
-       }
+       SBUF* line = untag_sbuf(port->line);
 
        for(i = port->buf_pos; i < port->buf_fill; i++)
        {
@@ -53,7 +52,7 @@ READLINE_STAT read_line_step(PORT* port)
                if(ch == '\n')
                {
                        port->buf_pos = i + 1;
-                       return READLINE_EOL;
+                       return true;
                }
                else
                        set_sbuf_nth(line,line->top,ch);
@@ -63,43 +62,15 @@ READLINE_STAT read_line_step(PORT* port)
 
        /* We've reached the end of the above loop, without seeing a newline
        or EOF, so read again */
-       return READLINE_AGAIN;
+       return false;
 }
 
 void primitive_read_line_fd_8(void)
 {
        PORT* port = untag_port(dpeek());
-       SBUF* line;
-       READLINE_STAT state;
-
-       init_buffer(port,B_READ);
-       if(port->line == NULL)
-               port->line = sbuf(LINE_SIZE);
-       else
-               port->line->top = 0;
-       line = port->line;
-
-       add_io_task(IO_TASK_READ_LINE,port,F);
-
-       for(;;)
-       {
-               state = read_line_step(port);
-               if(state == READLINE_WAIT)
-                       iomux();
-               else if(state == READLINE_EOF && line->top == 0)
-               {
-                       /* didn't read anything before EOF */
-                       drepl(F);
-                       break;
-               }
-               else if(state == READLINE_EOL)
-               {
-                       drepl(tag_object(sbuf_to_string(line)));
-                       break;
-               }
-       }
+       drepl(port->line);
+       port->line = F;
 
-       remove_io_task(IO_TASK_READ_LINE,port);
 }
 
 /* Return true if write was done */
@@ -113,7 +84,7 @@ bool write_step(PORT* port)
        if(amount == -1)
        {
                if(errno != EAGAIN)
-                       io_error(__FUNCTION__);
+                       io_error(port,__FUNCTION__);
                return false;
        }
        else
@@ -123,39 +94,48 @@ bool write_step(PORT* port)
        }
 }
 
-/* keep writing to the stream until everything is written */
-void flush_buffer(PORT* port)
+bool can_write(PORT* port, FIXNUM len)
 {
-       IO_TASK* task;
-       if(port->buf_mode != B_WRITE || port->buf_fill == 0)
-               return;
+       CELL buf_capacity;
 
-       task = add_io_task(IO_TASK_WRITE,port,F);
-
-       for(;;)
+       switch(port->buf_mode)
        {
-               if(port->buf_fill == port->buf_pos)
-                       break;
-
-               if(!write_step(port))
-                       iomux();
+       case B_NONE:
+               return true;
+       case B_READ_LINE:
+               return false;
+       case B_WRITE:
+               buf_capacity = port->buffer->capacity * CHARS;
+               /* Is the string longer than the buffer? */
+               if(port->buf_fill == 0 && len > buf_capacity)
+               {
+                       /* Increase the buffer to fit the string */
+                       port->buffer = allot_string(len / CHARS + 1);
+                       return true;
+               }
+               else
+                       return (port->buf_fill + len <= buf_capacity);
+       default:
+               critical_error("Bad buf_mode",port->buf_mode);
+               return false;
        }
+}
 
-       remove_io_task(IO_TASK_WRITE,port);
-
-       port->buf_pos = 0;
-       port->buf_fill = 0;
+void primitive_can_write(void)
+{
+       PORT* port = untag_port(dpop());
+       FIXNUM len = to_fixnum(dpop());
+       dpush(tag_boolean(can_write(port,len)));
 }
 
 void write_fd_char_8(PORT* port, FIXNUM ch)
 {
        char c = (char)ch;
 
-       init_buffer(port,B_WRITE);
+       if(!can_write(port,1))
+               io_error(port,__FUNCTION__);
 
-       /* Is the buffer full? */
-       if(port->buf_fill == port->buffer->capacity * CHARS)
-               flush_buffer(port);
+       init_buffer(port,B_WRITE);
 
        bput((CELL)port->buffer + sizeof(STRING) + port->buf_fill,c);
        port->buf_fill++;
@@ -163,24 +143,15 @@ void write_fd_char_8(PORT* port, FIXNUM ch)
 
 void write_fd_string_8(PORT* port, STRING* str)
 {
-       char* c_str = to_c_string(str);
+       char* c_str;
 
-       init_buffer(port,B_WRITE);
+       /* Note this ensures the buffer is large enough to fit the string */
+       if(!can_write(port,str->capacity))
+               io_error(port,__FUNCTION__);
 
-       /* Is the string longer than the buffer? */
-       if(str->capacity > port->buffer->capacity * CHARS)
-       {
-               flush_buffer(port);
+       init_buffer(port,B_WRITE);
 
-               /* Increase the buffer to fit the string */
-               port->buffer = allot_string(str->capacity / CHARS + 1);
-       }
-       /* Is there enough room in the buffer? If not, flush */
-       if(port->buf_fill + str->capacity
-               > port->buffer->capacity * CHARS)
-       {
-               flush_buffer(port);
-       }
+       c_str = to_c_string(str);
 
        /* Append string to buffer */
        memcpy((void*)((CELL)port->buffer + sizeof(STRING)
@@ -211,21 +182,20 @@ void primitive_write_fd_8(void)
        }
 }
 
-void primitive_flush_fd(void)
-{
-       PORT* port = untag_port(dpop());
-       flush_buffer(port);
-}
-
 void primitive_close_fd(void)
 {
+       /* This does not flush. */
        PORT* port = untag_port(dpop());
-       flush_buffer(port);
        close(port->fd);
 }
 
-void set_nonblocking(int fd)
+void io_error(PORT* port, const char* func)
 {
-       if(fcntl(fd,F_SETFL,O_NONBLOCK,1) == -1)
-               io_error(__FUNCTION__);
+       STRING* function = from_c_string(func);
+       STRING* error = from_c_string(strerror(errno));
+
+       CONS* c = cons(tag_object(function),tag_cons(
+               cons(tag_object(error),F)));
+
+       general_error(ERROR_IO,tag_cons(c));
 }
index 03ecc8056227712af3098687745f137c2f4ab2d6..631306a73f1191f57751b055d844e2962dc0b682 100644 (file)
@@ -1,24 +1,19 @@
 #define LINE_SIZE 80
 #define BUF_SIZE (32 * 1024)
 
+bool can_read_line(PORT* port);
+void primitive_can_read_line(void);
 bool read_step(PORT* port);
+bool read_line_step(PORT* port);
 
-/* read_line_step() return values */
-typedef enum {
-       READLINE_WAIT, /* means we have to wait for more I/O */
-       READLINE_AGAIN,
-       READLINE_EOL,
-       READLINE_EOF
-} READLINE_STAT;
-
-READLINE_STAT read_line_step(PORT* port);
 bool write_step(PORT* port);
 void flush_buffer(PORT* port);
 void init_io(void);
 void primitive_read_line_fd_8(void);
+bool can_write(PORT* port, FIXNUM len);
+void primitive_can_write(void);
 void write_fd_char_8(PORT* port, FIXNUM ch);
 void write_fd_string_8(PORT* port, STRING* str);
 void primitive_write_fd_8(void);
-void primitive_flush_fd(void);
 void primitive_close_fd(void);
-void set_nonblocking(int fd);
+void io_error(PORT* port, const char* func);
index e48706efc138d4f154fb4eaba173cb3153be887f..249e9dfd12a020bfea0cb18ba0986042be77bd47 100644 (file)
@@ -9,17 +9,17 @@ void primitive_open_file(void)
        int fd;
 
        if(read && write)
-               mode = O_RDWR | O_CREAT | O_NONBLOCK;
+               mode = O_RDWR | O_CREAT;
        else if(read)
-               mode = O_RDONLY | O_NONBLOCK;
+               mode = O_RDONLY;
        else if(write)
-               mode = O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK;
+               mode = O_WRONLY | O_CREAT | O_TRUNC;
        else
-               mode = O_NONBLOCK;
+               mode = 0;
 
        fd = open(path,mode);
        if(fd < 0)
-               io_error(__FUNCTION__);
+               io_error(NULL,__FUNCTION__);
 
-       dpush(port(fd));
+       dpush(tag_object(port(fd)));
 }
index d489f8d038a2b3dc3cac701f7d1fa9b4debe424a..298a26d37d9efa17af15e3d263a66558367374d8 100644 (file)
@@ -7,8 +7,8 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks)
        FD_ZERO(fdset);
        for(i = 0; i < FD_SETSIZE; i++)
        {
-               read_io_tasks[i].port = F;
-               read_io_tasks[i].callback = F;
+               io_tasks[i].port = F;
+               io_tasks[i].callback = F;
        }
 }
 
@@ -21,7 +21,7 @@ void init_iomux(void)
        init_io_tasks(&write_fd_set,write_io_tasks);
 }
 
-IO_TASK* add_io_task_impl(
+IO_TASK* add_io_task(
        IO_TASK_TYPE type,
        PORT* port,
        CELL callback,
@@ -30,6 +30,9 @@ IO_TASK* add_io_task_impl(
 {
        int fd = port->fd;
 
+       /* if(io_tasks[fd].port != F)
+               critical_error("Adding I/O task twice",fd); */
+
        io_tasks[fd].type = type;
        io_tasks[fd].port = tag_object(port);
        io_tasks[fd].callback = callback;
@@ -40,24 +43,23 @@ IO_TASK* add_io_task_impl(
        return &io_tasks[fd];
 }
 
-IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback)
+void primitive_add_read_line_io_task(void)
 {
-       switch(type)
-       {
-       case IO_TASK_READ_LINE:
-       case IO_TASK_READ_COUNT:
-               return add_io_task_impl(type,port,callback,
-                       read_io_tasks,&read_fd_count);
-       case IO_TASK_WRITE:
-               return add_io_task_impl(type,port,callback,
-                       write_io_tasks,&write_fd_count);
-       default:
-               fatal_error("Invalid IO_TASK_TYPE",type);
-               return NULL;
-       }
+       PORT* port = untag_port(dpop());
+       CELL callback = dpop();
+       add_io_task(IO_TASK_READ_LINE,port,callback,
+               read_io_tasks,&read_fd_count);
 }
 
-void remove_io_task_impl(
+void primitive_add_write_io_task(void)
+{
+       PORT* port = untag_port(dpop());
+       CELL callback = dpop();
+       add_io_task(IO_TASK_WRITE,port,callback,
+               write_io_tasks,&write_fd_count);
+}
+
+void remove_io_task(
        IO_TASK_TYPE type,
        PORT* port,
        IO_TASK* io_tasks,
@@ -72,64 +74,139 @@ void remove_io_task_impl(
                *fd_count = *fd_count - 1;
 }
 
-void remove_io_task(IO_TASK_TYPE type, PORT* port)
+void remove_io_tasks(PORT* port)
 {
-       switch(type)
+       remove_io_task(IO_TASK_READ_LINE,port,
+               read_io_tasks,&read_fd_count);
+       remove_io_task(IO_TASK_WRITE,port,
+               write_io_tasks,&write_fd_count);
+}
+
+bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
+{
+       bool retval = false;
+       int i;
+
+       FD_ZERO(fdset);
+
+       for(i = 0; i < fd_count; i++)
        {
-       case IO_TASK_READ_LINE:
-       case IO_TASK_READ_COUNT:
-               remove_io_task_impl(type,port,read_io_tasks,&read_fd_count);
-               break;
-       case IO_TASK_WRITE:
-               remove_io_task_impl(type,port,write_io_tasks,&write_fd_count);
-               break;
+               if(typep(PORT_TYPE,io_tasks[i].port))
+               {
+                       retval = true;
+                       FD_SET(i,fdset);
+               }
+       }
+       
+       return retval;
+}
+
+bool perform_read_line_io_task(PORT* port)
+{
+       init_buffer(port,B_READ_LINE);
+       if(port->buf_pos >= port->buf_fill)
+       {
+               if(!read_step(port))
+                       return false;
+       }
+
+       if(port->buf_fill == 0)
+       {
+               /* EOF */
+               port->line = F;
+               return true;
        }
+       else
+               return read_line_step(port);
 }
 
-void perform_io_task(IO_TASK* task)
+bool perform_write_io_task(PORT* port)
 {
-       if(task->port == F)
-               return;
+       init_buffer(port,B_WRITE);
+       if(write_step(port))
+       {
+               if(port->buf_pos == port->buf_fill)
+               {
+                       /* All written */
+                       port->buf_pos = 0;
+                       port->buf_fill = 0;
+                       return true;
+               }
+       }
+       return false;
+}
 
+CELL perform_io_task(IO_TASK* task)
+{
+       PORT* port = untag_port(task->port);
+       CELL callback = task->callback;
        switch(task->type)
        {
        case IO_TASK_READ_LINE:
-               
-               break;
+               remove_io_task(IO_TASK_READ_LINE,port,
+                       read_io_tasks,&read_fd_count);
+               if(perform_read_line_io_task(port))
+                       return callback;
+               else
+               {
+                       add_io_task(IO_TASK_READ_LINE,port,
+                               callback,read_io_tasks,
+                               &read_fd_count);
+                       return F;
+               }
        case IO_TASK_WRITE:
-               write_step(untag_port(task->port));
-               break;
+               remove_io_task(IO_TASK_WRITE,port,
+                       write_io_tasks,&write_fd_count);
+               if(perform_write_io_task(port))
+                       return callback;
+               else
+               {
+                       add_io_task(IO_TASK_WRITE,port,
+                               callback,write_io_tasks,
+                               &write_fd_count);
+                       return F;
+               }
        default:
                critical_error("Bad I/O task",task->type);
-               break;
+               return F;
        }
 }
 
-bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks)
+CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks)
 {
-       bool retval = false;
-       
        int i;
-       for(i = 0; i < read_fd_count; i++)
+       CELL callback;
+
+       for(i = 0; i < fd_count; i++)
        {
-               if(read_io_tasks[i].port != F)
+               if(FD_ISSET(i,fdset))
                {
-                       retval = true;
-                       FD_SET(i,&read_fd_set);
+                       if(io_tasks[i].port == F)
+                               critical_error("select() returned fd for non-existent task",i);
+                       else
+                       {
+                               callback = perform_io_task(&io_tasks[i]);
+                               if(callback != F)
+                                       return callback;
+                       }
                }
        }
-       
-       return retval;
+
+       return F;
 }
 
 /* Wait for I/O and return a callback. */
-CELL iomux(void)
+CELL next_io_task(void)
 {
-       bool reading = set_up_fd_set(&read_fd_set,read_io_tasks);
-       bool writing = set_up_fd_set(&write_fd_set,write_io_tasks);
+       bool reading = set_up_fd_set(&read_fd_set,
+               read_fd_count,read_io_tasks);
+       bool writing = set_up_fd_set(&write_fd_set,
+               write_fd_count,write_io_tasks);
+
+       CELL callback;
 
        if(!reading && !writing)
-               fatal_error("iomux() called with no IO tasks",0);
+               critical_error("next_io_task() called with no IO tasks",0);
 
        select(read_fd_count > write_fd_count
                ? read_fd_count : write_fd_count,
@@ -137,13 +214,16 @@ CELL iomux(void)
                (writing ? &write_fd_set : NULL),
                NULL,NULL);
 
-       /* for(i = 0; i < read_fd_count; i++)
-               perform_io_task(&read_io_tasks[i]);
+       callback = perform_io_tasks(&read_fd_set,read_fd_count,read_io_tasks);
+       if(callback != F)
+               return callback;
 
-       for(i = 0; i < write_fd_count; i++)
-               perform_io_task(&write_io_tasks[i]); */
+       return perform_io_tasks(&write_fd_set,write_fd_count,write_io_tasks);
+}
 
-       return F;
+void primitive_next_io_task(void)
+{
+       dpush(next_io_task());
 }
 
 void collect_io_tasks(void)
index d81668a2e17aff746ff56f92eed86a752641ff5a..d0544e53a1da20b56604596a0ac66d41e19885c6 100644 (file)
@@ -20,20 +20,25 @@ int write_fd_count;
 
 void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks);
 void init_iomux(void);
-IO_TASK* add_io_task_impl(
+IO_TASK* add_io_task(
        IO_TASK_TYPE type,
        PORT* port,
        CELL callback,
        IO_TASK* io_tasks,
        int* fd_count);
-IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback);
-void remove_io_task_impl(
+void primitive_add_read_line_io_task(void);
+void primitive_add_write_io_task(void);
+void remove_io_task(
        IO_TASK_TYPE type,
        PORT* port,
        IO_TASK* io_tasks,
        int* fd_count);
-void remove_io_task(IO_TASK_TYPE type, PORT* port);
-void perform_io_task(IO_TASK* task);
-bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks);
-CELL iomux(void);
+void remove_io_tasks(PORT* port);
+bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
+bool perform_read_line_io_task(PORT* port);
+bool perform_write_io_task(PORT* port);
+CELL perform_io_task(IO_TASK* task);
+CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks);
+CELL next_io_task(void);
+void primitive_next_io_task(void);
 void collect_io_tasks(void);
index 1f074066cf3b8ed6a9a50583c876814b98b36f9e..b5ba244b360ea2b7c803ed89e960674168d362e2 100644 (file)
@@ -11,16 +11,20 @@ PORT* untag_port(CELL tagged)
        return p;
 }
 
-CELL port(CELL fd)
+PORT* port(CELL fd)
 {
        PORT* port = allot_object(PORT_TYPE,sizeof(PORT));
        port->fd = fd;
        port->buffer = NULL;
-       port->line = NULL;
+       port->line = F;
        port->buf_mode = B_NONE;
        port->buf_fill = 0;
        port->buf_pos = 0;
-       return tag_object(port);
+
+       if(fcntl(port->fd,F_SETFL,O_NONBLOCK,1) == -1)
+               io_error(port,__FUNCTION__);
+
+       return port;
 }
 
 void primitive_portp(void)
@@ -37,6 +41,16 @@ void init_buffer(PORT* port, int mode)
        {
                port->buf_fill = port->buf_pos = 0;
                port->buf_mode = mode;
+
+               if(mode == B_READ_LINE)
+                       port->line = tag_object(sbuf(LINE_SIZE));
+       }
+       else if(port->buf_mode == B_READ_LINE)
+       {
+               if(port->line == F)
+                       port->line = tag_object(sbuf(LINE_SIZE));
+               else
+                       untag_sbuf(port->line)->top = 0;
        }
 }
 
@@ -45,20 +59,12 @@ void fixup_port(PORT* port)
        port->fd = -1;
        if(port->buffer != 0)
                port->buffer = fixup_untagged_string(port->buffer);
-       if(port->line != 0)
-       {
-               port->line = (SBUF*)((CELL)port->line
-                       + (active->base - relocation_base));
-       }
+       fixup(&port->line);
 }
 
 void collect_port(PORT* port)
 {
        if(port->buffer != 0)
                port->buffer = copy_untagged_string(port->buffer);
-       if(port->line != 0)
-       {
-               port->line = (SBUF*)copy_untagged_object(
-                       port->line,sizeof(SBUF));
-       }
+       copy_object(&port->line);
 }
index 3d230f8ed92de88ac433031aa5ff36d3faa34571..d9497dfb91ac34006a7bc9e6b56063f3ea40693c 100644 (file)
@@ -1,12 +1,12 @@
 /* Buffer mode */
-typedef enum { B_READ, B_WRITE, B_NONE } B_MODE;
+typedef enum { B_READ_LINE, B_WRITE, B_NONE } B_MODE;
 
 typedef struct {
        CELL header;
        FIXNUM fd;
        STRING* buffer;
-       /* partial line used by read_line_fd */
-       SBUF* line;
+       /* tagged partial line used by read_line_fd */
+       CELL line;
        /* one of B_READ, B_WRITE or B_NONE */
        B_MODE buf_mode;
        /* top of buffer */
@@ -16,7 +16,7 @@ typedef struct {
 } PORT;
 
 PORT* untag_port(CELL tagged);
-CELL port(CELL fd);
+PORT* port(CELL fd);
 void init_buffer(PORT* port, int mode);
 void primitive_portp(void);
 void fixup_port(PORT* port);
index 411b0e94b4de098f66e9bebf83d6902375f25139..577261c934fc39403cb79fe6a9f9d6760f077a27 100644 (file)
@@ -120,9 +120,13 @@ XT primitives[] = {
        primitive_server_socket,
        primitive_close_fd,
        primitive_accept_fd,
+       primitive_can_read_line,
+       primitive_add_read_line_io_task,
        primitive_read_line_fd_8,
+       primitive_can_write,
+       primitive_add_write_io_task,
        primitive_write_fd_8,
-       primitive_flush_fd,
+       primitive_next_io_task,
        primitive_room,
        primitive_os_env,
        primitive_millis,
index 7ed7c17cf9f784774a6d1586b469c8796d338730..97c6cd005723ee3fe9de876255469c96ea0ca852 100644 (file)
@@ -1,4 +1,4 @@
 extern XT primitives[];
-#define PRIMITIVE_COUNT 129
+#define PRIMITIVE_COUNT 133
 
 CELL primitive_to_xt(CELL primitive);
index cdf630c8c1f5bf59e1a772a3f200ffcf6bd1a28f..4d4195aaab5990914f41604858a607791b617b10 100644 (file)
@@ -11,11 +11,11 @@ int make_server_socket(CHAR port)
        sock = socket(PF_INET, SOCK_STREAM, 0);
        
        if(sock < 0)
-               io_error(__FUNCTION__);
+               io_error(NULL,__FUNCTION__);
        
        /* Reuse port number */
        if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(int)) < 0)
-               io_error(__FUNCTION__);
+               io_error(NULL,__FUNCTION__);
        
        /* Give the socket a name */
        name.sin_family = AF_INET;
@@ -24,14 +24,14 @@ int make_server_socket(CHAR port)
        if(bind(sock,(struct sockaddr *)&name, sizeof(name)) < 0)
        {
                close(sock);
-               io_error(__FUNCTION__);
+               io_error(NULL,__FUNCTION__);
        }
 
        /* Start listening for connections */
        if(listen(sock,1) < 0)
        {
                close(sock);
-               io_error(__FUNCTION__);
+               io_error(NULL,__FUNCTION__);
        }
 
        return sock;
@@ -40,7 +40,7 @@ int make_server_socket(CHAR port)
 void primitive_server_socket(void)
 {
        CHAR p = (CHAR)to_fixnum(dpop());
-       dpush(port(make_server_socket(p)));
+       dpush(tag_object(port(make_server_socket(p))));
 }
 
 int accept_connection(int sock)
@@ -50,9 +50,7 @@ int accept_connection(int sock)
        
        int new = accept(sock,(struct sockaddr *)&clientname,&size);
        if(new < 0)
-               io_error(__FUNCTION__);
-
-       set_nonblocking(new);
+               io_error(NULL,__FUNCTION__);
 
        printf("Connection from host %s, port %hd.\n",
                inet_ntoa(clientname.sin_addr),
@@ -64,5 +62,6 @@ int accept_connection(int sock)
 void primitive_accept_fd(void)
 {
        PORT* p = untag_port(dpop());
-       dpush(port(accept_connection(p->fd)));
+       PORT* new = port(accept_connection(p->fd));
+       dpush(tag_object(new));
 }