Initial commit

This commit is contained in:
David Fairbanks 2024-06-14 10:35:03 -04:00
commit d222270fee
Signed by: david-fairbanks42
GPG Key ID: 23A5FB8E1952978F
6 changed files with 1572 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.env
vendor
.idea

24
composer.json Normal file
View File

@ -0,0 +1,24 @@
{
"name": "maker-dave/elastic-push",
"description": "Pull remote log files from S3 and inject new lines into local Elasticsearch for analysis",
"type": "project",
"version": "1.0",
"require": {
"php": "^8.1",
"ext-curl": "*",
"aws/aws-sdk-php": "^3.314",
"vlucas/phpdotenv": "^5.6"
},
"license": "private",
"autoload": {
"psr-4": {
"MakerDave\\ElasticPush\\": "src/"
}
},
"authors": [
{
"name": "david-fairbanks42",
"email": "david@makerdave.com"
}
]
}

1293
composer.lock generated Normal file

File diff suppressed because it is too large Load Diff

2
config/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*
!.gitignore

24
main.php Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env php
<?php
/**
 * Command line entry point.
 *
 * Loads environment variables from an optional .env file next to this script,
 * then runs one LogSource worker for every *.json file found in ./config.
 *
 * A failure in one configuration file (an Exception thrown while constructing
 * or running its worker) is reported on STDERR and does not prevent the
 * remaining configuration files from being processed. The process exits with
 * status 1 when at least one configuration file failed, 0 otherwise.
 */

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Dotenv\Dotenv;
use MakerDave\ElasticPush\LogSource;

// Make the script directory the working directory for the rest of the run.
chdir(__DIR__);

// safeLoad() does not fail when the .env file is absent.
$env = Dotenv::createImmutable(__DIR__);
$env->safeLoad();

$failures = 0;

foreach (new DirectoryIterator(__DIR__ . '/config') as $file) {
    // Only regular *.json entries are treated as log source configurations.
    if ($file->isDot() || $file->isDir() || $file->getExtension() !== 'json') {
        continue;
    }

    try {
        $worker = new LogSource($file->getPathname());
        $worker->work();
    } catch (Exception $e) {
        // Keep going: one broken configuration must not block the other sources.
        $failures++;
        fwrite(STDERR, sprintf("ERROR: %s: %s\n", $file->getFilename(), $e->getMessage()));
    }
}

exit($failures > 0 ? 1 : 0);

226
src/LogSource.php Normal file
View File

@ -0,0 +1,226 @@
<?php
/**
* LogSource.php
*
* @copyright 2024 Fairbanks Publishing LLC
* @license Proprietary
*/
namespace MakerDave\ElasticPush;
use Aws\Exception\AwsException;
use Aws\S3\S3Client;
use DateTime;
use DateTimeZone;
use Exception;
/**
 * Class LogSource
 *
 * Driven by a single JSON configuration file that doubles as a checkpoint.
 * The configuration names one or more log files (`file` or `files`), the
 * target Elasticsearch index (`index`) and the last log entry that was
 * already pushed (`date` and `hash`, both may be null).
 *
 * work() downloads each log file from S3 into $_ENV['TEMP_LOG_STORE_DIR'],
 * reads it line by line as JSON, skips everything up to and including the
 * checkpoint entry, and posts the remaining entries to the Elasticsearch
 * `_bulk` endpoint in batches of BULK_SIZE. Afterwards the configuration file
 * is rewritten with the date and hash of the last entry that was queued.
 *
 * Each entry is sent as a `create` action whose `_id` is the SHA-1 of
 * "(<line number>) <line>", so the same line of the same file always maps to
 * the same document id.
 *
 * @author David Fairbanks <david@makerdave.com>
 * @version 1.0
 */
class LogSource
{
    /** Number of queued events that triggers a bulk request. */
    protected const BULK_SIZE = 50;

    /** Decoded configuration / checkpoint object. */
    protected object $config;

    /** Log file names to fetch, relative to the S3 key prefix. */
    protected array $files = [];

    /** Timestamp (UTC) of the last entry pushed by a previous run, if any. */
    protected DateTime|null $lastDate = null;

    /** Hash of the last entry pushed by a previous run, if any. */
    protected string|null $lastHash = null;

    /** True once the checkpoint entry has been passed and new entries may be queued. */
    protected bool $lastFound = false;

    /** Events queued for the next bulk request: each has keys hash, date, line. */
    protected array $events = [];

    protected int $skipCount = 0;

    protected int $sendCount = 0;

    /** Set when a bulk request failed as a whole; blocks the checkpoint update. */
    protected bool $sendFailed = false;

    protected S3Client $client;

