#!/usr/bin/php
<?php
/**
 * Data compression tool
 *
 * This is a tool designed to decrease data resolution based on
 * its age. Lower resolution results in less data and thus may
 * save a lot of storage space.
 *
 * Script must be executed with bin as current workdir
 * Script was only tested with MySQL-Storage
 * Script should only be executed if you've got a recent backup
 *
 * By default we assume the following resolution scheme:
 *   Newer than 7 Days      Keep Original
 *   Older than 7 Days      Data point per 1 Minute
 *   Older than 30 Days     Data point per 5 Minutes
 *   Older than 6 Month     Data point per 15 Minutes
 *   Older than 1 Year      Data point per 30 Minutes
 * You can set your own scheme for all or specific data points at the
 * bottom of this file
 *
 * By default this script saves its state in /tmp/vzcompress2/. You may want
 * to move these files to a location that is not cleaned on reboot. If the files
 * are not present or caching is turned off the script will scan the complete
 * database. If you often insert/import historic data you may want to turn
 * caching off. Configuration can be found near the end of this file.
 *
 * @author Florian Knodt <adlerweb@adlerweb.info>
 * @author Andreas Goetz <cpuidle@gmx.de>
 * @copyright Copyright (c) 2011-2020, The volkszaehler.org project
 * @license https://www.gnu.org/licenses/gpl-3.0.txt GNU General Public License version 3
 */

/**
 * This file is part of volkszaehler.org
 *
 * volkszaehler.org is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * any later version.
 *
 * volkszaehler.org is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
 */

use Volkszaehler\Util;
use Volkszaehler\Definition;
use Doctrine\DBAL;

define('VZ_DIR', realpath(__DIR__ . '/..'));

require VZ_DIR . '/lib/bootstrap.php';

/**
 * Reduces the resolution of stored channel data depending on its age.
 *
 * For every channel the configured compression scheme is walked tier by
 * tier. Inside each tier the data is cut into fixed-size time windows and
 * all data points of a window are replaced by a single aggregated point.
 */
class VZcompress2 {
	/**
	 * @var DBAL\Connection database connection
	 */
	protected $conn;

	/**
	 * @var array effective configuration (built-in defaults overlaid with the caller's values)
	 */
	protected $config;

	/**
	 * @var array channel rows (entity columns plus `name`) loaded by the constructor
	 */
	private $channels;

	/**
	 * @var int number of data points removed so far, summed over all channels
	 */
	private $purgecounter = 0;

	/**
	 * @var string date() format used for every timestamp that is printed
	 */
	private $timestr = 'Y-m-d H:i:s';

	/**
	 * Merge the configuration, prepare the cache directory, connect to the
	 * database and load the list of channels.
	 *
	 * Note: array_replace() is shallow, so a caller-supplied 'compressscheme'
	 * replaces the built-in scheme as a whole instead of being merged into it.
	 *
	 * @param array $config overrides for 'compressscheme', 'verbose', 'caching', 'sleep'
	 *                      and the optional 'channels' uuid filter
	 */
	public function __construct($config = array()) {
		$this->config = array_replace(
			array(
				'compressscheme' => array(
					'default' => array( 				// Definition for all other channels
						(7*24*60*60)    => (1*60),      // Older than 7 Days      Datapoint per 1 Minute
						(30*24*60*60)   => (5*60),      // Older than 30 Days     Datapoint per 5 Minutes
						(6*30*24*60*60) => (15*60),     // Older than 6 Month     Datapoint per 15 Minutes
						(365*24*60*60)  => (30*60),     // Older than 1 Year      Datapoint per 30 Minutes
					)
				),
				'verbose' => true,
				'caching' => false,
				'sleep' => 0,
			),
			$config
		);

		$this->cache_init();

		$this->conn = DBAL\DriverManager::getConnection(Util\Configuration::read('db'));

		// SELECT * FROM entities WHERE class = 'channel' plus title property
		$this->channels = $this->sql_query(
			"SELECT entities.*, properties.value AS name FROM entities INNER JOIN properties ON properties.entity_id = entities.id WHERE properties.pkey = ? AND class = ?",
			array('title', 'channel')
		);
	}

