signature CHANNEL
structure Channel : CHANNEL
A fully concurrent imperative channel abstraction.
A consumer takes elements from the beginning of the channel. A producer inserts elements into the channel, either at the beginning (LIFO) or at the end (FIFO). Channels are thread-safe: many consumers and producers can operate concurrently on the same channel.
Note: Channels contain implicit locks. If you stop a thread while it is manipulating a channel, it may cause all further access to the same channel to block, until the thread is restarted.
import signature CHANNEL from "x-alice:/lib/data/CHANNEL-sig"
import structure Channel from "x-alice:/lib/data/Channel"
signature CHANNEL =
sig
    type 'a channel
    type 'a t = 'a channel

    exception Empty

    val channel :    unit -> 'a channel

    val put :        'a channel * 'a -> unit
    val push :       'a channel * 'a -> unit
    val get :        'a channel -> 'a
    val pop :        'a channel -> 'a
    val peek :       'a channel -> 'a

    val clone :      'a channel -> 'a channel
    val isEmpty :    'a channel -> bool
    val close :      'a channel -> unit
    val isClosed :   'a channel -> bool
    val waitClosed : 'a channel -> unit
    val purge :      'a channel -> unit

    val app :        ('a -> unit) -> 'a channel -> unit
    val appNB :      ('a -> unit) -> 'a channel -> unit
    val toList :     'a channel -> 'a list
    val toListNB :   'a channel -> 'a list
end
type 'a channel
type 'a t = 'a channel
The type of polymorphic channels.
exception Empty
Indicates an invalid access to an empty channel. Equal to List.Empty.
channel ()
Creates a new channel that is initially empty.
put (ch, x)
Producer: Adds the value x at the end of the channel ch.
push (ch, x)
Producer: Inserts the value x at the beginning of the channel ch. That is, the next read (peek or get) will return x.
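The difference between the two producers can be sketched as follows. This is a minimal illustration based only on the descriptions above; the values in the comments are what those descriptions imply when a single thread uses the channel, not captured output.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

(* The annotation fixes the element type of the new channel. *)
val ch : int Channel.channel = Channel.channel ()

val _ = Channel.put (ch, 1)     (* channel now holds: 1     *)
val _ = Channel.put (ch, 2)     (* channel now holds: 1 2   *)
val _ = Channel.push (ch, 0)    (* channel now holds: 0 1 2 *)

(* Consumers always read from the beginning. *)
val a = Channel.get ch          (* 0, the pushed element  *)
val b = Channel.get ch          (* 1, the first one put   *)
val c = Channel.get ch          (* 2                      *)
```

Using only put gives queue (FIFO) behaviour, while using only push gives stack (LIFO) behaviour.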
get ch
pop ch
Consumer: Removes and returns the first element of channel ch if one is available. Raises Empty if the channel is closed and empty. Otherwise, blocks until an element is inserted (by put or push).
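A short sketch of the two non-trivial cases, blocking on an open channel and raising Empty on a closed one. It relies on Alice ML's spawn, whose result is a future that is waited for implicitly when its value is needed; the placeholder string "<closed>" is purely illustrative.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : string Channel.channel = Channel.channel ()

(* The spawned thread blocks inside get, because ch is empty and open. *)
val answer = spawn (Channel.get ch)

(* A producer unblocks it. *)
val _ = Channel.put (ch, "hello")

(* Using the future waits for the consumer thread's result. *)
val _ = print (answer ^ "\n")

(* Once the channel is closed and empty, get raises Empty instead of blocking. *)
val _ = Channel.close ch
val fallback = (Channel.get ch) handle Channel.Empty => "<closed>"
```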
peek ch
Returns the first element of channel ch if one is available, but does not remove it from the channel (peek is not a consumer). Like get, it may block or raise Empty.
clone ch
Returns a new channel initialized with the elements of channel ch.
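The following sketch contrasts peek and clone with get. It assumes that the clone keeps the elements in the same order and is independent of the original afterwards; the text above only says that the clone is a new channel, so treat the last comment as an expectation rather than a guarantee.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()
val _ = Channel.put (ch, 1)
val _ = Channel.put (ch, 2)

(* Snapshot of the current contents in a separate channel. *)
val copy = Channel.clone ch

val x = Channel.peek ch     (* 1, and 1 stays in ch *)
val y = Channel.get ch      (* 1, now removed from ch *)

(* Assumption: reading from ch did not affect the clone. *)
val z = Channel.get copy    (* expected: 1 *)
```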
isEmpty ch
Returns true iff the channel ch is empty, that is, if reading an element would block (or raise Empty, if the channel is closed).
close ch
Closes channel ch: all subsequent writes (push and put) will have no effect.
isClosed ch
Returns true iff the channel ch has been closed with close.
waitClosed ch
Returns () immediately if the channel ch is closed; otherwise blocks until it is closed.
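How closing interacts with producers and consumers can be sketched as below. The comments follow from the descriptions above: elements already in the channel remain readable after close, and later writes are dropped.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()

(* This thread blocks in waitClosed until some thread closes ch. *)
val watcher = spawn (Channel.waitClosed ch; print "channel closed\n")

val _ = Channel.put (ch, 1)
val _ = Channel.close ch          (* unblocks the watcher *)
val _ = Channel.put (ch, 2)       (* no effect: ch is already closed *)

val closed = Channel.isClosed ch  (* true *)
val first  = Channel.get ch       (* 1, inserted before the close *)
val empty  = Channel.isEmpty ch   (* true, since 2 was never inserted *)
```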
purge ch
Consumer: Removes all elements of the channel ch.
app f ch
Consumer: Takes elements out of the channel ch and applies function f to each of them. Does not return until the channel is closed.
appNB f ch
Consumer: Takes elements out of the channel ch and applies function f to each of them. Returns as soon as the channel is empty.
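A hedged sketch of the difference between the two: appNB drains what is currently in the channel and returns, while app keeps waiting for more and is therefore usually run in its own thread. The helper printLine is a name introduced here for illustration, and the sketch assumes that app returns once the channel is both closed and drained.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

(* Illustrative helper, not part of the Channel structure. *)
fun printLine x = print (Int.toString x ^ "\n")

val ch : int Channel.channel = Channel.channel ()
val _ = List.app (fn x => Channel.put (ch, x)) [1, 2, 3]

(* Non-blocking: handles 1, 2, 3 and returns because ch is then empty. *)
val _ = Channel.appNB printLine ch

(* Blocking: run in a separate thread, which waits for further elements. *)
val worker = spawn (Channel.app printLine ch)

val _ = Channel.put (ch, 4)   (* handled by the worker thread *)
val _ = Channel.close ch      (* allows app, and so the worker, to finish *)
```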
toList ch
Consumer: Returns a lazy list with the elements of the channel ch. Elements of the list are removed from the channel when they are evaluated. Reading the list blocks if the channel is empty.
toListNB ch
Consumer: Returns a lazy list with the elements of the channel ch. Elements of the list are removed from the channel when they are evaluated. Reading the list returns an empty list if the channel is empty.
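The lazy-list views might be used as in the sketch below. It assumes no other thread touches the channel; the comments state what the descriptions above imply under that assumption.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()
val _ = List.app (fn x => Channel.put (ch, x)) [1, 2, 3]

(* Nothing is removed yet: the list is lazy. *)
val xs = Channel.toListNB ch

(* Traversing the list consumes the elements. The list ends when the
   channel is found empty, so this should yield 3. *)
val n = List.length xs

(* With toList, only the evaluated part is consumed. *)
val _ = Channel.put (ch, 4)
val ys = Channel.toList ch
val first = List.hd ys   (* 4; evaluating the tail as well would block
                            for as long as ch stays empty *)
```

The non-blocking variant suits a one-off snapshot of pending elements, while toList suits a consumer that treats the channel as a stream.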
The following functor creates a module which evaluates tasks sequentially.
functor NewSequence () =
struct
    (* A new channel for jobs that must be run sequentially. *)
    val jobs = Channel.channel ()

    (* f () will be evaluated sequentially *)
    fun add f = Channel.put (jobs, f)
    fun app f arg = add (fn () => f arg)

    (* Run jobs. *)
    val _ = spawn (Channel.app (fn job => job()) jobs)

    (* Nicely stop all jobs. *)
    fun stop () = (Channel.close jobs ; Channel.purge jobs)
end
Usage:
structure Log = NewSequence ()
val printInt = print o Int.toString
val _ = Log.app print "First Message"
val _ = Log.app printInt 32