    /**
     * @param string $configPath Path of the JSON configuration / checkpoint file.
     * @throws Exception When the file cannot be read or decoded to an object, names
     *                   no log files, or carries a checkpoint date that cannot be parsed.
     */
    public function __construct(
        protected string $configPath
    ) {
        $raw = is_readable($configPath) ? file_get_contents($configPath) : false;
        $config = is_string($raw) ? json_decode($raw) : null;
        if (!is_object($config)) {
            throw new Exception('Unable to parse log configuration file: ' . $configPath);
        }
        $this->config = $config;
        $this->files = $this->resolveFiles();

        // `?? null` tolerates a configuration that omits the checkpoint keys entirely.
        $configuredDate = $this->config->date ?? null;
        if ($configuredDate !== null) {
            $lastDate = is_string($configuredDate) ? date_create($configuredDate) : false;
            if ($lastDate === false) {
                throw new Exception('Unable to parse last date: ' . $configPath);
            }
            $lastDate->setTimezone(new DateTimeZone('UTC'));
            $this->lastDate = $lastDate;
        }

        $configuredHash = $this->config->hash ?? null;
        if ($configuredHash !== null) {
            $this->lastHash = $configuredHash;
        } else {
            // No checkpoint hash: there is no entry to look for, queue from the start.
            $this->lastFound = true;
        }

        $this->client = new S3Client([
            'profile' => 'david-fairbanks42',
            'version' => 'latest',
            'region' => 'us-east-1',
        ]);
    }

    /**
     * Fetch and process every configured log file, flush the remaining queue,
     * print a summary and persist the new checkpoint.
     *
     * The checkpoint is not written when a bulk request failed as a whole
     * (transport error or HTTP error status), so that the next run sends the
     * same entries again instead of skipping them.
     *
     * @throws Exception When the temporary download directory is unusable.
     */
    public function work(): void
    {
        foreach ($this->files as $file) {
            if ($this->getFile($file) === false) {
                continue;
            }
            $this->processFile($file);
            unlink($this->localPath($file));
        }

        if ($this->events !== []) {
            $this->send();
            $this->events = [];
        }

        echo sprintf(
            "%s\n\tSkipped %d log entries\n\tSent %d log entries\n",
            basename($this->configPath),
            $this->skipCount,
            $this->sendCount
        );

        if ($this->sendFailed) {
            echo "\tWARNING: a bulk request failed; checkpoint not updated\n";
            return;
        }

        if (($this->config->hash ?? null) === null) {
            return;
        }

        // Never overwrite the checkpoint with an empty file if encoding fails.
        $encoded = json_encode($this->config);
        if ($encoded === false || file_put_contents($this->configPath, $encoded) === false) {
            echo sprintf("ERROR: Unable to write checkpoint file %s\n", $this->configPath);
        }
    }

    /**
     * Pick the list of log files from the configuration.
     *
     * `file` is consulted before `files`; either may be a single string or an
     * array. Lists with more than one entry are de-duplicated and sorted in
     * descending order.
     *
     * @return array
     * @throws Exception When neither key holds a string or an array.
     */
    protected function resolveFiles(): array
    {
        foreach (['file', 'files'] as $key) {
            $value = $this->config->{$key} ?? null;
            if (is_string($value)) {
                return [$value];
            }
            if (is_array($value)) {
                if (count($value) > 1) {
                    $value = array_unique($value);
                    rsort($value);
                }
                return $value;
            }
        }

        throw new Exception('No log files configured ("file" or "files"): ' . $this->configPath);
    }

    /**
     * Local path a log file is downloaded to.
     */
    protected function localPath(string $file): string
    {
        return $_ENV['TEMP_LOG_STORE_DIR'] . DIRECTORY_SEPARATOR . $file;
    }

    /**
     * Download one log file from S3 into the temporary directory.
     *
     * @param string $file File name, appended to the `app-logs/` key prefix.
     * @return bool True when the object was saved, false when S3 reported an error.
     * @throws Exception When the temporary directory path is not a directory.
     */
    protected function getFile(string $file): bool
    {
        $directory = $_ENV['TEMP_LOG_STORE_DIR'];
        if (! file_exists($directory)) {
            // Recursive so a nested temp path can be created in one go.
            mkdir($directory, 0777, true);
        }
        if (! is_dir($directory)) {
            throw new Exception('Target directory ' . $directory . ' exists but is not a directory');
        }

        try {
            $this->client->getObject([
                'Bucket' => 'fairbanks-publishing-cloudtrail',
                'Key' => 'app-logs/' . $file,
                'SaveAs' => $this->localPath($file),
            ]);
            return true;
        } catch (AwsException $e) {
            echo sprintf("ERROR: Unable to get log file %s: %s\n", $file, $e->getMessage());
            return false;
        }
    }

