Uploading data
To get your data ready for the Pipeless algorithms, send events through the API in JSON format. An event is an object that has a relationship to another object. (In graph database terms, this is an entity/node with a relationship/edge to another entity/node.)
Data schema
Objects and relationships are limited to a selection of common options. A few examples are "user", "post", "video", and "tag" for objects, and "liked", "viewed", "taggedWith", and "followed" for relationships. The complete list can be seen in Objects & Relationship Types or in the API Reference docs. Choose whichever object and relationship types make the most sense for your data: the algorithm runs on the types you choose, so picking "user" vs. "account" for an object type, for instance, has no impact on how it runs.
How you choose to translate your data to object-relationship-object is up to you, though we recommend keeping things simple. For instance, if you have a 5-star review system, translating 4 or 5 stars to a "liked" relationship and 1 or 2 stars to "disliked" will produce dependable results.
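As a rough illustration of the 5-star mapping described above, here is a minimal Python sketch. The function names are hypothetical, and skipping 3-star reviews is an assumption (the text only covers 4-5 and 1-2 stars), so adjust it to your own data.

```python
from typing import Optional


def rating_to_relationship(stars: int) -> Optional[str]:
    """Map a 1-5 star rating to a relationship type, or None to skip it."""
    if stars >= 4:
        return "liked"
    if stars <= 2:
        return "disliked"
    # Assumption: a neutral 3-star review carries no clear signal, so no event is created.
    return None


def review_to_event(user_id: str, item_id: str, item_type: str, stars: int) -> Optional[dict]:
    """Build an object-relationship-object event from a review, or None if it is skipped."""
    relationship = rating_to_relationship(stars)
    if relationship is None:
        return None
    return {
        "start_object": {"id": user_id, "type": "user"},
        "relationship": {"type": relationship},
        "end_object": {"id": item_id, "type": item_type},
    }
```

For example, `review_to_event("Tim", "Toy Story", "film", 5)` produces a "liked" event between the user and the film.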
Custom Naming
Sticking to standardized (non-custom) names for objects and relationships helps speed up our service. If you cannot find a way to make the current naming options work for you, let us know and we can work with you to add more to the list.
Identifying Users
To identify users, we recommend a unique alphanumeric user ID. If you're concerned about exposing private information, avoid using an email address, phone number, or other potentially revealing identifier. For anonymous users, you can set a temporary unique user ID (an IP address could be used here, but be aware that it is typically considered PII) that can later be switched out using the Edit Object request if they go on to create an account or log in.
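One way to create a temporary ID for an anonymous user without touching PII is to generate a random identifier. The sketch below uses Python's standard `uuid` module; the function name is hypothetical, and how you store the ID (cookie, session, and so on) is up to you.

```python
import uuid


def new_anonymous_user_id() -> str:
    """Return a random 32-character alphanumeric ID that reveals nothing about the visitor."""
    return uuid.uuid4().hex
```

Keep the generated value with the visitor's session so the same ID is used for all of their events until it is switched out.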
Authentication
Once you have an app added (check out Adding an app if you haven't already), you'll need two things from the Pipeless dashboard Apps page: 1) your App ID and 2) your app API Key.
See the screenshot below for where to find those two values. (Note: Do not use these example values. Use your own, listed under your app in the dashboard.)
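The two values are used together on every request: the App ID goes in the endpoint URL and the API Key goes in the Authorization header. The sketch below assumes the Bearer scheme used in the batch upload example later on this page; `build_request` is a hypothetical helper, not part of any Pipeless library.

```python
from typing import Dict, Tuple


def build_request(app_id: str, api_key: str, batch: bool = False) -> Tuple[str, Dict[str, str]]:
    """Return the events endpoint URL and headers for the given app."""
    url = f"https://api.pipeless.io/v1/apps/{app_id}/events"
    if batch:
        url += "/batch"
    headers = {
        # Assumption: Bearer authentication, as in the batch upload example on this page.
        "Authorization": f"Bearer {api_key}",
        "accept": "application/json",
        "content-type": "application/json",
    }
    return url, headers
```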
Create event requests
There are a few ways to get your historical data into Pipeless so you can start testing our recommendation and activity feed algorithms. You can use our built-in data importer, our Google Sheets uploader, or our API directly.
Data Importer
The Pipeless data importer is a quick and easy way to get your historical data into Pipeless for testing or to prep for production implementation, all without having to leave the Pipeless web dashboard.
Read about the Pipeless Data Importer
Google Sheets Uploader Add-On
Another option for uploading events is to try our Google Sheets Uploader Add-On. This lets you upload rows of event data from a spreadsheet in batches of 10 events per call, all without having to write any code.
Read about the Google Sheets Uploader Add-On
API Requests
You can send your data to Pipeless with POST requests either as one call per event or as a batch, where one call includes up to 10 events. Batching helps you reduce your monthly call volume.
With your App ID and API Key, you can start adding events using the Create Event endpoint or the Create Events Batch endpoint.
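To make the 10-event batch limit concrete, the following sketch splits a list of events into request bodies for the Create Events Batch endpoint. The helper name and the `MAX_BATCH_SIZE` constant are illustrative; the body shape follows the batch examples on this page.

```python
from typing import Iterator, List

MAX_BATCH_SIZE = 10  # a batched call accepts up to 10 events


def batch_payloads(events: List[dict], synchronous: bool = False) -> Iterator[dict]:
    """Yield one request body per group of at most MAX_BATCH_SIZE events."""
    for start in range(0, len(events), MAX_BATCH_SIZE):
        yield {
            "events": events[start:start + MAX_BATCH_SIZE],
            "synchronous": synchronous,
        }
```

With 25 events this yields three bodies containing 10, 10, and 5 events, so 25 single calls become 3 batched calls.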
Create Event Call (single event)
Here's an example of sending an event to App ID "123" with an object user "Tim" having the relationship "interestedIn" with object skill "photography".
const data = JSON.stringify({
"event": {
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "interestedIn",
"created_on": "2020-09-28T09:31:18"
},
"end_object": {
"id": "photography",
"type": "skill",
"created_on": "2020-06-03T15:40:22"
}
},
"synchronous": false
});
const xhr = new XMLHttpRequest();
xhr.addEventListener("readystatechange", function () {
if (this.readyState === this.DONE) {
console.log(this.responseText);
}
});
xhr.open("POST", "https://api.pipeless.io/v1/apps/123/events");
xhr.setRequestHeader("accept", "application/json");
xhr.setRequestHeader("content-type", "application/json");
xhr.send(data);
import requests
url = "https://api.pipeless.io/v1/apps/123/events"
payload = {
"event": {
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "interestedIn",
"created_on": "2020-09-28T09:31:18"
},
"end_object": {
"id": "photography",
"type": "skill",
"created_on": "2020-06-03T15:40:22"
}
},
"synchronous": False
}
headers = {
"accept": "application/json",
"content-type": "application/json"
}
response = requests.request("POST", url, json=payload, headers=headers)
print(response.text)
require 'uri'
require 'net/http'
require 'openssl'
url = URI("https://api.pipeless.io/v1/apps/123/events")
http = Net::HTTP.new(url.host, url.port)
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_PEER
request = Net::HTTP::Post.new(url)
request["accept"] = 'application/json'
request["content-type"] = 'application/json'
request.body = "{\"event\":{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},\"synchronous\":false}"
response = http.request(request)
puts response.read_body
<?php
$curl = curl_init();
curl_setopt_array($curl, [
CURLOPT_URL => "https://api.pipeless.io/v1/apps/123/events",
CURLOPT_RETURNTRANSFER => true,
CURLOPT_ENCODING => "",
CURLOPT_MAXREDIRS => 10,
CURLOPT_TIMEOUT => 30,
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
CURLOPT_CUSTOMREQUEST => "POST",
CURLOPT_POSTFIELDS => "{\"event\":{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},\"synchronous\":false}",
CURLOPT_HTTPHEADER => [
"accept: application/json",
"content-type: application/json"
],
]);
$response = curl_exec($curl);
$err = curl_error($curl);
curl_close($curl);
if ($err) {
echo "cURL Error #:" . $err;
} else {
echo $response;
}
OkHttpClient client = new OkHttpClient();
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, "{\"event\":{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},\"synchronous\":false}");
Request request = new Request.Builder()
.url("https://api.pipeless.io/v1/apps/123/events")
.post(body)
.addHeader("accept", "application/json")
.addHeader("content-type", "application/json")
.build();
Response response = client.newCall(request).execute();
curl --request POST \
--url https://api.pipeless.io/v1/apps/123/events \
--header 'accept: application/json' \
--header 'content-type: application/json' \
--data '{"event":{"start_object":{"id":"Tim","type":"user","created_on":"2020-09-20T21:57:06"},"relationship":{"type":"interestedIn","created_on":"2020-09-28T09:31:18"},"end_object":{"id":"photography","type":"skill","created_on":"2020-06-03T15:40:22"}},"synchronous":false}'
The Create Event (and Create Events Batch) request has an optional "single" parameter for the relationship, which defaults to "true". With that default, this exact relationship overrides any previous "interestedIn" relationship between these two specific objects, which in this case would just update a modified_on timestamp. To create multiple "interestedIn" relationships with different timestamps, set the parameter to "false".
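As a sketch of how the "single" parameter might be set, the helper below builds a Create Event body with "single" placed inside the relationship object. That placement is an assumption based on the description above, and the helper name is hypothetical; confirm the exact field location in the Create Event reference docs before relying on it.

```python
def interested_in_event(user_id: str, skill_id: str, created_on: str, single: bool = True) -> dict:
    """Build a Create Event body for a user who is interestedIn a skill."""
    return {
        "event": {
            "start_object": {"id": user_id, "type": "user"},
            "relationship": {
                "type": "interestedIn",
                "created_on": created_on,
                # Assumption: "single" lives on the relationship object.
                "single": single,
            },
            "end_object": {"id": skill_id, "type": "skill"},
        },
        "synchronous": False,
    }
```

Calling it twice with `single=False` and different `created_on` values would describe two separate "interestedIn" relationships rather than one updated relationship.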
Read the Create Event ref docs here
Create Events Batched Call (multiple events)
This example of a batched call shows two events being added to App ID "123" with the same call (we support up to 10 events in one batched call). The first event shows the object user "Tim" having the relationship "interestedIn" with the object skill "photography". The second event has object user "Tim" having the relationship "liked" with object article "Article 123".
const data = JSON.stringify({
"events": [
{
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "interestedIn",
"created_on": "2020-09-28T09:31:18"
},
"end_object": {
"id": "photography",
"type": "skill",
"created_on": "2020-06-03T15:40:22"
}
},
{
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "liked",
"created_on": "2020-09-24T14:20:11"
},
"end_object": {
"id": "Article 123",
"type": "article",
"created_on": "2020-07-05T05:51:02"
}
}
],
"synchronous": false
});
const xhr = new XMLHttpRequest();
xhr.addEventListener("readystatechange", function () {
if (this.readyState === this.DONE) {
console.log(this.responseText);
}
});
xhr.open("POST", "https://api.pipeless.io/v1/apps/123/events/batch");
xhr.setRequestHeader("accept", "application/json");
xhr.setRequestHeader("content-type", "application/json");
xhr.send(data);
import requests
url = "https://api.pipeless.io/v1/apps/123/events/batch"
payload = {
"events": [
{
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "interestedIn",
"created_on": "2020-09-28T09:31:18"
},
"end_object": {
"id": "photography",
"type": "skill",
"created_on": "2020-06-03T15:40:22"
}
},
{
"start_object": {
"id": "Tim",
"type": "user",
"created_on": "2020-09-20T21:57:06"
},
"relationship": {
"type": "liked",
"created_on": "2020-09-24T14:20:11"
},
"end_object": {
"id": "Article 123",
"type": "article",
"created_on": "2020-07-05T05:51:02"
}
}
],
"synchronous": False
}
headers = {
"accept": "application/json",
"content-type": "application/json"
}
response = requests.request("POST", url, json=payload, headers=headers)
print(response.text)
require 'uri'
require 'net/http'
require 'openssl'
url = URI("https://api.pipeless.io/v1/apps/123/events/batch")
http = Net::HTTP.new(url.host, url.port)
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_PEER
request = Net::HTTP::Post.new(url)
request["accept"] = 'application/json'
request["content-type"] = 'application/json'
request.body = "{\"events\":[{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"liked\",\"created_on\":\"2020-09-24T14:20:11\"},\"end_object\":{\"id\":\"Article 123\",\"type\":\"article\",\"created_on\":\"2020-07-05T05:51:02\"}}],\"synchronous\":false}"
response = http.request(request)
puts response.read_body
<?php
$curl = curl_init();
curl_setopt_array($curl, [
CURLOPT_URL => "https://api.pipeless.io/v1/apps/123/events/batch",
CURLOPT_RETURNTRANSFER => true,
CURLOPT_ENCODING => "",
CURLOPT_MAXREDIRS => 10,
CURLOPT_TIMEOUT => 30,
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
CURLOPT_CUSTOMREQUEST => "POST",
CURLOPT_POSTFIELDS => "{\"events\":[{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"liked\",\"created_on\":\"2020-09-24T14:20:11\"},\"end_object\":{\"id\":\"Article 123\",\"type\":\"article\",\"created_on\":\"2020-07-05T05:51:02\"}}],\"synchronous\":false}",
CURLOPT_HTTPHEADER => [
"accept: application/json",
"content-type: application/json"
],
]);
$response = curl_exec($curl);
$err = curl_error($curl);
curl_close($curl);
if ($err) {
echo "cURL Error #:" . $err;
} else {
echo $response;
}
OkHttpClient client = new OkHttpClient();
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, "{\"events\":[{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"interestedIn\",\"created_on\":\"2020-09-28T09:31:18\"},\"end_object\":{\"id\":\"photography\",\"type\":\"skill\",\"created_on\":\"2020-06-03T15:40:22\"}},{\"start_object\":{\"id\":\"Tim\",\"type\":\"user\",\"created_on\":\"2020-09-20T21:57:06\"},\"relationship\":{\"type\":\"liked\",\"created_on\":\"2020-09-24T14:20:11\"},\"end_object\":{\"id\":\"Article 123\",\"type\":\"article\",\"created_on\":\"2020-07-05T05:51:02\"}}],\"synchronous\":false}");
Request request = new Request.Builder()
.url("https://api.pipeless.io/v1/apps/123/events/batch")
.post(body)
.addHeader("accept", "application/json")
.addHeader("content-type", "application/json")
.build();
Response response = client.newCall(request).execute();
curl --request POST \
--url https://api.pipeless.io/v1/apps/123/events/batch \
--header 'accept: application/json' \
--header 'content-type: application/json' \
--data '{"events":[{"start_object":{"id":"Tim","type":"user","created_on":"2020-09-20T21:57:06"},"relationship":{"type":"interestedIn","created_on":"2020-09-28T09:31:18"},"end_object":{"id":"photography","type":"skill","created_on":"2020-06-03T15:40:22"}},{"start_object":{"id":"Tim","type":"user","created_on":"2020-09-20T21:57:06"},"relationship":{"type":"liked","created_on":"2020-09-24T14:20:11"},"end_object":{"id":"Article 123","type":"article","created_on":"2020-07-05T05:51:02"}}],"synchronous":false}'
Read the Create Events Batch ref docs here.
You can always send these events up one by one using the single event call, but batching these together will help reduce your monthly call volume and associated costs, as well as helping you stay under the rate limits. You can also use the batched event call to send a single event, so if you'd like to only utilize the batched event endpoint, that would work too.
Rate Limiting
We employ rate limiting to maintain the reliability of our services, so if you plan on sending a large volume of data in a short period of time, please spread out those calls to avoid receiving a rate-limit error from our servers (the API response will let you know if a rate limit was hit). Current limits for Create Event and Create Events Batch are 50 calls per second for async calls and 15 calls per second for sync calls (default is async).
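One simple way to spread calls out is to enforce a minimum gap between them. The `Throttle` class below is a hypothetical client-side helper, not part of any Pipeless library; the `clock` and `sleep` arguments exist only so the pacing can be tested without real waiting.

```python
import time


class Throttle:
    """Keep successive calls at least 1 / calls_per_second seconds apart."""

    def __init__(self, calls_per_second: float, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / calls_per_second
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = 0.0

    def wait(self) -> None:
        """Block until the next call is allowed, then reserve the following slot."""
        delay = self.next_allowed - self.clock()
        if delay > 0:
            self.sleep(delay)
        self.next_allowed = max(self.clock(), self.next_allowed) + self.min_interval
```

Create one instance, for example `throttle = Throttle(calls_per_second=45)`, and call `throttle.wait()` immediately before each request. Picking a rate slightly under the published limit leaves some headroom.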
Response Errors
When sending your event data up to Pipeless, if things don't go perfectly, there are a few response errors you might receive:
400 - Bad Request
Something is off with the request format. These errors describe what specifically is wrong. Here are a couple of the most common causes and what you can do about them.
JSON Formatting
Take another look at your JSON formatting and refer to the examples in the API reference docs.
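A quick local check before sending can catch many formatting problems early. The sketch below is not the API's actual validation: it assumes, based only on the examples on this page, that each event needs a start_object, relationship, and end_object, that each has a "type", that objects have an "id", and that any "created_on" value is an ISO 8601 timestamp. The function name is hypothetical.

```python
from datetime import datetime
from typing import List

REQUIRED_PARTS = ("start_object", "relationship", "end_object")


def event_problems(event: dict) -> List[str]:
    """Return a list of human-readable problems found in one event (empty if none)."""
    problems = []
    for part in REQUIRED_PARTS:
        value = event.get(part)
        if not isinstance(value, dict):
            problems.append(f"missing {part}")
            continue
        if "type" not in value:
            problems.append(f"{part} has no type")
        if part != "relationship" and "id" not in value:
            problems.append(f"{part} has no id")
        created_on = value.get("created_on")
        if created_on is not None:
            try:
                datetime.fromisoformat(created_on)
            except (TypeError, ValueError):
                problems.append(f"{part} has an invalid created_on: {created_on!r}")
    return problems
```

Running this over your events and fixing anything it reports should reduce the number of 400 responses, though the API reference remains the authority on what is accepted.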
Invalid Characters
We have some limitations on characters that can be saved. Here's an example of using regular expressions to identify and remove any invalid characters from your strings:
# import the regular expression module
import re
# function takes a string and returns it with any invalid characters removed
def char_check(string):
    str_re = re.sub(r'[^A-Za-z0-9_$/@\-!#%^&*()+=;:\x27"?<>\\\[\]{}. ,]+', "", string)
    return str_re
401 - Incorrect Authentication
Make sure you are using the right App ID in the endpoint URL and the corresponding API Key for authentication.
404 - App Not Found
Make sure you are using the right App ID in the endpoint URL. If you delete your app it will be permanently removed immediately, and you will also get this error.
429 - Rate Limit Exceeded
Current limits for Create Event are 50 calls per second for async calls and 15 calls per second for sync calls. For Create Events Batch, the limits are 45 calls per second for async calls and 10 calls per second for sync calls. The default for both is async.
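If a 429 does come back, a common approach is to wait and retry with an increasing delay. The sketch below is one such approach under stated assumptions: `send` is a hypothetical callable that performs the request and returns the HTTP status code, and the delay values are illustrative rather than recommended by Pipeless.

```python
import time
from typing import Callable


def send_with_retry(
    send: Callable[[], int],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns something other than 429 or attempts run out."""
    status = 429
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            return status
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, then 2x, 4x, ...
            sleep(base_delay * (2 ** attempt))
    return status
```

With the `requests` library, `send` could be as simple as `lambda: requests.post(url, json=payload, headers=headers).status_code`.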
Example batch uploading with rate limiting
// import libraries
var limit = require('simple-rate-limiter');
var fs = require('fs');
// wrap request so it is rate limited to 45 requests per 1000ms (1 second)
var request = limit(require("request")).to(45).per(1000);
// prepare your array of events, where each element is a JSON string
// for example an array of events like:
// '{"start_object":{"id":"Toy Story","type":"film"},"relationship":{"type":"taggedWith"},"end_object":{"id":"animation","type":"tag"}}'
var eventsArray = [];
// iterate through events to break into max batched groups
var batchSize = 10;
var eventsBatched = [];
for (var i = 0; i < eventsArray.length; i += batchSize) {
  eventsBatched.push(eventsArray.slice(i, i + batchSize));
}
// iterate through max batched events break into batched events calls
for (let i = 0; i < eventsBatched.length; i++) {
var jsonPost = JSON.parse('{"events":['+eventsBatched[i]+']}');
// make sure to use your own App ID and not "123"
// set your own API Key in place of XXXX...XXXX
var options = {
'method': 'POST',
'url': 'https://api.pipeless.io/v1/apps/123/events/batch',
'headers': {
'Authorization': 'Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
'Content-Type': 'application/json'
},
body: JSON.stringify(jsonPost)
};
request(options, function (error, response) {
if (error) throw new Error(error);
});
}
Client libraries
The examples above can also be executed by utilizing our client libraries:
Node Client Library
PHP Client Library
Need help uploading your historical data?
If you're having trouble mapping your current data to the Pipeless event format, we can help. Message us using the Intercom chat widget in the bottom right of your browser window, or email us at [email protected].