API Requests Examples

Understand how to execute an API call with the examples outlined in this section.

The examples below are one-off API calls, so each one authenticates every time it runs. However, if you run them in a loop or as part of a scheduled script, you don’t need to authenticate on every call; instead, cache the session ID and reuse it.

Note:

If you try to authenticate more than 10 times in one minute, you will receive an error message.

Simple API call using Python

This is an example of a simple API request using Python. To make an API request to one of the special case APIs, use the attached notebook below this example. For both examples, you can use the MyGeotab Python wrapper to create the API object:
import json
import mygeotab

# Create the API object for the Altitude server.
# Replace each <...> placeholder with your own value before running.
api = mygeotab.API(username=<your username>, password=<your password>, database=<your database>, server='altitudeserver.geotab.com')
# Authenticate once before making the call.
api.authenticate()
# Describe the request: the method name plus the parameters sent with it.
request = {
	"method":"GetAltitudeData",
	"params": {
		"serviceName": "dna-altitude-general",
		"functionName": "getData",          
		"functionParameters":{
			"queryType": "getCityBoundaries", 
			"zones": [<your zones>]
		}
	}
}
# Unpack the "params" dict into keyword arguments of api.call.
results = api.call(method=request['method'], **request['params'])
Or more explicitly:
# Same request as the dict-based form, with every parameter passed
# directly as a keyword argument of api.call.
results = api.call(method="GetAltitudeData", 
                      serviceName="dna-altitude-general", 
                      functionName= "getData", 
                      functionParameters={
                        "queryType": "getCityBoundaries",
                        "zones": [<your zones>]
                      }
)

Special Case API call using Python

The following notebook is a Python example of how to make an API request in 3 steps, where create_job, wait_for_job_to_complete and get_data are just functions that make the API requests outlined in steps 1, 2, 3 above. Altitude Python API notebook

Special Case API call using JavaScript

The following is a JavaScript example of how to make an API request in 3 steps, where createQueryJob, waitForBigQueryJobToComplete and fetchBigQueryData are just functions that make the API requests outlined in steps 1, 2, 3 above.
// install packages as required
// npm install mg-api-js
// mygeotab js documentation - https://github.com/Geotab/mg-api-js

const GeotabApi = require('mg-api-js');
// Connection settings handed to the GeotabApi constructor.
// Replace each <...> placeholder with your own value before running.
const authentication = {
	credentials: {
		database: "<your database>",
		userName: "<your userName>",
		password: "<your password>"
	},
	path: "altitudeserver.geotab.com"
};

// Create the API client from the authentication settings defined above.
const api = new GeotabApi(authentication);

/**
 * Sends a "GetAltitudeData" request through the shared `api` client.
 *
 * @param {string} serviceName - Sent as the `serviceName` parameter.
 * @param {string} functionName - Sent as the `functionName` parameter.
 * @param {object} functionParameters - Sent as the `functionParameters` parameter.
 * @returns {Promise<*>} Fulfils with the value `api.call` fulfils with, or
 *   rejects with the error `api.call` rejects with.
 */
async function getMyGeotabBQData(serviceName, functionName, functionParameters) {
  const payload = { serviceName, functionName, functionParameters };
  return await api.call("GetAltitudeData", payload);
}

/**
 * Step 1: asks the "dna-altitude-general" service to create a query job.
 *
 * @param {object} functionParameters - Parameters forwarded to "createQueryJob".
 * @returns {Promise<object|Array>} One of:
 *   - the first entry of `apiResult.results` (the job; its id is at `.id`),
 *   - `[]` when the response carries top-level `errors`,
 *   - `apiResult.errors` when the API result itself reports errors,
 *   - `{ error }` when the request throws.
 */
async function createQueryJob(functionParameters) {
  try {
    const { apiResult, errors } = await getMyGeotabBQData(
      "dna-altitude-general",
      "createQueryJob",
      functionParameters
    );

    if (errors && errors.length) {
      // Report the failure instead of dropping it silently, matching
      // what waitForBigQueryJobToComplete does for the same condition.
      console.error(errors);
      return [];
    }

    if (apiResult.errors && apiResult.errors.length) {
      console.error(apiResult.errors);
      return apiResult.errors;
    }

    return apiResult.results[0]; // jobId will be at apiResult.results[0].id
  } catch (err) {
    return { error: err };
  }
}




