#!/usr/bin/ruby.ruby3.4 
# Copyright 2010-2018 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'optparse'
require 'net/http'
require 'uri'
require 'io/wait'

require 'rubygems'
require 'json'

# Print the command-line synopsis to standard output and terminate the
# process via Kernel#exit.
def usage
  puts <<~USAGE
    Usage: nats-top [-s server_uri] [-m local monitor port] [-n num_connections] [-d delay_secs] [--sort sort_by]
    --sort_options for more help
  USAGE
  exit
end

# Sort keys accepted for --sort; the chosen key is checked against this list
# before it is placed in the /connz query string.
$valid_sort_options = %w[pending_size msgs_to msgs_from bytes_to bytes_from subs]

# Print the list of accepted sort keys (from $valid_sort_options) together
# with an example invocation, then terminate the process via Kernel#exit.
def sort_options_help
  puts "Available sort_by options: #{$valid_sort_options.join(', ')}."
  # The sort key is passed with --sort; -s is the server URI flag, so the
  # example must not suggest "-s bytes_to".
  puts "E.g. #{$0} --sort bytes_to"
  exit
end

# Parse the command line from a copy of ARGV, so ARGV itself is not modified.
# Each flag stores its (converted) value in a global read by the rest of the
# script; --sort_options and the help flags print text and exit.
args = ARGV.dup
opts_parser = OptionParser.new do |parser|
  parser.on('-s server_uri') do |value|
    $nats_server = value
  end
  parser.on('-m local_port') do |value|
    $nats_port = value.to_i
  end
  parser.on('-n num_connections') do |value|
    $num_connections = value.to_i
  end
  parser.on('-d delay') do |value|
    $delay = value.to_f
  end
  parser.on('--sort sort_by') do |value|
    $sort_key = value
  end
  parser.on('--sort_options') { sort_options_help }

  # Both spellings of the help flag show the same usage text.
  ['-h', '--help'].each do |flag|
    parser.on(flag) { usage }
  end
end
args = opts_parser.parse!(args)

# Fallback values used for any setting that was not supplied on the command
# line (the corresponding global is still nil at this point).
DEFAULT_MONITOR_PORT = 9222
DEFAULT_NUM_CONNECTIONS = 10
DEFAULT_DELAY = 1 # seconds
DEFAULT_SORT = 'pending_size'.freeze

$nats_port ||= DEFAULT_MONITOR_PORT
$num_connections ||= DEFAULT_NUM_CONNECTIONS
$nats_server ||= "http://localhost:#{$nats_port}"

