Read blob data

/// <summary>
/// Replays the archived messages of one hour for a given device type.
/// Reads the "meta" and "events" blobs stored under
/// coldstorage/rawjson/{meta|events}/yyyy/MM/dd/HH in the "ecolabempowered" container,
/// keeps the messages whose messageType contains <paramref name="deviceType"/> and whose
/// EventProcessedUtcTime falls in the requested hour, and passes each of them to
/// SendMessagesFromDeviceToCloud using the primary key of the device looked up in the registry.
/// </summary>
/// <param name="deviceType">Text that must occur (ignoring case) in a message's messageType.</param>
/// <param name="replayMessage">Supplies the Year, Month, Date and Hour to replay.</param>
/// <param name="connectionString">Connection string used to create the device registry manager.</param>
/// <param name="platformDetails">Its Uri is forwarded to SendMessagesFromDeviceToCloud.</param>
/// <returns>
/// "Success" when every selected message was handed over; "Device not found" when at least one
/// message had no resolvable device; "Error" when no message matched or an exception occurred.
/// </returns>
private async Task<string> GetBlobMessage(string deviceType, ReplayMessage replayMessage, string connectionString, Models.Platform platformDetails)
{
    // NOTE(review): registryManager is declared outside this method; a new instance is created
    // on every call and is never closed here - confirm who owns and disposes it.
    registryManager = RegistryManager.CreateFromConnectionString(connectionString);

    string year = replayMessage.Year.ToString();
    string date = String.Format("{0:D2}", replayMessage.Date);
    string month = String.Format("{0:D2}", replayMessage.Month);
    string hour = String.Format("{0:D2}", replayMessage.Hour);
    string hourPath = year + "/" + month + "/" + date + "/" + hour;

    telemetry.TrackEvent("BlobMessage started ");

    string messageDiv = string.Empty;
    try
    {
        // Start of the requested hour; archived messages are matched against it.
        DateTime replayHour = new DateTime(replayMessage.Year, replayMessage.Month, replayMessage.Date, replayMessage.Hour, 0, 0);

        CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
        CloudBlobContainer container = blobClient.GetContainerReference("ecolabempowered");

        // The "meta" folder holds both metadata and telemetry messages; split them so that
        // metadata is replayed before telemetry.
        List<JObject> metaMessages = await ReadMatchingBlobMessagesAsync(container, "coldstorage/rawjson/meta/" + hourPath, deviceType, replayHour);
        var metadataMessages = new List<JObject>();
        var telemetryMessages = new List<JObject>();
        foreach (JObject message in metaMessages)
        {
            string type = message["messageType"].ToString();
            if (type.Contains("Metadata"))
            {
                metadataMessages.Add(message);
            }
            else if (type.Contains("Telemetry"))
            {
                telemetryMessages.Add(message);
            }
        }

        List<JObject> eventMessages = await ReadMatchingBlobMessagesAsync(container, "coldstorage/rawjson/events/" + hourPath, deviceType, replayHour);

        if (eventMessages.Count == 0 && metadataMessages.Count == 0 && telemetryMessages.Count == 0)
        {
            // Nothing to replay for this hour and device type.
            return "Error";
        }

        // Replay order: metadata, then telemetry, then events.
        bool allDevicesFound = await ReplayMessagesAsync(metadataMessages, platformDetails);
        allDevicesFound = await ReplayMessagesAsync(telemetryMessages, platformDetails) && allDevicesFound;
        allDevicesFound = await ReplayMessagesAsync(eventMessages, platformDetails) && allDevicesFound;

        if (!allDevicesFound)
        {
            messageDiv = "Device not found";
        }
    }
    catch (Exception ex)
    {
        messageDiv = "Error";
        telemetry.TrackTrace("BlobMessage-Exception" + ex.Message);
    }

    if (string.IsNullOrEmpty(messageDiv))
    {
        messageDiv = "Success";
    }
    return messageDiv;
}