/**
 * Step 2: polls "getJobStatus" until the job reports state "DONE".
 *
 * @param {object} functionParameters - Parameters forwarded to "getJobStatus"
 *   on every poll (the usage example includes the job's `jobId`).
 * @returns {Promise<object|Array>} One of:
 *   - the first entry of `apiResult.results` once it is no longer pending,
 *   - `[]` when the response carries top-level `errors` (they are logged),
 *   - `{ error }` when a request throws.
 */
async function waitForBigQueryJobToComplete(functionParameters) {
  try {
    // Loop instead of recursing so every poll reuses the same parameters
    // and no chain of pending calls builds up for long-running jobs.
    while (true) {
      const { apiResult, errors } = await getMyGeotabBQData(
        "dna-altitude-general",
        "getJobStatus",
        functionParameters
      );

      if (errors && errors.length) {
        console.error(errors);
        return [];
      }

      const job = apiResult.results[0];
      const isPending = job && job.status && job.status.state !== "DONE";
      if (!isPending) {
        return job;
      }

      await sleep(5000); // poll every 5 seconds to see if the job is done
    }
  } catch (err) {
    return { error: err };
  }
}

/** Returns a promise that resolves (with no value) after `time` milliseconds. */
const sleep = (time) =>
  new Promise((resolve) => {
    setTimeout(resolve, time);
  });

/**
 * Step 3: pages through "getQueryResults", yielding one item per page.
 *
 * Each successful page is yielded as `{ data: [rows, totalRows, index] }`,
 * where `index` is the 1-based page number. Paging stops after the first page
 * that comes back without a `pageToken`.
 *
 * If a request throws or a page reports an `error`, a single
 * `{ error, data: [] }` item is yielded and the generator ends, so a
 * persistent failure cannot make it repeat the same request forever.
 *
 * @param {object} functionParameters - Parameters forwarded to
 *   "getQueryResults". The object is copied; the caller's object is not modified.
 * @yields {{data: Array, error?: Error}}
 */
async function* fetchBigQueryData(functionParameters) {
  // Page on a private copy so the caller's parameters stay untouched.
  const pagedParameters = { ...functionParameters };
  let index = 1;

  while (true) {
    let page;
    try {
      const { apiResult } = await getMyGeotabBQData(
        "dna-altitude-general",
        "getQueryResults",
        pagedParameters
      );
      const { error, rows, pageToken, totalRows } = apiResult.results[0];
      if (error && error.length) {
        throw new Error(error);
      }
      page = { rows, pageToken, totalRows };
    } catch (err) {
      yield { error: err, data: [] };
      return; // stop instead of retrying the same failing request
    }

    yield { data: [page.rows, page.totalRows, index] };
    index++;

    if (!page.pageToken) {
      return;
    }
    pagedParameters.pageToken = page.pageToken;
  }
}



// How to call the above functions in 3 parts:

/**
 * Runs the three-part flow and collects every row of the result.
 *
 * Wrapped in an async function because this file loads its dependency with
 * `require`, where top-level `await` and top-level `return` are not available.
 *
 * @param {object} functionParameters - Parameters for the query job.
 * @returns {Promise<Array>} All rows from every result page, in page order.
 * @throws {Error} If no job id is returned, if waiting for the job fails,
 *   or if a result page reports an error.
 */
async function runAltitudeQuery(functionParameters) {
  // Part 1: create the job.
  const job = await createQueryJob(functionParameters);
  if (!job || !job.id) {
    throw new Error("createQueryJob did not return a job id", {
      cause: job && job.error,
    });
  }

  // Part 2: wait for the job to finish by polling to see if it's done.
  const status = await waitForBigQueryJobToComplete({
    ...functionParameters,
    jobId: job.id,
  });
  if (status && status.error) {
    throw new Error("Waiting for the job to complete failed", {
      cause: status.error,
    });
  }

  // Part 3: get the results once done.
  const jobIterator = fetchBigQueryData({
    ...functionParameters,
    resultsLimit: 50000,
    jobId: job.id,
  });

  // Loop through the results one page at a time and append each page's rows.
  let data = [];
  for await (const { data: [page], error } of jobIterator) {
    if (error) {
      throw new Error("Failed to fetch query results", { cause: error });
    }
    data = data.concat(page || []);
  }

  return data;
}