	/*
	 * Cache functions
	 */

	/**
	 * Validate (or create) the cache directory.
	 *
	 * Normalises the configured path to end in '/'. If the directory can not
	 * be used a warning is raised and caching is switched off by setting
	 * $this->config['caching'] to false.
	 */
	private function cache_init() {
		if (!$this->config['caching']) return;

		if (substr($this->config['caching'], -1) != '/') $this->config['caching'].='/';
		$dir = $this->config['caching'];

		if (file_exists($dir)) {
			if (!is_dir($dir)) {
				trigger_error('Can not cache to '.$dir.' - Not a directory', E_USER_WARNING);
				$this->config['caching'] = false;
			}
			// elseif: do not probe writability once caching has already been disabled
			elseif (!is_writable($dir)) {
				trigger_error('Can not cache to '.$dir.' - Not writable', E_USER_WARNING);
				$this->config['caching'] = false;
			}
		}
		elseif (!mkdir($dir, 0755, true)) {
			trigger_error('Can not cache to '.$dir.' - Could not create directory', E_USER_WARNING);
			$this->config['caching'] = false;
		}
	}

	/**
	 * Remember how far a channel has been compressed for one tier.
	 *
	 * The state is stored in the file "<cachedir><chanid>.<timebase>".
	 *
	 * @param mixed $chanid   channel id
	 * @param int   $timebase tier key (age threshold in seconds)
	 * @param float $last     timestamp (ms) up to which the tier has been processed
	 * @return bool false if caching is off, an argument is 0 or the write failed
	 */
	private function cache_write($chanid, $timebase, $last) {
		if (!$this->config['caching']) return false;
		if ($timebase == 0 || $last == 0) return false;

		return file_put_contents($this->config['caching'].$chanid.'.'.$timebase, $last) !== false;
	}

	/**
	 * Read the state stored by cache_write().
	 *
	 * @param mixed $chanid   channel id
	 * @param int   $timebase tier key (age threshold in seconds)
	 * @return float|false stored timestamp (ms), or false if caching is off or no state exists
	 */
	private function cache_read($chanid, $timebase) {
		if (!$this->config['caching']) return false;

		$file = $this->config['caching'].$chanid.'.'.$timebase;
		if (!file_exists($file)) return false;

		return (float)file_get_contents($file);
	}

	/*
	 * Database functions
	 */

	/**
	 * Run a parameterised SELECT and return all rows.
	 * Sleeps for the configured number of microseconds first to throttle load.
	 *
	 * @param string $sql  statement with positional placeholders
	 * @param array  $data placeholder values
	 * @return array result rows
	 */
	protected function sql_query($sql, $data = array()) {
		usleep($this->config['sleep']);
		return $this->conn->fetchAll($sql, $data);
	}

	/**
	 * Run a parameterised statement through Connection::executeQuery().
	 * Sleeps for the configured number of microseconds first to throttle load.
	 *
	 * @param string $sql  statement with positional placeholders
	 * @param array  $data placeholder values
	 * @return mixed whatever executeQuery() returns
	 */
	protected function sql_exec($sql, $data = array()) {
		usleep($this->config['sleep']);
		return $this->conn->executeQuery($sql, $data);
	}

	/*
	 * Output functions
	 */

	/**
	 * Format a unix timestamp (seconds) using $this->timestr.
	 *
	 * @param int|float|null $time timestamp in seconds; null means "now"
	 * @return string formatted date
	 */
	private function strftime($time = null) {
		// intval(null) is 0, which would silently print the epoch instead of the current time
		return date($this->timestr, ($time === null) ? time() : intval($time));
	}

