
Hello,

 

With more and more European regulations coming into force (NIS2, ...), we would like to export audit logs (object creation, deletion, modification, ...) from Centreon to another system (Elastic).

A stream connector can’t be used for this because these logs don’t pass through Broker; they are written directly to the database.

Could we have a solution to export these logs easily (connector, script, ...)?

 

Audit Logs

 

Hi @LaurentV, currently configuration change logs are only stored in the centreon_storage database, in the following tables:

  • log_action
  • log_action_modification (details of each modification)

Do you want a manual export over a given period or an automatic export in a dedicated format (syslog, etc.)?

Regards



Hi, 

For us, there are two possible solutions:

  • automatic export to syslog (our Elastic can then import this log)
  • the possibility to export the data via a CLAPI command (so that we can automate it)

Pick whichever is easiest for you, we’ll adapt :-)

 

Regards,

Laurent


I integrated all Centreon logs into Elasticsearch:
-audit logs (Administration Logs)
-event logs (Monitoring > Event Logs > Event Logs)
-system logs  (Monitoring > Event Logs > System Logs)


Prerequisite
 

# ---- Prerequisites:
# CREATE USER 'elastic'@'localhost' IDENTIFIED BY 'xxxx';
# GRANT SELECT ON centreon_storage.log_action TO 'elastic'@'localhost';
# GRANT SELECT ON centreon_storage.log_action_modification TO 'elastic'@'localhost';
# GRANT SELECT ON centreon_storage.services TO 'elastic'@'localhost';
# GRANT SELECT ON centreon_storage.hosts TO 'elastic'@'localhost';
# GRANT SELECT ON centreon_storage.logs TO 'elastic'@'localhost';
# GRANT SELECT ON centreon.contact TO 'elastic'@'localhost';
#
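# ---- Database change for MySQL 8.0
# the audit log query is incompatible with sql_mode=only_full_group_by
# either of the two equivalent forms below removes it (without GLOBAL, only for the current session):
# SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
# SET @@sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));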


SQL statement for audit logs (to get exactly the same information as the web GUI)

SELECT 
'audit_log' as log_type,
date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime,
CASE
WHEN e.action_type = 'c' THEN 'changed'
WHEN e.action_type = 'a' THEN 'added'
WHEN e.action_type = 'd' THEN 'deleted'
WHEN e.action_type = 'disable' THEN 'disabled'
WHEN e.action_type = 'enable' THEN 'enabled'
END AS action_type,
e.action_log_id,
e.object_type,
#e.object_id,
CASE
WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name)
WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name)
WHEN e.object_type != 'service' THEN e.object_name
END AS object_name,
CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author,
CONCAT('{', GROUP_CONCAT(
CONCAT(d.field_name, ':',
CASE
WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>'
ELSE d.field_value
END)
SEPARATOR ','), '}') AS detail_json
FROM centreon_storage.log_action e
LEFT JOIN centreon.contact c ON e.log_contact_id = c.contact_id
LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id
LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id
LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id
GROUP BY e.action_log_id, e.object_type, object_name, e.action_type
ORDER BY e.action_log_id DESC
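
Note that the detail_json column built by GROUP_CONCAT above is not strictly valid JSON, because keys and values are not quoted. If the export is done from a script instead, the same detail object could be assembled as real JSON. This is only a minimal sketch: the function name and the sample field names are hypothetical, and it assumes the rows of log_action_modification for one action_log_id have already been fetched as (field_name, field_value) pairs.

```python
import json


def build_detail_json(modifications):
    """Build a JSON object from (field_name, field_value) pairs.

    Mirrors the CASE in the SQL statement: any field whose name contains
    'passwd' is replaced by '<obfuscated>'. If a field name appears twice,
    the last value wins (an assumption of this sketch).
    """
    detail = {}
    for field_name, field_value in modifications:
        if "passwd" in field_name.lower():
            detail[field_name] = "<obfuscated>"
        else:
            detail[field_name] = field_value
    return json.dumps(detail, ensure_ascii=False)


# Hypothetical sample rows for a single action_log_id
sample = [("host_name", "srv1"), ("host_snmp_passwd", "secret")]
print(build_detail_json(sample))
```

Unlike the GROUP_CONCAT version, the result can be parsed directly by a JSON processor on the Elastic side.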

SQL statement for event + system logs (to get exactly the same information as the web GUI)
 

SELECT 
CASE
WHEN msg_type='0' THEN 'event_log'
WHEN msg_type='1' THEN 'event_log'
WHEN msg_type='2' THEN 'event_log'
WHEN msg_type='3' THEN 'event_log'
WHEN msg_type='4' THEN 'system_log'
WHEN msg_type='5' THEN 'system_log'
ELSE msg_type
END AS log_type,
date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime,
log_id,instance_name,host_name,service_description,
CASE
#msg_type=0 alerts services
WHEN status = '0' and msg_type='0' THEN 'OK'
WHEN status = '1' and msg_type='0' THEN 'WARNING'
WHEN status = '2' and msg_type='0' THEN 'CRITICAL'
WHEN status = '3' and msg_type='0' THEN 'UNKNOWN'
#msg_type=1 alerts hosts
WHEN status = '0' and msg_type='1' THEN 'UP'
WHEN status = '1' and msg_type='1' THEN 'DOWN'
#msg_type=2 notif service
WHEN status = '0' and msg_type='2' THEN 'OK'
WHEN status = '1' and msg_type='2' THEN 'WARNING'
WHEN status = '2' and msg_type='2' THEN 'CRITICAL'
WHEN status = '3' and msg_type='2' THEN 'UNKNOWN'
#msg_type=3 notif host
WHEN status = '0' and msg_type='3' THEN 'UP'
WHEN status = '1' and msg_type='3' THEN 'DOWN'
#msg_type=4,5 system
WHEN status = '0' and msg_type='4' THEN ''
WHEN status = '0' and msg_type='5' THEN ''
ELSE status
END AS statusGUI,
CASE
WHEN type = '0' and msg_type='0' THEN 'SOFT'
WHEN type = '1' and msg_type='0' THEN 'HARD'
WHEN type = '0' and msg_type='1' THEN 'SOFT'
WHEN type = '1' and msg_type='1' THEN 'HARD'
WHEN type = '0' and msg_type='2' THEN 'NOTIF'
WHEN type = '0' and msg_type='3' THEN 'NOTIF'
WHEN type = '0' and msg_type='4' THEN ''
WHEN type = '0' and msg_type='5' THEN ''
ELSE type
END AS typeGUI,
retry,output,
#for information only
host_id,
service_id,
msg_type,
type,
status
FROM centreon_storage.logs
ORDER BY ctime DESC,log_id DESC
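
For readers who prefer to do this mapping outside the database, the two CASE expressions for log_type and statusGUI can be written as a small lookup. This is a sketch of the same rules as the SQL above, nothing more; the function and constant names are made up for the example.

```python
SERVICE_STATUS = {0: "OK", 1: "WARNING", 2: "CRITICAL", 3: "UNKNOWN"}
HOST_STATUS = {0: "UP", 1: "DOWN"}


def log_type(msg_type):
    """msg_type 0-3 are event logs, 4-5 are system logs, else the raw value."""
    if msg_type in (0, 1, 2, 3):
        return "event_log"
    if msg_type in (4, 5):
        return "system_log"
    return str(msg_type)


def status_label(msg_type, status):
    """Same fall-through behaviour as the SQL: unknown pairs return the raw status."""
    if msg_type in (0, 2):  # service alerts and service notifications
        return SERVICE_STATUS.get(status, str(status))
    if msg_type in (1, 3):  # host alerts and host notifications
        return HOST_STATUS.get(status, str(status))
    if msg_type in (4, 5) and status == 0:  # system messages
        return ""
    return str(status)


print(log_type(1), status_label(1, 1))
print(log_type(0), status_label(0, 2))
```

As in the SQL, a host status other than 0 or 1 is not translated and comes back as the raw number.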

Then deploy Elastic Agent on the server where the database is located. Add an “SQL input” integration to this agent’s policy (don’t forget to tick “Display beta integrations”).

 


 

In the SQL queries field (transform each SQL statement into one line):
 

- query: SELECT 'audit_log' as log_type, date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime, CASE WHEN e.action_type = 'c' THEN 'changed' WHEN e.action_type = 'a' THEN 'added' WHEN e.action_type = 'd' THEN 'deleted' WHEN e.action_type = 'disable' THEN 'disabled' WHEN e.action_type = 'enable' THEN 'enabled' END AS action_type, e.action_log_id, e.object_type, CASE WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name) WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name) WHEN e.object_type != 'service' THEN e.object_name END AS object_name, CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author, CONCAT('{', GROUP_CONCAT(CONCAT(d.field_name, ':', CASE WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>' ELSE d.field_value END) SEPARATOR ','), '}') AS detail_json FROM centreon_storage.log_action e LEFT JOIN centreon.contact c ON  e.log_contact_id = c.contact_id LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id GROUP BY e.action_log_id, e.object_type, object_name, e.action_type ORDER BY e.action_log_id DESC
  response_format: table
- query: SELECT CASE WHEN msg_type='0' THEN 'event_log' WHEN msg_type='1' THEN 'event_log' WHEN msg_type='2' THEN 'event_log' WHEN msg_type='3' THEN 'event_log' WHEN msg_type='4' THEN 'system_log' WHEN msg_type='5' THEN 'system_log' ELSE msg_type END AS log_type, date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime, log_id,instance_name,host_name,service_description, CASE WHEN status = '0' and msg_type='0' THEN 'OK' WHEN status = '1' and msg_type='0' THEN 'WARNING' WHEN status = '2' and msg_type='0' THEN 'CRITICAL' WHEN status = '3' and msg_type='0' THEN 'UNKNOWN' WHEN status = '0' and msg_type='1' THEN 'UP' WHEN status = '1' and msg_type='1' THEN 'DOWN' WHEN status = '0' and msg_type='2' THEN 'OK' WHEN status = '1' and msg_type='2' THEN 'WARNING' WHEN status = '2' and msg_type='2' THEN 'CRITICAL' WHEN status = '3' and msg_type='2' THEN 'UNKNOWN' WHEN status = '0' and msg_type='3' THEN 'UP' WHEN status = '1' and msg_type='3' THEN 'DOWN' WHEN status = '0' and msg_type='4' THEN '' WHEN status = '0' and msg_type='5' THEN '' ELSE status END AS statusGUI, CASE WHEN type = '0' and msg_type='0' THEN 'SOFT' WHEN type = '1' and msg_type='0' THEN 'HARD' WHEN type = '0' and msg_type='1' THEN 'SOFT' WHEN type = '1' and msg_type='1' THEN 'HARD' WHEN type = '0' and msg_type='2' THEN 'NOTIF' WHEN type = '0' and msg_type='3' THEN 'NOTIF' WHEN type = '0' and msg_type='4' THEN '' WHEN type = '0' and msg_type='5' THEN '' ELSE type END AS typeGUI, retry,output,host_id,service_id,msg_type,type,status FROM centreon_storage.logs ORDER BY ctime DESC,log_id DESC
  response_format: table
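
The one-line form used above can be produced from the multi-line statements with a few lines of script rather than by hand. A minimal sketch, assuming comments only appear as whole lines starting with '#' (as in the statements in this post); the function name is hypothetical.

```python
def sql_to_one_line(sql):
    """Collapse a multi-line SQL statement into a single line.

    Blank lines and whole-line '#' comments are dropped. A '#' in the
    middle of a line is NOT handled (assumption of this sketch).
    """
    kept = []
    for line in sql.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        kept.append(stripped)
    return " ".join(kept)


example = """
SELECT
log_id,
#for information only
host_id
FROM centreon_storage.logs
"""
print(sql_to_one_line(example))
```

Dropping the '#' lines matters here because a '#' preceded by a space would start a comment in the YAML field and cut the query short.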

 


Re-edit the SQL input integration and add a custom pipeline
 


Create a pipeline with:

-one rename processor to move sql.metrics into a centreon field
-one fingerprint processor that uses action_log_id as the unique value, to avoid duplicates for audit_log
-one fingerprint processor that uses log_id as the unique value, to avoid duplicates for event_log/system_log
-one date processor to convert the “datetime” SQL field into an Elastic-compatible timestamp
 



JSON for “metrics-centreon @ custom” pipeline

[
  {
    "rename": {
      "field": "sql.metrics",
      "target_field": "centreon"
    }
  },
  {
    "fingerprint": {
      "fields": [
        "centreon.action_log_id"
      ],
      "target_field": "_id",
      "if": "ctx.centreon.log_type == 'audit_log'",
      "description": "fingerprint for audit logs"
    }
  },
  {
    "fingerprint": {
      "fields": [
        "centreon.log_id"
      ],
      "target_field": "_id",
      "if": "ctx.centreon.log_type == 'event_log' || ctx.centreon.log_type == 'system_log'",
      "description": "fingerprint for event/system logs"
    }
  },
  {
    "date": {
      "field": "centreon.datetime",
      "formats": [
        "yyyy-MM-dd HH:mm:ss"
      ],
      "timezone": "Europe/Zurich"
    }
  }
]
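
To illustrate why a deterministic _id prevents duplicates when the same rows are fetched again on every polling cycle, here is a small simulation. It is not the hash Elasticsearch actually computes; SHA-1 over "field=value" and the in-memory dict standing in for the index are assumptions of this sketch.

```python
import hashlib

index = {}  # stands in for the Elasticsearch index: _id -> document


def doc_id(row):
    """Derive a stable id from the unique key of the row."""
    key_field = "action_log_id" if row["log_type"] == "audit_log" else "log_id"
    raw = f"{key_field}={row[key_field]}".encode("utf-8")
    return hashlib.sha1(raw).hexdigest()


def ingest(row):
    """Indexing with an explicit id overwrites instead of duplicating."""
    index[doc_id(row)] = row


# The same audit row fetched twice, plus an event row with the same number
ingest({"log_type": "audit_log", "action_log_id": 42})
ingest({"log_type": "audit_log", "action_log_id": 42})
ingest({"log_type": "event_log", "log_id": 42})
print(len(index))
```

The repeated audit row ends up stored once, while the event row with the same numeric id stays separate because the field name is part of the hashed value in this sketch.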


Create a data view “metrics-centreon”
 



Once all logs have been synchronized, re-edit your “SQL input” integration to add a WHERE clause that fetches only records from the last hour (to avoid overloading the SQL server).

 

- query: SELECT 'audit_log' as log_type, date_format(from_unixtime(action_log_date), '%Y-%m-%d %H:%i:%s') as datetime, CASE WHEN e.action_type = 'c' THEN 'changed' WHEN e.action_type = 'a' THEN 'added' WHEN e.action_type = 'd' THEN 'deleted' WHEN e.action_type = 'disable' THEN 'disabled' WHEN e.action_type = 'enable' THEN 'enabled' END AS action_type, e.action_log_id, e.object_type, CASE WHEN e.object_type = 'service' AND h.name IS NOT NULL THEN CONCAT(h.name, ' / ', e.object_name) WHEN e.object_type = 'service' AND h.name IS NULL THEN CONCAT(' Linked resource has changed / ', e.object_name) WHEN e.object_type != 'service' THEN e.object_name END AS object_name, CONCAT(c.contact_name, ' (', c.contact_alias, ')') as author, CONCAT('{', GROUP_CONCAT(CONCAT(d.field_name, ':', CASE WHEN d.field_name LIKE '%passwd%' THEN '<obfuscated>' ELSE d.field_value END) SEPARATOR ','), '}') AS detail_json FROM centreon_storage.log_action e LEFT JOIN centreon.contact c ON  e.log_contact_id = c.contact_id LEFT JOIN centreon_storage.services s ON e.object_type = 'service' AND e.object_id = s.service_id LEFT JOIN centreon_storage.hosts h ON e.object_type = 'service' AND s.host_id = h.host_id LEFT JOIN centreon_storage.log_action_modification d ON e.action_log_id = d.action_log_id WHERE from_unixtime(action_log_date) >= NOW() - INTERVAL 1 HOUR GROUP BY e.action_log_id, e.object_type, object_name, e.action_type ORDER BY e.action_log_id DESC
  response_format: table
- query: SELECT CASE WHEN msg_type='0' THEN 'event_log' WHEN msg_type='1' THEN 'event_log' WHEN msg_type='2' THEN 'event_log' WHEN msg_type='3' THEN 'event_log' WHEN msg_type='4' THEN 'system_log' WHEN msg_type='5' THEN 'system_log' ELSE msg_type END AS log_type, date_format(from_unixtime(ctime), '%Y-%m-%d %H:%i:%s') as datetime, log_id,instance_name,host_name,service_description, CASE WHEN status = '0' and msg_type='0' THEN 'OK' WHEN status = '1' and msg_type='0' THEN 'WARNING' WHEN status = '2' and msg_type='0' THEN 'CRITICAL' WHEN status = '3' and msg_type='0' THEN 'UNKNOWN' WHEN status = '0' and msg_type='1' THEN 'UP' WHEN status = '1' and msg_type='1' THEN 'DOWN' WHEN status = '0' and msg_type='2' THEN 'OK' WHEN status = '1' and msg_type='2' THEN 'WARNING' WHEN status = '2' and msg_type='2' THEN 'CRITICAL' WHEN status = '3' and msg_type='2' THEN 'UNKNOWN' WHEN status = '0' and msg_type='3' THEN 'UP' WHEN status = '1' and msg_type='3' THEN 'DOWN' WHEN status = '0' and msg_type='4' THEN '' WHEN status = '0' and msg_type='5' THEN '' ELSE status END AS statusGUI, CASE WHEN type = '0' and msg_type='0' THEN 'SOFT' WHEN type = '1' and msg_type='0' THEN 'HARD' WHEN type = '0' and msg_type='1' THEN 'SOFT' WHEN type = '1' and msg_type='1' THEN 'HARD' WHEN type = '0' and msg_type='2' THEN 'NOTIF' WHEN type = '0' and msg_type='3' THEN 'NOTIF' WHEN type = '0' and msg_type='4' THEN '' WHEN type = '0' and msg_type='5' THEN '' ELSE type END AS typeGUI, retry,output,host_id,service_id,msg_type,type,status FROM centreon_storage.logs WHERE from_unixtime(ctime) >= NOW() - INTERVAL 1 HOUR ORDER BY ctime DESC,log_id DESC
  response_format: table
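
A side note on the WHERE clause: since action_log_date and ctime are stored as Unix timestamps, the filter can also compare the raw column with a precomputed epoch value (in pure SQL, ctime >= UNIX_TIMESTAMP(NOW() - INTERVAL 1 HOUR)), which may let the database use an index on that column if one exists. For a scripted export, the cutoff could be computed as below; this is only a sketch and the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def cutoff_epoch(now, window=timedelta(hours=1)):
    """Unix timestamp marking the start of the polling window."""
    return int((now - window).timestamp())


def where_clause(column, now, window=timedelta(hours=1)):
    """WHERE clause comparing the raw epoch column with the cutoff."""
    return f"WHERE {column} >= {cutoff_epoch(now, window)}"


# Fixed reference time so the result is reproducible
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(where_clause("ctime", now))
```

The window should stay longer than the polling interval so that no record is missed; duplicates from the overlap are absorbed by the fingerprint-based _id.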


Finally, create 3 “views” with colorized fields:
-centreon-audit (with log_type=audit_log)
-centreon-system  (with log_type=system_log)
-centreon-event   (with log_type=event_log)


 

 

 

 

 

I hope these instructions help those trying to integrate Centreon logs into Elastic.

Hello,

 

Thanks for your instructions.

For all logs except audit logs, we use a stream connector.

But for audit logs, Centreon Support explained to us that we could lose our support if we touch the database structure, and that includes creating a user 😕 So we’re blocked...


Hello,
Could you use the existing “centreon” DB user instead of creating a new DB user?
I’m only using SELECT statements, so it shouldn’t be a big deal?