// `functionParameters` must hold the parameters of the query you want to run.
runAltitudeQuery(functionParameters)
  .then((data) => console.log(data))
  .catch((error) => console.error(error));

Generating Shapefiles using API results with Python

Below is an example of generating a Shapefile based on API results. You can call any API (special case or regular) that returns geography to generate a Shapefile. The examples below leverage the getCityBoundaries and getRoadSegments APIs.

getCityBoundaries API Example
import shutil
import mygeotab
import pandas as pd
import geopandas as gpd
from shapely.geometry import shape

import warnings
warnings.filterwarnings('ignore')

# Call API
# ----------------------------------------------------------------------------------------
# Connection settings; replace each <...> placeholder with your own value.
API_params = {
    'username': '<username>',
    'password': '<password>',
    'database': '<database>',
      'server': '<server>'
}

api = mygeotab.API(**API_params)
api.authenticate()
# Request the city boundaries for the given zones.
request = {
    "method":"GetAltitudeData",
    "params": {
        "serviceName": "dna-altitude-general",
        "functionName": "getData",          
        "functionParameters":{
            "queryType": "getCityBoundaries", 
            "zones": [<your zones>]
        }
    }
}
# Keep only the 'results' entry of the response's 'apiResult'.
results = api.call(method=request['method'], **request['params'])['apiResult']['results']

# Name used for the Shapefile output and as the base name of the zip archive.
shapefile_name = '<shapefile_name>'

# Generate ShapeFile zip
# ----------------------------------------------------------------------------------------
df_results = pd.DataFrame(results)
# Convert each row's 'Bounds' value into a shapely geometry.
# NOTE(review): eval() executes whatever expression the API-provided value
# contains. Once the exact format of 'Bounds' is confirmed, consider a safe
# parser instead (e.g. json.loads or ast.literal_eval).
df_results['Bounds'] = df_results.apply(lambda x: shape(eval(x['Bounds'])), axis=1)
# Keep the centre/min/max coordinate columns and use 'Bounds' as the geometry.
df_results = gpd.GeoDataFrame(df_results[['CentreLongitude', 'CentreLatitude', 'MinLongitude', 'MinLatitude', 'MaxLongitude', 'MaxLatitude']], geometry=gpd.GeoSeries(df_results['Bounds'].values))
# Write the data out with the ESRI Shapefile driver.
_ = df_results.to_file(filename=shapefile_name, driver='ESRI Shapefile')

# Zip the contents of the shapefile_name directory into '<shapefile_name>.zip'.
_ = shutil.make_archive(shapefile_name, 'zip', shapefile_name)
getRoadSegments API Example (special case API with geography output)
import json
import time
import shutil

import mygeotab

import pandas as pd
import geopandas as gpd

from shapely.geometry import shape

import warnings
warnings.filterwarnings('ignore')

