Following more and more European regulations (NIS2,...), we would like to export Audit logs( object creation, deletion, modification,...) from Centreon to another place (Elastic).
A stream connector can’t be used for that because these logs doesn’t pass by Broker but are directly in Database.
Could we have a solution to export easily these logs (Connector, script,...)?
Audit Logs
Page 1 / 1
Hi @LaurentV, currently configuration log changes are only store into centreon_storage database for following tables:
log_action
log_action_modification (details of each modification)
Do you want a manual export over a given period or an automatic export in a dedicated format (syslog, etc.)?
Regards
New→Discussion ongoing
Hi,
for us, we have 2 solutions:
automatic export to a syslog (and our Elastic can import this log)
Possibility to export the data via a CLAPI command (to possibly automate it)
Take the easiest way for you, we’ll adapt :-)
Regards,
Laurent
Discussion ongoing→Needs Votes
I integrated all centreon logs in ElasticSearch -audit logs (Administration Logs) -event logs (Monitoring > Event Logs > Event Logs) -system logs (Monitoring > Event Logs > System Logs)
Prerequisite
# ---- Pre-requis: # CREATE USER 'elastic'@'localhost' IDENTIFIED BY 'xxxx'; # GRANT SELECT ON centreon_storage.log_action TO 'elastic'@'localhost'; # GRANT SELECT ON centreon_storage.log_action_modification TO 'elastic'@'localhost'; # GRANT SELECT ON centreon_storage.services TO 'elastic'@'localhost'; # GRANT SELECT ON centreon_storage.hosts TO 'elastic'@'localhost'; # GRANT SELECT ON centreon_storage.logs TO 'elastic'@'localhost'; # GRANT SELECT ON centreon.contact TO 'elastic'@'localhost'; # # ---- Modif DB Mysql 8.0 # this is incompatible with sql_mode=only_full_group_by # SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); # SET @@sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
SQL statement for audit logs (to get exactly same information as web GUI)
SELECT 'audit_log' as log_type, date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime, CASE WHEN e.action_type = 'c' THEN 'changed' WHEN e.action_type = 'a' THEN 'added' WHEN e.action_type = 'd' THEN 'deleted' WHEN e.action_type = 'disable' THEN 'disabled' WHEN e.action_type = 'enable' THEN 'enabled' END AS action_type, e.action_log_id, e.object_type, #e.object_id, CASE WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name) WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name) WHEN e.object_type != 'service' THEN e.object_name END AS object_name, CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author, CONCAT('{', GROUP_CONCAT( CONCAT(d.field_name, ':', CASE WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>' ELSE d.field_value END) SEPARATOR ','), '}') AS detail_json FROM centreon_storage.log_action e LEFT JOIN centreon.contact c ON e.log_contact_id = c.contact_id LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id GROUP BY e.action_log_id, e.object_type, object_name, e.action_type ORDER BY e.action_log_id DESC
SQL statement for event+system logs (to get exactly same information as web GUI)
SELECT CASE WHEN msg_type='0' THEN 'event_log' WHEN msg_type='1' THEN 'event_log' WHEN msg_type='2' THEN 'event_log' WHEN msg_type='3' THEN 'event_log' WHEN msg_type='4' THEN 'system_log' WHEN msg_type='5' THEN 'system_log' ELSE msg_type END AS log_type, date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime, log_id,instance_name,host_name,service_description, CASE #msg_type=0 alerts services WHEN status = '0' and msg_type='0' THEN 'OK' WHEN status = '1' and msg_type='0' THEN 'WARNING' WHEN status = '2' and msg_type='0' THEN 'CRITICAL' WHEN status = '3' and msg_type='0' THEN 'UNKNOWN' #msg_type=1 alerts hosts WHEN status = '0' and msg_type='1' THEN 'UP' WHEN status = '1' and msg_type='1' THEN 'DOWN' #msg_type=2 notif service WHEN status = '0' and msg_type='2' THEN 'OK' WHEN status = '1' and msg_type='2' THEN 'WARNING' WHEN status = '2' and msg_type='2' THEN 'CRITICAL' WHEN status = '3' and msg_type='2' THEN 'UNKNOWN' #msg_type=3 notif host WHEN status = '0' and msg_type='3' THEN 'UP' WHEN status = '1' and msg_type='3' THEN 'DOWN' #msg_type=4,5 system WHEN status = '0' and msg_type='4' THEN '' WHEN status = '0' and msg_type='5' THEN '' ELSE status END AS statusGUI, CASE WHEN type = '0' and msg_type='0' THEN 'SOFT' WHEN type = '1' and msg_type='0' THEN 'HARD' WHEN type = '0' and msg_type='1' THEN 'SOFT' WHEN type = '1' and msg_type='1' THEN 'HARD' WHEN type = '0' and msg_type='2' THEN 'NOTIF' WHEN type = '0' and msg_type='3' THEN 'NOTIF' WHEN type = '0' and msg_type='4' THEN '' WHEN type = '0' and msg_type='5' THEN '' ELSE type END AS typeGUI, retry,output, #juste pour info host_id, service_id, msg_type, type, status FROM centreon_storage.logs ORDER BY ctime DESC,log_id DESC
Then deploy elastic agent on the server where database is located. Add to the policy of this agent an “SQL input” integration (don’t forget to tick “Display beta integrations”)
In SQL queries field (transofrm SQL statement in one line) :
- query: SELECT 'audit_log' as log_type, date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime, CASE WHEN e.action_type = 'c' THEN 'changed' WHEN e.action_type = 'a' THEN 'added' WHEN e.action_type = 'd' THEN 'deleted' WHEN e.action_type = 'disable' THEN 'disabled' WHEN e.action_type = 'enable' THEN 'enabled' END AS action_type, e.action_log_id, e.object_type, CASE WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name) WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name) WHEN e.object_type != 'service' THEN e.object_name END AS object_name, CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author, CONCAT('{', GROUP_CONCAT(CONCAT(d.field_name, ':', CASE WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>' ELSE d.field_value END) SEPARATOR ','), '}') AS detail_json FROM centreon_storage.log_action e LEFT JOIN centreon.contact c ON e.log_contact_id = c.contact_id LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id GROUP BY e.action_log_id, e.object_type, object_name, e.action_type ORDER BY e.action_log_id DESC response_format: table - query: SELECT CASE WHEN msg_type='0' THEN 'event_log' WHEN msg_type='1' THEN 'event_log' WHEN msg_type='2' THEN 'event_log' WHEN msg_type='3' THEN 'event_log' WHEN msg_type='4' THEN 'system_log' WHEN msg_type='5' THEN 'system_log' ELSE msg_type END AS log_type, date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime, log_id,instance_name,host_name,service_description, CASE WHEN status = '0' and msg_type='0' THEN 'OK' WHEN status = '1' and msg_type='0' THEN 'WARNING' WHEN status = '2' and msg_type='0' THEN 'CRITICAL' WHEN status = '3' and msg_type='0' THEN 'UNKNOWN' WHEN status = '0' and msg_type='1' THEN 'UP' WHEN status = '1' and msg_type='1' THEN 'DOWN' WHEN status = '0' and msg_type='2' THEN 'OK' WHEN status = '1' and msg_type='2' THEN 'WARNING' WHEN status = '2' and msg_type='2' THEN 'CRITICAL' WHEN status = '3' and msg_type='2' THEN 'UNKNOWN' WHEN status = '0' and msg_type='3' THEN 'UP' WHEN status = '1' and msg_type='3' THEN 'DOWN' WHEN status = '0' and msg_type='4' THEN '' WHEN status = '0' and msg_type='5' THEN '' ELSE status END AS statusGUI, CASE WHEN type = '0' and msg_type='0' THEN 'SOFT' WHEN type = '1' and msg_type='0' THEN 'HARD' WHEN type = '0' and msg_type='1' THEN 'SOFT' WHEN type = '1' and msg_type='1' THEN 'HARD' WHEN type = '0' and msg_type='2' THEN 'NOTIF' WHEN type = '0' and msg_type='3' THEN 'NOTIF' WHEN type = '0' and msg_type='4' THEN '' WHEN type = '0' and msg_type='5' THEN '' ELSE type END AS typeGUI, retry,output,host_id,service_id,msg_type,type,status FROM centreon_storage.logs ORDER BY ctime DESC,log_id DESC response_format: table
-one processor to rename sql.metrics into centreon field -one fingerprint processor to use action_log_id as unique value to avoid duplicates for audit_log -one fingerprint processor to use log_id as unique value to avoid duplicates for event_log/system_log -one date processor to convert “datetime” sql field into elastic compatible timestamp
Once all logs has been synchronized, re-edit your “sql input” integration to add a WHERE clause to fetch only records newer than less 1 hour (to avoid overloading sql server)
- query: SELECT 'audit_log' as log_type, date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime, CASE WHEN e.action_type = 'c' THEN 'changed' WHEN e.action_type = 'a' THEN 'added' WHEN e.action_type = 'd' THEN 'deleted' WHEN e.action_type = 'disable' THEN 'disabled' WHEN e.action_type = 'enable' THEN 'enabled' END AS action_type, e.action_log_id, e.object_type, CASE WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name) WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name) WHEN e.object_type != 'service' THEN e.object_name END AS object_name, CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author, CONCAT('{', GROUP_CONCAT(CONCAT(d.field_name, ':', CASE WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>' ELSE d.field_value END) SEPARATOR ','), '}') AS detail_json FROM centreon_storage.log_action e LEFT JOIN centreon.contact c ON e.log_contact_id = c.contact_id LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id WHERE from_unixtime(action_log_date) >= NOW() - INTERVAL 1 HOUR GROUP BY e.action_log_id, e.object_type, object_name, e.action_type ORDER BY e.action_log_id DESC response_format: table - query: SELECT CASE WHEN msg_type='0' THEN 'event_log' WHEN msg_type='1' THEN 'event_log' WHEN msg_type='2' THEN 'event_log' WHEN msg_type='3' THEN 'event_log' WHEN msg_type='4' THEN 'system_log' WHEN msg_type='5' THEN 'system_log' ELSE msg_type END AS log_type, date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime, log_id,instance_name,host_name,service_description, CASE WHEN status = '0' and msg_type='0' THEN 'OK' WHEN status = '1' and msg_type='0' THEN 'WARNING' WHEN status = '2' and msg_type='0' THEN 'CRITICAL' WHEN status = '3' and msg_type='0' THEN 'UNKNOWN' WHEN status = '0' and msg_type='1' THEN 'UP' WHEN status = '1' and msg_type='1' THEN 'DOWN' WHEN status = '0' and msg_type='2' THEN 'OK' WHEN status = '1' and msg_type='2' THEN 'WARNING' WHEN status = '2' and msg_type='2' THEN 'CRITICAL' WHEN status = '3' and msg_type='2' THEN 'UNKNOWN' WHEN status = '0' and msg_type='3' THEN 'UP' WHEN status = '1' and msg_type='3' THEN 'DOWN' WHEN status = '0' and msg_type='4' THEN '' WHEN status = '0' and msg_type='5' THEN '' ELSE status END AS statusGUI, CASE WHEN type = '0' and msg_type='0' THEN 'SOFT' WHEN type = '1' and msg_type='0' THEN 'HARD' WHEN type = '0' and msg_type='1' THEN 'SOFT' WHEN type = '1' and msg_type='1' THEN 'HARD' WHEN type = '0' and msg_type='2' THEN 'NOTIF' WHEN type = '0' and msg_type='3' THEN 'NOTIF' WHEN type = '0' and msg_type='4' THEN '' WHEN type = '0' and msg_type='5' THEN '' ELSE type END AS typeGUI, retry,output,host_id,service_id,msg_type,type,status FROM centreon_storage.logs WHERE from_unixtime(ctime) >= NOW() - INTERVAL 1 HOUR ORDER BY ctime DESC,log_id DESC response_format: table
Then finally 3 “views” with colorized fields -centreon-audit (with log_type=audit_log) -centreon-system (with log_type=system_log) -centreon-event (with log_type=event_log)
I hope these instructions can help those trying to integrate centreon logs into elastic
Hello,
thanks for your instructions.
For all logs except audit, we use a stream connector.
But for Audit, Centreon Support explained us we can lose our support if we touch to database structure, as user creation So we’re blocked...
Hello, You can use “centreon” db user instead of creating a new db user ? I’m just using select statements, shouldn’t be a big deal ?