	/**
	 * Print a message prefixed with the current time.
	 *
	 * @param string $str   message
	 * @param string $delim string emitted before the line ("\r" rewrites the current line)
	 */
	private function out($str, $delim = "\n") {
		echo $delim . $this->strftime() . ' - ' . $str;
	}

	/**
	 * Compress all (selected) channels and print a summary.
	 */
	public function run() {
		$start = time();
		$count = 0;

		foreach ($this->channels AS $channel) {
			if ($this->skipChannel($channel)) continue;

			$this->out('Processing channel '.$channel['uuid'].' ('.$channel['name'].')...');
			$this->compressChannel($channel);
			$count++;
		}

		$this->out('Done. Purged '.$this->purgecounter.' data points from '.$count.' channels in '.(time()-$start).' seconds');
	}

	/**
	 * Decide whether a channel is excluded by the optional 'channels' filter.
	 *
	 * @param array $channel channel row
	 * @return bool true if a non-empty filter exists and does not list the channel's uuid
	 */
	private function skipChannel($channel) {
		if (isset($this->config['channels']) && count($this->config['channels'])) {
			if (!in_array($channel['uuid'], $this->config['channels'], true)) return true;
		}
		return false;
	}

	/**
	 * Run all compression passes for one channel.
	 *
	 * @param array $channel channel row (uses 'id', 'uuid' and 'type')
	 * @return false|null false if the channel could not be processed
	 */
	private function compressChannel($channel) {
		if (null == ($definition = Definition\EntityDefinition::get($channel['type']))) {
			trigger_error('Could not find definition for type '.$channel['type'], E_USER_WARNING);
			return false;
		}

		// interpreter class - provides grouping function
		$interpreter = $definition->interpreter;

		// Detect compressscheme: channel specific first, then the default
		if (isset($this->config['compressscheme'][$channel['uuid']])) {
			$cs = $this->config['compressscheme'][$channel['uuid']];
		}
		elseif (isset($this->config['compressscheme']['default'])) {
			$cs = $this->config['compressscheme']['default'];
		}
		else {
			// a custom 'compressscheme' replaces the built-in one completely and may lack 'default'
			trigger_error('No compression scheme found for channel '.$channel['uuid'], E_USER_WARNING);
			return false;
		}

		// Prepare compressscheme: age thresholds ascending, 0 as sentinel for "no further tier"
		ksort($cs);
		$times = array_keys($cs);
		$times[] = 0;

		$timestamp = time(); // Local timestamp should be consistent during our transactions

		// Run compression passes
		for ($i=0; $i<count($times)-1; $i++) {
			$window = $cs[$times[$i]]; // window size in seconds
			if ($window == 0) continue;

			// Tier boundaries in seconds: data older than $times[$i] but not older than $times[$i+1]
			$newest = $timestamp - $times[$i];
			$oldest = ($times[$i+1] > 0) ? $timestamp - $times[$i+1] : 0;

			// Step 1: Detect oldest and newest dataset
			$datatimes = $this->sql_query(
				"SELECT MIN(timestamp) AS min, MAX(timestamp) AS max FROM data WHERE channel_id = ? AND timestamp <= ? AND timestamp > ?",
				array($channel['id'], $newest*1000, $oldest*1000)
			);

			$max = (float)$datatimes[0]['max'];
			if ($max == 0) {
				$this->out('  Skipping compression pass for data points between '.$this->strftime($oldest).' and '.$this->strftime($newest).' using a '.$window.' seconds window: No data points found');
				continue;
			}

			// Caching: resume where the previous run stopped
			$from = (float)$datatimes[0]['min'];
			$lastrun = (float)$this->cache_read($channel['id'], $times[$i]);

			if ($lastrun && $lastrun >= $from) {
				$this->out('  Skipping data points between '.$this->strftime($from/1000).' and '.$this->strftime($lastrun/1000).' (Cached)');
				// keep $from in sync so the message and the progress below reflect the real range
				$from = $lastrun;
			}

			$this->out('  Compressing data points between '.$this->strftime($from/1000).' and '.$this->strftime($max/1000).' using a '.$window.' seconds window');

			// Step 2: Loop new possible timeframes
			$curtime = $from;
			$lastpurgecount = $this->purgecounter;

			// number of windows in this pass; nothing to do if the range is empty
			$steps = ($max/1000 - $from/1000) / $window;
			if ($steps <= 0) continue;

			$step = 0;
			$passstart = time();

			do {
				// Step 2.1: Increase timestamps
				$lastcurtime = $curtime;
				$curtime += $window*1000;
				$step++;

				// Print status
				if ($this->config['verbose']) {
					// the final window usually reaches past $max, so cap the display at 100%
					$percent = min(100, round(100/$steps*$step));
					$this->out('    Processing: '.$this->strftime($lastcurtime/1000).' - '.$this->strftime($curtime/1000).' ('.$percent.'%)...', "\r");
				}

				// Step 2.1: Get new Value for timeframe
				$newset = $this->sql_query(
					"SELECT " . $interpreter::groupExprSQL("value") . " AS newval, COUNT(value) AS datapoints ".
					"FROM data WHERE channel_id = ? AND timestamp > ? AND timestamp <= ?",
					array($channel['id'], $lastcurtime, $curtime)
				);

				// Step 2.2: Skip if current timeframe has no or already just one datapoint
				if (count($newset) == 0 || $newset[0]['datapoints'] < 2) continue;

				// wrap inside transaction so delete and insert succeed or fail together
				$this->conn->transactional(function() use ($channel, $newset, $curtime, $lastcurtime) {
					// Step 2.3: Delete old data points
					$this->sql_exec(
						'DELETE FROM data WHERE channel_id = ? AND timestamp > ? AND timestamp <= ?',
						array($channel['id'], $lastcurtime, $curtime)
					);
					$this->purgecounter += $newset[0]['datapoints']-1;

					// Step 2.4: Insert new Datapoint (1 ms before the window end, inside the deleted range)
					$this->sql_exec(
						'INSERT into data (timestamp,value,channel_id) values (?,?,?)',
						array($curtime-1, $newset[0]['newval'], $channel['id'])
					);
				});
			}
			while ($curtime <= $max);
			$this->out('    Removed '.($this->purgecounter-$lastpurgecount).' data points in '.(time()-$passstart).' seconds.', "\r");

			$this->cache_write($channel['id'], $times[$i], $max);
		}
	}
}