class API():
    """Small wrapper around a mygeotab client for the create / poll / fetch job flow.

    ``params`` must provide the keys ``username``, ``password``, ``database``
    and ``server``. ``jobId`` holds the id of the most recently created job
    and is ``None`` until ``create_job`` has run.
    """

    def __init__(self, params):
        self.username = params['username']
        self.password = params['password']
        self.database = params['database']
        self.server = params['server']
        self.jobId = None
        _ = self._authenticate()

    def _authenticate(self):
        """Create the underlying mygeotab client from the stored credentials."""
        self.client = mygeotab.API(username=self.username, password=self.password, database=self.database, server=self.server)

    def get_data(self, service_name: str, function_name: str, function_parameters: dict) -> dict:
        """Send one ``GetAltitudeData`` call and return the raw response."""
        results = self.client.call("GetAltitudeData",
                                   serviceName=service_name,
                                   functionName=function_name,
                                   functionParameters=function_parameters)
        return results

    def create_job(self, params: dict) -> dict:
        """Create a query job, remember its id in ``self.jobId`` and return the job entry."""
        results = self.get_data(service_name="dna-altitude-general",
                                function_name="createQueryJob",
                                function_parameters=params)

        job = results["apiResult"]["results"][0]
        self.jobId = job["id"]
        return job

    def wait_for_job_to_complete(self, params: dict) -> dict:
        """Poll ``getJobStatus`` every 5 seconds until the job state is ``"DONE"``.

        Returns the first entry of the status results as soon as it is empty,
        has no ``status``, or reports the state ``"DONE"``.
        """
        # Iterate instead of recursing: one stack frame per poll would hit
        # Python's recursion limit on a long-running job.
        while True:
            results = self.get_data(service_name="dna-altitude-general",
                                    function_name="getJobStatus",
                                    function_parameters=params
                                    )['apiResult']

            results = results["results"][0]
            if results and results["status"] and results["status"]["state"] != "DONE":
                time.sleep(5)
                continue

            return results

    def fetch_data(self, params: dict):
        """Generator that pages through ``getQueryResults``.

        Yields one ``{"data": [rows, total_rows, index], "error": error}`` dict
        per page, where ``index`` is the 1-based page number. After the first
        page that has no ``pageToken`` it yields a final ``None`` and stops.

        ``params`` is copied, so the caller's dict is not modified while paging.
        """
        request = dict(params)
        index = 1
        while True:
            results = self.get_data(service_name="dna-altitude-general",
                                    function_name="getQueryResults",
                                    function_parameters=request
                                    )['apiResult']

            first = results["results"][0]
            error = first.get("error", None)
            rows = first.get("rows", None)
            page_token = first.get("pageToken", None)
            total_rows = first.get("totalRows", None)

            yield {"data": [rows, total_rows, index], "error": error}
            index += 1
            if not page_token:
                # Trailing None kept for callers that treat it as the end marker.
                yield
                return
            request["pageToken"] = page_token
example_params = {
    "queryType": "getRoadSegments", 
    "zones": [<your zones>],
    "asOfDate": "2023-03-31",
    "excludeServiceRoads": True,
    "roadTypes": ["motorway", "trunk", "primary", "secondary", "tertiary"],
    "resultsLimit": 50000
}
client_params = {'username': '<username>',
                 'password': '<password>',
                 'database': '<database>', 
                 'server': '<server>'
                }
api = API(client_params)

job = api.create_job(example_params)
example_params["jobId"] = api.jobId
results = api.wait_for_job_to_complete(example_params)
results_iterator = api.fetch_data(example_params)

data = []
for data_page in results_iterator:
    page = [] if data_page is None else data_page["data"][0]
    data.extend(page)

df = pd.DataFrame(data)
df = df.astype(str)
df['Geography'] = df.apply(lambda x: shape(eval(x['Geography'])), axis=1)
gpd_df = gpd.GeoDataFrame(df[[x for x in df.columns if x != 'Geography']], geometry=gpd.GeoSeries(df['Geography'].values))

gpd_df.to_file(filename='ShapeFile', driver='ESRI Shapefile')
_ = shutil.make_archive('test_file', 'zip', 'ShapeFile')

How to Retrieve Time Series Results

For APIs with a "generateTimeSeries" parameter, retrieve time series results by making a separate “getSavedResults” call from the standard results call. Each getSavedResults call creates its own job with a unique jobId, and you must track and use separate jobIds for standard vs. time series results. To get the time series results, call “getSavedResults” and pass the job id of the parent analysis into the “analysisJobId” parameter and the queryType of the parent plus “TimeSeries” to the “analysisType” parameter.

Note:

This API call retrieves only the time series results. To retrieve the standard results of the getOriginDestinationMatrix call, follow the process described for the getOriginDestinationMatrix API.

For example, let’s say you run a “getOriginDestinationMatrix” analysis with generateTimeSeries set to true and jobId 1234. In order to get the time series results, you would call the “getSavedResults” API with the following function parameters:

"functionParameters": {
"queryType": "getSavedResults",
	"jobId": "<to be determined>",		
	"analysisJobId": "1234",
	"analysisType":"getOriginDestinationMatrixTimeSeries"
}