/// <summary>
/// Downloads every blob below <paramref name="directoryReference"/>, parses each one as a JSON
/// array and returns the objects whose messageType contains <paramref name="deviceType"/>
/// (ignoring case) and whose EventProcessedUtcTime lies in the hour of <paramref name="replayHour"/>.
/// </summary>
private async Task<List<JObject>> ReadMatchingBlobMessagesAsync(CloudBlobContainer container, string directoryReference, string deviceType, DateTime replayHour)
{
    var matches = new List<JObject>();
    var blobDirectory = container.GetDirectoryReference(directoryReference);
    IEnumerable<IListBlobItem> items = blobDirectory.ListBlobs(useFlatBlobListing: true);

    foreach (var fileitem in items.OfType<CloudBlob>())
    {
        CloudBlockBlob cbb = container.GetBlockBlobReference(fileitem.Name);

        string messageData;
        using (MemoryStream stream = new MemoryStream())
        {
            await cbb.DownloadToStreamAsync(stream);
            messageData = System.Text.Encoding.UTF8.GetString(stream.ToArray());
        }

        messageData = messageData.TrimEnd();
        if (messageData.Length == 0)
        {
            // An empty blob carries no messages; skip it instead of failing the whole replay.
            continue;
        }

        // A blob may lack the closing bracket of its JSON array; add it so the content parses.
        if (!messageData.EndsWith("]", StringComparison.Ordinal))
        {
            messageData = messageData + "]";
        }

        JArray messageDataArray = JArray.Parse(messageData);
        foreach (JObject obj in messageDataArray.Children<JObject>())
        {
            DateTime processed = Convert.ToDateTime(obj["EventProcessedUtcTime"]);

            JToken typeToken = obj["messageType"];
            if (typeToken == null)
            {
                // Without a messageType the object cannot be matched to a device type.
                continue;
            }

            // Compare date and hour directly rather than through culture-dependent ToString().
            bool sameHour = processed.Date == replayHour.Date && processed.Hour == replayHour.Hour;
            bool sameDeviceType = typeToken.ToString().IndexOf(deviceType, StringComparison.OrdinalIgnoreCase) >= 0;
            if (sameDeviceType && sameHour)
            {
                matches.Add(obj);
            }
        }
    }

    return matches;
}

/// <summary>
/// Looks up the device of each message in the registry and hands the message to
/// SendMessagesFromDeviceToCloud. Returns false when at least one message had no device id
/// or its device was not returned by the registry.
/// </summary>
private async Task<bool> ReplayMessagesAsync(IEnumerable<JObject> messages, Models.Platform platformDetails)
{
    bool allDevicesFound = true;

    foreach (JObject messageObject in messages)
    {
        // Reset per message so a message without these keys never reuses a previous message's values.
        string deviceId = string.Empty;
        string messageType = string.Empty;

        foreach (var pair in messageObject)
        {
            // Substring match, ignoring case; when several keys match, the last one wins.
            if (pair.Key.IndexOf("deviceID", StringComparison.OrdinalIgnoreCase) >= 0)
            {
                deviceId = messageObject[pair.Key].ToString();
            }
            if (pair.Key.IndexOf("messageType", StringComparison.OrdinalIgnoreCase) >= 0)
            {
                messageType = messageObject[pair.Key].ToString();
            }
        }

        if (string.IsNullOrEmpty(deviceId))
        {
            // No device id to look up; report it like an unknown device.
            allDevicesFound = false;
            continue;
        }

        Microsoft.Azure.Devices.Device device = await registryManager.GetDeviceAsync(deviceId);
        if (device != null)
        {
            // NOTE(review): the result of SendMessagesFromDeviceToCloud is not observed here;
            // if it returns a Task it should probably be awaited - confirm against its declaration.
            SendMessagesFromDeviceToCloud(deviceId, device.Authentication.SymmetricKey.PrimaryKey, platformDetails.Uri, messageObject.ToString(), messageType);
        }
        else
        {
            allDevicesFound = false;
        }
    }

    return allDevicesFound;
}

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. ( Log Out / Change )

Google photo

You are commenting using your Google account. ( Log Out / Change )

Twitter picture

You are commenting using your Twitter account. ( Log Out / Change )

Facebook photo

You are commenting using your Facebook account. ( Log Out / Change )

Connecting to %s