From: Slava Pestov Date: Mon, 16 Aug 2004 01:50:44 +0000 (+0000) Subject: i/o refactoring continues X-Git-Tag: release-0-62~8 X-Git-Url: https://gitweb.factorcode.org/gitweb.cgi?p=factor.git;a=commitdiff_plain;h=171c89059768e791109de48c8228438cac0cacc3 i/o refactoring continues --- diff --git a/build.sh b/build.sh index 2980a4bbed..c81edfb0f4 100644 --- a/build.sh +++ b/build.sh @@ -1,6 +1,4 @@ -export CC=gcc34 -export CFLAGS="-lm -pedantic -Wall -Winline -O3 -march=pentium4 -fomit-frame-pointer" +export CC=gcc +export CFLAGS="-lm -g -Wall -Wno-long-long -Wno-inline" $CC $CFLAGS -o f native/*.c - -strip f diff --git a/library/cross-compiler.factor b/library/cross-compiler.factor index 4113a60878..ba65d5c6d7 100644 --- a/library/cross-compiler.factor +++ b/library/cross-compiler.factor @@ -67,9 +67,13 @@ DEFER: open-file DEFER: server-socket DEFER: close-fd DEFER: accept-fd +DEFER: can-read-line? +DEFER: add-read-line-io-task DEFER: read-line-fd-8 +DEFER: can-write? +DEFER: add-write-io-task DEFER: write-fd-8 -DEFER: flush-fd +DEFER: next-io-task IN: parser DEFER: str>float @@ -211,9 +215,13 @@ IN: cross-compiler server-socket close-fd accept-fd + can-read-line? + add-read-line-io-task read-line-fd-8 + can-write? + add-write-io-task write-fd-8 - flush-fd + next-io-task room os-env millis diff --git a/library/platform/native/io-internals.factor b/library/platform/native/io-internals.factor index 45483ba85d..10752b266b 100644 --- a/library/platform/native/io-internals.factor +++ b/library/platform/native/io-internals.factor @@ -26,10 +26,33 @@ ! ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. IN: io-internals +USE: combinators +USE: continuations USE: kernel USE: namespaces -USE: combinators +USE: stack +USE: strings : stdin 0 getenv ; : stdout 1 getenv ; : stderr 2 getenv ; + +: flush-fd ( port -- ) + [ swap add-write-io-task next-io-task drop ( call ) ] callcc0 ; + +: wait-to-write ( len port -- ) + tuck can-write? [ drop ] [ flush-fd ] ifte ; + +: blocking-write ( str port -- ) + over + dup string? [ str-length ] [ drop 1 ] ifte + over wait-to-write write-fd-8 ; + +: fill-fd ( port -- ) + [ swap add-read-line-io-task next-io-task drop ( call ) ] callcc0 ; + +: wait-to-read-line ( port -- ) + dup can-read-line? [ drop ] [ fill-fd ] ifte ; + +: blocking-read-line ( port -- line ) + dup wait-to-read-line read-line-fd-8 dup [ sbuf>str ] when ; diff --git a/library/platform/native/stream.factor b/library/platform/native/stream.factor index 0863eef01e..563a279804 100644 --- a/library/platform/native/stream.factor +++ b/library/platform/native/stream.factor @@ -27,6 +27,7 @@ IN: streams USE: combinators +USE: continuations USE: io-internals USE: errors USE: kernel @@ -44,39 +45,32 @@ USE: namespaces "in" set ( str -- ) - [ "out" get write-fd-8 ] "fwrite" set + [ "out" get blocking-write ] "fwrite" set ( -- str ) - [ - "in" get read-line-fd-8 - ] "freadln" set + [ "in" get dup [ blocking-read-line ] when ] "freadln" set ( -- ) - [ - "out" get [ flush-fd ] when* - ] "fflush" set + [ "out" get [ flush-fd ] when* ] "fflush" set ( -- ) [ "in" get [ close-fd ] when* - "out" get [ close-fd ] when* + "out" get [ dup flush-fd close-fd ] when* ] "fclose" set ] extend ; -: ( path read? write? -- stream ) - open-file dup ; - : ( path -- stream ) - t f ; + t f open-file f ; : ( path -- stream ) - f t ; + f t open-file f swap ; : ( path -- stream ) - t f ; + ; : ( path -- stream ) - f t ; + ; : ( port -- stream ) #! Starts listening on localhost:port. Returns a stream that diff --git a/native/error.c b/native/error.c index 7166a53248..e8d1b23eea 100644 --- a/native/error.c +++ b/native/error.c @@ -2,13 +2,13 @@ void fatal_error(char* msg, CELL tagged) { - printf("Fatal error: %s %ld\n",msg,tagged); + fprintf(stderr,"Fatal error: %s %ld\n",msg,tagged); exit(1); } void critical_error(char* msg, CELL tagged) { - printf("Critical error: %s %ld\n",msg,tagged); + fprintf(stderr,"Critical error: %s %ld\n",msg,tagged); save_image("factor.crash.image"); exit(1); } @@ -59,13 +59,3 @@ void range_error(CELL tagged, CELL index, CELL max) tag_cons(cons(tag_fixnum(max),F))))); general_error(ERROR_RANGE,tag_cons(c)); } - -void io_error(const char* func) -{ - STRING* function = from_c_string(func); - STRING* error = from_c_string(strerror(errno)); - - CONS* c = cons(tag_object(function),tag_cons( - cons(tag_object(error),F))); - general_error(ERROR_IO,tag_cons(c)); -} diff --git a/native/error.h b/native/error.h index a8e779e548..a86ad3dab4 100644 --- a/native/error.h +++ b/native/error.h @@ -15,4 +15,3 @@ void throw_error(CELL object); void general_error(CELL error, CELL tagged); void type_error(CELL type, CELL tagged); void range_error(CELL tagged, CELL index, CELL max); -void io_error(const char* func); diff --git a/native/factor.c b/native/factor.c index eaa9a03709..a7944e418e 100644 --- a/native/factor.c +++ b/native/factor.c @@ -13,6 +13,7 @@ int main(int argc, char** argv) init_arena(DEFAULT_ARENA); load_image(argv[1]); init_stacks(); + init_iomux(); init_io(); run(); diff --git a/native/fd.c b/native/fd.c index 0316acb192..8fab55b72f 100644 --- a/native/fd.c +++ b/native/fd.c @@ -2,12 +2,20 @@ void init_io(void) { - env.user[STDIN_ENV] = port(0); - set_nonblocking(0); - env.user[STDOUT_ENV] = port(1); - set_nonblocking(1); - env.user[STDERR_ENV] = port(2); - /* set_nonblocking(2); */ + env.user[STDIN_ENV] = tag_object(port(0)); + env.user[STDOUT_ENV] = tag_object(port(1)); + env.user[STDERR_ENV] = tag_object(port(2)); +} + +bool can_read_line(PORT* port) +{ + return false; +} + +void primitive_can_read_line(void) +{ + PORT* port = untag_port(dpop()); + dpush(tag_boolean(can_read_line(port))); } /* Return true if something was read */ @@ -20,7 +28,7 @@ bool read_step(PORT* port) if(amount == -1) { if(errno != EAGAIN) - io_error(__FUNCTION__); + io_error(port,__FUNCTION__); return false; } else @@ -31,21 +39,12 @@ bool read_step(PORT* port) } } -READLINE_STAT read_line_step(PORT* port) +bool read_line_step(PORT* port) { int i; char ch; - SBUF* line = port->line; - - if(port->buf_pos >= port->buf_fill) - { - if(!read_step(port)) - return READLINE_WAIT; - - if(port->buf_fill == 0) - return READLINE_EOF; - } + SBUF* line = untag_sbuf(port->line); for(i = port->buf_pos; i < port->buf_fill; i++) { @@ -53,7 +52,7 @@ READLINE_STAT read_line_step(PORT* port) if(ch == '\n') { port->buf_pos = i + 1; - return READLINE_EOL; + return true; } else set_sbuf_nth(line,line->top,ch); @@ -63,43 +62,15 @@ READLINE_STAT read_line_step(PORT* port) /* We've reached the end of the above loop, without seeing a newline or EOF, so read again */ - return READLINE_AGAIN; + return false; } void primitive_read_line_fd_8(void) { PORT* port = untag_port(dpeek()); - SBUF* line; - READLINE_STAT state; - - init_buffer(port,B_READ); - if(port->line == NULL) - port->line = sbuf(LINE_SIZE); - else - port->line->top = 0; - line = port->line; - - add_io_task(IO_TASK_READ_LINE,port,F); - - for(;;) - { - state = read_line_step(port); - if(state == READLINE_WAIT) - iomux(); - else if(state == READLINE_EOF && line->top == 0) - { - /* didn't read anything before EOF */ - drepl(F); - break; - } - else if(state == READLINE_EOL) - { - drepl(tag_object(sbuf_to_string(line))); - break; - } - } + drepl(port->line); + port->line = F; - remove_io_task(IO_TASK_READ_LINE,port); } /* Return true if write was done */ @@ -113,7 +84,7 @@ bool write_step(PORT* port) if(amount == -1) { if(errno != EAGAIN) - io_error(__FUNCTION__); + io_error(port,__FUNCTION__); return false; } else @@ -123,39 +94,48 @@ bool write_step(PORT* port) } } -/* keep writing to the stream until everything is written */ -void flush_buffer(PORT* port) +bool can_write(PORT* port, FIXNUM len) { - IO_TASK* task; - if(port->buf_mode != B_WRITE || port->buf_fill == 0) - return; + CELL buf_capacity; - task = add_io_task(IO_TASK_WRITE,port,F); - - for(;;) + switch(port->buf_mode) { - if(port->buf_fill == port->buf_pos) - break; - - if(!write_step(port)) - iomux(); + case B_NONE: + return true; + case B_READ_LINE: + return false; + case B_WRITE: + buf_capacity = port->buffer->capacity * CHARS; + /* Is the string longer than the buffer? */ + if(port->buf_fill == 0 && len > buf_capacity) + { + /* Increase the buffer to fit the string */ + port->buffer = allot_string(len / CHARS + 1); + return true; + } + else + return (port->buf_fill + len <= buf_capacity); + default: + critical_error("Bad buf_mode",port->buf_mode); + return false; } +} - remove_io_task(IO_TASK_WRITE,port); - - port->buf_pos = 0; - port->buf_fill = 0; +void primitive_can_write(void) +{ + PORT* port = untag_port(dpop()); + FIXNUM len = to_fixnum(dpop()); + dpush(tag_boolean(can_write(port,len))); } void write_fd_char_8(PORT* port, FIXNUM ch) { char c = (char)ch; - init_buffer(port,B_WRITE); + if(!can_write(port,1)) + io_error(port,__FUNCTION__); - /* Is the buffer full? */ - if(port->buf_fill == port->buffer->capacity * CHARS) - flush_buffer(port); + init_buffer(port,B_WRITE); bput((CELL)port->buffer + sizeof(STRING) + port->buf_fill,c); port->buf_fill++; @@ -163,24 +143,15 @@ void write_fd_char_8(PORT* port, FIXNUM ch) void write_fd_string_8(PORT* port, STRING* str) { - char* c_str = to_c_string(str); + char* c_str; - init_buffer(port,B_WRITE); + /* Note this ensures the buffer is large enough to fit the string */ + if(!can_write(port,str->capacity)) + io_error(port,__FUNCTION__); - /* Is the string longer than the buffer? */ - if(str->capacity > port->buffer->capacity * CHARS) - { - flush_buffer(port); + init_buffer(port,B_WRITE); - /* Increase the buffer to fit the string */ - port->buffer = allot_string(str->capacity / CHARS + 1); - } - /* Is there enough room in the buffer? If not, flush */ - if(port->buf_fill + str->capacity - > port->buffer->capacity * CHARS) - { - flush_buffer(port); - } + c_str = to_c_string(str); /* Append string to buffer */ memcpy((void*)((CELL)port->buffer + sizeof(STRING) @@ -211,21 +182,20 @@ void primitive_write_fd_8(void) } } -void primitive_flush_fd(void) -{ - PORT* port = untag_port(dpop()); - flush_buffer(port); -} - void primitive_close_fd(void) { + /* This does not flush. */ PORT* port = untag_port(dpop()); - flush_buffer(port); close(port->fd); } -void set_nonblocking(int fd) +void io_error(PORT* port, const char* func) { - if(fcntl(fd,F_SETFL,O_NONBLOCK,1) == -1) - io_error(__FUNCTION__); + STRING* function = from_c_string(func); + STRING* error = from_c_string(strerror(errno)); + + CONS* c = cons(tag_object(function),tag_cons( + cons(tag_object(error),F))); + + general_error(ERROR_IO,tag_cons(c)); } diff --git a/native/fd.h b/native/fd.h index 03ecc80562..631306a73f 100644 --- a/native/fd.h +++ b/native/fd.h @@ -1,24 +1,19 @@ #define LINE_SIZE 80 #define BUF_SIZE (32 * 1024) +bool can_read_line(PORT* port); +void primitive_can_read_line(void); bool read_step(PORT* port); +bool read_line_step(PORT* port); -/* read_line_step() return values */ -typedef enum { - READLINE_WAIT, /* means we have to wait for more I/O */ - READLINE_AGAIN, - READLINE_EOL, - READLINE_EOF -} READLINE_STAT; - -READLINE_STAT read_line_step(PORT* port); bool write_step(PORT* port); void flush_buffer(PORT* port); void init_io(void); void primitive_read_line_fd_8(void); +bool can_write(PORT* port, FIXNUM len); +void primitive_can_write(void); void write_fd_char_8(PORT* port, FIXNUM ch); void write_fd_string_8(PORT* port, STRING* str); void primitive_write_fd_8(void); -void primitive_flush_fd(void); void primitive_close_fd(void); -void set_nonblocking(int fd); +void io_error(PORT* port, const char* func); diff --git a/native/file.c b/native/file.c index e48706efc1..249e9dfd12 100644 --- a/native/file.c +++ b/native/file.c @@ -9,17 +9,17 @@ void primitive_open_file(void) int fd; if(read && write) - mode = O_RDWR | O_CREAT | O_NONBLOCK; + mode = O_RDWR | O_CREAT; else if(read) - mode = O_RDONLY | O_NONBLOCK; + mode = O_RDONLY; else if(write) - mode = O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK; + mode = O_WRONLY | O_CREAT | O_TRUNC; else - mode = O_NONBLOCK; + mode = 0; fd = open(path,mode); if(fd < 0) - io_error(__FUNCTION__); + io_error(NULL,__FUNCTION__); - dpush(port(fd)); + dpush(tag_object(port(fd))); } diff --git a/native/iomux.c b/native/iomux.c index d489f8d038..298a26d37d 100644 --- a/native/iomux.c +++ b/native/iomux.c @@ -7,8 +7,8 @@ void init_io_tasks(fd_set* fdset, IO_TASK* io_tasks) FD_ZERO(fdset); for(i = 0; i < FD_SETSIZE; i++) { - read_io_tasks[i].port = F; - read_io_tasks[i].callback = F; + io_tasks[i].port = F; + io_tasks[i].callback = F; } } @@ -21,7 +21,7 @@ void init_iomux(void) init_io_tasks(&write_fd_set,write_io_tasks); } -IO_TASK* add_io_task_impl( +IO_TASK* add_io_task( IO_TASK_TYPE type, PORT* port, CELL callback, @@ -30,6 +30,9 @@ IO_TASK* add_io_task_impl( { int fd = port->fd; + /* if(io_tasks[fd].port != F) + critical_error("Adding I/O task twice",fd); */ + io_tasks[fd].type = type; io_tasks[fd].port = tag_object(port); io_tasks[fd].callback = callback; @@ -40,24 +43,23 @@ IO_TASK* add_io_task_impl( return &io_tasks[fd]; } -IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback) +void primitive_add_read_line_io_task(void) { - switch(type) - { - case IO_TASK_READ_LINE: - case IO_TASK_READ_COUNT: - return add_io_task_impl(type,port,callback, - read_io_tasks,&read_fd_count); - case IO_TASK_WRITE: - return add_io_task_impl(type,port,callback, - write_io_tasks,&write_fd_count); - default: - fatal_error("Invalid IO_TASK_TYPE",type); - return NULL; - } + PORT* port = untag_port(dpop()); + CELL callback = dpop(); + add_io_task(IO_TASK_READ_LINE,port,callback, + read_io_tasks,&read_fd_count); } -void remove_io_task_impl( +void primitive_add_write_io_task(void) +{ + PORT* port = untag_port(dpop()); + CELL callback = dpop(); + add_io_task(IO_TASK_WRITE,port,callback, + write_io_tasks,&write_fd_count); +} + +void remove_io_task( IO_TASK_TYPE type, PORT* port, IO_TASK* io_tasks, @@ -72,64 +74,139 @@ void remove_io_task_impl( *fd_count = *fd_count - 1; } -void remove_io_task(IO_TASK_TYPE type, PORT* port) +void remove_io_tasks(PORT* port) { - switch(type) + remove_io_task(IO_TASK_READ_LINE,port, + read_io_tasks,&read_fd_count); + remove_io_task(IO_TASK_WRITE,port, + write_io_tasks,&write_fd_count); +} + +bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks) +{ + bool retval = false; + int i; + + FD_ZERO(fdset); + + for(i = 0; i < fd_count; i++) { - case IO_TASK_READ_LINE: - case IO_TASK_READ_COUNT: - remove_io_task_impl(type,port,read_io_tasks,&read_fd_count); - break; - case IO_TASK_WRITE: - remove_io_task_impl(type,port,write_io_tasks,&write_fd_count); - break; + if(typep(PORT_TYPE,io_tasks[i].port)) + { + retval = true; + FD_SET(i,fdset); + } + } + + return retval; +} + +bool perform_read_line_io_task(PORT* port) +{ + init_buffer(port,B_READ_LINE); + if(port->buf_pos >= port->buf_fill) + { + if(!read_step(port)) + return false; + } + + if(port->buf_fill == 0) + { + /* EOF */ + port->line = F; + return true; } + else + return read_line_step(port); } -void perform_io_task(IO_TASK* task) +bool perform_write_io_task(PORT* port) { - if(task->port == F) - return; + init_buffer(port,B_WRITE); + if(write_step(port)) + { + if(port->buf_pos == port->buf_fill) + { + /* All written */ + port->buf_pos = 0; + port->buf_fill = 0; + return true; + } + } + return false; +} +CELL perform_io_task(IO_TASK* task) +{ + PORT* port = untag_port(task->port); + CELL callback = task->callback; switch(task->type) { case IO_TASK_READ_LINE: - - break; + remove_io_task(IO_TASK_READ_LINE,port, + read_io_tasks,&read_fd_count); + if(perform_read_line_io_task(port)) + return callback; + else + { + add_io_task(IO_TASK_READ_LINE,port, + callback,read_io_tasks, + &read_fd_count); + return F; + } case IO_TASK_WRITE: - write_step(untag_port(task->port)); - break; + remove_io_task(IO_TASK_WRITE,port, + write_io_tasks,&write_fd_count); + if(perform_write_io_task(port)) + return callback; + else + { + add_io_task(IO_TASK_WRITE,port, + callback,write_io_tasks, + &write_fd_count); + return F; + } default: critical_error("Bad I/O task",task->type); - break; + return F; } } -bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks) +CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks) { - bool retval = false; - int i; - for(i = 0; i < read_fd_count; i++) + CELL callback; + + for(i = 0; i < fd_count; i++) { - if(read_io_tasks[i].port != F) + if(FD_ISSET(i,fdset)) { - retval = true; - FD_SET(i,&read_fd_set); + if(io_tasks[i].port == F) + critical_error("select() returned fd for non-existent task",i); + else + { + callback = perform_io_task(&io_tasks[i]); + if(callback != F) + return callback; + } } } - - return retval; + + return F; } /* Wait for I/O and return a callback. */ -CELL iomux(void) +CELL next_io_task(void) { - bool reading = set_up_fd_set(&read_fd_set,read_io_tasks); - bool writing = set_up_fd_set(&write_fd_set,write_io_tasks); + bool reading = set_up_fd_set(&read_fd_set, + read_fd_count,read_io_tasks); + bool writing = set_up_fd_set(&write_fd_set, + write_fd_count,write_io_tasks); + + CELL callback; if(!reading && !writing) - fatal_error("iomux() called with no IO tasks",0); + critical_error("next_io_task() called with no IO tasks",0); select(read_fd_count > write_fd_count ? read_fd_count : write_fd_count, @@ -137,13 +214,16 @@ CELL iomux(void) (writing ? &write_fd_set : NULL), NULL,NULL); - /* for(i = 0; i < read_fd_count; i++) - perform_io_task(&read_io_tasks[i]); + callback = perform_io_tasks(&read_fd_set,read_fd_count,read_io_tasks); + if(callback != F) + return callback; - for(i = 0; i < write_fd_count; i++) - perform_io_task(&write_io_tasks[i]); */ + return perform_io_tasks(&write_fd_set,write_fd_count,write_io_tasks); +} - return F; +void primitive_next_io_task(void) +{ + dpush(next_io_task()); } void collect_io_tasks(void) diff --git a/native/iomux.h b/native/iomux.h index d81668a2e1..d0544e53a1 100644 --- a/native/iomux.h +++ b/native/iomux.h @@ -20,20 +20,25 @@ int write_fd_count; void init_io_tasks(fd_set* fd_set, IO_TASK* io_tasks); void init_iomux(void); -IO_TASK* add_io_task_impl( +IO_TASK* add_io_task( IO_TASK_TYPE type, PORT* port, CELL callback, IO_TASK* io_tasks, int* fd_count); -IO_TASK* add_io_task(IO_TASK_TYPE type, PORT* port, CELL callback); -void remove_io_task_impl( +void primitive_add_read_line_io_task(void); +void primitive_add_write_io_task(void); +void remove_io_task( IO_TASK_TYPE type, PORT* port, IO_TASK* io_tasks, int* fd_count); -void remove_io_task(IO_TASK_TYPE type, PORT* port); -void perform_io_task(IO_TASK* task); -bool set_up_fd_set(fd_set* fdset, IO_TASK* io_tasks); -CELL iomux(void); +void remove_io_tasks(PORT* port); +bool set_up_fd_set(fd_set* fdset, int fd_count, IO_TASK* io_tasks); +bool perform_read_line_io_task(PORT* port); +bool perform_write_io_task(PORT* port); +CELL perform_io_task(IO_TASK* task); +CELL perform_io_tasks(fd_set* fdset, int fd_count, IO_TASK* io_tasks); +CELL next_io_task(void); +void primitive_next_io_task(void); void collect_io_tasks(void); diff --git a/native/port.c b/native/port.c index 1f074066cf..b5ba244b36 100644 --- a/native/port.c +++ b/native/port.c @@ -11,16 +11,20 @@ PORT* untag_port(CELL tagged) return p; } -CELL port(CELL fd) +PORT* port(CELL fd) { PORT* port = allot_object(PORT_TYPE,sizeof(PORT)); port->fd = fd; port->buffer = NULL; - port->line = NULL; + port->line = F; port->buf_mode = B_NONE; port->buf_fill = 0; port->buf_pos = 0; - return tag_object(port); + + if(fcntl(port->fd,F_SETFL,O_NONBLOCK,1) == -1) + io_error(port,__FUNCTION__); + + return port; } void primitive_portp(void) @@ -37,6 +41,16 @@ void init_buffer(PORT* port, int mode) { port->buf_fill = port->buf_pos = 0; port->buf_mode = mode; + + if(mode == B_READ_LINE) + port->line = tag_object(sbuf(LINE_SIZE)); + } + else if(port->buf_mode == B_READ_LINE) + { + if(port->line == F) + port->line = tag_object(sbuf(LINE_SIZE)); + else + untag_sbuf(port->line)->top = 0; } } @@ -45,20 +59,12 @@ void fixup_port(PORT* port) port->fd = -1; if(port->buffer != 0) port->buffer = fixup_untagged_string(port->buffer); - if(port->line != 0) - { - port->line = (SBUF*)((CELL)port->line - + (active->base - relocation_base)); - } + fixup(&port->line); } void collect_port(PORT* port) { if(port->buffer != 0) port->buffer = copy_untagged_string(port->buffer); - if(port->line != 0) - { - port->line = (SBUF*)copy_untagged_object( - port->line,sizeof(SBUF)); - } + copy_object(&port->line); } diff --git a/native/port.h b/native/port.h index 3d230f8ed9..d9497dfb91 100644 --- a/native/port.h +++ b/native/port.h @@ -1,12 +1,12 @@ /* Buffer mode */ -typedef enum { B_READ, B_WRITE, B_NONE } B_MODE; +typedef enum { B_READ_LINE, B_WRITE, B_NONE } B_MODE; typedef struct { CELL header; FIXNUM fd; STRING* buffer; - /* partial line used by read_line_fd */ - SBUF* line; + /* tagged partial line used by read_line_fd */ + CELL line; /* one of B_READ, B_WRITE or B_NONE */ B_MODE buf_mode; /* top of buffer */ @@ -16,7 +16,7 @@ typedef struct { } PORT; PORT* untag_port(CELL tagged); -CELL port(CELL fd); +PORT* port(CELL fd); void init_buffer(PORT* port, int mode); void primitive_portp(void); void fixup_port(PORT* port); diff --git a/native/primitives.c b/native/primitives.c index 411b0e94b4..577261c934 100644 --- a/native/primitives.c +++ b/native/primitives.c @@ -120,9 +120,13 @@ XT primitives[] = { primitive_server_socket, primitive_close_fd, primitive_accept_fd, + primitive_can_read_line, + primitive_add_read_line_io_task, primitive_read_line_fd_8, + primitive_can_write, + primitive_add_write_io_task, primitive_write_fd_8, - primitive_flush_fd, + primitive_next_io_task, primitive_room, primitive_os_env, primitive_millis, diff --git a/native/primitives.h b/native/primitives.h index 7ed7c17cf9..97c6cd0057 100644 --- a/native/primitives.h +++ b/native/primitives.h @@ -1,4 +1,4 @@ extern XT primitives[]; -#define PRIMITIVE_COUNT 129 +#define PRIMITIVE_COUNT 133 CELL primitive_to_xt(CELL primitive); diff --git a/native/socket.c b/native/socket.c index cdf630c8c1..4d4195aaab 100644 --- a/native/socket.c +++ b/native/socket.c @@ -11,11 +11,11 @@ int make_server_socket(CHAR port) sock = socket(PF_INET, SOCK_STREAM, 0); if(sock < 0) - io_error(__FUNCTION__); + io_error(NULL,__FUNCTION__); /* Reuse port number */ if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(int)) < 0) - io_error(__FUNCTION__); + io_error(NULL,__FUNCTION__); /* Give the socket a name */ name.sin_family = AF_INET; @@ -24,14 +24,14 @@ int make_server_socket(CHAR port) if(bind(sock,(struct sockaddr *)&name, sizeof(name)) < 0) { close(sock); - io_error(__FUNCTION__); + io_error(NULL,__FUNCTION__); } /* Start listening for connections */ if(listen(sock,1) < 0) { close(sock); - io_error(__FUNCTION__); + io_error(NULL,__FUNCTION__); } return sock; @@ -40,7 +40,7 @@ int make_server_socket(CHAR port) void primitive_server_socket(void) { CHAR p = (CHAR)to_fixnum(dpop()); - dpush(port(make_server_socket(p))); + dpush(tag_object(port(make_server_socket(p)))); } int accept_connection(int sock) @@ -50,9 +50,7 @@ int accept_connection(int sock) int new = accept(sock,(struct sockaddr *)&clientname,&size); if(new < 0) - io_error(__FUNCTION__); - - set_nonblocking(new); + io_error(NULL,__FUNCTION__); printf("Connection from host %s, port %hd.\n", inet_ntoa(clientname.sin_addr), @@ -64,5 +62,6 @@ int accept_connection(int sock) void primitive_accept_fd(void) { PORT* p = untag_port(dpop()); - dpush(port(accept_connection(p->fd))); + PORT* new = port(accept_connection(p->fd)); + dpush(tag_object(new)); }