fszmq


Task Sink (design 2)

Binds PULL socket to tcp://*:5558 (all interfaces, port 5558)

Collects results from workers via that socket

Update:

Adds PUB-SUB flow to send the kill signal to workers (PUB socket bound to tcp://*:5559)

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
#r "fszmq.dll"
open fszmq
open System.Diagnostics

/// Task sink (design 2).
/// Binds a PULL socket on tcp://*:5558 and a PUB socket on tcp://*:5559,
/// waits for the start-of-batch message, receives 100 confirmations while
/// timing them, prints the elapsed time, then sends "KILL" on the PUB socket.
/// Returns 0 as the return code.
let main () =
  use context = new Context ()

  // PULL socket: incoming messages arrive here
  use receiver = Context.pull context
  Socket.bind receiver "tcp://*:5558"

  // PUB socket: used to control the workers
  use controller = Context.pub context
  Socket.bind controller "tcp://*:5559"

  // Receives one message from the PULL socket and throws its payload away
  let discardNext () = Socket.recv receiver |> ignore

  // The first message only marks the start of the batch
  discardNext ()

  // Timing begins once the batch has started
  let stopwatch = Stopwatch.StartNew ()

  // Consume 100 confirmations; print ":" when the index is a multiple of 10,
  // "." otherwise
  for index = 0 to 99 do
    discardNext ()
    let marker = if index % 10 = 0 then ":" else "."
    printf "%s" marker

  // Stop timing and report how long the batch took
  stopwatch.Stop ()
  let elapsedMs = stopwatch.ElapsedMilliseconds
  printfn "Total elapsed time: %d msec" elapsedMs

  // Tell the workers to shut down
  "KILL"B |> Socket.send controller

  0 // return code
module docs
module PATH

from docs
val hijack : unit -> unit

Full name: docs.PATH.hijack
namespace fszmq
namespace System
namespace System.Diagnostics
val main : unit -> int

Full name: Tasksink2.main
val context : System.IDisposable
val receiver : System.IDisposable
val controller : System.IDisposable
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val watch : Stopwatch
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
Stopwatch.StartNew() : Stopwatch
val task_nbr : int32
val printf : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printf
Stopwatch.Stop() : unit
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Stopwatch.ElapsedMilliseconds: int64
val release : unit -> unit

Full name: docs.PATH.release
Fork me on GitHub