/**
 * Sample configuration and script entry point.
 *
 * Adjust the values below, then the compressor is created and started.
 */
$config = [
	// Show times/percentage - should be disabled on slow TTYs
	'verbose' => true,

	// Directory for the state files, or false to disable caching
	'caching' => '/tmp/vzcompress2/',

	// Microseconds to sleep between requests
	'sleep' => 0,

	// If this list is not empty only the listed channel uuids are compressed,
	// e.g. 'abcd-abcd-abcd', ...
	'channels' => [],

	// Age threshold in seconds => window size in seconds.
	// A channel specific scheme can be added under the channel's uuid, e.g.
	//   'abcd-abcdabcd' => [ ...same layout as 'default'... ],
	'compressscheme' => [
		// Definition for all other channels
		'default' => [
			7 * 24 * 60 * 60      => 1 * 60,   // Older than 7 Days      Datapoint per 1 Minute
			30 * 24 * 60 * 60     => 5 * 60,   // Older than 30 Days     Datapoint per 5 Minutes
			6 * 30 * 24 * 60 * 60 => 15 * 60,  // Older than 6 Month     Datapoint per 15 Minutes
			365 * 24 * 60 * 60    => 30 * 60,  // Older than 1 Year      Datapoint per 30 Minutes
		],
	],
];

$compress = new VZcompress2($config);
$compress->run();

?>
