fszmq


Node Coordination

Synchronized publisher

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
#r "fszmq.dll"
open fszmq

/// Number of subscriber sync requests the publisher waits for
/// before it starts broadcasting.
[<Literal>]
let SUBSCRIBERS_EXPECTED = 10

/// Synchronized publisher.
///
/// Binds a PUB socket on port 5561 and a REP socket on port 5562, answers
/// one sync request per expected subscriber (SUBSCRIBERS_EXPECTED in total),
/// then publishes exactly 1,000,000 "Rhubarb" updates followed by a single
/// "END" message.
///
/// Returns 0 as the process return code.
let main () =
  use context = new Context ()

  // socket to talk to clients
  use publisher = Context.pub context
  // NOTE(review): the send high-water mark is set above the number of
  // updates, presumably so that none are dropped while queued -- confirm.
  Socket.setOption publisher (ZMQ.SNDHWM,1100000)
  Socket.bind publisher "tcp://*:5561"

  // socket to receive signals
  use syncservice = Context.rep context
  Socket.bind syncservice "tcp://*:5562"

  // get synchronization from subscribers
  printfn "Waiting for subscribers"
  let rec loop subscribers =
    if subscribers < SUBSCRIBERS_EXPECTED then
      // - wait for synchronization event (request payload is discarded)
      syncservice
      |> Socket.recv
      |> ignore
      // - send synchronization reply (empty message)
      ""B |> Socket.send syncservice
      loop (subscribers + 1)
  loop 0

  // now broadcast exactly 1M updates followed by END
  printfn "Broadcasting messages"
  // F# ranges are inclusive on both ends: 1 .. 1000000 yields exactly
  // 1,000,000 iterations (0 .. 1000000 would send one update too many).
  for _ in 1 .. 1000000 do
    "Rhubarb"B |> Socket.send publisher
  "END"B |> Socket.send publisher

  0 // return code
module docs
module PATH

from docs
val hijack : unit -> unit

Full name: docs.PATH.hijack
namespace fszmq
Multiple items
type LiteralAttribute =
  inherit Attribute
  new : unit -> LiteralAttribute

Full name: Microsoft.FSharp.Core.LiteralAttribute

--------------------
new : unit -> LiteralAttribute
val SUBSCRIBERS_EXPECTED : int

Full name: Syncpub.SUBSCRIBERS_EXPECTED
val main : unit -> int

Full name: Syncpub.main
val context : System.IDisposable
val publisher : System.IDisposable
val syncservice : System.IDisposable
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val loop : (int -> unit)
val subscribers : int
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val release : unit -> unit

Full name: docs.PATH.release
Fork me on GitHub