Data Manager Class & Methods in S

A more descriptive account of the use of the class and methods implemented here is given elsewhere. Each S object defined here is accompanied by a brief description of its purpose and of the difficulties and subtleties of implementing and using it with the S user-level Threads API.

Class Structure

The class extends thread and uses several threadLock objects to manage and protect data shared by multiple threads. The data slot acts as the table in which the objects held by the Manager are stored. The threads slot is another threadLock object that is used for storing thread handles for objects that are currently being created and that will be inserted into the data threadLock table. The notify threadLock can be used to have other threads notified of the creation of a given object, based on its name in the data table. The notification amounts to sending an expression to the relevant thread(s) via sendTask().

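  setClass("DataManager",
           representation("thread",              # DataManager extends thread
                          data="threadLock",     # table of managed objects
                          threads="threadLock",  # pending constructor threads
                          notify="threadLock")   # notification requests by name
  )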
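For readers more familiar with other thread libraries, here is a rough Python analog of this layout using the standard `threading` module. It is a sketch under assumptions, not a port: each threadLock database is modeled as a dict paired with its own `threading.Lock`, the class and attribute names are our own, and the fact that a DataManager is itself a thread is left out.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class LockedTable:
    """A dict guarded by its own lock, standing in for one threadLock object."""
    lock: threading.Lock = field(default_factory=threading.Lock)
    items: dict = field(default_factory=dict)


@dataclass
class DataManagerSketch:
    data: LockedTable = field(default_factory=LockedTable)     # finished objects by name
    threads: LockedTable = field(default_factory=LockedTable)  # constructor threads by name
    notify: LockedTable = field(default_factory=LockedTable)   # notification requests by name


mgr = DataManagerSketch()
with mgr.data.lock:  # take the table's lock before touching its contents
    mgr.data.items["doc0.html"] = "<html>...</html>"
```

Giving each table its own lock lets a reader of the data table proceed while another thread updates the pending-threads table, at the cost of having to think about lock order whenever both are needed.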

Constructor

This is the function that is called to create an instance of the class DataManager.
what
The argument what should be a named list of expressions that are used to create the objects to be inserted into the manager's data table. The names are used to identify (for inserting, removing and retrieving) the objects in the manager.
start
The start argument indicates whether the manager thread is started immediately or created in suspended mode. If this is F, some thread must call start() on it later.
priority
The priority argument gives the priority of the constructor threads spawned by the manager to create the objects. It does not set the priority of the manager thread itself; that can be controlled using the attributes argument.
notify
The notify argument should be a named list of pairs. The names correspond to the names of what (if they are omitted, the names of what are used). Each pair should itself be a named list containing the fields expression and thread; if thread is omitted, the thread that calls the constructor is used. The idea is that each expression is evaluated by the thread given alongside it when the relevant object is added to the data table. This is a form of broadcasting to interested parties rather than having them poll for the existence of an object.
DataManager <- function(what,
                        notify = NULL,
                        start = T,
                        priority = Thread.minPriority,
                        attributes = threadAttributes())
{
    # create a DataManager object - this takes care of the necessary
    # fields such as the threadLocks used for the pending and data tables.
  dmgr = new("DataManager")

    # normalize the notification requests and store them by name
  if(!is.null(notify)) {
    if(is.null(names(notify)) && length(notify) == length(what))
      names(notify) = names(what)
    for(i in names(notify)) {
        # this is rather crude, but not the point of this example!
        # if no field is called "expression", treat the first as the expression
      if(is.na(match("expression", names(notify[[i]]))))
        names(notify[[i]])[1] = "expression"
        # by default, notify the thread that created the manager
      if(is.na(match("thread", names(notify[[i]]))))
        notify[[i]][["thread"]] = self()
      assign(i, notify[[i]], where=dmgr@notify)
    }
  }

    # create the data manager thread that handles the initialization
    # of the objects in the table. This differs from the Java
    # implementation. The manager itself is passed in the thread's
    # data so that loadObjects() can reach its tables.
  dmgr@thread = thread(Quote(loadObjects()),
                       start=start,
                       data=list(init.objects=what, notify=notify,
                                 priority=priority, manager=dmgr))

  return(dmgr)
}
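The notify normalization in the constructor (inherit names from what, treat the first field as the expression, default the thread to the caller) can be sketched in Python as follows. The function name `normalize_notify` and the use of a plain list for the unnamed case are assumptions made for illustration; `caller` stands in for S's `self()`.

```python
def normalize_notify(what, notify, caller):
    """Return a name -> {"expression", "thread"} table.

    what   -- dict of object name -> construction expression
    notify -- None, a dict of name -> spec, or a list of specs that
              matches `what` positionally (the unnamed case)
    caller -- thread to notify when a spec gives none (S's self())
    """
    if notify is None:
        return {}
    if isinstance(notify, list):
        if len(notify) != len(what):
            raise ValueError("an unnamed notify list must match `what` in length")
        notify = dict(zip(what, notify))  # take the names from `what`
    table = {}
    for name, spec in notify.items():
        spec = dict(spec)  # copy so the caller's spec is not modified
        if "expression" not in spec and spec:
            # as in the S code: the first field is taken to be the expression
            first = next(iter(spec))
            spec["expression"] = spec.pop(first)
        spec.setdefault("thread", caller)
        table[name] = spec
    return table


what = {"a": "readDoc('a')", "b": "readDoc('b')"}
table = normalize_notify(what,
                         [{"expr": "addItem('a')"},
                          {"expression": "addItem('b')", "thread": "T2"}],
                         caller="A")
```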

The Thread Constructor Expression/function

This function is called by the data manager thread when it is started. It gets its inputs from the manager thread's own frame (frame 0), where they were placed by the data argument in the constructor. It should only be called by the manager thread itself.

The function locks both the data and threads slots of the manager and then creates the constructor threads. The locks are released in the reverse of the order in which they are acquired, so the critical sections nest properly; taking them in the same order (data, then threads) everywhere both are needed is what prevents deadlock.

loadObjects <- function() {

  succeeded = 0
  if(exists("init.objects", frame=0)) {
      # init.objects, priority and manager were placed in this
      # thread's frame 0 by the DataManager() constructor.
    getLock(manager@data)
    getLock(manager@threads)
      for(i in names(init.objects)) {
          # spawn a constructor thread for each object; it inserts the
          # object itself with insertDataSet(). We don't call addDataSet()
          # here because it takes the threads lock, which we already hold.
        data.set.thread = thread(init.objects[[i]], priority=priority,
                                 data=list(manager=manager))
        if(!is.null(data.set.thread))
          assign(i, data.set.thread, where=manager@threads)
      }
      succeeded = length(objects(where=manager@threads))
    yieldLock(manager@threads)
    yieldLock(manager@data)
  }
  invisible(succeeded)
}
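The locking pattern of loadObjects() can be sketched with Python's `threading` module. This is an analog under assumptions (module-level dicts instead of the manager's tables, zero-argument callables instead of S expressions): both locks are taken in a fixed order, the constructor threads are started while they are held, and each constructor inserts its result once the data lock is free.

```python
import threading

data_lock = threading.Lock()     # guards `data`
threads_lock = threading.Lock()  # guards `pending`
data = {}      # finished objects by name
pending = {}   # constructor threads by name


def construct(name, factory):
    """Body of one constructor thread: build the object, then insert it."""
    obj = factory()
    with data_lock:                # may block until load_objects() releases it
        data[name] = obj
    with threads_lock:
        pending.pop(name, None)    # no longer pending


def load_objects(init_objects):
    """Spawn one constructor thread per entry while holding both locks."""
    started = []
    # Always acquire data_lock before threads_lock; the nested `with`
    # blocks release them in the reverse order.
    with data_lock:
        with threads_lock:
            for name, factory in init_objects.items():
                t = threading.Thread(target=construct, args=(name, factory))
                pending[name] = t
                started.append(t)
                t.start()
    return started


threads = load_objects({"x": lambda: 1, "y": lambda: 2})
for t in threads:
    t.join()
```

Note that `construct` never holds both locks at once, so it cannot take them in the opposite order and deadlock against `load_objects`.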

Update Threads

This method removes the thread associated with the name what from the manager's threads table, if it is there when the function is evaluated. If stop is true and that thread is still alive, it is cancelled. The function returns the number of threads still pending.
 updateThreads.dataManager <- function(what, manager, stop=T) {
   tmp.thread = NULL
   getLock(manager@threads)
       # I hold the lock, so get(), exists() and remove() must still be able to work
     if(exists(what, where=manager@threads)) {
       if(stop)
         tmp.thread = get(what, where=manager@threads)
       remove(what, where=manager@threads)
     }
   yieldLock(manager@threads)

   if(stop && !is.null(tmp.thread) && isAlive(tmp.thread)) {
       # the thread could finish in between evaluating the condition and
       # calling cancel. But the Evaluator Manager should take care of
       # this when evaluating the internals of the call to cancel.
     cancel(tmp.thread)
   }

     # count the threads still pending. Yield the lock before returning;
     # returning from inside the lock would leave it held forever.
   getLock(manager@threads)
     n = length(objects(where=manager@threads))
   yieldLock(manager@threads)
   return(n)
 }
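The same remove-under-the-lock, act-outside-the-lock pattern can be sketched in Python. Python's `threading` module has no way to cancel a running thread, so this sketch assumes each constructor thread watches a `threading.Event` as a cooperative cancellation flag; the names `update_threads` and `worker` are hypothetical.

```python
import threading

threads_lock = threading.Lock()
pending = {}  # name -> (thread, cancel_event)


def update_threads(name, stop=True):
    """Remove `name` from the pending table; optionally ask its thread to stop.

    Returns the number of threads still pending.
    """
    with threads_lock:
        entry = pending.pop(name, None)
    # Signal outside the lock so we never hold it while touching another thread.
    if stop and entry is not None:
        thread, cancel = entry
        if thread.is_alive():  # it may still finish on its own; setting the flag is harmless
            cancel.set()
    with threads_lock:
        return len(pending)


def worker(cancel):
    # A constructor that simply waits until it is asked to stop.
    cancel.wait()


ev = threading.Event()
t = threading.Thread(target=worker, args=(ev,))
with threads_lock:
    pending["obj"] = (t, ev)
t.start()
remaining = update_threads("obj")
t.join(timeout=5)
```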

Dead Thread Collector

This function takes no arguments and works on the threads in the Data Manager's pending threads list, removing those that are dead or complete.
  updateThreads.collector <- function()
  {
      # runs in the manager thread, where manager is in frame 0
    getLock(manager@threads)
    for(i in objects(where=manager@threads)) {
      val = get(i, where=manager@threads)
      if(is.null(val) || !isAlive(val))
        remove(i, where=manager@threads)
    }
    yieldLock(manager@threads)
  }
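A Python sketch of the collector, assuming the pending table is a dict of `threading.Thread` handles guarded by one lock. Iterating over a snapshot of the keys is what makes it safe to delete entries inside the loop; returning the remaining count is an addition for convenience.

```python
import threading

threads_lock = threading.Lock()
pending = {}  # name -> Thread (or None)


def collect_dead_threads():
    """Drop pending entries whose thread is missing or has finished."""
    with threads_lock:
        for name in list(pending):  # snapshot: we delete while looping
            t = pending[name]
            if t is None or not t.is_alive():
                del pending[name]
        return len(pending)


done = threading.Thread(target=lambda: None)
done.start()
done.join()
gate = threading.Event()
live = threading.Thread(target=gate.wait)
live.start()
pending.update(done=done, live=live, missing=None)
n = collect_dead_threads()
gate.set()
live.join()
```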

Add a Data object to the Table

This method inserts the object, as is, into the manager's data table using name as the identifier. No locks are used other than the lock on the data database that is implicit in the call to assign().
 addDataSet <- function(obj, name, manager)
 {
   assign(name, obj, where=manager@data)
 }

Add an object to the table by creating it.

This method is called to create a new object asynchronously: a background thread constructs the object and then inserts it into the manager's data table using insertDataSet(). The argument obj is an expression.
  addDataSet <- function(obj, name, manager=NULL, priority=Thread.minPriority)
  {
      # obj is evaluated in a new thread and should finish by calling
      # insertDataSet(); the manager is placed in that thread's frame 0
      # so that it can do so.
    getLock(manager@threads)
      tmp.thread = thread(obj, priority=priority, data=list(manager=manager))
      if(!is.null(tmp.thread))
        assign(name, tmp.thread, where=manager@threads)
    yieldLock(manager@threads)

    invisible(tmp.thread)
  }

Query Manager

This method returns whether an object with the given name is currently in the manager's data table. We use the implicit lock on the manager@data object provided by exists()
 manages <- function(name, manager) {
   return(exists(name, where=manager@data))
  }

isPending

This function determines the status of an object in the manager by its name. It returns whether an entry of that name exists in the manager's threads database, that is, whether a thread is still constructing the object.

Note that here we use only the implicit locking of the database in the call to exists(). Locking is used explicitly in updateThreads().

 isPending <- function(name, manager)
  {
    updateThreads()  # do some garbage collection first
    return(exists(name, where=manager@threads))
  }

getDataSet

This is the simple method for retrieving an object from the Manager. It looks in the data table and returns the object associated with the given name if it exists, or NULL otherwise. The table is examined while the calling thread holds the lock on it. Because we check for the existence of the object rather than just naïvely getting it, we can't rely on the database's own locking: exists() could return T and then, between the call to exists() and get(), another thread could remove() the object in question. Locking the database that contains the object prevents this.
getDataSet <- function(name, manager)
   {
     ans = NULL
     getLock(manager@data)
      if(exists(name,where=manager@data))
        ans = get(name, where=manager@data)
     yieldLock(manager@data)

    return(ans)
   }
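The check-then-act problem can be sketched in Python, assuming the data table is a dict guarded by one `threading.Lock` (names are hypothetical). The membership test and the read happen under the same lock, so no other thread can remove the entry between them.

```python
import threading

data_lock = threading.Lock()
data = {}


def get_data_set(name):
    """Return the object stored under `name`, or None."""
    with data_lock:
        if name in data:       # test ...
            return data[name]  # ... and read without another thread intervening
        return None


data["a"] = 1
```

A single `data.get(name)` would also be safe on its own; the lock matters because the lookup here is written as two separate steps, just as exists() followed by get() is in S.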

getDataSet or Create it.

This method first tries the simple getDataSet() method above. If the object exists when the call is evaluated, it is returned immediately. Otherwise, we wait for it to be created. If a thread is currently constructing the object, we wait for it to finish and insert the object into the table, and then we copy the object from the table. If priority is supplied, we change the priority of this constructor thread so that the calling thread can control the importance of the object. If no such constructor thread is currently active (isPending()), we spawn one and have it construct the object using the how and priority arguments.

Note that we use a lock-condition where we could also use a call to join(). The latter approach may be better since we don't have to evaluate the condition each time a thread modifies the pending list. We use the lock-condition more as an example.

If thread A knows that a manager thread has already spawned the constructor thread for an object named "obj", it could monitor the manager's data threadLock object directly and block its own execution until that object was in the table. For example,

  addDataSet(Quote(createObject("obj")), "obj", manager=dmgr)
   ..
   ..
  getLock(dmgr@data, condition = Quote(exists("obj", where=dmgr@data)))
    # now it is there and we hold the lock
    obj = get("obj", where=dmgr@data)
  yieldLock(dmgr@data)
Note that this thread exploits knowledge of the structure of a DataManager object rather than using methods to get access to the slots. In which thread are the conditions in a getLock call evaluated? What is the name space? I believe that the expression should be evaluated in the current state of the thread which calls getLock. The evaluator manager takes care of evaluating the expression in that context.
getDataSet <- function(name, manager, how=NULL, priority=Thread.minPriority)
{
    # S does not overload functions by signature: for the calls to
    # getDataSet(name, manager) below to reach the simple method above
    # (rather than recurse), one of the two needs a different name.
  obj = getDataSet(name, manager)

  if(!is.null(obj))
    return(obj)

    # not in the table yet, so look in the list of pending threads
  pending = F
  getLock(manager@threads)
    if(exists(name, where=manager@threads)) {
      pending = T
      if(!missing(priority)) {
        tmp.thread = get(name, where=manager@threads)
        setPriority(tmp.thread, priority)
      }
    }
  yieldLock(manager@threads)

  if(pending) {
      # We could join on this thread, or wait for the pending list to
      # change and this thread to be removed from it. Joining is probably
      # better since we don't have to evaluate the condition each time a
      # thread modifies the pending list. However, if we join, we must do
      # so outside of the lock or no other thread can modify the list.
    getLock(manager@threads,
            condition=Quote(!exists(name, where=manager@threads)))
    yieldLock(manager@threads)

    return(getDataSet(name, manager))
  }

    # not pending and not in the table, so make it (if we know how).
  if(is.null(how))
    return(NULL)

    # addDataSet() takes the threads lock and records the thread itself
  tmp.thread = addDataSet(how, name, manager, priority=priority)

  join(tmp.thread)  # wait for the thread to finish.
  sendTask(Quote(updateThreads()), thread=manager@thread)
  return(getDataSet(name, manager))
}
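A getLock() call with a condition behaves much like a condition-variable wait: block until a predicate holds, then continue while holding the lock. Python's `threading.Condition.wait_for` expresses the same idea. This sketch simplifies by guarding both tables with a single condition (the DataManager uses separate locks), and the function names are hypothetical.

```python
import threading

cond = threading.Condition()  # one lock shared by both tables in this sketch
pending = {}
data = {}


def insert_data_set(name, obj):
    with cond:
        data[name] = obj
        pending.pop(name, None)
        cond.notify_all()  # wake all waiters so they re-check their predicates


def wait_for_data_set(name, timeout=None):
    with cond:
        # The predicate is re-evaluated on each wake-up, like the
        # condition expression given to getLock().
        ok = cond.wait_for(lambda: name not in pending, timeout=timeout)
        return data.get(name) if ok else None


pending["obj"] = "constructor-thread-handle"
builder = threading.Thread(target=insert_data_set, args=("obj", 42))
builder.start()
result = wait_for_data_set("obj", timeout=5)
builder.join()
```

Because `wait_for` checks the predicate before sleeping, the waiter is correct even if the builder finishes first.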

Remove a Data Set from the Table

The object identified by name is removed from the manager object. This involves stopping and removing any thread that is currently creating an object of that name and removing any object of that name from the data table. The lock around the data table is needed here because we test for the object with exists() before calling remove(); each database call is atomic on its own, but the pair is not.
removeDataSet <- function(name, manager)
{
  ans = T
    # handle the case in which a constructor thread is already running
  if(isPending(name, manager)) {
      # TODO: updateThreads() returns the number of pending threads; it
      # should return a logical indicating success or failure for ans.
    updateThreads(name, manager, stop=T)
  }

    # now check the table
  getLock(manager@data)
    if(exists(name, where=manager@data))
      remove(name, where=manager@data)
  yieldLock(manager@data)

  return(ans)
}

length()

This returns the number of objects currently being managed by the relevant manager.

A lock is obtained to ensure other threads don't modify the table while we count. Strictly, the explicit lock is not needed here: the database functions take care of locking themselves, so a single call to objects() is already atomic.

 setMethod("length", "DataManager", function(x)
  {
      # a method rather than a plain function named length, which would
      # call itself on the line below
    getLock(x@data)
      n = length(objects(where=x@data))
    yieldLock(x@data)
    return(n)
  })

names()

This returns the names of the objects in the data slot of the manager. These are the objects currently available and being managed by the manager. A lock is obtained to ensure that no other thread adds or removes an object during the request for the names.

Strictly, the explicit lock is not needed here either: the database functions take care of locking themselves, so a single call to objects() is already atomic.

  setMethod("names", "DataManager", function(x)
   {
       # lock the data table (not the manager thread) and return the names
     getLock(x@data)
       nms = objects(where=x@data)
     yieldLock(x@data)
     return(nms)
   })

Status Message

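Both the data table and the pending-threads table are locked at the same time, so that no thread can insert an object into the table (and then remove itself from the pending list) while we gather the counts. The locks are taken in the same order as in loadObjects().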
 status <- function(manager) {
   getLock(manager@data)
   getLock(manager@threads)

     ans = list(num.objects=length(objects(where=manager@data)),
                num.threads=length(objects(where=manager@threads)))

   yieldLock(manager@threads)
   yieldLock(manager@data)

  return(ans)
 }
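In Python, the consistent snapshot can be sketched with a single `with` statement that names both locks: it acquires them left to right and releases them in reverse, mirroring the getLock/yieldLock nesting above. The module-level tables and their contents are assumptions for illustration.

```python
import threading

data_lock = threading.Lock()
threads_lock = threading.Lock()
data = {"a": 1}
pending = {"b": None}


def status():
    # Same acquisition order (data, then threads) as everywhere else.
    with data_lock, threads_lock:
        return {"num_objects": len(data), "num_threads": len(pending)}
```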

insertDataSet

This function is designed to be called by a thread that is creating an object on behalf of the Manager. That thread should call this function as the last command of the object construction task. (The call will typically be added to the expression that creates the object in addDataSet().) The function assigns the object to the manager's data table and removes the calling thread from the list of pending threads. In both operations, it grabs the lock on the relevant objects in the manager object.

Note that the notification performed in this function will come from the thread performing the construction rather than the Manager itself. (Next we need methods for faking messages!;-))

insertDataSet <- function(obj, name, manager)
{
    # The lock is not needed for atomicity (assign() locks the database
    # itself), but any thread waiting on a condition in this threadLock
    # object (manager@data) will have the condition re-evaluated when
    # the lock is yielded.
  getLock(manager@data)
    assign(name, obj, where=manager@data)
  yieldLock(manager@data)

    # ensure that this thread is removed from the list of pending threads
  updateThreads(name, manager, stop=F)

    # now make sure the asynchronous notifications are handled
  getLock(manager@notify)
    if(exists(name, where=manager@notify)) {
      destination = get(name, where=manager@notify)
        # how do we put the type of action in - e.g. ADDED_OBJECT, REMOVE_OBJECT?
      sendTask(destination$expression, destination$thread, wait=F)
    }
  yieldLock(manager@notify)

  invisible(T)  # the value is not really needed because this function is
                # called as the last action of a background thread
}
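A Python sketch of insertDataSet(), under the assumption that each interested thread owns a `queue.Queue` inbox: putting a message on the queue plays the role of sendTask(..., wait=F), since the constructor returns without waiting for the message to be handled. Unlike the S version, the message is sent after the notify lock is released, which keeps the critical section short. All names are hypothetical.

```python
import queue
import threading

data_lock = threading.Lock()
threads_lock = threading.Lock()
notify_lock = threading.Lock()
data, pending, notify = {}, {}, {}


def insert_data_set(name, obj):
    with data_lock:
        data[name] = obj
    with threads_lock:
        pending.pop(name, None)  # this constructor is no longer pending
    with notify_lock:
        spec = notify.get(name)
    if spec is not None:
        spec["inbox"].put(spec["message"])  # asynchronous: do not wait
    return True


inbox = queue.Queue()
notify["doc0.html"] = {"message": ("addItem", "doc0.html"), "inbox": inbox}
pending["doc0.html"] = "constructor-thread-handle"
t = threading.Thread(target=insert_data_set, args=("doc0.html", "<html/>"))
t.start()
t.join()
msg = inbox.get(timeout=5)
```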

kill

The kill() method is not needed in the S model, as we can easily cancel the manager's thread. We could also use that approach in the Java implementation. However, we can do it more gracefully by sending a request to the manager's thread to have it terminate itself with the relevant exit handlers and user-supplied expressions invoked. (Java, of course, has the finally construct.)

run method

S's threads stay alive waiting for tasks to be sent to them via sendTask(). Thus we don't have to spin idly as Java does. And we can send a "signal" to the Manager using sendTask(), as is done in the Java implementation. However, this signal carries complete semantic information, since it can be an arbitrary expression. (This raises security issues precisely because the sender has that much access to the receiving thread; think of CGI scripts and Perl's eval therein.)
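The "idle until a task arrives" model can be sketched in Python with a `queue.Queue` per thread: `get()` blocks without spinning, and a sentinel asks the thread to terminate itself gracefully. This is an analogy, not the S API; the tasks here are callables rather than arbitrary expressions, which also narrows what a sender can do.

```python
import queue
import threading


def serve_tasks(inbox, results):
    """Wait for tasks, run each in arrival order; None means 'exit'."""
    while True:
        task = inbox.get()  # blocks without busy-waiting
        if task is None:
            break
        results.append(task())


inbox = queue.Queue()
results = []
worker = threading.Thread(target=serve_tasks, args=(inbox, results))
worker.start()
inbox.put(lambda: 2 + 3)  # analogous to sendTask(expr, thread, wait=F)
inbox.put(lambda: "hello".upper())
inbox.put(None)           # ask the worker to terminate itself
worker.join()
```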

Example: Document and Data Set Browser

The example is relatively simple but illustrates the way in which the Data Manager class can be used.

A window is displayed which contains two buttons, an empty list and a text widget. (In the adjacent figure, we also have a status completion bar, but we ignore that here.) One button quits the browser; the other queries the status of the manager and displays the number of objects still being constructed. The list displays the names of the objects of interest: as these objects are constructed and become available to the browser, their names are added to the list. Double-clicking on an entry in the list displays the object's contents in the text area, so this behaves like an interactive print in S. The sample view shows the contents of the file doc2.html in the text area.
We gloss over the details of how the window and widgets are created. (See GUI development for these details.) We assume this is done via a call to the function make.gui(). So our main thread, A say, might look like the following:

Thread A
 1 base = "file:///home/duncan/RESEARCH/GUI/Examples/Java/Data"
 2 
 3 docs = paste(0:4,".html",sep="")
 4 data = as.character(0:4)
 5 
 6 docs = paste(paste(base,"doc",sep="/"),docs,sep="")    # .../Data/doc0.html, ...
 7 data = paste(paste(base,"data",sep="/"),data,sep="")   # .../Data/data0, ...
 8 
 9 how = vector("list",length(docs)+length(data))
10 ctr = 0
11 for(i in docs) {
12  ctr = ctr+1       # S lists are indexed from 1
13  how[[ctr]] = substitute(readDoc(d),list(d=i))
14 }
15 
16 for(i in data) {
17  ctr = ctr+1
18  how[[ctr]] = substitute(readData(d),list(d=i))
19 }
20 names(how) = c(paste("doc",0:4,".html",sep=""), paste("data",0:4,sep=""))
21 notify = vector("list",length(how))
22 names(notify) = names(how)
23 
24 for(i in names(notify))
25   notify[[i]] = list(expression=
26          substitute(addItem(xlist,what),list(what=i)), thread = self())
27 
28  # now create the data manager.
29 dmgr = DataManager(how,notify)
30 
31 
32 guiTree = make.gui()
33 xlist = getList(guiTree)
34 
35 
36 show(guiTree)
37 
38  # the thread now sleeps, waiting for tasks from
39  # the event loop processing user events
Lines 1 through 26 create the relevant objects for the Data Manager. Other convenience routines could be written that hide these details by using knowledge of the context and of the objects being created. The important things to note are

Some users will find the notify mechanism complicated or out of keeping with the S style of evaluation. An alternative mechanism with the same semantics is available. It uses a second thread that waits for new objects to appear in the data slot of the manager. This thread is spawned by the main thread in our example above. While the order in which the actions are done is flexible, it is easiest to create the GUI and then spawn the waiting thread.

32 guiTree = make.gui()
33 xlist = getList(guiTree)
34 
35 thread(substitute(updateList(manager),list(manager=dmgr)),data=list(xlist=xlist))
36 show(guiTree)
The function updateList() looks something like the following:
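updateList <- function(manager) {

     # xlist was placed in this thread's frame 0 when it was created.
     # First, insert all the elements already in the data table
     # that were created before this thread was started. We call
     # objects() directly because names(manager) would try to take
     # the lock we already hold.
  getLock(manager@data)
   items = objects(where=manager@data)
   for(i in items)
     addItem(xlist, i)
  yieldLock(manager@data)

  while(T) {
       # always be notified if anything happens
    getLock(manager@data, condition=T)
      current = objects(where=manager@data)
    yieldLock(manager@data)

       # now pick out the new items, comparing names rather than counts
    new.items = current[match(current, items, nomatch=0) == 0]
    for(i in new.items)
      addItem(xlist, i)
    items = current
  }

  invisible(NULL)
}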

One might think that we could just track changes in the length of the data table. However, one thread could add a data set to the table and a second thread remove a different one. We will be notified of both, but we may not obtain the lock when we are woken up: we are competing for the lock and have merely registered for the competition rather than being guaranteed to win it. So it is possible that the length is the same when we get to evaluate the expression while the contents differ. Such effects, which never arise in single-threaded sequential code, are common in threaded applications. In general, one has to consider all possible thread interactions to validate an algorithm. This is what makes developing threaded code complicated and time consuming.
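The failure of length tracking is easy to demonstrate with a small Python sketch (the helper name is hypothetical): one name added and another removed leaves the size unchanged, while a set difference still finds both changes.

```python
def diff_names(seen, current):
    """Return (added, removed) by comparing name sets, not their sizes."""
    seen, current = set(seen), set(current)
    return sorted(current - seen), sorted(seen - current)


before = ["doc0.html", "doc1.html"]
after = ["doc0.html", "data3"]  # one added, one removed: same length
added, removed = diff_names(before, after)
```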


Next, we consider a different style of synchronization that uses regular S objects rather than S databases (e.g. threadLock objects) as the shared variables. We modify the existing example so that the threads slot in the DataManager class is a threadGroup object. (The thread slot should use ThreadGroup.)
Duncan Temple Lang<duncan@stat.Berkeley.EDU>
Last modified: Fri Feb 28 14:43:52 1997