# Add a scheme when the server was given as a bare host[:port]. The whole
# "http://" / "https://" prefix is matched, so a host whose name merely
# begins with "http" (e.g. "httpbin:8222") still gets a scheme.
$nats_server = "http://#{$nats_server}" unless $nats_server.match?(%r{\Ahttps?://}i)

$delay ||= DEFAULT_DELAY

# Normalise to lower case with a fresh string: an in-place downcase! would
# mutate the DEFAULT_SORT constant whenever the default is in use.
$sort_key = ($sort_key || DEFAULT_SORT).downcase

# Stop when the requested sort key is not listed in $valid_sort_options:
# name the rejected value, then hand over to sort_options_help, which prints
# the accepted keys and exits.
if $valid_sort_options.none?($sort_key)
  puts "Invalid sort_by argument: #{$sort_key}"
  sort_options_help
end

# Monitoring endpoints on the configured server: /varz, and /connz with the
# connection count passed as "n" and the sort key passed as "s".
varz_uri = URI("#{$nats_server}/varz")
connz_query = "n=#{$num_connections}&s=#{$sort_key}"
connz_uri = URI("#{$nats_server}/connz?#{connz_query}")

# Format a number for display, scaling by powers of 1024.
#
# size - numeric value, or nil/false when no value is available.
# prec - number of digits after the decimal point (default 1).
#
# Returns 'NA' when size is nil/false; otherwise the value formatted with
# prec decimals and no suffix below 1024, or scaled with a K, M or G suffix.
# Values of 1024**3 and above are all expressed in G.
def psize(size, prec=1)
  return 'NA' unless size

  number = "%.#{prec}f"
  step = 1024
  return format(number, size) if size < step

  [['K', step], ['M', step**2]].each do |suffix, divisor|
    return format(number + suffix, size / divisor.to_f) if size < divisor * step
  end
  format(number + 'G', size / (step**3).to_f)
end

# Write the escape sequences ESC[H followed by ESC[2J to standard output.
def clear_screen
  $stdout.print("\e[H\e[2J")
end

# On SIGTERM or SIGINT: clear the screen, then leave immediately with exit!.
%w[TERM INT].each do |signal_name|
  Signal.trap(signal_name) do
    clear_screen
    exit!
  end
end

# Counter totals from the previous poll; subtracting them from the current
# /varz totals gives the traffic seen since that poll.
in_last_msgs  = in_last_bytes = 0
out_last_msgs = out_last_bytes = 0

# Time of the previous poll, taken from the monotonic clock so that
# wall-clock adjustments cannot yield a zero or negative interval.
poll = Process.clock_gettime(Process::CLOCK_MONOTONIC)
first = true

# Main loop: fetch /varz and /connz, redraw the screen, sleep $delay seconds.
# Any StandardError is reported as "Error: ..." and ends the process with
# exit status 1.
loop do
  begin

    varz_response = Net::HTTP.get_response(varz_uri)
    # :symbolize_names is the JSON.parse option that produces symbol keys.
    varz = JSON.parse(varz_response.body, :symbolize_names => true)

    # Simple rates
    delta_in_msgs   = varz[:in_msgs]   - in_last_msgs
    delta_in_bytes  = varz[:in_bytes]  - in_last_bytes
    delta_out_msgs  = varz[:out_msgs]  - out_last_msgs
    delta_out_bytes = varz[:out_bytes] - out_last_bytes

    in_last_msgs,  in_last_bytes  = varz[:in_msgs],  varz[:in_bytes]
    out_last_msgs, out_last_bytes = varz[:out_msgs], varz[:out_bytes]

    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    tdelta, poll = now - poll, now

    # Rates stay nil (displayed as 'NA' by psize) on the first pass, where
    # there is no previous sample, and whenever the interval is not positive.
    rate_in_msgs = rate_in_bytes = rate_out_msgs = rate_out_bytes = nil
    if !first && tdelta > 0
      rate_in_msgs = delta_in_msgs / tdelta
      rate_in_bytes = delta_in_bytes / tdelta
      rate_out_msgs = delta_out_msgs / tdelta
      rate_out_bytes = delta_out_bytes / tdelta
    end

    connz_response = Net::HTTP.get_response(connz_uri)
    connz = JSON.parse(connz_response.body, :symbolize_names => true)

    clear_screen

    puts  "\nServer:"
    puts  "  Load: CPU: #{varz[:cpu]}%  Memory: #{psize(varz[:mem])}"
    print "  In:   Msgs: #{psize(varz[:in_msgs])}  Bytes: #{psize(varz[:in_bytes])}"
    puts  "  Msgs/Sec: #{psize(rate_in_msgs)}  Bytes/Sec: #{psize(rate_in_bytes)}"

    print "  Out:  Msgs: #{psize(varz[:out_msgs])}  Bytes: #{psize(varz[:out_bytes])}"
    puts  "  Msgs/Sec: #{psize(rate_out_msgs)}  Bytes/Sec: #{psize(rate_out_bytes)}"

    puts "\nConnections: #{psize(connz[:num_connections], 0)}"

    conn_t = "  %-20s %-8s %-6s  %-10s  %-10s  %-10s  %-10s  %-10s\n"
    printf(conn_t, 'HOST', 'CID', 'SUBS', 'PENDING', 'MSGS_TO', 'MSGS_FROM', 'BYTES_TO', 'BYTES_FROM')

    # The *_TO columns show the connection's out_* counters and the *_FROM
    # columns its in_* counters. A reply without a :connections entry is
    # rendered as an empty table.
    (connz[:connections] || []).each do |conn|
      printf(conn_t, "#{conn[:ip]}:#{conn[:port]}",
                     conn[:cid],
                     psize(conn[:subscriptions]),
                     psize(conn[:pending_size]),
                     psize(conn[:out_msgs]),
                     psize(conn[:in_msgs]),
                     psize(conn[:out_bytes]),
                     psize(conn[:in_bytes])
             )
    end
    puts

    first = false

    sleep($delay)

  rescue => e
    puts "Error: #{e}"
    exit(1)
  end

end



