--- /dev/null
+#!/usr/local/bin/tclsh8.4
+
+#
+# Copyright 2006 University of Zagreb, Croatia. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. All advertising materials mentioning features or use of this software
+# must display the following acknowledgement:
+# This product includes software developed by the University of Zagreb,
+# Croatia and its contributors.
+# 4. Neither the name of the University nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+
+#
+# host_id, peer_id -> ip_address in A.B.C.D form
+#
+# Arrays indexed by $host_id:
+#
+# host_paths_tbl[] # All known path-vectors to $host_id
+# host_attrib_tbl[] # Attributes for $host_id
+# host_attrib_version[] # Version of attributes
+#
+# Arrays indexed by $peer_id:
+# deadline[] # Received keepalives reset this
+# peer_outq[] # Messages to be sent to $peer_id
+# peer_so[] # Socket id for peering with $peer_id
+#
+# Peering messages:
+# open initialization_vector
+# announce host_id path
+# withdraw host_id
+# attributes host_id version attributes
+# notify error
+# keepalive
+#
+# Default parameters:
+#
+# my_port = 2001 # The port we listen on
+# min_peers = 2 # Do not initiate more peerings after this many
+# max_peers = 4 # Randomly drop peerings if have more than this
+# debug_level = 1 # 0 = error; 1 = warn; 2 = notice; 3 = debug
+# keepalive_timo = 30 # Send a keepalive every timo seconds
+# announce_delay = 10 # Dampens count to infinity on withdrawals
+#
+
+
+proc PrunePeerings {} {
+ global peers max_peers my_node_id active_hosts
+
+ while { [llength $peers] > $max_peers } {
+ # We have too many peerings open.
+ # We put a strong bias towards destroying our most recent peerings,
+ # however from time to time we destroy the oldest peering that
+ # we have.
+ if { [expr int(rand() * 10)] > 1 } {
+ set victim [lindex $peers end]
+ } else {
+ set victim [lindex $peers 0]
+ }
+ DestroyPeering $victim
+ }
+}
+
+
+proc DumpAttribs {} {
+ global active_hosts debug_level host_attrib_tbl
+
+ if { $debug_level < 3 } {
+ return
+ }
+
+ log_debug "dumping current attribs:"
+ foreach host_id [lsort $active_hosts] {
+ set first 1
+ foreach attrib [lsort $host_attrib_tbl($host_id)] {
+ if { $first } {
+ puts "$host_id: $attrib"
+ set first 0
+ } else {
+ puts " $attrib"
+ }
+ }
+ }
+}
+
+
+proc DumpTable {} {
+ global active_hosts host_paths_tbl peers dead_hosts my_ip
+ global debug_level
+
+ if { $debug_level < 3 } {
+ return
+ }
+
+ log_debug "dumping current table:"
+ foreach host_id [lsort $active_hosts] {
+ set first 1
+ foreach path [lsort $host_paths_tbl($host_id)] {
+ if { $first } {
+ puts "$host_id: $path"
+ set first 0
+ } else {
+ puts " $path"
+ }
+ }
+ }
+ puts "[llength $peers] peering(s): [lsort $peers]"
+ log_debug "active ([llength $active_hosts]): [lsort $active_hosts]"
+ log_debug "dead ([llength $dead_hosts]): [lsort $dead_hosts]"
+}
+
+
+proc KeepaliveLoop {} {
+ global peers deadline keepalive_timo
+
+ set now [clock seconds]
+ set expiration_time [expr $now - 2 * $keepalive_timo]
+ foreach p_i $peers {
+ if { $deadline($p_i) < $expiration_time } {
+ VerboseDestroyPeering $p_i "Keepalive timeout exceeded."
+ } else {
+ EnqueueCmd $p_i "keepalive"
+ }
+ }
+
+ after [expr $keepalive_timo * 1000] KeepaliveLoop
+}
+
+
+proc MyAttribLoop {} {
+ global peers host_attrib_version host_attrib_tbl
+ global my_host_id
+
+ #set data [list "peers \{$peers\}" "[exec sysctl vm.loadavg]"]
+ set data [list "peers \{$peers\}"]
+ if { $data != $host_attrib_tbl($my_host_id) } {
+ set host_attrib_tbl($my_host_id) $data
+ incr host_attrib_version($my_host_id)
+ set version $host_attrib_version($my_host_id)
+ set msg "attributes $my_host_id $version $data"
+ foreach p_i $peers {
+ EnqueueCmd $p_i $msg
+ }
+ }
+
+ after 30000 MyAttribLoop
+}
+
+
+proc InitiateNewPeerings {} {
+ global peers dead_hosts active_hosts
+ global my_ip my_port min_peers max_peers my_host_id
+
+ set n_peers [llength $peers]
+ if { $n_peers < [expr 2 * $max_peers] } {
+ if { $n_peers < $min_peers } {
+ set i [lsearch -exact $active_hosts $my_host_id]
+ set host_pool \
+ [concat $dead_hosts [lreplace $active_hosts $i $i]]
+ } else {
+ set host_pool $dead_hosts
+ }
+ # Select a new potential peering and attempt a connection
+ set i [expr int(rand() * [llength $host_pool])]
+ set newpeer [lindex $host_pool $i]
+ if { $newpeer != "" && [lsearch -exact $peers $newpeer] < 0 } {
+ if { ![catch \
+ {socket -myaddr $my_ip -async $newpeer $my_port} so] } {
+ fileevent $so writable [list OpenPeering $so $newpeer $my_port]
+ }
+ }
+ }
+
+ after [expr (int(pow($n_peers + rand() + 2, 3) + [llength $active_hosts]) \
+ * 1000)] InitiateNewPeerings
+}
+
+
+proc OpenPeering { so peer port } {
+ fileevent $so writable ""
+ fconfigure $so -blocking 0 -buffering line
+ set iv [expr int(rand() * 2000000000)]
+ if { [catch { puts $so "open $iv" }] } {
+ catch { close $so }
+ } else {
+ # XXX TODO: Configure TX encription using key + iv
+ AddPeering $so $peer $port
+ }
+}
+
+
+proc AddPeering { so peer_id port } {
+ global peers max_peers min_peers my_host_id
+ global active_hosts host_paths_tbl host_attr_tbl
+ global peer_outq peer_so deadline
+ global fanout_queued announce_delay
+
+ fconfigure $so -blocking 0 -buffering line
+ set n_peers [llength $peers]
+ if { $n_peers >= $max_peers && \
+ [lsearch $active_hosts $peer_id] >= 0 } {
+ # Too many peerings already, drop this request
+ log_notice "Too many peerings, rejecting (active) $peer_id."
+ catch { puts $so "notify {Too many peerings active, rejected}" }
+ catch { close $so }
+ } elseif { [lsearch -exact $peers $peer_id] < 0 && \
+ $peer_id != $my_host_id} {
+ if { $n_peers >= [expr 2 * $max_peers] } {
+ # Too many peerings already, drop this request
+ log_notice "Too many peerings, rejecting (new) $peer_id."
+ catch { puts $so "notify {Too many peerings active, rejected}" }
+ catch { close $so }
+ }
+ log_notice "Peering with $peer_id established."
+ set peer_outq($peer_id) {}
+ set peer_so($peer_id) $so
+ set deadline($peer_id) [clock seconds]
+ # Dump all current state to new peer
+ foreach host_id $active_hosts {
+ if { [lsearch -exact $fanout_queued $host_id] < 0 } {
+ set best_path [BestPath $host_id]
+ if { $best_path != "" } {
+ EnqueueAnnounce $peer_id $host_id "$my_host_id $best_path"
+ }
+ }
+ }
+ if { [catch { fileevent $so readable [list ReadSocket $peer_id] }] } {
+ catch { close $so }
+ return
+ }
+ lappend peers $peer_id
+ if { $n_peers >= $max_peers } {
+ after 10000 PrunePeerings
+ }
+ } else {
+ # This peering is already active.
+ log_notice "Rejecting duplicate connection from $peer_id."
+ close $so
+ }
+}
+
+
+proc VerboseDestroyPeering { peer_id msg } {
+ global peer_so
+
+ set so $peer_so($peer_id)
+ log_warning $msg
+ catch {puts $so "notify \{$msg\}"}
+ DestroyPeering $peer_id
+}
+
+
+proc DestroyPeering { peer_id } {
+ global peers peer_so
+ global active_hosts dead_hosts
+ global host_paths_tbl
+
+ close $peer_so($peer_id)
+ set i [lsearch $peers $peer_id]
+ set peers [lreplace $peers $i $i]
+ # Clean up all stale state!
+ foreach host_id $active_hosts {
+ if { [lsearch $host_paths_tbl($host_id) "$peer_id *"] >= 0 } {
+ ProcessWithdraw $peer_id $host_id
+ }
+ }
+ log_notice "Peering with $peer_id disconnected."
+}
+
+
+proc EnqueueCmd { peer_id cmd } {
+ global peer_outq peer_so
+
+ set so $peer_so($peer_id)
+
+ if { [llength $peer_outq($peer_id)] == 0 } {
+ if { [catch { fileevent $so writable [list WriteSocket $peer_id] }] } {
+ #XXX
+ #DestroyPeering $peer_id
+ }
+ }
+ lappend peer_outq($peer_id) $cmd
+}
+
+
+proc WriteSocket { peer_id } {
+ global peer_outq peer_so
+
+ set so $peer_so($peer_id)
+
+ if { [llength $peer_outq($peer_id)] > 0 } {
+ if { [catch {puts $so [lindex $peer_outq($peer_id) 0]}] } {
+ DestroyPeering $peer_id
+ } else {
+ set peer_outq($peer_id) [lrange $peer_outq($peer_id) 1 end]
+ }
+ } else {
+ fileevent $so writable ""
+ }
+}
+
+
+proc ReadSocket { peer_id } {
+ global peers peer_outq peer_so my_host_id deadline
+ global active_hosts dead_hosts host_paths_tbl host_attr_tbl
+
+ set so $peer_so($peer_id)
+
+ if { [gets $so line] < 0} {
+ if { [eof $so] } {
+ DestroyPeering $peer_id
+ return
+ }
+ } else {
+ set deadline($peer_id) [clock seconds]
+ set cmd [lindex $line 0]
+ set host_id [lindex $line 1]
+ switch $cmd {
+ open {
+ # XXX TODO: Configure RX encription using key + iv
+ }
+ announce {
+ ProcessAnnounce $peer_id $host_id [lindex $line 2] $line
+ }
+ withdraw {
+ ProcessWithdraw $peer_id $host_id
+ }
+ attributes {
+ ProcessAttributes \
+ $peer_id $host_id [lindex $line 2] [lrange $line 3 end]
+ }
+ notify {
+ log_warning "$peer_id terminated: $host_id"
+ DestroyPeering $peer_id
+ }
+ keepalive {
+ }
+ default {
+ VerboseDestroyPeering $peer_id \
+ "Unrecognized command from $peer_id: $line"
+ return
+ }
+ }
+ global last_received
+ set last_received($host_id) $line
+ }
+}
+
+
+proc ProcessFanoutQueue {} {
+ global peers announce_delay my_host_id
+ global fanout_queued fanout_last_best
+
+ set current_time [clock seconds]
+
+ set remain_queued {}
+ foreach host_id $fanout_queued {
+ set old_update_time [lindex $fanout_last_best($host_id) 0]
+ if { [expr $current_time - $old_update_time] > $announce_delay } {
+
+ set best_path [lindex $fanout_last_best($host_id) 1]
+ set exclude_peer [lindex $fanout_last_best($host_id) 2]
+ set path "$my_host_id $best_path"
+ foreach peer_id $peers {
+ if { $peer_id != $exclude_peer } {
+ EnqueueAnnounce $peer_id $host_id $path
+ }
+ }
+ } else {
+ lappend remain_queued $host_id
+ }
+ }
+
+ if { $remain_queued != "" } {
+ after 1000 ProcessFanoutQueue
+ }
+ set fanout_queued $remain_queued
+}
+
+
+proc FanoutAnnounce { host_id exclude_peer old_best_path } {
+ global peers announce_delay my_host_id
+ global fanout_queued fanout_last_best
+ global announce_delay
+
+ set current_time [clock seconds]
+ set best_path [BestPath $host_id]
+
+ if { $old_best_path != "" && \
+ [llength $best_path] >= [llength $old_best_path] && \
+ [expr $current_time - [lindex $fanout_last_best($host_id) 0]] \
+ <= $announce_delay } {
+ set fanout_last_best($host_id) \
+ [list $current_time $best_path $exclude_peer]
+
+ lappend fanout_queued $host_id
+ if { $fanout_queued == $host_id } {
+ after $announce_delay ProcessFanoutQueue
+ }
+ return
+ }
+
+ set fanout_last_best($host_id) \
+ [list $current_time $best_path $exclude_peer]
+ set path "$my_host_id $best_path"
+ foreach peer_id $peers {
+ if { $peer_id != $exclude_peer } {
+ EnqueueAnnounce $peer_id $host_id $path
+ }
+ }
+}
+
+
+proc FanoutWithdraw { host_id exclude_peer old_best_path } {
+ global peers
+ global fanout_queued fanout_last_best
+
+ if { $old_best_path == "" } {
+ return
+ }
+
+ set i [lsearch -exact $fanout_queued $host_id]
+ if { $i >= 0 } {
+ set last_exclude_peer [lindex $fanout_last_best($host_id) 2]
+ set fanout_queued [lreplace $fanout_queued $i $i]
+ set fanout_last_best($host_id) [list [clock seconds] $old_best_path]
+ return
+ }
+
+ set fanout_last_best($host_id) [list [clock seconds] $old_best_path]
+ foreach peer_id $peers {
+ if { $peer_id != $exclude_peer } {
+ EnqueueWithdraw $peer_id $host_id
+ }
+ }
+}
+
+
+proc EnqueueAnnounce { peer_id host_id path } {
+ global host_paths_tbl my_host_id
+
+ EnqueueCmd $peer_id "announce $host_id \{$path\}"
+}
+
+
+proc EnqueueWithdraw { peer_id host_id } {
+ global peer_outq
+
+ set i [lsearch peer_outq($peer_id) "announce $host_id *"]
+ if { $i >= 0 } {
+ set peer_outq($peer_id) [lreplace $peer_outq($peer_id) $i $i]
+ } else {
+ EnqueueCmd $peer_id "withdraw $host_id"
+ }
+}
+
+
+proc ProcessAttributes { peer_id host_id version data } {
+ global host_attrib_version host_attrib_tbl
+ global peers
+
+ if { $version > $host_attrib_version($host_id) } {
+ set host_attrib_version($host_id) $version
+ set host_attrib_tbl($host_id) $data
+
+ set msg "attributes $host_id $version $data"
+ foreach p_i $peers {
+ if { $p_i != $peer_id } {
+ EnqueueCmd $p_i $msg
+ }
+ }
+ }
+}
+
+
+proc ProcessAnnounce { peer_id host_id path line } {
+ global my_host_id active_hosts dead_hosts host_paths_tbl peers
+ global host_attrib_tbl host_attrib_version debug_level
+
+ set old_active $active_hosts
+ if { [lindex $path 0] != $peer_id } {
+ # Each received route must begin with the peer's id.
+ VerboseDestroyPeering $peer_id \
+ "rcvd bogus route from $peer_id for $host_id: $path"
+ return
+ } elseif { [lsearch -exact $path $host_id] != \
+ [expr [llength $path] - 2] } {
+ # Each received route must have the target as the last element.
+ VerboseDestroyPeering $peer_id \
+ "rcvd bogus route from $peer_id for $host_id: $path"
+ return
+ } elseif { [lsearch -exact $path "."] < 0 } {
+ # Each route must terminate with a dot sign.
+ VerboseDestroyPeering $peer_id \
+ "rcvd bogus route from $peer_id for $host_id: $path"
+ return
+ } elseif { [lsearch $active_hosts $host_id] < 0 } {
+ # Not in table
+ set host_paths_tbl($host_id) {}
+ lappend host_paths_tbl($host_id) "$path"
+ # Remove from list of dead_hosts, append to active_hosts
+ set i [lsearch $dead_hosts $host_id]
+ if { $i >= 0 } {
+ set dead_hosts [lreplace $dead_hosts $i $i]
+ }
+ lappend active_hosts $host_id
+ set host_attrib_version($host_id) 0
+ set host_attrib_tbl($host_id) {}
+ if { [BestPath $host_id] != "" } {
+ FanoutAnnounce $host_id $peer_id ""
+ }
+ } else {
+ # Found, compute new best path
+ if { [lsearch $host_paths_tbl($host_id) "$peer_id *"] >= 0 } {
+ # Duplicate entry from single peer - this must never happen!
+ VerboseDestroyPeering $peer_id \
+ "rcvd duplicate announcement from $peer_id: $path"
+ return
+ }
+ set old_best [BestPath $host_id]
+ set old_peer [lindex $old_best 0]
+ lappend host_paths_tbl($host_id) $path
+ set new_best [BestPath $host_id]
+ if { $old_best != $new_best } {
+ FanoutWithdraw $host_id $old_peer $old_best
+ if { $new_best != "" } {
+ FanoutAnnounce $host_id $peer_id $old_best
+ } else {
+ log_error "new_best for $host_id empty"
+ log_error \
+ "current table($host_id) $host_paths_tbl($host_id)"
+ exit
+ }
+ }
+ }
+
+ if { $debug_level >= 2 && $old_active != $active_hosts } {
+ log_notice "active ([llength $active_hosts]): [lsort $active_hosts]"
+ log_notice "dead ([llength $dead_hosts]): [lsort $dead_hosts]"
+ }
+}
+
+
+proc ProcessWithdraw { peer_id host_id } {
+ global my_host_id active_hosts dead_hosts host_paths_tbl peers
+ global host_attrib_tbl host_attrib_version debug_level
+
+ set old_active $active_hosts
+ if { [lsearch -exact $active_hosts $host_id] < 0 } {
+ # Not in table. Drop peering.
+ VerboseDestroyPeering $peer_id \
+ "withdraw from $peer_id for $host_id - not in table"
+ return
+ } elseif { [lsearch $host_paths_tbl($host_id) "$peer_id *"] < 0 } {
+ # Found in table, but never received from $peer_id
+ VerboseDestroyPeering $peer_id \
+ "withdraw from $peer_id for $host_id - was never announced"
+ return
+ }
+ set old_best [BestPath $host_id]
+ set old_peer [lindex $old_best 0]
+ set i [lsearch $host_paths_tbl($host_id) "$peer_id *"]
+ if { $i >= 0 } {
+ set host_paths_tbl($host_id) \
+ [lreplace $host_paths_tbl($host_id) $i $i]
+ } else {
+ log_error "ProcessWithdraw: XXX how did we get here?"
+ exit
+ }
+
+ set new_best [BestPath $host_id]
+ set new_peer [lindex $new_best 0]
+
+ if { $old_best != $new_best } {
+ FanoutWithdraw $host_id $old_peer $old_best
+ if { $new_best != "" } {
+ FanoutAnnounce $host_id $new_peer $old_best
+ }
+ }
+
+ if { $host_paths_tbl($host_id) == {} } {
+ # Last reference gone, remove from active list
+ set i [lsearch -exact $active_hosts $host_id]
+ set active_hosts [lreplace $active_hosts $i $i]
+ lappend dead_hosts $host_id
+ set host_attrib_version($host_id) 0
+ set host_attrib_tbl($host_id) {}
+ }
+
+ if { $debug_level >= 2 && $old_active != $active_hosts } {
+ log_notice "active ([llength $active_hosts]): [lsort $active_hosts]"
+ log_notice "dead ([llength $dead_hosts]): [lsort $dead_hosts]"
+ }
+}
+
+
+proc BestPath { host_id } {
+ global host_paths_tbl my_host_id
+
+ set path {}
+ set path_len 99999
+
+ foreach p_i [lsort $host_paths_tbl($host_id)] {
+ if { [lsearch $p_i $my_host_id] >= 0 || [lsearch $p_i "."] < 0} {
+ # Loop avoidance
+ continue
+ }
+ set p_i_len [llength $p_i]
+ if { $p_i_len > 32 } {
+ # Insanely long paths are symptoms of withdrawal-triggered
+ # oscillations -> ignore those
+ continue
+ }
+ if { $p_i_len < $path_len } {
+ set path $p_i
+ set path_len $p_i_len
+ } elseif { $path_len == $p_i_len & $p_i < $path } {
+ set path $p_i
+ }
+ }
+ return $path
+}
+
+
+proc log_debug { str } {
+ global my_host_id debug_level
+ if { $debug_level >= 3 } {
+ puts "$my_host_id DEBUG: $str"
+ }
+}
+
+
+proc log_notice { str } {
+ global my_host_id debug_level
+ if { $debug_level >= 2 } {
+ puts "$my_host_id NOTICE: $str"
+ }
+}
+
+
+proc log_warning { str } {
+ global my_host_id debug_level
+
+ if { $debug_level >= 1 } {
+ puts "$my_host_id WARNING: $str"
+ }
+}
+
+
+proc log_error { str } {
+ global my_host_id debug_level
+ puts "$my_host_id ERROR: $str"
+}
+
+
+#
+# Global init
+#
+
+set my_port 2001
+set min_peers 2
+set max_peers 4
+set debug_level 1
+set keepalive_timo 30
+set announce_delay 10
+
+set my_ip [string trim \
+ [exec netstat -in -f inet | head -2 | tail -1 | cut -c26-40]]
+set my_host_id $my_ip
+
+# Parse command-line options
+while { [string index [set str [lindex $argv 0]] 0] == "-" } {
+ switch [string range $str 1 end] {
+ "d" {
+ set debug_level [lindex $argv 1]
+ set argv [lrange $argv 2 end]
+ }
+ default {
+ puts "Usage: $argv0 \[-d debug_level\] \[initial_peers\]"
+ exit
+ }
+ }
+}
+
+set dead_hosts [lrange $argv 0 end]
+if { [lsearch $dead_hosts $my_host_id] >= 0 } {
+ set dead_hosts {}
+}
+
+set active_hosts $my_host_id
+set host_paths_tbl($my_host_id) "."
+set host_attrib_version($my_host_id) 0
+set host_attrib_tbl($my_host_id) {}
+
+set peers {}
+set fanout_queued {}
+
+# Seed our random number generator
+set seed [clock clicks]
+append seed [clock seconds] [pid]
+
+log_notice "Listening on $my_ip port $my_port"
+
+#socket -server OpenPeering -myaddr $my_ip $my_port
+socket -server OpenPeering $my_port
+
+InitiateNewPeerings
+KeepaliveLoop
+#MyAttribLoop
+
+vwait forever
+
+