    /**
     * Read a downloaded log file and queue every entry newer than the checkpoint.
     *
     * Blank lines, lines that are not a JSON object/array, and lines without a
     * parseable `time` value are ignored. Entries older than the checkpoint
     * date, and the checkpoint entry itself, are counted as skipped.
     *
     * @param string $file File name inside the temporary directory.
     */
    protected function processFile(string $file): void
    {
        $fh = fopen($this->localPath($file), 'r');
        if ($fh === false) {
            echo sprintf("ERROR: Unable to open log file %s\n", $file);
            return;
        }

        $utc = new DateTimeZone('UTC');
        $lineNumber = 0;

        try {
            while (($raw = fgets($fh)) !== false) {
                // The line number is part of the hash, so every physical line counts.
                $lineNumber++;
                $line = trim($raw);
                if ($line === '') {
                    continue;
                }

                $data = json_decode($line, true);
                if (! is_array($data)) {
                    continue;
                }

                // Without a usable timestamp the entry cannot be ordered against the checkpoint.
                if (! isset($data['time']) || ! is_scalar($data['time'])) {
                    continue;
                }
                $date = date_create((string) $data['time']);
                if ($date === false) {
                    continue;
                }
                $date->setTimezone($utc);

                if ($this->lastDate !== null && $date < $this->lastDate) {
                    $this->skipCount++;
                    continue;
                }

                $hash = hash('sha1', '(' . $lineNumber . ') ' . $line);

                if ($this->lastFound === false && $this->lastDate !== null) {
                    // Strict comparison: hex digests must never be compared numerically.
                    if ($date == $this->lastDate && $hash === $this->lastHash) {
                        // This is the checkpoint entry itself; everything after it is new.
                        $this->lastFound = true;
                        $this->skipCount++;
                        continue;
                    }
                    if ($date > $this->lastDate) {
                        $this->lastFound = true;
                    }
                }
                if ($this->lastFound === false) {
                    continue;
                }

                $formatted = date_format($date, 'c');
                $data['@timestamp'] = $formatted;
                $data['time'] = $formatted;

                $this->events[] = ['hash' => $hash, 'date' => $formatted, 'line' => json_encode($data)];
                $this->sendCount++;
                $this->config->date = $formatted;
                $this->config->hash = $hash;

                if (count($this->events) >= self::BULK_SIZE) {
                    $this->send();
                    $this->events = [];
                }
            }
        } finally {
            fclose($fh);
        }
    }

    /**
     * Post the queued events to the Elasticsearch `_bulk` endpoint.
     *
     * A transport failure, an HTTP status of 400 or above, or a response that
     * is not JSON marks the run as failed (see work()). A successful request
     * whose body reports `errors` is printed, as before, without marking the
     * run as failed.
     */
    protected function send(): void
    {
        if ($this->events === []) {
            return;
        }

        $body = '';
        foreach ($this->events as $item) {
            $body .= json_encode(['create' => ['_index' => $this->config->index, '_id' => $item['hash']]]) . "\n";
            $body .= $item['line'] . "\n";
        }

        $curl = curl_init();
        if ($curl === false) {
            $this->sendFailed = true;
            echo "ERROR: Unable to initialise cURL\n";
            return;
        }

        curl_setopt_array($curl, [
            CURLOPT_URL => $_ENV['ELASTICSEARCH_HOST'] . '/_bulk',
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/json',
                'Authorization: ApiKey ' . $_ENV['ELASTICSEARCH_API_KEY'],
            ],
            CURLOPT_RETURNTRANSFER => true,
            // NOTE(review): certificate and host verification are disabled while an API
            // key is sent in the header. Presumably for a local self-signed node -
            // confirm, and enable verification for any remote host.
            CURLOPT_SSL_VERIFYPEER => false,
            CURLOPT_SSL_VERIFYHOST => false,
            CURLOPT_USERAGENT => 'MakerDave Log Sender 1.0',
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $body,
        ]);

        $resp = curl_exec($curl);
        $status = (int) curl_getinfo($curl, CURLINFO_RESPONSE_CODE);
        $transportError = curl_error($curl);
        curl_close($curl);

        if ($resp === false) {
            $this->sendFailed = true;
            echo sprintf("ERROR: Bulk request failed: %s\n", $transportError);
            return;
        }

        $response = json_decode($resp, true);
        if ($status >= 400 || ! is_array($response)) {
            $this->sendFailed = true;
            echo sprintf("ERROR: Bulk request rejected (HTTP %d)\n", $status);
            echo $resp . "\n";
            return;
        }

        if (! empty($response['errors'])) {
            echo $resp . "\n";
        }
    }
}