Skip to content

Data Collector API

Prepare your environment

Writing the script

First, import the py42 SDK client and required classes for building your Alert query:

import py42.sdk
from py42.sdk.queries.alerts.alert_query import AlertQuery
from py42.sdk.queries.alerts.filters import DateObserved

Then construct your query. This example will search for Alerts created within the past 3 days. See py42 documentation for details on how to customize your own query.

from datetime import datetime, timedelta

date_filter = DateObserved.on_or_after(datetime.utcnow() - timedelta(days=3))
alert_query = AlertQuery(date_filter)

Initialize the py42 SDK object with your API client details:

sdk = py42.sdk.from_api_client(
    host_address="https://console.us.code42.com",
    client_id="<your_client_id>",
    client_secret="<your_secret>"
)

Fetch the alerts with your query:

response = sdk.alerts.search(query)

The Sentinel HTTP endpoint accepts a list of JSON objects in the request body, to prepare the py42 response for sending to Sentinel, you just need to extract the alerts data from the py42 response and convert it to a JSON string:

import json

response = sdk.alerts.search(query)
body = json.dumps(response.data["alerts"])

Then we can use the Python example from the Data Collector API documentation to authenticate and send the Incydr Alerts to Sentinel. The example is recreated here with the py42 code replacing the example data:

import json
import requests
import datetime
import hashlib
import hmac
import base64

import py42.sdk
from py42.sdk.queries.alerts.alert_query import AlertQuery
from py42.sdk.queries.alerts.filters import DateObserved

# Update the customer ID to your Log Analytics workspace ID
customer_id = 'xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'

# For the shared key, use either the primary or the secondary Connected Sources client authentication key
shared_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# This creates a new Custom Log type for Incydr Alerts
log_type = 'IncydrAlert'


#####################
##Incydr Alert Query#
#####################

date_filter = DateObserved.on_or_after(datetime.datetime.utcnow() - datetime.timedelta(days=3))
alert_query = AlertQuery(date_filter)

sdk = py42.sdk.from_api_client(
    host_address="https://console.us.code42.com",
    client_id="<your_client_id>",
    client_secret="<your_secret>"
)
response = sdk.alerts.search(query)
body = json.dumps(response.data["alerts"])

#####################
##Sentinel Functions#
#####################

# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    x_headers = 'x-ms-date:' + date
    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
    bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
    authorization = "SharedKey {}:{}".format(customer_id,encoded_hash)
    return authorization

# Build and send a request to the POST API
def post_data(customer_id, shared_key, body, log_type):
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_length = len(body)
    signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
    uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'

    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': rfc1123date
    }

    response = requests.post(uri,data=body, headers=headers)
    if (response.status_code >= 200 and response.status_code <= 299):
        print('Accepted')
    else:
        print("Response code: {}".format(response.status_code))

post_data(customer_id, shared_key, body, log_type)

After the script is run, you should be able to query the IncydrAlert_CL log table and see the Incydr Alert data.