Manual Reference Pages  - nettee (1)

NAME

nettee_cmd - the interface between nettee and programs run by it with the -process command line option.

CONTENTS

Description
Commands read by the child from stdin
Status messages written by the child to stdout
RB functions
NIO Libraries
Overview of the Parent Interface
Overview of the Child Interface
Mask Bits
Example child program
Example parent program
Example script for running the example child program
Related Programs
Copyrights
License
Author

DESCRIPTION

The nettee -process option starts the specified command as a child process and then communicates with it through the simple protocol described here. In addition, the command may contain command line options. When -process is not specified nettee simply copies from the input to the output(s) and does not create or communicate with a child process. If -process is specified the command may modify the data stream and direct some parts to -next and others to -output. For instance, it can peel off a subset of data to the local machine and echo the rest down the chain. Additionally the command could create or destroy data.

The program started by -process communicates with nettee solely through stdin and stdout. Commands are read from stdin and status messages are written to stdout. Both are newline-terminated strings. Every status message begins with a number, shown as XX...XX, which is an unsigned long long value written as 16 hexadecimal digits ("16X" format). Keywords are case sensitive and must be written exactly as shown.
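As an illustration of the status line layout, here is a minimal C sketch of how a child might build one. The helper format_status is hypothetical (it is not part of nio.c), and it assumes the 16 hex digits are zero padded and upper case (printf "%016llX").

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper (not part of nio.c): build one status line of the
   form "XXXXXXXXXXXXXXXX KEYWORD[ rest]\n" in buf. Assumes the count is
   printed as 16 zero-padded upper case hex digits.
   Returns the line length, or -1 if buf is too small. */
int format_status(char *buf, size_t len, unsigned long long count,
                  const char *keyword, const char *rest)
{
    int n;
    assert(buf != NULL && keyword != NULL);
    if (rest != NULL && rest[0] != '\0')
        n = snprintf(buf, len, "%016llX %s %s\n", count, keyword, rest);
    else
        n = snprintf(buf, len, "%016llX %s\n", count, keyword);
    if (n < 0 || (size_t)n >= len)
        return -1;                       /* encoding error or truncated */
    return n;
}
```

A child would then write the line to stdout with fputs() and call fflush(stdout), since stdout is connected to the parent and may be buffered.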

Commands read by the child from stdin

STATUS S
 
     Emit the current status at least every S seconds.
     These status messages are in ADDITION to any messages which might
        be sent due to events or other commands.  These messages allow
        the parent to determine if the child has failed.
     May appear once before START.  Otherwise, no additional status messages are sent.
     Return status READY or RUNNING.

SIZE N
     Set the IO buffer size, in bytes.
     If not provided, defaults to 1048576.
     May appear once, before STREAMS. (Buffers may not be resized during a run.)
     Return status READY.

STREAMS IO
 
     Number of IO streams which will be created.
     Must be sent once before START.
     Return status READY.

IO I name mask
 
     Define IO stream I from Unix fd "name" (if name is an integer)
       or from the named socket or file "name" otherwise.
     Mask bits are (one of: 1=In, 2=Out) PLUS (one of: 4=Local, 8=Network).
     Must appear once per IO stream, after STREAMS and before START.
     Return status READY.

LIMIO NR NW XR XW
 
     Set the miN/maX Read/Write parameters (all values are a number of bytes).
     NR  (miNRead) sets the minimum contiguous free space in an input ring buffer
       which must exist before a read will be attempted. Default 1.
     NW  (miNWrite) sets the minimum available data before a write will be attempted.
       Default 1.
     XR  (maXRead)  sets the maximum size of a single read  operation. Default  65536.
     XW  (maXWrite) sets the maximum size of a single write operation. Default  65536.
     Min values must be >=1 and <= buffer size.
     Max values must be >=1.
     May appear once before START.  Otherwise, the defaults are used.
     Return status READY.

START
     Begin or resume reading data.
     Must appear after the IO streams are configured.
     Return status RUNNING.

PAUSE
     Stop reading and writing data.
     Continue to monitor stdin for further commands.
     May appear after START.
     Return status READY.

DISABLE I
 
     Stop reading or writing data on the indicated stream and continue running.
     Together with the ERROR messages sent up by the child, this allows nettee
       to implement Continue On [Network/Local][Read/Write] Failure.
       (Continue on Read Failure MAY make sense with -process, but probably does not.)
     May appear after START.
     Return status READY or RUNNING.

EXIT
     On receiving this command the child must call exit() immediately.
     May appear once at any time.
     Return status (NONE).
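The command rules above can be checked mechanically. The following C sketch validates one command line: keyword (case sensitive), argument count, the IO mask combination, and the LIMIO limits. The names commands, io_mask_valid and check_command are hypothetical, the sketch assumes that an IO name contains no spaces, and it does not check ordering constraints such as "before START".

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Keyword table built from the command descriptions; nargs is the number
   of whitespace-separated arguments each command takes. */
static const struct { const char *kw; int nargs; } commands[] = {
    {"STATUS", 1}, {"SIZE", 1}, {"STREAMS", 1}, {"IO", 3},
    {"LIMIO", 4}, {"START", 0}, {"PAUSE", 0}, {"DISABLE", 1}, {"EXIT", 0}
};

/* IO mask bits as documented for the IO command. */
enum { IO_IN = 1, IO_OUT = 2, IO_LOCAL = 4, IO_NET = 8 };

/* 1 if mask has exactly one of In/Out, exactly one of Local/Network,
   and no other bits set; otherwise 0. */
int io_mask_valid(long mask)
{
    long dir = mask & (IO_IN | IO_OUT);
    long loc = mask & (IO_LOCAL | IO_NET);
    if (mask & ~(long)(IO_IN | IO_OUT | IO_LOCAL | IO_NET))
        return 0;                              /* unknown bits */
    return (dir == IO_IN || dir == IO_OUT) && (loc == IO_LOCAL || loc == IO_NET);
}

/* Check one command line. bufsize is the buffer size set by SIZE
   (default 1048576), used for the LIMIO minimum checks.
   Returns the commands[] index on success, -1 on any error. */
int check_command(const char *line, long bufsize)
{
    char copy[256], *tok[5];
    int ntok = 0, idx = -1;
    size_t i;
    assert(line != NULL && bufsize >= 1);
    if (strlen(line) >= sizeof copy) return -1;
    strcpy(copy, line);
    char *t = strtok(copy, " \t\r\n");
    while (t != NULL) {
        if (ntok == 5) return -1;              /* keyword + more than 4 args */
        tok[ntok++] = t;
        t = strtok(NULL, " \t\r\n");
    }
    if (ntok == 0) return -1;
    for (i = 0; i < sizeof commands / sizeof commands[0]; i++)
        if (strcmp(tok[0], commands[i].kw) == 0) { idx = (int)i; break; }
    if (idx < 0 || ntok - 1 != commands[idx].nargs) return -1;
    if (strcmp(tok[0], "IO") == 0 && !io_mask_valid(strtol(tok[3], NULL, 10)))
        return -1;
    if (strcmp(tok[0], "LIMIO") == 0) {
        long nr = strtol(tok[1], NULL, 10), nw = strtol(tok[2], NULL, 10);
        long xr = strtol(tok[3], NULL, 10), xw = strtol(tok[4], NULL, 10);
        if (nr < 1 || nr > bufsize || nw < 1 || nw > bufsize || xr < 1 || xw < 1)
            return -1;
    }
    return idx;
}
```

For example, "IO 4 7 10" passes (10 = 2 Out + 8 Network), while "IO 0 3 3" is rejected because 3 sets both In and Out and no locality bit.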

Status messages written by the child to stdout

XXXXXXXXXXXXXXXX READY
 
     Ready to run; the count is the number of bytes READ from all input streams.
     The count does not include bytes from the command stream.
     Also emitted whenever a command is processed without error.

XXXXXXXXXXXXXXXX RUNNING
 
     Running; the count is the number of bytes READ from all input streams.
     Response to a START, also may be a periodic status message.

XXXXXXXXXXXXXXXX FAILED type message
 
     A fatal event has been detected.  The child process exits after sending the message.
     Types are:
       COMMAND: the command was not understood or could not be carried out, for
         instance, an open() failed.
       DATA:    some problem was found in the input data.
       OTHER:   any other unrecoverable error, for instance, a failure to allocate more
         memory. 
     The message may contain one line of additional information.


XXXXXXXXXXXXXXXX DONE type stream
 
     Something completed.  If it is an IO stream then that stream has closed.
     Type + stream are:
       COMMAND 0: returns the number of commands processed (from stream 0).
       IO      I: returns the number of bytes read or written on stream I.
       ALL     0: returns the total number of bytes written.

XXXXXXXXXXXXXXXX ERROR type stream
 
     A (possibly) recoverable IO error has taken place.
     Types are READ or WRITE.  The value returned is the number of
       bytes read/written on that stream.
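On the receiving side the parent has to split each status line into its count, keyword, and remaining fields. A minimal C sketch follows; parse_status is a hypothetical name, and the sketch assumes exactly 16 upper case hex digits followed by a single space.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parent-side parser for "XXXXXXXXXXXXXXXX KEYWORD ..." lines.
   On success stores the count, copies the keyword into kw (size kwlen) and
   returns a pointer to the remaining text, which may be empty and may still
   end in '\n'. Returns NULL if the line is malformed. */
const char *parse_status(const char *line, unsigned long long *count,
                         char *kw, size_t kwlen)
{
    const char *rest;
    size_t k;
    assert(line && count && kw && kwlen > 0);
    /* require exactly 16 hex digits followed by a space */
    if (strspn(line, "0123456789ABCDEF") != 16 || line[16] != ' ')
        return NULL;
    *count = strtoull(line, NULL, 16);     /* stops at the space */
    k = strcspn(line + 17, " \n");         /* keyword length */
    if (k == 0 || k >= kwlen)
        return NULL;
    memcpy(kw, line + 17, k);
    kw[k] = '\0';
    rest = line + 17 + k;
    if (*rest == ' ')
        rest++;                            /* skip the separator */
    return rest;
}
```

For "0000000000000010 ERROR WRITE 4" this yields the count 16, the keyword ERROR, and the remainder "WRITE 4", which the parent would then map to a conmask decision.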

RB functions

The files rb.c and rb.h contain ring buffer functions and the definitions for the RB symbols used in the examples shown in this man page and elsewhere in nettee. The ring buffer organization is essentially a variation of Simon Cooke’s "Bip Buffer" method, which was published here:

 http://www.codeproject.com/KB/IP/bipbuffer.aspx?df=100&forumid=14010&exp=0&select=494356

with the title "The Bip Buffer - The Circular Buffer with a Twist". The idea is to make most reads into the ring buffer contiguous. Some of the functions accept a minread parameter. If, at the end of an operation which moves data into the ring buffer, the top of the data is less than minread bytes from the top of the data buffer, a gap is introduced. That way the next data transfer into the buffer can start at the bottom of the data buffer and will not need to be split. Changing the value of minread while a ring buffer is in use is not supported.
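To make the gap rule concrete, here is a toy model in C. It is a sketch under stated assumptions, not the code in rb.c: the type toy_rb and the functions toy_write and toy_read are hypothetical, each call moves data with a single memcpy (rb_from_flat may use two), and the buffer is only 16 bytes so the effect is easy to trace. Free space is size - blen - gap, as in NCMD_RB_FREE.

```c
#include <assert.h>
#include <string.h>

#define TOY_SIZE 16

/* Toy model only: field names mirror RB_NEXUS, but the layout rules
   here are this sketch's assumptions. */
typedef struct {
    char buf[TOY_SIZE];
    int  size;   /* capacity                               */
    int  data;   /* offset of the oldest byte              */
    int  blen;   /* bytes stored                           */
    int  gap;    /* unused bytes reserved at the top       */
} toy_rb;

/* Contiguous free space at the write position. */
int toy_contig_free(const toy_rb *rb)
{
    int lim = rb->size - rb->gap;          /* end of usable area       */
    int top = rb->data + rb->blen;
    if (top < lim)                         /* data not wrapped         */
        return lim - top;
    return rb->data - (top - lim);         /* wrapped: free up to data */
}

/* Append up to n bytes in ONE memcpy; returns bytes written. If fewer than
   minread bytes remain above the data, reserve them as a gap so that the
   next write starts at offset 0 and is not split. */
int toy_write(toy_rb *rb, const char *src, int n, int minread)
{
    assert(n >= 0 && minread >= 1);
    if (rb->blen == 0)
        rb->data = rb->gap = 0;            /* empty: restart at bottom */
    int lim = rb->size - rb->gap;
    int top = rb->data + rb->blen;
    int k = toy_contig_free(rb);
    if (k > n) k = n;
    if (top >= lim) top -= lim;            /* wrapped write position   */
    memcpy(rb->buf + top, src, (size_t)k);
    rb->blen += k;
    top = rb->data + rb->blen;
    if (rb->gap == 0 && top < rb->size && rb->size - top < minread)
        rb->gap = rb->size - top;          /* introduce the gap        */
    return k;
}

/* Remove up to n bytes from the bottom of the data in ONE memcpy. */
int toy_read(toy_rb *rb, char *dst, int n)
{
    int lim = rb->size - rb->gap;
    int run = lim - rb->data;              /* bytes before wrap point  */
    int k = rb->blen < run ? rb->blen : run;
    assert(n >= 0);
    if (k > n) k = n;
    memcpy(dst, rb->buf + rb->data, (size_t)k);
    rb->data += k;
    rb->blen -= k;
    if (rb->data == lim || rb->blen == 0) {
        rb->data = 0;                      /* passed the gap, or empty */
        rb->gap = 0;
    }
    return k;
}
```

With minread = 4, writing 14 bytes leaves only 2 bytes at the top, so a 2 byte gap is reserved; after 10 bytes are read out, the next 8 byte write lands contiguously at offset 0 instead of being split across the top and bottom of the buffer.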

The RB_NEXUS data structure is:

typedef struct ring_buffer     RB_NEXUS;
struct ring_buffer {
  char *buf;                /* ring buffer data area                                 */
  int   blen;               /* amount of data held within the data buffer            */
  int   data;               /* the start of the data in a ring buffer                */
  int   size;               /* maximum storage capacity of the buffer                */
  int   gap;                /* if not zero, indicates a gap <MINREAD at top of buffer*/
};

The RB functions are:

int rb_adjust(RB_NEXUS *rb, int count);
  Adjust the ring buffer as if count bytes had been removed from the trailing edge of the data region. Returns 0 for valid, -1 if count exceeds the number of bytes present.

void rb_clear(RB_NEXUS *rb, int fill);
  Fill a ring buffer with the supplied value (typically "0"). Reset the data, gap, and blen values to zero.

RB_NEXUS *rb_create(int size, int count);
  Create one or more empty ring buffers. Returns pointer to them. If it fails it cleans up those it already created, if any, and returns NULL.

void rb_delete(RB_NEXUS **rb, int count);
  Delete one or more ring buffers which were allocated in a block with rb_create. Releases all memory and sets *rb to NULL. The value of count must be the same for rb_delete as it was for the corresponding rb_create. That is, all buffers created by a single rb_create must be removed with a single rb_delete.

int rb_flat_memcmp(RB_NEXUS *s1, char *s2, int n);
  Compare up to n bytes of data in one ring buffer and one flat buffer.
Returns <0 if s1 < s2, >0 if s1 > s2, and 0 if s1 = s2.
If s1 and s2 are different sizes, and are identical up to the size of s1, and s1 is less than n characters, then -1 will be returned.
If s1 and s2 are the same size, and n is larger than that, then they will only be compared up to the amount of data that they contain.
If s2 is smaller than s1, and n is larger than s2, the result of the function is undefined. (The most likely outcome is a segfault.)
Note that comparison is done as unsigned char, even though the rb structure holds data as char.

int rb_rb_memcmp(RB_NEXUS *s1, RB_NEXUS *s2, int n);
  Compare up to n bytes of data in two ring buffers.
Returns <0 if s1 < s2, >0 if s1 > s2, and 0 if s1 = s2.
If either ring buffer pointer is invalid a fatal error occurs and the program exits.
If s1 has more data than s2, they are identical up to the size of s2, and the size of s2 is less than n, then +1 will be returned.
If s1 has less data than s2, they are identical up to the size of s1, and the size of s1 is less than n, then -1 will be returned.
If s1 and s2 are the same size, and n is larger than that, then they will only be compared up to the amount of data that they contain.
Note that comparison is done as unsigned char, even though the rb structure holds data as char.

int rb_from_flat(RB_NEXUS *dst, char *array, int n, int minread);
  Copy up to n bytes of data from a flat array to a ring buffer in 1 or 2 memcpy calls.
Returns the number of bytes transferred, which may be less than n.
If there is a problem with the input parameters -1 is returned.
The size, data, blen, and buf fields in the ring buffer must be valid or bad things will happen.
The blen field will be adjusted to reflect the added data.
If the data transfer leaves the top of data within minread of the top of the data buffer a gap will be introduced, so that the next read will begin at the bottom of the data buffer.

int rb_memcpy(RB_NEXUS *dst, RB_NEXUS *src, int n, int mode,int minread);
  Copy up to n bytes of data from one ring buffer to another.
This MAY NOT be used to copy data from a buffer to itself!!!
Returns the number of bytes which were moved, which is limited by either how many more
will fit into dst or how many are available in src.
If mode is not zero the src ring buffer is modified to remove the copied bytes by
adjusting blen and data. The src data buffer is not modified.
If the data transfer leaves the top of data within minread of the top of the data buffer a gap will be introduced, so that the next read will begin at the bottom of the data buffer.

void rb_reset(RB_NEXUS *rb);
  Effectively empty a ring buffer by setting blen, data, and gap to zero, but do not clear contents of the buffer.

RB_NEXUS *rb_shadow_set_create(RB_NEXUS *rb, int count);
  Create a shadow set of a ring buffer. A shadow set is an array of ring buffers which have independent data, gap, blen, and size fields, but share the single data buffer of the ring buffer which they shadow. Shadow sets allow different parts of a program to access the common data buffer without interfering with each other. One member may be used to add new data to the common buffer while several others are simultaneously used to copy out existing data. No shadow member may be used to both add and remove data from the buffer in a single operation. If this is done there will be no way to reconcile the changes. After one operation has been performed on each shadow set member their independent data, gap, blen, and size fields must be reconciled with rb_shadow_set_reconcile before further operations are performed. If the size field of a shadow set member is set to 0 then that shadow set member will be ignored by rb_shadow_set_reconcile. This action would typically be taken when a data stream has closed - since there will be no further operations on that shadow set member, there is no need to reconcile its fields on each cycle.
If rb does not exist, or the shadow cannot be created, NULL is returned.

void rb_shadow_set_delete(RB_NEXUS **rb);
  Delete a ring buffer shadow set. All memory is released and the shadow set pointer rb is set to NULL. The ring buffer from which the shadow set was made is not affected. However, its contents are typically invalid and rb_clear should be applied before it is used again.

int rb_shadow_set_reconcile(RB_NEXUS *rb, int count, int input, int reference);
  When operating on shadow sets in a given IO cycle one read into the buffer and N writes from the buffer may be carried out. The read adds data to the "top" of the buffer, and the writes copy variable amounts of data from the bottom. It is essential to reconcile the shadow set before any shadow set member has an IO operation repeated on it - failure to do so could result in the top of the buffer overwriting the bottom, or the top not being read because the buffer appeared "full" when it wasn’t. The number of members in the shadow set is count. The number of the member which added data is input. reference is the number of a member which was neither read nor written. It is used by rb_shadow_set_reconcile to determine what to change in all the other members.
The return value is -1 on failure, or the size of member with the most data on success.

int rb_to_flat(char *array, RB_NEXUS *src, int n, int mode);
  Copy n bytes of data from a ring buffer to a flat buffer. n may be more than the number of bytes stored in src, in which case the number of bytes present will be transferred. Returns the number of bytes transferred. If there is a problem with the input parameters -1 is returned. If mode is 0 the ring buffer is not modified; if nonzero the data which is copied is effectively removed from the ring buffer by adjusting the data, blen, and gap fields.

int rb_to_flat_tail(char *array, RB_NEXUS *src, int n, int mode);
  Like rb_to_flat except the data is removed from the top of the data region instead of the bottom.

int rb_sanity(RB_NEXUS *rb, char *string);
  Sanity check the fields in rb. This is primarily for debugging. Returns 0 if the data structure is ok. Returns 1 and emits diagnostic information followed by string if the data structure is invalid.
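The memcmp functions above compare bytes as unsigned char, and the data in a ring buffer may be split into two segments when it wraps. The following C sketch shows both points; seg_memcmp is a hypothetical helper, not the implementation of rb_flat_memcmp.

```c
#include <assert.h>
#include <stddef.h>

/* Compare n bytes stored in two segments (seg1 then seg2, as a wrapped
   ring buffer region would be) with a flat array, treating bytes as
   unsigned char. Returns <0, 0 or >0. */
int seg_memcmp(const char *seg1, size_t len1, const char *seg2, size_t len2,
               const char *flat, size_t n)
{
    size_t i;
    assert(n <= len1 + len2);
    for (i = 0; i < n; i++) {
        unsigned char a = (unsigned char)(i < len1 ? seg1[i] : seg2[i - len1]);
        unsigned char b = (unsigned char)flat[i];
        if (a != b)
            return a < b ? -1 : 1;
    }
    return 0;
}
```

Without the unsigned char conversion, a byte of 0x80 stored in a signed char would compare as -128 and sort below 0x01; with it, 0x80 correctly compares greater.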

NIO Libraries

The files nio.c and nio.h contain nettee command and IO functions as well as the NCMD symbols used in the examples. There are many more functions available than will be described here. The top level functions which the parent or child calls to enable the nettee command interface are described in the following sections.

These definitions are available for simplifying access to certain information associated with the IO streams and their associated ring buffers. Most of these are used in the Example Child Program:

NCMD_RB_SIZE(A)  (ncmd->rb[A].size)                 /* rb total storage capacity of stream A    */
NCMD_RB_LEN(A)   (ncmd->rb[A].blen)                 /* rb bytes of data stored                  */
NCMD_RB_FREE(A)  (ncmd->rb[A].size - ncmd->rb[A].blen - ncmd->rb[A].gap)
                                                    /* rb bytes of storage available            */
NCMD_RB_CELL(A)  (&ncmd->rb[A])                     /* a pointer to the specified ncmd->rb cell */
NCMD_IO_FD(A)    (ncmd->io_fd[A])                   /* the fd associated with the stream        */
NCMD_LOOP_MASK   (ncmd->loop_mask)                  /* the loop mask                            */
NCMD_MINREAD     (ncmd->minread)                    /* the minread value                        */
NCMD_IO_TOTAL    (ncmd->num_total)                  /* the number of io streams                 */
NCMD_IS_IN(A)    (ncmd->io_type[A] & NCMD_FD_IN)    /* true for an input   stream               */
NCMD_IS_OUT(A)   (ncmd->io_type[A] & NCMD_FD_OUT)   /* true for an output  stream               */
NCMD_IS_NET(A)   (ncmd->io_type[A] & NCMD_FD_NET)   /* true for a  network stream               */
NCMD_IS_LOCAL(A) (ncmd->io_type[A] & NCMD_FD_LOCAL) /* true for a  local   stream               */
NCMD_DEBUG(B)    if(ncmd->debug){ B ; (void) fflush(stderr); }
                                                    /* conditionally emit a debug message       */

Overview of the Parent Interface

More complete details for how the parent controls the child are shown below in the Example Parent Program section. In brief, the parent calls three primary functions. The first is ncmd_p_init which creates the necessary data structure. The parent then opens whatever data streams the child will have access to and stores the information in this data structure. The parent sets up sockets to talk to the child and forks a process which runs the child. After the fork the parent runs ncmd_control_child which handles communications with the child. Finally, after the child exits, the parent may call ncmd_p_free to release the data structure. The primary functions called by the parent are:

void ncmd_p_free(NCMD_P_NEXUS *ncmd);
  Release the data associated with controlling the child. This is called after the child exits.

NCMD_P_NEXUS *ncmd_p_init(int io, int size, int minread, int minwrite, int maxread, int maxwrite, int misses, int interval, int conmask);
  Allocate a data structure which is used while controlling the child. The values for size, minread, minwrite, maxread, and maxwrite are described in the -buf_size (etc.) sections of the nettee man page. The child process may be configured to send heartbeat messages. The value of interval, which is a time in seconds, is passed to the child to set up this heartbeat. The parent looks for the heartbeat at the expected interval. If interval is zero the parent does not monitor and the child does not generate heartbeat messages. If the parent is monitoring and the expected heartbeat has not appeared in misses intervals the parent assumes that the child has crashed, locked up, or is for some other reason broken. The parent then attempts to kill the child and then exits.

The child may encounter read/write errors from/to local/network data streams. When it does so it will send a message to the parent:
XXXXXXXXXXXXXXXX ERROR [READ|WRITE] stream
What the parent does at this point is determined by the conmask (Continue ON) mask. First the parent determines if the designated stream was Read or Write, Local or Network. The parent knows this because it kept track of that information when it opened that stream, before passing the information to the child. If the corresponding bit in conmask is not set the parent will send the child an EXIT message. However, if the bit is set it will instead send a START message. In the former case the child will exit immediately; in the latter it will resume processing, albeit without the failed data stream. It is up to the programmer to determine for each type of child process whether or not to continue on these types of errors. The defined conmask bits are:

NCMD_ERR_CONWF 0x0010  continue on network write failure
NCMD_ERR_COLWF 0x0020  continue on local   write failure
NCMD_ERR_CONRF 0x0040  continue on network read failure
NCMD_ERR_COLRF 0x0080  continue on local   read failure

All other bits are reserved.
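The two policy decisions described above, the heartbeat timeout and the conmask reply, can be sketched in C as follows. This is an illustration, not the code in nio.c: the names heartbeat_missed and conmask_reply are hypothetical, and treating "more than misses * interval seconds" as the failure threshold is an assumption.

```c
#include <assert.h>
#include <time.h>

/* conmask bits as listed above. */
#define NCMD_ERR_CONWF 0x0010  /* continue on network write failure */
#define NCMD_ERR_COLWF 0x0020  /* continue on local   write failure */
#define NCMD_ERR_CONRF 0x0040  /* continue on network read failure  */
#define NCMD_ERR_COLRF 0x0080  /* continue on local   read failure  */

/* Heartbeat check: 1 if no status message has arrived for more than
   misses * interval seconds. interval == 0 disables monitoring. */
int heartbeat_missed(time_t last, time_t now, int interval, int misses)
{
    assert(misses >= 1);
    if (interval <= 0)
        return 0;
    return difftime(now, last) > (double)interval * misses;
}

/* Reply to "XXXXXXXXXXXXXXXX ERROR [READ|WRITE] stream": "START" if the
   conmask bit for this kind of stream is set, else "EXIT". is_write and
   is_net describe the failed stream as the parent recorded it on open. */
const char *conmask_reply(int conmask, int is_write, int is_net)
{
    int bit;
    if (is_write)
        bit = is_net ? NCMD_ERR_CONWF : NCMD_ERR_COLWF;
    else
        bit = is_net ? NCMD_ERR_CONRF : NCMD_ERR_COLRF;
    return (conmask & bit) ? "START" : "EXIT";
}
```

With interval 2 and misses 3, a child silent for 7 seconds would be presumed broken, while one silent for 5 seconds would not.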

int ncmd_control_child(NCMD_P_NEXUS *ncmd);
  This is the function that monitors the child process. The first thing it does is send the following messages to the child, using data already stored in the ncmd data structure:
     SIZE value                  ! buffer for IO streams
     LIMIO minread minwrite maxread maxwrite
     STATUS interval
     STREAMS x y                 ! x = all inputs, y = all outputs
     CLASSIFY pdata in next out  !
     READ 0 fd1                  ! assign input data streams to fd’s
     (etc.)
     WRITE 4 fd5                 ! assign output data streams to fd’s
     (etc.)
     START                       ! go

During and after START it monitors responses. If an ERROR comes back it may send an EXIT to the child if the corresponding conmask bit is not set.

Returns 0 on normal child exit. On abnormal exits the return value has the NCMD_ERR status bit set.
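A parent that drives a child directly needs to send a configuration that respects the ordering rules in the command descriptions: SIZE before STREAMS, IO after STREAMS, and everything before START. The C sketch below emits such a sequence using only the commands documented in the "Commands read by the child from stdin" section. It is an assumption-based illustration, not what ncmd_control_child sends (its listing above includes commands such as CLASSIFY, READ, and WRITE); send_setup and struct stream_def are hypothetical.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical description of one stream for the IO command:
   name is an fd number or a file/socket name, mask is the IO mask. */
struct stream_def { const char *name; int mask; };

/* Write a setup sequence to the child's command stream.
   Returns 0, or -1 on a write error. */
int send_setup(FILE *to_child, int interval, long size,
               long nr, long nw, long xr, long xw,
               const struct stream_def *s, int nstreams)
{
    int i;
    assert(to_child != NULL && s != NULL && nstreams > 0);
    if (fprintf(to_child, "STATUS %d\n", interval) < 0) return -1;
    if (fprintf(to_child, "SIZE %ld\n", size) < 0) return -1;
    if (fprintf(to_child, "LIMIO %ld %ld %ld %ld\n", nr, nw, xr, xw) < 0) return -1;
    if (fprintf(to_child, "STREAMS %d\n", nstreams) < 0) return -1;
    for (i = 0; i < nstreams; i++)
        if (fprintf(to_child, "IO %d %s %d\n", i, s[i].name, s[i].mask) < 0)
            return -1;
    if (fprintf(to_child, "START\n") < 0) return -1;
    return fflush(to_child) == 0 ? 0 : -1;   /* push commands to the child */
}
```

The output has the same shape as the here-document in the example script for running the child program, so the sketch can also be used to generate test input.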

Overview of the Child Interface

More complete details for how the child program works are shown below in the Example Child Program section. In brief, the child calls three primary functions. The first is ncmd_c_init which creates the necessary data structure. (Note that although the parent and child ncmd data structures share many field names they are actually different types: NCMD_P_NEXUS and NCMD_C_NEXUS.) The child then repeatedly calls ncmd_all_io to obtain and send data. The child does no explicit IO on data streams but it may send an interface message directly to the parent. Between calls to this function it performs whatever processing is required. The child controls when ncmd_all_io will return by setting the bits in the loop mask of the child’s ncmd data structure. For instance

   NCMD_LOOP_MASK = (NCMD_DATA_IN_SOME | NCMD_CLOSED_IN_ALL | NCMD_DATA_OUT_LTMNW_SOME);

tells the ncmd_all_io function to not return until either there is some input data, all the inputs have closed, or at least one of the outputs contains less than minwrite bytes of data. The same bits are used to return complex status information via the function’s return value. So the child can use
   if(ret & NCMD_DATA_IN_RD_SOME)

to see that new data is present in one or more input buffers. The complete set of bits is described in the Mask Bits section below. Finally, once all of the data has been read, processed, and sent off, the child releases the data structure with ncmd_c_free and then exits.

Mask Bits

Here are all of the mask bits used by the child program in its interactions with ncmd_all_io. These are either preset in the loop mask or returned in the status value:

NCMD_RUNNING              reading/writing data and commands, also processing
NCMD_DATA_IN_RD_NONE      data read on no open input streams
NCMD_DATA_IN_RD_SOME      data read on >=1 open input streams
NCMD_DATA_IN_RD_ALL       data read on all open input streams
NCMD_DATA_IN_NONE         data present >= 1 bytes on no input streams
NCMD_DATA_IN_SOME         data present >= 1 bytes on >=1 input streams
NCMD_DATA_IN_ALL          data present >= 1 bytes on all input streams
NCMD_DATA_OUT_WR_NONE     data written on no open output streams
NCMD_DATA_OUT_WR_SOME     data written on >=1 open output streams
NCMD_DATA_OUT_WR_ALL      data written on all open output streams
NCMD_DATA_OUT_NONE        data present on no open output streams
NCMD_DATA_OUT_SOME        data present on >=1 open output streams
NCMD_DATA_OUT_ALL         data present on all open output streams
NCMD_DATA_OUT_LTMNW_NONE  data present <minwrite bytes on no open output streams
NCMD_DATA_OUT_LTMNW_SOME  data present <minwrite bytes on >=1 open output streams
NCMD_DATA_OUT_LTMNW_ALL   data present <minwrite bytes on all open output streams
NCMD_CLOSED_IN_SOME       closed >=1 input streams
NCMD_CLOSED_IN_ALL        closed all input streams
NCMD_CLOSED_COMMAND       closed command stream

Special bits: (these may be set but are never returned as status)

NCMD_FLUSH           Set minwrite to 1, so that all output buffers empty.
NCMD_CLEANUP         Emit DONE WRITE for any currently open output streams.
                     Emit DONE READ for any currently open input streams.
                     Emit DONE.
                     SKIP the regular IO section (IO must have completed).
                     Return an empty status mask.
NCMD_IGNORE_COMMAND  Ignore commands from the parent.

Example child program

/*
Program:   stub_child_process.c
Version:   0.1
Date:      03-DEC-2007
Author:    David Mathog, Biology Division, Caltech
email:     mathog@caltech.edu
Copyright: 2007 David Mathog and California Institute of Technology (Caltech)
License:   GNU General Public License 2.
Description:

This stub program implements the communications protocol needed to run as a -process in nettee. See the comments in that program for the full syntax. This example just echoes all input to all outputs, monitors for command signals, and returns status signals. If the input files are larger than the buffers their contents will be interleaved in more or less buffer sized chunks into the output. This is a binary method so EOL in the input files has no special meaning.

Compiles cleanly with:

% gcc -Wall -pedantic -std=c99 -D_POSIX_SOURCE -o stub_child_process stub_child_process.c rb.c nio.c

Dependencies: rb.c ring buffer routines nio.c nettee cmd routines

Changes: V 0.1 03-DEC-2007, David Mathog <mathog@caltech.edu>. First version

*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/poll.h>
#include <netdb.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <netinet/tcp.h>
#include <assert.h>
#include <ctype.h>
#include "rb.h"   /* ring buffers */
#include "nio.h"  /* nettee io and ncmd pieces */

/* prototypes */

/* globals NONE */

int main(int argc, char **argv){
  /* Your variable declarations go here */
  int i,j;
  int ret;
  int maxmove;
  NCMD_C_NEXUS *ncmd=NULL;

/* Your command line arguments are processed here */

  /* sets default values in the ncmd structure */
  ncmd=ncmd_c_init();
  ncmd->debug = 0;   /* If 1, then emit NCMD_DEBUG messages */

  /* Here we set up a mask which determines when ncmd_all_io() will return.
     Here the conditions are:
       NCMD_DATA_IN_SOME        = some input data is available; it may not
                                  have been read this cycle
       NCMD_CLOSED_IN_ALL       = the last input stream has closed
       NCMD_DATA_OUT_LTMNW_SOME = at least one output buffer has been written
                                  to the greatest extent possible (it may not
                                  be EMPTY if minwrite > 1!)

     Note that this example assumes that anything put in the output buffers
     will be written eventually. That might not happen if minwrite was set
     larger than the size of one of the input files. To handle cases like
     that one would test for conditions like
     (ret & NCMD_DATA_OUT_SOME) && (ret & NCMD_DATA_OUT_WR_NONE) */

NCMD_LOOP_MASK = (NCMD_DATA_IN_SOME | NCMD_CLOSED_IN_ALL | NCMD_DATA_OUT_LTMNW_SOME);

/* main loop here, this example just copies all input(s) to all output(s). It copies only after one full set of writes, to all output streams, has completed. */

  while(1){
    ret = ncmd_all_io(ncmd);
    if(ncmd->debug)ncmd_say_status("DEBUG child MAIN, after ncmd_all_io",ret);
    if(ret & NCMD_CLOSED_IN_ALL){
      NCMD_DEBUG( (void) fprintf(stderr,"DEBUG, in child MAIN all input closed\n"));
      if(ret & NCMD_DATA_IN_SOME){
        NCMD_DEBUG( (void) fprintf(stderr,"DEBUG, in child MAIN input data remains\n"));
        /* If all the inputs have closed we just need to finish moving
           whatever is already in the input buffers. Remove the condition
           for closed_in_all or the IO loop will not execute anymore, since
           that status bit will be stuck "on". Set flush so that the output
           buffers will empty no matter how little data they contain (ie,
           ignore minwrite). Note that NCMD_FLUSH is not a status bit, so it
           can't ever prevent the IO loop from executing. */
        NCMD_LOOP_MASK = (NCMD_DATA_IN_SOME | NCMD_DATA_OUT_LTMNW_SOME | NCMD_FLUSH);
      }
      else if(ret & NCMD_DATA_OUT_SOME){
        NCMD_DEBUG( (void) fprintf(stderr,"DEBUG, in child MAIN output data remains to be flushed\n"));
        NCMD_LOOP_MASK = (NCMD_DATA_OUT_LTMNW_SOME | NCMD_FLUSH);
      }
      else {
        NCMD_DEBUG( (void) fprintf(stderr,"DEBUG, in child MAIN input data has been consumed\n"));
        /* We are done. However...
           1. Use NCMD_CLEANUP to force the DONE WRITE values out and force
              status to 0.
           2. Command could still be open; use NCMD_IGNORE_COMMAND to force
              command to be ignored, otherwise ncmd_all_io will loop forever
              waiting for the next command. */
        NCMD_LOOP_MASK = NCMD_CLEANUP | NCMD_IGNORE_COMMAND;
      }
    }
    if((ret & NCMD_DATA_OUT_LTMNW_SOME) && (ret & NCMD_DATA_IN_SOME)){
      /* This is the place to put the code which actually does something
         with the IO streams. In this example, for each pass where there is
         input data, all of the output streams are scanned to find the
         maximum number of bytes which can be moved to all of them. Then
         that amount is transferred. */
      NCMD_DEBUG( (void) fprintf(stderr,"DEBUG, in child MAIN have input\n"));
      /* If most output data has been written and input data is available */
      /* There is input to work with and space is available in the output
         buffers, so copy at least some of it */
      for(j=0; j < NCMD_IO_TOTAL; j++){ /* over all possible inputs */
        /* This is an input and has data available. It isn't necessary to
           check for a valid fd since no data could have been read on an
           invalid fd. */
        if(NCMD_RB_LEN(j) && NCMD_IS_IN(j)){
          NCMD_DEBUG((void) fprintf(stderr,"DEBUG, in child MAIN SRC of data %d\n",j));
          maxmove=NCMD_RB_LEN(j);
          for(i=0; i< NCMD_IO_TOTAL; i++){
            if(NCMD_IO_FD(i) >= 0 && NCMD_IS_OUT(i)){
              if(NCMD_RB_FREE(i) < maxmove)maxmove=NCMD_RB_FREE(i);
            }
          }
          /* copy the data from this input buffer to each of the (open)
             output buffer(s). maxmove never exceeds the free space in any
             open output buffer, so it will always fit. */
          for(i=0; i< NCMD_IO_TOTAL; i++){
            if(NCMD_IO_FD(i) >= 0 && NCMD_IS_OUT(i)){
              NCMD_DEBUG((void) fprintf(stderr,"DEBUG, in child MAIN DST of data %d\n",i));
              NCMD_DEBUG((void) fprintf(stderr,"DEBUG, in child MAIN move bytes: %8d\n",maxmove));
              ret = rb_memcpy(NCMD_RB_CELL(i), NCMD_RB_CELL(j), maxmove, RB_SRC_COPY, NCMD_MINREAD);
              if(ret != maxmove){
                send_status_exit("FAILED OTHER \"programming error, memcpy failed \"", NCMD_TOTAL_READ, 0, EXIT_FAILURE);
              }
            }
          }
          rb_adjust(NCMD_RB_CELL(j),maxmove); /* throw out data in this input buffer */
          break; /* all output buffers were filled from this one input buffer */
        }
      }
    }
    else if(ret==0){ /* We are completely done. */
      break;
    }
    /* there may be other ret values returned, but all just cause it to
       cycle back for another round */
  }
  NCMD_DEBUG((void) fprintf(stderr,"DEBUG child main, immediately before program exit\n"));
  ncmd_c_free(ncmd);
  exit(EXIT_SUCCESS);
}

Example parent program

Before running this sample program create two text files /tmp/input1.txt and /tmp/input2.txt and put some data in them. Compile both the parent and child example programs and leave them in the same directory. The program will create three output files: /tmp/output3.txt, /tmp/output4.txt, and /tmp/output5.txt.

/*
Program:   stub_parent_process.c
Version:   0.1
Date:      03-DEC-2007
Author:    David Mathog, Biology Division, Caltech
email:     mathog@caltech.edu
Copyright: 2007 David Mathog and California Institute of Technology (Caltech)
License:   GNU General Public License 2.
Description:

This is a stub showing how the parent program nettee interacts with the child through the nettee cmd interface. It may be used as a scaffold upon which to build other programs which can call the same programs that nettee does.

Compiles with:

% gcc -Wall -pedantic -std=c99 -D_POSIX_SOURCE -o stub_parent_process stub_parent_process.c nio.c rb.c

*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "nio.h"

#define MAXSTREAMS 8
#define INPUTS     2
#define OUTPUTS    3
#define IO         (INPUTS + OUTPUTS)
#define MINREAD    NCMD_DEFAULT_MINREAD /* minimum contiguous free space for a read, must be >= 1 ! */
#define MINWRITE   1     /* rb parameter, write even if only 1 byte of data in buffer */
#define MAXREAD    1024  /* maximum bytes read per src before allowing other IO */
#define MAXWRITE   1024  /* maximum bytes written per dst before allowing other IO */
#define SIZE       1024  /* rb parameter, buffer size */
#define MISSES     3     /* may miss status 3 times before error */
#define INTERVAL   2     /* 2 second status */
#define CONMASK    (NCMD_ERR_CONWF | NCMD_ERR_COLWF | NCMD_ERR_CONRF | NCMD_ERR_COLRF)
                         /* no errors are fatal, this is a pretty unusual setting... */

int main(void){
  pid_t child;
  int   sv_down[2];   /* socket pair for commands, parent to child */
  int   sv_up[2];     /* socket pair for status, child to parent */
  char  string[1000];
  int   i;
  int   status;

NCMD_P_NEXUS *ncmd;

  ncmd = ncmd_p_init( IO, SIZE, MINREAD, MINWRITE, MAXREAD, MAXWRITE, MISSES, INTERVAL, CONMASK);
  if(!ncmd)insane("fatal error: creating of parent ncmd structure failed");

  /* Open all inputs: /tmp/input1.txt and /tmp/input2.txt, these should already have data in them */
  for(i=0; i<INPUTS; i++){
    sprintf(string,"/tmp/input%d.txt",i+1);
    ncmd->io_fd[i]=open(string,O_RDONLY);
    if(ncmd->io_fd[i] < 0)insane("fatal error: open of an input failed"); /* open() returns -1 on failure */
    ncmd->io_type[i] = NCMD_FD_IN + NCMD_FD_LOCAL;
  }
  /* Open all outputs: /tmp/output3.txt, /tmp/output4.txt and /tmp/output5.txt */
  for(; i<IO; i++){
    sprintf(string,"/tmp/output%d.txt",i+1);
    ncmd->io_fd[i]=open(string,O_WRONLY | O_CREAT | O_TRUNC,0666); /* O_TRUNC discards old contents */
    if(ncmd->io_fd[i] < 0)insane("fatal error: open of an output failed");
    ncmd->io_type[i] = NCMD_FD_OUT + NCMD_FD_LOCAL;
  }
  for(i=0;i<IO; i++){
    (void) fprintf(stderr,"DEBUG parent io_fd[%d] fd %d type %d\n",i,ncmd->io_fd[i],ncmd->io_type[i]);
    (void) fflush(stderr);
  }

  sprintf(string,"./stub_child_process");
  (void) fprintf(stderr,"pre fork, command is %s\n",string);
  (void) fflush(stderr);

  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv_down)) {insane("fatal error: socketpair down failed");}
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv_up))   {insane("fatal error: socketpair up failed");}
  ncmd_p_preserve_std(ncmd); /* dup stdin and/or stdout in either datain or dataout */
  child=fork();
  if (child > 0) {
    /* write commands to sv_down[1], read status from sv_up[0] */
    ncmd_p_setchild(ncmd, child, sv_down[1], sv_up[0]);
    (void) fprintf(stderr,"DEBUG i am PARENT \n");
    (void) fflush(stderr);
    status = ncmd_control_child(ncmd);
    ncmd_p_free(ncmd);   /* release the control structure after the child exits */
    exit(status ? EXIT_FAILURE : EXIT_SUCCESS);
  }
  else if (child < 0) {
    insane("fatal error: fork failed");
  }
  else {
    (void) fprintf(stderr,"DEBUG i am CHILD with command: %s\n",string);
    (void) fflush(stderr);
    dup2(sv_down[0], 0);  /* socket replaces stdin, read commands from here */
    dup2(sv_up[1], 1);    /* socket replaces stdout, write status to here */
    ncmd_set_nonblock(0);
    ncmd_set_nonblock(1);
    execlp("sh", "sh", "-c", string, (char *)NULL); /* execlp needs a (char *) sentinel */
    insane("fatal error: exec for cmd failed");
  }
  exit(EXIT_SUCCESS);
}

Example script for running the example child program

It is usually easiest to first debug the child program separately before moving on to use it with nettee -process. That way if something goes wrong it happens on only a single node instead of, as Murphy dictates, every node in the distribution tree. Because the interface is so simple, child programs may be started directly from shell scripts, as is shown below, or even directly from the command line. In this example two input files are created. The example child program is then fed a series of commands and runs. The three output files will consist of the contents of input2.txt concatenated to those of input1.txt. Here it is run within valgrind, so that memory access problems may be detected.

#!/bin/sh
rm -f /tmp/output1.txt
rm -f /tmp/output2.txt
rm -f /tmp/output3.txt
cat >/tmp/input1.txt <<EOD
from input 1 line 1
from input 1 line 2
from input 1 line 3
EOD
cat >/tmp/input2.txt <<EOD
from input 2 line 1
from input 2 line 2
from input 2 line 3
from input 2 line 4
EOD
valgrind ./stub_child_process\
 <<EOD \
  >test_stdout.log\
 2>test_stderr.log\
 3</tmp/input1.txt\
 4</tmp/input2.txt\
 5>/tmp/output1.txt\
 6>/tmp/output2.txt\
 7>/tmp/output3.txt
STATUS 2
SIZE 512
STREAMS 5
IO 0 3  5
IO 1 4  9
IO 2 5  6
IO 3 6  6
IO 4 7 10
START
EOD

RELATED PROGRAMS

Man pages: netcat(1) nettee(3)

nettee is derived from Felix Rauch’s dolly which is available here: http://www.cs.inf.ethz.ch/CoPs/patagonia/#dolly

The nettee home page is: http://saf.bio.caltech.edu/nettee.html

COPYRIGHTS

Copyright: 2008 David Mathog and Caltech.
Copyright: Felix Rauch and ETH Zurich

LICENSE

Freely distributed under the second GNU General Public License (GPL 2).

AUTHOR

David Mathog
Biology Division, Caltech


nettee_cmd 0.2.0 nettee (1) MAR 2008